comfort.py 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/06/25
  11. @Last Modified: 2025/04/25
  12. @Summary: Comfort metrics
  13. """
  14. import sys
  15. import math
  16. import scipy.signal
  17. import pandas as pd
  18. import numpy as np
  19. import os
  20. from pathlib import Path
  21. from typing import Dict, List, Any, Optional, Callable, Union, Tuple
  22. from modules.lib.score import Score
  23. from modules.lib.common import get_interpolation, get_frame_with_time
  24. from modules.lib import data_process
  25. from modules.lib.log_manager import LogManager
  26. from modules.lib.chart_generator import generate_comfort_chart_data
# Columns required for comfort evaluation; includes velocity and acceleration
# expressed in the vehicle (body) coordinate frame.
COMFORT_INFO = [
    "simTime",
    "simFrame",
    "speedX",
    "speedY",
    "accelX",
    "accelY",
    "curvHor",
    "lightMask",
    "v",
    "lat_acc",
    "lon_acc",
    "time_diff",
    "lon_acc_diff",
    "lon_acc_roc",
    "speedH",
    "accelH",
    "posH",  # heading angle
    "lon_v_vehicle",  # longitudinal velocity in the vehicle frame
    "lat_v_vehicle",  # lateral velocity in the vehicle frame
    "lon_acc_vehicle",  # longitudinal acceleration in the vehicle frame
    "lat_acc_vehicle"  # lateral acceleration in the vehicle frame
]
  51. # ----------------------
  52. # 独立指标计算函数
  53. # ----------------------
  54. def motionComfortIndex(data_processed) -> dict:
  55. """计算运动舒适度指数"""
  56. comfort = Comfort(data_processed)
  57. index = comfort.calculate_motion_comfort_index()
  58. return {"motionComfortIndex": float(index)}
  59. def rideQualityScore(data_processed) -> dict:
  60. """计算乘坐质量评分"""
  61. comfort = Comfort(data_processed)
  62. score = comfort.calculate_ride_quality_score()
  63. return {"rideQualityScore": float(score)}
  64. def calculate_motionsickness(data_processed) -> dict:
  65. """计算晕车概率指标"""
  66. comfort = ComfortCalculator(data_processed)
  67. motion_sickness_prob = comfort.calculate_motion_sickness_probability()
  68. return {"motionSickness": float(motion_sickness_prob)}
  69. def calculate_vdv(data_processed) -> dict:
  70. """计算振动剂量值(Vibration Dose Value, VDV)指标"""
  71. comfort = ComfortCalculator(data_processed)
  72. vdv_value = comfort.calculate_vdv()
  73. return {"vdv": float(vdv_value)}
  74. def calculate_ava_vav(data_processed) -> dict:
  75. """计算多维度综合加权加速度"""
  76. comfort = ComfortCalculator(data_processed)
  77. ava_vav_value = comfort.calculate_ava_vav()
  78. return {"ava_vav": float(ava_vav_value)}
  79. def calculate_msdv(data_processed) -> dict:
  80. """计算晕动剂量值(MSDV)指标"""
  81. comfort = ComfortCalculator(data_processed)
  82. msdv_value = comfort.calculate_msdv()
  83. return {"msdv": float(msdv_value)}
  84. def calculate_zigzag(data_processed) -> dict:
  85. """计算蛇行指标"""
  86. comfort = ComfortCalculator(data_processed)
  87. zigzag_count = comfort.calculate_zigzag_count()
  88. return {"zigzag": float(zigzag_count)}
  89. def calculate_shake(data_processed) -> dict:
  90. """计算晃动指标"""
  91. comfort = ComfortCalculator(data_processed)
  92. shake_count = comfort.calculate_shake_count()
  93. return {"shake": float(shake_count)}
  94. def calculate_cadence(data_processed) -> dict:
  95. """计算顿挫指标"""
  96. comfort = ComfortCalculator(data_processed)
  97. cadence_count = comfort.calculate_cadence_count()
  98. return {"cadence": float(cadence_count)}
  99. def calculate_slambrake(data_processed) -> dict:
  100. """计算急刹车指标"""
  101. comfort = ComfortCalculator(data_processed)
  102. slam_brake_count = comfort.calculate_slam_brake_count()
  103. return {"slamBrake": float(slam_brake_count)}
  104. def calculate_slamaccelerate(data_processed) -> dict:
  105. """计算急加速指标"""
  106. comfort = ComfortCalculator(data_processed)
  107. slam_accel_count = comfort.calculate_slam_accel_count()
  108. return {"slamAccelerate": float(slam_accel_count)}
# Decorator shared by the peak/valley based metrics (kept unchanged).
def peak_valley_decorator(method):
    """Iterate over consecutive speedH peak/valley pairs, invoking *method* on each.

    The wrapped *method* is called for its side effects with
    ``(p_curr, p_last, flag)`` plus any extra arguments; ``flag`` tells it
    whether any peaks/valleys were found at all.

    NOTE(review): the wrapper returns the undecorated ``method`` object
    rather than a computed result — callers appear to rely only on the
    side effects of ``method``; confirm before changing this.
    """
    def wrapper(self, *args, **kwargs):
        # Boolean mask of rows considered local peaks/valleys of speedH.
        peak_valley = self._peak_valley_determination(self.df)
        pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
        if len(pv_list) != 0:
            flag = True
            p_last = pv_list[0]
            for i in range(1, len(pv_list)):
                p_curr = pv_list[i]
                if self._peak_valley_judgment(p_last, p_curr):
                    # method(self, p_curr, p_last)
                    method(self, p_curr, p_last, flag, *args, **kwargs)
                else:
                    # Pair rejected: advance the reference point.
                    p_last = p_curr
            return method
        else:
            # No peaks/valleys found: invoke once with sentinel points.
            flag = False
            p_curr = [0, 0]
            p_last = [0, 0]
            method(self, p_curr, p_last, flag, *args, **kwargs)
            return method
    return wrapper
class ComfortRegistry:
    """Registry that discovers the configured comfort metrics and runs them."""

    def __init__(self, data_processed):
        self.logger = LogManager().get_logger()  # global logger instance
        self.data = data_processed
        self.comfort_config = data_processed.comfort_config["comfort"]
        self.metrics = self._extract_metrics(self.comfort_config)
        self._registry = self._build_registry()
        self.output_dir = None  # output directory for chart data

    def _extract_metrics(self, config_node: dict) -> list:
        """Depth-first traversal of the config tree, collecting leaf metric names."""
        metrics = []
        def _recurse(node):
            if isinstance(node, dict):
                # A leaf metric node has a 'name' and no nested dict values.
                if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
                    metrics.append(node['name'])
                for v in node.values():
                    _recurse(v)
        _recurse(config_node)
        self.logger.info(f'评比的舒适性指标列表:{metrics}')
        return metrics

    def _build_registry(self) -> dict:
        """Map each metric name to the module-level `calculate_<name.lower()>` function."""
        registry = {}
        for metric_name in self.metrics:
            func_name = f"calculate_{metric_name.lower()}"
            try:
                registry[metric_name] = globals()[func_name]
            except KeyError:
                # Metric configured but no matching function in this module.
                self.logger.error(f"未实现指标函数: {func_name}")
        return registry

    def batch_execute(self) -> dict:
        """Run every registered metric; a failing metric yields None instead of aborting."""
        results = {}
        for name, func in self._registry.items():
            try:
                result = func(self.data)
                results.update(result)
                # Log each metric's individual result.
                self.logger.info(f'舒适性指标[{name}]计算结果: {result}')
            except Exception as e:
                self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
                results[name] = None
        self.logger.info(f'舒适性指标计算结果:{results}')
        return results
  177. class ComfortCalculator:
  178. """舒适性指标计算类 - 提供核心计算功能"""
    def generate_metric_chart(self, metric_name: str) -> None:
        """Generate and persist chart data for the given metric.

        Args:
            metric_name: name of the metric to chart.
        """
        # Default the output directory to <cwd>/data when not configured.
        if not hasattr(self, 'output_dir') or not self.output_dir:
            self.output_dir = os.path.join(os.getcwd(), 'data')
            os.makedirs(self.output_dir, exist_ok=True)
        # Delegate chart creation to chart_generator.
        chart_path = generate_comfort_chart_data(self, metric_name, self.output_dir)
        if chart_path:
            self.logger.info(f"{metric_name}图表已生成: {chart_path}")
  193. def __init__(self, data_processed):
  194. self.data_processed = data_processed
  195. self.logger = LogManager().get_logger()
  196. self.data = data_processed.ego_data
  197. self.ego_df = pd.DataFrame()
  198. self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  199. # 统计指标
  200. self.calculated_value = {
  201. 'zigzag': 0,
  202. 'shake': 0,
  203. 'cadence': 0,
  204. 'slamBrake': 0,
  205. 'slamAccelerate': 0,
  206. 'ava_vav': 0, # 添加新指标的默认值
  207. 'msdv': 0, # 添加MSDV指标的默认值
  208. 'motionSickness': 0, # 添加晕车概率指标的默认值
  209. 'vdt:': 0,
  210. 'motionComfortIndex': 0, # 新增指标
  211. 'rideQualityScore': 0 # 新增指标
  212. }
  213. self.time_list = self.data['simTime'].values.tolist()
  214. self.frame_list = self.data['simFrame'].values.tolist()
  215. self.zigzag_count = 0
  216. self.shake_count = 0
  217. self.cadence_count = 0
  218. self.slam_brake_count = 0
  219. self.slam_accel_count = 0
  220. self.zigzag_time_list = []
  221. self.zigzag_stre_list = []
  222. self.shake_events = [] # 用于存储晃动事件数据
  223. self._initialize_data()
    def _initialize_data(self):
        """Slice the comfort-related columns and derive the helper parameters."""
        self.ego_df = self.data[COMFORT_INFO].copy()
        self.df = self.ego_df.reset_index(drop=True)
        self._prepare_comfort_parameters()
    def _prepare_comfort_parameters(self):
        """Derive thresholds and per-sample event flags used by the comfort metrics."""
        # Body-frame acceleration/velocity should normally be precomputed in
        # data_process.process_ego_data; fall back to computing them here.
        if 'lon_acc_vehicle' in self.ego_df.columns and 'lat_acc_vehicle' in self.ego_df.columns:
            # Use the pre-computed body-frame data directly.
            self.logger.info("使用data_process中预先计算的车身坐标系加速度和速度数据")
        elif 'posH' in self.ego_df.columns:
            # Body frame: X points forward, Y to the vehicle's left, Z up.
            self.logger.warning("未找到预先计算的车身坐标系数据,进行实时计算")
            self.ego_df['heading_rad'] = np.deg2rad(90 - self.ego_df['posH'])  # angle relative to the x-axis
            # Rotate accelerations into the body frame (same formula as data_process.py).
            self.ego_df['lon_acc_vehicle'] = self.ego_df['accelX'] * np.cos(self.ego_df['heading_rad']) + \
                                             self.ego_df['accelY'] * np.sin(self.ego_df['heading_rad'])
            self.ego_df['lat_acc_vehicle'] = -self.ego_df['accelX'] * np.sin(self.ego_df['heading_rad']) + \
                                             self.ego_df['accelY'] * np.cos(self.ego_df['heading_rad'])
            # Rotate velocities into the body frame.
            self.ego_df['lon_v_vehicle'] = self.ego_df['speedX'] * np.cos(self.ego_df['heading_rad']) + \
                                           self.ego_df['speedY'] * np.sin(self.ego_df['heading_rad'])
            self.ego_df['lat_v_vehicle'] = -self.ego_df['speedX'] * np.sin(self.ego_df['heading_rad']) + \
                                           self.ego_df['speedY'] * np.cos(self.ego_df['heading_rad'])
        else:
            self.logger.warning("缺少航向角数据,无法将数据转换为车身坐标系")
        # Accel/decel thresholds interpolated from the body-frame longitudinal
        # speed (falls back to the resultant speed 'v').
        speed_field = 'lon_v_vehicle' if 'lon_v_vehicle' in self.ego_df.columns else 'v'
        self.logger.info(f"加减速阈值计算使用的速度字段: {speed_field}")
        self.ego_df['ip_acc'] = self.ego_df[speed_field].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
        self.ego_df['ip_dec'] = self.ego_df[speed_field].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
        # Slam-brake / slam-accelerate detection uses body-frame longitudinal acceleration.
        acc_field = 'lon_acc_vehicle' if 'lon_acc_vehicle' in self.ego_df.columns else 'lon_acc'
        self.logger.info(f"急刹车和急加速检测使用的加速度字段: {acc_field}")
        # Flag samples whose longitudinal acceleration crosses the interpolated thresholds.
        self.ego_df['slam_brake'] = (self.ego_df[acc_field] - self.ego_df['ip_dec']).apply(
            lambda x: 1 if x < 0 else 0)
        self.ego_df['slam_accel'] = (self.ego_df[acc_field] - self.ego_df['ip_acc']).apply(
            lambda x: 1 if x > 0 else 0)
        # Cadence flags are likewise computed from body-frame longitudinal acceleration.
        self.ego_df['cadence'] = self.ego_df.apply(
            lambda row: self._cadence_process_new(row[acc_field], row['ip_acc'], row['ip_dec']), axis=1)
  269. def _apply_frequency_weighting(self, acceleration_data, weighting_type='Wk', fs=100):
  270. """应用ISO 2631-1:1997标准的频率加权滤波
  271. 参数:
  272. acceleration_data: 加速度时间序列数据
  273. weighting_type: 加权类型,可选值包括:
  274. - 'Wk': 垂直方向(Z轴)加权
  275. - 'Wd': 水平方向(X和Y轴)加权
  276. - 'Wf': 运动病相关加权
  277. fs: 采样频率(Hz)
  278. 返回:
  279. 加权后的加速度数据
  280. """
  281. # 检查数据有效性
  282. if acceleration_data.empty or acceleration_data.isna().all():
  283. return acceleration_data
  284. # 根据ISO 2631-1:1997标准设计滤波器
  285. # 这些参数来自标准文档,用于构建数字滤波器
  286. if weighting_type == 'Wk': # 垂直方向(Z轴)
  287. # Wk滤波器参数
  288. f1 = 0.4
  289. f2 = 100.0
  290. f3 = 12.5
  291. f4 = 12.5
  292. Q1 = 0.63
  293. Q2 = 0.5
  294. Q3 = 0.63
  295. Q4 = 0.63
  296. K = 0.4
  297. elif weighting_type == 'Wd': # 水平方向(X和Y轴)
  298. # Wd滤波器参数
  299. f1 = 0.4
  300. f2 = 100.0
  301. f3 = 2.0
  302. f4 = 2.0
  303. Q1 = 0.63
  304. Q2 = 0.5
  305. Q3 = 0.63
  306. Q4 = 0.63
  307. K = 0.4
  308. elif weighting_type == 'Wf': # 运动病相关
  309. # Wf滤波器参数
  310. f1 = 0.08
  311. f2 = 0.63
  312. f3 = 0.25
  313. f4 = 0.8
  314. Q1 = 0.63
  315. Q2 = 0.86
  316. Q3 = 0.8
  317. Q4 = 0.8
  318. K = 1.0
  319. else:
  320. self.logger.warning(f"未知的加权类型: {weighting_type},使用原始数据")
  321. return acceleration_data
  322. # 将频率转换为角频率
  323. w1 = 2 * np.pi * f1
  324. w2 = 2 * np.pi * f2
  325. w3 = 2 * np.pi * f3
  326. w4 = 2 * np.pi * f4
  327. # 设计高通滤波器(s域)
  328. b1 = [K * w1**2, 0]
  329. a1 = [1, w1/Q1, w1**2]
  330. # 设计低通滤波器(s域)
  331. b2 = [K, 0, 0]
  332. a2 = [1, w2/Q2, w2**2]
  333. # 设计加速度-速度转换滤波器(s域)
  334. b3 = [K, 0]
  335. a3 = [1, w3/Q3, w3**2]
  336. # 设计上升滤波器(s域)
  337. b4 = [K, 0, 0]
  338. a4 = [1, w4/Q4, w4**2]
  339. # 使用双线性变换将s域滤波器转换为z域
  340. b1_z, a1_z = scipy.signal.bilinear(b1, a1, fs)
  341. b2_z, a2_z = scipy.signal.bilinear(b2, a2, fs)
  342. b3_z, a3_z = scipy.signal.bilinear(b3, a3, fs)
  343. b4_z, a4_z = scipy.signal.bilinear(b4, a4, fs)
  344. # 应用滤波器链
  345. data_np = acceleration_data.to_numpy()
  346. filtered_data = scipy.signal.lfilter(b1_z, a1_z, data_np)
  347. filtered_data = scipy.signal.lfilter(b2_z, a2_z, filtered_data)
  348. filtered_data = scipy.signal.lfilter(b3_z, a3_z, filtered_data)
  349. filtered_data = scipy.signal.lfilter(b4_z, a4_z, filtered_data)
  350. return pd.Series(filtered_data, index=acceleration_data.index)
    def calculate_vdv(self):
        """Compute the Vibration Dose Value (VDV).

        VDV emphasises transient shocks/bumps in the vibration signal and is
        commonly used for signals containing many short impulsive events.

        Formula: VDV = (∫[0,T] |a_ω(t)|⁴ dt)^(1/4)

        The fourth-power accumulation (vs. MSDV's second power) makes VDV
        more sensitive to short high-amplitude shocks. Rule of thumb:
        MSDV flags low-frequency sway (motion-sickness risk); VDV flags
        seat/chassis vibration intensity (suspension issues, road impacts).

        Returns:
            float: vibration dose value.
        """
        df = self.ego_df.copy()
        # Required input columns.
        # NOTE(review): these early returns read self.calculated_value['vdv'],
        # but __init__ spells that key 'vdt:' — this path would raise KeyError; verify.
        if 'accelX' not in df.columns or 'accelY' not in df.columns:
            self.logger.warning("缺少计算振动剂量值所需的数据列")
            return self.calculated_value['vdv']
        # Heading is needed to rotate ENU accelerations into the body frame.
        if 'posH' not in df.columns:
            self.logger.warning("缺少航向角数据,无法进行坐标转换")
            return self.calculated_value['vdv']
        # Body frame: X forward, Y to the vehicle's left, Z up.
        df['posH_rad'] = np.radians(df['posH'])
        # Rotate accelerations into the body frame.
        df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
        df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
        # Vertical acceleration defaults to zero when absent.
        df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
        # Per-sample integration step.
        df['time_diff'] = df['simTime'].diff().fillna(0)
        # Estimate the sampling frequency (assumes uniform sampling).
        if len(df) > 1:
            time_diff = df['simTime'].diff().median()
            fs = 1.0 / time_diff if time_diff > 0 else 100  # default 100 Hz
        else:
            fs = 100  # default sampling frequency
        # ISO 2631-1 weighting: Wd for horizontal axes, Wk for vertical
        # (same weighting choice as the MSDV computation).
        a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wd', fs)
        a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wd', fs)
        a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wk', fs)
        # Weighted r.m.s. values (logged for reference only).
        a_x_rms = np.sqrt(np.mean(a_x_weighted**2))
        a_y_rms = np.sqrt(np.mean(a_y_weighted**2))
        a_z_rms = np.sqrt(np.mean(a_z_weighted**2))
        self.logger.info(f"X方向加权均方根值: {a_x_rms}")
        self.logger.info(f"Y方向加权均方根值: {a_y_rms}")
        self.logger.info(f"Z方向加权均方根值: {a_z_rms}")
        # Per-axis VDV: time-integral of the 4th power, then the 4th root.
        vdv_x = np.power(np.sum(np.power(np.abs(a_x_weighted), 4) * df['time_diff']), 0.25)
        vdv_y = np.power(np.sum(np.power(np.abs(a_y_weighted), 4) * df['time_diff']), 0.25)
        vdv_z = np.power(np.sum(np.power(np.abs(a_z_weighted), 4) * df['time_diff']), 0.25)
        # Combine axes; the vertical axis carries a 1.4 weight per ISO 2631.
        vdv = np.sqrt(vdv_x**2 + vdv_y**2 + (1.4 * vdv_z)**2)
        self.calculated_value['vdv'] = vdv
        self.logger.info(f"振动剂量值(VDV)计算结果: {vdv}")
        self.logger.info(f"X方向VDV: {vdv_x}, Y方向VDV: {vdv_y}, Z方向VDV: {vdv_z}")
        return vdv
    def calculate_motion_sickness_probability(self):
        """Estimate the motion-sickness probability from vehicle dynamics.

        Combines tri-axial acceleration and jerk, mapped to a 0-100%
        probability through a non-linear exponential model:

            P = 100 * [1 - exp(-(α*(ax²+ay²+az²) + β*Jerk_RMS) / γ)]

        where:
            - ax, ay, az: body-frame accelerations
            - Jerk_RMS: RMS of the acceleration derivative (abruptness)
            - α: acceleration weight (default 0.1 s⁴/m²)
            - β: jerk weight (default 0.5 s²/m²)
            - γ: normalisation factor (default 10 m²/s⁴)

        Returns:
            float: probability in percent (0-100).

        Grading:   0-10%  comfortable — no action needed
                  10-30%  slight discomfort — acceptable
                  30-50%  moderate discomfort — review driving style
                   >50%   marked discomfort — smooth trajectory/speed/jerk
        """
        df = self.ego_df.copy()
        # Required input columns.
        if 'accelX' not in df.columns or 'accelY' not in df.columns:
            self.logger.warning("缺少计算晕车概率所需的数据列")
            return self.calculated_value.get('motionSickness', 0)
        # Heading is needed to rotate ENU accelerations into the body frame.
        if 'posH' not in df.columns:
            self.logger.warning("缺少航向角数据,无法进行坐标转换")
            return self.calculated_value.get('motionSickness', 0)
        # Body frame: X forward, Y to the vehicle's left, Z up.
        df['posH_rad'] = np.radians(df['posH'])
        # Rotate accelerations into the body frame.
        df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
        df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
        # Vertical acceleration defaults to zero when absent.
        df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
        # Per-sample time step.
        df['time_diff'] = df['simTime'].diff().fillna(0)
        # Estimate the sampling frequency (assumes uniform sampling).
        if len(df) > 1:
            time_diff = df['simTime'].diff().median()
            fs = 1.0 / time_diff if time_diff > 0 else 100  # default 100 Hz
        else:
            fs = 100  # default sampling frequency
        # Wf weighting (motion-sickness related) on every axis.
        a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wf', fs)
        a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wf', fs)
        a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wf', fs)
        # Jerk = time derivative of the weighted acceleration
        # (first differences divided by the time step).
        df['jerk_x'] = a_x_weighted.diff() / df['time_diff']
        df['jerk_y'] = a_y_weighted.diff() / df['time_diff']
        df['jerk_z'] = a_z_weighted.diff() / df['time_diff']
        # First-row NaNs from diff() are zero-filled.
        df[['jerk_x', 'jerk_y', 'jerk_z']] = df[['jerk_x', 'jerk_y', 'jerk_z']].fillna(0)
        # RMS of the jerk magnitude.
        jerk_squared_sum = df['jerk_x']**2 + df['jerk_y']**2 + df['jerk_z']**2
        jerk_rms = np.sqrt(np.mean(jerk_squared_sum))
        # Mean of the squared acceleration magnitude.
        accel_squared_sum = a_x_weighted**2 + a_y_weighted**2 + a_z_weighted**2
        accel_squared_mean = np.mean(accel_squared_sum)
        # Model parameters.
        alpha = 0.1  # acceleration weight (s⁴/m²)
        beta = 0.5  # jerk weight (s²/m²)
        gamma = 10.0  # normalisation factor (m²/s⁴)
        # Non-linear mapping onto a probability.
        acceleration_term = alpha * accel_squared_mean
        jerk_term = beta * jerk_rms
        score = (acceleration_term + jerk_term) / gamma
        probability = 100 * (1 - np.exp(-score))
        # Clamp to the valid percentage range.
        probability = np.clip(probability, 0, 100)
        self.calculated_value['motionSickness'] = probability
        self.logger.info(f"晕车概率(Motion Sickness Probability)计算结果: {probability:.2f}%")
        self.logger.info(f"加速度平方和均值: {accel_squared_mean:.4f} m²/s⁴, Jerk均方根值: {jerk_rms:.4f} m/s³")
        return probability
    def calculate_motion_comfort_index(self):
        """Compute the motion comfort index (0-10, 10 = most comfortable).

        Weighted combination of acceleration, jerk and angular-velocity RMS
        values; each component maps to a 0-10 sub-score via an exponential
        decay beyond its comfort threshold.

        Returns:
            float: comfort index in [0, 10].

        Grading:  8-10 very comfortable (almost no sway)
                   6-8 comfortable (occasional slight sway)
                   4-6 average (noticeable sway)
                   2-4 uncomfortable (clear sway, affects the ride)
                   0-2 extremely uncomfortable (strong sway, tune the controller)
        """
        df = self.ego_df.copy()
        # Required input columns.
        if 'accelX' not in df.columns or 'accelY' not in df.columns:
            self.logger.warning("缺少计算运动舒适度指数所需的数据列")
            return self.calculated_value.get('motionComfortIndex', 5.0)
        # Resultant acceleration magnitude (includes Z when available).
        df['accel_magnitude'] = np.sqrt(df['accelX']**2 + df['accelY']**2)
        if 'accelZ' in df.columns:
            df['accel_magnitude'] = np.sqrt(df['accel_magnitude']**2 + df['accelZ']**2)
        # Jerk magnitude via first differences.
        # NOTE(review): the first-row time step is filled with 0.01 s — confirm the nominal rate.
        df['time_diff'] = df['simTime'].diff().fillna(0.01)
        df['jerk_x'] = df['accelX'].diff() / df['time_diff']
        df['jerk_y'] = df['accelY'].diff() / df['time_diff']
        df['jerk_magnitude'] = np.sqrt(df['jerk_x']**2 + df['jerk_y']**2)
        if 'accelZ' in df.columns:
            df['jerk_z'] = df['accelZ'].diff() / df['time_diff']
            df['jerk_magnitude'] = np.sqrt(df['jerk_magnitude']**2 + df['jerk_z']**2)
        # Angular-velocity magnitude from roll/pitch rates plus the heading rate.
        if 'rollRate' in df.columns and 'pitchRate' in df.columns:
            df['angular_velocity'] = np.sqrt(df['rollRate']**2 + df['pitchRate']**2)
            if 'speedH' in df.columns:  # heading rate used as the yaw rate
                df['angular_velocity'] = np.sqrt(df['angular_velocity']**2 + df['speedH']**2)
        else:
            df['angular_velocity'] = 0
        # RMS of each component.
        accel_rms = np.sqrt(np.mean(df['accel_magnitude']**2))
        jerk_rms = np.sqrt(np.mean(df['jerk_magnitude']**2))
        angular_rms = np.sqrt(np.mean(df['angular_velocity']**2))
        # Comfort thresholds (scores decay above these) and component weights.
        accel_threshold = 2.0  # m/s²
        jerk_threshold = 1.0  # m/s³
        angular_threshold = 0.2  # rad/s
        accel_weight = 0.5
        jerk_weight = 0.3
        angular_weight = 0.2
        # Sub-scores (0-10) with exponential decay beyond each threshold.
        accel_score = 10 * np.exp(-max(0, accel_rms - accel_threshold) / accel_threshold)
        jerk_score = 10 * np.exp(-max(0, jerk_rms - jerk_threshold) / jerk_threshold)
        angular_score = 10 * np.exp(-max(0, angular_rms - angular_threshold) / angular_threshold)
        # Weighted aggregate, clamped to [0, 10].
        comfort_index = (accel_weight * accel_score +
                         jerk_weight * jerk_score +
                         angular_weight * angular_score)
        comfort_index = np.clip(comfort_index, 0, 10)
        self.calculated_value['motionComfortIndex'] = comfort_index
        self.logger.info(f"运动舒适度指数(Motion Comfort Index)计算结果: {comfort_index:.2f}/10")
        self.logger.info(f"加速度RMS: {accel_rms:.4f} m/s², 加加速度RMS: {jerk_rms:.4f} m/s³, 角速度RMS: {angular_rms:.4f} rad/s")
        return comfort_index
    def calculate_ride_quality_score(self):
        """Compute a ride-quality score (0-100) per ISO 2631.

        Based on the frequency-weighted vertical-acceleration RMS, adjusted by
        vibration duration and by the energy share in the human-sensitive
        4-8 Hz band.

        Returns:
            float: ride quality score in [0, 100].
        """
        df = self.ego_df.copy()
        # Vertical acceleration is mandatory for this metric.
        if 'accelZ' not in df.columns:
            self.logger.warning("缺少计算乘坐质量评分所需的垂直加速度数据")
            return self.calculated_value.get('rideQualityScore', 70.0)
        # Estimate the sampling frequency (assumes uniform sampling).
        if len(df) > 1:
            time_diff = df['simTime'].diff().median()
            fs = 1.0 / time_diff if time_diff > 0 else 100  # default 100 Hz
        else:
            fs = 100  # default sampling frequency
        # ISO 2631-1:1997 Wk weighting for the vertical axis.
        a_z_weighted = self._apply_frequency_weighting(df['accelZ'], 'Wk', fs)
        # Weighted vertical-acceleration RMS.
        a_z_rms = np.sqrt(np.mean(a_z_weighted**2))
        # ISO 2631-1:1997 comfort bands:
        #   < 0.315 m/s²   not uncomfortable
        #   0.315-0.63     a little uncomfortable
        #   0.5-1.0        fairly uncomfortable
        #   0.8-1.6        uncomfortable
        #   1.25-2.5       very uncomfortable
        #   > 2.0          extremely uncomfortable
        # Map the RMS value onto a 0-100 base score.
        if a_z_rms < 0.315:
            base_score = 90
        elif a_z_rms < 0.63:
            base_score = 80
        elif a_z_rms < 1.0:
            base_score = 70
        elif a_z_rms < 1.6:
            base_score = 60
        elif a_z_rms < 2.5:
            base_score = 40
        else:
            base_score = 20
        # Duration adjustment: 10 s reference window; longer exposure lowers the factor.
        duration_factor = min(1.0, 10.0 / (df['simTime'].max() - df['simTime'].min()))
        # Frequency adjustment: penalise energy in the human-sensitive 4-8 Hz band
        # using the power spectral density.
        if len(a_z_weighted) > 50:  # need enough samples for spectral analysis
            f, psd = self._calculate_psd(a_z_weighted, fs)
            sensitive_mask = (f >= 4) & (f <= 8)
            sensitive_energy = np.sum(psd[sensitive_mask])
            total_energy = np.sum(psd)
            frequency_factor = 1.0 - 0.3 * (sensitive_energy / total_energy if total_energy > 0 else 0)
        else:
            frequency_factor = 1.0
        # Final score, clamped to [0, 100].
        ride_quality_score = base_score * duration_factor * frequency_factor
        ride_quality_score = np.clip(ride_quality_score, 0, 100)
        self.calculated_value['rideQualityScore'] = ride_quality_score
        self.logger.info(f"乘坐质量评分(Ride Quality Score)计算结果: {ride_quality_score:.2f}/100")
        self.logger.info(f"垂直加速度RMS: {a_z_rms:.4f} m/s²")
        return ride_quality_score
  626. def _calculate_psd(self, signal, fs):
  627. """计算信号的功率谱密度
  628. Args:
  629. signal: 输入信号
  630. fs: 采样频率
  631. Returns:
  632. tuple: 频率和对应的功率谱密度
  633. """
  634. # 使用Welch方法计算PSD
  635. from scipy import signal as sp_signal
  636. f, psd = sp_signal.welch(signal, fs, nperseg=min(256, len(signal)//2))
  637. return f, psd
    def calculate_ava_vav(self):
        """Compute the overall weighted acceleration (ava_vav).

        Per ISO 2631-1:1997, combines the r.m.s. values of the three
        translational accelerations and the three rotational velocities
        using axis-specific weights.

        Returns:
            float: combined weighted acceleration value.
        """
        # Axis weight factors.
        k_x = 1.0  # X acceleration
        k_y = 1.0  # Y acceleration
        k_z = 1.0  # Z acceleration
        k_roll = 0.63  # roll rate
        k_pitch = 0.8  # pitch rate
        k_yaw = 0.5  # yaw rate
        df = self.ego_df.copy()
        # Required input columns.
        if 'accelX' not in df.columns or 'accelY' not in df.columns:
            self.logger.warning("缺少计算多维度综合加权加速度所需的数据列")
            return self.calculated_value['ava_vav']
        # Heading is required to rotate ENU accelerations into the body frame
        # (X forward, Y to the vehicle's left, Z up).
        if 'posH' not in df.columns:
            self.logger.warning("缺少航向角数据,无法进行坐标转换")
            return self.calculated_value['ava_vav']
        df['posH_rad'] = np.radians(df['posH'])
        # posH is the heading: 0° = north, clockwise positive.
        #   body X = east*sin(posH) + north*cos(posH)
        #   body Y = east*cos(posH) - north*sin(posH)
        df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
        df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
        # Vertical acceleration defaults to zero when absent.
        df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
        # Angular rates; zero when unavailable. speedH (heading rate) is used
        # as the yaw rate.
        omega_roll = df['rollRate'] if 'rollRate' in df.columns else pd.Series(np.zeros(len(df)))
        omega_pitch = df['pitchRate'] if 'pitchRate' in df.columns else pd.Series(np.zeros(len(df)))
        omega_yaw = df['speedH']
        # Estimate the sampling frequency (assumes uniform sampling).
        if len(df) > 1:
            time_diff = df['simTime'].diff().median()
            fs = 1.0 / time_diff if time_diff > 0 else 100  # default 100 Hz
        else:
            fs = 100  # default sampling frequency
        # ISO frequency weighting: Wd for horizontal axes, Wk for vertical.
        a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wd', fs)
        a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wd', fs)
        a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wk', fs)
        # ISO 2631 defines no weighting for angular rates; they are used as-is.
        omega_roll_weighted = omega_roll
        omega_pitch_weighted = omega_pitch
        omega_yaw_weighted = omega_yaw
        # r.m.s. of each weighted channel.
        a_x_rms = np.sqrt(np.mean(a_x_weighted**2))
        a_y_rms = np.sqrt(np.mean(a_y_weighted**2))
        a_z_rms = np.sqrt(np.mean(a_z_weighted**2))
        omega_roll_rms = np.sqrt(np.mean(omega_roll_weighted**2))
        omega_pitch_rms = np.sqrt(np.mean(omega_pitch_weighted**2))
        omega_yaw_rms = np.sqrt(np.mean(omega_yaw_weighted**2))
        # Weighted quadratic combination of all six channels.
        ava_vav = np.sqrt(
            k_x * a_x_rms**2 +
            k_y * a_y_rms**2 +
            k_z * a_z_rms**2 +
            k_roll * omega_roll_rms**2 +
            k_pitch * omega_pitch_rms**2 +
            k_yaw * omega_yaw_rms**2
        )
        self.calculated_value['ava_vav'] = ava_vav
        self.logger.info(f"多维度综合加权加速度(ava_vav)计算结果: {ava_vav}")
        return ava_vav
def calculate_msdv(self):
    """Compute the Motion Sickness Dose Value (MSDV).

    MSDV quantifies the motion-sickness risk caused by sustained vibration:
    the cumulative, frequency-weighted squared acceleration over time,
    MSDV = sqrt(integral_0^T a_w(t)^2 dt). Here it is computed per axis as
    rms * sqrt(T) and then combined across axes.

    Returns:
        float: combined MSDV; when required data columns are missing, the
        previously stored value is returned unchanged.
    """
    df = self.ego_df.copy()
    # Bail out (keeping the previous value) when inputs are missing.
    if 'accelX' not in df.columns or 'accelY' not in df.columns:
        self.logger.warning("缺少计算晕动剂量值所需的数据列")
        return self.calculated_value['msdv']
    # Convert ENU (east-north-up) accelerations to the body frame.
    if 'posH' not in df.columns:
        self.logger.warning("缺少航向角数据,无法进行坐标转换")
        return self.calculated_value['msdv']
    # Body frame: X forward, Y to the vehicle's left, Z up.
    df['posH_rad'] = np.radians(df['posH'])
    # posH is the heading angle: north = 0 deg, clockwise positive.
    # body X = east * sin(posH) + north * cos(posH)
    # body Y = east * cos(posH) - north * sin(posH)
    # NOTE(review): assumes accelX is east, accelY north — confirm.
    df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
    df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
    # Vertical acceleration; assumed zero when not recorded.
    df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
    # Per-sample time step and total exposure time T.
    df['time_diff'] = df['simTime'].diff().fillna(0)
    total_time = df['time_diff'].sum()
    # Estimate the sampling frequency from the median time step,
    # assuming (approximately) uniform sampling.
    if len(df) > 1:
        time_diff = df['simTime'].diff().median()
        fs = 1.0 / time_diff if time_diff > 0 else 100  # fall back to 100 Hz
    else:
        fs = 100  # default sampling frequency
    # Frequency weighting for motion-sickness assessment: Wf filter.
    a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wf', fs)
    a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wf', fs)
    a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wf', fs)
    # Time-weighted r.m.s. per axis (each sample weighted by its dt).
    a_x_rms = np.sqrt(np.sum(a_x_weighted**2 * df['time_diff']) / total_time)
    a_y_rms = np.sqrt(np.sum(a_y_weighted**2 * df['time_diff']) / total_time)
    a_z_rms = np.sqrt(np.sum(a_z_weighted**2 * df['time_diff']) / total_time)
    # Log the r.m.s. values for reference.
    self.logger.info(f"X方向加权均方根值: {a_x_rms}")
    self.logger.info(f"Y方向加权均方根值: {a_y_rms}")
    self.logger.info(f"Z方向加权均方根值: {a_z_rms}")
    # Per-axis MSDV from r.m.s. and total exposure time.
    msdv_x = a_x_rms * np.sqrt(total_time)
    msdv_y = a_y_rms * np.sqrt(total_time)
    msdv_z = a_z_rms * np.sqrt(total_time)
    # Combine axes; the vertical (Z) axis carries an extra 1.4 factor,
    # reflecting its higher weighting in ISO 2631.
    msdv = np.sqrt(msdv_x**2 + msdv_y**2 + (1.4 * msdv_z)**2)
    # Store and log the result.
    self.calculated_value['msdv'] = msdv
    self.logger.info(f"晕动剂量值(MSDV)计算结果: {msdv}")
    self.logger.info(f"X方向MSDV: {msdv_x}, Y方向MSDV: {msdv_y}, Z方向MSDV: {msdv_z}")
    return msdv
  777. def _cal_cur_ego_path(self, row):
  778. """计算车辆轨迹曲率"""
  779. try:
  780. divide = (row['speedX'] ** 2 + row['speedY'] ** 2) ** (3 / 2)
  781. if not divide:
  782. res = None
  783. else:
  784. res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide
  785. except:
  786. res = None
  787. return res
  788. def _peak_valley_determination(self, df):
  789. """确定角速度的峰谷"""
  790. peaks, _ = scipy.signal.find_peaks(
  791. df['speedH'], height=2.3, distance=3,
  792. prominence=2.3, width=1)
  793. valleys, _ = scipy.signal.find_peaks(
  794. -df['speedH'], height=2.3, distance=3,
  795. prominence=2.3, width=1)
  796. return sorted(list(peaks) + list(valleys))
  797. def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6):
  798. """判断峰谷是否满足蛇行条件"""
  799. t_diff = p_curr[0] - p_last[0]
  800. v_diff = abs(p_curr[1] - p_last[1])
  801. s = p_curr[1] * p_last[1]
  802. if t_diff < tw and v_diff > avg and s < 0:
  803. if [p_last[0], p_curr[0]] not in self.zigzag_time_list:
  804. self.zigzag_time_list.append([p_last[0], p_curr[0]])
  805. return True
  806. return False
  807. def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
  808. """处理顿挫数据
  809. 使用车辆坐标系下的纵向加速度判断顿挫
  810. Args:
  811. lon_acc: 纵向加速度(车辆坐标系)
  812. ip_acc: 加速阈值
  813. ip_dec: 减速阈值
  814. Returns:
  815. int/float: nan表示不符合顿挫条件,1表示加速顿挫,-1表示减速顿挫,0表示正常
  816. """
  817. if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
  818. return np.nan
  819. elif abs(lon_acc) == 0:
  820. return 0
  821. elif lon_acc > 0 and lon_acc < ip_acc:
  822. return 1
  823. elif lon_acc < 0 and lon_acc > ip_dec:
  824. return -1
  825. else:
  826. return 0
@peak_valley_decorator
def _zigzag_detector(self, p_curr, p_last, flag=True):
    """Record one zigzag (weaving) event between two yaw-rate extrema.

    Invoked through ``peak_valley_decorator``, which supplies successive
    (time, value) extrema pairs and the qualification flag.

    Args:
        p_curr: (time, value) of the current extremum.
        p_last: (time, value) of the previous extremum.
        flag: True when the extrema pair qualifies as a zigzag swing.
    """
    if flag:
        # Event boundaries: from the previous extremum to the current one.
        start_time = p_last[0]
        end_time = p_curr[0]
        start_frame = get_frame_with_time(self.time_list, self.frame_list, start_time)
        end_frame = get_frame_with_time(self.time_list, self.frame_list, end_time)
        # Event duration; very short swings are ignored.
        duration = end_time - start_time
        min_duration = 0.5  # seconds
        if duration >= min_duration:
            # Count the event.
            self.zigzag_count += 1
            # Append the event to the discomfort-event table.
            new_row = pd.DataFrame([{
                'start_time': start_time,
                'end_time': end_time,
                'start_frame': start_frame,
                'end_frame': end_frame,
                'type': 'zigzag'
            }])
            self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
            # NOTE(review): a dict is appended here, while
            # _peak_valley_judgment appends [start, end] pairs and
            # _get_zigzag_times unpacks two-element entries — confirm which
            # format downstream consumers of zigzag_time_list expect.
            self.zigzag_time_list.append({
                'start_time': start_time,
                'end_time': end_time,
                'start_frame': start_frame,
                'end_frame': end_frame,
                'duration': duration
            })
    else:
        # Non-qualifying pair: count stays unchanged (explicit no-op).
        self.zigzag_count += 0
  862. @peak_valley_decorator
  863. def _cal_zigzag_strength(self, p_curr, p_last, flag=True):
  864. """计算蛇行强度"""
  865. if flag:
  866. v_diff = abs(p_curr[1] - p_last[1])
  867. t_diff = p_curr[0] - p_last[0]
  868. if t_diff > 0:
  869. self.zigzag_stre_list.append(v_diff / t_diff) # 平均角加速度
  870. else:
  871. self.zigzag_stre_list = []
  872. def _get_zigzag_times(self):
  873. """获取所有蛇行时间点"""
  874. all_times = []
  875. for time_range in self.zigzag_time_list:
  876. start, end = time_range
  877. # 获取这个时间范围内的所有时间点
  878. times_in_range = self.ego_df[(self.ego_df['simTime'] >= start) &
  879. (self.ego_df['simTime'] <= end)]['simTime'].tolist()
  880. all_times.extend(times_in_range)
  881. return all_times
  882. def calculate_zigzag_count(self):
  883. """计算蛇行指标"""
  884. self._zigzag_detector()
  885. # 生成蛇行指标图表
  886. self.generate_metric_chart('zigzag')
  887. return self.zigzag_count
  888. def calculate_shake_count(self):
  889. """计算晃动指标"""
  890. self._shake_detector()
  891. # 生成晃动指标图表
  892. self.generate_metric_chart('shake')
  893. return self.shake_count
  894. def calculate_cadence_count(self):
  895. """计算顿挫指标"""
  896. # 调用顿挫检测器
  897. cadence_time_ranges = self._cadence_detector()
  898. # 记录检测结果
  899. self.calculated_value['cadence'] = self.cadence_count
  900. # 生成顿挫指标图表
  901. self.generate_metric_chart('cadence')
  902. return self.cadence_count
  903. def calculate_slam_brake_count(self):
  904. """计算急刹车指标"""
  905. self._slam_brake_detector()
  906. # 生成急刹车指标图表
  907. self.generate_metric_chart('slamBrake')
  908. return self.slam_brake_count
  909. def calculate_slam_accel_count(self):
  910. """计算急加速指标"""
  911. self._slam_accel_detector()
  912. # 生成急加速指标图表
  913. self.generate_metric_chart('slamAccelerate')
  914. return self.slam_accel_count
  915. def calculate_vdv(self):
  916. """计算振动剂量值(Vibration Dose Value, VDV)指标"""
  917. vdv_value = super().calculate_vdv()
  918. # 生成VDV指标图表
  919. self.generate_metric_chart('vdv')
  920. return vdv_value
  921. def calculate_ava_vav(self):
  922. """计算多维度综合加权加速度"""
  923. ava_vav_value = super().calculate_ava_vav()
  924. # 生成AVA_VAV指标图表
  925. self.generate_metric_chart('ava_vav')
  926. return ava_vav_value
  927. def calculate_msdv(self):
  928. """计算晕动剂量值(Motion Sickness Dose Value, MSDV)"""
  929. msdv_value = super().calculate_msdv()
  930. # 生成MSDV指标图表
  931. self.generate_metric_chart('msdv')
  932. return msdv_value
  933. def _shake_detector(self, T_diff=0.5):
  934. """检测晃动事件 - 改进版本(使用向量化操作)
  935. 该函数通过以下步骤检测车辆晃动事件:
  936. 1. 计算横向加速度变化率和横摆角速度变化率
  937. 2. 分析横摆角速度的短期变化特性
  938. 3. 设置基于车速的动态阈值
  939. 4. 综合多个条件判断晃动事件
  940. 5. 对连续帧进行分组处理
  941. 6. 保存晃动事件数据到CSV文件
  942. 7. 将保存的CSV数据可视化并为晃动事件发生的时间段添加背景颜色
  943. """
  944. # 获取数据
  945. df = self.ego_df.copy()
  946. # 检查是否有必要的列
  947. if 'lat_acc' not in df.columns or 'posH' not in df.columns:
  948. self.logger.warning("缺少计算晃动指标所需的数据列")
  949. return
  950. # 将东北天坐标系下的数据转换为车身坐标系
  951. # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶
  952. df['posH_rad'] = np.radians(df['posH'])
  953. # 转换横向加速度到车身坐标系
  954. df['lat_acc_body'] = df['lat_acc'] * np.cos(df['posH_rad']) - df['lon_acc'] * np.sin(df['posH_rad'])
  955. # 转换横摆角速度到车身坐标系
  956. # speedH已经是车身坐标系下的横摆角速度,不需要转换
  957. df['speedH_body'] = df['speedH']
  958. # 1. 计算横向加速度变化率(使用车身坐标系下的横向加速度)
  959. df['lat_acc_rate'] = df['lat_acc_body'].diff() / df['simTime'].diff()
  960. # 2. 计算横摆角速度变化率(使用车身坐标系下的横摆角速度)
  961. df['speedH_rate'] = df['speedH_body'].diff() / df['simTime'].diff()
  962. # 3. 计算横摆角速度的短期变化特性
  963. window_size = 10 # 10帧窗口
  964. df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
  965. # 4. 基于车速的动态阈值
  966. v0 = 20 * 5/18 # ≈5.56 m/s
  967. k = 0.008 * 3.6 # =0.0288 per m/s
  968. df['lat_acc_threshold'] = df['v'].apply(
  969. lambda speed: max(
  970. 1.0, # 下限 1.0 m/s²
  971. min(
  972. 1.8, # 上限 1.8 m/s²
  973. 1.8 - k * (speed - v0) # 线性递减
  974. )
  975. )
  976. )
  977. df['speedH_threshold'] = df['v'].apply(
  978. lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
  979. )
  980. # 5. 综合判断晃动条件
  981. # 条件A: 横向加速度超过阈值
  982. condition_A = df['lat_acc'].abs() > df['lat_acc_threshold']
  983. # 条件B: 横向加速度变化率超过阈值
  984. lat_acc_rate_threshold = 0.5 # 横向加速度变化率阈值 (m/s³)
  985. condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold
  986. # 条件C: 横摆角速度有明显变化但不呈现周期性
  987. condition_C = (df['speedH_std'] > df['speedH_threshold'])
  988. # 综合条件: 满足条件A,且满足条件B或条件C
  989. shake_condition = condition_A & (condition_B | condition_C)
  990. # 6. 使用向量化操作检测连续事件
  991. event_groups = (shake_condition != shake_condition.shift()).cumsum()
  992. shake_events = []
  993. for _, group in df[shake_condition].groupby(event_groups):
  994. if len(group) >= 2: # 至少2帧才算一次晃动
  995. start_time = group['simTime'].iloc[0]
  996. end_time = group['simTime'].iloc[-1]
  997. duration = end_time - start_time
  998. if duration >= T_diff: # 只记录持续时间超过阈值的事件
  999. shake_events.append({
  1000. 'start_time': start_time,
  1001. 'end_time': end_time,
  1002. 'start_frame': group['simFrame'].iloc[0],
  1003. 'end_frame': group['simFrame'].iloc[-1],
  1004. 'duration': duration,
  1005. 'max_lat_acc': group['lat_acc'].abs().max()
  1006. })
  1007. # 添加到不舒适事件表
  1008. new_row = pd.DataFrame([{
  1009. 'start_time': start_time,
  1010. 'end_time': end_time,
  1011. 'start_frame': group['simFrame'].iloc[0],
  1012. 'end_frame': group['simFrame'].iloc[-1],
  1013. 'type': 'shake'
  1014. }])
  1015. self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
  1016. # 更新晃动计数
  1017. self.shake_count = len(shake_events)
  1018. self.logger.info(f"检测到 {self.shake_count} 次晃动事件")
  1019. # 更新ego_df中的相关列
  1020. self.ego_df = df.copy()
  1021. # 保存晃动事件数据到CSV文件,用于后续图表生成
  1022. if shake_events:
  1023. try:
  1024. # 保存用于绘图的数据
  1025. self.shake_events = shake_events
  1026. self.logger.info(f"检测到 {len(shake_events)} 个晃动事件,已保存用于图表生成")
  1027. except Exception as e:
  1028. self.logger.error(f"保存晃动事件数据失败: {str(e)}", exc_info=True)
  1029. return shake_events
  1030. def _cadence_detector(self):
  1031. """
  1032. 顿挫检测:短时间内出现多次加速度突变(cadence),例如频繁加减速。
  1033. - 特征点定义为 cadence 变化值非 0 的时刻;
  1034. - 聚类这些特征点,1s 内聚为一组,去除小于 1 个点的组;
  1035. - 统计强度与频率,记录不适事件。
  1036. """
  1037. # 检查必要字段是否存在
  1038. required_fields = ['simTime', 'simFrame', 'cadence']
  1039. acc_field = 'lon_acc_vehicle' if 'lon_acc_vehicle' in self.ego_df.columns else 'lon_acc'
  1040. if not all(field in self.ego_df.columns for field in required_fields + [acc_field]):
  1041. missing_fields = [field for field in required_fields + [acc_field] if field not in self.ego_df.columns]
  1042. self.logger.warning(f"顿挫检测缺少必要字段: {missing_fields},无法进行检测")
  1043. self.cadence_count = 0
  1044. return []
  1045. # 提取必要字段
  1046. df = self.ego_df[['simTime', 'simFrame', acc_field, 'cadence']].copy()
  1047. # 重命名列以保持代码一致性
  1048. df.rename(columns={acc_field: 'acc_used'}, inplace=True)
  1049. # 滤除无效cadence值
  1050. df = df[df['cadence'].notna()].copy()
  1051. df['cadence_diff'] = df['cadence'].diff()
  1052. df.dropna(subset=['cadence_diff'], inplace=True)
  1053. df = df[df['cadence_diff'] != 0]
  1054. if df.empty:
  1055. self.logger.info("未检测到明显cadence变化,未触发顿挫事件")
  1056. self.cadence_count = 0
  1057. return []
  1058. # 提取突变点信息
  1059. time_list = df['simTime'].tolist()
  1060. frame_list = df['simFrame'].tolist()
  1061. # 聚类突变点:按时间差小于 TIME_RANGE 分组
  1062. TIME_RANGE = 1.0 # 秒
  1063. grouped_times, grouped_frames = [], []
  1064. temp_times, temp_frames = [], []
  1065. for i in range(len(time_list)):
  1066. if not temp_times or (time_list[i] - temp_times[-1] <= TIME_RANGE):
  1067. temp_times.append(time_list[i])
  1068. temp_frames.append(frame_list[i])
  1069. else:
  1070. if len(temp_times) >= 1:
  1071. grouped_times.append(temp_times)
  1072. grouped_frames.append(temp_frames)
  1073. temp_times, temp_frames = [time_list[i]], [frame_list[i]]
  1074. if len(temp_times) >= 1:
  1075. grouped_times.append(temp_times)
  1076. grouped_frames.append(temp_frames)
  1077. # 只保留有效顿挫组
  1078. cadence_time_ranges = [[g[0], g[-1]] for g in grouped_times]
  1079. cadence_frame_ranges = [[g[0], g[-1]] for g in grouped_frames]
  1080. # 输出结果到 discomfort_df
  1081. if cadence_time_ranges:
  1082. discomfort_df = pd.DataFrame({
  1083. 'start_time': [g[0] for g in cadence_time_ranges],
  1084. 'end_time': [g[1] for g in cadence_time_ranges],
  1085. 'start_frame': [g[0] for g in cadence_frame_ranges],
  1086. 'end_frame': [g[1] for g in cadence_frame_ranges],
  1087. 'type': 'cadence'
  1088. })
  1089. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  1090. # 计算强度与频率(用于量化)
  1091. stre_list, freq_list = [], []
  1092. for group_times in grouped_times:
  1093. g_df = df[df['simTime'].isin(group_times)]
  1094. strength = g_df['acc_used'].abs().mean()
  1095. stre_list.append(strength)
  1096. if len(group_times) > 1:
  1097. t_delta = group_times[-1] - group_times[0]
  1098. freq = len(group_times) / t_delta if t_delta > 0 else 0
  1099. freq_list.append(freq)
  1100. # 存储检测统计
  1101. self.cadence_count = len(cadence_time_ranges)
  1102. cadence_strength = np.mean(stre_list) if stre_list else 0
  1103. self.logger.info(f"检测到 {self.cadence_count} 次顿挫事件,平均强度:{cadence_strength:.2f}")
  1104. # 记录使用的加速度字段
  1105. self.logger.info(f"顿挫检测使用的加速度字段: {acc_field}")
  1106. return cadence_time_ranges
  1107. def _slam_brake_detector(self):
  1108. """检测急刹车事件"""
  1109. # 获取数据
  1110. df = self.ego_df.copy()
  1111. # 检查是否有必要的列
  1112. if 'slam_brake' not in df.columns:
  1113. self.logger.warning("缺少计算急刹车指标所需的数据列")
  1114. return
  1115. # 设置急刹车检测参数
  1116. min_duration = 0.5 # 最小持续时间 秒
  1117. # 检测连续的急刹车事件
  1118. slam_brake_events = []
  1119. in_event = False
  1120. start_idx = 0
  1121. for i, row in df.iterrows():
  1122. if row['slam_brake'] == 1 and not in_event:
  1123. # 开始新的急刹车事件
  1124. in_event = True
  1125. start_idx = i
  1126. elif row['slam_brake'] == 0 and in_event:
  1127. # 结束当前急刹车事件
  1128. in_event = False
  1129. end_idx = i - 1
  1130. # 计算事件持续时间
  1131. start_time = df.loc[start_idx, 'simTime']
  1132. end_time = df.loc[end_idx, 'simTime']
  1133. duration = end_time - start_time
  1134. # 如果持续时间超过阈值,记录为有效急刹车事件
  1135. if duration >= min_duration:
  1136. # 确定使用的加速度字段
  1137. acc_field = 'lon_acc_vehicle' if 'lon_acc_vehicle' in df.columns else 'lon_acc'
  1138. slam_brake_events.append({
  1139. 'start_time': start_time,
  1140. 'end_time': end_time,
  1141. 'start_frame': df.loc[start_idx, 'simFrame'],
  1142. 'end_frame': df.loc[end_idx, 'simFrame'],
  1143. 'duration': duration,
  1144. 'min_lon_acc': df.loc[start_idx:end_idx, acc_field].min()
  1145. })
  1146. # 添加到不舒适事件表
  1147. new_row = pd.DataFrame([{
  1148. 'start_time': start_time,
  1149. 'end_time': end_time,
  1150. 'start_frame': df.loc[start_idx, 'simFrame'],
  1151. 'end_frame': df.loc[end_idx, 'simFrame'],
  1152. 'type': 'slam_brake'
  1153. }])
  1154. self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
  1155. # 如果最后一个事件没有结束,检查它
  1156. if in_event:
  1157. end_idx = len(df) - 1
  1158. start_time = df.loc[start_idx, 'simTime']
  1159. end_time = df.loc[end_idx, 'simTime']
  1160. duration = end_time - start_time
  1161. if duration >= min_duration:
  1162. # 确定使用的加速度字段
  1163. acc_field = 'lon_acc_vehicle' if 'lon_acc_vehicle' in df.columns else 'lon_acc'
  1164. slam_brake_events.append({
  1165. 'start_time': start_time,
  1166. 'end_time': end_time,
  1167. 'start_frame': df.loc[start_idx, 'simFrame'],
  1168. 'end_frame': df.loc[end_idx, 'simFrame'],
  1169. 'duration': duration,
  1170. 'min_lon_acc': df.loc[start_idx:end_idx, acc_field].min()
  1171. })
  1172. # 添加到不舒适事件表
  1173. new_row = pd.DataFrame([{
  1174. 'start_time': start_time,
  1175. 'end_time': end_time,
  1176. 'start_frame': df.loc[start_idx, 'simFrame'],
  1177. 'end_frame': df.loc[end_idx, 'simFrame'],
  1178. 'type': 'slam_brake'
  1179. }])
  1180. self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
  1181. # 更新急刹车计数
  1182. self.slam_brake_count = len(slam_brake_events)
  1183. self.logger.info(f"检测到 {self.slam_brake_count} 次急刹车事件")
  1184. def _slam_accel_detector(self):
  1185. """检测急加速事件"""
  1186. # 获取数据
  1187. df = self.ego_df.copy()
  1188. # 检查是否有必要的列
  1189. if 'slam_accel' not in df.columns:
  1190. self.logger.warning("缺少计算急加速指标所需的数据列")
  1191. return
  1192. # 设置急加速检测参数
  1193. min_duration = 0.5 # 最小持续时间 秒
  1194. # 检测连续的急加速事件
  1195. slam_accel_events = []
  1196. in_event = False
  1197. start_idx = 0
  1198. for i, row in df.iterrows():
  1199. if row['slam_accel'] == 1 and not in_event:
  1200. # 开始新的急加速事件
  1201. in_event = True
  1202. start_idx = i
  1203. elif row['slam_accel'] == 0 and in_event:
  1204. # 结束当前急加速事件
  1205. in_event = False
  1206. end_idx = i - 1
  1207. # 计算事件持续时间
  1208. start_time = df.loc[start_idx, 'simTime']
  1209. end_time = df.loc[end_idx, 'simTime']
  1210. duration = end_time - start_time
  1211. # 如果持续时间超过阈值,记录为有效急加速事件
  1212. if duration >= min_duration:
  1213. # 确定使用的加速度字段
  1214. acc_field = 'lon_acc_vehicle' if 'lon_acc_vehicle' in df.columns else 'lon_acc'
  1215. slam_accel_events.append({
  1216. 'start_time': start_time,
  1217. 'end_time': end_time,
  1218. 'start_frame': df.loc[start_idx, 'simFrame'],
  1219. 'end_frame': df.loc[end_idx, 'simFrame'],
  1220. 'duration': duration,
  1221. 'max_lon_acc': df.loc[start_idx:end_idx, acc_field].max()
  1222. })
  1223. # 添加到不舒适事件表
  1224. new_row = pd.DataFrame([{
  1225. 'start_time': start_time,
  1226. 'end_time': end_time,
  1227. 'start_frame': df.loc[start_idx, 'simFrame'],
  1228. 'end_frame': df.loc[end_idx, 'simFrame'],
  1229. 'type': 'slam_accel'
  1230. }])
  1231. self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
  1232. # 如果最后一个事件没有结束,检查它
  1233. if in_event:
  1234. end_idx = len(df) - 1
  1235. start_time = df.loc[start_idx, 'simTime']
  1236. end_time = df.loc[end_idx, 'simTime']
  1237. duration = end_time - start_time
  1238. if duration >= min_duration:
  1239. # 确定使用的加速度字段
  1240. acc_field = 'lon_acc_vehicle' if 'lon_acc_vehicle' in df.columns else 'lon_acc'
  1241. slam_accel_events.append({
  1242. 'start_time': start_time,
  1243. 'end_time': end_time,
  1244. 'start_frame': df.loc[start_idx, 'simFrame'],
  1245. 'end_frame': df.loc[end_idx, 'simFrame'],
  1246. 'duration': duration,
  1247. 'max_lon_acc': df.loc[start_idx:end_idx, acc_field].max()
  1248. })
  1249. # 添加到不舒适事件表
  1250. new_row = pd.DataFrame([{
  1251. 'start_time': start_time,
  1252. 'end_time': end_time,
  1253. 'start_frame': df.loc[start_idx, 'simFrame'],
  1254. 'end_frame': df.loc[end_idx, 'simFrame'],
  1255. 'type': 'slam_accel'
  1256. }])
  1257. self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
  1258. # 更新急加速计数
  1259. self.slam_accel_count = len(slam_accel_events)
  1260. self.logger.info(f"检测到 {self.slam_accel_count} 次急加速事件")
  1261. class ComfortManager:
  1262. """舒适性指标计算主类"""
  1263. def __init__(self, data_processed):
  1264. self.data = data_processed
  1265. self.logger = LogManager().get_logger()
  1266. self.registry = ComfortRegistry(self.data)
  1267. def report_statistic(self):
  1268. """生成舒适性评分报告"""
  1269. comfort_result = self.registry.batch_execute()
  1270. return comfort_result