#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Date: 2023/06/25
@Last Modified: 2023/06/25
@Summary: Comfort metrics
"""
import sys
import math
import pandas as pd
import numpy as np
import scipy.signal
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable, Union, Tuple

from modules.lib.score import Score
from modules.lib.common import get_interpolation, get_frame_with_time
from modules.lib import data_process
from modules.lib.log_manager import LogManager

COMFORT_INFO = [
    "simTime",
    "simFrame",
    "speedX",
    "speedY",
    "accelX",
    "accelY",
    "curvHor",
    "lightMask",
    "v",
    "lat_acc",
    "lon_acc",
    "time_diff",
    "lon_acc_diff",
    "lon_acc_roc",
    "speedH",
    "accelH",
]
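
# Note: besides the raw channels (speedX/speedY, accelX/accelY, speedH, ...), several
# columns above (v, lat_acc, lon_acc, time_diff, lon_acc_diff, lon_acc_roc) are derived
# signals that are assumed to be precomputed upstream, presumably by
# modules.lib.data_process, before the ego DataFrame reaches this module.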

# ----------------------
# Standalone metric functions
# ----------------------
def weaving(data_processed) -> dict:
    """Compute the weaving (zigzag) metric."""
    comfort = ComfortCalculator(data_processed)
    zigzag_count = comfort.calculate_zigzag_count()
    return {"weaving": float(zigzag_count)}


def shake(data_processed) -> dict:
    """Compute the shake metric."""
    comfort = ComfortCalculator(data_processed)
    shake_count = comfort.calculate_shake_count()
    return {"shake": float(shake_count)}


def cadence(data_processed) -> dict:
    """Compute the cadence (jerky speed modulation) metric."""
    comfort = ComfortCalculator(data_processed)
    cadence_count = comfort.calculate_cadence_count()
    return {"cadence": float(cadence_count)}


def slamBrake(data_processed) -> dict:
    """Compute the hard-braking metric."""
    comfort = ComfortCalculator(data_processed)
    slam_brake_count = comfort.calculate_slam_brake_count()
    return {"slamBrake": float(slam_brake_count)}


def slamAccelerate(data_processed) -> dict:
    """Compute the hard-acceleration metric."""
    comfort = ComfortCalculator(data_processed)
    slam_accel_count = comfort.calculate_slam_accel_count()
    return {"slamAccelerate": float(slam_accel_count)}


# The decorator is kept unchanged
def peak_valley_decorator(method):
    def wrapper(self, *args, **kwargs):
        peak_valley = self._peak_valley_determination(self.df)
        pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
        if len(pv_list) != 0:
            flag = True
            p_last = pv_list[0]
            for i in range(1, len(pv_list)):
                p_curr = pv_list[i]
                if self._peak_valley_judgment(p_last, p_curr):
                    # method(self, p_curr, p_last)
                    method(self, p_curr, p_last, flag, *args, **kwargs)
                else:
                    p_last = p_curr
            return method
        else:
            flag = False
            p_curr = [0, 0]
            p_last = [0, 0]
            method(self, p_curr, p_last, flag, *args, **kwargs)
            return method

    return wrapper
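
# Contract of the decorated methods (as wired up by peak_valley_decorator above):
# each receives (p_curr, p_last, flag), where p_curr / p_last are [simTime, speedH]
# pairs for the current and previous angular-speed extremum, and flag tells the
# method whether any peak/valley points were found at all.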


class ComfortRegistry:
    """Comfort metric registry."""

    def __init__(self, data_processed):
        self.logger = LogManager().get_logger()  # get the global logger instance
        self.data = data_processed
        self.comfort_config = data_processed.comfort_config["comfort"]
        self.metrics = self._extract_metrics(self.comfort_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Extract metric names via DFS traversal of the config tree."""
        metrics = []

        def _recurse(node):
            if isinstance(node, dict):
                if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
                    metrics.append(node['name'])
                for v in node.values():
                    _recurse(v)

        _recurse(config_node)
        self.logger.info(f'Comfort metrics to be evaluated: {metrics}')
        return metrics

    def _build_registry(self) -> dict:
        """Automatically register metric functions by name."""
        registry = {}
        for metric_name in self.metrics:
            try:
                registry[metric_name] = globals()[metric_name]
            except KeyError:
                self.logger.error(f"Metric function not implemented: {metric_name}")
        return registry

    def batch_execute(self) -> dict:
        """Run all registered metric functions."""
        results = {}
        for name, func in self._registry.items():
            try:
                result = func(self.data)
                results.update(result)
            except Exception as e:
                self.logger.error(f"{name} execution failed: {str(e)}", exc_info=True)
                results[name] = None
        self.logger.info(f'Comfort metric results: {results}')
        return results
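
# Illustrative shape of data_processed.comfort_config["comfort"], inferred from
# _extract_metrics (leaf dicts carry a 'name' key and no nested dicts); the real
# config file may differ:
#
#   {"name": "comfort",
#    "weaving": {"name": "weaving"},
#    "shake": {"name": "shake"},
#    ...}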


class ComfortCalculator:
    """Comfort metric calculator - provides the core computation functionality."""

    def __init__(self, data_processed):
        self.data_processed = data_processed
        self.logger = LogManager().get_logger()
        self.data = data_processed.ego_data
        self.ego_df = pd.DataFrame()
        self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.time_list = self.data['simTime'].values.tolist()
        self.frame_list = self.data['simFrame'].values.tolist()
        self.zigzag_count = 0
        self.shake_count = 0
        self.cadence_count = 0
        self.slam_brake_count = 0
        self.slam_accel_count = 0
        self.zigzag_time_list = []
        self.zigzag_stre_list = []
        self.cur_ego_path_list = []
        self.curvature_list = []
        self._initialize_data()

    def _initialize_data(self):
        """Initialize the working DataFrames."""
        self.ego_df = self.data[COMFORT_INFO].copy()
        self.df = self.ego_df.reset_index(drop=True)
        self._prepare_comfort_parameters()

    def _prepare_comfort_parameters(self):
        """Prepare the parameters needed by the comfort computations."""
        # Speed-dependent acceleration/deceleration thresholds, interpolated between
        # the points (18, 4)-(72, 2) and (18, -5)-(72, -3.5) respectively.
        self.ego_df['ip_acc'] = self.ego_df['v'].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
        self.ego_df['ip_dec'] = self.ego_df['v'].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
        self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
            lambda x: 1 if x < 0 else 0)
        self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
            lambda x: 1 if x > 0 else 0)
        self.ego_df['cadence'] = self.ego_df.apply(
            lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
        # Curvature-related parameters
        self.ego_df['cur_ego_path'] = self.ego_df.apply(self._cal_cur_ego_path, axis=1)
        self.ego_df['curvHor'] = self.ego_df['curvHor'].astype('float')
        self.ego_df['cur_diff'] = (self.ego_df['cur_ego_path'] - self.ego_df['curvHor']).abs()
        self.ego_df['R'] = self.ego_df['curvHor'].apply(lambda x: 10000 if x == 0 else 1 / x)
        self.ego_df['R_ego'] = self.ego_df['cur_ego_path'].apply(lambda x: 10000 if x == 0 else 1 / x)
        self.ego_df['R_diff'] = (self.ego_df['R_ego'] - self.ego_df['R']).abs()
        self.cur_ego_path_list = self.ego_df['cur_ego_path'].values.tolist()
        self.curvature_list = self.ego_df['curvHor'].values.tolist()

    def _cal_cur_ego_path(self, row):
        """Compute the curvature of the ego trajectory.

        Uses the planar curvature formula kappa = (vx*ay - vy*ax) / (vx^2 + vy^2)^(3/2).
        """
        try:
            divide = (row['speedX'] ** 2 + row['speedY'] ** 2) ** (3 / 2)
            if not divide:
                res = None
            else:
                res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide
        except Exception:
            res = None
        return res

    def _peak_valley_determination(self, df):
        """Find the peaks and valleys of the angular speed (speedH)."""
        peaks, _ = scipy.signal.find_peaks(df['speedH'], height=0.01, distance=1, prominence=0.01)
        valleys, _ = scipy.signal.find_peaks(-df['speedH'], height=0.01, distance=1, prominence=0.01)
        peak_valley = sorted(list(peaks) + list(valleys))
        return peak_valley

    def _peak_valley_judgment(self, p_last, p_curr, tw=10000, avg=0.02):
        """Judge whether a peak/valley pair meets the weaving condition."""
        t_diff = p_curr[0] - p_last[0]
        v_diff = abs(p_curr[1] - p_last[1])
        s = p_curr[1] * p_last[1]
        zigzag_flag = t_diff < tw and v_diff > avg and s < 0
        if zigzag_flag and ([p_last[0], p_curr[0]] not in self.zigzag_time_list):
            self.zigzag_time_list.append([p_last[0], p_curr[0]])
        return zigzag_flag

    def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
        """Classify a longitudinal-acceleration sample for cadence detection."""
        if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
            return np.nan
        elif abs(lon_acc) == 0:
            return 0
        elif lon_acc > 0 and lon_acc < ip_acc:
            return 1
        elif lon_acc < 0 and lon_acc > ip_dec:
            return -1
        else:
            return 0
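
    # Cadence coding produced by _cadence_process_new (consumed by _cadence_detector):
    #   NaN -> sample outside the band of interest (|lon_acc| < 1 or beyond the
    #          ip_acc / ip_dec thresholds),
    #     1 -> mild acceleration below ip_acc,
    #    -1 -> mild deceleration above ip_dec.
    # Changes of this value between consecutive non-NaN samples are the feature
    # points that _cadence_detector groups into cadence events.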

    @peak_valley_decorator
    def _zigzag_count_func(self, p_curr, p_last, flag=True):
        """Count weaving occurrences."""
        if flag:
            self.zigzag_count += 1
        else:
            self.zigzag_count += 0

    @peak_valley_decorator
    def _cal_zigzag_strength(self, p_curr, p_last, flag=True):
        """Compute weaving strength."""
        if flag:
            v_diff = abs(p_curr[1] - p_last[1])
            t_diff = p_curr[0] - p_last[0]
            self.zigzag_stre_list.append(v_diff / t_diff)  # mean angular acceleration
        else:
            self.zigzag_stre_list = []

    def calculate_zigzag_count(self):
        """Calculate the weaving metric."""
        self._zigzag_count_func()
        return self.zigzag_count

    def calculate_shake_count(self):
        """Calculate the shake metric."""
        self._shake_detector()
        return self.shake_count

    def calculate_cadence_count(self):
        """Calculate the cadence metric."""
        self._cadence_detector()
        return self.cadence_count

    def calculate_slam_brake_count(self):
        """Calculate the hard-braking metric."""
        self._slam_brake_detector()
        return self.slam_brake_count

    def calculate_slam_accel_count(self):
        """Calculate the hard-acceleration metric."""
        self._slam_accel_detector()
        return self.slam_accel_count

    def _shake_detector(self, Cr_diff=0.05, T_diff=0.39):
        """Shake detector."""
        time_list = []
        frame_list = []
        df = self.ego_df.copy()
        df = df[df['cur_diff'] > Cr_diff]
        df['frame_ID_diff'] = df['simFrame'].diff()
        filtered_df = df[df.frame_ID_diff > T_diff]
        row_numbers = filtered_df.index.tolist()
        cut_column = pd.cut(df.index, bins=row_numbers)
        grouped = df.groupby(cut_column)
        dfs = {}
        for name, group in grouped:
            dfs[name] = group.reset_index(drop=True)
        for name, df_group in dfs.items():
            # Straight road, no active lane change
            df_group['curvHor'] = df_group['curvHor'].abs()
            df_group_straight = df_group[(df_group.lightMask == 0) & (df_group.curvHor < 0.001)]
            if not df_group_straight.empty:
                time_list.extend(df_group_straight['simTime'].values)
                frame_list.extend(df_group_straight['simFrame'].values)
                self.shake_count = self.shake_count + 1
            # Turn signal on, road is straight
            df_group_change_lane = df_group[(df_group['lightMask'] != 0) & (df_group['curvHor'] < 0.001)]
            df_group_change_lane_data = df_group_change_lane[df_group_change_lane.cur_diff > Cr_diff + 0.2]
            if not df_group_change_lane_data.empty:
                time_list.extend(df_group_change_lane_data['simTime'].values)
                frame_list.extend(df_group_change_lane_data['simFrame'].values)
                self.shake_count = self.shake_count + 1
            # Turning with the turn signal on
            df_group_turn = df_group[(df_group['lightMask'] != 0) & (df_group['curvHor'].abs() > 0.001)]
            df_group_turn_data = df_group_turn[df_group_turn.cur_diff.abs() > Cr_diff + 0.1]
            if not df_group_turn_data.empty:
                time_list.extend(df_group_turn_data['simTime'].values)
                frame_list.extend(df_group_turn_data['simFrame'].values)
                self.shake_count = self.shake_count + 1
        # Group the detected samples: feature points within TIME_RANGE seconds of
        # each other belong to the same shake event.
        TIME_RANGE = 1
        t_list = time_list
        f_list = frame_list
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []
        if len(f_list) > 0:
            for i in range(len(f_list)):
                if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:
                    sub_group_time.append(t_list[i])
                    sub_group_frame.append(f_list[i])
                else:
                    group_time.append(sub_group_time)
                    group_frame.append(sub_group_frame)
                    sub_group_time = [t_list[i]]
                    sub_group_frame = [f_list[i]]
            group_time.append(sub_group_time)
            group_frame.append(sub_group_frame)
        # Chart output values
        shake_time = [[g[0], g[-1]] for g in group_time]
        shake_frame = [[g[0], g[-1]] for g in group_frame]
        self.shake_count = len(shake_time)
        if shake_time:
            time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'shake'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
        return time_list

    def _cadence_detector(self):
        """Cadence detector."""
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence']].copy()
        time_list = data['simTime'].values.tolist()
        # Keep only rows with a valid cadence flag (NaN marks samples outside the band of interest)
        data = data[data['cadence'].notna()]
        data['cadence_diff'] = data['cadence'].diff()
        data.dropna(subset='cadence_diff', inplace=True)
        data = data[data['cadence_diff'] != 0]
        t_list = data['simTime'].values.tolist()
        f_list = data['simFrame'].values.tolist()
        TIME_RANGE = 1
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []
        for i in range(len(f_list)):
            if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:  # feature points within one second belong to the same cadence event
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]
        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 1]  # a single feature point already counts as one cadence event
        group_frame = [g for g in group_frame if len(g) >= 1]
        # Chart output values
        cadence_time = [[g[0], g[-1]] for g in group_time]
        cadence_frame = [[g[0], g[-1]] for g in group_frame]
        if cadence_time:
            time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'cadence'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
        # Re-collect the timestamps that fall inside each cadence interval
        cadence_time_list = [time for pair in cadence_time for time in time_list if pair[0] <= time <= pair[1]]
        stre_list = []
        freq_list = []
        for g in group_time:
            # calculate strength
            g_df = data[data['simTime'].isin(g)]
            strength = g_df['lon_acc'].abs().mean()
            stre_list.append(strength)
            # calculate frequency
            cnt = len(g)
            t_start = g_df['simTime'].iloc[0]
            t_end = g_df['simTime'].iloc[-1]
            t_delta = t_end - t_start
            frequency = cnt / t_delta if t_delta > 0 else cnt  # guard single-point groups against division by zero
            freq_list.append(frequency)
        self.cadence_count = len(freq_list)
        cadence_stre = sum(stre_list) / len(stre_list) if stre_list else 0
        return cadence_time_list

    def _slam_brake_detector(self):
        """Hard-braking detector."""
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake']].copy()
        res_df = data[data['slam_brake'] == 1]
        t_list = res_df['simTime'].values
        f_list = res_df['simFrame'].values.tolist()
        TIME_RANGE = 1
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []
        for i in range(len(f_list)):
            if not sub_group_time or f_list[i] - f_list[i - 1] <= TIME_RANGE:  # consecutive frames belong to the same braking event
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]
        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 2]  # at least two frames count as one hard-braking event
        group_frame = [g for g in group_frame if len(g) >= 2]
        # Chart output values
        slam_brake_time = [[g[0], g[-1]] for g in group_time]
        slam_brake_frame = [[g[0], g[-1]] for g in group_frame]
        if slam_brake_time:
            time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'slam_brake'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
        time_list = [element for sublist in group_time for element in sublist]
        self.slam_brake_count = len(group_time)
        return time_list

    def _slam_accel_detector(self):
        """Hard-acceleration detector."""
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel']].copy()
        res_df = data.loc[data['slam_accel'] == 1]
        t_list = res_df['simTime'].values
        f_list = res_df['simFrame'].values.tolist()
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []
        for i in range(len(f_list)):
            if not sub_group_time or f_list[i] - f_list[i - 1] <= 1:  # consecutive frames belong to the same acceleration event
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]
        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 2]
        group_frame = [g for g in group_frame if len(g) >= 2]
        # Chart output values
        slam_accel_time = [[g[0], g[-1]] for g in group_time]
        slam_accel_frame = [[g[0], g[-1]] for g in group_frame]
        if slam_accel_time:
            time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'slam_accel'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
        time_list = [element for sublist in group_time for element in sublist]
        self.slam_accel_count = len(group_time)
        return time_list


class ComfortManager:
    """Main class for comfort metric computation."""

    def __init__(self, data_processed):
        self.data = data_processed
        self.logger = LogManager().get_logger()
        self.registry = ComfortRegistry(self.data)

    def report_statistic(self):
        """Generate the comfort scoring report."""
        comfort_result = self.registry.batch_execute()
        # evaluator = Score(self.data.comfort_config)
        # result = evaluator.evaluate(comfort_result)
        # return result
        return comfort_result


if __name__ == '__main__':
    case_name = 'ICA'
    mode_label = 'PGVIL'
    data = data_process.DataPreprocessing(case_name, mode_label)
    comfort_instance = ComfortManager(data)
    try:
        comfort_result = comfort_instance.report_statistic()
        result = {'comfort': comfort_result}
        print(result)
    except Exception as e:
        print(f"An error occurred in Comfort.report_statistic: {e}")
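
# Minimal usage sketch beyond the __main__ example above (assumes, as above, that
# data_process.DataPreprocessing exposes ego_data and comfort_config; the case and
# mode names are only placeholders):
#
#   data = data_process.DataPreprocessing('ICA', 'PGVIL')
#   calc = ComfortCalculator(data)
#   print(calc.calculate_shake_count())  # number of detected shake events
#   print(calc.discomfort_df)            # per-event start/end time and frame, with type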