#!/usr/bin/env python
# -*- coding: utf-8 -*-

import math
import pandas as pd
import numpy as np
import scipy.signal

from modules.lib.score import Score
from modules.lib.common import get_interpolation, get_frame_with_time
from modules.config import config
from modules.lib.log_manager import LogManager


def peak_valley_decorator(method):
    """Decorator that feeds detected yaw-rate peak/valley pairs to the wrapped method."""
    def wrapper(self, *args, **kwargs):
        peak_valley = self._peak_valley_determination(self.df)
        pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
        if pv_list:
            p_last = pv_list[0]
            for i in range(1, len(pv_list)):
                p_curr = pv_list[i]
                if self._peak_valley_judgment(p_last, p_curr):
                    method(self, p_curr, p_last, True, *args, **kwargs)
                else:
                    p_last = p_curr
        else:
            method(self, [0, 0], [0, 0], False, *args, **kwargs)
    return wrapper


class Comfort(object):
    """Comfort evaluation for autonomous driving."""

    def __init__(self, data_processed):
        self.data_processed = data_processed
        self.logger = LogManager().get_logger()

        # Data containers
        self.data = data_processed.ego_data
        self.ego_df = pd.DataFrame()
        self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])

        # Statistic counters
        self.calculated_value = {
            'weaving': 0,
            'shake': 0,
            'cadence': 0,
            'slamBrake': 0,
            'slamAccelerate': 0
        }

        # Time-series data
        self.time_list = self.data['simTime'].values.tolist()
        self.frame_list = self.data['simFrame'].values.tolist()

        # Detector state
        self.zigzag_count = 0
        self.shake_count = 0
        self.cadence_count = 0
        self.slam_brake_count = 0
        self.slam_accel_count = 0
        self.zigzag_time_list = []
        self.zigzag_stre_list = []  # zigzag strength samples (average yaw acceleration per event)

        # Data preprocessing
        self._get_data()
        self._comf_param_cal()

    def _get_data(self):
        """Select the data needed for the comfort evaluation."""
        self.ego_df = self.data[config.COMFORT_INFO].copy()
        self.df = self.ego_df.reset_index(drop=True)
        # Unused trajectory-curvature calculations were removed from this step.

    def _comf_param_cal(self):
        """Compute the parameters used by the comfort detectors."""
        # Speed-dependent acceleration/deceleration thresholds
        self.ego_df['ip_acc'] = self.ego_df['v'].apply(
            get_interpolation, point1=[18, 4], point2=[72, 2])
        self.ego_df['ip_dec'] = self.ego_df['v'].apply(
            get_interpolation, point1=[18, -5], point2=[72, -3.5])

        # Hard-braking / hard-acceleration flags
        self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
            lambda x: 1 if x < 0 else 0)
        self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
            lambda x: 1 if x > 0 else 0)

        # Cadence (jerky stop-and-go) preprocessing
        self.ego_df['cadence'] = self.ego_df.apply(
            lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
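
    # Illustrative sketch (assumption, not called by the pipeline): the dynamic thresholds
    # above rely on get_interpolation from modules.lib.common. Assuming it performs
    # straight-line interpolation between point1 = [x1, y1] and point2 = [x2, y2], the
    # helper below reproduces the expected values, e.g. at 45 km/h the acceleration
    # threshold would be 3.0 m/s² and the deceleration threshold -4.25 m/s².
    @staticmethod
    def _dynamic_threshold_example(speed=45.0):
        def interp(x, p1, p2):
            # straight-line interpolation between two [x, y] points
            return p1[1] + (p2[1] - p1[1]) * (x - p1[0]) / (p2[0] - p1[0])

        ip_acc = interp(speed, [18, 4], [72, 2])      # 3.0 m/s² at 45 km/h
        ip_dec = interp(speed, [18, -5], [72, -3.5])  # -4.25 m/s² at 45 km/h
        return ip_acc, ip_dec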

    def _peak_valley_determination(self, df):
        """Find the peaks and valleys of the vehicle yaw rate."""
        peaks, _ = scipy.signal.find_peaks(
            df['speedH'], height=2.3, distance=3, prominence=2.3, width=1)
        valleys, _ = scipy.signal.find_peaks(
            -df['speedH'], height=2.3, distance=3, prominence=2.3, width=1)
        return sorted(list(peaks) + list(valleys))

    def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6):
        """Judge whether a peak/valley pair constitutes weaving (zigzag driving)."""
        t_diff = p_curr[0] - p_last[0]
        v_diff = abs(p_curr[1] - p_last[1])
        s = p_curr[1] * p_last[1]

        if t_diff < tw and v_diff > avg and s < 0:
            if [p_last[0], p_curr[0]] not in self.zigzag_time_list:
                self.zigzag_time_list.append([p_last[0], p_curr[0]])
            return True
        return False

    @peak_valley_decorator
    def zigzag_count_func(self, p_curr, p_last, flag=True):
        """Count weaving (zigzag) events."""
        if flag:
            self.zigzag_count += 1

    @peak_valley_decorator
    def cal_zigzag_strength_strength(self, p_curr, p_last, flag=True):
        """Accumulate weaving (zigzag) intensity."""
        if flag:
            v_diff = abs(p_curr[1] - p_last[1])
            t_diff = p_curr[0] - p_last[0]
            if t_diff > 0:
                self.zigzag_stre_list.append(v_diff / t_diff)  # average yaw acceleration
        else:
            self.zigzag_stre_list = []

    def _shake_detector(self, T_diff=0.5):
        """Detect shake events (improved version that does not rely on trajectory curvature)."""
        # lat_acc is already the lateral acceleration in the vehicle frame, computed by data_process.py
        time_list = []
        frame_list = []

        # Work on a copy to avoid modifying the original data
        df = self.ego_df.copy()

        # 1. Lateral acceleration rate
        df['lat_acc_rate'] = df['lat_acc'].diff() / df['simTime'].diff()

        # 2. Yaw rate change rate
        df['speedH_rate'] = df['speedH'].diff() / df['simTime'].diff()

        # 3. Short-term variability of the yaw rate
        window_size = 5  # 5-frame window
        df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()

        # 4. Speed-dependent dynamic thresholds
        # df['lat_acc_threshold'] = df['v'].apply(
        #     lambda speed: max(0.3, min(0.8, 0.5 * (1 + (speed - 20) / 60)))
        # )
        v0 = 20 * 5 / 18   # ≈5.56 m/s
        # decay coefficient
        k = 0.008 * 3.6    # = 0.0288 per m/s
        df['lat_acc_threshold'] = df['v'].apply(
            lambda speed: max(
                1.0,                        # lower bound 1.0 m/s²
                min(
                    1.8,                    # upper bound 1.8 m/s²
                    1.8 - k * (speed - v0)  # linear decrease with speed
                )
            )
        )

        df['speedH_threshold'] = df['v'].apply(
            lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
        )

        # Store the thresholds and intermediate variables in self.ego_df for use by other methods
        self.ego_df['lat_acc_threshold'] = df['lat_acc_threshold']
        self.ego_df['speedH_threshold'] = df['speedH_threshold']
        self.ego_df['lat_acc_rate'] = df['lat_acc_rate']
        self.ego_df['speedH_rate'] = df['speedH_rate']
        self.ego_df['speedH_std'] = df['speedH_std']

        # 5. Combined shake conditions
        # Condition A: lateral acceleration exceeds the dynamic threshold
        condition_A = df['lat_acc'].abs() > df['lat_acc_threshold']

        # Condition B: lateral acceleration rate exceeds the threshold
        lat_acc_rate_threshold = 0.5  # lateral acceleration rate threshold (m/s³)
        condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold

        # Condition C: yaw rate varies noticeably but not periodically (zigzag intervals excluded)
        condition_C = (df['speedH_std'] > df['speedH_threshold']) & (~df['simTime'].isin(self._get_zigzag_times()))

        # Combined condition: A must hold, together with B or C
        shake_condition = condition_A & (condition_B | condition_C)

        # Select the rows that satisfy the condition
        shake_df = df[shake_condition].copy()

        # Group by consecutive frame numbers so that only runs of consecutive frames
        # above threshold are treated as shake events
        if not shake_df.empty:
            # Frame-number differences
            shake_df['frame_diff'] = shake_df['simFrame'].diff()

            # Mark discontinuities (frame difference greater than 1);
            # consecutive frames normally differ by exactly 1
            shake_df['is_new_group'] = shake_df['frame_diff'] > 1

            # The first row always starts a new group
            shake_df.iloc[0, shake_df.columns.get_loc('is_new_group')] = True

            # Group IDs
            shake_df['group_id'] = shake_df['is_new_group'].cumsum()

            # Frame count and duration per group
            group_info = shake_df.groupby('group_id').agg({
                'simTime': ['min', 'max'],
                'simFrame': ['min', 'max', 'count']  # count gives the number of frames per group
            })
            group_info.columns = ['start_time', 'end_time', 'start_frame', 'end_frame', 'frame_count']
            group_info['duration'] = group_info['end_time'] - group_info['start_time']

            # Keep only groups whose consecutive frame count exceeds the threshold
            # (at a 100 Hz sampling rate, 0.5 s corresponds to about 50 frames)
            MIN_FRAME_COUNT = 5  # minimum consecutive frame count; adjust to the actual sampling rate
            valid_groups = group_info[group_info['frame_count'] >= MIN_FRAME_COUNT]

            # If there are valid shake groups
            if not valid_groups.empty:
                # IDs of the valid groups
                valid_group_ids = valid_groups.index.tolist()

                # Rows belonging to the valid groups
                valid_shake_df = shake_df[shake_df['group_id'].isin(valid_group_ids)]

                # Simplified scenario classification: only collect times and frame numbers
                for group_id, group in valid_shake_df.groupby('group_id'):
                    # Scenarios are classified by yaw rate and turn-signal state instead of curvHor
                    # Straight-road scenario (small yaw rate, no turn signal)
                    straight_mask = (group.lightMask == 0) & (group.speedH.abs() < 2.0)
                    # Lane-change scenario (turn signal on, moderate yaw rate)
                    lane_change_mask = (group.lightMask != 0) & (group.speedH.abs() < 5.0)
                    # Turning scenario (large yaw rate, or turn signal on with moderate yaw rate)
                    turning_mask = (group.speedH.abs() >= 5.0) | ((group.lightMask != 0) & (group.speedH.abs() >= 2.0))

                    # Collect times/frames for each scenario
                    if straight_mask.any():
                        straight_group = group[straight_mask].copy()
                        time_list.extend(straight_group['simTime'].values)
                        frame_list.extend(straight_group['simFrame'].values)

                    if lane_change_mask.any():
                        lane_change_group = group[lane_change_mask].copy()
                        time_list.extend(lane_change_group['simTime'].values)
                        frame_list.extend(lane_change_group['simFrame'].values)

                    if turning_mask.any():
                        turning_group = group[turning_mask].copy()
                        time_list.extend(turning_group['simTime'].values)
                        frame_list.extend(turning_group['simFrame'].values)

                # Assemble shake event data
                shake_time = []
                shake_frame = []
                for group_id in valid_group_ids:
                    start_time = valid_groups.loc[group_id, 'start_time']
                    end_time = valid_groups.loc[group_id, 'end_time']
                    start_frame = valid_groups.loc[group_id, 'start_frame']
                    end_frame = valid_groups.loc[group_id, 'end_frame']

                    shake_time.append([start_time, end_time])
                    shake_frame.append([start_frame, end_frame])

                self.shake_count = len(shake_time)

                if shake_time:
                    # Save shake event summary
                    time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
                    time_df['duration'] = time_df['end_time'] - time_df['start_time']  # duration column
                    frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
                    discomfort_df = pd.concat([time_df, frame_df], axis=1)
                    discomfort_df['type'] = 'shake'
                    self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)

        # Optionally save and plot detailed shake data:
        # if self.shake_count > 0:
        #     self._save_shake_data()
        #     self._plot_shake_analysis()

        return time_list
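
    # Illustrative sketch (assumption, not called by the pipeline): the event grouping used
    # above, and again in _slam_brake_detector / _slam_accel_detector, collapses runs of
    # consecutive frame numbers into one event via diff() + cumsum(). The toy frame sequence
    # below shows the idiom in isolation.
    @staticmethod
    def _consecutive_frame_grouping_example():
        demo = pd.DataFrame({'simFrame': [10, 11, 12, 40, 41, 90]})
        demo['is_new_group'] = demo['simFrame'].diff().fillna(np.inf) > 1
        demo['group_id'] = demo['is_new_group'].cumsum()
        # group_id becomes [1, 1, 1, 2, 2, 3]: three separate events
        return demo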

    def _save_shake_data(self):
        """Save detailed shake event data for later analysis."""
        import os

        # Create the output directory
        save_dir = os.path.join(self.data_processed.data_path, "comfort_analysis")
        os.makedirs(save_dir, exist_ok=True)

        # 1. Save a summary of all shake events
        shake_events = self.discomfort_df[self.discomfort_df['type'] == 'shake'].copy()
        if not shake_events.empty:
            shake_events.to_csv(os.path.join(save_dir, "shake_events_summary.csv"), index=False)

        # 2. Save detailed data for each shake event
        for i, event in shake_events.iterrows():
            start_time = event['start_time']
            end_time = event['end_time']

            # Extract all frames belonging to this shake event
            event_data = self.ego_df[
                (self.ego_df['simTime'] >= start_time) &
                (self.ego_df['simTime'] <= end_time)
            ].copy()

            # Add some analysis metrics
            event_data['lat_acc_abs'] = event_data['lat_acc'].abs()
            event_data['lat_acc_rate'] = event_data['lat_acc'].diff() / event_data['simTime'].diff()
            event_data['speedH_rate'] = event_data['speedH'].diff() / event_data['simTime'].diff()

            # Save the detailed data of this event
            event_data.to_csv(
                os.path.join(save_dir, f"shake_event_{i + 1}_detail.csv"),
                index=False
            )

        # 3. Save aggregate statistics over all shake events
        shake_stats = {
            'total_count': self.shake_count,
            'avg_duration': shake_events['duration'].mean(),
            'max_duration': shake_events['duration'].max(),
            'min_duration': shake_events['duration'].min(),
            'total_duration': shake_events['duration'].sum(),
        }

        import json
        with open(os.path.join(save_dir, "shake_statistics.json"), 'w') as f:
            json.dump(shake_stats, f, indent=4)

        self.logger.info(f"Shake event data saved to: {save_dir}")

    def _plot_shake_analysis(self):
        """Plot shake analysis charts, marking key thresholds and data points."""
        import os
        import matplotlib.pyplot as plt

        # Create the output directory
        save_dir = os.path.join(self.data_processed.data_path, "comfort_analysis")
        os.makedirs(save_dir, exist_ok=True)

        # Prepare data
        df = self.ego_df.copy()

        # Check that the required columns exist
        required_columns = ['lat_acc_threshold', 'speedH_threshold', 'speedH_std']
        missing_columns = [col for col in required_columns if col not in df.columns]

        if missing_columns:
            self.logger.warning(
                f"Missing columns for plotting: {missing_columns}, "
                "possibly because shake detection was not executed correctly")
            # Recompute any missing columns (kept consistent with _shake_detector)
            for col in missing_columns:
                if col == 'lat_acc_threshold':
                    v0 = 20 * 5 / 18
                    k = 0.008 * 3.6
                    df['lat_acc_threshold'] = df['v'].apply(
                        lambda speed: max(1.0, min(1.8, 1.8 - k * (speed - v0)))
                    )
                elif col == 'speedH_threshold':
                    df['speedH_threshold'] = df['v'].apply(
                        lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
                    )
                elif col == 'speedH_std':
                    window_size = 5
                    df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()

        # Create the figure
        fig, axs = plt.subplots(3, 1, figsize=(14, 12), sharex=True)

        # Lateral acceleration
        axs[0].plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
        axs[0].set_ylabel('Lateral Acceleration (m/s²)')
        axs[0].set_title('Shake Analysis')
        axs[0].grid(True)

        # Dynamic threshold lines
        axs[0].plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='Threshold')
        axs[0].plot(df['simTime'], -df['lat_acc_threshold'], 'r--')

        # Yaw rate
        axs[1].plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
        axs[1].set_ylabel('Yaw Rate (deg/s)')
        axs[1].grid(True)

        # Yaw rate threshold
        axs[1].plot(df['simTime'], df['speedH_threshold'], 'r--', label='Threshold')
        axs[1].plot(df['simTime'], -df['speedH_threshold'], 'r--')

        # Yaw rate standard deviation
        axs[1].plot(df['simTime'], df['speedH_std'], 'm-', alpha=0.5, label='Yaw Rate Std')

        # Vehicle speed
        axs[2].plot(df['simTime'], df['v'], 'k-', label='Vehicle Speed')
        axs[2].set_xlabel('Time (s)')
        axs[2].set_ylabel('Speed (km/h)')
        axs[2].grid(True)

        # Mark shake events
        if not self.discomfort_df.empty:
            shake_df = self.discomfort_df[self.discomfort_df['type'] == 'shake']

            # Add detailed markers for each shake event
            for idx, row in shake_df.iterrows():
                start_time = row['start_time']
                end_time = row['end_time']

                # Shade the shake interval in every subplot
                for ax in axs:
                    ax.axvspan(start_time, end_time, alpha=0.2, color='red')

                # Data during the shake interval
                shake_period = df[(df['simTime'] >= start_time) & (df['simTime'] <= end_time)]

                if not shake_period.empty:
                    # Point of maximum lateral acceleration during the shake
                    max_lat_acc_idx = shake_period['lat_acc'].abs().idxmax()
                    max_lat_acc_time = shake_period.loc[max_lat_acc_idx, 'simTime']
                    max_lat_acc_value = shake_period.loc[max_lat_acc_idx, 'lat_acc']

                    # Mark the maximum lateral acceleration point
                    axs[0].scatter(max_lat_acc_time, max_lat_acc_value, color='red', s=80, zorder=5)
                    axs[0].annotate(
                        f'Max: {max_lat_acc_value:.2f} m/s²\nTime: {max_lat_acc_time:.2f}s',
                        xy=(max_lat_acc_time, max_lat_acc_value),
                        xytext=(10, 20), textcoords='offset points',
                        arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
                        bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7)
                    )

                    # Point of maximum yaw rate during the shake
                    max_speedH_idx = shake_period['speedH'].abs().idxmax()
                    max_speedH_time = shake_period.loc[max_speedH_idx, 'simTime']
                    max_speedH_value = shake_period.loc[max_speedH_idx, 'speedH']

                    # Mark the maximum yaw rate point
                    axs[1].scatter(max_speedH_time, max_speedH_value, color='red', s=80, zorder=5)
                    axs[1].annotate(
                        f'Max: {max_speedH_value:.2f} deg/s\nTime: {max_speedH_time:.2f}s',
                        xy=(max_speedH_time, max_speedH_value),
                        xytext=(10, 20), textcoords='offset points',
                        arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
                        bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7)
                    )

                    # Mark the start and end of the shake
                    for i in range(2):  # only in the first two subplots
                        # start point
                        start_value = shake_period.iloc[0][['lat_acc', 'speedH'][i]]
                        axs[i].scatter(start_time, start_value, color='green', s=80, zorder=5)
                        axs[i].annotate(
                            f'Start: {start_time:.2f}s',
                            xy=(start_time, start_value),
                            xytext=(-10, -30), textcoords='offset points',
                            arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
                            bbox=dict(boxstyle='round,pad=0.5', fc='lightgreen', alpha=0.7)
                        )

                        # end point
                        end_value = shake_period.iloc[-1][['lat_acc', 'speedH'][i]]
                        axs[i].scatter(end_time, end_value, color='blue', s=80, zorder=5)
                        axs[i].annotate(
                            f'End: {end_time:.2f}s',
                            xy=(end_time, end_value),
                            xytext=(10, -30), textcoords='offset points',
                            arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
                            bbox=dict(boxstyle='round,pad=0.5', fc='lightblue', alpha=0.7)
                        )

        # Add a note describing the shake detection conditions
        textstr = '\n'.join((
            'Shake Detection Conditions:',
            '1. Lateral acceleration exceeds dynamic threshold',
            '2. High lateral acceleration rate or yaw rate std',
            '3. Duration exceeds threshold'
        ))
        props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        axs[0].text(0.02, 0.98, textstr, transform=axs[0].transAxes, fontsize=10,
                    verticalalignment='top', bbox=props)

        # Legends
        for ax in axs:
            ax.legend(loc='upper right')

        # Adjust layout and save
        plt.tight_layout()
        plt.savefig(os.path.join(save_dir, "shake_analysis.png"), dpi=300)
        plt.close()

    def _get_zigzag_times(self):
        """Collect all time points inside zigzag events so they can be excluded from shake detection."""
        zigzag_times = []
        for start_time, end_time in self.zigzag_time_list:
            # All time points within this interval
            times_in_range = self.ego_df[(self.ego_df['simTime'] >= start_time) &
                                         (self.ego_df['simTime'] <= end_time)]['simTime'].values
            zigzag_times.extend(times_in_range)
        return zigzag_times

    def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
        """Label a sample for cadence (jerky stop-and-go) detection."""
        if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
            return np.nan
        elif abs(lon_acc) == 0:
            return 0
        elif lon_acc > 0 and lon_acc < ip_acc:
            return 1
        elif lon_acc < 0 and lon_acc > ip_dec:
            return -1
        else:
            return 0

    def _cadence_detector(self):
        """Detect cadence (jerky stop-and-go) events."""
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence', 'v']].copy()
        # Drop samples without a cadence label
        data = data.dropna(subset=['cadence'])
        data['cadence_diff'] = data['cadence'].diff()
        data.dropna(subset=['cadence_diff'], inplace=True)
        data = data[data['cadence_diff'] != 0]

        t_list = data['simTime'].values.tolist()
        f_list = data['simFrame'].values.tolist()

        TIME_RANGE = 1
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []

        for i in range(len(f_list)):
            if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:
                # feature points within one second of each other belong to the same cadence group
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]

        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 1]  # a single feature point already counts as one cadence event
        group_frame = [g for g in group_frame if len(g) >= 1]

        # Values for charting
        cadence_time = [[g[0], g[-1]] for g in group_time]
        cadence_frame = [[g[0], g[-1]] for g in group_frame]

        if cadence_time:
            # Save cadence event summary
            time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'cadence'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)

        # Expand each cadence [start, end] pair back into the full list of time stamps
        cadence_time_list = [time for pair in cadence_time for time in self.ego_df['simTime'].values
                             if pair[0] <= time <= pair[1]]

        self.cadence_count = len(cadence_time)
        return cadence_time_list

    def _slam_brake_detector(self):
        """Detect hard-braking events."""
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake', 'v']].copy()
        res_df = data[data['slam_brake'] == 1]
        t_list = res_df['simTime'].values
        f_list = res_df['simFrame'].values.tolist()

        FRAME_RANGE = 1
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []

        for i in range(len(f_list)):
            if not sub_group_time or f_list[i] - f_list[i - 1] <= FRAME_RANGE:
                # consecutive frames belong to the same hard-braking group
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]

        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 2]  # at least two frames count as one hard-braking event
        group_frame = [g for g in group_frame if len(g) >= 2]

        # Values for charting
        slam_brake_time = [[g[0], g[-1]] for g in group_time]
        slam_brake_frame = [[g[0], g[-1]] for g in group_frame]

        if slam_brake_time:
            # Save event summary
            time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'slam_brake'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)

        time_list = [element for sublist in group_time for element in sublist]
        self.slam_brake_count = len(group_time)
        return time_list

    def _slam_accel_detector(self):
        """Detect hard-acceleration events."""
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel', 'v']].copy()
        res_df = data.loc[data['slam_accel'] == 1]
        t_list = res_df['simTime'].values
        f_list = res_df['simFrame'].values.tolist()

        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []

        for i in range(len(f_list)):
            if not sub_group_time or f_list[i] - f_list[i - 1] <= 1:
                # consecutive frames belong to the same hard-acceleration group
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]

        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 2]
        group_frame = [g for g in group_frame if len(g) >= 2]

        # Values for charting
        slam_accel_time = [[g[0], g[-1]] for g in group_time]
        slam_accel_frame = [[g[0], g[-1]] for g in group_frame]

        if slam_accel_time:
            # Save event summary
            time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'slam_accel'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)

        time_list = [element for sublist in group_time for element in sublist]
        self.slam_accel_count = len(group_time)
        return time_list

    def comf_statistic(self):
        """Compute the comfort statistics."""
        df = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'accelH', 'speedH', 'lat_acc', 'v']].copy()

        self.zigzag_count_func()
        # self.cal_zigzag_strength_strength()

        if self.zigzag_time_list:
            # Save weaving (zigzag) event summary
            zigzag_df = pd.DataFrame(self.zigzag_time_list, columns=['start_time', 'end_time'])
            zigzag_df = get_frame_with_time(zigzag_df, self.ego_df)
            zigzag_df['type'] = 'zigzag'
            self.discomfort_df = pd.concat([self.discomfort_df, zigzag_df], ignore_index=True)

        # zigzag_time_list only holds [t_start, t_end] pairs; expand them into a full time list
        zigzag_t_list = []
        t_list = df['simTime'].values.tolist()
        for t_start, t_end in self.zigzag_time_list:
            index_1 = t_list.index(t_start)
            index_2 = t_list.index(t_end)
            zigzag_t_list.extend(t_list[index_1:index_2 + 1])
        zigzag_t_list = list(set(zigzag_t_list))

        shake_t_list = self._shake_detector()
        cadence_t_list = self._cadence_detector()
        slam_brake_t_list = self._slam_brake_detector()
        slam_accel_t_list = self._slam_accel_detector()

        # Aggregate results
        self.calculated_value = {
            "weaving": self.zigzag_count,
            "shake": self.shake_count,
            "cadence": self.cadence_count,
            "slamBrake": self.slam_brake_count,
            "slamAccelerate": self.slam_accel_count
        }
        self.logger.info(f"Comfort calculation finished, statistics: {self.calculated_value}")
        return self.calculated_value

    def report_statistic(self):
        """Generate the comfort evaluation report."""
        comfort_result = self.comf_statistic()
        evaluator = Score(self.data_processed.comfort_config)
        result = evaluator.evaluate(comfort_result)

        print("\n[Comfort performance and scores]")
        return result
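

# Hypothetical usage sketch (assumption, not part of the original module): the surrounding
# pipeline presumably constructs a processed-data object exposing `ego_data` (a DataFrame
# with the columns used above), `comfort_config`, and `data_path`, and then drives this
# class roughly as follows. The import path and constructor below are illustrative
# placeholders, not confirmed APIs.
#
#     from modules.lib.data_process import DataProcess   # hypothetical import
#     processed = DataProcess(task_config)                # hypothetical constructor
#     comfort = Comfort(processed)
#     result = comfort.report_statistic()                 # runs all detectors and scores them
#     print(comfort.calculated_value)                     # e.g. {'weaving': 0, 'shake': 1, ...}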