import pandas as pd import numpy as np import os import math class LKA: def __init__(self, df, obj, roadPos_df, roadMark_df, laneInfo_df): self.df = df self.segmented_data = [] """ 注释:zhangyu 0626 新增 """ self.roadPos_df = roadPos_df self.roadMark_df = roadMark_df self.laneInfo_df = laneInfo_df """ 横向指标 """ self.lateral_control01_minimum_distance_nearby_lane = None self.lateral_control02_maximum_lateral_offset = None self.lateral_control03_maximum_heading_deviation = None self.lateral_control04_relative_position_oscillation_frequency = None self.lateral_control05_relative_position_oscillation_difference = None self.lateral_control06_absolute_lateral_offset_distribution_expectation = None self.lateral_control07_absolute_lateral_offset_distribution_standard_deviation = None self.lateral_control08_maximum_lateral_deviation = None self.lateral_control09_minimum_lateral_deviation = None self.lateral_control10_fixed_driving_direction_TLC = None # self.lateral_control10_absolute_position_oscillation_difference = None # self.lateral_control11_maximum_heading_deviation = None # self.lateral_control12_relative_position_oscillation_frequency = None # self.lateral_control13_relative_position_oscillation_difference = None self._lateral_control01_minimum_distance_nearby_lane() self._lateral_control02_maximum_lateral_offset() self._lateral_control03_maximum_heading_deviation() self._lateral_control04_relative_position_oscillation_frequency() self._lateral_control05_relative_position_oscillation_difference() self._lateral_control06_absolute_lateral_offset_distribution_expectation() self._lateral_control07_absolute_lateral_offset_distribution_standard_deviation() self._lateral_control08_maximum_lateral_deviation() self._lateral_control09_minimum_lateral_deviation() self._lateral_control10_fixed_driving_direction_TLC() def Compute_nearby_distance_to_lane_boundary(self, x, width_ego): """ 计算自车到车道边界线的距离,需要减去自车宽度 Args: x.lateralDist :到边界线距离 width_ego:车宽 Returns: 自车到车道边界线的距离 """ if x.lateralDist < abs(x.right_lateral_distance): return x.lateralDist - width_ego/2 else: return abs(x.right_lateral_distance) - width_ego/2 def func_laneOffset_abs(self, x): """ 计算自车横向偏移量的绝对值 Args: x.laneOffset :横向偏移量 Returns: 横向偏移量的绝对值 """ return abs(x.laneOffset) def func_center_line_cycle_optical(self, l): """ 计算偏离车道中心线的振荡周期和极差,极大值要大于0,极小值要小于0 Args: l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2] Returns: cycle:振荡周期数 maximum_range:最大极差 """ # print("\n穿过y=0的周期数: ") length = len(l) number_peak = 0 number_trough = 0 # 生成新的、只含有波峰波谷的列表 new_list = [] plus = [] minus = [] for number, value in enumerate(l): if number == 0: if value > l[number + 1] and value > 0: number_peak += 1 plus.append(value) if value < l[number + 1] and value < 0: number_trough += 1 minus.append(value) continue if number == length - 1: if value > l[number - 1] and value > 0: number_peak += 1 plus.append(value) if len(minus) != 0: new_list.append(min(minus)) minus = [] if value < l[number - 1] and value < 0: number_trough += 1 minus.append(value) if len(plus) != 0: new_list.append(max(plus)) plus = [] if len(minus) != 0: new_list.append(min(minus)) minus = [] if len(plus) != 0: new_list.append(max(plus)) plus = [] continue if value >= l[number - 1] and value > l[number + 1] and value > 0: number_peak += 1 plus.append(value) if len(minus) != 0: new_list.append(min(minus)) minus = [] if value < l[number - 1] and value <= l[number + 1] and value < 0: number_trough += 1 minus.append(value) if len(plus) != 0: new_list.append(max(plus)) plus = [] cycle = min(number_peak, number_trough) - 1 difference_list = [] for i in range(len(new_list) - 1): difference = abs(new_list[i] - new_list[i + 1]) difference_list.append(difference) if len(difference_list) == 0: maximum_range = -1 # print(f"极差: {maximum_range}") else: maximum_range = max(difference_list) # print(f"极差: {maximum_range}") return cycle, maximum_range def func_normal_cycle_optical(self, l): """ 计算振荡周期和极差,两个相邻的极大值(或极小值)算一次振荡 Args: l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2] Returns: cycle:振荡周期数 maximum_range:最大极差 """ # print("正常的周期数: ") length = len(l) number_peak = 0 number_trough = 0 # 生成新的、只含有波峰波谷的列表 new_list = [] for number, value in enumerate(l): if number == 0: if value > l[number + 1]: number_peak += 1 new_list.append(value) if value < l[number + 1]: number_trough += 1 new_list.append(value) continue if number == length - 1: if value > l[number - 1]: number_peak += 1 new_list.append(value) if value < l[number - 1]: number_trough += 1 new_list.append(value) continue if value >= l[number - 1] and value > l[number + 1]: number_peak += 1 new_list.append(value) if value < l[number - 1] and value <= l[number + 1]: number_trough += 1 new_list.append(value) if abs(number_peak - number_trough) > 1: pass else: cycle = max(number_peak, number_trough) - 1 difference_list = [] for i in range(len(new_list) - 1): difference = abs(new_list[i] - new_list[i + 1]) difference_list.append(difference) if len(difference_list) == 0: maximum_range = -1 # print(f"极差: {maximum_range}") else: maximum_range = max(difference_list) # print(f"极差: {maximum_range}") return cycle, maximum_range def fixed_driving_direction_TLC(self, x): TLC = (x['laneWidth'] / 2 - x['laneOffset_abs'] - x['dimY'] / 2 * math.cos(x['heading_deviation_abs'])) / ( x['velocity_resultant'] * math.sin(x['heading_deviation_abs'])) return TLC def _lateral_control01_minimum_distance_nearby_lane(self): """ 横向控制01指标:计算离近侧车道线最小距离(m) Args: Returns: None """ roadMark_df = self.roadMark_df player_df = self.df ego_df = player_df[player_df.playerId == 0] width_ego = ego_df['dimY'].values.tolist()[0] # 提取距离左车道线和右车道线距离 roadMark_left_df = roadMark_df[roadMark_df.id == 0].reset_index(drop=True) roadMark_right_df = roadMark_df[roadMark_df.id == 2].reset_index(drop=True) roadMark_left_df['right_lateral_distance'] = roadMark_right_df['lateralDist'] # 计算到车道边界线距离 roadMark_left_df['nearby_distance_to_lane_boundary'] = roadMark_left_df.apply( lambda x: self.Compute_nearby_distance_to_lane_boundary(x, width_ego), axis=1) nearby_distance_to_lane_boundary = min(roadMark_left_df['nearby_distance_to_lane_boundary']) self.lateral_control01_minimum_distance_nearby_lane = round(nearby_distance_to_lane_boundary, 3) def _lateral_control02_maximum_lateral_offset(self): """ 横向控制02指标:计算最大横向偏移量(m) Args: Returns: None """ roadPos_df = self.roadPos_df # 提取自车的roadPos数据 roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) # 计算横向偏移量的绝对值 roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax() row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset self.lateral_control02_maximum_lateral_offset = round(row_with_max_value, 3) def _lateral_control03_maximum_heading_deviation(self): """ 横向控制3指标:最大偏航角 Args: Returns: None """ player_df = self.df road_mark_df = self.roadMark_df ego_df = player_df[player_df.playerId == 0].reset_index(drop=True) # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减 road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True) road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True) road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor'] road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor'] road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \ lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1) ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle'] ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply( \ lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1) row_with_max_value = max(ego_df['heading_deviation_abs']) self.lateral_control03_maximum_heading_deviation = row_with_max_value def _lateral_control04_relative_position_oscillation_frequency(self): """ 横向控制12指标:横向相对位置振荡频率(Hz) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist() cycle_number, _ = self.func_normal_cycle_optical(roadPos_ego_list) self.lateral_control04_relative_position_oscillation_frequency = round(cycle_number, 3) def _lateral_control05_relative_position_oscillation_difference(self): """ 横向控制04指标:横向相对位置振荡极差(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist() _, maximum_range = self.func_normal_cycle_optical(roadPos_ego_list) self.lateral_control05_relative_position_oscillation_difference = round(maximum_range, 3) def _lateral_control06_absolute_lateral_offset_distribution_expectation(self): """ 横向控制06指标:计算绝对横向偏移量分布期望(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \ lambda x: self.func_laneOffset_abs(x), axis=1) mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].mean() self.lateral_control06_absolute_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3) def _lateral_control07_absolute_lateral_offset_distribution_standard_deviation(self): """ 横向控制07指标:计算绝对横向偏移量分布标准差(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \ lambda x: self.func_laneOffset_abs(x), axis=1) mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].std() self.lateral_control07_absolute_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3) def _lateral_control08_maximum_lateral_deviation(self): """ 横向控制08指标:横向距离极大值(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax() row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset self.lateral_control08_maximum_lateral_deviation = row_with_max_value def _lateral_control09_minimum_lateral_deviation(self): """ 横向控制09指标:横向距离极小值(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmin() row_with_min_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset self.lateral_control09_minimum_lateral_deviation = row_with_min_value def _lateral_control10_fixed_driving_direction_TLC(self): """ 横向控制10指标:固定行驶方向车辆跨道时间 Args: Returns: None """ player_df = self.df ego_df = player_df[player_df.playerId == 0].reset_index(drop=True) width_ego = ego_df['dimY'] # 自车宽度e road_mark_df = self.roadMark_df # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减 road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True) road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True) road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor'] road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor'] road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \ lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1) ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle'] ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply(\ lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1) # 偏航角θ laneInfo_df = self.laneInfo_df laneInfo_df = laneInfo_df[laneInfo_df.id == -1].reset_index(drop=True) # laneInfo_df['width'] 车道宽度 ego_df['velocity_resultant'] = ego_df[['speedX', 'speedY']].apply( \ lambda x: math.sqrt(x['speedX'] ** 2 - x['speedY'] ** 2) / 3.6, axis=1) # 汽车行驶速度v roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) # 横向偏移量y0 merged_df = pd.merge(roadPos_ego_df, ego_df, on='simFrame', how='inner') merged_df["laneWidth"] = 3.5 merged_df['TLC'] = merged_df.apply(lambda x: self.fixed_driving_direction_TLC(x), axis=1) row_with_min_value = min(merged_df['TLC']) self.lateral_control10_fixed_driving_direction_TLC = row_with_min_value def _lateral_control11_fixed_driving_direction_TLC(self): """ 横向控制11指标:固定方向盘转角车辆跨道时间 Args: Returns: None """ pass if __name__ == "__main__": objState_csv_data = "/guazai/single_evaluate/SINGLE_EVALUATE/task_0515/case0530-ica-post/data/EgoState.csv" directory = os.path.dirname(objState_csv_data) print("目录名:", directory) # 使用 os.path.join() 组合目录和文件名 objState_sensor_csv_data = os.path.join(directory, "ObjState.csv") roadMark_csv_data = os.path.join(directory, "RoadMark.csv") roadPos_csv_data = os.path.join(directory, "RoadPos.csv") laneInfo_csv_data = os.path.join(directory, "LaneInfo.csv") objState_csv_data = pd.read_csv(objState_csv_data) objState_sensor_csv_data = pd.read_csv(objState_sensor_csv_data) roadMark_csv_data = pd.read_csv(roadMark_csv_data) roadPos_csv_data = pd.read_csv(roadPos_csv_data) laneInfo_csv_data = pd.read_csv(laneInfo_csv_data) ICA_object = LKA(objState_csv_data, objState_sensor_csv_data, roadPos_csv_data, roadMark_csv_data, laneInfo_csv_data) print(ICA_object.lateral_control01_minimum_distance_nearby_lane) print(ICA_object.lateral_control02_maximum_lateral_offset) print(ICA_object.lateral_control03_maximum_heading_deviation) print(ICA_object.lateral_control04_relative_position_oscillation_frequency) print(ICA_object.lateral_control05_relative_position_oscillation_difference) print(ICA_object.lateral_control06_absolute_lateral_offset_distribution_expectation) print(ICA_object.lateral_control07_absolute_lateral_offset_distribution_standard_deviation) print(ICA_object.lateral_control08_maximum_lateral_deviation) print(ICA_object.lateral_control09_minimum_lateral_deviation) print(ICA_object._lateral_control10_minimum_lateral_deviation)