import pandas as pd import numpy as np class ICA: def __init__(self, df): self.df = df #object_df self.segmented_data = [] """ 注释:zhangyu 0626 新增 """ self.roadPos_df = None self.roadMark_df = None """ 纵向指标 """ self.stable_start_time_cruise = None self.stable_average_speed = None self.percentage_accel = None self.percentage_lon_acc_roc = None self.delay_time_cruise = None self.delay_time_THW = None self.rise_time_cruise = None self.rise_time_THW = None self.peak_time_cruise = None self.peak_time_THW = None self.overshoot_cruise = None self.overshoot_THW = None self.settling_time = None self.steady_error_cruise = None self.steady_error_THW = None self.stable_start_time_THW = None self.stable_average_THW = None """ 横向指标 """ self.lateral_control01_minimum_distance_nearby_lane = None self.lateral_control02_maximum_lateral_offset = None self.lateral_control03_relative_lateral_offset_distribution_expectation = None self.lateral_control04_relative_lateral_offset_distribution_standard_deviation = None self.lateral_control05_absolute_lateral_offset_distribution_expectation = None self.lateral_control06_absolute_lateral_offset_distribution_standard_deviation = None self.lateral_control07_maximum_lateral_deviation = None self.lateral_control08_minimum_lateral_deviation = None self.lateral_control09_absolute_position_oscillation_frequency = None self.lateral_control10_absolute_position_oscillation_difference = None self.lateral_control11_maximum_heading_deviation = None self.lateral_control12_relative_position_oscillation_frequency = None self.lateral_control13_relative_position_oscillation_difference = None def _segment_data_by_status(self, status_name, status_value): """ 根据状态名称和状态值对DataFrame中的数据进行分段处理。 Args: status_name (str): DataFrame中用于标识状态的列名。 status_value (Any): 需要进行分段处理的状态值。 Returns: None Raises: KeyError: 如果DataFrame中不存在'ACC_status'或'simTime'列时,抛出KeyError异常。 """ if 'ACC_status' not in self.df.columns: raise KeyError("'ACC_status' column not found in the dataframe.") if 'simTime' not in self.df.columns: raise KeyError("'simTime' column not found in the dataframe.") filtered_data = self.df[self.df[status_name] == status_value] current_segment = [] for index, row in filtered_data.iterrows(): if not current_segment or abs(float(row['simTime']) - float(current_segment[-1]['simTime'])) <= 0.01: current_segment.append(row) else: self.segmented_data.append(pd.DataFrame(current_segment)) current_segment = [row] if current_segment: self.segmented_data.append(pd.DataFrame(current_segment)) def _get_headway_time(self, df): pass def _find_stable_speed_cruise(self, window_size, percent_deviation, set_value): """ 在给定的速度数据中查找稳定的巡航速度段,并计算该段的平均速度。 Args: window_size (int): 滑动窗口的大小,表示用于计算平均速度的速度数据点数量。 percent_deviation (float): 设定值的允许偏差百分比。 set_value (float): 期望的稳定速度设定值。 Returns: None """ speed_data = self.df['speedX'].values deviation = set_value * (percent_deviation / 100) stable_start = None stable_average_speed = None for i in range(len(speed_data) - window_size + 1): window_data = speed_data[i:i + window_size] if all(set_value - deviation <= s <= set_value + deviation for s in window_data): if stable_start is None: stable_start = i stable_end = i + window_size - 1 stable_average_speed = np.mean(window_data) j = i + window_size while j < len(speed_data) - window_size + 1: next_window_data = speed_data[j:j + window_size] if all(set_value - deviation <= s <= set_value + deviation for s in next_window_data): stable_end = j + window_size - 1 stable_average_speed = (stable_average_speed * (j - stable_start) + sum(next_window_data)) / ( j - stable_start + window_size) j += window_size else: stable_start = j + window_size - 1 stable_end = i + window_size - 1 stable_average_speed = np.mean(window_data) break self.stable_start_time_cruise = self.df['simTime'].iloc[stable_start] self.stable_average_speed = stable_average_speed def _find_stable_THW(self, window_size, percent_deviation, set_value): """ 在给定的数据窗口中查找稳定跟车时距离THW,并计算该段内THW的平均值。 Args: window_size (int): 窗口大小,表示在数据中寻找稳定段时考虑的连续数据点数量。 percent_deviation (float): THW值相对于设定值的允许偏差百分比。 set_value (float): THW的设定值。 Returns: None """ THW = self.df['THW'].values deviation = set_value * (percent_deviation / 100) stable_start = None stable_average_THW = None for i in range(len(THW) - window_size + 1): window_data = THW[i:i + window_size] if all(set_value - deviation <= s <= set_value + deviation for s in window_data): if stable_start is None: stable_start = i stable_end = i + window_size - 1 stable_average_THW = np.mean(window_data) j = i + window_size while j < len(THW) - window_size + 1: next_window_data = THW[j:j + window_size] if all(set_value - deviation <= s <= set_value + deviation for s in next_window_data): stable_end = j + window_size - 1 stable_average_THW = (stable_average_THW * (j - stable_start) + sum(next_window_data)) / ( j - stable_start + window_size) j += window_size else: stable_start = j + window_size - 1 stable_end = i + window_size - 1 stable_average_THW = np.mean(window_data) break self.stable_start_time_THW = self.df['simTime'].iloc[stable_start] self.stable_average_THW = stable_average_THW def _find_closest_time_stamp_cruise(self, df, target_speed, start_index): """ 在给定的数据帧df中,从start_index索引位置开始,查找与目标速度target_speed最接近的时间戳。 Args: df (pd.DataFrame): 包含速度和时间戳等信息的数据帧,需要至少包含'speedX'和'simTime'两列。 target_speed (float): 目标速度值,用于在数据帧中查找最接近此值的时间戳。 start_index (int): 开始查找的索引位置,即在数据帧df中从该索引位置开始向后查找。 Returns: pd.Timestamp: 与目标速度最接近的时间戳。 """ subset = df.loc[start_index + 1:] speed_diff = np.abs(subset['speedX'] - target_speed) closest_index = speed_diff.idxmin() closest_timestamp = subset.loc[closest_index, 'simTime'] return closest_timestamp def _find_closest_time_stamp_THW(self, df, target_THW, start_index): """ 在DataFrame中找到与目标THW值最接近的时间戳。 Args: df (pandas.DataFrame): 包含'THW'和'simTime'列的DataFrame,其中'THW'表示目标变量,'simTime'表示时间戳。 target_THW (float): 目标THW值。 start_index (int): 开始搜索的索引位置(不包含)。 Returns: pandas.Timestamp: 与目标THW值最接近的时间戳。 """ subset = df.loc[start_index + 1:] THW_diff = np.abs(subset['THW'] - target_THW) closest_index = THW_diff.idxmin() closest_timestamp = subset.loc[closest_index, 'simTime'] return closest_timestamp def _get_first_change_index_cruise(self): """ 获取数据集中第一次巡航速度发生变化的索引。 Args: 无参数。 Returns: Union[int, None]: 如果存在巡航速度发生变化的索引,则返回第一个发生变化的索引(int类型); 如果不存在,则返回None。 """ change_indices = self.df[self.df['set_cruise_speed'] != self.df['set_cruise_speed'].shift()].index if not change_indices.empty: first_change_index = change_indices.min() else: first_change_index = None return first_change_index def _get_first_change_index_THW(self): """ 获取DataFrame中'set_headway_time'列首次发生变化的索引值。 Args: 无参数。 Returns: Union[int, None]: 如果存在变化,则返回首次发生变化的索引值(int类型),否则返回None。 """ change_indices = self.df[self.df['set_headway_time'] != self.df['set_headway_time'].shift()].index if not change_indices.empty: first_change_index = change_indices.min() else: first_change_index = None return first_change_index def calculate_reasonable_acceleration_percentage(self): """ 计算合理的加速度百分比。 Args: 无。 Returns: 无返回值,但会更新对象的 percentage_accel 属性,表示合理的加速度百分比(单位:%)。 """ df = self.df_follow.copy() col_list = ['simTime', 'simFrame', 'playerId', 'v', 'accel', 'accel', 'lon_acc_roc'] # target_id df = df[col_list].copy() count_accel_in_range = 0 total_accel_frames = 0 ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'accel', 'lon_acc_roc']] for row in self.segmented_data: df = row[col_list].copy() filtered_data = df[(df['accel'] >= -5) & (df['accel'] <= 4)] count_accel_in_range = count_accel_in_range + len(filtered_data) total_accel_frames = total_accel_frames + len(df) self.percentage_accel = (count_accel_in_range / total_accel_frames) * 100 print(f"percentage_accel: {self.percentage_accel}") def calculate_reasonable_acceleration_change_rate_percentage(self): """ 计算纵向加速度变化率(lon_acc_roc)的合理变化率百分比。 Args: 无参数。 Returns: 无返回值,但会更新对象的 percentage_lon_acc_roc 属性。 """ df = self.df_follow.copy() col_list = ['simTime', 'simFrame', 'playerId', 'v', 'accel', 'accel', 'lon_acc_roc'] # target_id df = df[col_list].copy() count_lon_acc_roc_in_range = 0 total_lon_acc_roc_frames = 0 for row in self.segmented_data: df = row[col_list].copy() filtered_data = df[(df['lon_acc_roc'] >= -5) & (df['lon_acc_roc'] <= 6)] count_lon_acc_roc_in_range = count_lon_acc_roc_in_range + len(filtered_data) total_lon_acc_roc_frames = total_lon_acc_roc_frames + len(df) self.percentage_lon_acc_roc = (count_lon_acc_roc_in_range / total_lon_acc_roc_frames) * 100 def get_delay_time_cruise(self): """ 计算巡航速度改变后的延迟时间。 Args: 无。 Returns: 无返回值,但会设置self.delay_time为计算得到的延迟时间(秒)。 Raises: 无特定异常,但会捕获并打印任何发生的异常信息。 """ try: change_indices = self.df[self.df['set_cruise_speed'] != self.df['set_cruise_speed'].shift()].index print(f"Change indices of set speed: {change_indices}") if not change_indices.empty: first_change_index = change_indices[change_indices != 0].min() set_cruise_speed_at_change = self.df.loc[first_change_index, 'set_cruise_speed'] timestamp_at_change = self.df.loc[first_change_index, 'simTime'] print(f"Set speed at first change: {set_cruise_speed_at_change}, Timestamp: {timestamp_at_change}") target_speed = (self.stable_average_speed + self.df.loc[first_change_index, 'speedX']) / 2 closest_index = (self.df['speedX'] - target_speed).abs().idxmin() closest_current_speed = self.df.loc[closest_index, 'speedX'] closest_timestamp = self.df.loc[closest_index, 'simTime'] print(f"Closest speed: {closest_current_speed} at time: {closest_timestamp}") self.delay_time_cruise = closest_timestamp - timestamp_at_change print(f"Delay time: {self.delay_time_cruise}") else: print("No valid change point for further calculation.") except Exception as e: print(f"An error occurred: {e}") def get_delay_time_THW(self): """ 计算THW改变后的延迟时间。 Args: 无。 Returns: 无返回值,但会设置self.delay_time为计算得到的延迟时间(秒)。 Raises: 无特定异常,但会捕获并打印任何发生的异常信息。 """ try: change_indices = self.df[self.df['set_headway_time'] != self.df['set_headway_time'].shift()].index print(f"Change indices of set speed: {change_indices}") if not change_indices.empty: first_change_index = change_indices[change_indices != 0].min() set_THW_at_change = self.df.loc[first_change_index, 'set_headway_time'] timestamp_at_change = self.df.loc[first_change_index, 'simTime'] print(f"Set THW at first change: {set_THW_at_change}, Timestamp: {timestamp_at_change}") target_THW = (self.stable_average_THW + self.df.loc[first_change_index, 'set_headway_time']) / 2 closest_index = (self.df['set_headway_time'] - target_THW).abs().idxmin() closest_current_THW = self.df.loc[closest_index, 'set_headway_time'] closest_timestamp = self.df.loc[closest_index, 'simTime'] print(f"Closest speed: {closest_current_THW} at time: {closest_timestamp}") self.delay_time_THW = closest_timestamp - timestamp_at_change print(f"Delay time: {self.delay_time_THW}") else: print("No valid change point for further calculation.") except Exception as e: print(f"An error occurred: {e}") def get_rise_time_cruise(self): """ 计算巡航时从初速度到达稳定平均速度的90%的所需时间(rise time)。 Args: 无。 Returns: 无返回值,但会设置实例属性self.rise_time为计算得到的rise time。 """ first_change_index = self._get_first_change_index_cruise() initial_speed = self.df.loc[first_change_index, 'speedX'] target_speed_90 = initial_speed + (self.stable_average_speed - initial_speed) * 0.9 target_speed_10 = initial_speed + (self.stable_average_speed - initial_speed) * 0.1 timestamp_at_10 = self._find_closest_time_stamp_cruise(self.df, target_speed_10, first_change_index) timestamp_at_90 = self._find_closest_time_stamp_cruise(self.df, target_speed_90, first_change_index) print(f"Closest speed at 10% range from set speed: {timestamp_at_10}") print(f"Closest speed at 90% range from set speed: {timestamp_at_90}") self.rise_time_cruise = timestamp_at_90 - timestamp_at_10 print(f"Rise time: {self.rise_time_cruise}") def get_rise_time_THW(self): """ 计算巡航时从初THW到达稳定THW的90%的所需时间(rise time)。 Args: 无。 Returns: 无返回值,但会设置实例属性self.rise_time_THW为计算得到的rise time。 """ first_change_index = self._get_first_change_index_THW() initial_THW = self.df.loc[first_change_index, 'THW'] target_THW_90 = initial_THW + (self.stable_average_THW - initial_THW) * 0.9 target_THW_10 = initial_THW + (self.stable_average_THW - initial_THW) * 0.1 timestamp_at_10 = self._find_closest_time_stamp_THW(self.df, target_THW_10, first_change_index) timestamp_at_90 = self._find_closest_time_stamp_THW(self.df, target_THW_90, first_change_index) print(f"Closest speed at 10% range from set speed: {timestamp_at_10}") print(f"Closest speed at 90% range from set speed: {timestamp_at_90}") self.rise_time_THW = timestamp_at_90 - timestamp_at_10 print(f"Rise time: {self.rise_time_THW}") def get_peak_time_cruise(self): """ 计算巡航阶段速度峰值时间。 Args: 无。 Returns: 无返回值,但将计算得到的巡航阶段速度峰值时间(从首次变化时间开始计时)存储在实例变量 self.peak_time 中。 """ first_change_index = self._get_first_change_index_cruise() start_time = self.df.loc[first_change_index, 'simTime'] peak_time = self.df.loc[self.df['speedX'].idxmax(), 'simTime'] self.peak_time_cruise = peak_time - start_time def get_peak_time_THW(self): """ 计算THW阶段速度峰值时间。 Args: 无。 Returns: 无返回值,但将计算得到的巡航阶段速度峰值时间(从首次变化时间开始计时)存储在实例变量 self.peak_time 中。 """ first_change_index = self._get_first_change_index_THW() start_time = self.df.loc[first_change_index, 'simTime'] peak_time = self.df.loc[self.df['THW'].idxmax(), 'simTime'] self.peak_time_THW = peak_time - start_time def get_overshoot_cruise(self): """ 计算超速巡航超调量。 Args: 无。 Returns: 无返回值,但会更新类的成员变量 `overshoot`。 """ first_change_index = self._get_first_change_index_cruise() initial_speed = self.df.loc[first_change_index, 'speedX'] if initial_speed > self.stable_average_speed: self.overshoot_cruise = (self.stable_average_speed - self.df[ 'speedX'].min()) * 100 / self.stable_average_speed elif initial_speed < self.stable_average_speed: self.overshoot_cruise = (self.df[ 'speedX'].max() - self.stable_average_speed) * 100 / self.stable_average_speed else: self.overshoot_cruise = 0 def get_overshoot_THW(self): """ 计算超速THW超调量。 Args: 无。 Returns: 无返回值,但会更新类的成员变量 `overshoot`。 """ first_change_index = self._get_first_change_index_THW() initial_THW = self.df.loc[first_change_index, 'THW'] if initial_THW > self.stable_average_THW: self.overshoot_THW = (self.stable_average_THW - self.df['THW'].min()) * 100 / self.stable_average_THW elif initial_THW < self.stable_average_THW: self.overshoot_THW = (self.df['THW'].max() - self.stable_average_THW) * 100 / self.stable_average_THW else: self.overshoot_THW = 0 def get_steady_error_cruise(self): """ 计算巡航稳态误差。 Args: 无参数。 Returns: 无返回值,但会更新对象的steady_error属性。 Raises: ValueError: 如果stable_average_speed为None时,会抛出此异常。 """ first_change_index = self._get_first_change_index_cruise() set_cruise_speed = self.df.loc[first_change_index, 'set_cruise_speed'] if self.stable_average_speed: self.steady_error_cruise = (self.stable_average_speed - set_cruise_speed) * 100 / self.stable_average_speed else: raise ValueError("Stable average speed is None.") def get_steady_error_THW(self): """ 计算THW稳态误差。 Args: 无参数。 Returns: 无返回值,但会更新对象的steady_error属性。 Raises: ValueError: 如果stable_average_THW为None时 ,会抛出此异常。 """ first_change_index = self._get_first_change_index_THW() set_THW = self.df.loc[first_change_index, 'set_headway_time'] if self.stable_average_THW: self.steady_error_THW = (self.stable_average_THW - set_THW) * 100 / self.stable_average_THW else: raise ValueError("Stable average THW is None.") def get_target_recongnition_cut_in_time_mid(self): """ 获取目标识别切入时间中间值。 Args: 无参数。 Returns: float: 目标识别切入时间中点(单位:秒),如果未获取到则返回None。 """ pass def get_target_recongnition_cut_in_time_SD(self): """ 获取目标识别切入时间(SD)标准差 Args: 无 Returns: 目标识别切入时间(SD)的值,若函数无具体返回值则无返回(在此情况下,该函数被标记为 pass) """ pass def get_target_recongnition_cut_off_time_mid(self): """ 获取目标识别切出时间中间值。 Args: 无参数。 Returns: float: 目标识别切出时间中点(单位:秒),如果未获取到则返回None。 """ pass def get_target_recongnition_cut_off_time_SD(self): """ 获取目标识别切出时间(SD)标准差 Args: 无 Returns: 目标识别切出时间(SD)的值,若函数无具体返回值则无返回(在此情况下,该函数被标记为 pass) """ pass def get_stop_time(self): pass def Compute_nearby_distance_to_lane_boundary(self, x, width_ego): """ 计算自车到车道边界线的距离,需要减去自车宽度 Args: x.lateralDist :到边界线距离 width_ego:车宽 Returns: 自车到车道边界线的距离 """ if x.lateralDist < abs(x.right_lateral_distance): return x.lateralDist - width_ego / 2 else: return abs(x.right_lateral_distance) - width_ego / 2 def func_laneOffset_abs(self, x): """ 计算自车横向偏移量的绝对值 Args: x.laneOffset :横向偏移量 Returns: 横向偏移量的绝对值 """ return abs(x.laneOffset) def func_center_line_cycle_optical(self, l): """ 计算偏离车道中心线的振荡周期和极差,极大值要大于0,极小值要小于0 Args: l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2] Returns: cycle:振荡周期数 maximum_range:最大极差 """ # print("\n穿过y=0的周期数: ") length = len(l) number_peak = 0 number_trough = 0 # 生成新的、只含有波峰波谷的列表 new_list = [] plus = [] minus = [] for number, value in enumerate(l): if number == 0: if value > l[number + 1] and value > 0: number_peak += 1 plus.append(value) if value < l[number + 1] and value < 0: number_trough += 1 minus.append(value) continue if number == length - 1: if value > l[number - 1] and value > 0: number_peak += 1 plus.append(value) if len(minus) != 0: new_list.append(min(minus)) minus = [] if value < l[number - 1] and value < 0: number_trough += 1 minus.append(value) if len(plus) != 0: new_list.append(max(plus)) plus = [] if len(minus) != 0: new_list.append(min(minus)) minus = [] if len(plus) != 0: new_list.append(max(plus)) plus = [] continue if value >= l[number - 1] and value > l[number + 1] and value > 0: number_peak += 1 plus.append(value) if len(minus) != 0: new_list.append(min(minus)) minus = [] if value < l[number - 1] and value <= l[number + 1] and value < 0: number_trough += 1 minus.append(value) if len(plus) != 0: new_list.append(max(plus)) plus = [] cycle = min(number_peak, number_trough) - 1 difference_list = [] for i in range(len(new_list) - 1): difference = abs(new_list[i] - new_list[i + 1]) difference_list.append(difference) if len(difference_list) == 0: maximum_range = -1 # print(f"极差: {maximum_range}") else: maximum_range = max(difference_list) # print(f"极差: {maximum_range}") return cycle, maximum_range def func_normal_cycle_optical(self, l): """ 计算振荡周期和极差,两个相邻的极大值(或极小值)算一次振荡 Args: l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2] Returns: cycle:振荡周期数 maximum_range:最大极差 """ # print("正常的周期数: ") length = len(l) number_peak = 0 number_trough = 0 # 生成新的、只含有波峰波谷的列表 new_list = [] for number, value in enumerate(l): if number == 0: if value > l[number + 1]: number_peak += 1 new_list.append(value) if value < l[number + 1]: number_trough += 1 new_list.append(value) continue if number == length - 1: if value > l[number - 1]: number_peak += 1 new_list.append(value) if value < l[number - 1]: number_trough += 1 new_list.append(value) continue if value >= l[number - 1] and value > l[number + 1]: number_peak += 1 new_list.append(value) if value < l[number - 1] and value <= l[number + 1]: number_trough += 1 new_list.append(value) if abs(number_peak - number_trough) > 1: pass else: cycle = max(number_peak, number_trough) - 1 difference_list = [] for i in range(len(new_list) - 1): difference = abs(new_list[i] - new_list[i + 1]) difference_list.append(difference) if len(difference_list) == 0: maximum_range = -1 # print(f"极差: {maximum_range}") else: maximum_range = max(difference_list) # print(f"极差: {maximum_range}") return cycle, maximum_range def _lateral_control01_minimum_distance_nearby_lane(self): """ 横向控制01指标:计算离近侧车道线最小距离(m) Args: Returns: None """ roadMark_df = self.roadMark_df player_df = self.df ego_df = player_df[player_df.playerId == 1] width_ego = ego_df['dimY'].values.tolist()[0] # 提取距离左车道线和右车道线距离 roadMark_left_df = roadMark_df[roadMark_df.id == 0].reset_index(drop=True) roadMark_right_df = roadMark_df[roadMark_df.id == 2].reset_index(drop=True) roadMark_left_df['right_lateral_distance'] = roadMark_right_df['lateralDist'] # 计算到车道边界线距离 roadMark_left_df['nearby_distance_to_lane_boundary'] = roadMark_left_df.apply( lambda x: self.Compute_nearby_distance_to_lane_boundary(x, width_ego), axis=1) nearby_distance_to_lane_boundary = min(roadMark_left_df['nearby_distance_to_lane_boundary']) self.lateral_control01_minimum_distance_nearby_lane = round(nearby_distance_to_lane_boundary, 3) def _lateral_control02_maximum_lateral_offset(self): """ 横向控制02指标:计算最大横向偏移量(m) Args: Returns: None """ roadPos_df = self.roadPos_df # 提取自车的roadPos数据 roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) # 计算横向偏移量的绝对值 roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax() row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset self.lateral_control02_maximum_lateral_offset = round(row_with_max_value, 3) def _lateral_control03_relative_lateral_offset_distribution_expectation(self): """ 横向控制03指标:计算相对横向偏移量分布期望(m) Args: Returns: None """ # 提取自车宽度 roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) mean_laneOffset_index = roadPos_ego_df['laneOffset'].mean() self.lateral_control03_relative_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3) def _lateral_control04_relative_lateral_offset_distribution_standard_deviation(self): """ 横向控制04指标:计算相对横向偏移量分布标准差(m) Args: Returns: None """ # 提取自车宽度 roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) mean_laneOffset_index = roadPos_ego_df['laneOffset'].std() self.lateral_control04_relative_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3) def _lateral_control05_absolute_lateral_offset_distribution_expectation(self): """ 横向控制05指标:计算绝对横向偏移量分布期望(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \ lambda x: self.func_laneOffset_abs(x), axis=1) mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].mean() self.lateral_control05_absolute_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3) def _lateral_control06_absolute_lateral_offset_distribution_standard_deviation(self): """ 横向控制06指标:计算绝对横向偏移量分布标准差(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \ lambda x: self.func_laneOffset_abs(x), axis=1) mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].std() self.lateral_control06_absolute_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3) def _lateral_control07_maximum_lateral_deviation(self): """ 横向控制07指标:横向距离极大值(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax() row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset self.lateral_control07_maximum_lateral_deviation = row_with_max_value def _lateral_control08_minimum_lateral_deviation(self): """ 横向控制08指标:横向距离极小值(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmin() row_with_min_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset self.lateral_control08_minimum_lateral_deviation = row_with_min_value def _lateral_control09_absolute_position_oscillation_frequency(self): """ 横向控制09指标:横向绝对位置振荡频率(Hz) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist() cycle_number, _ = self.func_center_line_cycle_optical(roadPos_ego_list) self.lateral_control09_absolute_position_oscillation_frequency = round(cycle_number, 3) def _lateral_control10_absolute_position_oscillation_difference(self): """ 横向控制10指标:横向绝对位置振荡极差(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist() _, maximum_range = self.func_center_line_cycle_optical(roadPos_ego_list) self.lateral_control10_absolute_position_oscillation_difference = round(maximum_range, 3) def _lateral_control11_maximum_heading_deviation(self): """ 横向控制11指标:最大偏航角 Args: Returns: None """ player_df = self.df road_mark_df = self.roadMark_df ego_df = player_df[player_df.playerId == 1].reset_index(drop=True) # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减 road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True) road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True) road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor'] road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor'] road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \ lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1) ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle'] ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply( \ lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1) row_with_max_value = max(ego_df['heading_deviation_abs']) self.lateral_control11_maximum_heading_deviation = row_with_max_value def _lateral_control12_relative_position_oscillation_frequency(self): """ 横向控制12指标:横向相对位置振荡频率(Hz) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist() cycle_number, _ = self.func_normal_cycle_optical(roadPos_ego_list) self.lateral_control12_relative_position_oscillation_frequency = round(cycle_number, 3) def _lateral_control13_relative_position_oscillation_difference(self): """ 横向控制13指标:横向相对位置振荡极差(m) Args: Returns: None """ roadPos_df = self.roadPos_df roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True) roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist() _, maximum_range = self.func_normal_cycle_optical(roadPos_ego_list) self.lateral_control13_relative_position_oscillation_difference = round(maximum_range, 3)