safety.py 27 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 安全指标计算模块
  5. """
  6. import numpy as np
  7. import pandas as pd
  8. import math
  9. from collections import defaultdict
  10. from typing import Dict, Any, List, Optional
  11. from modules.lib.score import Score
  12. from modules.lib.log_manager import LogManager
  13. # 安全指标相关常量
  14. SAFETY_INFO = [
  15. "simTime",
  16. "simFrame",
  17. "playerId",
  18. "posX",
  19. "posY",
  20. "posH",
  21. "speedX",
  22. "speedY",
  23. "accelX",
  24. "accelY",
  25. "v",
  26. "type"
  27. ]
  28. # ----------------------
  29. # SafetyCalculator单例管理
  30. # ----------------------
  31. # class SafetyCalculatorManager:
  32. # """SafetyCalculator单例管理器"""
  33. # _instance = None
  34. # @classmethod
  35. # def get_instance(cls, data_processed):
  36. # """获取SafetyCalculator实例,如果不存在则创建"""
  37. # if cls._instance is None or cls._instance.data_processed != data_processed:
  38. # cls._instance = SafetyCalculator(data_processed)
  39. # return cls._instance
  40. # ----------------------
  41. # 独立指标计算函数
  42. # ----------------------
  43. def calculate_ttc(data_processed) -> dict:
  44. """计算TTC (Time To Collision)"""
  45. if data_processed is None or not hasattr(data_processed, 'object_df'):
  46. return {"TTC": None}
  47. try:
  48. safety = SafetyCalculator(data_processed)
  49. ttc_value = safety.get_ttc_value()
  50. return {"TTC": ttc_value}
  51. except Exception as e:
  52. LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
  53. return {"TTC": None}
  54. def calculate_mttc(data_processed) -> dict:
  55. """计算MTTC (Modified Time To Collision)"""
  56. if data_processed is None or not hasattr(data_processed, 'object_df'):
  57. return {"MTTC": None}
  58. try:
  59. safety = SafetyCalculator(data_processed)
  60. mttc_value = safety.get_mttc_value()
  61. return {"MTTC": mttc_value}
  62. except Exception as e:
  63. LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
  64. return {"MTTC": None}
  65. def calculate_thw(data_processed) -> dict:
  66. """计算THW (Time Headway)"""
  67. if data_processed is None or not hasattr(data_processed, 'object_df'):
  68. return {"THW": None}
  69. try:
  70. safety = SafetyCalculator(data_processed)
  71. thw_value = safety.get_thw_value()
  72. return {"THW": thw_value}
  73. except Exception as e:
  74. LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
  75. return {"THW": None}
  76. def calculate_collisionrisk(data_processed) -> dict:
  77. """计算碰撞风险"""
  78. safety = SafetyCalculator(data_processed)
  79. collision_risk_value = safety.get_collision_risk_value()
  80. return {"collisionRisk": collision_risk_value}
  81. def calculate_lonsd(data_processed) -> dict:
  82. """计算纵向安全距离"""
  83. safety = SafetyCalculator(data_processed)
  84. lonsd_value = safety.get_lonsd_value()
  85. return {"LonSD": lonsd_value}
  86. def calculate_latsd(data_processed) -> dict:
  87. """计算横向安全距离"""
  88. safety = SafetyCalculator(data_processed)
  89. latsd_value = safety.get_latsd_value()
  90. return {"LatSD": latsd_value}
  91. def calculate_btn(data_processed) -> dict:
  92. """计算制动威胁数"""
  93. safety = SafetyCalculator(data_processed)
  94. btn_value = safety.get_btn_value()
  95. return {"BTN": btn_value}
  96. def calculate_collisionseverity(data_processed) -> dict:
  97. """计算碰撞严重性"""
  98. safety = SafetyCalculator(data_processed)
  99. collision_severity_value = safety.get_collision_severity_value()
  100. return {"collisionSeverity": collision_severity_value}
  101. class SafetyRegistry:
  102. """安全指标注册器"""
  103. def __init__(self, data_processed):
  104. self.logger = LogManager().get_logger()
  105. self.data = data_processed
  106. self.safety_config = data_processed.safety_config["safety"]
  107. self.metrics = self._extract_metrics(self.safety_config)
  108. self._registry = self._build_registry()
  109. def _extract_metrics(self, config_node: dict) -> list:
  110. """从配置中提取指标名称"""
  111. metrics = []
  112. def _recurse(node):
  113. if isinstance(node, dict):
  114. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  115. metrics.append(node['name'])
  116. for v in node.values():
  117. _recurse(v)
  118. _recurse(config_node)
  119. self.logger.info(f'评比的安全指标列表:{metrics}')
  120. return metrics
  121. def _build_registry(self) -> dict:
  122. """构建指标函数注册表"""
  123. registry = {}
  124. for metric_name in self.metrics:
  125. func_name = f"calculate_{metric_name.lower()}"
  126. if func_name in globals():
  127. registry[metric_name] = globals()[func_name]
  128. else:
  129. self.logger.warning(f"未实现安全指标函数: {func_name}")
  130. return registry
  131. def batch_execute(self) -> dict:
  132. """批量执行指标计算"""
  133. results = {}
  134. for name, func in self._registry.items():
  135. try:
  136. result = func(self.data)
  137. results.update(result)
  138. except Exception as e:
  139. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  140. results[name] = None
  141. self.logger.info(f'安全指标计算结果:{results}')
  142. return results
  143. class SafeManager:
  144. """安全指标管理类"""
  145. def __init__(self, data_processed):
  146. self.data = data_processed
  147. self.registry = SafetyRegistry(self.data)
  148. def report_statistic(self):
  149. """计算并报告安全指标结果"""
  150. safety_result = self.registry.batch_execute()
  151. return safety_result
  152. class SafetyCalculator:
  153. """安全指标计算类 - 兼容Safe类风格"""
  154. def __init__(self, data_processed):
  155. self.logger = LogManager().get_logger()
  156. self.data_processed = data_processed
  157. self.df = data_processed.object_df.copy()
  158. self.ego_df = data_processed.ego_data
  159. self.obj_id_list = data_processed.obj_id_list
  160. self.metric_list = [
  161. 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
  162. ]
  163. self.time_list = self.ego_df['simTime'].values.tolist()
  164. self.frame_list = self.ego_df['simFrame'].values.tolist()
  165. self.collisionRisk = 0
  166. self.empty_flag = True
  167. self.logger.info("SafetyCalculator初始化完成,obj_id_list长度: %d", len(self.obj_id_list))
  168. if len(self.obj_id_list) > 1:
  169. self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  170. self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  171. self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  172. self.unsafe_acce_drac_df = pd.DataFrame(
  173. columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  174. self.unsafe_acce_xtn_df = pd.DataFrame(
  175. columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  176. self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  177. self.most_dangerous = {}
  178. self.pass_percent = {}
  179. self.logger.info("开始执行安全参数计算 _safe_param_cal_new")
  180. self._safe_param_cal_new()
  181. self.logger.info("安全参数计算完成")
  182. def _safe_param_cal_new(self):
  183. self.logger.debug("进入 _safe_param_cal_new 方法")
  184. # 直接复用Safe类的实现
  185. Tc = 0.3 # 安全距离
  186. rho = self.data_processed.vehicle_config["RHO"]
  187. ego_accel_max = self.data_processed.vehicle_config["EGO_ACCEL_MAX"]
  188. obj_decel_max = self.data_processed.vehicle_config["OBJ_DECEL_MAX"]
  189. ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"]
  190. ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
  191. ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
  192. obj_dict = defaultdict(dict)
  193. obj_data_dict = self.df.to_dict('records')
  194. for item in obj_data_dict:
  195. obj_dict[item['simFrame']][item['playerId']] = item
  196. df_list = []
  197. EGO_PLAYER_ID = 1
  198. for frame_num in self.frame_list:
  199. ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  200. v1 = ego_data['v']
  201. x1 = ego_data['posX']
  202. y1 = ego_data['posY']
  203. h1 = ego_data['posH']
  204. v_x1 = ego_data['speedX']
  205. v_y1 = ego_data['speedY']
  206. a_x1 = ego_data['accelX']
  207. a_y1 = ego_data['accelY']
  208. for playerId in self.obj_id_list:
  209. if playerId == EGO_PLAYER_ID:
  210. continue
  211. try:
  212. obj_data = obj_dict[frame_num][playerId]
  213. except KeyError:
  214. continue
  215. x2 = obj_data['posX']
  216. y2 = obj_data['posY']
  217. dist = self.dist(x1, y1, x2, y2)
  218. obj_data['dist'] = dist
  219. v_x2 = obj_data['speedX']
  220. v_y2 = obj_data['speedY']
  221. v2 = obj_data['v']
  222. a_x2 = obj_data['accelX']
  223. a_y2 = obj_data['accelY']
  224. dx, dy = x2 - x1, y2 - y1
  225. A = np.array([dx, dy])
  226. x = np.array([1, 0])
  227. dot_product = np.dot(A, x)
  228. vector_length_A = np.linalg.norm(A)
  229. vector_length_x = np.linalg.norm(x)
  230. cos_theta = dot_product / (vector_length_A * vector_length_x)
  231. beta = np.arccos(cos_theta)
  232. lon_d = dist * math.cos(beta - h1)
  233. lat_d = abs(dist * math.sin(beta - h1))
  234. obj_dict[frame_num][playerId]['lon_d'] = lon_d
  235. obj_dict[frame_num][playerId]['lat_d'] = lat_d
  236. if lon_d > 100 or lon_d < -5 or lat_d > 4:
  237. continue
  238. self.empty_flag = False
  239. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  240. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  241. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  242. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  243. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  244. arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
  245. v_x2, v_y2)
  246. obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
  247. obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
  248. obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
  249. obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
  250. obj_type = obj_data['type']
  251. TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None
  252. MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist)
  253. THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None
  254. LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
  255. lat_dist = 0.5
  256. v_right = v1
  257. v_left = v2
  258. a_right_lat_brake_min = 1
  259. a_left_lat_brake_min = 1
  260. a_lat_max = 5
  261. LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  262. a_left_lat_brake_min, a_lat_max)
  263. lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
  264. lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
  265. lon_a = abs(lon_a1 - lon_a2)
  266. lon_d = max(0.1, dist * abs(math.cos(beta - h1)))
  267. lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
  268. BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  269. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  270. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  271. lat_a = abs(lat_a1 - lat_a2)
  272. lat_d = dist * abs(math.sin(beta - h1))
  273. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  274. obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
  275. obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
  276. TTC = None if (TTC is None or TTC < 0) else TTC
  277. MTTC = None if (MTTC is None or MTTC < 0) else MTTC
  278. THW = None if (THW is None or THW < 0) else THW
  279. obj_dict[frame_num][playerId]['TTC'] = TTC
  280. obj_dict[frame_num][playerId]['MTTC'] = MTTC
  281. obj_dict[frame_num][playerId]['THW'] = THW
  282. obj_dict[frame_num][playerId]['LonSD'] = LonSD
  283. obj_dict[frame_num][playerId]['LatSD'] = LatSD
  284. obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
  285. collisionSeverity = 0
  286. pr_death = 0
  287. collisionRisk = 0
  288. obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
  289. obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
  290. obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
  291. df_fnum = pd.DataFrame(obj_dict[frame_num].values())
  292. df_list.append(df_fnum)
  293. df_safe = pd.concat(df_list)
  294. col_list = ['simTime', 'simFrame', 'playerId',
  295. 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'BTN',
  296. 'collisionSeverity', 'pr_death', 'collisionRisk']
  297. self.df_safe = df_safe[col_list].reset_index(drop=True)
  298. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  299. # 计算 AB 连线的向量 AB
  300. # dx = x2 - x1
  301. # dy = y2 - y1
  302. # 计算 AB 连线的模长 |AB|
  303. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  304. # 计算 AB 连线的单位向量 U_AB
  305. U_ABx = dx / AB_mod
  306. U_ABy = dy / AB_mod
  307. # 计算 A 在 AB 连线上的速度 V1_on_AB
  308. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  309. return V1_on_AB
  310. def _cal_v_projection(self, dx, dy, vx, vy):
  311. # 计算 AB 连线的向量 AB
  312. # dx = x2 - x1
  313. # dy = y2 - y1
  314. # 计算 AB 连线的模长 |AB|
  315. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  316. # 计算 AB 连线的单位向量 U_AB
  317. U_ABx = dx / AB_mod
  318. U_ABy = dy / AB_mod
  319. # 计算 A 相对于 B 的速度 V_relative
  320. # vx = vx1 - vx2
  321. # vy = vy1 - vy2
  322. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  323. V_on_AB = vx * U_ABx + vy * U_ABy
  324. return V_on_AB
  325. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  326. # 计算 AB 连线的向量 AB
  327. # dx = x2 - x1
  328. # dy = y2 - y1
  329. # 计算 θ
  330. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  331. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  332. if V_mod == 0 or AB_mod == 0:
  333. return 0
  334. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  335. theta = math.acos(cos_theta)
  336. # 计算 AB 连线的模长 |AB|
  337. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  338. # 计算 AB 连线的单位向量 U_AB
  339. U_ABx = dx / AB_mod
  340. U_ABy = dy / AB_mod
  341. # 计算 A 相对于 B 的加速度 a_relative
  342. # ax = ax1 - ax2
  343. # ay = ay1 - ay2
  344. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  345. a_on_AB = ax * U_ABx + ay * U_ABy
  346. VA = np.array([v_x1, v_y1])
  347. VB = np.array([v_x2, v_y2])
  348. D_A = np.array([x1, y1])
  349. D_B = np.array([x2, y2])
  350. V_r = VA - VB
  351. V = np.linalg.norm(V_r)
  352. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  353. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  354. return a_on_AB_back
  355. # 计算相对加速度
  356. def _calculate_derivative(self, a, w, V, theta):
  357. # 计算(V×cos(θ))'的值
  358. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  359. derivative = a - w * V * math.sin(theta)
  360. return derivative
  361. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  362. dx = A[0] - B[0]
  363. dy = A[1] - B[1]
  364. dvx = VA[0] - VB[0]
  365. dvy = VA[1] - VB[1]
  366. # (dx * dvy - dy * dvx)
  367. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  368. return angular_velocity
  369. def _death_pr(self, obj_type, v_relative):
  370. if obj_type == 5:
  371. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  372. else:
  373. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  374. return p_death
  375. def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  376. if obj_type == 5:
  377. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  378. else:
  379. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  380. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  381. return collisionRisk
  382. # 求两车之间当前距离
  383. def dist(self, x1, y1, x2, y2):
  384. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  385. return dist
  386. # TTC (time to collision)
  387. def _cal_TTC(self, dist, vrel_projection_in_dist):
  388. if vrel_projection_in_dist == 0:
  389. return math.inf
  390. TTC = dist / vrel_projection_in_dist
  391. return TTC
  392. def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
  393. MTTC = math.nan
  394. if arel_projection_in_dist != 0:
  395. tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
  396. if tmp < 0:
  397. return math.nan
  398. t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
  399. t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
  400. if t1 > 0 and t2 > 0:
  401. if t1 >= t2:
  402. MTTC = t2
  403. elif t1 < t2:
  404. MTTC = t1
  405. elif t1 > 0 and t2 <= 0:
  406. MTTC = t1
  407. elif t1 <= 0 and t2 > 0:
  408. MTTC = t2
  409. if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
  410. MTTC = dist / vrel_projection_in_dist
  411. return MTTC
  412. # THW (time headway)
  413. def _cal_THW(self, dist, v_ego_projection_in_dist):
  414. if not v_ego_projection_in_dist:
  415. THW = None
  416. else:
  417. THW = dist / v_ego_projection_in_dist
  418. return THW
  419. def velocity(self, v_x, v_y):
  420. v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
  421. return v
  422. def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
  423. lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
  424. 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
  425. return lon_dist_min
  426. def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min,
  427. a_lat_max):
  428. v_right_rho = v_right + rho * a_lat_max
  429. v_left_rho = v_left + rho * a_lat_max
  430. dist_min = lat_dist + ((v_right + v_right_rho) * rho / 2 + v_right_rho ** 2 / a_right_lat_brake_min / 2 + (
  431. (v_left + v_right_rho) * rho / 2) + v_left_rho ** 2 / a_left_lat_brake_min / 2)
  432. return dist_min
  433. # DRAC (decelerate required avoid collision)
  434. def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
  435. dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
  436. if dist_length < 0:
  437. dist_width = dist - (width2 / 2 + width1 / 2)
  438. if dist_width < 0:
  439. return math.inf
  440. else:
  441. d = dist_width
  442. else:
  443. d = dist_length
  444. DRAC = vrel_projection_in_dist ** 2 / (2 * d)
  445. return DRAC
  446. # BTN (brake threat number)
  447. def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
  448. BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  449. return BTN
  450. # STN (steer threat number)
  451. def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
  452. STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
  453. width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
  454. return STN
  455. # BTN (brake threat number)
  456. def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
  457. BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  458. return BTN
  459. # STN (steer threat number)
  460. def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
  461. STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax
  462. return STN
  463. # 追尾碰撞风险
  464. def _normal_distribution(self, x):
  465. mean = 1.32
  466. std_dev = 0.26
  467. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  468. def continuous_group(self, df):
  469. time_list = df['simTime'].values.tolist()
  470. frame_list = df['simFrame'].values.tolist()
  471. group_time = []
  472. group_frame = []
  473. sub_group_time = []
  474. sub_group_frame = []
  475. for i in range(len(frame_list)):
  476. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  477. sub_group_time.append(time_list[i])
  478. sub_group_frame.append(frame_list[i])
  479. else:
  480. group_time.append(sub_group_time)
  481. group_frame.append(sub_group_frame)
  482. sub_group_time = [time_list[i]]
  483. sub_group_frame = [frame_list[i]]
  484. group_time.append(sub_group_time)
  485. group_frame.append(sub_group_frame)
  486. group_time = [g for g in group_time if len(g) >= 2]
  487. group_frame = [g for g in group_frame if len(g) >= 2]
  488. # 输出图表值
  489. time = [[g[0], g[-1]] for g in group_time]
  490. frame = [[g[0], g[-1]] for g in group_frame]
  491. unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  492. unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  493. unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
  494. return unfunc_df
  495. # 统计最危险的指标
  496. def _safe_statistic_most_dangerous(self):
  497. min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD']
  498. max_list = ['BTN', 'collisionRisk', 'collisionSeverity']
  499. result = {}
  500. for metric in min_list:
  501. if metric in self.metric_list:
  502. if metric in self.df.columns:
  503. val = self.df[metric].min()
  504. result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
  505. else:
  506. result[metric] = self._default_value(metric)
  507. for metric in max_list:
  508. if metric in self.metric_list:
  509. if metric in self.df.columns:
  510. val = self.df[metric].max()
  511. result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
  512. else:
  513. result[metric] = self._default_value(metric)
  514. return result
  515. def _safe_no_obj_statistic(self):
  516. # 仅有自车时的默认值
  517. result = {metric: self._default_value(metric) for metric in self.metric_list}
  518. return result
  519. def _default_value(self, metric):
  520. # 统一默认值
  521. defaults = {
  522. 'TTC': 10.0,
  523. 'MTTC': 4.2,
  524. 'THW': 2.1,
  525. 'LonSD': 10.0,
  526. 'LatSD': 2.0,
  527. 'BTN': 1.0,
  528. 'collisionRisk': 0.0,
  529. 'collisionSeverity': 0.0
  530. }
  531. return defaults.get(metric, None)
  532. def report_statistic(self):
  533. if len(self.obj_id_list) == 1:
  534. safety_result = self._safe_no_obj_statistic()
  535. else:
  536. safety_result = self._safe_statistic_most_dangerous()
  537. evaluator = Score(self.data_processed.safety_config)
  538. result = evaluator.evaluate(safety_result)
  539. print("\n[安全性表现及得分情况]")
  540. return result
  541. def get_ttc_value(self) -> float:
  542. if self.empty_flag or self.df_safe is None:
  543. return self._default_value('TTC')
  544. ttc_values = self.df_safe['TTC'].dropna()
  545. return float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
  546. def get_mttc_value(self) -> float:
  547. if self.empty_flag or self.df_safe is None:
  548. return self._default_value('MTTC')
  549. mttc_values = self.df_safe['MTTC'].dropna()
  550. return float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
  551. def get_thw_value(self) -> float:
  552. if self.empty_flag or self.df_safe is None:
  553. return self._default_value('THW')
  554. thw_values = self.df_safe['THW'].dropna()
  555. return float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
  556. def get_lonsd_value(self) -> float:
  557. if self.empty_flag or self.df_safe is None:
  558. return self._default_value('LonSD')
  559. lonsd_values = self.df_safe['LonSD'].dropna()
  560. return float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
  561. def get_latsd_value(self) -> float:
  562. if self.empty_flag or self.df_safe is None:
  563. return self._default_value('LatSD')
  564. latsd_values = self.df_safe['LatSD'].dropna()
  565. return float(latsd_values.mean()) if not latsd_values.empty else self._default_value('LatSD')
  566. def get_btn_value(self) -> float:
  567. if self.empty_flag or self.df_safe is None:
  568. return self._default_value('BTN')
  569. btn_values = self.df_safe['BTN'].dropna()
  570. return float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
  571. def get_collision_risk_value(self) -> float:
  572. if self.empty_flag or self.df_safe is None:
  573. return self._default_value('collisionRisk')
  574. risk_values = self.df_safe['collisionRisk'].dropna()
  575. return float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
  576. def get_collision_severity_value(self) -> float:
  577. if self.empty_flag or self.df_safe is None:
  578. return self._default_value('collisionSeverity')
  579. severity_values = self.df_safe['collisionSeverity'].dropna()
  580. return float(severity_values.max()) if not severity_values.empty else self._default_value('collisionSeverity')