123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import math
- import pandas as pd
- import numpy as np
- import scipy.signal
- from modules.lib.score import Score
- from modules.lib.common import get_interpolation, get_frame_with_time
- from modules.config import config
- from modules.lib.log_manager import LogManager
- def peak_valley_decorator(method):
- """峰谷检测装饰器"""
- def wrapper(self, *args, **kwargs):
- peak_valley = self._peak_valley_determination(self.df)
- pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
-
- if pv_list:
- p_last = pv_list[0]
- for i in range(1, len(pv_list)):
- p_curr = pv_list[i]
- if self._peak_valley_judgment(p_last, p_curr):
- method(self, p_curr, p_last, True, *args, **kwargs)
- else:
- p_last = p_curr
- return method
- else:
- method(self, [0, 0], [0, 0], False, *args, **kwargs)
- return method
- return wrapper
- class Comfort(object):
-
- """自动驾驶舒适性评估类"""
-
- def __init__(self, data_processed):
- self.data_processed = data_processed
- self.logger = LogManager().get_logger()
- # 初始化数据容器
- self.data = data_processed.ego_data
- self.ego_df = pd.DataFrame()
- self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
-
- # 统计指标
- self.calculated_value = {
- 'weaving': 0, 'shake': 0, 'cadence': 0,
- 'slamBrake': 0, 'slamAccelerate': 0
- }
-
- # 时间序列数据
- self.time_list = self.data['simTime'].values.tolist()
- self.frame_list = self.data['simFrame'].values.tolist()
-
- # 初始化检测器状态
- self.zigzag_count = 0
- self.shake_count = 0
- self.cadence_count = 0
- self.slam_brake_count = 0
- self.slam_accel_count = 0
- self.zigzag_time_list = []
- # 数据预处理
- self._get_data()
- self._comf_param_cal()
- def _get_data(self):
- """获取舒适性评估所需数据"""
- self.ego_df = self.data[config.COMFORT_INFO].copy()
- self.df = self.ego_df.reset_index(drop=True)
- # 1. 移除未使用的曲率计算相关代码
- def _comf_param_cal(self):
- """计算舒适性相关参数"""
- # 动态加减速阈值
- self.ego_df['ip_acc'] = self.ego_df['v'].apply(
- get_interpolation, point1=[18, 4], point2=[72, 2])
- self.ego_df['ip_dec'] = self.ego_df['v'].apply(
- get_interpolation, point1=[18, -5], point2=[72, -3.5])
-
- # 急刹急加速标记
- self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
- lambda x: 1 if x < 0 else 0)
- self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
- lambda x: 1 if x > 0 else 0)
-
- # 顿挫检测预处理
- self.ego_df['cadence'] = self.ego_df.apply(
- lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
-
-
- def _peak_valley_determination(self, df):
- """确定车辆角速度的峰值和谷值"""
- peaks, _ = scipy.signal.find_peaks(
- df['speedH'], height=2.3, distance=3,
- prominence=2.3, width=1)
- valleys, _ = scipy.signal.find_peaks(
- -df['speedH'], height=2.3, distance=3,
- prominence=2.3, width=1)
- return sorted(list(peaks) + list(valleys))
- def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6):
- """判断峰谷对是否构成曲折行驶"""
- t_diff = p_curr[0] - p_last[0]
- v_diff = abs(p_curr[1] - p_last[1])
- s = p_curr[1] * p_last[1]
- if t_diff < tw and v_diff > avg and s < 0:
- if [p_last[0], p_curr[0]] not in self.zigzag_time_list:
- self.zigzag_time_list.append([p_last[0], p_curr[0]])
- return True
- return False
- @peak_valley_decorator
- def zigzag_count_func(self, p_curr, p_last, flag=True):
- """计算曲折行驶次数"""
- if flag:
- self.zigzag_count += 1
- @peak_valley_decorator
- def cal_zigzag_strength_strength(self, p_curr, p_last, flag=True):
- """计算曲折行驶强度"""
- if flag:
- v_diff = abs(p_curr[1] - p_last[1])
- t_diff = p_curr[0] - p_last[0]
- if t_diff > 0:
- self.zigzag_stre_list.append(v_diff / t_diff) # 平均角加速度
- else:
- self.zigzag_stre_list = []
- def _shake_detector(self, T_diff=0.5):
- """检测晃动事件 - 改进版本(不使用车辆轨迹曲率)"""
- # lat_acc已经是车辆坐标系下的横向加速度,由data_process.py计算
- time_list = []
- frame_list = []
- # 复制数据以避免修改原始数据
- df = self.ego_df.copy()
-
- # 1. 计算横向加速度变化率
- df['lat_acc_rate'] = df['lat_acc'].diff() / df['simTime'].diff()
-
- # 2. 计算横摆角速度变化率
- df['speedH_rate'] = df['speedH'].diff() / df['simTime'].diff()
-
- # 3. 计算横摆角速度的短期变化特性
- window_size = 5 # 5帧窗口
- df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
-
- # 4. 基于车速的动态阈值
- # df['lat_acc_threshold'] = df['v'].apply(
- # lambda speed: max(0.3, min(0.8, 0.5 * (1 + (speed - 20) / 60)))
- # )
- v0 = 20 * 5/18 # ≈5.56 m/s
- # 递减系数
- k = 0.008 * 3.6 # =0.0288 per m/s
- df['lat_acc_threshold'] = df['v'].apply(
- lambda speed: max(
- 1.0, # 下限 1.0 m/s²
- min(
- 1.8, # 上限 1.8 m/s²
- 1.8 - k * (speed - v0) # 线性递减
- )
- )
- )
-
- df['speedH_threshold'] = df['v'].apply(
- lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
- )
- # 将计算好的阈值和中间变量保存到self.ego_df中,供其他函数使用
- self.ego_df['lat_acc_threshold'] = df['lat_acc_threshold']
- self.ego_df['speedH_threshold'] = df['speedH_threshold']
- self.ego_df['lat_acc_rate'] = df['lat_acc_rate']
- self.ego_df['speedH_rate'] = df['speedH_rate']
- self.ego_df['speedH_std'] = df['speedH_std']
-
- # 5. 综合判断晃动条件
- # 条件A: 横向加速度超过阈值
- condition_A = df['lat_acc'].abs() > df['lat_acc_threshold']
-
- # 条件B: 横向加速度变化率超过阈值
- lat_acc_rate_threshold = 0.5 # 横向加速度变化率阈值 (m/s³)
- condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold
-
- # 条件C: 横摆角速度有明显变化但不呈现周期性
- condition_C = (df['speedH_std'] > df['speedH_threshold']) & (~df['simTime'].isin(self._get_zigzag_times()))
-
- # 综合条件: 满足条件A,且满足条件B或条件C
- shake_condition = condition_A & (condition_B | condition_C)
-
- # 筛选满足条件的数据
- shake_df = df[shake_condition].copy()
-
- # 修改:按照连续帧号分组,确保只有连续帧超过阈值的才被认为是晃动
- if not shake_df.empty:
- # 计算帧号差
- shake_df['frame_diff'] = shake_df['simFrame'].diff()
-
- # 标记不连续的点(帧号差大于1)
- # 通常连续帧的帧号差应该是1
- shake_df['is_new_group'] = shake_df['frame_diff'] > 1
-
- # 第一个点标记为新组
- if not shake_df.empty:
- shake_df.iloc[0, shake_df.columns.get_loc('is_new_group')] = True
-
- # 创建组ID
- shake_df['group_id'] = shake_df['is_new_group'].cumsum()
-
- # 按组计算帧数和持续时间
- group_info = shake_df.groupby('group_id').agg({
- 'simTime': ['min', 'max'],
- 'simFrame': ['min', 'max', 'count'] # 添加count计算每组的帧数
- })
-
- group_info.columns = ['start_time', 'end_time', 'start_frame', 'end_frame', 'frame_count']
- group_info['duration'] = group_info['end_time'] - group_info['start_time']
-
- # 筛选连续帧数超过阈值的组
- # 假设采样率为100Hz,则0.5秒对应约50帧
- MIN_FRAME_COUNT = 5 # 最小连续帧数阈值,可根据实际采样率调整
- valid_groups = group_info[group_info['frame_count'] >= MIN_FRAME_COUNT]
-
- # 如果有有效的晃动组
- if not valid_groups.empty:
- # 获取有效组的ID
- valid_group_ids = valid_groups.index.tolist()
-
- # 筛选属于有效组的数据点
- valid_shake_df = shake_df[shake_df['group_id'].isin(valid_group_ids)]
-
- # 简化场景分类,只收集时间和帧号
- for group_id, group in valid_shake_df.groupby('group_id'):
- # 不再使用curvHor进行场景分类,而是使用横摆角速度和转向灯状态
-
- # 直道场景(横摆角速度小,无转向灯)
- straight_mask = (group.lightMask == 0) & (group.speedH.abs() < 2.0)
- # 换道场景(有转向灯,横摆角速度适中)
- lane_change_mask = (group.lightMask != 0) & (group.speedH.abs() < 5.0)
- # 转弯场景(横摆角速度大或有转向灯且横摆角速度适中)
- turning_mask = (group.speedH.abs() >= 5.0) | ((group.lightMask != 0) & (group.speedH.abs() >= 2.0))
-
- # 为每种场景添加标记
- if straight_mask.any():
- straight_group = group[straight_mask].copy()
- time_list.extend(straight_group['simTime'].values)
- frame_list.extend(straight_group['simFrame'].values)
-
- if lane_change_mask.any():
- lane_change_group = group[lane_change_mask].copy()
- time__list.extend(lane_change_group['simTime'].values)
- frame_list.extend(lane_change_group['simFrame'].values)
-
- if turning_mask.any():
- turning_group = group[turning_mask].copy()
- time_list.extend(turning_group['simTime'].values)
- frame_list.extend(turning_group['simFrame'].values)
-
- # 准备晃动事件数据
- shake_time = []
- shake_frame = []
-
- for group_id in valid_group_ids:
- start_time = valid_groups.loc[group_id, 'start_time']
- end_time = valid_groups.loc[group_id, 'end_time']
- start_frame = valid_groups.loc[group_id, 'start_frame']
- end_frame = valid_groups.loc[group_id, 'end_frame']
-
- shake_time.append([start_time, end_time])
- shake_frame.append([start_frame, end_frame])
-
- self.shake_count = len(shake_time)
-
- if shake_time:
- # 保存晃动事件摘要
- time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
- time_df['duration'] = time_df['end_time'] - time_df['start_time'] # 添加持续时间列
- frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
- discomfort_df = pd.concat([time_df, frame_df], axis=1)
- discomfort_df['type'] = 'shake'
- self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
-
- # 在方法末尾添加以下代码,保存晃动事件的详细数据
- # if self.shake_count > 0:
- # self._save_shake_data()
- # self._plot_shake_analysis()
-
- return time_list
-
- def _save_shake_data(self):
- """保存晃动事件的详细数据,用于后续分析"""
- import os
-
- # 创建保存目录
- save_dir = os.path.join(self.data_processed.data_path, "comfort_analysis")
- os.makedirs(save_dir, exist_ok=True)
-
- # 1. 保存所有晃动事件的摘要信息
- shake_events = self.discomfort_df[self.discomfort_df['type'] == 'shake'].copy()
- if not shake_events.empty:
- shake_events.to_csv(os.path.join(save_dir, "shake_events_summary.csv"), index=False)
-
- # 2. 为每个晃动事件保存详细数据
- for i, event in shake_events.iterrows():
- start_time = event['start_time']
- end_time = event['end_time']
-
- # 提取该晃动事件的所有数据帧
- event_data = self.ego_df[
- (self.ego_df['simTime'] >= start_time) &
- (self.ego_df['simTime'] <= end_time)
- ].copy()
-
- # 添加一些分析指标
- event_data['lat_acc_abs'] = event_data['lat_acc'].abs()
- event_data['lat_acc_rate'] = event_data['lat_acc'].diff() / event_data['simTime'].diff()
- event_data['speedH_rate'] = event_data['speedH'].diff() / event_data['simTime'].diff()
-
- # 保存该事件的详细数据
- event_data.to_csv(
- os.path.join(save_dir, f"shake_event_{i+1}_detail.csv"),
- index=False
- )
-
- # 3. 保存所有晃动事件的汇总统计数据
- shake_stats = {
- 'total_count': self.shake_count,
- 'avg_duration': shake_events['duration'].mean(),
- 'max_duration': shake_events['duration'].max(),
- 'min_duration': shake_events['duration'].min(),
- 'total_duration': shake_events['duration'].sum(),
- }
-
- import json
- with open(os.path.join(save_dir, "shake_statistics.json"), 'w') as f:
- json.dump(shake_stats, f, indent=4)
-
- self.logger.info(f"晃动事件数据已保存至: {save_dir}")
-
- def _plot_shake_analysis(self):
- """绘制晃动分析图表,并标记关键阈值和数据点"""
- import os
- import matplotlib.pyplot as plt
- import numpy as np
- import pandas as pd
-
- # 创建保存目录
- save_dir = os.path.join(self.data_processed.data_path, "comfort_analysis")
- os.makedirs(save_dir, exist_ok=True)
-
- # 准备数据
- df = self.ego_df.copy()
-
- # 检查必要的列是否存在
- required_columns = ['lat_acc_threshold', 'speedH_threshold', 'speedH_std']
- missing_columns = [col for col in required_columns if col not in df.columns]
-
- if missing_columns:
- self.logger.warning(f"Missing columns for plotting: {missing_columns}, possibly because shake detection was not executed correctly")
- # 如果缺少必要的列,重新计算一次
- for col in missing_columns:
- if col == 'lat_acc_threshold':
- df['lat_acc_threshold'] = df['v'].apply(
- lambda speed: max(0.3, min(0.8, 0.5 * (1 + (speed - 20) / 60)))
- )
- elif col == 'speedH_threshold':
- df['speedH_threshold'] = df['v'].apply(
- lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
- )
- elif col == 'speedH_std':
- window_size = 5
- df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
-
- # 创建图表
- fig, axs = plt.subplots(3, 1, figsize=(14, 12), sharex=True)
-
- # 绘制横向加速度
- axs[0].plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
- axs[0].set_ylabel('Lateral Acceleration (m/s²)')
- axs[0].set_title('Shake Analysis')
- axs[0].grid(True)
-
- # 绘制动态阈值线
- axs[0].plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='Threshold')
- axs[0].plot(df['simTime'], -df['lat_acc_threshold'], 'r--')
-
- # 绘制横摆角速度
- axs[1].plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
- axs[1].set_ylabel('Yaw Rate (deg/s)')
- axs[1].grid(True)
-
- # 绘制横摆角速度阈值
- axs[1].plot(df['simTime'], df['speedH_threshold'], 'r--', label='Threshold')
- axs[1].plot(df['simTime'], -df['speedH_threshold'], 'r--')
-
- # 绘制横摆角速度标准差
- axs[1].plot(df['simTime'], df['speedH_std'], 'm-', alpha=0.5, label='Yaw Rate Std')
-
- # 绘制车速
- axs[2].plot(df['simTime'], df['v'], 'k-', label='Vehicle Speed')
- axs[2].set_xlabel('Time (s)')
- axs[2].set_ylabel('Speed (km/h)')
- axs[2].grid(True)
-
- # 标记晃动事件
- if not self.discomfort_df.empty:
- shake_df = self.discomfort_df[self.discomfort_df['type'] == 'shake']
-
- # 为每个晃动事件创建详细标记
- for idx, row in shake_df.iterrows():
- start_time = row['start_time']
- end_time = row['end_time']
-
- # 在所有子图中标记晃动区域
- for ax in axs:
- ax.axvspan(start_time, end_time, alpha=0.2, color='red')
-
- # 获取晃动期间的数据
- shake_period = df[(df['simTime'] >= start_time) & (df['simTime'] <= end_time)]
-
- if not shake_period.empty:
- # 找出晃动期间横向加速度的最大值点
- max_lat_acc_idx = shake_period['lat_acc'].abs().idxmax()
- max_lat_acc_time = shake_period.loc[max_lat_acc_idx, 'simTime']
- max_lat_acc_value = shake_period.loc[max_lat_acc_idx, 'lat_acc']
-
- # 标记最大横向加速度点
- axs[0].scatter(max_lat_acc_time, max_lat_acc_value, color='red', s=80, zorder=5)
- axs[0].annotate(
- f'Max: {max_lat_acc_value:.2f} m/s²\nTime: {max_lat_acc_time:.2f}s',
- xy=(max_lat_acc_time, max_lat_acc_value),
- xytext=(10, 20),
- textcoords='offset points',
- arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
- bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7)
- )
-
- # 找出晃动期间横摆角速度的最大值点
- max_speedH_idx = shake_period['speedH'].abs().idxmax()
- max_speedH_time = shake_period.loc[max_speedH_idx, 'simTime']
- max_speedH_value = shake_period.loc[max_speedH_idx, 'speedH']
-
- # 标记最大横摆角速度点
- axs[1].scatter(max_speedH_time, max_speedH_value, color='red', s=80, zorder=5)
- axs[1].annotate(
- f'Max: {max_speedH_value:.2f} deg/s\nTime: {max_speedH_time:.2f}s',
- xy=(max_speedH_time, max_speedH_value),
- xytext=(10, 20),
- textcoords='offset points',
- arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
- bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7)
- )
-
- # 标记晃动开始和结束点
- for i in range(2): # 只在前两个子图中标记
- # 开始点
- start_value = shake_period.iloc[0][['lat_acc', 'speedH'][i]]
- axs[i].scatter(start_time, start_value, color='green', s=80, zorder=5)
- axs[i].annotate(
- f'Start: {start_time:.2f}s',
- xy=(start_time, start_value),
- xytext=(-10, -30),
- textcoords='offset points',
- arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
- bbox=dict(boxstyle='round,pad=0.5', fc='lightgreen', alpha=0.7)
- )
-
- # 结束点
- end_value = shake_period.iloc[-1][['lat_acc', 'speedH'][i]]
- axs[i].scatter(end_time, end_value, color='blue', s=80, zorder=5)
- axs[i].annotate(
- f'End: {end_time:.2f}s',
- xy=(end_time, end_value),
- xytext=(10, -30),
- textcoords='offset points',
- arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=.2'),
- bbox=dict(boxstyle='round,pad=0.5', fc='lightblue', alpha=0.7)
- )
-
- # 添加晃动检测条件说明
- textstr = '\n'.join((
- 'Shake Detection Conditions:',
- '1. Lateral acceleration exceeds dynamic threshold',
- '2. High lateral acceleration rate or yaw rate std',
- '3. Duration exceeds threshold'
- ))
- props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
- axs[0].text(0.02, 0.98, textstr, transform=axs[0].transAxes, fontsize=10,
- verticalalignment='top', bbox=props)
-
- # 添加图例
- for ax in axs:
- ax.legend(loc='upper right')
-
- # 调整布局并保存
- plt.tight_layout()
- plt.savefig(os.path.join(save_dir, "shake_analysis.png"), dpi=300)
- plt.close()
-
- def _get_zigzag_times(self):
- """获取所有画龙事件的时间点,用于排除画龙与晃动的重叠检测"""
- zigzag_times = []
- for start_time, end_time in self.zigzag_time_list:
- # 获取该时间段内的所有时间点
- times_in_range = self.ego_df[(self.ego_df['simTime'] >= start_time) &
- (self.ego_df['simTime'] <= end_time)]['simTime'].values
- zigzag_times.extend(times_in_range)
- return zigzag_times
- def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
- """处理顿挫数据"""
- if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
- return np.nan
- elif abs(lon_acc) == 0:
- return 0
- elif lon_acc > 0 and lon_acc < ip_acc:
- return 1
- elif lon_acc < 0 and lon_acc > ip_dec:
- return -1
- else:
- return 0
- def _cadence_detector(self):
- """检测顿挫事件"""
- data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence', 'v']].copy()
- time_list = data['simTime'].values.tolist()
- data = data[data['cadence'] != np.nan]
- data['cadence_diff'] = data['cadence'].diff()
- data.dropna(subset='cadence_diff', inplace=True)
- data = data[data['cadence_diff'] != 0]
- t_list = data['simTime'].values.tolist()
- f_list = data['simFrame'].values.tolist()
- TIME_RANGE = 1
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(f_list)):
- if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE: # 特征点相邻一秒内的,算作同一组顿挫
- sub_group_time.append(t_list[i])
- sub_group_frame.append(f_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [t_list[i]]
- sub_group_frame = [f_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 1] # 有一次特征点则算作一次顿挫
- group_frame = [g for g in group_frame if len(g) >= 1]
- # 输出图表值
- cadence_time = [[g[0], g[-1]] for g in group_time]
- cadence_frame = [[g[0], g[-1]] for g in group_frame]
- if cadence_time:
- # 保存顿挫事件摘要
- time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time'])
- frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame'])
- discomfort_df = pd.concat([time_df, frame_df], axis=1)
- discomfort_df['type'] = 'cadence'
- self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
- # 将顿挫组的起始时间为组重新统计时间
- cadence_time_list = [time for pair in cadence_time for time in self.ego_df['simTime'].values if pair[0] <= time <= pair[1]]
- self.cadence_count = len(cadence_time)
- return cadence_time_list
- def _slam_brake_detector(self):
- """检测急刹车事件"""
- data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake', 'v']].copy()
-
- res_df = data[data['slam_brake'] == 1]
- t_list = res_df['simTime'].values
- f_list = res_df['simFrame'].values.tolist()
- TIME_RANGE = 1
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(f_list)):
- if not sub_group_time or f_list[i] - f_list[i - 1] <= TIME_RANGE: # 连续帧的算作同一组急刹
- sub_group_time.append(t_list[i])
- sub_group_frame.append(f_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [t_list[i]]
- sub_group_frame = [f_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2] # 达到两帧算作一次急刹
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- slam_brake_time = [[g[0], g[-1]] for g in group_time]
- slam_brake_frame = [[g[0], g[-1]] for g in group_frame]
- if slam_brake_time:
- # 保存事件摘要
- time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time'])
- frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame'])
- discomfort_df = pd.concat([time_df, frame_df], axis=1)
- discomfort_df['type'] = 'slam_brake'
- self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
- time_list = [element for sublist in group_time for element in sublist]
- self.slam_brake_count = len(group_time)
- return time_list
- def _slam_accel_detector(self):
- """检测急加速事件"""
- data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel', 'v']].copy()
-
- res_df = data.loc[data['slam_accel'] == 1]
- t_list = res_df['simTime'].values
- f_list = res_df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(f_list)):
- if not group_time or f_list[i] - f_list[i - 1] <= 1: # 连续帧的算作同一组急加速
- sub_group_time.append(t_list[i])
- sub_group_frame.append(f_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [t_list[i]]
- sub_group_frame = [f_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- slam_accel_time = [[g[0], g[-1]] for g in group_time]
- slam_accel_frame = [[g[0], g[-1]] for g in group_frame]
- if slam_accel_time:
- # 保存事件摘要
- time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time'])
- frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame'])
- discomfort_df = pd.concat([time_df, frame_df], axis=1)
- discomfort_df['type'] = 'slam_accel'
- self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
- time_list = [element for sublist in group_time for element in sublist]
- self.slam_accel_count = len(group_time)
- return time_list
-
- def comf_statistic(self):
- """统计舒适性指标"""
- df = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'accelH', 'speedH', 'lat_acc', 'v']].copy()
- self.zigzag_count_func()
- # self.cal_zigzag_strength_strength()
- if self.zigzag_time_list:
- # 保存 Weaving (zigzag) 事件摘要
- zigzag_df = pd.DataFrame(self.zigzag_time_list, columns=['start_time', 'end_time'])
- zigzag_df = get_frame_with_time(zigzag_df, self.ego_df)
- zigzag_df['type'] = 'zigzag'
- self.discomfort_df = pd.concat([self.discomfort_df, zigzag_df], ignore_index=True)
- zigzag_t_list = []
- # 只有[t_start, t_end]数对,要提取为完整time list
- t_list = df['simTime'].values.tolist()
- for t_start, t_end in self.zigzag_time_list:
- index_1 = t_list.index(t_start)
- index_2 = t_list.index(t_end)
- zigzag_t_list.extend(t_list[index_1:index_2 + 1])
- zigzag_t_list = list(set(zigzag_t_list))
- shake_t_list = self._shake_detector()
- cadence_t_list = self._cadence_detector()
- slam_brake_t_list = self._slam_brake_detector()
- slam_accel_t_list = self._slam_accel_detector()
- # 统计结果
- self.calculated_value = {
- "weaving": self.zigzag_count,
- "shake": self.shake_count,
- "cadence": self.cadence_count,
- "slamBrake": self.slam_brake_count,
- "slamAccelerate": self.slam_accel_count
- }
- self.logger.info(f"舒适性计算完成,统计结果:{self.calculated_value}")
- return self.calculated_value
- def report_statistic(self):
- """生成舒适性评估报告"""
- comfort_result = self.comf_statistic()
- evaluator = Score(self.data_processed.comfort_config)
- result = evaluator.evaluate(comfort_result)
- print("\n[舒适性表现及得分情况]")
- return result
|