# comfort.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/06/25
  11. @Last Modified: 2023/06/25
  12. @Summary: Comfort metrics
  13. """
  14. import sys
  15. import math
  16. import pandas as pd
  17. import numpy as np
  18. import scipy.signal
  19. from pathlib import Path
  20. from typing import Dict, List, Any, Optional, Callable, Union, Tuple
  21. from modules.lib.score import Score
  22. from modules.lib.common import get_interpolation, get_frame_with_time
  23. from modules.lib import data_process
  24. from modules.lib.log_manager import LogManager
  25. COMFORT_INFO = [
  26. "simTime",
  27. "simFrame",
  28. "speedX",
  29. "speedY",
  30. "accelX",
  31. "accelY",
  32. "curvHor",
  33. "lightMask",
  34. "v",
  35. "lat_acc",
  36. "lon_acc",
  37. "time_diff",
  38. "lon_acc_diff",
  39. "lon_acc_roc",
  40. "speedH",
  41. "accelH",
  42. ]
  43. # ----------------------
  44. # 独立指标计算函数
  45. # ----------------------
  46. def calculate_weaving(data_processed) -> dict:
  47. """计算蛇行指标"""
  48. comfort = ComfortCalculator(data_processed)
  49. zigzag_count = comfort.calculate_zigzag_count()
  50. return {"weaving": float(zigzag_count)}
  51. def calculate_shake(data_processed) -> dict:
  52. """计算晃动指标"""
  53. comfort = ComfortCalculator(data_processed)
  54. shake_count = comfort.calculate_shake_count()
  55. return {"shake": float(shake_count)}
  56. def calculate_cadence(data_processed) -> dict:
  57. """计算顿挫指标"""
  58. comfort = ComfortCalculator(data_processed)
  59. cadence_count = comfort.calculate_cadence_count()
  60. return {"cadence": float(cadence_count)}
  61. def calculate_slambrake(data_processed) -> dict:
  62. """计算急刹车指标"""
  63. comfort = ComfortCalculator(data_processed)
  64. slam_brake_count = comfort.calculate_slam_brake_count()
  65. return {"slamBrake": float(slam_brake_count)}
  66. def calculate_slamaccelerate(data_processed) -> dict:
  67. """计算急加速指标"""
  68. comfort = ComfortCalculator(data_processed)
  69. slam_accel_count = comfort.calculate_slam_accel_count()
  70. return {"slamAccelerate": float(slam_accel_count)}
# Decorator kept unchanged.
def peak_valley_decorator(method):
    # Wraps a counting/strength method so it is invoked once per qualifying
    # peak-valley transition of the yaw-rate signal (self.df['speedH']).
    # The wrapped method receives (p_curr, p_last, flag) where each point is
    # a [simTime, speedH] pair and flag tells it whether extrema were found.
    def wrapper(self, *args, **kwargs):
        # Indices of local maxima/minima of the yaw-rate trace.
        peak_valley = self._peak_valley_determination(self.df)
        # [simTime, speedH] at every extremum, in time order.
        pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
        if len(pv_list) != 0:
            flag = True
            p_last = pv_list[0]
            for i in range(1, len(pv_list)):
                p_curr = pv_list[i]
                if self._peak_valley_judgment(p_last, p_curr):
                    # method(self, p_curr, p_last)
                    method(self, p_curr, p_last, flag, *args, **kwargs)
                    # NOTE(review): p_last is NOT advanced after a detected
                    # zigzag — the next comparison reuses the same anchor
                    # point; looks intentional but confirm with the authors.
                else:
                    p_last = p_curr
            return method
        else:
            # No extrema at all: invoke once with flag=False so the wrapped
            # method can reset/zero its state.
            flag = False
            p_curr = [0, 0]
            p_last = [0, 0]
            method(self, p_curr, p_last, flag, *args, **kwargs)
            return method
    return wrapper
  94. class ComfortRegistry:
  95. """舒适性指标注册器"""
  96. def __init__(self, data_processed):
  97. self.logger = LogManager().get_logger() # 获取全局日志实例
  98. self.data = data_processed
  99. self.comfort_config = data_processed.comfort_config["comfort"]
  100. self.metrics = self._extract_metrics(self.comfort_config)
  101. self._registry = self._build_registry()
  102. def _extract_metrics(self, config_node: dict) -> list:
  103. """DFS遍历提取指标"""
  104. metrics = []
  105. def _recurse(node):
  106. if isinstance(node, dict):
  107. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  108. metrics.append(node['name'])
  109. for v in node.values():
  110. _recurse(v)
  111. _recurse(config_node)
  112. self.logger.info(f'评比的舒适性指标列表:{metrics}')
  113. return metrics
  114. def _build_registry(self) -> dict:
  115. """自动注册指标函数"""
  116. registry = {}
  117. for metric_name in self.metrics:
  118. func_name = f"calculate_{metric_name.lower()}"
  119. try:
  120. registry[metric_name] = globals()[func_name]
  121. except KeyError:
  122. self.logger.error(f"未实现指标函数: {func_name}")
  123. return registry
  124. def batch_execute(self) -> dict:
  125. """批量执行指标计算"""
  126. results = {}
  127. for name, func in self._registry.items():
  128. try:
  129. result = func(self.data)
  130. results.update(result)
  131. # 新增:将每个指标的结果写入日志
  132. self.logger.info(f'舒适性指标[{name}]计算结果: {result}')
  133. except Exception as e:
  134. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  135. results[name] = None
  136. self.logger.info(f'舒适性指标计算结果:{results}')
  137. return results
  138. class ComfortCalculator:
  139. """舒适性指标计算类 - 提供核心计算功能"""
  140. def __init__(self, data_processed):
  141. self.data_processed = data_processed
  142. self.logger = LogManager().get_logger()
  143. self.data = data_processed.ego_data
  144. self.ego_df = pd.DataFrame()
  145. self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  146. self.time_list = self.data['simTime'].values.tolist()
  147. self.frame_list = self.data['simFrame'].values.tolist()
  148. self.zigzag_count = 0
  149. self.shake_count = 0
  150. self.cadence_count = 0
  151. self.slam_brake_count = 0
  152. self.slam_accel_count = 0
  153. self.zigzag_time_list = []
  154. self.zigzag_stre_list = []
  155. self.cur_ego_path_list = []
  156. self.curvature_list = []
  157. self._initialize_data()
  158. def _initialize_data(self):
  159. """初始化数据"""
  160. self.ego_df = self.data[COMFORT_INFO].copy()
  161. self.df = self.ego_df.reset_index(drop=True)
  162. self._prepare_comfort_parameters()
  163. def _prepare_comfort_parameters(self):
  164. """准备舒适性计算所需参数"""
  165. # 计算加减速阈值
  166. self.ego_df['ip_acc'] = self.ego_df['v'].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
  167. self.ego_df['ip_dec'] = self.ego_df['v'].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
  168. self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
  169. lambda x: 1 if x < 0 else 0)
  170. self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
  171. lambda x: 1 if x > 0 else 0)
  172. self.ego_df['cadence'] = self.ego_df.apply(
  173. lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
  174. # 计算曲率相关参数
  175. self.ego_df['cur_ego_path'] = self.ego_df.apply(self._cal_cur_ego_path, axis=1)
  176. self.ego_df['curvHor'] = self.ego_df['curvHor'].astype('float')
  177. self.ego_df['cur_diff'] = (self.ego_df['cur_ego_path'] - self.ego_df['curvHor']).abs()
  178. self.ego_df['R'] = self.ego_df['curvHor'].apply(lambda x: 10000 if x == 0 else 1 / x)
  179. self.ego_df['R_ego'] = self.ego_df['cur_ego_path'].apply(lambda x: 10000 if x == 0 else 1 / x)
  180. self.ego_df['R_diff'] = (self.ego_df['R_ego'] - self.ego_df['R']).abs()
  181. self.cur_ego_path_list = self.ego_df['cur_ego_path'].values.tolist()
  182. self.curvature_list = self.ego_df['curvHor'].values.tolist()
  183. def _cal_cur_ego_path(self, row):
  184. """计算车辆轨迹曲率"""
  185. try:
  186. divide = (row['speedX'] ** 2 + row['speedY'] ** 2) ** (3 / 2)
  187. if not divide:
  188. res = None
  189. else:
  190. res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide
  191. except:
  192. res = None
  193. return res
  194. def _peak_valley_determination(self, df):
  195. """确定角速度的峰谷"""
  196. peaks, _ = scipy.signal.find_peaks(df['speedH'], height=0.01, distance=1, prominence=0.01)
  197. valleys, _ = scipy.signal.find_peaks(-df['speedH'], height=0.01, distance=1, prominence=0.01)
  198. peak_valley = sorted(list(peaks) + list(valleys))
  199. return peak_valley
  200. def _peak_valley_judgment(self, p_last, p_curr, tw=10000, avg=0.02):
  201. """判断峰谷是否满足蛇行条件"""
  202. t_diff = p_curr[0] - p_last[0]
  203. v_diff = abs(p_curr[1] - p_last[1])
  204. s = p_curr[1] * p_last[1]
  205. zigzag_flag = t_diff < tw and v_diff > avg and s < 0
  206. if zigzag_flag and ([p_last[0], p_curr[0]] not in self.zigzag_time_list):
  207. self.zigzag_time_list.append([p_last[0], p_curr[0]])
  208. return zigzag_flag
  209. def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
  210. """处理顿挫数据"""
  211. if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
  212. return np.nan
  213. elif abs(lon_acc) == 0:
  214. return 0
  215. elif lon_acc > 0 and lon_acc < ip_acc:
  216. return 1
  217. elif lon_acc < 0 and lon_acc > ip_dec:
  218. return -1
  219. else:
  220. return 0
  221. @peak_valley_decorator
  222. def _zigzag_count_func(self, p_curr, p_last, flag=True):
  223. """计算蛇行次数"""
  224. if flag:
  225. self.zigzag_count += 1
  226. else:
  227. self.zigzag_count += 0
  228. @peak_valley_decorator
  229. def _cal_zigzag_strength(self, p_curr, p_last, flag=True):
  230. """计算蛇行强度"""
  231. if flag:
  232. v_diff = abs(p_curr[1] - p_last[1])
  233. t_diff = p_curr[0] - p_last[0]
  234. self.zigzag_stre_list.append(v_diff / t_diff) # 平均角加速度
  235. else:
  236. self.zigzag_stre_list = []
  237. def calculate_zigzag_count(self):
  238. """计算蛇行指标"""
  239. self._zigzag_count_func()
  240. return self.zigzag_count
  241. def calculate_shake_count(self):
  242. """计算晃动指标"""
  243. self._shake_detector()
  244. return self.shake_count
  245. def calculate_cadence_count(self):
  246. """计算顿挫指标"""
  247. self._cadence_detector()
  248. return self.cadence_count
  249. def calculate_slam_brake_count(self):
  250. """计算急刹车指标"""
  251. self._slam_brake_detector()
  252. return self.slam_brake_count
  253. def calculate_slam_accel_count(self):
  254. """计算急加速指标"""
  255. self._slam_accel_detector()
  256. return self.slam_accel_count
  257. def _shake_detector(self, Cr_diff=0.05, T_diff=0.39):
  258. """晃动检测器"""
  259. time_list = []
  260. frame_list = []
  261. df = self.ego_df.copy()
  262. df = df[df['cur_diff'] > Cr_diff]
  263. df['frame_ID_diff'] = df['simFrame'].diff()
  264. filtered_df = df[df.frame_ID_diff > T_diff]
  265. row_numbers = filtered_df.index.tolist()
  266. cut_column = pd.cut(df.index, bins=row_numbers)
  267. grouped = df.groupby(cut_column)
  268. dfs = {}
  269. for name, group in grouped:
  270. dfs[name] = group.reset_index(drop=True)
  271. for name, df_group in dfs.items():
  272. # 直道,未主动换道
  273. df_group['curvHor'] = df_group['curvHor'].abs()
  274. df_group_straight = df_group[(df_group.lightMask == 0) & (df_group.curvHor < 0.001)]
  275. if not df_group_straight.empty:
  276. time_list.extend(df_group_straight['simTime'].values)
  277. frame_list.extend(df_group_straight['simFrame'].values)
  278. self.shake_count = self.shake_count + 1
  279. # 打转向灯,道路为直道
  280. df_group_change_lane = df_group[(df_group['lightMask'] != 0) & (df_group['curvHor'] < 0.001)]
  281. df_group_change_lane_data = df_group_change_lane[df_group_change_lane.cur_diff > Cr_diff + 0.2]
  282. if not df_group_change_lane_data.empty:
  283. time_list.extend(df_group_change_lane_data['simTime'].values)
  284. frame_list.extend(df_group_change_lane_data['simFrame'].values)
  285. self.shake_count = self.shake_count + 1
  286. # 转弯,打转向灯
  287. df_group_turn = df_group[(df_group['lightMask'] != 0) & (df_group['curvHor'].abs() > 0.001)]
  288. df_group_turn_data = df_group_turn[df_group_turn.cur_diff.abs() > Cr_diff + 0.1]
  289. if not df_group_turn_data.empty:
  290. time_list.extend(df_group_turn_data['simTime'].values)
  291. frame_list.extend(df_group_turn_data['simFrame'].values)
  292. self.shake_count = self.shake_count + 1
  293. # 分组处理
  294. TIME_RANGE = 1
  295. t_list = time_list
  296. f_list = frame_list
  297. group_time = []
  298. group_frame = []
  299. sub_group_time = []
  300. sub_group_frame = []
  301. if len(f_list) > 0:
  302. for i in range(len(f_list)):
  303. if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:
  304. sub_group_time.append(t_list[i])
  305. sub_group_frame.append(f_list[i])
  306. else:
  307. group_time.append(sub_group_time)
  308. group_frame.append(sub_group_frame)
  309. sub_group_time = [t_list[i]]
  310. sub_group_frame = [f_list[i]]
  311. group_time.append(sub_group_time)
  312. group_frame.append(sub_group_frame)
  313. # 输出图表值
  314. shake_time = [[g[0], g[-1]] for g in group_time]
  315. shake_frame = [[g[0], g[-1]] for g in group_frame]
  316. self.shake_count = len(shake_time)
  317. if shake_time:
  318. time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
  319. frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
  320. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  321. discomfort_df['type'] = 'shake'
  322. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  323. return time_list
  324. def _cadence_detector(self):
  325. """顿挫检测器"""
  326. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence']].copy()
  327. time_list = data['simTime'].values.tolist()
  328. data = data[data['cadence'] != np.nan]
  329. data['cadence_diff'] = data['cadence'].diff()
  330. data.dropna(subset='cadence_diff', inplace=True)
  331. data = data[data['cadence_diff'] != 0]
  332. t_list = data['simTime'].values.tolist()
  333. f_list = data['simFrame'].values.tolist()
  334. TIME_RANGE = 1
  335. group_time = []
  336. group_frame = []
  337. sub_group_time = []
  338. sub_group_frame = []
  339. for i in range(len(f_list)):
  340. if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE: # 特征点相邻一秒内的,算作同一组顿挫
  341. sub_group_time.append(t_list[i])
  342. sub_group_frame.append(f_list[i])
  343. else:
  344. group_time.append(sub_group_time)
  345. group_frame.append(sub_group_frame)
  346. sub_group_time = [t_list[i]]
  347. sub_group_frame = [f_list[i]]
  348. group_time.append(sub_group_time)
  349. group_frame.append(sub_group_frame)
  350. group_time = [g for g in group_time if len(g) >= 1] # 有一次特征点则算作一次顿挫
  351. group_frame = [g for g in group_frame if len(g) >= 1]
  352. # 输出图表值
  353. cadence_time = [[g[0], g[-1]] for g in group_time]
  354. cadence_frame = [[g[0], g[-1]] for g in group_frame]
  355. if cadence_time:
  356. time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time'])
  357. frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame'])
  358. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  359. discomfort_df['type'] = 'cadence'
  360. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  361. # 将顿挫组的起始时间为组重新统计时间
  362. cadence_time_list = [time for pair in cadence_time for time in time_list if pair[0] <= time <= pair[1]]
  363. stre_list = []
  364. freq_list = []
  365. for g in group_time:
  366. # calculate strength
  367. g_df = data[data['simTime'].isin(g)]
  368. strength = g_df['lon_acc'].abs().mean()
  369. stre_list.append(strength)
  370. # calculate frequency
  371. cnt = len(g)
  372. t_start = g_df['simTime'].iloc[0]
  373. t_end = g_df['simTime'].iloc[-1]
  374. t_delta = t_end - t_start
  375. frequency = cnt / t_delta
  376. freq_list.append(frequency)
  377. self.cadence_count = len(freq_list)
  378. cadence_stre = sum(stre_list) / len(stre_list) if stre_list else 0
  379. return cadence_time_list
  380. def _slam_brake_detector(self):
  381. """急刹车检测器"""
  382. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake']].copy()
  383. res_df = data[data['slam_brake'] == 1]
  384. t_list = res_df['simTime'].values
  385. f_list = res_df['simFrame'].values.tolist()
  386. TIME_RANGE = 1
  387. group_time = []
  388. group_frame = []
  389. sub_group_time = []
  390. sub_group_frame = []
  391. for i in range(len(f_list)):
  392. if not sub_group_time or f_list[i] - f_list[i - 1] <= TIME_RANGE: # 连续帧的算作同一组急刹
  393. sub_group_time.append(t_list[i])
  394. sub_group_frame.append(f_list[i])
  395. else:
  396. group_time.append(sub_group_time)
  397. group_frame.append(sub_group_frame)
  398. sub_group_time = [t_list[i]]
  399. sub_group_frame = [f_list[i]]
  400. group_time.append(sub_group_time)
  401. group_frame.append(sub_group_frame)
  402. group_time = [g for g in group_time if len(g) >= 2] # 达到两帧算作一次急刹
  403. group_frame = [g for g in group_frame if len(g) >= 2]
  404. # 输出图表值
  405. slam_brake_time = [[g[0], g[-1]] for g in group_time]
  406. slam_brake_frame = [[g[0], g[-1]] for g in group_frame]
  407. if slam_brake_time:
  408. time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time'])
  409. frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame'])
  410. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  411. discomfort_df['type'] = 'slam_brake'
  412. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  413. time_list = [element for sublist in group_time for element in sublist]
  414. self.slam_brake_count = len(group_time)
  415. return time_list
  416. def _slam_accel_detector(self):
  417. """急加速检测器"""
  418. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel']].copy()
  419. res_df = data.loc[data['slam_accel'] == 1]
  420. t_list = res_df['simTime'].values
  421. f_list = res_df['simFrame'].values.tolist()
  422. group_time = []
  423. group_frame = []
  424. sub_group_time = []
  425. sub_group_frame = []
  426. for i in range(len(f_list)):
  427. if not group_time or f_list[i] - f_list[i - 1] <= 1: # 连续帧的算作同一组急加速
  428. sub_group_time.append(t_list[i])
  429. sub_group_frame.append(f_list[i])
  430. else:
  431. group_time.append(sub_group_time)
  432. group_frame.append(sub_group_frame)
  433. sub_group_time = [t_list[i]]
  434. sub_group_frame = [f_list[i]]
  435. group_time.append(sub_group_time)
  436. group_frame.append(sub_group_frame)
  437. group_time = [g for g in group_time if len(g) >= 2]
  438. group_frame = [g for g in group_frame if len(g) >= 2]
  439. # 输出图表值
  440. slam_accel_time = [[g[0], g[-1]] for g in group_time]
  441. slam_accel_frame = [[g[0], g[-1]] for g in group_frame]
  442. if slam_accel_time:
  443. time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time'])
  444. frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame'])
  445. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  446. discomfort_df['type'] = 'slam_accel'
  447. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  448. time_list = [element for sublist in group_time for element in sublist]
  449. self.slam_accel_count = len(group_time)
  450. return time_list
  451. class ComfortManager:
  452. """舒适性指标计算主类"""
  453. def __init__(self, data_processed):
  454. self.data = data_processed
  455. self.logger = LogManager().get_logger()
  456. self.registry = ComfortRegistry(self.data)
  457. def report_statistic(self):
  458. """生成舒适性评分报告"""
  459. comfort_result = self.registry.batch_execute()
  460. return comfort_result
  461. if __name__ == '__main__':
  462. case_name = 'ICA'
  463. mode_label = 'PGVIL'
  464. data = data_process.DataPreprocessing(case_name, mode_label)
  465. comfort_instance = ComfortManager(data)
  466. try:
  467. comfort_result = comfort_instance.report_statistic()
  468. result = {'comfort': comfort_result}
  469. print(result)
  470. except Exception as e:
  471. print(f"An error occurred in Comfort.report_statistic: {e}")