123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- import pandas as pd
- import numpy as np
- import os
- import math
- class LKA:
- def __init__(self, df, obj, roadPos_df, roadMark_df, laneInfo_df):
- self.df = df
- self.segmented_data = []
- """
- 注释:zhangyu 0626 新增
- """
- self.roadPos_df = roadPos_df
- self.roadMark_df = roadMark_df
- self.laneInfo_df = laneInfo_df
- """
- 横向指标
- """
- self.lateral_control01_minimum_distance_nearby_lane = None
- self.lateral_control02_maximum_lateral_offset = None
- self.lateral_control03_maximum_heading_deviation = None
- self.lateral_control04_relative_position_oscillation_frequency = None
- self.lateral_control05_relative_position_oscillation_difference = None
- self.lateral_control06_absolute_lateral_offset_distribution_expectation = None
- self.lateral_control07_absolute_lateral_offset_distribution_standard_deviation = None
- self.lateral_control08_maximum_lateral_deviation = None
- self.lateral_control09_minimum_lateral_deviation = None
- self.lateral_control10_fixed_driving_direction_TLC = None
- # self.lateral_control10_absolute_position_oscillation_difference = None
- # self.lateral_control11_maximum_heading_deviation = None
- # self.lateral_control12_relative_position_oscillation_frequency = None
- # self.lateral_control13_relative_position_oscillation_difference = None
- self._lateral_control01_minimum_distance_nearby_lane()
- self._lateral_control02_maximum_lateral_offset()
- self._lateral_control03_maximum_heading_deviation()
- self._lateral_control04_relative_position_oscillation_frequency()
- self._lateral_control05_relative_position_oscillation_difference()
- self._lateral_control06_absolute_lateral_offset_distribution_expectation()
- self._lateral_control07_absolute_lateral_offset_distribution_standard_deviation()
- self._lateral_control08_maximum_lateral_deviation()
- self._lateral_control09_minimum_lateral_deviation()
- self._lateral_control10_fixed_driving_direction_TLC()
- def Compute_nearby_distance_to_lane_boundary(self, x, width_ego):
- """
- 计算自车到车道边界线的距离,需要减去自车宽度
- Args:
- x.lateralDist :到边界线距离
- width_ego:车宽
- Returns:
- 自车到车道边界线的距离
- """
- if x.lateralDist < abs(x.right_lateral_distance):
- return x.lateralDist - width_ego/2
- else:
- return abs(x.right_lateral_distance) - width_ego/2
- def func_laneOffset_abs(self, x):
- """
- 计算自车横向偏移量的绝对值
- Args:
- x.laneOffset :横向偏移量
- Returns:
- 横向偏移量的绝对值
- """
- return abs(x.laneOffset)
- def func_center_line_cycle_optical(self, l):
- """
- 计算偏离车道中心线的振荡周期和极差,极大值要大于0,极小值要小于0
- Args:
- l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2]
- Returns:
- cycle:振荡周期数
- maximum_range:最大极差
- """
- # print("\n穿过y=0的周期数: ")
- length = len(l)
- number_peak = 0
- number_trough = 0
- # 生成新的、只含有波峰波谷的列表
- new_list = []
- plus = []
- minus = []
- for number, value in enumerate(l):
- if number == 0:
- if value > l[number + 1] and value > 0:
- number_peak += 1
- plus.append(value)
- if value < l[number + 1] and value < 0:
- number_trough += 1
- minus.append(value)
- continue
- if number == length - 1:
- if value > l[number - 1] and value > 0:
- number_peak += 1
- plus.append(value)
- if len(minus) != 0:
- new_list.append(min(minus))
- minus = []
- if value < l[number - 1] and value < 0:
- number_trough += 1
- minus.append(value)
- if len(plus) != 0:
- new_list.append(max(plus))
- plus = []
- if len(minus) != 0:
- new_list.append(min(minus))
- minus = []
- if len(plus) != 0:
- new_list.append(max(plus))
- plus = []
- continue
- if value >= l[number - 1] and value > l[number + 1] and value > 0:
- number_peak += 1
- plus.append(value)
- if len(minus) != 0:
- new_list.append(min(minus))
- minus = []
- if value < l[number - 1] and value <= l[number + 1] and value < 0:
- number_trough += 1
- minus.append(value)
- if len(plus) != 0:
- new_list.append(max(plus))
- plus = []
- cycle = min(number_peak, number_trough) - 1
- difference_list = []
- for i in range(len(new_list) - 1):
- difference = abs(new_list[i] - new_list[i + 1])
- difference_list.append(difference)
- if len(difference_list) == 0:
- maximum_range = -1
- # print(f"极差: {maximum_range}")
- else:
- maximum_range = max(difference_list)
- # print(f"极差: {maximum_range}")
- return cycle, maximum_range
- def func_normal_cycle_optical(self, l):
- """
- 计算振荡周期和极差,两个相邻的极大值(或极小值)算一次振荡
- Args:
- l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2]
- Returns:
- cycle:振荡周期数
- maximum_range:最大极差
- """
- # print("正常的周期数: ")
- length = len(l)
- number_peak = 0
- number_trough = 0
- # 生成新的、只含有波峰波谷的列表
- new_list = []
- for number, value in enumerate(l):
- if number == 0:
- if value > l[number + 1]:
- number_peak += 1
- new_list.append(value)
- if value < l[number + 1]:
- number_trough += 1
- new_list.append(value)
- continue
- if number == length - 1:
- if value > l[number - 1]:
- number_peak += 1
- new_list.append(value)
- if value < l[number - 1]:
- number_trough += 1
- new_list.append(value)
- continue
- if value >= l[number - 1] and value > l[number + 1]:
- number_peak += 1
- new_list.append(value)
- if value < l[number - 1] and value <= l[number + 1]:
- number_trough += 1
- new_list.append(value)
- if abs(number_peak - number_trough) > 1:
- pass
- else:
- cycle = max(number_peak, number_trough) - 1
- difference_list = []
- for i in range(len(new_list) - 1):
- difference = abs(new_list[i] - new_list[i + 1])
- difference_list.append(difference)
- if len(difference_list) == 0:
- maximum_range = -1
- # print(f"极差: {maximum_range}")
- else:
- maximum_range = max(difference_list)
- # print(f"极差: {maximum_range}")
- return cycle, maximum_range
- def fixed_driving_direction_TLC(self, x):
- TLC = (x['laneWidth'] / 2 - x['laneOffset_abs'] - x['dimY'] / 2 * math.cos(x['heading_deviation_abs'])) / (
- x['velocity_resultant'] * math.sin(x['heading_deviation_abs']))
- return TLC
- def _lateral_control01_minimum_distance_nearby_lane(self):
- """
- 横向控制01指标:计算离近侧车道线最小距离(m)
- Args:
- Returns:
- None
- """
- roadMark_df = self.roadMark_df
- player_df = self.df
- ego_df = player_df[player_df.playerId == 0]
- width_ego = ego_df['dimY'].values.tolist()[0]
- # 提取距离左车道线和右车道线距离
- roadMark_left_df = roadMark_df[roadMark_df.id == 0].reset_index(drop=True)
- roadMark_right_df = roadMark_df[roadMark_df.id == 2].reset_index(drop=True)
- roadMark_left_df['right_lateral_distance'] = roadMark_right_df['lateralDist']
- # 计算到车道边界线距离
- roadMark_left_df['nearby_distance_to_lane_boundary'] = roadMark_left_df.apply(
- lambda x: self.Compute_nearby_distance_to_lane_boundary(x, width_ego), axis=1)
- nearby_distance_to_lane_boundary = min(roadMark_left_df['nearby_distance_to_lane_boundary'])
- self.lateral_control01_minimum_distance_nearby_lane = round(nearby_distance_to_lane_boundary, 3)
- def _lateral_control02_maximum_lateral_offset(self):
- """
- 横向控制02指标:计算最大横向偏移量(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- # 提取自车的roadPos数据
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- # 计算横向偏移量的绝对值
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
- max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax()
- row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
- self.lateral_control02_maximum_lateral_offset = round(row_with_max_value, 3)
- def _lateral_control03_maximum_heading_deviation(self):
- """
- 横向控制3指标:最大偏航角
- Args:
- Returns:
- None
- """
- player_df = self.df
- road_mark_df = self.roadMark_df
- ego_df = player_df[player_df.playerId == 0].reset_index(drop=True)
- # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减
- road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True)
- road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True)
- road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor']
- road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor']
- road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \
- lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1)
- ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle']
- ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply( \
- lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1)
- row_with_max_value = max(ego_df['heading_deviation_abs'])
- self.lateral_control03_maximum_heading_deviation = row_with_max_value
- def _lateral_control04_relative_position_oscillation_frequency(self):
- """
- 横向控制12指标:横向相对位置振荡频率(Hz)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
- cycle_number, _ = self.func_normal_cycle_optical(roadPos_ego_list)
- self.lateral_control04_relative_position_oscillation_frequency = round(cycle_number, 3)
- def _lateral_control05_relative_position_oscillation_difference(self):
- """
- 横向控制04指标:横向相对位置振荡极差(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
- _, maximum_range = self.func_normal_cycle_optical(roadPos_ego_list)
- self.lateral_control05_relative_position_oscillation_difference = round(maximum_range, 3)
- def _lateral_control06_absolute_lateral_offset_distribution_expectation(self):
- """
- 横向控制06指标:计算绝对横向偏移量分布期望(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \
- lambda x: self.func_laneOffset_abs(x), axis=1)
- mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].mean()
- self.lateral_control06_absolute_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3)
- def _lateral_control07_absolute_lateral_offset_distribution_standard_deviation(self):
- """
- 横向控制07指标:计算绝对横向偏移量分布标准差(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \
- lambda x: self.func_laneOffset_abs(x), axis=1)
- mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].std()
- self.lateral_control07_absolute_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3)
- def _lateral_control08_maximum_lateral_deviation(self):
- """
- 横向控制08指标:横向距离极大值(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
- max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax()
- row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
- self.lateral_control08_maximum_lateral_deviation = row_with_max_value
- def _lateral_control09_minimum_lateral_deviation(self):
- """
- 横向控制09指标:横向距离极小值(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
- max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmin()
- row_with_min_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
- self.lateral_control09_minimum_lateral_deviation = row_with_min_value
- def _lateral_control10_fixed_driving_direction_TLC(self):
- """
- 横向控制10指标:固定行驶方向车辆跨道时间
- Args:
- Returns:
- None
- """
- player_df = self.df
- ego_df = player_df[player_df.playerId == 0].reset_index(drop=True)
- width_ego = ego_df['dimY'] # 自车宽度e
- road_mark_df = self.roadMark_df
- # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减
- road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True)
- road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True)
- road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor']
- road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor']
- road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \
- lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1)
- ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle']
- ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply(\
- lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1) # 偏航角θ
- laneInfo_df = self.laneInfo_df
- laneInfo_df = laneInfo_df[laneInfo_df.id == -1].reset_index(drop=True) # laneInfo_df['width'] 车道宽度
- ego_df['velocity_resultant'] = ego_df[['speedX', 'speedY']].apply( \
- lambda x: math.sqrt(x['speedX'] ** 2 - x['speedY'] ** 2) / 3.6, axis=1) # 汽车行驶速度v
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) # 横向偏移量y0
- merged_df = pd.merge(roadPos_ego_df, ego_df, on='simFrame', how='inner')
- merged_df["laneWidth"] = 3.5
- merged_df['TLC'] = merged_df.apply(lambda x: self.fixed_driving_direction_TLC(x), axis=1)
- row_with_min_value = min(merged_df['TLC'])
- self.lateral_control10_fixed_driving_direction_TLC = row_with_min_value
- def _lateral_control11_fixed_driving_direction_TLC(self):
- """
- 横向控制11指标:固定方向盘转角车辆跨道时间
- Args:
- Returns:
- None
- """
- pass
- if __name__ == "__main__":
- objState_csv_data = "/guazai/single_evaluate/SINGLE_EVALUATE/task_0515/case0530-ica-post/data/EgoState.csv"
- directory = os.path.dirname(objState_csv_data)
- print("目录名:", directory)
- # 使用 os.path.join() 组合目录和文件名
- objState_sensor_csv_data = os.path.join(directory, "ObjState.csv")
- roadMark_csv_data = os.path.join(directory, "RoadMark.csv")
- roadPos_csv_data = os.path.join(directory, "RoadPos.csv")
- laneInfo_csv_data = os.path.join(directory, "LaneInfo.csv")
- objState_csv_data = pd.read_csv(objState_csv_data)
- objState_sensor_csv_data = pd.read_csv(objState_sensor_csv_data)
- roadMark_csv_data = pd.read_csv(roadMark_csv_data)
- roadPos_csv_data = pd.read_csv(roadPos_csv_data)
- laneInfo_csv_data = pd.read_csv(laneInfo_csv_data)
- ICA_object = LKA(objState_csv_data, objState_sensor_csv_data, roadPos_csv_data, roadMark_csv_data, laneInfo_csv_data)
- print(ICA_object.lateral_control01_minimum_distance_nearby_lane)
- print(ICA_object.lateral_control02_maximum_lateral_offset)
- print(ICA_object.lateral_control03_maximum_heading_deviation)
- print(ICA_object.lateral_control04_relative_position_oscillation_frequency)
- print(ICA_object.lateral_control05_relative_position_oscillation_difference)
- print(ICA_object.lateral_control06_absolute_lateral_offset_distribution_expectation)
- print(ICA_object.lateral_control07_absolute_lateral_offset_distribution_standard_deviation)
- print(ICA_object.lateral_control08_maximum_lateral_deviation)
- print(ICA_object.lateral_control09_minimum_lateral_deviation)
- print(ICA_object._lateral_control10_minimum_lateral_deviation)
|