safety.py 28 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2023/07/25
  11. @Last Modified: 2023/07/26
  12. @Summary: safe metrics
  13. """
  14. import math
  15. import numpy as np
  16. import pandas as pd
  17. from modules.lib.score import Score
  18. from modules.config import config
  19. from collections import defaultdict
  20. import scipy.integrate as spi
  21. class Safe(object):
  22. def __init__(self, data_processed):
  23. # self.logger = log.get_logger() # 使用时再初始化
  24. self.data_processed = data_processed
  25. self.df = data_processed.object_df.copy()
  26. self.ego_df = data_processed.ego_data.copy()
  27. self.obj_id_list = data_processed.obj_id_list
  28. self.metric_list = [
  29. "TTC",
  30. "MTTC",
  31. "THW",
  32. "LatSD",
  33. "BTN",
  34. "collisionRisk",
  35. "collisionSeverity",
  36. ]
  37. # score data
  38. self.calculated_value = {
  39. "TTC": 10.0,
  40. "MTTC": 10.0,
  41. "THW": 10.0,
  42. # 'LonSD': 0.0,
  43. "LatSD": 3.0,
  44. # 'DRAC': 0.0,
  45. "BTN": 1.0,
  46. # 'STN': 1.0,
  47. "collisionRisk": 0.0,
  48. "collisionSeverity": 0.0,
  49. }
  50. # lists of drving control info
  51. self.time_list = self.ego_df["simTime"].values.tolist()
  52. self.frame_list = self.ego_df["simFrame"].values.tolist()
  53. self.collisionRisk = 0
  54. self.empty_flag = True
  55. # no car following scene
  56. if len(self.obj_id_list) > 1:
  57. self.unsafe_df = pd.DataFrame(
  58. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  59. )
  60. self.unsafe_time_df = pd.DataFrame(
  61. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  62. )
  63. self.unsafe_dist_df = pd.DataFrame(
  64. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  65. )
  66. # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  67. self.unsafe_acce_drac_df = pd.DataFrame(
  68. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  69. )
  70. self.unsafe_acce_xtn_df = pd.DataFrame(
  71. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  72. )
  73. self.unsafe_prob_df = pd.DataFrame(
  74. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  75. )
  76. self.most_dangerous = {}
  77. self.pass_percent = {}
  78. self._safe_param_cal_new()
  79. def _safe_param_cal_new(self):
  80. Tc = 0.3 # 安全距离
  81. rho = Tc # 驾驶员制动反应时间(单位:秒)
  82. ego_accel_max = self.data_processed.vehicle_config[
  83. "EGO_ACCEL_MAX"
  84. ] # 自车油门最大加速度
  85. obj_decel_max = self.data_processed.vehicle_config[
  86. "OBJ_DECEL_MAX"
  87. ] # 前车刹车最大减速度
  88. ego_decel_min = self.data_processed.vehicle_config[
  89. "EGO_DECEL_MIN"
  90. ] # 自车刹车最小减速度 ug
  91. ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
  92. ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
  93. # 构建双层字典数据结构
  94. obj_dict = defaultdict(dict)
  95. # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对
  96. obj_data_dict = self.df.to_dict("records")
  97. for item in obj_data_dict:
  98. obj_dict[item["simFrame"]][item["playerId"]] = item
  99. df_list = []
  100. EGO_PLAYER_ID = 1
  101. # self.empty_flag = True
  102. for frame_num in self.frame_list:
  103. ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  104. v1 = ego_data["v"]
  105. x1 = ego_data["posX"]
  106. y1 = ego_data["posY"]
  107. h1 = ego_data["posH"]
  108. # 将角度转换为弧度
  109. # len1 = ego_data['dimX']
  110. # width1 = ego_data['dimY']
  111. # o_x1 = ego_data['offX']
  112. v_x1 = ego_data["speedX"]
  113. v_y1 = ego_data["speedY"]
  114. a_x1 = ego_data["accelX"]
  115. a_y1 = ego_data["accelY"]
  116. # a1 = ego_data['accel']
  117. for playerId in self.obj_id_list:
  118. if playerId == EGO_PLAYER_ID:
  119. continue
  120. try:
  121. obj_data = obj_dict[frame_num][playerId]
  122. except KeyError:
  123. continue
  124. x2 = obj_data["posX"]
  125. y2 = obj_data["posY"]
  126. dist = self.dist(x1, y1, x2, y2)
  127. obj_data["dist"] = dist
  128. v_x2 = obj_data["speedX"] # m/s
  129. v_y2 = obj_data["speedY"] # m/s
  130. v2 = obj_data["v"] # m/s
  131. # h2 = obj_data['posH']
  132. # len2 = obj_data['dimX']
  133. # width2 = obj_data['dimY']
  134. # o_x2 = obj_data['offX']
  135. a_x2 = obj_data["accelX"]
  136. a_y2 = obj_data["accelY"]
  137. # a2 = obj_data['accel']
  138. dx, dy = x2 - x1, y2 - y1
  139. # 计算目标车相对于自车的方位角
  140. beta = math.atan2(dy, dx) # 添加beta的计算
  141. # 将全局坐标系下的相对位置向量转换到自车坐标系
  142. # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
  143. h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
  144. # 坐标变换
  145. lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
  146. lat_d = -dx * math.sin(h1_rad) + dy * math.cos(h1_rad) # 横向距离(左为正,右为负)
  147. # 添加调试信息
  148. # if frame_num == self.frame_list[-1]: # 判断是否是最后一帧
  149. # print(f"最后一帧数据 frame_num={frame_num}:")
  150. # print(f"目标车ID: {playerId}")
  151. # print(f"自车位置: ({x1:.2f}, {y1:.2f}), 朝向角: {h1:.2f}度")
  152. # print(f"目标车位置: ({x2:.2f}, {y2:.2f})")
  153. # print(f"相对距离: dx={dx:.2f}, dy={dy:.2f}")
  154. # print(f"纵向距离: lon_d={lon_d:.2f}")
  155. # print(f"横向距离: lat_d={lat_d:.2f}")
  156. # print("------------------------")
  157. obj_dict[frame_num][playerId]["lon_d"] = lon_d
  158. obj_dict[frame_num][playerId]["lat_d"] = lat_d
  159. # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离
  160. # print(f"lon_d: {lon_d}, lat_d: {lat_d}")
  161. if lon_d > 100 or lon_d < -5 or abs(lat_d) > 4:
  162. continue
  163. self.empty_flag = False
  164. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  165. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  166. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  167. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  168. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  169. arel_projection_in_dist = self._cal_a_projection(
  170. dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
  171. )
  172. obj_dict[frame_num][playerId][
  173. "vrel_projection_in_dist"
  174. ] = vrel_projection_in_dist
  175. obj_dict[frame_num][playerId][
  176. "arel_projection_in_dist"
  177. ] = arel_projection_in_dist
  178. obj_dict[frame_num][playerId]["v_ego_projection_in_dist"] = v_ego_p
  179. obj_dict[frame_num][playerId]["v_obj_projection_in_dist"] = v_obj_p
  180. obj_type = obj_data["type"]
  181. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  182. MTTC = self._cal_MTTC(
  183. TTC, vrel_projection_in_dist, arel_projection_in_dist
  184. )
  185. THW = self._cal_THW(dist, v_ego_p)
  186. # 单车道时可用
  187. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  188. # obj_decel_max)
  189. lat_dist = 0.5
  190. v_right = v1
  191. v_left = v2
  192. a_right_lat_brake_min = 1
  193. a_left_lat_brake_min = 1
  194. a_lat_max = 5
  195. LatSD = self._cal_lateral_safe_dist(
  196. lat_dist,
  197. v_right,
  198. v_left,
  199. rho,
  200. a_right_lat_brake_min,
  201. a_left_lat_brake_min,
  202. a_lat_max,
  203. )
  204. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  205. # 将加速度和速度投影到车辆坐标系
  206. h1_rad_for_projection = math.radians(90 - h1) # 与L175保持一致
  207. # 计算纵向加速度
  208. lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection)
  209. lon_a2 = a_x2 * math.cos(h1_rad_for_projection) + a_y2 * math.sin(h1_rad_for_projection)
  210. lon_a = abs(lon_a1 - lon_a2)
  211. # 使用已计算的纵向距离或重新计算
  212. # 选项1: 使用已计算的lon_d (推荐,保持一致性)
  213. # lon_d = obj_dict[frame_num][playerId]["lon_d"]
  214. # 选项2: 重新计算,确保与L178一致
  215. # lon_d = dx * math.cos(h1_rad_for_projection) + dy * math.sin(h1_rad_for_projection)
  216. # 选项3: 使用简化公式,但确保角度单位一致
  217. lon_d = max(dist * abs(math.cos(beta - h1_rad_for_projection)), 0.1) # 设置最小值避免除零
  218. # 计算纵向速度
  219. lon_v = v_x1 * math.cos(h1_rad_for_projection) + v_y1 * math.sin(h1_rad_for_projection)
  220. # 计算BTN
  221. BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  222. # 这部分代码可以简化,直接使用已计算的lon_a1和lat_a1
  223. # 计算纵向加速度
  224. # lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection)
  225. lon_a1 = ego_data["lon_acc"] # 直接使用已计算的纵向加速度
  226. # 计算横向加速度
  227. # lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  228. lat_a1 = ego_data["lat_acc"] # 直接使用已计算的横向加速度
  229. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  230. # 修改:考虑相对加速度的方向,不再取绝对值
  231. lat_a = lat_a2 - lat_a1 # 目标车减去自车的加速度,保留方向信息
  232. lat_d = dist * abs(math.sin(beta - h1_rad))
  233. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  234. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  235. obj_dict[frame_num][playerId]["lat_v_rel"] = v_x1 - v_x2
  236. obj_dict[frame_num][playerId]["lon_v_rel"] = v_y1 - v_y2
  237. TTC = None if (not TTC or TTC < 0) else TTC
  238. MTTC = None if (not MTTC or MTTC < 0) else MTTC
  239. THW = None if (not THW or THW < 0) else THW
  240. # print(f'TTC: {TTC if TTC is None else f"{TTC:.2f}"}, '
  241. # f'MTTC: {MTTC if MTTC is None else f"{MTTC:.2f}"}, '
  242. # f'THW: {THW if THW is None else f"{THW:.2f}"}')
  243. # 原DRAC计算已被注释,需移除相关代码引用
  244. # DRAC = 10 if DRAC >= 10 else DRAC # 删除这行
  245. # 修改后的指标赋值
  246. obj_dict[frame_num][playerId]["TTC"] = TTC
  247. obj_dict[frame_num][playerId]["MTTC"] = MTTC
  248. obj_dict[frame_num][playerId]["THW"] = THW
  249. # obj_dict[frame_num][playerId]['LonSD'] = LonSD
  250. obj_dict[frame_num][playerId]["LatSD"] = LatSD
  251. # obj_dict[frame_num][playerId]['DRAC'] = DRAC # 删除这行
  252. obj_dict[frame_num][playerId]["BTN"] = abs(BTN)
  253. # obj_dict[frame_num][playerId]['STN'] = abs(STN) # 删除这行
  254. # 确保以下变量已定义
  255. collisionSeverity = (
  256. collisionSeverity if "collisionSeverity" in locals() else 0
  257. )
  258. pr_death = pr_death if "pr_death" in locals() else 0
  259. collisionRisk = collisionRisk if "collisionRisk" in locals() else 0
  260. obj_dict[frame_num][playerId]["collisionSeverity"] = (
  261. collisionSeverity * 100
  262. )
  263. obj_dict[frame_num][playerId]["pr_death"] = pr_death * 100
  264. obj_dict[frame_num][playerId]["collisionRisk"] = collisionRisk * 100
  265. # print(
  266. # f"{playerId} {frame_num} {TTC} {MTTC} {THW} {LatSD} {BTN} {collisionSeverity} {pr_death} {collisionRisk}"
  267. # )
  268. df_fnum = pd.DataFrame(obj_dict[frame_num].values())
  269. df_list.append(df_fnum)
  270. df_safe = pd.concat(df_list)
  271. col_list = [
  272. "simTime",
  273. "simFrame",
  274. "playerId",
  275. "TTC",
  276. "MTTC",
  277. "THW",
  278. "LatSD",
  279. "BTN", # 移除STN
  280. "collisionSeverity",
  281. "pr_death",
  282. "collisionRisk",
  283. ]
  284. if not self.empty_flag:
  285. self.df = df_safe[col_list].reset_index(drop=True)
  286. # self.df_safe是有问题的,因为后边统计的函数里是没有self.df_safe这个变量的,只有self.df,
  287. # 需要增加如果是空的情况下,self.df_safe的值为默认值。
  288. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  289. # 计算 AB 连线的向量 AB
  290. # dx = x2 - x1
  291. # dy = y2 - y1
  292. # 计算 AB 连线的模长 |AB|
  293. AB_mod = math.sqrt(dx**2 + dy**2)
  294. # 计算 AB 连线的单位向量 U_AB
  295. U_ABx = dx / AB_mod
  296. U_ABy = dy / AB_mod
  297. # 计算 A 在 AB 连线上的速度 V1_on_AB
  298. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  299. return V1_on_AB
  300. def _cal_v_projection(self, dx, dy, vx, vy):
  301. # 计算 AB 连线的向量 AB
  302. # dx = x2 - x1
  303. # dy = y2 - y1
  304. # 计算 AB 连线的模长 |AB|
  305. AB_mod = math.sqrt(dx**2 + dy**2)
  306. # 计算 AB 连线的单位向量 U_AB
  307. U_ABx = dx / AB_mod
  308. U_ABy = dy / AB_mod
  309. # 计算 A 相对于 B 的速度 V_relative
  310. # vx = vx1 - vx2
  311. # vy = vy1 - vy2
  312. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  313. V_on_AB = vx * U_ABx + vy * U_ABy
  314. # print(f'_cal_v_projection:{V_on_AB}')
  315. return V_on_AB
  316. def _cal_a_projection(
  317. self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
  318. ):
  319. # 计算 AB 连线的向量 AB
  320. # dx = x2 - x1
  321. # dy = y2 - y1
  322. # 计算 θ
  323. V_mod = math.sqrt(vx**2 + vy**2)
  324. AB_mod = math.sqrt(dx**2 + dy**2)
  325. if V_mod == 0 or AB_mod == 0:
  326. return 0
  327. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  328. theta = math.acos(cos_theta)
  329. # 计算 AB 连线的模长 |AB|
  330. AB_mod = math.sqrt(dx**2 + dy**2)
  331. # 计算 AB 连线的单位向量 U_AB
  332. U_ABx = dx / AB_mod
  333. U_ABy = dy / AB_mod
  334. # 计算 A 相对于 B 的加速度 a_relative
  335. # ax = ax1 - ax2
  336. # ay = ay1 - ay2
  337. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  338. a_on_AB = ax * U_ABx + ay * U_ABy
  339. VA = np.array([v_x1, v_y1])
  340. VB = np.array([v_x2, v_y2])
  341. D_A = np.array([x1, y1])
  342. D_B = np.array([x2, y2])
  343. V_r = VA - VB
  344. V = np.linalg.norm(V_r)
  345. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  346. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  347. return a_on_AB_back
  348. # 计算相对加速度
  349. def _calculate_derivative(self, a, w, V, theta):
  350. # 计算(V×cos(θ))'的值
  351. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  352. derivative = a - w * V * math.sin(theta)
  353. return derivative
  354. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  355. dx = A[0] - B[0]
  356. dy = A[1] - B[1]
  357. dvx = VA[0] - VB[0]
  358. dvy = VA[1] - VB[1]
  359. # (dx * dvy - dy * dvx)
  360. angular_velocity = (
  361. math.sqrt(dvx**2 + dvy**2) * math.sin(theta) / math.sqrt(dx**2 + dy**2)
  362. )
  363. return angular_velocity
  364. def _death_pr(self, obj_type, v_relative):
  365. if obj_type == 5:
  366. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  367. else:
  368. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  369. return p_death
  370. def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  371. if obj_type == 5:
  372. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  373. else:
  374. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  375. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  376. return collisionRisk
  377. # 求两车之间当前距离
  378. def dist(self, x1, y1, x2, y2):
  379. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  380. return dist
  381. # TTC (time to collision)
  382. def _cal_TTC(self, dist, vrel_projection_in_dist):
  383. # 如果相对速度接近0,视为相对静止或无法计算TTC
  384. if abs(vrel_projection_in_dist) < 0.02: # 使用之前讨论过的阈值判断
  385. return self.calculated_value["TTC"] # 返回默认值10.0
  386. TTC = dist / vrel_projection_in_dist
  387. # 如果计算结果为负,表示正在远离,也视为安全
  388. if TTC < 0:
  389. return self.calculated_value["TTC"] # 返回默认值10.0
  390. return TTC
  391. def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
  392. if arel_projection_in_dist != 0:
  393. tmp = vrel_projection_in_dist**2 + 2 * arel_projection_in_dist * dist
  394. if tmp < 0:
  395. return self.calculated_value["MTTC"] # 返回默认值10.0
  396. t1 = (
  397. -1 * vrel_projection_in_dist - math.sqrt(tmp)
  398. ) / arel_projection_in_dist
  399. t2 = (
  400. -1 * vrel_projection_in_dist + math.sqrt(tmp)
  401. ) / arel_projection_in_dist
  402. if t1 > 0 and t2 > 0:
  403. if t1 >= t2:
  404. MTTC = t2
  405. elif t1 < t2:
  406. MTTC = t1
  407. elif t1 > 0 and t2 <= 0:
  408. MTTC = t1
  409. elif t1 <= 0 and t2 > 0:
  410. MTTC = t2
  411. else:
  412. return self.calculated_value["MTTC"] # 返回默认值10.0
  413. elif arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
  414. MTTC = dist / vrel_projection_in_dist
  415. else:
  416. return self.calculated_value["MTTC"] # 返回默认值10.0
  417. return MTTC
  418. # THW (time headway)
  419. def _cal_THW(self, dist, v_ego_projection_in_dist):
  420. if not v_ego_projection_in_dist or abs(v_ego_projection_in_dist) < 0.02:
  421. return self.calculated_value["THW"] # 返回默认值10.0
  422. THW = dist / v_ego_projection_in_dist
  423. if THW < 0:
  424. return self.calculated_value["THW"] # 返回默认值10.0
  425. return THW
  426. def velocity(self, v_x, v_y):
  427. v = math.sqrt(v_x**2 + v_y**2) * 3.6
  428. return v
  429. def _cal_longitudinal_safe_dist(
  430. self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max
  431. ):
  432. lon_dist_min = (
  433. v_ego_p * rho
  434. + ego_accel_max * (rho**2) / 2
  435. + (v_ego_p + rho * ego_accel_max) ** 2 / (2 * ego_decel_min)
  436. - v_obj_p**2 / (2 * ego_decel_max)
  437. )
  438. return lon_dist_min
  439. def _cal_lateral_safe_dist(
  440. self,
  441. lat_dist,
  442. v_right,
  443. v_left,
  444. rho,
  445. a_right_lat_brake_min,
  446. a_left_lat_brake_min,
  447. a_lat_max,
  448. ):
  449. # 检查除数是否为零
  450. if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
  451. return self.calculated_value["LatSD"] # 返回默认值3.0
  452. v_right_rho = v_right + rho * a_lat_max
  453. v_left_rho = v_left + rho * a_lat_max
  454. dist_min = lat_dist + (
  455. (v_right + v_right_rho) * rho / 2
  456. + v_right_rho**2 / a_right_lat_brake_min / 2
  457. + ((v_left + v_right_rho) * rho / 2)
  458. + v_left_rho**2 / a_left_lat_brake_min / 2
  459. )
  460. return dist_min
  461. # DRAC (decelerate required avoid collision)
  462. def _cal_DRAC(
  463. self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2
  464. ):
  465. dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
  466. if dist_length < 0:
  467. dist_width = dist - (width2 / 2 + width1 / 2)
  468. if dist_width < 0:
  469. return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值
  470. else:
  471. d = dist_width
  472. else:
  473. d = dist_length
  474. # 避免除零
  475. if d <= 0:
  476. return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值
  477. DRAC = vrel_projection_in_dist**2 / (2 * d)
  478. return DRAC
  479. # BTN (brake threat number)
  480. def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
  481. # 安全检查,避免除零错误
  482. if lon_d <= 0 or ego_decel_lon_max == 0:
  483. return self.calculated_value["BTN"] # 返回默认值1.0
  484. # 注意:lon_a现在是带方向的相对加速度(lon_a2 - lon_a1)
  485. # 根据BTN公式:BTN = (lon_a1 + lon_a - lon_v**2 / (2 * lon_d)) / ego_decel_lon_max
  486. BTN = (
  487. lon_a1 + lon_a - lon_v**2 / (2 * lon_d)
  488. ) / ego_decel_lon_max
  489. return BTN
  490. # STN (steer threat number)
  491. def _cal_STN_new(
  492. self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2
  493. ):
  494. STN = (
  495. lat_a1
  496. + lat_a
  497. + 2
  498. / ttc**2
  499. * (
  500. lat_d
  501. + abs(ego_decel_lat_max * lat_v) * (width1 + width2) / 2
  502. + abs(lat_v * ttc)
  503. )
  504. ) / ego_decel_lat_max
  505. return STN
  506. # BTN (brake threat number)
  507. def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
  508. BTN = (
  509. a_y1 + ay - vy**2 / (2 * dy)
  510. ) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  511. return BTN
  512. # STN (steer threat number)
  513. def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
  514. STN = (
  515. a_x1
  516. + ax
  517. + 2
  518. / ttc**2
  519. * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)
  520. ) / max_ax
  521. return STN
  522. # 追尾碰撞风险
  523. def _normal_distribution(self, x):
  524. mean = 1.32
  525. std_dev = 0.26
  526. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(
  527. -0.5 * (x - mean) ** 2 / std_dev
  528. )
  529. def continuous_group(self, df):
  530. time_list = df["simTime"].values.tolist()
  531. frame_list = df["simFrame"].values.tolist()
  532. group_time = []
  533. group_frame = []
  534. sub_group_time = []
  535. sub_group_frame = []
  536. for i in range(len(frame_list)):
  537. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  538. sub_group_time.append(time_list[i])
  539. sub_group_frame.append(frame_list[i])
  540. else:
  541. group_time.append(sub_group_time)
  542. group_frame.append(sub_group_frame)
  543. sub_group_time = [time_list[i]]
  544. sub_group_frame = [frame_list[i]]
  545. group_time.append(sub_group_time)
  546. group_frame.append(sub_group_frame)
  547. group_time = [g for g in group_time if len(g) >= 2]
  548. group_frame = [g for g in group_frame if len(g) >= 2]
  549. # 输出图表值
  550. time = [[g[0], g[-1]] for g in group_time]
  551. frame = [[g[0], g[-1]] for g in group_frame]
  552. unfunc_time_df = pd.DataFrame(time, columns=["start_time", "end_time"])
  553. unfunc_frame_df = pd.DataFrame(frame, columns=["start_frame", "end_frame"])
  554. unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
  555. return unfunc_df
  556. # 统计最危险的指标
  557. def _safe_statistic_most_dangerous(self):
  558. min_list = ["TTC", "MTTC", "THW", "LatSD"]
  559. max_list = ["BTN", "collisionRisk", "collisionSeverity"] # 移除STN
  560. for metric in min_list:
  561. if metric in self.metric_list:
  562. if metric in self.df.columns:
  563. # 筛选出非NaN的值
  564. valid_values = self.df[metric].dropna()
  565. if not valid_values.empty:
  566. # 如果有非NaN值,计算最小值
  567. self.calculated_value[metric] = valid_values.min()
  568. print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}')
  569. else:
  570. # 如果全是NaN或列为空,赋默认值
  571. self.calculated_value[metric] = self.calculated_value[metric]
  572. else:
  573. # 列不存在,赋默认值
  574. self.calculated_value[metric] = self.calculated_value[metric]
  575. for metric in max_list:
  576. if metric in self.metric_list:
  577. if metric in self.df.columns:
  578. # 筛选出非NaN的值
  579. valid_values = self.df[metric].dropna()
  580. if not valid_values.empty:
  581. # 如果有非NaN值,计算最大值
  582. self.calculated_value[metric] = valid_values.max()
  583. print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}')
  584. else:
  585. # 如果全是NaN或列为空,赋默认值
  586. self.calculated_value[metric] = self.calculated_value[metric]
  587. else:
  588. # 列不存在,赋默认值
  589. self.calculated_value[metric] = self.calculated_value[metric]
  590. return self.calculated_value
  591. def _safe_no_obj_statistic(self):
  592. # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
  593. return self.calculated_value
  594. def report_statistic(
  595. self,
  596. ):
  597. safety_result = {}
  598. if len(self.obj_id_list) == 1:
  599. safety_result = self._safe_no_obj_statistic()
  600. else:
  601. safety_result = self._safe_statistic_most_dangerous()
  602. evaluator = Score(self.data_processed.safety_config)
  603. result = evaluator.evaluate(safety_result)
  604. # self.logger.info(f"安全性表现及得分情况:[{result}]")
  605. print("\n[安全性表现及得分情况]")
  606. return result