safety.py 33 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 安全指标计算模块(支持多目标物) - 场景感知版本
  5. 优化要点:
  6. 1. 重构代码结构,提高可读性
  7. 2. 去除冗余代码,优化性能
  8. 3. 保持原始计算逻辑不变
  9. 4. 增加全面注释
  10. 5. 优化数据处理流程
  11. """
  12. import os
  13. import numpy as np
  14. import pandas as pd
  15. import math
  16. import scipy.integrate as spi
  17. from collections import defaultdict
  18. from typing import Dict, Any, List, Optional, Tuple
  19. from pathlib import Path
  20. from modules.lib.score import Score
  21. from modules.lib.log_manager import LogManager
  22. from modules.lib.chart_generator import generate_safety_chart_data
  23. # ==================== 安全指标计算接口函数 ====================
  24. # 每个函数对应一个安全指标的计算入口
  25. # ------------------------------------------------------------
  26. def calculate_ttc(data_processed, plot_path) -> dict:
  27. """计算碰撞时间(TTC)"""
  28. return _calculate_metric('TTC', data_processed, plot_path)
  29. def calculate_mttc(data_processed, plot_path) -> dict:
  30. """计算修正碰撞时间(MTTC)"""
  31. return _calculate_metric('MTTC', data_processed, plot_path)
  32. def calculate_thw(data_processed, plot_path) -> dict:
  33. """计算车头时距(THW)"""
  34. return _calculate_metric('THW', data_processed, plot_path)
  35. def calculate_ttb(data_processed, plot_path) -> dict:
  36. """计算制动时间(TTB)"""
  37. return _calculate_metric('TTB', data_processed, plot_path)
  38. def calculate_tm(data_processed, plot_path) -> dict:
  39. """计算时间裕度(TM)"""
  40. return _calculate_metric('TM', data_processed, plot_path)
  41. def calculate_dtc(data_processed, plot_path) -> dict:
  42. """计算碰撞距离(DTC)"""
  43. return _calculate_metric('DTC', data_processed, plot_path)
  44. def calculate_psd(data_processed, plot_path) -> dict:
  45. """计算预测安全距离比(PSD)"""
  46. return _calculate_metric('PSD', data_processed, plot_path)
  47. def calculate_collisionrisk(data_processed, plot_path) -> dict:
  48. """计算碰撞风险(collisionRisk)"""
  49. return _calculate_metric('collisionRisk', data_processed, plot_path)
  50. def calculate_lonsd(data_processed, plot_path) -> dict:
  51. """计算纵向安全距离(LonSD)"""
  52. return _calculate_metric('LonSD', data_processed, plot_path)
  53. def calculate_latsd(data_processed, plot_path) -> dict:
  54. """计算横向安全距离(LatSD)"""
  55. return _calculate_metric('LatSD', data_processed, plot_path)
  56. def calculate_btn(data_processed, plot_path) -> dict:
  57. """计算制动威胁数(BTN)"""
  58. return _calculate_metric('BTN', data_processed, plot_path)
  59. def calculate_collisionseverity(data_processed, plot_path) -> dict:
  60. """计算碰撞严重性(collisionSeverity)"""
  61. return _calculate_metric('collisionSeverity', data_processed, plot_path)
  62. def _calculate_metric(metric_name, data_processed, plot_path) -> dict:
  63. """安全指标计算的通用处理函数"""
  64. key_name = metric_name
  65. result_key = {metric_name: None}
  66. # 检查输入数据有效性
  67. if data_processed is None or not hasattr(data_processed, 'object_df'):
  68. return result_key
  69. try:
  70. safety = SafetyCalculator(data_processed)
  71. # 特殊处理 collisionRisk 和 collisionSeverity 指标
  72. if metric_name.lower() == 'collisionrisk':
  73. metric_value = safety.get_collision_risk_value()
  74. elif metric_name.lower() == 'collisionseverity':
  75. metric_value = safety.get_collision_severity_value()
  76. else:
  77. # 其他指标使用原有逻辑
  78. metric_value = getattr(safety, f'get_{metric_name.lower()}_value')()
  79. # 生成图表数据
  80. metric_data = getattr(safety, f'{metric_name.lower()}_data', None)
  81. if metric_data:
  82. safety.generate_metric_chart(metric_name, plot_path)
  83. LogManager().get_logger().info(f"安全指标[{metric_name}]计算结果: {metric_value}")
  84. return {metric_name: metric_value}
  85. except Exception as e:
  86. LogManager().get_logger().error(f"{metric_name}计算异常: {str(e)}", exc_info=True)
  87. return result_key
  88. # ==================== 安全指标注册与批处理类 ====================
  89. # 用于根据配置批量执行安全指标计算
  90. # ------------------------------------------------------------
  91. class SafetyRegistry:
  92. """安全指标注册器,根据配置动态注册和执行指标计算"""
  93. def __init__(self, data_processed, plot_path):
  94. self.logger = LogManager().get_logger()
  95. self.data = data_processed
  96. self.plot_path = plot_path
  97. # 检查安全配置
  98. if not hasattr(data_processed, 'safety_config') or not data_processed.safety_config:
  99. self.logger.warning("安全配置为空,跳过安全指标计算")
  100. self.safety_config = {}
  101. self.metrics = []
  102. self._registry = {}
  103. return
  104. self.safety_config = data_processed.safety_config.get("safety", {})
  105. self.metrics = self._extract_metrics(self.safety_config)
  106. self._registry = self._build_registry()
  107. def _extract_metrics(self, config_node: dict) -> list:
  108. """从配置中递归提取所有指标名称"""
  109. metrics = []
  110. def _recurse(node):
  111. if isinstance(node, dict):
  112. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  113. metrics.append(node['name'])
  114. for v in node.values():
  115. _recurse(v)
  116. _recurse(config_node)
  117. self.logger.info(f'评比的安全指标列表: {metrics}')
  118. return metrics
  119. def _build_registry(self) -> dict:
  120. """构建指标名称到计算函数的映射"""
  121. registry = {}
  122. for metric_name in self.metrics:
  123. func_name = f"calculate_{metric_name.lower()}"
  124. if func_name in globals():
  125. registry[metric_name] = globals()[func_name]
  126. else:
  127. self.logger.warning(f"未实现安全指标函数: {func_name}")
  128. return registry
  129. def batch_execute(self) -> dict:
  130. """批量执行所有注册的安全指标计算"""
  131. results = {}
  132. if not hasattr(self, 'safety_config') or not self.safety_config or not self._registry:
  133. self.logger.info("安全配置为空或无注册指标,返回空结果")
  134. return results
  135. for name, func in self._registry.items():
  136. try:
  137. result = func(self.data, self.plot_path)
  138. results.update(result)
  139. except Exception as e:
  140. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  141. results[name] = None
  142. self.logger.info(f'安全指标计算结果: {results}')
  143. return results
  144. class SafeManager:
  145. """安全管理器,封装安全指标计算流程"""
  146. def __init__(self, data_processed, plot_path):
  147. self.data = data_processed
  148. self.logger = LogManager().get_logger()
  149. self.plot_path = plot_path
  150. # 初始化安全指标注册器
  151. if not hasattr(data_processed, 'safety_config') or not data_processed.safety_config:
  152. self.logger.warning("安全配置为空,跳过安全指标计算初始化")
  153. self.registry = None
  154. else:
  155. self.registry = SafetyRegistry(self.data, self.plot_path)
  156. def report_statistic(self):
  157. """生成安全指标统计报告"""
  158. if self.registry is None:
  159. self.logger.info("安全指标管理器未初始化,返回空结果")
  160. return {}
  161. return self.registry.batch_execute()
  162. # ==================== 安全指标计算核心类 ====================
  163. # 包含所有安全指标的具体计算实现
  164. # ------------------------------------------------------------
  165. class SafetyCalculator:
  166. """安全指标计算器,实现各种安全指标的具体计算"""
  167. DEFAULT_VALUES = {
  168. 'TTC': 10.0, 'MTTC': 3.3, 'THW': 2.5,
  169. 'TTB': 10.0, 'TM': 10.0, 'DTC': 10.0, 'PSD': 10.0,
  170. 'LonSD': 100.0, 'LatSD': 2.0, 'BTN': 0.0,
  171. 'collisionRisk': 0.0, 'collisionSeverity': 0.0
  172. }
  173. def __init__(self, data_processed):
  174. self.logger = LogManager().get_logger()
  175. self.data_processed = data_processed
  176. self.df = data_processed.object_df.copy()
  177. self.ego_df = data_processed.ego_data.copy()
  178. self.obj_id_list = data_processed.obj_id_list
  179. # 初始化配置参数
  180. self._init_config_params()
  181. # 初始化数据结构和指标存储
  182. self._init_data_structures()
  183. # 执行安全参数计算
  184. if self.obj_id_list:
  185. self.logger.info("开始执行安全参数计算,目标物数量: %d", len(self.obj_id_list) - 1)
  186. self._calculate_safety_parameters()
  187. self.logger.info("安全参数计算完成")
  188. else:
  189. self.logger.info("没有目标物,跳过安全参数计算")
  190. self.empty_flag = True
  191. def _init_config_params(self):
  192. """从配置中初始化车辆参数"""
  193. config = self.data_processed.vehicle_config
  194. self.rho = config.get("RHO", 1.5) # 默认反应时间1.5秒
  195. self.ego_accel_max = config.get("EGO_ACCEL_MAX", 3.0) # 默认最大加速度3.0 m/s²
  196. self.obj_decel_max = config.get("OBJ_DECEL_MAX", 3.0) # 默认目标最大减速度3.0 m/s²
  197. self.ego_decel_min = config.get("EGO_DECEL_MIN", 4.0) # 默认自车最小减速度4.0 m/s²
  198. self.ego_decel_lon_max = config.get("EGO_DECEL_LON_MAX", 4.0) # 默认纵向最大减速度4.0 m/s²
  199. self.ego_decel_lat_max = config.get("EGO_DECEL_LAT_MAX", 3.0) # 默认横向最大减速度3.0 m/s²
  200. self.ego_width = config.get("CAR_WIDTH", 2.0) # 默认车辆宽度2.0米
  201. self.ego_length = config.get("CAR_LENGTH", 4.5) # 默认车辆长度4.5米
  202. self.vehicle_length = config.get("VEHICLE_LENGTH", 4.5) # 默认车辆长度4.5米
  203. self.vehicle_width = config.get("VEHICLE_WIDTH", 2.0) # 默认车辆宽度2.0米
  204. self.pedestrian_length = config.get("PEDESTRIAN_LENGTH", 0.5) # 默认行人长度0.5米
  205. self.pedestrian_width = config.get("PEDESTRIAN_WIDTH", 0.5) # 默认行人宽度0.5米
  206. # 计算派生参数
  207. self.max_deceleration = self.ego_decel_lon_max
  208. self.ego_decel_max = np.sqrt(self.ego_decel_lon_max ** 2 + self.ego_decel_lat_max ** 2)
  209. self.ped_width = self.pedestrian_width
  210. self.ped_length = self.pedestrian_length
  211. def _init_data_structures(self):
  212. """初始化数据存储结构"""
  213. self.time_list = self.ego_df['simTime'].values.tolist()
  214. self.frame_list = self.ego_df['simFrame'].values.tolist()
  215. self.empty_flag = True # 标记是否有有效目标
  216. # 初始化指标数据存储
  217. self._init_metric_storage()
  218. # 创建输出目录
  219. self.output_dir = os.path.join(os.getcwd(), 'data')
  220. os.makedirs(self.output_dir, exist_ok=True)
  221. def _init_metric_storage(self):
  222. """初始化所有指标的存储列表"""
  223. metrics = [
  224. 'ttc', 'mttc', 'thw', 'ttb', 'tm', 'lonsd', 'latsd', 'btn',
  225. 'collision_risk', 'collision_severity'
  226. ]
  227. for metric in metrics:
  228. setattr(self, f'{metric}_data', [])
  229. def _calculate_safety_parameters(self):
  230. """核心安全参数计算方法"""
  231. # 预处理:构建按帧组织的目标字典
  232. obj_dict = self._preprocess_object_data()
  233. df_list = []
  234. EGO_PLAYER_ID = 1
  235. found_valid_target = False
  236. # 逐帧处理数据
  237. for frame_num in self.frame_list:
  238. try:
  239. ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  240. except KeyError:
  241. continue
  242. frame_targets = self._process_frame_targets(frame_num, ego_data, obj_dict, EGO_PLAYER_ID)
  243. if frame_targets:
  244. found_valid_target = True
  245. df_fnum = pd.DataFrame(frame_targets)
  246. df_list.append(df_fnum)
  247. # 合并处理结果
  248. self._postprocess_results(df_list, found_valid_target)
  249. def _preprocess_object_data(self):
  250. """预处理目标数据为按帧组织的字典"""
  251. obj_dict = defaultdict(dict)
  252. obj_data_dict = self.df.to_dict('records')
  253. for item in obj_data_dict:
  254. obj_dict[item['simFrame']][item['playerId']] = item
  255. return obj_dict
  256. def _process_frame_targets(self, frame_num, ego_data, obj_dict, ego_id):
  257. """处理单帧内的所有目标物"""
  258. frame_targets = []
  259. vx_ego = ego_data.get('lon_v_vehicle', 0)
  260. vy_ego = ego_data.get('lat_v_vehicle', 0)
  261. self.lane_width = ego_data.get('lane_width', 3.75)
  262. for player_id in self.obj_id_list:
  263. if player_id == ego_id:
  264. continue
  265. try:
  266. obj_data = obj_dict[frame_num][player_id]
  267. except KeyError:
  268. continue
  269. # 计算目标物安全指标
  270. result = self._calculate_target_metrics(ego_data, obj_data, vx_ego, vy_ego)
  271. obj_data.update(result)
  272. frame_targets.append(obj_data)
  273. return frame_targets
  274. def _calculate_target_metrics(self, ego_data, obj_data, vx_ego, vy_ego):
  275. """计算单个目标物的安全指标"""
  276. # 提取基本参数
  277. obj_type = obj_data.get('type', 0)
  278. lon_d = obj_data.get('x_relative_start_dist', 0)
  279. lat_d = obj_data.get('y_relative_start_dist', 0)
  280. vx_obj = obj_data.get('lon_v_vehicle', 0)
  281. vy_obj = obj_data.get('lat_v_vehicle', 0)
  282. # 计算相对运动参数
  283. vx_rel = vx_obj - vx_ego
  284. vy_rel = vy_obj - vy_ego
  285. dist = math.sqrt(lon_d ** 2 + lat_d ** 2)
  286. # 计算航向角差异
  287. h1 = ego_data['posH']
  288. h2 = obj_data.get('posH', h1)
  289. heading_diff = abs(h1 - h2)
  290. if heading_diff > 180:
  291. heading_diff = 360 - heading_diff
  292. # 计算相对加速度
  293. ax_ego = ego_data.get('lon_acc_vehicle', 0)
  294. ay_ego = ego_data.get('lat_acc_vehicle', 0)
  295. ax_obj = obj_data.get('lon_acc_vehicle', 0)
  296. ay_obj = obj_data.get('lat_acc_vehicle', 0)
  297. ax_rel_ego = ax_obj - ax_ego
  298. ay_rel_ego = ay_obj - ay_ego
  299. # 计算投影
  300. vrel_projection = self._cal_v_projection_using_relative(lon_d, lat_d, vx_rel, vy_rel)
  301. accl_projection = self._cal_a_projection_using_relative(lon_d, lat_d, ax_rel_ego, ay_rel_ego)
  302. # 检查目标相关性
  303. is_relevant = self._is_relevant_target(lat_d, lon_d, vy_rel, heading_diff, obj_type)
  304. # 检查过滤条件
  305. passed_conflict = self._has_passed_conflict_point(lon_d, lat_d, obj_type, heading_diff)
  306. moving_away = self._is_moving_away(
  307. (vx_ego, vy_ego), (vx_obj, vy_obj), lon_d, lat_d, obj_type, heading_diff
  308. )
  309. skip_risk_calculation = passed_conflict and (moving_away or obj_data.get('v', 0) < 0.05)
  310. # 计算指标
  311. if not is_relevant or skip_risk_calculation:
  312. return self._get_default_metrics(obj_type)
  313. else:
  314. return self._calculate_relevant_metrics(
  315. dist, vrel_projection, accl_projection,
  316. lon_d, lat_d, vx_ego, vy_ego, vx_rel, ax_rel_ego,
  317. heading_diff, obj_type, ego_data, obj_data
  318. )
  319. def _get_default_metrics(self, obj_type):
  320. """获取默认指标值"""
  321. return {
  322. 'TTC': self.DEFAULT_VALUES['TTC'],
  323. 'MTTC': self.DEFAULT_VALUES['MTTC'],
  324. 'THW': self.DEFAULT_VALUES['THW'] if obj_type != 5 else None,
  325. 'TTB': self.DEFAULT_VALUES['TTB'] if obj_type != 5 else None,
  326. 'TM': self.DEFAULT_VALUES['TM'] if obj_type != 5 else None,
  327. 'PSD': self.DEFAULT_VALUES['PSD'],
  328. 'DTC': self.DEFAULT_VALUES['DTC'],
  329. 'LonSD': self.DEFAULT_VALUES['LonSD'] if obj_type != 5 else None,
  330. 'LatSD': self.DEFAULT_VALUES['LatSD'] if obj_type != 5 else None,
  331. 'BTN': self.DEFAULT_VALUES['BTN'] if obj_type != 5 else None,
  332. 'collisionSeverity': self.DEFAULT_VALUES['collisionSeverity'],
  333. 'pr_death': 0,
  334. 'collisionRisk': self.DEFAULT_VALUES['collisionRisk']
  335. }
  336. def _calculate_relevant_metrics(self, dist, vrel_projection, accl_projection,
  337. lon_d, lat_d, vx_ego, vy_ego, vx_rel, ax_rel_ego,
  338. heading_diff, obj_type, ego_data, obj_data):
  339. """计算相关目标的指标"""
  340. Tc = 0.3 # 反应时间
  341. # 核心指标计算
  342. metrics = {
  343. 'TTC': self._cal_TTC(dist, vrel_projection),
  344. 'MTTC': self._cal_MTTC(dist, vrel_projection, ego_data.get('lon_acc_vehicle', 0)),
  345. 'THW': self._cal_THW(lon_d, vx_ego),
  346. 'TTB': self._cal_TTB(dist, vrel_projection, self.ego_decel_max) if obj_type != 5 else None,
  347. 'TM': self._cal_TM(vrel_projection, obj_data.get('v', 0),
  348. obj_data.get('lon_acc_vehicle', 0),
  349. ego_data.get('v', 0),
  350. ego_data.get('lon_acc_vehicle', 0)) if obj_type != 5 else None,
  351. 'PSD': self._cal_PSD(dist, vrel_projection, self.max_deceleration,
  352. heading_diff, vx_ego, vy_ego, obj_type, lon_d, lat_d),
  353. 'DTC': self._cal_DTC(vx_rel, ax_rel_ego, self.rho) if abs(heading_diff) < 30 else self.DEFAULT_VALUES[
  354. 'DTC'],
  355. 'LonSD': self._cal_longitudinal_safe_dist(vx_ego, vx_rel, self.rho,
  356. self.ego_decel_min,
  357. self.obj_decel_max) if obj_type != 5 else None,
  358. 'LatSD': self._cal_lateral_safe_dist(abs(lat_d), vy_ego, self.ego_width,
  359. self.lane_width, self.ego_decel_lat_max) if obj_type != 5 else None,
  360. 'BTN': self._cal_BTN_new(ego_data.get('lon_acc_vehicle', 0),
  361. ego_data.get('lon_acc_vehicle', 0),
  362. abs(lon_d), vx_ego, self.ego_decel_lon_max) if obj_type != 5 else None
  363. }
  364. # 处理碰撞风险相关指标
  365. if metrics['TTC'] is None or metrics['TTC'] > 4000:
  366. metrics.update({
  367. 'collisionSeverity': 0,
  368. 'pr_death': 0,
  369. 'collisionRisk': 0
  370. })
  371. else:
  372. try:
  373. result, _ = spi.quad(self._normal_distribution, 0, metrics['TTC'] - Tc)
  374. collisionSeverity = 1 - result
  375. pr_death = self._death_pr(obj_type, vrel_projection)
  376. collisionRisk = self._cal_collisionRisk_level(obj_type, vrel_projection, collisionSeverity)
  377. metrics.update({
  378. 'collisionSeverity': collisionSeverity * 100,
  379. 'pr_death': pr_death * 100,
  380. 'collisionRisk': collisionRisk * 100
  381. })
  382. except Exception as e:
  383. self.logger.error(f"碰撞风险计算错误: {e}")
  384. metrics.update({
  385. 'collisionSeverity': 0,
  386. 'pr_death': 0,
  387. 'collisionRisk': 0
  388. })
  389. return metrics
  390. def _postprocess_results(self, df_list, found_valid_target):
  391. """后处理计算结果"""
  392. self.empty_flag = not found_valid_target
  393. # 合并所有帧数据
  394. if df_list:
  395. self.df_safe = pd.concat(df_list)
  396. col_list = ['simTime', 'simFrame', 'playerId',
  397. 'TTC', 'MTTC', 'THW', 'TTB', 'TM', 'DTC', 'PSD', 'LonSD', 'LatSD', 'BTN',
  398. 'collisionSeverity', 'pr_death', 'collisionRisk']
  399. self.df_safe = self.df_safe[col_list].reset_index(drop=True)
  400. else:
  401. self.df_safe = pd.DataFrame()
  402. self.empty_flag = True
  403. self.logger.info(f"处理完成,找到有效目标: {not self.empty_flag}")
  404. # ==================== 核心计算方法 ====================
  405. def _cal_v_projection_using_relative(self, lon_d, lat_d, vx_rel, vy_rel):
  406. """计算速度投影(自车坐标系)"""
  407. dist = math.sqrt(lon_d ** 2 + lat_d ** 2)
  408. if dist < 1e-6:
  409. return 0.0
  410. U_ABx = lon_d / dist
  411. U_ABy = lat_d / dist
  412. return vx_rel * U_ABx + vy_rel * U_ABy
  413. def _cal_a_projection_using_relative(self, lon_d, lat_d, ax_rel, ay_rel):
  414. """计算加速度投影(自车坐标系)"""
  415. dist = math.sqrt(lon_d ** 2 + lat_d ** 2)
  416. if dist < 1e-6:
  417. return 0.0
  418. U_ABx = lon_d / dist
  419. U_ABy = lat_d / dist
  420. return ax_rel * U_ABx + ay_rel * U_ABy
  421. def _cal_TTC(self, dist, vrel_projection):
  422. """计算碰撞时间(TTC)"""
  423. if vrel_projection >= 0: # 不接近
  424. return None
  425. return dist / abs(vrel_projection)
  426. def _cal_MTTC(self, dist, vrel_projection, a_ego):
  427. """计算修正的碰撞时间(MTTC)"""
  428. if vrel_projection >= 0 or dist <= 0 or a_ego >= 0:
  429. return None
  430. try:
  431. if abs(a_ego) < 1e-6:
  432. return dist / abs(vrel_projection) if abs(vrel_projection) > 1e-6 else None
  433. discriminant = vrel_projection ** 2 + 2 * a_ego * dist
  434. if discriminant < 0:
  435. return None
  436. t1 = (-vrel_projection + math.sqrt(discriminant)) / a_ego
  437. t2 = (-vrel_projection - math.sqrt(discriminant)) / a_ego
  438. valid_times = [t for t in (t1, t2) if t > 0]
  439. return min(valid_times) if valid_times else None
  440. except Exception as e:
  441. self.logger.warning(f"MTTC计算错误: {e}")
  442. return None
  443. def _cal_THW(self, lon_d, vx_ego):
  444. """计算车头时距(THW)"""
  445. if vx_ego is None or vx_ego <= 0 or lon_d is None or lon_d <= 0:
  446. return None
  447. return lon_d / vx_ego
  448. def _cal_TTB(self, dist, vrel_projection, ego_decel_max):
  449. """计算制动时间(TTB)"""
  450. if vrel_projection is None or ego_decel_max is None or vrel_projection >= 0:
  451. return None
  452. try:
  453. TTB = (dist + vrel_projection ** 2 / (2 * ego_decel_max) / vrel_projection)
  454. return TTB if TTB > 0 and not (np.isinf(TTB) or np.isnan(TTB)) else None
  455. except Exception as e:
  456. self.logger.warning(f"计算TTB时出错: {e}")
  457. return None
  458. def _cal_TM(self, vrel_projection, v2, a2, v1, a1):
  459. """计算时间裕度(TM)"""
  460. if (vrel_projection is None or v2 is None or a2 is None or a2 <= 0 or
  461. v1 is None or a1 is None or a1 <= 0 or vrel_projection >= 0):
  462. return None
  463. try:
  464. TM = (v2 ** 2 / (2 * a2) - v1 ** 2 / (2 * a1)) / v1
  465. return TM if TM > 0 and not (np.isinf(TM) or np.isnan(TM)) else None
  466. except Exception as e:
  467. self.logger.warning(f"计算TM时出错: {e}")
  468. return None
  469. def _cal_PSD(self, dist, vrel_projection, max_decel, heading_diff, vx_ego, vy_ego, obj_type, relative_x,
  470. relative_y):
  471. """计算预测安全距离比(PSD)"""
  472. abs_heading_diff = abs(heading_diff)
  473. # 根据航向差确定场景类型
  474. if abs_heading_diff < 45:
  475. # 纵向场景
  476. D = abs(relative_x)
  477. v_dir = abs(vx_ego)
  478. min_stopping_dist = (v_dir ** 2) / (2 * self.ego_decel_lon_max)
  479. elif 45 <= abs_heading_diff <= 135:
  480. # 横向场景
  481. D = abs(relative_y)
  482. v_dir = abs(vy_ego)
  483. if obj_type == 5: # 行人
  484. reaction_dist = abs(relative_y) * self.rho
  485. braking_dist = (v_dir ** 2) / (2 * self.ego_decel_lat_max)
  486. min_stopping_dist = reaction_dist + braking_dist
  487. else: # 车辆
  488. min_stopping_dist = (v_dir ** 2) / (2 * self.ego_decel_lat_max)
  489. else: # 对向场景
  490. return self.DEFAULT_VALUES['PSD']
  491. # 处理特殊情况
  492. if min_stopping_dist < 1e-3:
  493. return float('inf') if D > 0.1 else 0.0
  494. return D / min_stopping_dist
  495. def _cal_DTC(self, vx_rel, ax_rel_ego, t):
  496. """计算碰撞距离(DTC)"""
  497. if vx_rel >= 0: # 没有接近风险
  498. return self.DEFAULT_VALUES['DTC']
  499. try:
  500. speed = abs(vx_rel)
  501. reaction_distance = speed * t
  502. braking_distance = (speed ** 2) / (2 * self.ego_decel_lon_max)
  503. dtc = reaction_distance + braking_distance
  504. return max(0, dtc)
  505. except Exception as e:
  506. self.logger.warning(f"DTC计算错误: {e}")
  507. return self.DEFAULT_VALUES['DTC']
  508. def _cal_longitudinal_safe_dist(self, v_ego, v_lead_rel, rho, decel_ego, decel_lead):
  509. """计算纵向安全距离(LonSD)"""
  510. try:
  511. v_lead = v_ego + v_lead_rel
  512. reaction_distance = v_ego * rho
  513. braking_distance_ego = v_ego ** 2 / (2 * decel_ego) if v_ego > 0 else 0
  514. braking_distance_lead = v_lead ** 2 / (2 * decel_lead) if v_lead > 0 else 0
  515. safe_dist = reaction_distance + braking_distance_ego - braking_distance_lead
  516. return max(2.0, safe_dist)
  517. except Exception as e:
  518. self.logger.warning(f"计算纵向安全距离错误: {e}")
  519. return 25.0
  520. def _cal_lateral_safe_dist(self, lat_dist, v_ego_lat, vehicle_width, lane_width, decel_lat_max):
  521. """计算横向安全距离(LatSD)"""
  522. try:
  523. available_space = lane_width - vehicle_width
  524. base_margin = 0.5
  525. speed_margin = 0.1 * abs(v_ego_lat)
  526. safe_dist = min(base_margin + speed_margin, available_space / 2)
  527. return max(base_margin, safe_dist)
  528. except Exception as e:
  529. self.logger.warning(f"计算横向安全距离错误: {e}")
  530. return 1.0
  531. def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
  532. """计算制动威胁数(BTN)"""
  533. if lon_d is None or lon_d <= 0 or ego_decel_lon_max is None or ego_decel_lon_max <= 0:
  534. return None
  535. try:
  536. return (lon_a1 + lon_a - (lon_v ** 2) / (2 * lon_d)) / ego_decel_lon_max
  537. except:
  538. return None
  539. def _death_pr(self, obj_type, v_relative):
  540. """计算死亡概率"""
  541. if obj_type == 5: # 行人
  542. return 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  543. else: # 车辆
  544. return 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  545. def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  546. """计算碰撞风险等级"""
  547. p_death = self._death_pr(obj_type, v_relative)
  548. return 0.4 * p_death + 0.6 * collisionSeverity
  549. def _normal_distribution(self, x, mean=1.32, std_dev=0.26):
  550. """正态分布函数用于碰撞严重性计算"""
  551. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  552. # ==================== 目标物过滤方法 ====================
  553. def _is_relevant_target(self, lat_d, lon_d, v_lat, heading_diff, obj_type):
  554. """判断目标物是否相关"""
  555. abs_lat_d = abs(lat_d)
  556. abs_heading_diff = abs(heading_diff)
  557. lane_half_width = self.lane_width / 2
  558. ego_half_width = self.ego_width / 2
  559. ped_half_width = self.ped_width / 2
  560. safe_width = ego_half_width + ped_half_width + 0.2
  561. # 行人目标处理
  562. if obj_type == 5:
  563. if abs_heading_diff >= 60: # 横穿行人
  564. return (0 < lon_d < 50) and (abs_lat_d < safe_width or abs_lat_d < lane_half_width)
  565. else: # 沿路行人
  566. return (0 < lon_d < 30) and (abs_lat_d < safe_width) and abs(v_lat) < 1.0
  567. # 车辆类目标处理
  568. if abs_heading_diff < 45: # 纵向目标
  569. return abs_lat_d < lane_half_width + 0.1 and abs(lon_d) < 80
  570. if 45 <= abs_heading_diff <= 135: # 横向目标
  571. return 0 < lon_d < 30 and abs_lat_d < ego_half_width + 0.1
  572. return False # 反向目标
  573. def _has_passed_conflict_point(self, relative_x, relative_y, obj_type, heading_diff):
  574. """判断目标是否已越过冲突点"""
  575. if obj_type == 5: # 行人
  576. obj_length = self.pedestrian_length
  577. obj_width = self.pedestrian_width
  578. else: # 车辆
  579. obj_length = self.vehicle_length
  580. obj_width = self.vehicle_width
  581. length_safe_dist = (self.ego_length + obj_length) / 2
  582. width_safe_dist = (self.ego_width + obj_width) / 2
  583. if obj_type == 5:
  584. return relative_x < -length_safe_dist or abs(relative_y) > width_safe_dist * 2
  585. else:
  586. if abs(heading_diff) < 45: # 同向
  587. return relative_x < -length_safe_dist or abs(relative_y) > width_safe_dist * 1.5
  588. elif 45 <= abs(heading_diff) <= 135: # 横向
  589. return relative_x < -length_safe_dist or relative_x > length_safe_dist
  590. else: # 对向
  591. return ((relative_x < -length_safe_dist and relative_y < -width_safe_dist) or
  592. (relative_x < -length_safe_dist and relative_y > width_safe_dist) or
  593. (relative_x > length_safe_dist and relative_y < -width_safe_dist) or
  594. (relative_x > length_safe_dist and relative_y > width_safe_dist))
  595. def _is_moving_away(self, ego_vel, target_vel, relative_x, relative_y, obj_type, heading_diff):
  596. """判断目标是否正在远离"""
  597. vx_rel = target_vel[0] - ego_vel[0]
  598. vy_rel = target_vel[1] - ego_vel[1]
  599. dist = math.sqrt(relative_x ** 2 + relative_y ** 2)
  600. if dist < 1e-6:
  601. return False
  602. v_projection = (vx_rel * relative_x + vy_rel * relative_y) / dist
  603. if obj_type == 5: # 行人
  604. if relative_x < 0: # 后方
  605. return v_projection > 0.5
  606. else: # 前方
  607. return v_projection < -0.5
  608. else:
  609. abs_heading_diff = abs(heading_diff)
  610. if abs_heading_diff < 45: # 同向
  611. if relative_x < 0: # 后方
  612. return v_projection > 0
  613. else: # 前方
  614. return v_projection < 0
  615. elif 45 <= abs_heading_diff <= 135: # 横向
  616. if relative_x > 0: # 前方
  617. return v_projection < 0
  618. else: # 后方
  619. return v_projection > 0
  620. else: # 对向
  621. return False
  622. # ==================== 指标获取方法 ====================
  623. def _get_metric_value(self, metric_name, aggregation='min'):
  624. """通用指标值获取方法"""
  625. if self.empty_flag or self.df_safe is None or self.df_safe.empty:
  626. return self.DEFAULT_VALUES.get(metric_name, None)
  627. values = self.df_safe[metric_name].dropna()
  628. if values.empty:
  629. return self.DEFAULT_VALUES.get(metric_name, None)
  630. # 根据聚合类型获取值
  631. if aggregation == 'min':
  632. metric_value = float(values.min())
  633. elif aggregation == 'max':
  634. metric_value = float(values.max())
  635. elif aggregation == 'mean':
  636. metric_value = float(values.mean())
  637. else:
  638. metric_value = float(values.min())
  639. # 存储指标数据
  640. metric_data = []
  641. for _, row in self.df_safe.iterrows():
  642. if pd.notnull(row[metric_name]):
  643. metric_data.append({
  644. 'simTime': row['simTime'],
  645. 'simFrame': row['simFrame'],
  646. 'playerId': row['playerId'],
  647. metric_name: row[metric_name]
  648. })
  649. setattr(self, f'{metric_name.lower()}_data', metric_data)
  650. return metric_value
  651. def get_ttc_value(self) -> float:
  652. return self._get_metric_value('TTC')
  653. def get_mttc_value(self) -> float:
  654. return self._get_metric_value('MTTC')
  655. def get_thw_value(self) -> float:
  656. return self._get_metric_value('THW')
  657. def get_ttb_value(self) -> float:
  658. return self._get_metric_value('TTB')
  659. def get_tm_value(self) -> float:
  660. return self._get_metric_value('TM')
  661. def get_dtc_value(self) -> float:
  662. return self._get_metric_value('DTC', 'min')
  663. def get_psd_value(self) -> float:
  664. return self._get_metric_value('PSD', 'min')
  665. def get_lonsd_value(self) -> float:
  666. return self._get_metric_value('LonSD', 'mean')
  667. def get_latsd_value(self) -> float:
  668. return self._get_metric_value('LatSD', 'min')
  669. def get_btn_value(self) -> float:
  670. return self._get_metric_value('BTN', 'max')
  671. def get_collision_risk_value(self) -> float:
  672. return self._get_metric_value('collisionRisk', 'max')
  673. def get_collision_severity_value(self) -> float:
  674. return self._get_metric_value('collisionSeverity', 'max')
  675. # ==================== 辅助方法 ====================
  676. def generate_metric_chart(self, metric_name: str, plot_path: Path) -> None:
  677. """生成指标图表"""
  678. try:
  679. self.output_dir = plot_path if plot_path else os.path.join(os.getcwd(), 'data')
  680. os.makedirs(self.output_dir, exist_ok=True)
  681. chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
  682. if chart_path:
  683. self.logger.info(f"{metric_name}图表已生成: {chart_path}")
  684. else:
  685. self.logger.warning(f"{metric_name}图表生成失败")
  686. except Exception as e:
  687. self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)