|
- import pandas as pd
- import numpy as np
- class ICA:
- def __init__(self, df):
- self.df = df #object_df
- self.segmented_data = []
- """
- 注释:zhangyu 0626 新增
- """
- self.roadPos_df = None
- self.roadMark_df = None
- """
- 纵向指标
- """
- self.stable_start_time_cruise = None
- self.stable_average_speed = None
- self.percentage_accel = None
- self.percentage_lon_acc_roc = None
- self.delay_time_cruise = None
- self.delay_time_THW = None
- self.rise_time_cruise = None
- self.rise_time_THW = None
- self.peak_time_cruise = None
- self.peak_time_THW = None
- self.overshoot_cruise = None
- self.overshoot_THW = None
- self.settling_time = None
- self.steady_error_cruise = None
- self.steady_error_THW = None
- self.stable_start_time_THW = None
- self.stable_average_THW = None
- """
- 横向指标
- """
- self.lateral_control01_minimum_distance_nearby_lane = None
- self.lateral_control02_maximum_lateral_offset = None
- self.lateral_control03_relative_lateral_offset_distribution_expectation = None
- self.lateral_control04_relative_lateral_offset_distribution_standard_deviation = None
- self.lateral_control05_absolute_lateral_offset_distribution_expectation = None
- self.lateral_control06_absolute_lateral_offset_distribution_standard_deviation = None
- self.lateral_control07_maximum_lateral_deviation = None
- self.lateral_control08_minimum_lateral_deviation = None
- self.lateral_control09_absolute_position_oscillation_frequency = None
- self.lateral_control10_absolute_position_oscillation_difference = None
- self.lateral_control11_maximum_heading_deviation = None
- self.lateral_control12_relative_position_oscillation_frequency = None
- self.lateral_control13_relative_position_oscillation_difference = None
- def _segment_data_by_status(self, status_name, status_value):
- """
- 根据状态名称和状态值对DataFrame中的数据进行分段处理。
-
- Args:
- status_name (str): DataFrame中用于标识状态的列名。
- status_value (Any): 需要进行分段处理的状态值。
-
- Returns:
- None
-
- Raises:
- KeyError: 如果DataFrame中不存在'ACC_status'或'simTime'列时,抛出KeyError异常。
-
- """
- if 'ACC_status' not in self.df.columns:
- raise KeyError("'ACC_status' column not found in the dataframe.")
- if 'simTime' not in self.df.columns:
- raise KeyError("'simTime' column not found in the dataframe.")
- filtered_data = self.df[self.df[status_name] == status_value]
- current_segment = []
- for index, row in filtered_data.iterrows():
- if not current_segment or abs(float(row['simTime']) - float(current_segment[-1]['simTime'])) <= 0.01:
- current_segment.append(row)
- else:
- self.segmented_data.append(pd.DataFrame(current_segment))
- current_segment = [row]
- if current_segment:
- self.segmented_data.append(pd.DataFrame(current_segment))
- def _get_headway_time(self, df):
- pass
- def _find_stable_speed_cruise(self, window_size, percent_deviation, set_value):
- """
- 在给定的速度数据中查找稳定的巡航速度段,并计算该段的平均速度。
-
- Args:
- window_size (int): 滑动窗口的大小,表示用于计算平均速度的速度数据点数量。
- percent_deviation (float): 设定值的允许偏差百分比。
- set_value (float): 期望的稳定速度设定值。
-
- Returns:
- None
-
- """
- speed_data = self.df['speedX'].values
- deviation = set_value * (percent_deviation / 100)
- stable_start = None
- stable_average_speed = None
- for i in range(len(speed_data) - window_size + 1):
- window_data = speed_data[i:i + window_size]
- if all(set_value - deviation <= s <= set_value + deviation for s in window_data):
- if stable_start is None:
- stable_start = i
- stable_end = i + window_size - 1
- stable_average_speed = np.mean(window_data)
- j = i + window_size
- while j < len(speed_data) - window_size + 1:
- next_window_data = speed_data[j:j + window_size]
- if all(set_value - deviation <= s <= set_value + deviation for s in next_window_data):
- stable_end = j + window_size - 1
- stable_average_speed = (stable_average_speed * (j - stable_start) + sum(next_window_data)) / (
- j - stable_start + window_size)
- j += window_size
- else:
- stable_start = j + window_size - 1
- stable_end = i + window_size - 1
- stable_average_speed = np.mean(window_data)
- break
- self.stable_start_time_cruise = self.df['simTime'].iloc[stable_start]
- self.stable_average_speed = stable_average_speed
- def _find_stable_THW(self, window_size, percent_deviation, set_value):
- """
- 在给定的数据窗口中查找稳定跟车时距离THW,并计算该段内THW的平均值。
-
- Args:
- window_size (int): 窗口大小,表示在数据中寻找稳定段时考虑的连续数据点数量。
- percent_deviation (float): THW值相对于设定值的允许偏差百分比。
- set_value (float): THW的设定值。
-
- Returns:
- None
-
- """
- THW = self.df['THW'].values
- deviation = set_value * (percent_deviation / 100)
- stable_start = None
- stable_average_THW = None
- for i in range(len(THW) - window_size + 1):
- window_data = THW[i:i + window_size]
- if all(set_value - deviation <= s <= set_value + deviation for s in window_data):
- if stable_start is None:
- stable_start = i
- stable_end = i + window_size - 1
- stable_average_THW = np.mean(window_data)
- j = i + window_size
- while j < len(THW) - window_size + 1:
- next_window_data = THW[j:j + window_size]
- if all(set_value - deviation <= s <= set_value + deviation for s in next_window_data):
- stable_end = j + window_size - 1
- stable_average_THW = (stable_average_THW * (j - stable_start) + sum(next_window_data)) / (
- j - stable_start + window_size)
- j += window_size
- else:
- stable_start = j + window_size - 1
- stable_end = i + window_size - 1
- stable_average_THW = np.mean(window_data)
- break
- self.stable_start_time_THW = self.df['simTime'].iloc[stable_start]
- self.stable_average_THW = stable_average_THW
- def _find_closest_time_stamp_cruise(self, df, target_speed, start_index):
- """
- 在给定的数据帧df中,从start_index索引位置开始,查找与目标速度target_speed最接近的时间戳。
-
- Args:
- df (pd.DataFrame): 包含速度和时间戳等信息的数据帧,需要至少包含'speedX'和'simTime'两列。
- target_speed (float): 目标速度值,用于在数据帧中查找最接近此值的时间戳。
- start_index (int): 开始查找的索引位置,即在数据帧df中从该索引位置开始向后查找。
-
- Returns:
- pd.Timestamp: 与目标速度最接近的时间戳。
-
- """
- subset = df.loc[start_index + 1:]
- speed_diff = np.abs(subset['speedX'] - target_speed)
- closest_index = speed_diff.idxmin()
- closest_timestamp = subset.loc[closest_index, 'simTime']
- return closest_timestamp
- def _find_closest_time_stamp_THW(self, df, target_THW, start_index):
- """
- 在DataFrame中找到与目标THW值最接近的时间戳。
-
- Args:
- df (pandas.DataFrame): 包含'THW'和'simTime'列的DataFrame,其中'THW'表示目标变量,'simTime'表示时间戳。
- target_THW (float): 目标THW值。
- start_index (int): 开始搜索的索引位置(不包含)。
-
- Returns:
- pandas.Timestamp: 与目标THW值最接近的时间戳。
-
- """
- subset = df.loc[start_index + 1:]
- THW_diff = np.abs(subset['THW'] - target_THW)
- closest_index = THW_diff.idxmin()
- closest_timestamp = subset.loc[closest_index, 'simTime']
- return closest_timestamp
- def _get_first_change_index_cruise(self):
- """
- 获取数据集中第一次巡航速度发生变化的索引。
-
- Args:
- 无参数。
-
- Returns:
- Union[int, None]: 如果存在巡航速度发生变化的索引,则返回第一个发生变化的索引(int类型);
- 如果不存在,则返回None。
-
- """
- change_indices = self.df[self.df['set_cruise_speed'] != self.df['set_cruise_speed'].shift()].index
- if not change_indices.empty:
- first_change_index = change_indices.min()
- else:
- first_change_index = None
- return first_change_index
- def _get_first_change_index_THW(self):
- """
- 获取DataFrame中'set_headway_time'列首次发生变化的索引值。
-
- Args:
- 无参数。
-
- Returns:
- Union[int, None]: 如果存在变化,则返回首次发生变化的索引值(int类型),否则返回None。
-
- """
- change_indices = self.df[self.df['set_headway_time'] != self.df['set_headway_time'].shift()].index
- if not change_indices.empty:
- first_change_index = change_indices.min()
- else:
- first_change_index = None
- return first_change_index
- def calculate_reasonable_acceleration_percentage(self):
- """
- 计算合理的加速度百分比。
-
- Args:
- 无。
- Returns:
- 无返回值,但会更新对象的 percentage_accel 属性,表示合理的加速度百分比(单位:%)。
-
- """
- df = self.df_follow.copy()
- col_list = ['simTime', 'simFrame', 'playerId', 'v', 'accel', 'accel', 'lon_acc_roc'] # target_id
- df = df[col_list].copy()
- count_accel_in_range = 0
- total_accel_frames = 0
- ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'accel', 'lon_acc_roc']]
- for row in self.segmented_data:
- df = row[col_list].copy()
- filtered_data = df[(df['accel'] >= -5) & (df['accel'] <= 4)]
- count_accel_in_range = count_accel_in_range + len(filtered_data)
- total_accel_frames = total_accel_frames + len(df)
- self.percentage_accel = (count_accel_in_range / total_accel_frames) * 100
- print(f"percentage_accel: {self.percentage_accel}")
- def calculate_reasonable_acceleration_change_rate_percentage(self):
- """
- 计算纵向加速度变化率(lon_acc_roc)的合理变化率百分比。
-
- Args:
- 无参数。
-
- Returns:
- 无返回值,但会更新对象的 percentage_lon_acc_roc 属性。
-
- """
- df = self.df_follow.copy()
- col_list = ['simTime', 'simFrame', 'playerId', 'v', 'accel', 'accel', 'lon_acc_roc'] # target_id
- df = df[col_list].copy()
- count_lon_acc_roc_in_range = 0
- total_lon_acc_roc_frames = 0
- for row in self.segmented_data:
- df = row[col_list].copy()
- filtered_data = df[(df['lon_acc_roc'] >= -5) & (df['lon_acc_roc'] <= 6)]
- count_lon_acc_roc_in_range = count_lon_acc_roc_in_range + len(filtered_data)
- total_lon_acc_roc_frames = total_lon_acc_roc_frames + len(df)
- self.percentage_lon_acc_roc = (count_lon_acc_roc_in_range / total_lon_acc_roc_frames) * 100
- def get_delay_time_cruise(self):
- """
- 计算巡航速度改变后的延迟时间。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但会设置self.delay_time为计算得到的延迟时间(秒)。
-
- Raises:
- 无特定异常,但会捕获并打印任何发生的异常信息。
-
- """
- try:
- change_indices = self.df[self.df['set_cruise_speed'] != self.df['set_cruise_speed'].shift()].index
- print(f"Change indices of set speed: {change_indices}")
- if not change_indices.empty:
- first_change_index = change_indices[change_indices != 0].min()
- set_cruise_speed_at_change = self.df.loc[first_change_index, 'set_cruise_speed']
- timestamp_at_change = self.df.loc[first_change_index, 'simTime']
- print(f"Set speed at first change: {set_cruise_speed_at_change}, Timestamp: {timestamp_at_change}")
- target_speed = (self.stable_average_speed + self.df.loc[first_change_index, 'speedX']) / 2
- closest_index = (self.df['speedX'] - target_speed).abs().idxmin()
- closest_current_speed = self.df.loc[closest_index, 'speedX']
- closest_timestamp = self.df.loc[closest_index, 'simTime']
- print(f"Closest speed: {closest_current_speed} at time: {closest_timestamp}")
- self.delay_time_cruise = closest_timestamp - timestamp_at_change
- print(f"Delay time: {self.delay_time_cruise}")
- else:
- print("No valid change point for further calculation.")
- except Exception as e:
- print(f"An error occurred: {e}")
- def get_delay_time_THW(self):
- """
- 计算THW改变后的延迟时间。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但会设置self.delay_time为计算得到的延迟时间(秒)。
-
- Raises:
- 无特定异常,但会捕获并打印任何发生的异常信息。
-
- """
- try:
- change_indices = self.df[self.df['set_headway_time'] != self.df['set_headway_time'].shift()].index
- print(f"Change indices of set speed: {change_indices}")
- if not change_indices.empty:
- first_change_index = change_indices[change_indices != 0].min()
- set_THW_at_change = self.df.loc[first_change_index, 'set_headway_time']
- timestamp_at_change = self.df.loc[first_change_index, 'simTime']
- print(f"Set THW at first change: {set_THW_at_change}, Timestamp: {timestamp_at_change}")
- target_THW = (self.stable_average_THW + self.df.loc[first_change_index, 'set_headway_time']) / 2
- closest_index = (self.df['set_headway_time'] - target_THW).abs().idxmin()
- closest_current_THW = self.df.loc[closest_index, 'set_headway_time']
- closest_timestamp = self.df.loc[closest_index, 'simTime']
- print(f"Closest speed: {closest_current_THW} at time: {closest_timestamp}")
- self.delay_time_THW = closest_timestamp - timestamp_at_change
- print(f"Delay time: {self.delay_time_THW}")
- else:
- print("No valid change point for further calculation.")
- except Exception as e:
- print(f"An error occurred: {e}")
- def get_rise_time_cruise(self):
- """
- 计算巡航时从初速度到达稳定平均速度的90%的所需时间(rise time)。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但会设置实例属性self.rise_time为计算得到的rise time。
-
- """
- first_change_index = self._get_first_change_index_cruise()
- initial_speed = self.df.loc[first_change_index, 'speedX']
- target_speed_90 = initial_speed + (self.stable_average_speed - initial_speed) * 0.9
- target_speed_10 = initial_speed + (self.stable_average_speed - initial_speed) * 0.1
- timestamp_at_10 = self._find_closest_time_stamp_cruise(self.df, target_speed_10, first_change_index)
- timestamp_at_90 = self._find_closest_time_stamp_cruise(self.df, target_speed_90, first_change_index)
- print(f"Closest speed at 10% range from set speed: {timestamp_at_10}")
- print(f"Closest speed at 90% range from set speed: {timestamp_at_90}")
- self.rise_time_cruise = timestamp_at_90 - timestamp_at_10
- print(f"Rise time: {self.rise_time_cruise}")
- def get_rise_time_THW(self):
- """
- 计算巡航时从初THW到达稳定THW的90%的所需时间(rise time)。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但会设置实例属性self.rise_time_THW为计算得到的rise time。
-
- """
- first_change_index = self._get_first_change_index_THW()
- initial_THW = self.df.loc[first_change_index, 'THW']
- target_THW_90 = initial_THW + (self.stable_average_THW - initial_THW) * 0.9
- target_THW_10 = initial_THW + (self.stable_average_THW - initial_THW) * 0.1
- timestamp_at_10 = self._find_closest_time_stamp_THW(self.df, target_THW_10, first_change_index)
- timestamp_at_90 = self._find_closest_time_stamp_THW(self.df, target_THW_90, first_change_index)
- print(f"Closest speed at 10% range from set speed: {timestamp_at_10}")
- print(f"Closest speed at 90% range from set speed: {timestamp_at_90}")
- self.rise_time_THW = timestamp_at_90 - timestamp_at_10
- print(f"Rise time: {self.rise_time_THW}")
- def get_peak_time_cruise(self):
- """
- 计算巡航阶段速度峰值时间。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但将计算得到的巡航阶段速度峰值时间(从首次变化时间开始计时)存储在实例变量 self.peak_time 中。
-
- """
- first_change_index = self._get_first_change_index_cruise()
- start_time = self.df.loc[first_change_index, 'simTime']
- peak_time = self.df.loc[self.df['speedX'].idxmax(), 'simTime']
- self.peak_time_cruise = peak_time - start_time
- def get_peak_time_THW(self):
- """
- 计算THW阶段速度峰值时间。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但将计算得到的巡航阶段速度峰值时间(从首次变化时间开始计时)存储在实例变量 self.peak_time 中。
-
- """
- first_change_index = self._get_first_change_index_THW()
- start_time = self.df.loc[first_change_index, 'simTime']
- peak_time = self.df.loc[self.df['THW'].idxmax(), 'simTime']
- self.peak_time_THW = peak_time - start_time
- def get_overshoot_cruise(self):
- """
- 计算超速巡航超调量。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但会更新类的成员变量 `overshoot`。
-
- """
- first_change_index = self._get_first_change_index_cruise()
- initial_speed = self.df.loc[first_change_index, 'speedX']
- if initial_speed > self.stable_average_speed:
- self.overshoot_cruise = (self.stable_average_speed - self.df[
- 'speedX'].min()) * 100 / self.stable_average_speed
- elif initial_speed < self.stable_average_speed:
- self.overshoot_cruise = (self.df[
- 'speedX'].max() - self.stable_average_speed) * 100 / self.stable_average_speed
- else:
- self.overshoot_cruise = 0
- def get_overshoot_THW(self):
- """
- 计算超速THW超调量。
-
- Args:
- 无。
-
- Returns:
- 无返回值,但会更新类的成员变量 `overshoot`。
-
- """
- first_change_index = self._get_first_change_index_THW()
- initial_THW = self.df.loc[first_change_index, 'THW']
- if initial_THW > self.stable_average_THW:
- self.overshoot_THW = (self.stable_average_THW - self.df['THW'].min()) * 100 / self.stable_average_THW
- elif initial_THW < self.stable_average_THW:
- self.overshoot_THW = (self.df['THW'].max() - self.stable_average_THW) * 100 / self.stable_average_THW
- else:
- self.overshoot_THW = 0
- def get_steady_error_cruise(self):
- """
- 计算巡航稳态误差。
-
- Args:
- 无参数。
-
- Returns:
- 无返回值,但会更新对象的steady_error属性。
-
- Raises:
- ValueError: 如果stable_average_speed为None时,会抛出此异常。
-
- """
- first_change_index = self._get_first_change_index_cruise()
- set_cruise_speed = self.df.loc[first_change_index, 'set_cruise_speed']
- if self.stable_average_speed:
- self.steady_error_cruise = (self.stable_average_speed - set_cruise_speed) * 100 / self.stable_average_speed
- else:
- raise ValueError("Stable average speed is None.")
- def get_steady_error_THW(self):
- """
- 计算THW稳态误差。
-
- Args:
- 无参数。
-
- Returns:
- 无返回值,但会更新对象的steady_error属性。
-
- Raises:
- ValueError: 如果stable_average_THW为None时 ,会抛出此异常。
-
- """
- first_change_index = self._get_first_change_index_THW()
- set_THW = self.df.loc[first_change_index, 'set_headway_time']
- if self.stable_average_THW:
- self.steady_error_THW = (self.stable_average_THW - set_THW) * 100 / self.stable_average_THW
- else:
- raise ValueError("Stable average THW is None.")
- def get_target_recongnition_cut_in_time_mid(self):
- """
- 获取目标识别切入时间中间值。
-
- Args:
- 无参数。
-
- Returns:
- float: 目标识别切入时间中点(单位:秒),如果未获取到则返回None。
-
- """
- pass
- def get_target_recongnition_cut_in_time_SD(self):
- """
- 获取目标识别切入时间(SD)标准差
-
- Args:
- 无
-
- Returns:
- 目标识别切入时间(SD)的值,若函数无具体返回值则无返回(在此情况下,该函数被标记为 pass)
-
- """
- pass
- def get_target_recongnition_cut_off_time_mid(self):
- """
- 获取目标识别切出时间中间值。
-
- Args:
- 无参数。
-
- Returns:
- float: 目标识别切出时间中点(单位:秒),如果未获取到则返回None。
-
- """
- pass
- def get_target_recongnition_cut_off_time_SD(self):
- """
- 获取目标识别切出时间(SD)标准差
-
- Args:
- 无
-
- Returns:
- 目标识别切出时间(SD)的值,若函数无具体返回值则无返回(在此情况下,该函数被标记为 pass)
-
- """
- pass
- def get_stop_time(self):
- pass
- def Compute_nearby_distance_to_lane_boundary(self, x, width_ego):
- """
- 计算自车到车道边界线的距离,需要减去自车宽度
- Args:
- x.lateralDist :到边界线距离
- width_ego:车宽
- Returns:
- 自车到车道边界线的距离
- """
- if x.lateralDist < abs(x.right_lateral_distance):
- return x.lateralDist - width_ego / 2
- else:
- return abs(x.right_lateral_distance) - width_ego / 2
- def func_laneOffset_abs(self, x):
- """
- 计算自车横向偏移量的绝对值
- Args:
- x.laneOffset :横向偏移量
- Returns:
- 横向偏移量的绝对值
- """
- return abs(x.laneOffset)
- def func_center_line_cycle_optical(self, l):
- """
- 计算偏离车道中心线的振荡周期和极差,极大值要大于0,极小值要小于0
- Args:
- l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2]
- Returns:
- cycle:振荡周期数
- maximum_range:最大极差
- """
- # print("\n穿过y=0的周期数: ")
- length = len(l)
- number_peak = 0
- number_trough = 0
- # 生成新的、只含有波峰波谷的列表
- new_list = []
- plus = []
- minus = []
- for number, value in enumerate(l):
- if number == 0:
- if value > l[number + 1] and value > 0:
- number_peak += 1
- plus.append(value)
- if value < l[number + 1] and value < 0:
- number_trough += 1
- minus.append(value)
- continue
- if number == length - 1:
- if value > l[number - 1] and value > 0:
- number_peak += 1
- plus.append(value)
- if len(minus) != 0:
- new_list.append(min(minus))
- minus = []
- if value < l[number - 1] and value < 0:
- number_trough += 1
- minus.append(value)
- if len(plus) != 0:
- new_list.append(max(plus))
- plus = []
- if len(minus) != 0:
- new_list.append(min(minus))
- minus = []
- if len(plus) != 0:
- new_list.append(max(plus))
- plus = []
- continue
- if value >= l[number - 1] and value > l[number + 1] and value > 0:
- number_peak += 1
- plus.append(value)
- if len(minus) != 0:
- new_list.append(min(minus))
- minus = []
- if value < l[number - 1] and value <= l[number + 1] and value < 0:
- number_trough += 1
- minus.append(value)
- if len(plus) != 0:
- new_list.append(max(plus))
- plus = []
- cycle = min(number_peak, number_trough) - 1
- difference_list = []
- for i in range(len(new_list) - 1):
- difference = abs(new_list[i] - new_list[i + 1])
- difference_list.append(difference)
- if len(difference_list) == 0:
- maximum_range = -1
- # print(f"极差: {maximum_range}")
- else:
- maximum_range = max(difference_list)
- # print(f"极差: {maximum_range}")
- return cycle, maximum_range
- def func_normal_cycle_optical(self, l):
- """
- 计算振荡周期和极差,两个相邻的极大值(或极小值)算一次振荡
- Args:
- l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2]
- Returns:
- cycle:振荡周期数
- maximum_range:最大极差
- """
- # print("正常的周期数: ")
- length = len(l)
- number_peak = 0
- number_trough = 0
- # 生成新的、只含有波峰波谷的列表
- new_list = []
- for number, value in enumerate(l):
- if number == 0:
- if value > l[number + 1]:
- number_peak += 1
- new_list.append(value)
- if value < l[number + 1]:
- number_trough += 1
- new_list.append(value)
- continue
- if number == length - 1:
- if value > l[number - 1]:
- number_peak += 1
- new_list.append(value)
- if value < l[number - 1]:
- number_trough += 1
- new_list.append(value)
- continue
- if value >= l[number - 1] and value > l[number + 1]:
- number_peak += 1
- new_list.append(value)
- if value < l[number - 1] and value <= l[number + 1]:
- number_trough += 1
- new_list.append(value)
- if abs(number_peak - number_trough) > 1:
- pass
- else:
- cycle = max(number_peak, number_trough) - 1
- difference_list = []
- for i in range(len(new_list) - 1):
- difference = abs(new_list[i] - new_list[i + 1])
- difference_list.append(difference)
- if len(difference_list) == 0:
- maximum_range = -1
- # print(f"极差: {maximum_range}")
- else:
- maximum_range = max(difference_list)
- # print(f"极差: {maximum_range}")
- return cycle, maximum_range
- def _lateral_control01_minimum_distance_nearby_lane(self):
- """
- 横向控制01指标:计算离近侧车道线最小距离(m)
- Args:
- Returns:
- None
- """
- roadMark_df = self.roadMark_df
- player_df = self.df
- ego_df = player_df[player_df.playerId == 1]
- width_ego = ego_df['dimY'].values.tolist()[0]
- # 提取距离左车道线和右车道线距离
- roadMark_left_df = roadMark_df[roadMark_df.id == 0].reset_index(drop=True)
- roadMark_right_df = roadMark_df[roadMark_df.id == 2].reset_index(drop=True)
- roadMark_left_df['right_lateral_distance'] = roadMark_right_df['lateralDist']
- # 计算到车道边界线距离
- roadMark_left_df['nearby_distance_to_lane_boundary'] = roadMark_left_df.apply(
- lambda x: self.Compute_nearby_distance_to_lane_boundary(x, width_ego), axis=1)
- nearby_distance_to_lane_boundary = min(roadMark_left_df['nearby_distance_to_lane_boundary'])
- self.lateral_control01_minimum_distance_nearby_lane = round(nearby_distance_to_lane_boundary, 3)
- def _lateral_control02_maximum_lateral_offset(self):
- """
- 横向控制02指标:计算最大横向偏移量(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- # 提取自车的roadPos数据
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- # 计算横向偏移量的绝对值
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
- max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax()
- row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
- self.lateral_control02_maximum_lateral_offset = round(row_with_max_value, 3)
- def _lateral_control03_relative_lateral_offset_distribution_expectation(self):
- """
- 横向控制03指标:计算相对横向偏移量分布期望(m)
- Args:
- Returns:
- None
- """
- # 提取自车宽度
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- mean_laneOffset_index = roadPos_ego_df['laneOffset'].mean()
- self.lateral_control03_relative_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3)
- def _lateral_control04_relative_lateral_offset_distribution_standard_deviation(self):
- """
- 横向控制04指标:计算相对横向偏移量分布标准差(m)
- Args:
- Returns:
- None
- """
- # 提取自车宽度
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- mean_laneOffset_index = roadPos_ego_df['laneOffset'].std()
- self.lateral_control04_relative_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3)
- def _lateral_control05_absolute_lateral_offset_distribution_expectation(self):
- """
- 横向控制05指标:计算绝对横向偏移量分布期望(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \
- lambda x: self.func_laneOffset_abs(x), axis=1)
- mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].mean()
- self.lateral_control05_absolute_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3)
- def _lateral_control06_absolute_lateral_offset_distribution_standard_deviation(self):
- """
- 横向控制06指标:计算绝对横向偏移量分布标准差(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \
- lambda x: self.func_laneOffset_abs(x), axis=1)
- mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].std()
- self.lateral_control06_absolute_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3)
- def _lateral_control07_maximum_lateral_deviation(self):
- """
- 横向控制07指标:横向距离极大值(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
- max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax()
- row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
- self.lateral_control07_maximum_lateral_deviation = row_with_max_value
- def _lateral_control08_minimum_lateral_deviation(self):
- """
- 横向控制08指标:横向距离极小值(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
- max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmin()
- row_with_min_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
- self.lateral_control08_minimum_lateral_deviation = row_with_min_value
- def _lateral_control09_absolute_position_oscillation_frequency(self):
- """
- 横向控制09指标:横向绝对位置振荡频率(Hz)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
- cycle_number, _ = self.func_center_line_cycle_optical(roadPos_ego_list)
- self.lateral_control09_absolute_position_oscillation_frequency = round(cycle_number, 3)
- def _lateral_control10_absolute_position_oscillation_difference(self):
- """
- 横向控制10指标:横向绝对位置振荡极差(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
- _, maximum_range = self.func_center_line_cycle_optical(roadPos_ego_list)
- self.lateral_control10_absolute_position_oscillation_difference = round(maximum_range, 3)
- def _lateral_control11_maximum_heading_deviation(self):
- """
- 横向控制11指标:最大偏航角
- Args:
- Returns:
- None
- """
- player_df = self.df
- road_mark_df = self.roadMark_df
- ego_df = player_df[player_df.playerId == 1].reset_index(drop=True)
- # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减
- road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True)
- road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True)
- road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor']
- road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor']
- road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \
- lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1)
- ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle']
- ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply( \
- lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1)
- row_with_max_value = max(ego_df['heading_deviation_abs'])
- self.lateral_control11_maximum_heading_deviation = row_with_max_value
- def _lateral_control12_relative_position_oscillation_frequency(self):
- """
- 横向控制12指标:横向相对位置振荡频率(Hz)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
- cycle_number, _ = self.func_normal_cycle_optical(roadPos_ego_list)
- self.lateral_control12_relative_position_oscillation_frequency = round(cycle_number, 3)
- def _lateral_control13_relative_position_oscillation_difference(self):
- """
- 横向控制13指标:横向相对位置振荡极差(m)
- Args:
- Returns:
- None
- """
- roadPos_df = self.roadPos_df
- roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
- roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
- _, maximum_range = self.func_normal_cycle_optical(roadPos_ego_list)
- self.lateral_control13_relative_position_oscillation_difference = round(maximum_range, 3)
|