safety.py 49 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 安全指标计算模块
  5. """
  6. import os
  7. import numpy as np
  8. import pandas as pd
  9. import math
  10. import matplotlib.pyplot as plt
  11. import scipy.integrate as spi
  12. from collections import defaultdict
  13. from typing import Dict, Any, List, Optional
  14. from pathlib import Path
  15. from modules.lib.score import Score
  16. from modules.lib.log_manager import LogManager
  17. from modules.lib.chart_generator import generate_safety_chart_data
  18. # 安全指标相关常量
  19. SAFETY_INFO = [
  20. "simTime",
  21. "simFrame",
  22. "playerId",
  23. "posX",
  24. "posY",
  25. "posH",
  26. "speedX",
  27. "speedY",
  28. "accelX",
  29. "accelY",
  30. "v",
  31. "type"
  32. ]
  33. # ----------------------
  34. # 独立指标计算函数
  35. # ----------------------
  36. def calculate_ttc(data_processed) -> dict:
  37. """计算TTC (Time To Collision)"""
  38. if data_processed is None or not hasattr(data_processed, 'object_df'):
  39. return {"TTC": None}
  40. try:
  41. safety = SafetyCalculator(data_processed)
  42. ttc_value = safety.get_ttc_value()
  43. # 只生成图表,数据导出由chart_generator处理
  44. if safety.ttc_data:
  45. safety.generate_metric_chart('TTC')
  46. LogManager().get_logger().info(f"安全指标[TTC]计算结果: {ttc_value}")
  47. return {"TTC": ttc_value}
  48. except Exception as e:
  49. LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
  50. return {"TTC": None}
  51. def calculate_mttc(data_processed) -> dict:
  52. """计算MTTC (Modified Time To Collision)"""
  53. if data_processed is None or not hasattr(data_processed, 'object_df'):
  54. return {"MTTC": None}
  55. try:
  56. safety = SafetyCalculator(data_processed)
  57. mttc_value = safety.get_mttc_value()
  58. if safety.mttc_data:
  59. safety.generate_metric_chart('MTTC')
  60. LogManager().get_logger().info(f"安全指标[MTTC]计算结果: {mttc_value}")
  61. return {"MTTC": mttc_value}
  62. except Exception as e:
  63. LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
  64. return {"MTTC": None}
  65. def calculate_thw(data_processed) -> dict:
  66. """计算THW (Time Headway)"""
  67. if data_processed is None or not hasattr(data_processed, 'object_df'):
  68. return {"THW": None}
  69. try:
  70. safety = SafetyCalculator(data_processed)
  71. thw_value = safety.get_thw_value()
  72. if safety.thw_data:
  73. safety.generate_metric_chart('THW')
  74. LogManager().get_logger().info(f"安全指标[THW]计算结果: {thw_value}")
  75. return {"THW": thw_value}
  76. except Exception as e:
  77. LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
  78. return {"THW": None}
  79. def calculate_tlc(data_processed) -> dict:
  80. """计算TLC (Time to Line Crossing)"""
  81. if data_processed is None or not hasattr(data_processed, 'object_df'):
  82. return {"TLC": None}
  83. try:
  84. safety = SafetyCalculator(data_processed)
  85. tlc_value = safety.get_tlc_value()
  86. if safety.tlc_data:
  87. safety.generate_metric_chart('TLC')
  88. LogManager().get_logger().info(f"安全指标[TLC]计算结果: {tlc_value}")
  89. return {"TLC": tlc_value}
  90. except Exception as e:
  91. LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True)
  92. return {"TLC": None}
  93. def calculate_ttb(data_processed) -> dict:
  94. """计算TTB (Time to Brake)"""
  95. if data_processed is None or not hasattr(data_processed, 'object_df'):
  96. return {"TTB": None}
  97. try:
  98. safety = SafetyCalculator(data_processed)
  99. ttb_value = safety.get_ttb_value()
  100. if safety.ttb_data:
  101. safety.generate_metric_chart('TTB')
  102. LogManager().get_logger().info(f"安全指标[TTB]计算结果: {ttb_value}")
  103. return {"TTB": ttb_value}
  104. except Exception as e:
  105. LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True)
  106. return {"TTB": None}
  107. def calculate_tm(data_processed) -> dict:
  108. """计算TM (Time Margin)"""
  109. if data_processed is None or not hasattr(data_processed, 'object_df'):
  110. return {"TM": None}
  111. try:
  112. safety = SafetyCalculator(data_processed)
  113. tm_value = safety.get_tm_value()
  114. if safety.tm_data:
  115. safety.generate_metric_chart('TM')
  116. LogManager().get_logger().info(f"安全指标[TM]计算结果: {tm_value}")
  117. return {"TM": tm_value}
  118. except Exception as e:
  119. LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True)
  120. return {"TM": None}
  121. def calculate_dtc(data_processed) -> dict:
  122. """计算DTC (Distance to Collision)"""
  123. if data_processed is None or not hasattr(data_processed, 'object_df'):
  124. return {"DTC": None}
  125. try:
  126. safety = SafetyCalculator(data_processed)
  127. dtc_value = safety.get_dtc_value()
  128. LogManager().get_logger().info(f"安全指标[DTC]计算结果: {dtc_value}")
  129. return {"DTC": dtc_value}
  130. except Exception as e:
  131. LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
  132. return {"DTC": None}
  133. def calculate_pet(data_processed) -> dict:
  134. """计算PET (Post Encroachment Time)"""
  135. if data_processed is None or not hasattr(data_processed, 'object_df'):
  136. return {"PET": None}
  137. try:
  138. safety = SafetyCalculator(data_processed)
  139. pet_value = safety.get_dtc_value()
  140. LogManager().get_logger().info(f"安全指标[PET]计算结果: {pet_value}")
  141. return {"PET": pet_value}
  142. except Exception as e:
  143. LogManager().get_logger().error(f"PET计算异常: {str(e)}", exc_info=True)
  144. return {"PET": None}
  145. def calculate_psd(data_processed) -> dict:
  146. """计算PSD (Potential Safety Distance)"""
  147. if data_processed is None or not hasattr(data_processed, 'object_df'):
  148. return {"PSD": None}
  149. try:
  150. safety = SafetyCalculator(data_processed)
  151. psd_value = safety.get_psd_value()
  152. LogManager().get_logger().info(f"安全指标[PSD]计算结果: {psd_value}")
  153. return {"PSD": psd_value}
  154. except Exception as e:
  155. LogManager().get_logger().error(f"PSD计算异常: {str(e)}", exc_info=True)
  156. return {"PSD": None}
  157. def calculate_collisionrisk(data_processed) -> dict:
  158. """计算碰撞风险"""
  159. if data_processed is None or not hasattr(data_processed, 'object_df'):
  160. return {"collisionRisk": None}
  161. try:
  162. safety = SafetyCalculator(data_processed)
  163. collision_risk_value = safety.get_collision_risk_value()
  164. if safety.collision_risk_data:
  165. safety.generate_metric_chart('collisionRisk')
  166. LogManager().get_logger().info(f"安全指标[collisionRisk]计算结果: {collision_risk_value}")
  167. return {"collisionRisk": collision_risk_value}
  168. except Exception as e:
  169. LogManager().get_logger().error(f"collisionRisk计算异常: {str(e)}", exc_info=True)
  170. return {"collisionRisk": None}
  171. def calculate_lonsd(data_processed) -> dict:
  172. """计算纵向安全距离"""
  173. safety = SafetyCalculator(data_processed)
  174. lonsd_value = safety.get_lonsd_value()
  175. if safety.lonsd_data:
  176. safety.generate_metric_chart('LonSD')
  177. LogManager().get_logger().info(f"安全指标[LonSD]计算结果: {lonsd_value}")
  178. return {"LonSD": lonsd_value}
  179. def calculate_latsd(data_processed) -> dict:
  180. """计算横向安全距离"""
  181. if data_processed is None or not hasattr(data_processed, 'object_df'):
  182. return {"LatSD": None}
  183. try:
  184. safety = SafetyCalculator(data_processed)
  185. latsd_value = safety.get_latsd_value()
  186. if safety.latsd_data:
  187. # 只生成图表,数据导出由chart_generator处理
  188. safety.generate_metric_chart('LatSD')
  189. LogManager().get_logger().info(f"安全指标[LatSD]计算结果: {latsd_value}")
  190. return {"LatSD": latsd_value}
  191. except Exception as e:
  192. LogManager().get_logger().error(f"LatSD计算异常: {str(e)}", exc_info=True)
  193. return {"LatSD": None}
  194. def calculate_btn(data_processed) -> dict:
  195. """计算制动威胁数"""
  196. if data_processed is None or not hasattr(data_processed, 'object_df'):
  197. return {"BTN": None}
  198. try:
  199. safety = SafetyCalculator(data_processed)
  200. btn_value = safety.get_btn_value()
  201. if safety.btn_data:
  202. # 只生成图表,数据导出由chart_generator处理
  203. safety.generate_metric_chart('BTN')
  204. LogManager().get_logger().info(f"安全指标[BTN]计算结果: {btn_value}")
  205. return {"BTN": btn_value}
  206. except Exception as e:
  207. LogManager().get_logger().error(f"BTN计算异常: {str(e)}", exc_info=True)
  208. return {"BTN": None}
  209. def calculate_collisionseverity(data_processed) -> dict:
  210. """计算碰撞严重性"""
  211. if data_processed is None or not hasattr(data_processed, 'object_df'):
  212. return {"collisionSeverity": None}
  213. try:
  214. safety = SafetyCalculator(data_processed)
  215. collision_severity_value = safety.get_collision_severity_value()
  216. if safety.collision_severity_data:
  217. # 只生成图表,数据导出由chart_generator处理
  218. safety.generate_metric_chart('collisionSeverity')
  219. LogManager().get_logger().info(f"安全指标[collisionSeverity]计算结果: {collision_severity_value}")
  220. return {"collisionSeverity": collision_severity_value}
  221. except Exception as e:
  222. LogManager().get_logger().error(f"collisionSeverity计算异常: {str(e)}", exc_info=True)
  223. return {"collisionSeverity": None}
  224. class SafetyRegistry:
  225. """安全指标注册器"""
  226. def __init__(self, data_processed):
  227. self.logger = LogManager().get_logger()
  228. self.data = data_processed
  229. self.safety_config = data_processed.safety_config["safety"]
  230. self.metrics = self._extract_metrics(self.safety_config)
  231. self._registry = self._build_registry()
  232. def _extract_metrics(self, config_node: dict) -> list:
  233. """从配置中提取指标名称"""
  234. metrics = []
  235. def _recurse(node):
  236. if isinstance(node, dict):
  237. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  238. metrics.append(node['name'])
  239. for v in node.values():
  240. _recurse(v)
  241. _recurse(config_node)
  242. self.logger.info(f'评比的安全指标列表:{metrics}')
  243. return metrics
  244. def _build_registry(self) -> dict:
  245. """构建指标函数注册表"""
  246. registry = {}
  247. for metric_name in self.metrics:
  248. func_name = f"calculate_{metric_name.lower()}"
  249. if func_name in globals():
  250. registry[metric_name] = globals()[func_name]
  251. else:
  252. self.logger.warning(f"未实现安全指标函数: {func_name}")
  253. return registry
  254. def batch_execute(self) -> dict:
  255. """批量执行指标计算"""
  256. results = {}
  257. for name, func in self._registry.items():
  258. try:
  259. result = func(self.data)
  260. results.update(result)
  261. except Exception as e:
  262. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  263. results[name] = None
  264. self.logger.info(f'安全指标计算结果:{results}')
  265. return results
  266. class SafeManager:
  267. """安全指标管理类"""
  268. def __init__(self, data_processed):
  269. self.data = data_processed
  270. self.registry = SafetyRegistry(self.data)
  271. def report_statistic(self):
  272. """计算并报告安全指标结果"""
  273. safety_result = self.registry.batch_execute()
  274. return safety_result
  275. class SafetyCalculator:
  276. """安全指标计算类 - 兼容Safe类风格"""
  277. def __init__(self, data_processed):
  278. self.logger = LogManager().get_logger()
  279. self.data_processed = data_processed
  280. self.df = data_processed.object_df.copy()
  281. self.ego_df = data_processed.ego_data.copy() # 使用copy()避免修改原始数据
  282. self.obj_id_list = data_processed.obj_id_list
  283. self.metric_list = [
  284. 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
  285. ]
  286. # 初始化默认值
  287. self.calculated_value = {
  288. "TTC": 10.0,
  289. "MTTC": 10.0,
  290. "THW": 10.0,
  291. "TLC": 10.0,
  292. "TTB": 10.0,
  293. "TM": 10.0,
  294. # "MPrTTC": 10.0,
  295. "PET": 10.0,
  296. "DTC": 10.0,
  297. "PSD": 10.0,
  298. "LatSD": 3.0,
  299. "BTN": 1.0,
  300. "collisionRisk": 0.0,
  301. "collisionSeverity": 0.0,
  302. }
  303. self.time_list = self.ego_df['simTime'].values.tolist()
  304. self.frame_list = self.ego_df['simFrame'].values.tolist()
  305. self.collisionRisk = 0
  306. self.empty_flag = True
  307. # 初始化数据存储列表
  308. self.ttc_data = []
  309. self.mttc_data = []
  310. self.thw_data = []
  311. self.tlc_data = []
  312. self.ttb_data = []
  313. self.tm_data = []
  314. self.lonsd_data = []
  315. self.latsd_data = []
  316. self.btn_data = []
  317. self.collision_risk_data = []
  318. self.collision_severity_data = []
  319. # 初始化安全事件记录表
  320. self.unsafe_events_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  321. # 设置输出目录
  322. self.output_dir = os.path.join(os.getcwd(), 'data')
  323. os.makedirs(self.output_dir, exist_ok=True)
  324. self.logger.info("SafetyCalculator初始化完成,场景中包含自车的目标物一共为: %d", len(self.obj_id_list))
  325. if len(self.obj_id_list) > 1:
  326. self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  327. self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  328. self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  329. self.unsafe_acce_drac_df = pd.DataFrame(
  330. columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  331. self.unsafe_acce_xtn_df = pd.DataFrame(
  332. columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  333. self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  334. self.most_dangerous = {}
  335. self.pass_percent = {}
  336. self.logger.info("开始执行安全参数计算 _safe_param_cal_new")
  337. self._safe_param_cal_new()
  338. self.logger.info("安全参数计算完成")
  339. def _safe_param_cal_new(self):
  340. self.logger.debug("进入 _safe_param_cal_new 方法")
  341. # 直接复用Safe类的实现
  342. Tc = 0.3 # 安全距离
  343. rho = self.data_processed.vehicle_config["RHO"]
  344. ego_accel_max = self.data_processed.vehicle_config["EGO_ACCEL_MAX"]
  345. obj_decel_max = self.data_processed.vehicle_config["OBJ_DECEL_MAX"]
  346. ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"]
  347. ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
  348. ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
  349. ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
  350. #TEMP_COMMENT: x_relative_start_dist 注释开始
  351. #x_relative_start_dist = self.ego_df["x_relative_start_dist"]
  352. # 设置安全指标阈值
  353. self.safety_thresholds = {
  354. 'TTC': {'min': 1.5, 'max': None}, # TTC小于1.5秒视为危险
  355. 'MTTC': {'min': 1.5, 'max': None}, # MTTC小于1.5秒视为危险
  356. 'THW': {'min': 1.0, 'max': None}, # THW小于1.0秒视为危险
  357. 'LonSD': {'min': None, 'max': None}, # 根据实际情况设置
  358. 'LatSD': {'min': 0.5, 'max': None}, # LatSD小于0.5米视为危险
  359. 'BTN': {'min': None, 'max': 0.8}, # BTN大于0.8视为危险
  360. 'collisionRisk': {'min': None, 'max': 30}, # 碰撞风险大于30%视为危险
  361. 'collisionSeverity': {'min': None, 'max': 30} # 碰撞严重性大于30%视为危险
  362. }
  363. obj_dict = defaultdict(dict)
  364. obj_data_dict = self.df.to_dict('records')
  365. for item in obj_data_dict:
  366. obj_dict[item['simFrame']][item['playerId']] = item
  367. df_list = []
  368. EGO_PLAYER_ID = 1
  369. for frame_num in self.frame_list:
  370. ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  371. v1 = ego_data['v']
  372. x1 = ego_data['posX']
  373. y1 = ego_data['posY']
  374. h1 = ego_data['posH']
  375. laneOffset = ego_data["laneOffset"]
  376. v_x1 = ego_data['speedX']
  377. v_y1 = ego_data['speedY']
  378. a_x1 = ego_data['accelX']
  379. a_y1 = ego_data['accelY']
  380. a1 = np.sqrt(a_x1 ** 2 + a_y1 ** 2)
  381. for playerId in self.obj_id_list:
  382. if playerId == EGO_PLAYER_ID:
  383. continue
  384. try:
  385. obj_data = obj_dict[frame_num][playerId]
  386. except KeyError:
  387. continue
  388. x2 = obj_data['posX']
  389. y2 = obj_data['posY']
  390. dist = self.dist(x1, y1, x2, y2)
  391. obj_data['dist'] = dist
  392. v_x2 = obj_data['speedX']
  393. v_y2 = obj_data['speedY']
  394. v2 = obj_data['v']
  395. a_x2 = obj_data['accelX']
  396. a_y2 = obj_data['accelY']
  397. a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2)
  398. dx, dy = x2 - x1, y2 - y1
  399. # 计算目标车相对于自车的方位角
  400. beta = math.atan2(dy, dx)
  401. # 将全局坐标系下的相对位置向量转换到自车坐标系
  402. # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
  403. h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
  404. # 坐标变换
  405. lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
  406. lat_d = abs(-dx * math.sin(h1_rad) + dy * math.cos(h1_rad)) # 横向距离(取绝对值)
  407. obj_dict[frame_num][playerId]['lon_d'] = lon_d
  408. obj_dict[frame_num][playerId]['lat_d'] = lat_d
  409. if lon_d > 100 or lon_d < -5 or lat_d > 4:
  410. continue
  411. self.empty_flag = False
  412. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  413. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  414. relative_v = np.sqrt(vx ** 2 + vy ** 2)
  415. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  416. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  417. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  418. arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
  419. v_x2, v_y2)
  420. obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
  421. obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
  422. obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
  423. obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
  424. obj_type = obj_data['type']
  425. TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None
  426. MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist)
  427. THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None
  428. TLC = self._cal_TLC(v1, h1, laneOffset)
  429. TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max)
  430. TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
  431. DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, driver_reaction_time)
  432. # MPrTTC = self._cal_MPrTTC(x_relative_start_dist)
  433. # PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
  434. PET = None
  435. for lane_pos in lane_poss:
  436. lane_posx1 = ast.literal_eval(lane_pos)[0][0]
  437. lane_posy1 = ast.literal_eval(lane_pos)[0][1]
  438. lane_posx2 = ast.literal_eval(lane_pos)[-1][0]
  439. lane_posy2 = ast.literal_eval(lane_pos)[-1][1]
  440. for ramp_pos in ramp_poss:
  441. ramp_posx1 = ast.literal_eval(ramp_pos)[0][0]
  442. ramp_posy1 = ast.literal_eval(ramp_pos)[0][1]
  443. ramp_posx2 = ast.literal_eval(ramp_pos)[-1][0]
  444. ramp_posy2 = ast.literal_eval(ramp_pos)[-1][1]
  445. ego_posx = x1
  446. ego_posy = y1
  447. obj_posx = x2
  448. obj_posy = y2
  449. delta_t = self._cal_reaction_time_to_avgspeed(self.ego_df)
  450. lane_width = self.ego_df["lane_width"].iloc[0]
  451. PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2,
  452. ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
  453. PSD = self._cal_PSD(x_relative_start_dist, v1, ego_decel_lon_max)
  454. LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
  455. lat_dist = 0.5
  456. v_right = v1
  457. v_left = v2
  458. a_right_lat_brake_min = 1
  459. a_left_lat_brake_min = 1
  460. a_lat_max = 5
  461. LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  462. a_left_lat_brake_min, a_lat_max)
  463. # 使用自车坐标系下的纵向加速度
  464. lon_a1 = a_x1 * math.cos(h1_rad) + a_y1 * math.sin(h1_rad)
  465. lon_a2 = a_x2 * math.cos(h1_rad) + a_y2 * math.sin(h1_rad)
  466. lon_a = abs(lon_a1 - lon_a2)
  467. lon_d = max(0.1, lon_d) # 确保纵向距离为正
  468. lon_v = v_x1 * math.cos(h1_rad) + v_y1 * math.sin(h1_rad)
  469. BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  470. # 使用自车坐标系下的横向加速度
  471. lat_a1 = -a_x1 * math.sin(h1_rad) + a_y1 * math.cos(h1_rad)
  472. lat_a2 = -a_x2 * math.sin(h1_rad) + a_y2 * math.cos(h1_rad)
  473. lat_a = abs(lat_a1 - lat_a2)
  474. lat_v = -v_x1 * math.sin(h1_rad) + v_y1 * math.cos(h1_rad)
  475. obj_dict[frame_num][playerId]['lat_v_rel'] = lat_v - (-v_x2 * math.sin(h1_rad) + v_y2 * math.cos(h1_rad))
  476. obj_dict[frame_num][playerId]['lon_v_rel'] = lon_v - (v_x2 * math.cos(h1_rad) + v_y2 * math.sin(h1_rad))
  477. TTC = None if (TTC is None or TTC < 0) else TTC
  478. MTTC = None if (MTTC is None or MTTC < 0) else MTTC
  479. THW = None if (THW is None or THW < 0) else THW
  480. TLC = None if (TLC is None or TLC < 0) else TLC
  481. TTB = None if (TTB is None or TTB < 0) else TTB
  482. TM = None if (TM is None or TM < 0) else TM
  483. DTC = None if (DTC is None or DTC < 0) else DTC
  484. PET = None if (PET is None or PET < 0) else PET
  485. PSD = None if (PSD is None or PSD < 0) else PSD
  486. obj_dict[frame_num][playerId]['TTC'] = TTC
  487. obj_dict[frame_num][playerId]['MTTC'] = MTTC
  488. obj_dict[frame_num][playerId]['THW'] = THW
  489. obj_dict[frame_num][playerId]['TLC'] = TLC
  490. obj_dict[frame_num][playerId]['TTB'] = TTB
  491. obj_dict[frame_num][playerId]['TM'] = TM
  492. obj_dict[frame_num][playerId]['DTC'] = DTC
  493. obj_dict[frame_num][playerId]['PET'] = PET
  494. obj_dict[frame_num][playerId]['PSD'] = PSD
  495. obj_dict[frame_num][playerId]['LonSD'] = LonSD
  496. obj_dict[frame_num][playerId]['LatSD'] = LatSD
  497. obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
  498. # TTC要进行筛选,否则会出现nan或者TTC过大的情况
  499. if not TTC or TTC > 4000: # threshold = 4258.41
  500. collisionSeverity = 0
  501. pr_death = 0
  502. collisionRisk = 0
  503. else:
  504. result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
  505. collisionSeverity = 1 - result
  506. pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
  507. collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
  508. obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
  509. obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
  510. obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
  511. df_fnum = pd.DataFrame(obj_dict[frame_num].values())
  512. df_list.append(df_fnum)
  513. df_safe = pd.concat(df_list)
  514. col_list = ['simTime', 'simFrame', 'playerId',
  515. 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN',
  516. 'collisionSeverity', 'pr_death', 'collisionRisk']
  517. self.df_safe = df_safe[col_list].reset_index(drop=True)
  518. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  519. # 计算 AB 连线的向量 AB
  520. # dx = x2 - x1
  521. # dy = y2 - y1
  522. # 计算 AB 连线的模长 |AB|
  523. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  524. # 计算 AB 连线的单位向量 U_AB
  525. U_ABx = dx / AB_mod
  526. U_ABy = dy / AB_mod
  527. # 计算 A 在 AB 连线上的速度 V1_on_AB
  528. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  529. return V1_on_AB
  530. def _cal_v_projection(self, dx, dy, vx, vy):
  531. # 计算 AB 连线的向量 AB
  532. # dx = x2 - x1
  533. # dy = y2 - y1
  534. # 计算 AB 连线的模长 |AB|
  535. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  536. # 计算 AB 连线的单位向量 U_AB
  537. U_ABx = dx / AB_mod
  538. U_ABy = dy / AB_mod
  539. # 计算 A 相对于 B 的速度 V_relative
  540. # vx = vx1 - vx2
  541. # vy = vy1 - vy2
  542. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  543. V_on_AB = vx * U_ABx + vy * U_ABy
  544. return V_on_AB
  545. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  546. # 计算 AB 连线的向量 AB
  547. # dx = x2 - x1
  548. # dy = y2 - y1
  549. # 计算 θ
  550. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  551. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  552. if V_mod == 0 or AB_mod == 0:
  553. return 0
  554. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  555. theta = math.acos(cos_theta)
  556. # 计算 AB 连线的模长 |AB|
  557. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  558. # 计算 AB 连线的单位向量 U_AB
  559. U_ABx = dx / AB_mod
  560. U_ABy = dy / AB_mod
  561. # 计算 A 相对于 B 的加速度 a_relative
  562. # ax = ax1 - ax2
  563. # ay = ay1 - ay2
  564. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  565. a_on_AB = ax * U_ABx + ay * U_ABy
  566. VA = np.array([v_x1, v_y1])
  567. VB = np.array([v_x2, v_y2])
  568. D_A = np.array([x1, y1])
  569. D_B = np.array([x2, y2])
  570. V_r = VA - VB
  571. V = np.linalg.norm(V_r)
  572. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  573. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  574. return a_on_AB_back
  575. # 计算相对加速度
  576. def _calculate_derivative(self, a, w, V, theta):
  577. # 计算(V×cos(θ))'的值
  578. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  579. derivative = a - w * V * math.sin(theta)
  580. return derivative
  581. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  582. dx = A[0] - B[0]
  583. dy = A[1] - B[1]
  584. dvx = VA[0] - VB[0]
  585. dvy = VA[1] - VB[1]
  586. # (dx * dvy - dy * dvx)
  587. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  588. return angular_velocity
  589. def _death_pr(self, obj_type, v_relative):
  590. if obj_type == 5:
  591. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  592. else:
  593. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  594. return p_death
  595. def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  596. if obj_type == 5:
  597. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  598. else:
  599. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  600. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  601. return collisionRisk
  602. # 求两车之间当前距离
  603. def dist(self, x1, y1, x2, y2):
  604. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  605. return dist
  606. def generate_metric_chart(self, metric_name: str) -> None:
  607. """生成指标图表
  608. Args:
  609. metric_name: 指标名称
  610. """
  611. try:
  612. # 确定输出目录
  613. if self.output_dir is None:
  614. self.output_dir = os.path.join(os.getcwd(), 'data')
  615. os.makedirs(self.output_dir, exist_ok=True)
  616. # 调用图表生成函数
  617. chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
  618. if chart_path:
  619. self.logger.info(f"{metric_name}图表已生成: {chart_path}")
  620. else:
  621. self.logger.warning(f"{metric_name}图表生成失败")
  622. except Exception as e:
  623. self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)
  624. # TTC (time to collision)
  625. def _cal_TTC(self, dist, vrel_projection_in_dist):
  626. if vrel_projection_in_dist == 0:
  627. return math.inf
  628. TTC = dist / vrel_projection_in_dist
  629. return TTC
  630. def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
  631. MTTC = math.nan
  632. if arel_projection_in_dist != 0:
  633. tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
  634. if tmp < 0:
  635. return math.nan
  636. t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
  637. t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
  638. if t1 > 0 and t2 > 0:
  639. if t1 >= t2:
  640. MTTC = t2
  641. elif t1 < t2:
  642. MTTC = t1
  643. elif t1 > 0 and t2 <= 0:
  644. MTTC = t1
  645. elif t1 <= 0 and t2 > 0:
  646. MTTC = t2
  647. if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
  648. MTTC = dist / vrel_projection_in_dist
  649. return MTTC
  650. # THW (time headway)
  651. def _cal_THW(self, dist, v_ego_projection_in_dist):
  652. if not v_ego_projection_in_dist:
  653. THW = None
  654. else:
  655. THW = dist / v_ego_projection_in_dist
  656. return THW
  657. # TLC (time to line crossing)
  658. def _cal_TLC(self, ego_v, ego_yaw, laneOffset):
  659. TLC = laneOffset/ego_v/np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0
  660. if TLC < 0:
  661. TLC = None
  662. return TLC
  663. def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max):
  664. if len(x_relative_start_dist):
  665. return None
  666. if (ego_decel_max == 0) or (relative_v == 0):
  667. return self.calculated_value["TTB"]
  668. else:
  669. x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
  670. TTB = (x_relative_start_dist0 + relative_v * relative_v/2/ego_decel_max)/relative_v
  671. return TTB
  672. def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1):
  673. if len(x_relative_start_dist):
  674. return None
  675. if (a2 == 0) or (v1 == 0):
  676. return self.calculated_value["TM"]
  677. if a1 == 0:
  678. return None
  679. x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
  680. TM = (x_relative_start_dist0 + v2**2/(2*a2) - v1**2/(2*a1)) / v1
  681. return TM
  682. def velocity(self, v_x, v_y):
  683. v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
  684. return v
  685. def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
  686. lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
  687. 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
  688. return lon_dist_min
  689. def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  690. a_left_lat_brake_min, a_lat_max):
  691. # 检查除数是否为零
  692. if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
  693. return self._default_value('LatSD') # 返回默认值
  694. v_right_rho = v_right + rho * a_lat_max
  695. v_left_rho = v_left + rho * a_lat_max
  696. dist_min = lat_dist + (
  697. (v_right + v_right_rho) * rho / 2
  698. + v_right_rho**2 / a_right_lat_brake_min / 2
  699. + ((v_left + v_right_rho) * rho / 2)
  700. + v_left_rho**2 / a_left_lat_brake_min / 2
  701. )
  702. return dist_min
  703. def _cal_DTC(self, v_on_dist, a_on_dist, t):
  704. if a_on_dist == 0:
  705. return None
  706. DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
  707. return DTC
  708. def _cal_PET(self, lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2):
  709. dist1 = self.horizontal_distance(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy)
  710. dist2 = self.horizontal_distance(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy)
  711. if ((dist1 <= lane_width/2) and (self._is_alone_the_road(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy))
  712. and (self._is_in_the_road(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy))
  713. and (dist2 <= lane_width/2) and (a1 != 0) and (a2 != 0)):
  714. dist_ego = np.sqrt((ego_posx - lane_posx1)**2 + (ego_posy-lane_posy1)**2)
  715. dist_obj = np.sqrt((obj_posx - ramp_posx2)**2 + (obj_posy-ramp_posy2)**2)
  716. PET = (-2*v2 + np.sqrt((4* v2**2)-8*a2*(v2*delta_t - dist_obj)))/ 2/ a2 - (2*v1 + np.sqrt((4* v1**2)-8*a1*dist_ego))/ 2/ a1 + delta_t
  717. return PET
  718. else:
  719. return None
  720. def _cal_PSD(self, x_relative_start_dist, v1, ego_decel_lon_max):
  721. if v1 == 0:
  722. return None
  723. else:
  724. if len(x_relative_start_dist) > 0:
  725. x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
  726. PSD = x_relative_start_dist0 * 2 * ego_decel_lon_max / v1
  727. return PSD
  728. else:
  729. return None
  730. # DRAC (decelerate required avoid collision)
  731. def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
  732. dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
  733. if dist_length < 0:
  734. dist_width = dist - (width2 / 2 + width1 / 2)
  735. if dist_width < 0:
  736. return math.inf
  737. else:
  738. d = dist_width
  739. else:
  740. d = dist_length
  741. DRAC = vrel_projection_in_dist ** 2 / (2 * d)
  742. return DRAC
  743. # BTN (brake threat number)
  744. def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
  745. BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  746. return BTN
  747. # STN (steer threat number)
  748. def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
  749. STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
  750. width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
  751. return STN
  752. # BTN (brake threat number)
  753. def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
  754. BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  755. return BTN
  756. # STN (steer threat number)
  757. def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
  758. STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax
  759. return STN
  760. # 追尾碰撞风险
  761. def _normal_distribution(self, x):
  762. mean = 1.32
  763. std_dev = 0.26
  764. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  765. def continuous_group(self, df):
  766. time_list = df['simTime'].values.tolist()
  767. frame_list = df['simFrame'].values.tolist()
  768. group_time = []
  769. group_frame = []
  770. sub_group_time = []
  771. sub_group_frame = []
  772. for i in range(len(frame_list)):
  773. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  774. sub_group_time.append(time_list[i])
  775. sub_group_frame.append(frame_list[i])
  776. else:
  777. group_time.append(sub_group_time)
  778. group_frame.append(sub_group_frame)
  779. sub_group_time = [time_list[i]]
  780. sub_group_frame = [frame_list[i]]
  781. group_time.append(sub_group_time)
  782. group_frame.append(sub_group_frame)
  783. group_time = [g for g in group_time if len(g) >= 2]
  784. group_frame = [g for g in group_frame if len(g) >= 2]
  785. # 输出图表值
  786. time = [[g[0], g[-1]] for g in group_time]
  787. frame = [[g[0], g[-1]] for g in group_frame]
  788. unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  789. unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  790. unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
  791. return unfunc_df
  792. # 统计最危险的指标
  793. def _safe_statistic_most_dangerous(self):
  794. min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM', 'PET', 'PSD']
  795. max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
  796. result = {}
  797. for metric in min_list:
  798. if metric in self.metric_list:
  799. if metric in self.df.columns:
  800. val = self.df[metric].min()
  801. result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
  802. else:
  803. result[metric] = self._default_value(metric)
  804. for metric in max_list:
  805. if metric in self.metric_list:
  806. if metric in self.df.columns:
  807. val = self.df[metric].max()
  808. result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
  809. else:
  810. result[metric] = self._default_value(metric)
  811. return result
  812. def _safe_no_obj_statistic(self):
  813. # 仅有自车时的默认值
  814. result = {metric: self._default_value(metric) for metric in self.metric_list}
  815. return result
  816. def _default_value(self, metric):
  817. # 统一默认值
  818. defaults = {
  819. 'TTC': 10.0,
  820. 'MTTC': 4.2,
  821. 'THW': 2.1,
  822. 'TLC': 10.0,
  823. 'TTB': 10.0,
  824. 'TM': 10.0,
  825. 'DTC': 10.0,
  826. 'PET': 10.0,
  827. 'PSD': 10.0,
  828. 'LonSD': 10.0,
  829. 'LatSD': 2.0,
  830. 'BTN': 1.0,
  831. 'collisionRisk': 0.0,
  832. 'collisionSeverity': 0.0
  833. }
  834. return defaults.get(metric, None)
  835. def report_statistic(self):
  836. if len(self.obj_id_list) == 1:
  837. safety_result = self._safe_no_obj_statistic()
  838. else:
  839. safety_result = self._safe_statistic_most_dangerous()
  840. evaluator = Score(self.data_processed.safety_config)
  841. result = evaluator.evaluate(safety_result)
  842. print("\n[安全性表现及得分情况]")
  843. return result
  844. def get_ttc_value(self) -> float:
  845. if self.empty_flag or self.df_safe is None:
  846. return self._default_value('TTC')
  847. ttc_values = self.df_safe['TTC'].dropna()
  848. ttc_value = float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
  849. # 收集TTC数据
  850. if not ttc_values.empty:
  851. self.ttc_data = []
  852. for time, frame, ttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTC']):
  853. if pd.notnull(ttc):
  854. self.ttc_data.append({'simTime': time, 'simFrame': frame, 'TTC': ttc})
  855. return ttc_value
  856. def get_mttc_value(self) -> float:
  857. if self.empty_flag or self.df_safe is None:
  858. return self._default_value('MTTC')
  859. mttc_values = self.df_safe['MTTC'].dropna()
  860. mttc_value = float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
  861. # 收集MTTC数据
  862. if not mttc_values.empty:
  863. self.mttc_data = []
  864. for time, frame, mttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['MTTC']):
  865. if pd.notnull(mttc):
  866. self.mttc_data.append({'simTime': time, 'simFrame': frame, 'MTTC': mttc})
  867. return mttc_value
  868. def get_thw_value(self) -> float:
  869. if self.empty_flag or self.df_safe is None:
  870. return self._default_value('THW')
  871. thw_values = self.df_safe['THW'].dropna()
  872. thw_value = float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
  873. # 收集THW数据
  874. if not thw_values.empty:
  875. self.thw_data = []
  876. for time, frame, thw in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['THW']):
  877. if pd.notnull(thw):
  878. self.thw_data.append({'simTime': time, 'simFrame': frame, 'THW': thw})
  879. return thw_value
  880. def get_tlc_value(self) -> float:
  881. if self.empty_flag or self.df_safe is None:
  882. return self._default_value('TLC')
  883. tlc_values = self.df_safe['TLC'].dropna()
  884. tlc_value = float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC')
  885. # 收集TLC数据
  886. if not tlc_values.empty:
  887. self.tlc_data = []
  888. for time, frame, tlc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TLC']):
  889. if pd.notnull(tlc):
  890. self.tlc_data.append({'simTime': time, 'simFrame': frame, 'TLC': tlc})
  891. return tlc_value
  892. def get_ttb_value(self) -> float:
  893. if self.empty_flag or self.df_safe is None:
  894. return self._default_value('TTB')
  895. ttb_values = self.df_safe['TTB'].dropna()
  896. ttb_value = float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB')
  897. # 收集TTB数据
  898. if not ttb_values.empty:
  899. self.ttb_data = []
  900. for time, frame, ttb in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTB']):
  901. if pd.notnull(ttb):
  902. self.ttb_data.append({'simTime': time, 'simFrame': frame, 'TTB': ttb})
  903. return ttb_value
  904. def get_tm_value(self) -> float:
  905. if self.empty_flag or self.df_safe is None:
  906. return self._default_value('TM')
  907. tm_values = self.df_safe['TM'].dropna()
  908. tm_value = float(tm_values.min()) if not tm_values.empty else self._default_value('TM')
  909. # 收集TM数据
  910. if not tm_values.empty:
  911. self.tm_data = []
  912. for time, frame, tm in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TM']):
  913. if pd.notnull(tm):
  914. self.tm_data.append({'simTime': time, 'simFrame': frame, 'TM': tm})
  915. return tm_value
  916. def get_dtc_value(self) -> float:
  917. if self.empty_flag or self.df_safe is None:
  918. return self._default_value('DTC')
  919. dtc_values = self.df_safe['DTC'].dropna()
  920. return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
  921. def get_pet_value(self) -> float:
  922. if self.empty_flag or self.df_safe is None:
  923. return self._default_value('PET')
  924. pet_values = self.df_safe['PET'].dropna()
  925. return float(pet_values.min()) if not pet_values.empty else self._default_value('PET')
  926. def get_psd_value(self) -> float:
  927. if self.empty_flag or self.df_safe is None:
  928. return self._default_value('PSD')
  929. psd_values = self.df_safe['PSD'].dropna()
  930. return float(psd_values.min()) if not psd_values.empty else self._default_value('PSD')
  931. def get_lonsd_value(self) -> float:
  932. if self.empty_flag or self.df_safe is None:
  933. return self._default_value('LonSD')
  934. lonsd_values = self.df_safe['LonSD'].dropna()
  935. lonsd_value = float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
  936. # 收集LonSD数据
  937. if not lonsd_values.empty:
  938. self.lonsd_data = []
  939. for time, frame, lonsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LonSD']):
  940. if pd.notnull(lonsd):
  941. self.lonsd_data.append({'simTime': time, 'simFrame': frame, 'LonSD': lonsd})
  942. return lonsd_value
  943. def get_latsd_value(self) -> float:
  944. if self.empty_flag or self.df_safe is None:
  945. return self._default_value('LatSD')
  946. latsd_values = self.df_safe['LatSD'].dropna()
  947. # 使用最小值而非平均值,与safety1.py保持一致
  948. latsd_value = float(latsd_values.min()) if not latsd_values.empty else self._default_value('LatSD')
  949. # 收集LatSD数据
  950. if not latsd_values.empty:
  951. self.latsd_data = []
  952. for time, frame, latsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LatSD']):
  953. if pd.notnull(latsd):
  954. self.latsd_data.append({'simTime': time, 'simFrame': frame, 'LatSD': latsd})
  955. return latsd_value
  956. def get_btn_value(self) -> float:
  957. if self.empty_flag or self.df_safe is None:
  958. return self._default_value('BTN')
  959. btn_values = self.df_safe['BTN'].dropna()
  960. btn_value = float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
  961. # 收集BTN数据
  962. if not btn_values.empty:
  963. self.btn_data = []
  964. for time, frame, btn in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['BTN']):
  965. if pd.notnull(btn):
  966. self.btn_data.append({'simTime': time, 'simFrame': frame, 'BTN': btn})
  967. return btn_value
  968. def get_collision_risk_value(self) -> float:
  969. if self.empty_flag or self.df_safe is None:
  970. return self._default_value('collisionRisk')
  971. risk_values = self.df_safe['collisionRisk'].dropna()
  972. risk_value = float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
  973. # 收集碰撞风险数据
  974. if not risk_values.empty:
  975. self.collision_risk_data = []
  976. for time, frame, risk in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['collisionRisk']):
  977. if pd.notnull(risk):
  978. self.collision_risk_data.append({'simTime': time, 'simFrame': frame, 'collisionRisk': risk})
  979. return risk_value
  980. def get_collision_severity_value(self) -> float:
  981. if self.empty_flag or self.df_safe is None:
  982. return self._default_value('collisionSeverity')
  983. severity_values = self.df_safe['collisionSeverity'].dropna()
  984. severity_value = float(severity_values.max()) if not severity_values.empty else self._default_value('collisionSeverity')
  985. # 收集碰撞严重性数据
  986. if not severity_values.empty:
  987. self.collision_severity_data = []
  988. for time, frame, severity in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['collisionSeverity']):
  989. if pd.notnull(severity):
  990. self.collision_severity_data.append({'simTime': time, 'simFrame': frame, 'collisionSeverity': severity})
  991. return severity_value