|
@@ -12,137 +12,110 @@ from modules.lib.log_manager import LogManager
|
|
|
|
|
|
|
|
|
|
def peak_valley_decorator(method):
|
|
def peak_valley_decorator(method):
|
|
|
|
+ """峰谷检测装饰器"""
|
|
def wrapper(self, *args, **kwargs):
|
|
def wrapper(self, *args, **kwargs):
|
|
peak_valley = self._peak_valley_determination(self.df)
|
|
peak_valley = self._peak_valley_determination(self.df)
|
|
pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
|
|
pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
|
|
- if len(pv_list) != 0:
|
|
|
|
- flag = True
|
|
|
|
|
|
+
|
|
|
|
+ if pv_list:
|
|
p_last = pv_list[0]
|
|
p_last = pv_list[0]
|
|
-
|
|
|
|
for i in range(1, len(pv_list)):
|
|
for i in range(1, len(pv_list)):
|
|
p_curr = pv_list[i]
|
|
p_curr = pv_list[i]
|
|
-
|
|
|
|
if self._peak_valley_judgment(p_last, p_curr):
|
|
if self._peak_valley_judgment(p_last, p_curr):
|
|
- method(self, p_curr, p_last, flag, *args, **kwargs)
|
|
|
|
|
|
+ method(self, p_curr, p_last, True, *args, **kwargs)
|
|
else:
|
|
else:
|
|
p_last = p_curr
|
|
p_last = p_curr
|
|
-
|
|
|
|
return method
|
|
return method
|
|
else:
|
|
else:
|
|
- flag = False
|
|
|
|
- p_curr = [0, 0]
|
|
|
|
- p_last = [0, 0]
|
|
|
|
- method(self, p_curr, p_last, flag, *args, **kwargs)
|
|
|
|
|
|
+ method(self, [0, 0], [0, 0], False, *args, **kwargs)
|
|
return method
|
|
return method
|
|
-
|
|
|
|
return wrapper
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
-class Comfort(object):
|
|
|
|
- """
|
|
|
|
- Class for achieving comfort metrics for autonomous driving.
|
|
|
|
-
|
|
|
|
- Attributes:
|
|
|
|
- dataframe: Vehicle driving data, stored in dataframe format.
|
|
|
|
- """
|
|
|
|
|
|
|
|
|
|
+class Comfort(object):
|
|
|
|
+
|
|
|
|
+ """自动驾驶舒适性评估类"""
|
|
|
|
+
|
|
def __init__(self, data_processed):
|
|
def __init__(self, data_processed):
|
|
self.data_processed = data_processed
|
|
self.data_processed = data_processed
|
|
self.logger = LogManager().get_logger()
|
|
self.logger = LogManager().get_logger()
|
|
|
|
|
|
- self.data = data_processed.ego_data.copy()
|
|
|
|
|
|
+ # 初始化数据容器
|
|
|
|
+ self.data = data_processed.ego_data
|
|
self.ego_df = pd.DataFrame()
|
|
self.ego_df = pd.DataFrame()
|
|
self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
-
|
|
|
|
- self.calculated_value = {
|
|
|
|
- 'weaving': 0,
|
|
|
|
- 'shake': 0,
|
|
|
|
- 'cadence': 0,
|
|
|
|
- 'slamBrake': 0,
|
|
|
|
- 'slamAccelerate': 0,
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ # 统计指标
|
|
|
|
+ self.calculated_value = {
|
|
|
|
+ 'weaving': 0, 'shake': 0, 'cadence': 0,
|
|
|
|
+ 'slamBrake': 0, 'slamAccelerate': 0
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ # 时间序列数据
|
|
self.time_list = self.data['simTime'].values.tolist()
|
|
self.time_list = self.data['simTime'].values.tolist()
|
|
self.frame_list = self.data['simFrame'].values.tolist()
|
|
self.frame_list = self.data['simFrame'].values.tolist()
|
|
-
|
|
|
|
- # 移除未使用的字典
|
|
|
|
|
|
+
|
|
|
|
+ # 初始化检测器状态
|
|
self.zigzag_count = 0
|
|
self.zigzag_count = 0
|
|
self.shake_count = 0
|
|
self.shake_count = 0
|
|
self.cadence_count = 0
|
|
self.cadence_count = 0
|
|
self.slam_brake_count = 0
|
|
self.slam_brake_count = 0
|
|
self.slam_accel_count = 0
|
|
self.slam_accel_count = 0
|
|
-
|
|
|
|
self.zigzag_time_list = []
|
|
self.zigzag_time_list = []
|
|
- self.zigzag_stre_list = []
|
|
|
|
- self.cur_ego_path_list = []
|
|
|
|
- self.curvature_list = []
|
|
|
|
|
|
|
|
|
|
+ # 数据预处理
|
|
self._get_data()
|
|
self._get_data()
|
|
self._comf_param_cal()
|
|
self._comf_param_cal()
|
|
|
|
|
|
|
|
+
|
|
def _get_data(self):
|
|
def _get_data(self):
|
|
"""获取舒适性评估所需数据"""
|
|
"""获取舒适性评估所需数据"""
|
|
self.ego_df = self.data[config.COMFORT_INFO].copy()
|
|
self.ego_df = self.data[config.COMFORT_INFO].copy()
|
|
self.df = self.ego_df.reset_index(drop=True)
|
|
self.df = self.ego_df.reset_index(drop=True)
|
|
|
|
|
|
- def _cal_cur_ego_path(self, row):
|
|
|
|
- """计算车辆轨迹曲率"""
|
|
|
|
- try:
|
|
|
|
- # 计算速度平方和,判断是否接近零
|
|
|
|
- speed_sq = row['speedX']**2 + row['speedY']**2
|
|
|
|
- if speed_sq < 1e-6: # 阈值根据实际场景调整
|
|
|
|
- return 1e5 # 速度接近零时返回极大曲率
|
|
|
|
- divide = speed_sq ** (3/2)
|
|
|
|
- res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide
|
|
|
|
- return res
|
|
|
|
- except Exception:
|
|
|
|
- return 1e5 # 异常时也返回极大值(如除零、缺失值等)
|
|
|
|
-
|
|
|
|
|
|
+ # 1. 移除未使用的曲率计算相关代码
|
|
def _comf_param_cal(self):
|
|
def _comf_param_cal(self):
|
|
"""计算舒适性相关参数"""
|
|
"""计算舒适性相关参数"""
|
|
- # 加减速阈值计算
|
|
|
|
- self.ego_df['ip_acc'] = self.ego_df['v'].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
|
|
|
|
- self.ego_df['ip_dec'] = self.ego_df['v'].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
|
|
|
|
|
|
+ # 动态加减速阈值
|
|
|
|
+ self.ego_df['ip_acc'] = self.ego_df['v'].apply(
|
|
|
|
+ get_interpolation, point1=[18, 4], point2=[72, 2])
|
|
|
|
+ self.ego_df['ip_dec'] = self.ego_df['v'].apply(
|
|
|
|
+ get_interpolation, point1=[18, -5], point2=[72, -3.5])
|
|
|
|
+
|
|
|
|
+ # 急刹急加速标记
|
|
self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
|
|
self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
|
|
lambda x: 1 if x < 0 else 0)
|
|
lambda x: 1 if x < 0 else 0)
|
|
self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
|
|
self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
|
|
lambda x: 1 if x > 0 else 0)
|
|
lambda x: 1 if x > 0 else 0)
|
|
|
|
+
|
|
|
|
+ # 顿挫检测预处理
|
|
self.ego_df['cadence'] = self.ego_df.apply(
|
|
self.ego_df['cadence'] = self.ego_df.apply(
|
|
lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
|
|
lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
|
|
-
|
|
|
|
- # 晃动检测相关参数
|
|
|
|
- self.ego_df['cur_ego_path'] = self.ego_df.apply(self._cal_cur_ego_path, axis=1)
|
|
|
|
- self.ego_df['curvHor'] = self.ego_df['curvHor'].astype('float')
|
|
|
|
- self.ego_df['cur_diff'] = (self.ego_df['cur_ego_path'] - self.ego_df['curvHor']).abs()
|
|
|
|
- self.ego_df['R'] = self.ego_df['curvHor'].apply(lambda x: 10000 if x == 0 else 1 / x)
|
|
|
|
- self.ego_df['R_ego'] = self.ego_df['cur_ego_path'].apply(lambda x: 10000 if x == 0 else 1 / x)
|
|
|
|
- self.ego_df['R_diff'] = (self.ego_df['R_ego'] - self.ego_df['R']).abs()
|
|
|
|
-
|
|
|
|
- self.cur_ego_path_list = self.ego_df['cur_ego_path'].values.tolist()
|
|
|
|
- self.curvature_list = self.ego_df['curvHor'].values.tolist()
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
|
|
def _peak_valley_determination(self, df):
|
|
def _peak_valley_determination(self, df):
|
|
- """
|
|
|
|
- 确定车辆角速度的峰值和谷值
|
|
|
|
- """
|
|
|
|
- # 调整参数以减少噪音干扰
|
|
|
|
- peaks, _ = scipy.signal.find_peaks(df['speedH'], height=0.03, distance=3, prominence=0.03, width=1)
|
|
|
|
- valleys, _ = scipy.signal.find_peaks(-df['speedH'], height=0.03, distance=3, prominence=0.03, width=1)
|
|
|
|
- peak_valley = sorted(list(peaks) + list(valleys))
|
|
|
|
- return peak_valley
|
|
|
|
-
|
|
|
|
- def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=0.06):
|
|
|
|
- """
|
|
|
|
- 判断给定的峰值和谷值是否满足特定条件
|
|
|
|
- """
|
|
|
|
|
|
+ """确定车辆角速度的峰值和谷值"""
|
|
|
|
+ peaks, _ = scipy.signal.find_peaks(
|
|
|
|
+ df['speedH'], height=2.3, distance=3,
|
|
|
|
+ prominence=2.3, width=1)
|
|
|
|
+ valleys, _ = scipy.signal.find_peaks(
|
|
|
|
+ -df['speedH'], height=2.3, distance=3,
|
|
|
|
+ prominence=2.3, width=1)
|
|
|
|
+ return sorted(list(peaks) + list(valleys))
|
|
|
|
+
|
|
|
|
+ def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6):
|
|
|
|
+ """判断峰谷对是否构成曲折行驶"""
|
|
t_diff = p_curr[0] - p_last[0]
|
|
t_diff = p_curr[0] - p_last[0]
|
|
v_diff = abs(p_curr[1] - p_last[1])
|
|
v_diff = abs(p_curr[1] - p_last[1])
|
|
s = p_curr[1] * p_last[1]
|
|
s = p_curr[1] * p_last[1]
|
|
|
|
|
|
- zigzag_flag = t_diff < tw and v_diff > avg and s < 0
|
|
|
|
- if zigzag_flag and ([p_last[0], p_curr[0]] not in self.zigzag_time_list):
|
|
|
|
- self.zigzag_time_list.append([p_last[0], p_curr[0]])
|
|
|
|
- return zigzag_flag
|
|
|
|
|
|
+ if t_diff < tw and v_diff > avg and s < 0:
|
|
|
|
+ if [p_last[0], p_curr[0]] not in self.zigzag_time_list:
|
|
|
|
+ self.zigzag_time_list.append([p_last[0], p_curr[0]])
|
|
|
|
+ return True
|
|
|
|
+ return False
|
|
|
|
|
|
@peak_valley_decorator
|
|
@peak_valley_decorator
|
|
def zigzag_count_func(self, p_curr, p_last, flag=True):
|
|
def zigzag_count_func(self, p_curr, p_last, flag=True):
|
|
@@ -161,80 +134,384 @@ class Comfort(object):
|
|
else:
|
|
else:
|
|
self.zigzag_stre_list = []
|
|
self.zigzag_stre_list = []
|
|
|
|
|
|
- def _shake_detector(self, Cr_diff=0.05, T_diff=0.39):
|
|
|
|
- """检测晃动事件"""
|
|
|
|
|
|
+ def _shake_detector(self, T_diff=0.5):
|
|
|
|
+ """检测晃动事件 - 改进版本(不使用车辆轨迹曲率)"""
|
|
|
|
+ # lat_acc已经是车辆坐标系下的横向加速度,由data_process.py计算
|
|
time_list = []
|
|
time_list = []
|
|
frame_list = []
|
|
frame_list = []
|
|
|
|
|
|
|
|
+ # 复制数据以避免修改原始数据
|
|
df = self.ego_df.copy()
|
|
df = self.ego_df.copy()
|
|
- df = df[df['cur_diff'] > Cr_diff]
|
|
|
|
- df['frame_ID_diff'] = df['simFrame'].diff() # 找出行车轨迹曲率与道路曲率之差大于阈值的数据段
|
|
|
|
- filtered_df = df[df.frame_ID_diff > T_diff] # 此处是用大间隔区分多次晃动情景
|
|
|
|
-
|
|
|
|
- row_numbers = filtered_df.index.tolist()
|
|
|
|
- cut_column = pd.cut(df.index, bins=row_numbers)
|
|
|
|
-
|
|
|
|
- grouped = df.groupby(cut_column)
|
|
|
|
- dfs = {}
|
|
|
|
- for name, group in grouped:
|
|
|
|
- dfs[name] = group.reset_index(drop=True)
|
|
|
|
-
|
|
|
|
- for name, df_group in dfs.items():
|
|
|
|
- # 直道,未主动换道
|
|
|
|
- df_group['curvHor'] = df_group['curvHor'].abs()
|
|
|
|
- df_group_straight = df_group[(df_group.lightMask == 0) & (df_group.curvHor < 0.001)]
|
|
|
|
- if not df_group_straight.empty:
|
|
|
|
- time_list.extend(df_group_straight['simTime'].values)
|
|
|
|
- frame_list.extend(df_group_straight['simFrame'].values)
|
|
|
|
- self.shake_count = self.shake_count + 1
|
|
|
|
-
|
|
|
|
- # 打转向灯,道路为直道,此时晃动判断标准车辆曲率变化率为一个更大的阈值
|
|
|
|
- df_group_change_lane = df_group[(df_group['lightMask'] != 0) & (df_group['curvHor'] < 0.001)]
|
|
|
|
- df_group_change_lane_data = df_group_change_lane[df_group_change_lane.cur_diff > Cr_diff + 0.2]
|
|
|
|
- if not df_group_change_lane_data.empty:
|
|
|
|
- time_list.extend(df_group_change_lane_data['simTime'].values)
|
|
|
|
- frame_list.extend(df_group_change_lane_data['simFrame'].values)
|
|
|
|
- self.shake_count = self.shake_count + 1
|
|
|
|
-
|
|
|
|
- # 转弯,打转向灯
|
|
|
|
- df_group_turn = df_group[(df_group['lightMask'] != 0) & (df_group['curvHor'].abs() > 0.001)]
|
|
|
|
- df_group_turn_data = df_group_turn[df_group_turn.cur_diff.abs() > Cr_diff + 0.1]
|
|
|
|
- if not df_group_turn_data.empty:
|
|
|
|
- time_list.extend(df_group_turn_data['simTime'].values)
|
|
|
|
- frame_list.extend(df_group_turn_data['simFrame'].values)
|
|
|
|
- self.shake_count = self.shake_count + 1
|
|
|
|
-
|
|
|
|
- TIME_RANGE = 1
|
|
|
|
- t_list = time_list
|
|
|
|
- f_list = frame_list
|
|
|
|
- group_time = []
|
|
|
|
- group_frame = []
|
|
|
|
- sub_group_time = []
|
|
|
|
- sub_group_frame = []
|
|
|
|
- for i in range(len(f_list)):
|
|
|
|
- if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:
|
|
|
|
- sub_group_time.append(t_list[i])
|
|
|
|
- sub_group_frame.append(f_list[i])
|
|
|
|
- else:
|
|
|
|
- group_time.append(sub_group_time)
|
|
|
|
- group_frame.append(sub_group_frame)
|
|
|
|
- sub_group_time = [t_list[i]]
|
|
|
|
- sub_group_frame = [f_list[i]]
|
|
|
|
-
|
|
|
|
- # 输出图表值
|
|
|
|
- shake_time = [[g[0], g[-1]] for g in group_time]
|
|
|
|
- shake_frame = [[g[0], g[-1]] for g in group_frame]
|
|
|
|
- self.shake_count = len(shake_time)
|
|
|
|
-
|
|
|
|
- if shake_time:
|
|
|
|
- # 保存晃动事件摘要
|
|
|
|
- time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
|
|
|
|
- frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
|
|
|
|
- discomfort_df = pd.concat([time_df, frame_df], axis=1)
|
|
|
|
- discomfort_df['type'] = 'shake'
|
|
|
|
- self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ # 1. 计算横向加速度变化率
|
|
|
|
+ df['lat_acc_rate'] = df['lat_acc'].diff() / df['simTime'].diff()
|
|
|
|
+
|
|
|
|
+ # 2. 计算横摆角速度变化率
|
|
|
|
+ df['speedH_rate'] = df['speedH'].diff() / df['simTime'].diff()
|
|
|
|
+
|
|
|
|
+ # 3. 计算横摆角速度的短期变化特性
|
|
|
|
+ window_size = 5 # 5帧窗口
|
|
|
|
+ df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
|
|
|
|
+
|
|
|
|
+ # 4. 基于车速的动态阈值
|
|
|
|
+ # df['lat_acc_threshold'] = df['v'].apply(
|
|
|
|
+ # lambda speed: max(0.3, min(0.8, 0.5 * (1 + (speed - 20) / 60)))
|
|
|
|
+ # )
|
|
|
|
+ v0 = 20 * 5/18 # ≈5.56 m/s
|
|
|
|
+ # 递减系数
|
|
|
|
+ k = 0.008 * 3.6 # =0.0288 per m/s
|
|
|
|
+ df['lat_acc_threshold'] = df['v'].apply(
|
|
|
|
+ lambda speed: max(
|
|
|
|
+ 1.0, # 下限 1.0 m/s²
|
|
|
|
+ min(
|
|
|
|
+ 1.8, # 上限 1.8 m/s²
|
|
|
|
+ 1.8 - k * (speed - v0) # 线性递减
|
|
|
|
+ )
|
|
|
|
+ )
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ df['speedH_threshold'] = df['v'].apply(
|
|
|
|
+ lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
|
|
|
|
+ )
|
|
|
|
+ # 将计算好的阈值和中间变量保存到self.ego_df中,供其他函数使用
|
|
|
|
+ self.ego_df['lat_acc_threshold'] = df['lat_acc_threshold']
|
|
|
|
+ self.ego_df['speedH_threshold'] = df['speedH_threshold']
|
|
|
|
+ self.ego_df['lat_acc_rate'] = df['lat_acc_rate']
|
|
|
|
+ self.ego_df['speedH_rate'] = df['speedH_rate']
|
|
|
|
+ self.ego_df['speedH_std'] = df['speedH_std']
|
|
|
|
+
|
|
|
|
+ # 5. 综合判断晃动条件
|
|
|
|
+ # 条件A: 横向加速度超过阈值
|
|
|
|
+ condition_A = df['lat_acc'].abs() > df['lat_acc_threshold']
|
|
|
|
+
|
|
|
|
+ # 条件B: 横向加速度变化率超过阈值
|
|
|
|
+ lat_acc_rate_threshold = 0.5 # 横向加速度变化率阈值 (m/s³)
|
|
|
|
+ condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold
|
|
|
|
+
|
|
|
|
+ # 条件C: 横摆角速度有明显变化但不呈现周期性
|
|
|
|
+ condition_C = (df['speedH_std'] > df['speedH_threshold']) & (~df['simTime'].isin(self._get_zigzag_times()))
|
|
|
|
+
|
|
|
|
+ # 综合条件: 满足条件A,且满足条件B或条件C
|
|
|
|
+ shake_condition = condition_A & (condition_B | condition_C)
|
|
|
|
+
|
|
|
|
+ # 筛选满足条件的数据
|
|
|
|
+ shake_df = df[shake_condition].copy()
|
|
|
|
+
|
|
|
|
+ # 修改:按照连续帧号分组,确保只有连续帧超过阈值的才被认为是晃动
|
|
|
|
+ if not shake_df.empty:
|
|
|
|
+ # 计算帧号差
|
|
|
|
+ shake_df['frame_diff'] = shake_df['simFrame'].diff()
|
|
|
|
+
|
|
|
|
+ # 标记不连续的点(帧号差大于1)
|
|
|
|
+ # 通常连续帧的帧号差应该是1
|
|
|
|
+ shake_df['is_new_group'] = shake_df['frame_diff'] > 1
|
|
|
|
+
|
|
|
|
+ # 第一个点标记为新组
|
|
|
|
+ if not shake_df.empty:
|
|
|
|
+ shake_df.iloc[0, shake_df.columns.get_loc('is_new_group')] = True
|
|
|
|
+
|
|
|
|
+ # 创建组ID
|
|
|
|
+ shake_df['group_id'] = shake_df['is_new_group'].cumsum()
|
|
|
|
+
|
|
|
|
+ # 按组计算帧数和持续时间
|
|
|
|
+ group_info = shake_df.groupby('group_id').agg({
|
|
|
|
+ 'simTime': ['min', 'max'],
|
|
|
|
+ 'simFrame': ['min', 'max', 'count'] # 添加count计算每组的帧数
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ group_info.columns = ['start_time', 'end_time', 'start_frame', 'end_frame', 'frame_count']
|
|
|
|
+ group_info['duration'] = group_info['end_time'] - group_info['start_time']
|
|
|
|
+
|
|
|
|
+ # 筛选连续帧数超过阈值的组
|
|
|
|
+ # 假设采样率为100Hz,则0.5秒对应约50帧
|
|
|
|
+ MIN_FRAME_COUNT = 5 # 最小连续帧数阈值,可根据实际采样率调整
|
|
|
|
+ valid_groups = group_info[group_info['frame_count'] >= MIN_FRAME_COUNT]
|
|
|
|
+
|
|
|
|
+ # 如果有有效的晃动组
|
|
|
|
+ if not valid_groups.empty:
|
|
|
|
+ # 获取有效组的ID
|
|
|
|
+ valid_group_ids = valid_groups.index.tolist()
|
|
|
|
+
|
|
|
|
+ # 筛选属于有效组的数据点
|
|
|
|
+ valid_shake_df = shake_df[shake_df['group_id'].isin(valid_group_ids)]
|
|
|
|
+
|
|
|
|
+ # 简化场景分类,只收集时间和帧号
|
|
|
|
+ for group_id, group in valid_shake_df.groupby('group_id'):
|
|
|
|
+ # 不再使用curvHor进行场景分类,而是使用横摆角速度和转向灯状态
|
|
|
|
+
|
|
|
|
+ # 直道场景(横摆角速度小,无转向灯)
|
|
|
|
+ straight_mask = (group.lightMask == 0) & (group.speedH.abs() < 2.0)
|
|
|
|
+ # 换道场景(有转向灯,横摆角速度适中)
|
|
|
|
+ lane_change_mask = (group.lightMask != 0) & (group.speedH.abs() < 5.0)
|
|
|
|
+ # 转弯场景(横摆角速度大或有转向灯且横摆角速度适中)
|
|
|
|
+ turning_mask = (group.speedH.abs() >= 5.0) | ((group.lightMask != 0) & (group.speedH.abs() >= 2.0))
|
|
|
|
+
|
|
|
|
+ # 为每种场景添加标记
|
|
|
|
+ if straight_mask.any():
|
|
|
|
+ straight_group = group[straight_mask].copy()
|
|
|
|
+ time_list.extend(straight_group['simTime'].values)
|
|
|
|
+ frame_list.extend(straight_group['simFrame'].values)
|
|
|
|
+
|
|
|
|
+ if lane_change_mask.any():
|
|
|
|
+ lane_change_group = group[lane_change_mask].copy()
|
|
|
|
+ time__list.extend(lane_change_group['simTime'].values)
|
|
|
|
+ frame_list.extend(lane_change_group['simFrame'].values)
|
|
|
|
+
|
|
|
|
+ if turning_mask.any():
|
|
|
|
+ turning_group = group[turning_mask].copy()
|
|
|
|
+ time_list.extend(turning_group['simTime'].values)
|
|
|
|
+ frame_list.extend(turning_group['simFrame'].values)
|
|
|
|
+
|
|
|
|
+ # 准备晃动事件数据
|
|
|
|
+ shake_time = []
|
|
|
|
+ shake_frame = []
|
|
|
|
+
|
|
|
|
+ for group_id in valid_group_ids:
|
|
|
|
+ start_time = valid_groups.loc[group_id, 'start_time']
|
|
|
|
+ end_time = valid_groups.loc[group_id, 'end_time']
|
|
|
|
+ start_frame = valid_groups.loc[group_id, 'start_frame']
|
|
|
|
+ end_frame = valid_groups.loc[group_id, 'end_frame']
|
|
|
|
+
|
|
|
|
+ shake_time.append([start_time, end_time])
|
|
|
|
+ shake_frame.append([start_frame, end_frame])
|
|
|
|
+
|
|
|
|
+ self.shake_count = len(shake_time)
|
|
|
|
+
|
|
|
|
+ if shake_time:
|
|
|
|
+ # 保存晃动事件摘要
|
|
|
|
+ time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
|
|
|
|
+ time_df['duration'] = time_df['end_time'] - time_df['start_time'] # 添加持续时间列
|
|
|
|
+ frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
|
|
|
|
+ discomfort_df = pd.concat([time_df, frame_df], axis=1)
|
|
|
|
+ discomfort_df['type'] = 'shake'
|
|
|
|
+ self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
|
|
|
|
+
|
|
|
|
+ # 在方法末尾添加以下代码,保存晃动事件的详细数据
|
|
|
|
+ # if self.shake_count > 0:
|
|
|
|
+ # self._save_shake_data()
|
|
|
|
+ # self._plot_shake_analysis()
|
|
|
|
+
|
|
return time_list
|
|
return time_list
|
|
|
|
+
|
|
|
|
+ def _save_shake_data(self):
|
|
|
|
+ """保存晃动事件的详细数据,用于后续分析"""
|
|
|
|
+ import os
|
|
|
|
+
|
|
|
|
+ # 创建保存目录
|
|
|
|
+ save_dir = os.path.join(self.data_processed.data_path, "comfort_analysis")
|
|
|
|
+ os.makedirs(save_dir, exist_ok=True)
|
|
|
|
+
|
|
|
|
+ # 1. 保存所有晃动事件的摘要信息
|
|
|
|
+ shake_events = self.discomfort_df[self.discomfort_df['type'] == 'shake'].copy()
|
|
|
|
+ if not shake_events.empty:
|
|
|
|
+ shake_events.to_csv(os.path.join(save_dir, "shake_events_summary.csv"), index=False)
|
|
|
|
+
|
|
|
|
+ # 2. 为每个晃动事件保存详细数据
|
|
|
|
+ for i, event in shake_events.iterrows():
|
|
|
|
+ start_time = event['start_time']
|
|
|
|
+ end_time = event['end_time']
|
|
|
|
+
|
|
|
|
+ # 提取该晃动事件的所有数据帧
|
|
|
|
+ event_data = self.ego_df[
|
|
|
|
+ (self.ego_df['simTime'] >= start_time) &
|
|
|
|
+ (self.ego_df['simTime'] <= end_time)
|
|
|
|
+ ].copy()
|
|
|
|
+
|
|
|
|
+ # 添加一些分析指标
|
|
|
|
+ event_data['lat_acc_abs'] = event_data['lat_acc'].abs()
|
|
|
|
+ event_data['lat_acc_rate'] = event_data['lat_acc'].diff() / event_data['simTime'].diff()
|
|
|
|
+ event_data['speedH_rate'] = event_data['speedH'].diff() / event_data['simTime'].diff()
|
|
|
|
+
|
|
|
|
+ # 保存该事件的详细数据
|
|
|
|
+ event_data.to_csv(
|
|
|
|
+ os.path.join(save_dir, f"shake_event_{i+1}_detail.csv"),
|
|
|
|
+ index=False
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 3. 保存所有晃动事件的汇总统计数据
|
|
|
|
+ shake_stats = {
|
|
|
|
+ 'total_count': self.shake_count,
|
|
|
|
+ 'avg_duration': shake_events['duration'].mean(),
|
|
|
|
+ 'max_duration': shake_events['duration'].max(),
|
|
|
|
+ 'min_duration': shake_events['duration'].min(),
|
|
|
|
+ 'total_duration': shake_events['duration'].sum(),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ import json
|
|
|
|
+ with open(os.path.join(save_dir, "shake_statistics.json"), 'w') as f:
|
|
|
|
+ json.dump(shake_stats, f, indent=4)
|
|
|
|
+
|
|
|
|
+ self.logger.info(f"晃动事件数据已保存至: {save_dir}")
|
|
|
|
+
|
|
|
|
+ def _plot_shake_analysis(self):
|
|
|
|
+ """绘制晃动分析图表,并标记关键阈值和数据点"""
|
|
|
|
+ import os
|
|
|
|
+ import matplotlib.pyplot as plt
|
|
|
|
+ import numpy as np
|
|
|
|
+ import pandas as pd
|
|
|
|
+
|
|
|
|
+ # 创建保存目录
|
|
|
|
+ save_dir = os.path.join(self.data_processed.data_path, "comfort_analysis")
|
|
|
|
+ os.makedirs(save_dir, exist_ok=True)
|
|
|
|
+
|
|
|
|
+ # 准备数据
|
|
|
|
+ df = self.ego_df.copy()
|
|
|
|
+
|
|
|
|
+ # 检查必要的列是否存在
|
|
|
|
+ required_columns = ['lat_acc_threshold', 'speedH_threshold', 'speedH_std']
|
|
|
|
+ missing_columns = [col for col in required_columns if col not in df.columns]
|
|
|
|
+
|
|
|
|
+ if missing_columns:
|
|
|
|
+ self.logger.warning(f"Missing columns for plotting: {missing_columns}, possibly because shake detection was not executed correctly")
|
|
|
|
+ # 如果缺少必要的列,重新计算一次
|
|
|
|
+ for col in missing_columns:
|
|
|
|
+ if col == 'lat_acc_threshold':
|
|
|
|
+ df['lat_acc_threshold'] = df['v'].apply(
|
|
|
|
+ lambda speed: max(0.3, min(0.8, 0.5 * (1 + (speed - 20) / 60)))
|
|
|
|
+ )
|
|
|
|
+ elif col == 'speedH_threshold':
|
|
|
|
+ df['speedH_threshold'] = df['v'].apply(
|
|
|
|
+ lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
|
|
|
|
+ )
|
|
|
|
+ elif col == 'speedH_std':
|
|
|
|
+ window_size = 5
|
|
|
|
+ df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
|
|
|
|
+
|
|
|
|
+ # 创建图表
|
|
|
|
+ fig, axs = plt.subplots(3, 1, figsize=(14, 12), sharex=True)
|
|
|
|
+
|
|
|
|
+ # 绘制横向加速度
|
|
|
|
+ axs[0].plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
|
|
|
|
+ axs[0].set_ylabel('Lateral Acceleration (m/s²)')
|
|
|
|
+ axs[0].set_title('Shake Analysis')
|
|
|
|
+ axs[0].grid(True)
|
|
|
|
+
|
|
|
|
+ # 绘制动态阈值线
|
|
|
|
+ axs[0].plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='Threshold')
|
|
|
|
+ axs[0].plot(df['simTime'], -df['lat_acc_threshold'], 'r--')
|
|
|
|
+
|
|
|
|
+ # 绘制横摆角速度
|
|
|
|
+ axs[1].plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
|
|
|
|
+ axs[1].set_ylabel('Yaw Rate (deg/s)')
|
|
|
|
+ axs[1].grid(True)
|
|
|
|
+
|
|
|
|
+ # 绘制横摆角速度阈值
|
|
|
|
+ axs[1].plot(df['simTime'], df['speedH_threshold'], 'r--', label='Threshold')
|
|
|
|
+ axs[1].plot(df['simTime'], -df['speedH_threshold'], 'r--')
|
|
|
|
+
|
|
|
|
+ # 绘制横摆角速度标准差
|
|
|
|
+ axs[1].plot(df['simTime'], df['speedH_std'], 'm-', alpha=0.5, label='Yaw Rate Std')
|
|
|
|
+
|
|
|
|
+ # 绘制车速
|
|
|
|
+ axs[2].plot(df['simTime'], df['v'], 'k-', label='Vehicle Speed')
|
|
|
|
+ axs[2].set_xlabel('Time (s)')
|
|
|
|
+ axs[2].set_ylabel('Speed (km/h)')
|
|
|
|
+ axs[2].grid(True)
|
|
|
|
+
|
|
|
|
+ # 标记晃动事件
|
|
|
|
+ if not self.discomfort_df.empty:
|
|
|
|
+ shake_df = self.discomfort_df[self.discomfort_df['type'] == 'shake']
|
|
|
|
+
|
|
|
|
+ # 为每个晃动事件创建详细标记
|
|
|
|
+ for idx, row in shake_df.iterrows():
|
|
|
|
+ start_time = row['start_time']
|
|
|
|
+ end_time = row['end_time']
|
|
|
|
+
|
|
|
|
+ # 在所有子图中标记晃动区域
|
|
|
|
+ for ax in axs:
|
|
|
|
+ ax.axvspan(start_time, end_time, alpha=0.2, color='red')
|
|
|
|
+
|
|
|
|
+ # 获取晃动期间的数据
|
|
|
|
+ shake_period = df[(df['simTime'] >= start_time) & (df['simTime'] <= end_time)]
|
|
|
|
+
|
|
|
|
+ if not shake_period.empty:
|
|
|
|
+ # 找出晃动期间横向加速度的最大值点
|
|
|
|
+ max_lat_acc_idx = shake_period['lat_acc'].abs().idxmax()
|
|
|
|
+ max_lat_acc_time = shake_period.loc[max_lat_acc_idx, 'simTime']
|
|
|
|
+ max_lat_acc_value = shake_period.loc[max_lat_acc_idx, 'lat_acc']
|
|
|
|
+
|
|
|
|
+ # 标记最大横向加速度点
|
|
|
|
+ axs[0].scatter(max_lat_acc_time, max_lat_acc_value, color='red', s=80, zorder=5)
|
|
|
|
+ axs[0].annotate(
|
|
|
|
+ f'Max: {max_lat_acc_value:.2f} m/s²\nTime: {max_lat_acc_time:.2f}s',
|
|
|
|
+ xy=(max_lat_acc_time, max_lat_acc_value),
|
|
|
|
+ xytext=(10, 20),
|
|
|
|
+ textcoords='offset points',
|
|
|
|
+ arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
|
|
|
|
+ bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 找出晃动期间横摆角速度的最大值点
|
|
|
|
+ max_speedH_idx = shake_period['speedH'].abs().idxmax()
|
|
|
|
+ max_speedH_time = shake_period.loc[max_speedH_idx, 'simTime']
|
|
|
|
+ max_speedH_value = shake_period.loc[max_speedH_idx, 'speedH']
|
|
|
|
+
|
|
|
|
+ # 标记最大横摆角速度点
|
|
|
|
+ axs[1].scatter(max_speedH_time, max_speedH_value, color='red', s=80, zorder=5)
|
|
|
|
+ axs[1].annotate(
|
|
|
|
+ f'Max: {max_speedH_value:.2f} deg/s\nTime: {max_speedH_time:.2f}s',
|
|
|
|
+ xy=(max_speedH_time, max_speedH_value),
|
|
|
|
+ xytext=(10, 20),
|
|
|
|
+ textcoords='offset points',
|
|
|
|
+ arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
|
|
|
|
+ bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 标记晃动开始和结束点
|
|
|
|
+ for i in range(2): # 只在前两个子图中标记
|
|
|
|
+ # 开始点
|
|
|
|
+ start_value = shake_period.iloc[0][['lat_acc', 'speedH'][i]]
|
|
|
|
+ axs[i].scatter(start_time, start_value, color='green', s=80, zorder=5)
|
|
|
|
+ axs[i].annotate(
|
|
|
|
+ f'Start: {start_time:.2f}s',
|
|
|
|
+ xy=(start_time, start_value),
|
|
|
|
+ xytext=(-10, -30),
|
|
|
|
+ textcoords='offset points',
|
|
|
|
+ arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
|
|
|
|
+ bbox=dict(boxstyle='round,pad=0.5', fc='lightgreen', alpha=0.7)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 结束点
|
|
|
|
+ end_value = shake_period.iloc[-1][['lat_acc', 'speedH'][i]]
|
|
|
|
+ axs[i].scatter(end_time, end_value, color='blue', s=80, zorder=5)
|
|
|
|
+ axs[i].annotate(
|
|
|
|
+ f'End: {end_time:.2f}s',
|
|
|
|
+ xy=(end_time, end_value),
|
|
|
|
+ xytext=(10, -30),
|
|
|
|
+ textcoords='offset points',
|
|
|
|
+ arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
|
|
|
|
+ bbox=dict(boxstyle='round,pad=0.5', fc='lightblue', alpha=0.7)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 添加晃动检测条件说明
|
|
|
|
+ textstr = '\n'.join((
|
|
|
|
+ 'Shake Detection Conditions:',
|
|
|
|
+ '1. Lateral acceleration exceeds dynamic threshold',
|
|
|
|
+ '2. High lateral acceleration rate or yaw rate std',
|
|
|
|
+ '3. Duration exceeds threshold'
|
|
|
|
+ ))
|
|
|
|
+ props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
|
|
|
|
+ axs[0].text(0.02, 0.98, textstr, transform=axs[0].transAxes, fontsize=10,
|
|
|
|
+ verticalalignment='top', bbox=props)
|
|
|
|
+
|
|
|
|
+ # 添加图例
|
|
|
|
+ for ax in axs:
|
|
|
|
+ ax.legend(loc='upper right')
|
|
|
|
+
|
|
|
|
+ # 调整布局并保存
|
|
|
|
+ plt.tight_layout()
|
|
|
|
+ plt.savefig(os.path.join(save_dir, "shake_analysis.png"), dpi=300)
|
|
|
|
+ plt.close()
|
|
|
|
+
|
|
|
|
+ def _get_zigzag_times(self):
|
|
|
|
+ """获取所有画龙事件的时间点,用于排除画龙与晃动的重叠检测"""
|
|
|
|
+ zigzag_times = []
|
|
|
|
+ for start_time, end_time in self.zigzag_time_list:
|
|
|
|
+ # 获取该时间段内的所有时间点
|
|
|
|
+ times_in_range = self.ego_df[(self.ego_df['simTime'] >= start_time) &
|
|
|
|
+ (self.ego_df['simTime'] <= end_time)]['simTime'].values
|
|
|
|
+ zigzag_times.extend(times_in_range)
|
|
|
|
+ return zigzag_times
|
|
|
|
|
|
def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
|
|
def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
|
|
"""处理顿挫数据"""
|
|
"""处理顿挫数据"""
|
|
@@ -390,10 +667,10 @@ class Comfort(object):
|
|
|
|
|
|
def comf_statistic(self):
|
|
def comf_statistic(self):
|
|
"""统计舒适性指标"""
|
|
"""统计舒适性指标"""
|
|
- df = self.ego_df[['simTime', 'simFrame', 'cur_diff', 'lon_acc', 'lon_acc_roc', 'accelH', 'speedH', 'lat_acc', 'v']].copy()
|
|
|
|
|
|
+ df = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'accelH', 'speedH', 'lat_acc', 'v']].copy()
|
|
|
|
|
|
self.zigzag_count_func()
|
|
self.zigzag_count_func()
|
|
- self.cal_zigzag_strength_strength()
|
|
|
|
|
|
+ # self.cal_zigzag_strength_strength()
|
|
if self.zigzag_time_list:
|
|
if self.zigzag_time_list:
|
|
# 保存 Weaving (zigzag) 事件摘要
|
|
# 保存 Weaving (zigzag) 事件摘要
|
|
zigzag_df = pd.DataFrame(self.zigzag_time_list, columns=['start_time', 'end_time'])
|
|
zigzag_df = pd.DataFrame(self.zigzag_time_list, columns=['start_time', 'end_time'])
|
|
@@ -431,4 +708,4 @@ class Comfort(object):
|
|
evaluator = Score(self.data_processed.comfort_config)
|
|
evaluator = Score(self.data_processed.comfort_config)
|
|
result = evaluator.evaluate(comfort_result)
|
|
result = evaluator.evaluate(comfort_result)
|
|
print("\n[舒适性表现及得分情况]")
|
|
print("\n[舒适性表现及得分情况]")
|
|
- return result
|
|
|
|
|
|
+ return result
|