comfort copy.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/06/25
  11. @Last Modified: 2025/04/25
  12. @Summary: Comfort metrics
  13. """
  14. import sys
  15. import math
  16. import scipy.signal
  17. import pandas as pd
  18. import numpy as np
  19. from pathlib import Path
  20. from typing import Dict, List, Any, Optional, Callable, Union, Tuple
  21. from modules.lib.score import Score
  22. from modules.lib.common import get_interpolation, get_frame_with_time
  23. from modules.lib import data_process
  24. from modules.lib.log_manager import LogManager
# Columns required from the preprocessed ego-vehicle dataframe; every
# detector below reads from this subset.
COMFORT_INFO = [
    "simTime",
    "simFrame",
    "speedX",
    "speedY",
    "accelX",
    "accelY",
    "curvHor",
    "lightMask",
    "v",
    "lat_acc",
    "lon_acc",
    "time_diff",
    "lon_acc_diff",
    "lon_acc_roc",
    "speedH",
    "accelH",
    "posH",  # make sure the heading angle is included
]
# ----------------------
# Standalone metric entry points
# ----------------------
# Motion Sickness Dose Value (MSDV): quantifies the motion-sickness risk a
# passenger accrues from sustained vibration; computed by ComfortCalculator
# and exposed through the calculate_msdv() entry point below.
  50. def calculate_ava_vav(data_processed) -> dict:
  51. """计算多维度综合加权加速度"""
  52. comfort = ComfortCalculator(data_processed)
  53. ava_vav_value = comfort.calculate_ava_vav()
  54. return {"ava_vav": float(ava_vav_value)}
  55. def calculate_msdv(data_processed) -> dict:
  56. """计算晕动剂量值(MSDV)指标"""
  57. comfort = ComfortCalculator(data_processed)
  58. msdv_value = comfort.calculate_msdv()
  59. return {"msdv": float(msdv_value)}
  60. def calculate_weaving(data_processed) -> dict:
  61. """计算蛇行指标"""
  62. comfort = ComfortCalculator(data_processed)
  63. zigzag_count = comfort.calculate_zigzag_count()
  64. return {"weaving": float(zigzag_count)}
  65. def calculate_shake(data_processed) -> dict:
  66. """计算晃动指标"""
  67. comfort = ComfortCalculator(data_processed)
  68. shake_count = comfort.calculate_shake_count()
  69. return {"shake": float(shake_count)}
  70. def calculate_cadence(data_processed) -> dict:
  71. """计算顿挫指标"""
  72. comfort = ComfortCalculator(data_processed)
  73. cadence_count = comfort.calculate_cadence_count()
  74. return {"cadence": float(cadence_count)}
  75. def calculate_slambrake(data_processed) -> dict:
  76. """计算急刹车指标"""
  77. comfort = ComfortCalculator(data_processed)
  78. slam_brake_count = comfort.calculate_slam_brake_count()
  79. return {"slamBrake": float(slam_brake_count)}
  80. def calculate_slamaccelerate(data_processed) -> dict:
  81. """计算急加速指标"""
  82. comfort = ComfortCalculator(data_processed)
  83. slam_accel_count = comfort.calculate_slam_accel_count()
  84. return {"slamAccelerate": float(slam_accel_count)}
# Decorator kept unchanged.
def peak_valley_decorator(method):
    """Feed consecutive yaw-rate peak/valley pairs of ``self.df`` to *method*.

    The wrapped method is called once per accepted peak/valley pair with
    ``(p_curr, p_last, flag)``; when no extrema exist it is called once with
    dummy points and ``flag=False`` so it can handle the empty case.

    NOTE(review): the wrapper returns the undecorated ``method`` object rather
    than a call result, and ``p_last`` only advances when the judgment fails —
    both look deliberate in the callers below, but confirm before changing.
    """
    def wrapper(self, *args, **kwargs):
        # Indices of yaw-rate peaks/valleys over the whole trajectory.
        peak_valley = self._peak_valley_determination(self.df)
        pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
        if len(pv_list) != 0:
            flag = True
            p_last = pv_list[0]
            for i in range(1, len(pv_list)):
                p_curr = pv_list[i]
                if self._peak_valley_judgment(p_last, p_curr):
                    # method(self, p_curr, p_last)
                    method(self, p_curr, p_last, flag, *args, **kwargs)
                else:
                    # Only advance the reference extremum when no zigzag found.
                    p_last = p_curr
            return method
        else:
            # No extrema at all: invoke once with dummy points (flag=False).
            flag = False
            p_curr = [0, 0]
            p_last = [0, 0]
            method(self, p_curr, p_last, flag, *args, **kwargs)
            return method
    return wrapper
  108. class ComfortRegistry:
  109. """舒适性指标注册器"""
  110. def __init__(self, data_processed):
  111. self.logger = LogManager().get_logger() # 获取全局日志实例
  112. self.data = data_processed
  113. self.comfort_config = data_processed.comfort_config["comfort"]
  114. self.metrics = self._extract_metrics(self.comfort_config)
  115. self._registry = self._build_registry()
  116. def _extract_metrics(self, config_node: dict) -> list:
  117. """DFS遍历提取指标"""
  118. metrics = []
  119. def _recurse(node):
  120. if isinstance(node, dict):
  121. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  122. metrics.append(node['name'])
  123. for v in node.values():
  124. _recurse(v)
  125. _recurse(config_node)
  126. self.logger.info(f'评比的舒适性指标列表:{metrics}')
  127. return metrics
  128. def _build_registry(self) -> dict:
  129. """自动注册指标函数"""
  130. registry = {}
  131. for metric_name in self.metrics:
  132. func_name = f"calculate_{metric_name.lower()}"
  133. try:
  134. registry[metric_name] = globals()[func_name]
  135. except KeyError:
  136. self.logger.error(f"未实现指标函数: {func_name}")
  137. return registry
  138. def batch_execute(self) -> dict:
  139. """批量执行指标计算"""
  140. results = {}
  141. for name, func in self._registry.items():
  142. try:
  143. result = func(self.data)
  144. results.update(result)
  145. # 新增:将每个指标的结果写入日志
  146. self.logger.info(f'舒适性指标[{name}]计算结果: {result}')
  147. except Exception as e:
  148. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  149. results[name] = None
  150. self.logger.info(f'舒适性指标计算结果:{results}')
  151. return results
class ComfortCalculator:
    """Comfort metric calculator - provides the core computation functions."""

    def __init__(self, data_processed):
        # Full preprocessing result; ego_data is the ego-vehicle dataframe.
        self.data_processed = data_processed
        self.logger = LogManager().get_logger()
        self.data = data_processed.ego_data
        self.ego_df = pd.DataFrame()
        # One row per detected discomfort event (filled by the detectors).
        self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        # Default values returned when a metric cannot be computed.
        self.calculated_value = {
            'weaving': 0,
            'shake': 0,
            'cadence': 0,
            'slamBrake': 0,
            'slamAccelerate': 0,
            'ava_vav': 0,  # default for the new metric
            'msdv': 0  # default for the MSDV metric
        }
        self.time_list = self.data['simTime'].values.tolist()
        self.frame_list = self.data['simFrame'].values.tolist()
        # Event counters, incremented/overwritten by the individual detectors.
        self.zigzag_count = 0
        self.shake_count = 0
        self.cadence_count = 0
        self.slam_brake_count = 0
        self.slam_accel_count = 0
        # Zigzag bookkeeping: [start, end] time windows and swing strengths.
        self.zigzag_time_list = []
        self.zigzag_stre_list = []
        self._initialize_data()
  180. def _initialize_data(self):
  181. """初始化数据"""
  182. self.ego_df = self.data[COMFORT_INFO].copy()
  183. self.df = self.ego_df.reset_index(drop=True)
  184. self._prepare_comfort_parameters()
    def _prepare_comfort_parameters(self):
        """Derive per-row thresholds and flags used by the detectors."""
        # Speed-dependent accel/decel thresholds, linearly interpolated
        # between (18, 4)-(72, 2) and (18, -5)-(72, -3.5).
        # NOTE(review): speed looks like km/h and thresholds like m/s² —
        # confirm against get_interpolation's contract.
        self.ego_df['ip_acc'] = self.ego_df['v'].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
        self.ego_df['ip_dec'] = self.ego_df['v'].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
        # Flag rows whose longitudinal acceleration exceeds the thresholds.
        self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
            lambda x: 1 if x < 0 else 0)
        self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
            lambda x: 1 if x > 0 else 0)
        # Per-row cadence state: 1/-1 inside the comfort band, NaN otherwise.
        self.ego_df['cadence'] = self.ego_df.apply(
            lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
  196. def _cal_cur_ego_path(self, row):
  197. """计算车辆轨迹曲率"""
  198. try:
  199. divide = (row['speedX'] ** 2 + row['speedY'] ** 2) ** (3 / 2)
  200. if not divide:
  201. res = None
  202. else:
  203. res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide
  204. except:
  205. res = None
  206. return res
  207. def _peak_valley_determination(self, df):
  208. """确定角速度的峰谷"""
  209. peaks, _ = scipy.signal.find_peaks(
  210. df['speedH'], height=2.3, distance=3,
  211. prominence=2.3, width=1)
  212. valleys, _ = scipy.signal.find_peaks(
  213. -df['speedH'], height=2.3, distance=3,
  214. prominence=2.3, width=1)
  215. return sorted(list(peaks) + list(valleys))
  216. def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6):
  217. """判断峰谷是否满足蛇行条件"""
  218. t_diff = p_curr[0] - p_last[0]
  219. v_diff = abs(p_curr[1] - p_last[1])
  220. s = p_curr[1] * p_last[1]
  221. if t_diff < tw and v_diff > avg and s < 0:
  222. if [p_last[0], p_curr[0]] not in self.zigzag_time_list:
  223. self.zigzag_time_list.append([p_last[0], p_curr[0]])
  224. return True
  225. return False
  226. def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
  227. """处理顿挫数据"""
  228. if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
  229. return np.nan
  230. elif abs(lon_acc) == 0:
  231. return 0
  232. elif lon_acc > 0 and lon_acc < ip_acc:
  233. return 1
  234. elif lon_acc < 0 and lon_acc > ip_dec:
  235. return -1
  236. else:
  237. return 0
  238. @peak_valley_decorator
  239. def _zigzag_count_func(self, p_curr, p_last, flag=True):
  240. """计算蛇行次数"""
  241. if flag:
  242. self.zigzag_count += 1
  243. else:
  244. self.zigzag_count += 0
    @peak_valley_decorator
    def _cal_zigzag_strength(self, p_curr, p_last, flag=True):
        """Accumulate zigzag strength (mean yaw acceleration per swing)."""
        if flag:
            v_diff = abs(p_curr[1] - p_last[1])
            t_diff = p_curr[0] - p_last[0]
            if t_diff > 0:
                self.zigzag_stre_list.append(v_diff / t_diff)  # average angular acceleration
        else:
            # No extrema found at all: reset the strength accumulator.
            self.zigzag_stre_list = []
  255. def _get_zigzag_times(self):
  256. """获取所有蛇行时间点"""
  257. all_times = []
  258. for time_range in self.zigzag_time_list:
  259. start, end = time_range
  260. # 获取这个时间范围内的所有时间点
  261. times_in_range = self.ego_df[(self.ego_df['simTime'] >= start) &
  262. (self.ego_df['simTime'] <= end)]['simTime'].tolist()
  263. all_times.extend(times_in_range)
  264. return all_times
    def calculate_zigzag_count(self):
        """Weaving metric: run the detector and return the zigzag count."""
        self._zigzag_count_func()
        return self.zigzag_count

    def calculate_shake_count(self):
        """Shake metric: run the detector and return the shake count."""
        self._shake_detector()
        return self.shake_count

    def calculate_cadence_count(self):
        """Cadence metric: run the detector and return the cadence count."""
        self._cadence_detector()
        return self.cadence_count

    def calculate_slam_brake_count(self):
        """Hard-braking metric: run the detector and return the event count."""
        self._slam_brake_detector()
        return self.slam_brake_count

    def calculate_slam_accel_count(self):
        """Hard-acceleration metric: run the detector and return the count."""
        self._slam_accel_detector()
        return self.slam_accel_count
    def _shake_detector(self, T_diff=0.5):
        """Detect shake events - improved version (no trajectory curvature).

        A shake is flagged when lateral acceleration exceeds a speed-dependent
        threshold AND either its rate of change is high or the yaw rate
        fluctuates without belonging to a zigzag. Events are appended to
        ``self.discomfort_df`` and counted in ``self.shake_count``.

        Returns:
            list: simTime samples belonging to candidate shake groups.
        """
        # lat_acc is already in the vehicle frame (computed by data_process.py).
        time_list = []
        frame_list = []
        # Work on a copy so the source frame is not modified.
        df = self.ego_df.copy()
        # 1. Rate of change of lateral acceleration.
        df['lat_acc_rate'] = df['lat_acc'].diff() / df['simTime'].diff()
        # 2. Rate of change of yaw rate.
        df['speedH_rate'] = df['speedH'].diff() / df['simTime'].diff()
        # 3. Short-term variability of the yaw rate.
        window_size = 5  # 5-frame window
        df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
        # 4. Speed-dependent thresholds.
        v0 = 20 * 5 / 18  # ≈5.56 m/s
        # decay coefficient
        k = 0.008 * 3.6  # =0.0288 per m/s
        df['lat_acc_threshold'] = df['v'].apply(
            lambda speed: max(
                1.0,  # lower bound 1.0 m/s²
                min(
                    1.8,  # upper bound 1.8 m/s²
                    1.8 - k * (speed - v0)  # linear decay
                )
            )
        )
        df['speedH_threshold'] = df['v'].apply(
            lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
        )
        # Persist thresholds/intermediates on self.ego_df for other functions.
        self.ego_df['lat_acc_threshold'] = df['lat_acc_threshold']
        self.ego_df['speedH_threshold'] = df['speedH_threshold']
        self.ego_df['lat_acc_rate'] = df['lat_acc_rate']
        self.ego_df['speedH_rate'] = df['speedH_rate']
        self.ego_df['speedH_std'] = df['speedH_std']
        # 5. Combined shake condition.
        # Condition A: lateral acceleration above threshold.
        condition_A = df['lat_acc'].abs() > df['lat_acc_threshold']
        # Condition B: lateral-acceleration rate above threshold.
        lat_acc_rate_threshold = 0.5  # lateral jerk threshold (m/s³)
        condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold
        # Condition C: yaw rate varies noticeably but not as part of a zigzag.
        condition_C = (df['speedH_std'] > df['speedH_threshold']) & (~df['simTime'].isin(self._get_zigzag_times()))
        # Overall: A and (B or C).
        shake_condition = condition_A & (condition_B | condition_C)
        # Rows satisfying the shake condition.
        shake_df = df[shake_condition].copy()
        # Group by consecutive frames so only runs of frames count as a shake.
        if not shake_df.empty:
            shake_df['frame_diff'] = shake_df['simFrame'].diff().fillna(0)
            # NOTE(review): T_diff=0.5 against a frame-number gap means "any
            # non-consecutive frame starts a new group" - assumes integral
            # simFrame values; confirm.
            shake_df['group'] = (shake_df['frame_diff'] > T_diff).cumsum()
            # Per-group statistics.
            shake_groups = shake_df.groupby('group')
            for _, group in shake_groups:
                if len(group) >= 2:  # at least 2 frames count as one shake
                    time_list.extend(group['simTime'].values)
                    frame_list.extend(group['simFrame'].values)
                    self.shake_count += 1
        # Re-group the collected samples by time proximity.
        TIME_RANGE = 1
        t_list = time_list
        f_list = frame_list
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []
        if len(f_list) > 0:
            for i in range(len(f_list)):
                if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:
                    sub_group_time.append(t_list[i])
                    sub_group_frame.append(f_list[i])
                else:
                    group_time.append(sub_group_time)
                    group_frame.append(sub_group_frame)
                    sub_group_time = [t_list[i]]
                    sub_group_frame = [f_list[i]]
            group_time.append(sub_group_time)
            group_frame.append(sub_group_frame)
        # Chart output values: [start, end] per group.
        shake_time = [[g[0], g[-1]] for g in group_time]
        shake_frame = [[g[0], g[-1]] for g in group_frame]
        # NOTE(review): this overwrites the count accumulated in the loop above.
        self.shake_count = len(shake_time)
        if shake_time:
            time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'shake'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
        return time_list
  375. def _cadence_detector(self):
  376. """顿挫检测器"""
  377. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence']].copy()
  378. time_list = data['simTime'].values.tolist()
  379. data = data[data['cadence'] != np.nan]
  380. data['cadence_diff'] = data['cadence'].diff()
  381. data.dropna(subset='cadence_diff', inplace=True)
  382. data = data[data['cadence_diff'] != 0]
  383. t_list = data['simTime'].values.tolist()
  384. f_list = data['simFrame'].values.tolist()
  385. TIME_RANGE = 1
  386. group_time = []
  387. group_frame = []
  388. sub_group_time = []
  389. sub_group_frame = []
  390. for i in range(len(f_list)):
  391. if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE: # 特征点相邻一秒内的,算作同一组顿挫
  392. sub_group_time.append(t_list[i])
  393. sub_group_frame.append(f_list[i])
  394. else:
  395. group_time.append(sub_group_time)
  396. group_frame.append(sub_group_frame)
  397. sub_group_time = [t_list[i]]
  398. sub_group_frame = [f_list[i]]
  399. group_time.append(sub_group_time)
  400. group_frame.append(sub_group_frame)
  401. group_time = [g for g in group_time if len(g) >= 1] # 有一次特征点则算作一次顿挫
  402. group_frame = [g for g in group_frame if len(g) >= 1]
  403. # 输出图表值
  404. cadence_time = [[g[0], g[-1]] for g in group_time]
  405. cadence_frame = [[g[0], g[-1]] for g in group_frame]
  406. if cadence_time:
  407. time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time'])
  408. frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame'])
  409. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  410. discomfort_df['type'] = 'cadence'
  411. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  412. # 将顿挫组的起始时间为组重新统计时间
  413. cadence_time_list = [time for pair in cadence_time for time in time_list if pair[0] <= time <= pair[1]]
  414. stre_list = []
  415. freq_list = []
  416. for g in group_time:
  417. # calculate strength
  418. g_df = data[data['simTime'].isin(g)]
  419. strength = g_df['lon_acc'].abs().mean()
  420. stre_list.append(strength)
  421. # calculate frequency
  422. cnt = len(g)
  423. t_start = g_df['simTime'].iloc[0]
  424. t_end = g_df['simTime'].iloc[-1]
  425. t_delta = t_end - t_start
  426. frequency = cnt / t_delta
  427. freq_list.append(frequency)
  428. self.cadence_count = len(freq_list)
  429. cadence_stre = sum(stre_list) / len(stre_list) if stre_list else 0
  430. return cadence_time_list
    def _slam_brake_detector(self):
        """Detect hard-braking events.

        Consecutive frames flagged ``slam_brake == 1`` are grouped; a group of
        at least two frames counts as one event. Events are appended to
        ``self.discomfort_df`` and counted in ``self.slam_brake_count``.

        Returns:
            list: simTime samples belonging to the detected events.
        """
        data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake']].copy()
        res_df = data[data['slam_brake'] == 1]
        t_list = res_df['simTime'].values
        f_list = res_df['simFrame'].values.tolist()
        TIME_RANGE = 1
        group_time = []
        group_frame = []
        sub_group_time = []
        sub_group_frame = []
        for i in range(len(f_list)):
            # Consecutive frames belong to the same hard-braking group.
            # NOTE(review): TIME_RANGE is compared against a *frame* gap here.
            if not sub_group_time or f_list[i] - f_list[i - 1] <= TIME_RANGE:
                sub_group_time.append(t_list[i])
                sub_group_frame.append(f_list[i])
            else:
                group_time.append(sub_group_time)
                group_frame.append(sub_group_frame)
                sub_group_time = [t_list[i]]
                sub_group_frame = [f_list[i]]
        group_time.append(sub_group_time)
        group_frame.append(sub_group_frame)
        group_time = [g for g in group_time if len(g) >= 2]  # two frames count as one hard brake
        group_frame = [g for g in group_frame if len(g) >= 2]
        # Chart output values.
        slam_brake_time = [[g[0], g[-1]] for g in group_time]
        slam_brake_frame = [[g[0], g[-1]] for g in group_frame]
        if slam_brake_time:
            time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time'])
            frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame'])
            discomfort_df = pd.concat([time_df, frame_df], axis=1)
            discomfort_df['type'] = 'slam_brake'
            self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
        time_list = [element for sublist in group_time for element in sublist]
        self.slam_brake_count = len(group_time)
        return time_list
  467. def _slam_accel_detector(self):
  468. """急加速检测器"""
  469. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel']].copy()
  470. res_df = data.loc[data['slam_accel'] == 1]
  471. t_list = res_df['simTime'].values
  472. f_list = res_df['simFrame'].values.tolist()
  473. group_time = []
  474. group_frame = []
  475. sub_group_time = []
  476. sub_group_frame = []
  477. for i in range(len(f_list)):
  478. if not group_time or f_list[i] - f_list[i - 1] <= 1: # 连续帧的算作同一组急加速
  479. sub_group_time.append(t_list[i])
  480. sub_group_frame.append(f_list[i])
  481. else:
  482. group_time.append(sub_group_time)
  483. group_frame.append(sub_group_frame)
  484. sub_group_time = [t_list[i]]
  485. sub_group_frame = [f_list[i]]
  486. group_time.append(sub_group_time)
  487. group_frame.append(sub_group_frame)
  488. group_time = [g for g in group_time if len(g) >= 2]
  489. group_frame = [g for g in group_frame if len(g) >= 2]
  490. # 输出图表值
  491. slam_accel_time = [[g[0], g[-1]] for g in group_time]
  492. slam_accel_frame = [[g[0], g[-1]] for g in group_frame]
  493. if slam_accel_time:
  494. time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time'])
  495. frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame'])
  496. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  497. discomfort_df['type'] = 'slam_accel'
  498. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  499. time_list = [element for sublist in group_time for element in sublist]
  500. self.slam_accel_count = len(group_time)
  501. return time_list
  502. class ComfortManager:
  503. """舒适性指标计算主类"""
  504. def __init__(self, data_processed):
  505. self.data = data_processed
  506. self.logger = LogManager().get_logger()
  507. self.registry = ComfortRegistry(self.data)
  508. def report_statistic(self):
  509. """生成舒适性评分报告"""
  510. comfort_result = self.registry.batch_execute()
  511. return comfort_result
if __name__ == '__main__':
    # Standalone smoke test: preprocess one case and print the comfort report.
    case_name = 'ICA'
    mode_label = 'PGVIL'
    data = data_process.DataPreprocessing(case_name, mode_label)
    comfort_instance = ComfortManager(data)
    try:
        comfort_result = comfort_instance.report_statistic()
        result = {'comfort': comfort_result}
        print(result)
    except Exception as e:
        print(f"An error occurred in Comfort.report_statistic: {e}")
# NOTE(review): the methods below are written as ComfortCalculator methods
# (they take `self` and read self.ego_df) but appear here at module level,
# after the __main__ guard. They were meant to be moved inside the
# ComfortCalculator class - as written they are unreachable via instances.
def _apply_frequency_weighting(self, acceleration_data, weighting_type='Wk', fs=100):
    """Apply an ISO 2631-1:1997 frequency-weighting filter chain.

    Parameters:
        acceleration_data: acceleration time series (pd.Series).
        weighting_type: weighting curve to apply:
            - 'Wk': vertical direction (Z axis)
            - 'Wd': horizontal directions (X and Y axes)
            - 'Wf': motion-sickness related weighting
        fs: sampling frequency (Hz).

    Returns:
        The weighted series (same index), or the input unchanged when it is
        empty/all-NaN or the weighting type is unknown.
    """
    # Validate the input data.
    if acceleration_data.empty or acceleration_data.isna().all():
        return acceleration_data
    # Filter parameters taken from ISO 2631-1:1997.
    # NOTE(review): the parameter values and the s-domain transfer-function
    # forms below should be cross-checked against the standard / ISO 8041.
    if weighting_type == 'Wk':  # vertical direction (Z axis)
        # Wk filter parameters
        f1 = 0.4
        f2 = 100.0
        f3 = 12.5
        f4 = 12.5
        Q1 = 0.63
        Q2 = 0.5
        Q3 = 0.63
        Q4 = 0.63
        K = 0.4
    elif weighting_type == 'Wd':  # horizontal directions (X and Y axes)
        # Wd filter parameters
        f1 = 0.4
        f2 = 100.0
        f3 = 2.0
        f4 = 2.0
        Q1 = 0.63
        Q2 = 0.5
        Q3 = 0.63
        Q4 = 0.63
        K = 0.4
    elif weighting_type == 'Wf':  # motion-sickness related
        # Wf filter parameters
        f1 = 0.08
        f2 = 0.63
        f3 = 0.25
        f4 = 0.8
        Q1 = 0.63
        Q2 = 0.86
        Q3 = 0.8
        Q4 = 0.8
        K = 1.0
    else:
        self.logger.warning(f"未知的加权类型: {weighting_type},使用原始数据")
        return acceleration_data
    # Convert corner frequencies to angular frequencies.
    w1 = 2 * np.pi * f1
    w2 = 2 * np.pi * f2
    w3 = 2 * np.pi * f3
    w4 = 2 * np.pi * f4
    # High-pass section (s domain).
    b1 = [K * w1**2, 0]
    a1 = [1, w1/Q1, w1**2]
    # Low-pass section (s domain).
    b2 = [K, 0, 0]
    a2 = [1, w2/Q2, w2**2]
    # Acceleration-velocity transition section (s domain).
    b3 = [K, 0]
    a3 = [1, w3/Q3, w3**2]
    # Upward-step section (s domain).
    b4 = [K, 0, 0]
    a4 = [1, w4/Q4, w4**2]
    # Bilinear transform: s domain -> z domain at sampling rate fs.
    b1_z, a1_z = scipy.signal.bilinear(b1, a1, fs)
    b2_z, a2_z = scipy.signal.bilinear(b2, a2, fs)
    b3_z, a3_z = scipy.signal.bilinear(b3, a3, fs)
    b4_z, a4_z = scipy.signal.bilinear(b4, a4, fs)
    # Apply the filter chain in cascade.
    data_np = acceleration_data.to_numpy()
    filtered_data = scipy.signal.lfilter(b1_z, a1_z, data_np)
    filtered_data = scipy.signal.lfilter(b2_z, a2_z, filtered_data)
    filtered_data = scipy.signal.lfilter(b3_z, a3_z, filtered_data)
    filtered_data = scipy.signal.lfilter(b4_z, a4_z, filtered_data)
    return pd.Series(filtered_data, index=acceleration_data.index)
def calculate_ava_vav(self):
    """Compute the overall multi-axis weighted acceleration (ava_vav).

    Based on ISO 2631-1:1997: combines the frequency-weighted r.m.s.
    accelerations of the three translational axes with the r.m.s. rates of
    the three rotational axes.

    Returns:
        float: combined weighted acceleration (also stored in
        ``self.calculated_value['ava_vav']``).
    """
    # Per-axis weighting factors.
    k_x = 1.0  # X-axis acceleration weight
    k_y = 1.0  # Y-axis acceleration weight
    k_z = 1.0  # Z-axis acceleration weight
    k_roll = 0.63  # roll-rate weight
    k_pitch = 0.8  # pitch-rate weight
    k_yaw = 0.5  # yaw-rate weight
    # Fetch data.
    df = self.ego_df.copy()
    # Required columns must be present.
    if 'accelX' not in df.columns or 'accelY' not in df.columns:
        self.logger.warning("缺少计算多维度综合加权加速度所需的数据列")
        return self.calculated_value['ava_vav']
    # Convert ENU-frame accelerations into the body frame.
    # Body frame: X forward, Y to the vehicle's left, Z up.
    if 'posH' not in df.columns:
        self.logger.warning("缺少航向角数据,无法进行坐标转换")
        return self.calculated_value['ava_vav']
    df['posH_rad'] = np.radians(df['posH'])
    # Rotate accelerations into the body frame.
    # posH is the heading angle: 0 deg = north, clockwise positive.
    # body X = east * sin(posH) + north * cos(posH)
    # body Y = east * cos(posH) - north * sin(posH)
    # NOTE(review): assumes accelX is the east and accelY the north
    # component - confirm against the data_process convention.
    df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
    df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
    # Z acceleration; assume 0 when the channel is missing.
    df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
    # Angular rates; fall back to zeros when a channel is missing.
    # speedH (heading rate) stands in for the body yaw rate.
    omega_roll = df['rollRate'] if 'rollRate' in df.columns else pd.Series(np.zeros(len(df)))
    omega_pitch = df['pitchRate'] if 'pitchRate' in df.columns else pd.Series(np.zeros(len(df)))
    omega_yaw = df['speedH']  # heading rate used as yaw rate
    # Apply the ISO 2631-1:1997 frequency weighting.
    # Estimate the sampling frequency, assuming uniform sampling.
    if len(df) > 1:
        time_diff = df['simTime'].diff().median()
        fs = 1.0 / time_diff if time_diff > 0 else 100  # default 100 Hz
    else:
        fs = 100  # default sampling frequency
    # Weight the translational accelerations per axis.
    a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wd', fs)
    a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wd', fs)
    a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wk', fs)
    # Rotational rates are used unweighted: ISO 2631 does not specify a
    # weighting for them, so this is a simplified treatment.
    omega_roll_weighted = omega_roll
    omega_pitch_weighted = omega_pitch
    omega_yaw_weighted = omega_yaw
    # Weighted r.m.s. per axis: mean of squares, then square root.
    a_x_rms = np.sqrt(np.mean(a_x_weighted**2))
    a_y_rms = np.sqrt(np.mean(a_y_weighted**2))
    a_z_rms = np.sqrt(np.mean(a_z_weighted**2))
    omega_roll_rms = np.sqrt(np.mean(omega_roll_weighted**2))
    omega_pitch_rms = np.sqrt(np.mean(omega_pitch_weighted**2))
    omega_yaw_rms = np.sqrt(np.mean(omega_yaw_weighted**2))
    # Combine all axes into the overall weighted acceleration.
    ava_vav = np.sqrt(
        k_x * a_x_rms**2 +
        k_y * a_y_rms**2 +
        k_z * a_z_rms**2 +
        k_roll * omega_roll_rms**2 +
        k_pitch * omega_pitch_rms**2 +
        k_yaw * omega_yaw_rms**2
    )
    # Store and log the result.
    self.calculated_value['ava_vav'] = ava_vav
    self.logger.info(f"多维度综合加权加速度(ava_vav)计算结果: {ava_vav}")
    return ava_vav
def calculate_msdv(self):
    """Compute the Motion Sickness Dose Value (MSDV).

    MSDV quantifies the motion-sickness risk from sustained vibration: per
    axis it is sqrt( ∫[0,T] a_w(t)² dt ) of the frequency-weighted
    acceleration, reflecting the total vibration exposure over time.

    Returns:
        float: the MSDV (also stored in ``self.calculated_value['msdv']``).
    """
    # Fetch data.
    df = self.ego_df.copy()
    # Required columns must be present.
    if 'accelX' not in df.columns or 'accelY' not in df.columns:
        self.logger.warning("缺少计算晕动剂量值所需的数据列")
        return self.calculated_value['msdv']
    # Convert ENU-frame accelerations into the body frame.
    if 'posH' not in df.columns:
        self.logger.warning("缺少航向角数据,无法进行坐标转换")
        return self.calculated_value['msdv']
    # Body frame: X forward, Y to the vehicle's left, Z up.
    df['posH_rad'] = np.radians(df['posH'])
    # Rotate accelerations into the body frame.
    # posH is the heading angle: 0 deg = north, clockwise positive.
    # body X = east * sin(posH) + north * cos(posH)
    # body Y = east * cos(posH) - north * sin(posH)
    df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
    df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
    # Z acceleration; assume 0 when the channel is missing.
    df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
    # Per-sample time step for the time integral (first sample contributes 0).
    df['time_diff'] = df['simTime'].diff().fillna(0)
    # Apply the ISO 2631-1:1997 frequency weighting.
    # Estimate the sampling frequency, assuming uniform sampling.
    if len(df) > 1:
        time_diff = df['simTime'].diff().median()
        fs = 1.0 / time_diff if time_diff > 0 else 100  # default 100 Hz
    else:
        fs = 100  # default sampling frequency
    # Motion-sickness evaluation uses the Wf weighting on every axis.
    a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wf', fs)
    a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wf', fs)
    a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wf', fs)
    # MSDV per axis: time integral of the squared weighted acceleration.
    # X (fore-aft) - a dominant contributor to motion sickness.
    msdv_x = np.sqrt(np.sum(a_x_weighted**2 * df['time_diff']))
    # Y (lateral).
    msdv_y = np.sqrt(np.sum(a_y_weighted**2 * df['time_diff']))
    # Z (vertical) - also a significant contributor.
    msdv_z = np.sqrt(np.sum(a_z_weighted**2 * df['time_diff']))
    # Combine axes; the vertical axis carries a higher weight (factor 1.4).
    msdv = np.sqrt(msdv_x**2 + msdv_y**2 + (1.4 * msdv_z)**2)
    # Store and log the result.
    self.calculated_value['msdv'] = msdv
    self.logger.info(f"晕动剂量值(MSDV)计算结果: {msdv}")
    return msdv