safety.py 23 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2023/07/25
  11. @Last Modified: 2023/07/26
  12. @Summary: safe metrics
  13. """
  14. import math
  15. import numpy as np
  16. import pandas as pd
  17. from modules.lib.score import Score
  18. from modules.config import config
  19. from collections import defaultdict
  20. import scipy.integrate as spi
  21. class Safe(object):
  22. def __init__(self, data_processed):
  23. # self.logger = log.get_logger() # 使用时再初始化
  24. self.data_processed = data_processed
  25. self.df = data_processed.object_df.copy()
  26. self.ego_df = data_processed.ego_data
  27. self.obj_id_list = data_processed.obj_id_list
  28. self.metric_list = [
  29. "TTC",
  30. "MTTC",
  31. "THW",
  32. "LatSD",
  33. "BTN",
  34. "collisionRisk",
  35. "collisionSeverity",
  36. ]
  37. # score data
  38. self.calculated_value = {
  39. "TTC": 1.0,
  40. "MTTC": 1.0,
  41. "THW": 1.0,
  42. # 'LonSD': 0.0,
  43. "LatSD": 0.0,
  44. # 'DRAC': 0.0,
  45. "BTN": 1.0,
  46. # 'STN': 1.0,
  47. "collisionRisk": 0.0,
  48. "collisionSeverity": 0.0,
  49. }
  50. # lists of drving control info
  51. self.time_list = self.ego_df["simTime"].values.tolist()
  52. self.frame_list = self.ego_df["simFrame"].values.tolist()
  53. self.collisionRisk = 0
  54. self.empty_flag = True
  55. # no car following scene
  56. if len(self.obj_id_list) > 1:
  57. self.unsafe_df = pd.DataFrame(
  58. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  59. )
  60. self.unsafe_time_df = pd.DataFrame(
  61. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  62. )
  63. self.unsafe_dist_df = pd.DataFrame(
  64. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  65. )
  66. # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  67. self.unsafe_acce_drac_df = pd.DataFrame(
  68. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  69. )
  70. self.unsafe_acce_xtn_df = pd.DataFrame(
  71. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  72. )
  73. self.unsafe_prob_df = pd.DataFrame(
  74. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  75. )
  76. self.most_dangerous = {}
  77. self.pass_percent = {}
  78. self._safe_param_cal_new()
  79. def _safe_param_cal_new(self):
  80. Tc = 0.3 # 安全距离
  81. rho = Tc # 驾驶员制动反应时间(单位:秒)
  82. ego_accel_max = self.data_processed.vehicle_config[
  83. "EGO_ACCEL_MAX"
  84. ] # 自车油门最大加速度
  85. obj_decel_max = self.data_processed.vehicle_config[
  86. "OBJ_DECEL_MAX"
  87. ] # 前车刹车最大减速度
  88. ego_decel_min = self.data_processed.vehicle_config[
  89. "EGO_DECEL_MIN"
  90. ] # 自车刹车最小减速度 ug
  91. ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
  92. ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
  93. # 构建双层字典数据结构
  94. obj_dict = defaultdict(dict)
  95. # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对
  96. obj_data_dict = self.df.to_dict("records")
  97. for item in obj_data_dict:
  98. obj_dict[item["simFrame"]][item["playerId"]] = item
  99. df_list = []
  100. EGO_PLAYER_ID = 1
  101. # self.empty_flag = True
  102. for frame_num in self.frame_list:
  103. ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  104. v1 = ego_data["v"] # km/h to m/s
  105. x1 = ego_data["posX"]
  106. y1 = ego_data["posY"]
  107. h1 = ego_data["posH"]
  108. # len1 = ego_data['dimX']
  109. # width1 = ego_data['dimY']
  110. # o_x1 = ego_data['offX']
  111. v_x1 = ego_data["speedX"] # km/h to m/s
  112. v_y1 = ego_data["speedY"] # km/h to m/s
  113. a_x1 = ego_data["accelX"]
  114. a_y1 = ego_data["accelY"]
  115. # a1 = ego_data['accel']
  116. for playerId in self.obj_id_list:
  117. if playerId == EGO_PLAYER_ID:
  118. continue
  119. try:
  120. obj_data = obj_dict[frame_num][playerId]
  121. except KeyError:
  122. continue
  123. x2 = obj_data["posX"]
  124. y2 = obj_data["posY"]
  125. dist = self.dist(x1, y1, x2, y2)
  126. obj_data["dist"] = dist
  127. v_x2 = obj_data["speedX"] # km/h to m/s
  128. v_y2 = obj_data["speedY"] # km/h to m/s
  129. v2 = obj_data["v"] # km/h to m/s
  130. # h2 = obj_data['posH']
  131. # len2 = obj_data['dimX']
  132. # width2 = obj_data['dimY']
  133. # o_x2 = obj_data['offX']
  134. a_x2 = obj_data["accelX"]
  135. a_y2 = obj_data["accelY"]
  136. # a2 = obj_data['accel']
  137. dx, dy = x2 - x1, y2 - y1
  138. # 定义矢量A和x轴正向向量x
  139. A = np.array([dx, dy])
  140. x = np.array([1, 0])
  141. # 计算点积和向量长度
  142. dot_product = np.dot(A, x)
  143. vector_length_A = np.linalg.norm(A)
  144. vector_length_x = np.linalg.norm(x)
  145. # 计算夹角的余弦值
  146. cos_theta = dot_product / (vector_length_A * vector_length_x)
  147. # 将余弦值转换为角度值(弧度制)
  148. beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
  149. lon_d = dist * math.cos(beta - h1)
  150. lat_d = abs(
  151. dist * math.sin(beta - h1)
  152. ) # 需要增加左正右负的判断,但beta取值为[0,pi)
  153. obj_dict[frame_num][playerId]["lon_d"] = lon_d
  154. obj_dict[frame_num][playerId]["lat_d"] = lat_d
  155. # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离
  156. # print(f"lon_d: {lon_d}, lat_d: {lat_d}")
  157. if lon_d > 100 or lon_d < -5 or lat_d > 4:
  158. continue
  159. self.empty_flag = False
  160. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  161. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  162. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  163. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  164. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  165. arel_projection_in_dist = self._cal_a_projection(
  166. dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
  167. )
  168. obj_dict[frame_num][playerId][
  169. "vrel_projection_in_dist"
  170. ] = vrel_projection_in_dist
  171. obj_dict[frame_num][playerId][
  172. "arel_projection_in_dist"
  173. ] = arel_projection_in_dist
  174. obj_dict[frame_num][playerId]["v_ego_projection_in_dist"] = v_ego_p
  175. obj_dict[frame_num][playerId]["v_obj_projection_in_dist"] = v_obj_p
  176. obj_type = obj_data["type"]
  177. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  178. MTTC = self._cal_MTTC(
  179. TTC, vrel_projection_in_dist, arel_projection_in_dist
  180. )
  181. THW = self._cal_THW(dist, v_ego_p)
  182. # 单车道时可用
  183. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  184. # obj_decel_max)
  185. lat_dist = 0.5
  186. v_right = v1
  187. v_left = v2
  188. a_right_lat_brake_min = 1
  189. a_left_lat_brake_min = 1
  190. a_lat_max = 5
  191. LatSD = self._cal_lateral_safe_dist(
  192. lat_dist,
  193. v_right,
  194. v_left,
  195. rho,
  196. a_right_lat_brake_min,
  197. a_left_lat_brake_min,
  198. a_lat_max,
  199. )
  200. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  201. lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
  202. lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
  203. lon_a = abs(lon_a1 - lon_a2)
  204. lon_d = dist * abs(math.cos(beta - h1))
  205. lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
  206. BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  207. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  208. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  209. lat_a = abs(lat_a1 - lat_a2)
  210. lat_d = dist * abs(math.sin(beta - h1))
  211. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  212. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  213. obj_dict[frame_num][playerId]["lat_v_rel"] = v_x1 - v_x2
  214. obj_dict[frame_num][playerId]["lon_v_rel"] = v_y1 - v_y2
  215. TTC = None if (not TTC or TTC < 0) else TTC
  216. MTTC = None if (not MTTC or MTTC < 0) else MTTC
  217. THW = None if (not THW or THW < 0) else THW
  218. # 原DRAC计算已被注释,需移除相关代码引用
  219. # DRAC = 10 if DRAC >= 10 else DRAC # 删除这行
  220. # 修改后的指标赋值
  221. obj_dict[frame_num][playerId]["TTC"] = TTC
  222. obj_dict[frame_num][playerId]["MTTC"] = MTTC
  223. obj_dict[frame_num][playerId]["THW"] = THW
  224. # obj_dict[frame_num][playerId]['LonSD'] = LonSD
  225. obj_dict[frame_num][playerId]["LatSD"] = LatSD
  226. # obj_dict[frame_num][playerId]['DRAC'] = DRAC # 删除这行
  227. obj_dict[frame_num][playerId]["BTN"] = abs(BTN)
  228. # obj_dict[frame_num][playerId]['STN'] = abs(STN) # 删除这行
  229. # 确保以下变量已定义
  230. collisionSeverity = (
  231. collisionSeverity if "collisionSeverity" in locals() else 0
  232. )
  233. pr_death = pr_death if "pr_death" in locals() else 0
  234. collisionRisk = collisionRisk if "collisionRisk" in locals() else 0
  235. obj_dict[frame_num][playerId]["collisionSeverity"] = (
  236. collisionSeverity * 100
  237. )
  238. obj_dict[frame_num][playerId]["pr_death"] = pr_death * 100
  239. obj_dict[frame_num][playerId]["collisionRisk"] = collisionRisk * 100
  240. # print(
  241. # f"{playerId} {frame_num} {TTC} {MTTC} {THW} {LatSD} {BTN} {collisionSeverity} {pr_death} {collisionRisk}"
  242. # )
  243. df_fnum = pd.DataFrame(obj_dict[frame_num].values())
  244. df_list.append(df_fnum)
  245. df_safe = pd.concat(df_list)
  246. col_list = [
  247. "simTime",
  248. "simFrame",
  249. "playerId",
  250. "TTC",
  251. "MTTC",
  252. "THW",
  253. "LatSD",
  254. "BTN", # 移除STN
  255. "collisionSeverity",
  256. "pr_death",
  257. "collisionRisk",
  258. ]
  259. if not self.empty_flag:
  260. self.df = df_safe[col_list].reset_index(drop=True)
  261. # self.df_safe是有问题的,因为后边统计的函数里是没有self.df_safe这个变量的,只有self.df,
  262. # 需要增加如果是空的情况下,self.df_safe的值为默认值。
  263. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  264. # 计算 AB 连线的向量 AB
  265. # dx = x2 - x1
  266. # dy = y2 - y1
  267. # 计算 AB 连线的模长 |AB|
  268. AB_mod = math.sqrt(dx**2 + dy**2)
  269. # 计算 AB 连线的单位向量 U_AB
  270. U_ABx = dx / AB_mod
  271. U_ABy = dy / AB_mod
  272. # 计算 A 在 AB 连线上的速度 V1_on_AB
  273. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  274. return V1_on_AB
  275. def _cal_v_projection(self, dx, dy, vx, vy):
  276. # 计算 AB 连线的向量 AB
  277. # dx = x2 - x1
  278. # dy = y2 - y1
  279. # 计算 AB 连线的模长 |AB|
  280. AB_mod = math.sqrt(dx**2 + dy**2)
  281. # 计算 AB 连线的单位向量 U_AB
  282. U_ABx = dx / AB_mod
  283. U_ABy = dy / AB_mod
  284. # 计算 A 相对于 B 的速度 V_relative
  285. # vx = vx1 - vx2
  286. # vy = vy1 - vy2
  287. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  288. V_on_AB = vx * U_ABx + vy * U_ABy
  289. return V_on_AB
  290. def _cal_a_projection(
  291. self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
  292. ):
  293. # 计算 AB 连线的向量 AB
  294. # dx = x2 - x1
  295. # dy = y2 - y1
  296. # 计算 θ
  297. V_mod = math.sqrt(vx**2 + vy**2)
  298. AB_mod = math.sqrt(dx**2 + dy**2)
  299. if V_mod == 0 or AB_mod == 0:
  300. return 0
  301. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  302. theta = math.acos(cos_theta)
  303. # 计算 AB 连线的模长 |AB|
  304. AB_mod = math.sqrt(dx**2 + dy**2)
  305. # 计算 AB 连线的单位向量 U_AB
  306. U_ABx = dx / AB_mod
  307. U_ABy = dy / AB_mod
  308. # 计算 A 相对于 B 的加速度 a_relative
  309. # ax = ax1 - ax2
  310. # ay = ay1 - ay2
  311. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  312. a_on_AB = ax * U_ABx + ay * U_ABy
  313. VA = np.array([v_x1, v_y1])
  314. VB = np.array([v_x2, v_y2])
  315. D_A = np.array([x1, y1])
  316. D_B = np.array([x2, y2])
  317. V_r = VA - VB
  318. V = np.linalg.norm(V_r)
  319. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  320. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  321. return a_on_AB_back
  322. # 计算相对加速度
  323. def _calculate_derivative(self, a, w, V, theta):
  324. # 计算(V×cos(θ))'的值
  325. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  326. derivative = a - w * V * math.sin(theta)
  327. return derivative
  328. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  329. dx = A[0] - B[0]
  330. dy = A[1] - B[1]
  331. dvx = VA[0] - VB[0]
  332. dvy = VA[1] - VB[1]
  333. # (dx * dvy - dy * dvx)
  334. angular_velocity = (
  335. math.sqrt(dvx**2 + dvy**2) * math.sin(theta) / math.sqrt(dx**2 + dy**2)
  336. )
  337. return angular_velocity
  338. def _death_pr(self, obj_type, v_relative):
  339. if obj_type == 5:
  340. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  341. else:
  342. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  343. return p_death
  344. def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  345. if obj_type == 5:
  346. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  347. else:
  348. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  349. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  350. return collisionRisk
  351. # 求两车之间当前距离
  352. def dist(self, x1, y1, x2, y2):
  353. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  354. return dist
  355. # TTC (time to collision)
  356. def _cal_TTC(self, dist, vrel_projection_in_dist):
  357. if vrel_projection_in_dist == 0:
  358. return math.inf
  359. TTC = dist / vrel_projection_in_dist
  360. return TTC
  361. def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
  362. MTTC = math.nan
  363. if arel_projection_in_dist != 0:
  364. tmp = vrel_projection_in_dist**2 + 2 * arel_projection_in_dist * dist
  365. if tmp < 0:
  366. return math.nan
  367. t1 = (
  368. -1 * vrel_projection_in_dist - math.sqrt(tmp)
  369. ) / arel_projection_in_dist
  370. t2 = (
  371. -1 * vrel_projection_in_dist + math.sqrt(tmp)
  372. ) / arel_projection_in_dist
  373. if t1 > 0 and t2 > 0:
  374. if t1 >= t2:
  375. MTTC = t2
  376. elif t1 < t2:
  377. MTTC = t1
  378. elif t1 > 0 and t2 <= 0:
  379. MTTC = t1
  380. elif t1 <= 0 and t2 > 0:
  381. MTTC = t2
  382. if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
  383. MTTC = dist / vrel_projection_in_dist
  384. return MTTC
  385. # THW (time headway)
  386. def _cal_THW(self, dist, v_ego_projection_in_dist):
  387. if not v_ego_projection_in_dist:
  388. THW = None
  389. else:
  390. THW = dist / v_ego_projection_in_dist
  391. return THW
  392. def velocity(self, v_x, v_y):
  393. v = math.sqrt(v_x**2 + v_y**2) * 3.6
  394. return v
  395. def _cal_longitudinal_safe_dist(
  396. self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max
  397. ):
  398. lon_dist_min = (
  399. v_ego_p * rho
  400. + ego_accel_max * (rho**2) / 2
  401. + (v_ego_p + rho * ego_accel_max) ** 2 / (2 * ego_decel_min)
  402. - v_obj_p**2 / (2 * ego_decel_max)
  403. )
  404. return lon_dist_min
  405. def _cal_lateral_safe_dist(
  406. self,
  407. lat_dist,
  408. v_right,
  409. v_left,
  410. rho,
  411. a_right_lat_brake_min,
  412. a_left_lat_brake_min,
  413. a_lat_max,
  414. ):
  415. v_right_rho = v_right + rho * a_lat_max
  416. v_left_rho = v_left + rho * a_lat_max
  417. dist_min = lat_dist + (
  418. (v_right + v_right_rho) * rho / 2
  419. + v_right_rho**2 / a_right_lat_brake_min / 2
  420. + ((v_left + v_right_rho) * rho / 2)
  421. + v_left_rho**2 / a_left_lat_brake_min / 2
  422. )
  423. return dist_min
  424. # DRAC (decelerate required avoid collision)
  425. def _cal_DRAC(
  426. self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2
  427. ):
  428. dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
  429. if dist_length < 0:
  430. dist_width = dist - (width2 / 2 + width1 / 2)
  431. if dist_width < 0:
  432. return math.inf
  433. else:
  434. d = dist_width
  435. else:
  436. d = dist_length
  437. DRAC = vrel_projection_in_dist**2 / (2 * d)
  438. return DRAC
  439. # BTN (brake threat number)
  440. def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
  441. BTN = (
  442. lon_a1 + lon_a - lon_v**2 / (2 * lon_d)
  443. ) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  444. return BTN
  445. # STN (steer threat number)
  446. def _cal_STN_new(
  447. self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2
  448. ):
  449. STN = (
  450. lat_a1
  451. + lat_a
  452. + 2
  453. / ttc**2
  454. * (
  455. lat_d
  456. + abs(ego_decel_lat_max * lat_v) * (width1 + width2) / 2
  457. + abs(lat_v * ttc)
  458. )
  459. ) / ego_decel_lat_max
  460. return STN
  461. # BTN (brake threat number)
  462. def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
  463. BTN = (
  464. a_y1 + ay - vy**2 / (2 * dy)
  465. ) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  466. return BTN
  467. # STN (steer threat number)
  468. def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
  469. STN = (
  470. a_x1
  471. + ax
  472. + 2
  473. / ttc**2
  474. * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)
  475. ) / max_ax
  476. return STN
  477. # 追尾碰撞风险
  478. def _normal_distribution(self, x):
  479. mean = 1.32
  480. std_dev = 0.26
  481. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(
  482. -0.5 * (x - mean) ** 2 / std_dev
  483. )
  484. def continuous_group(self, df):
  485. time_list = df["simTime"].values.tolist()
  486. frame_list = df["simFrame"].values.tolist()
  487. group_time = []
  488. group_frame = []
  489. sub_group_time = []
  490. sub_group_frame = []
  491. for i in range(len(frame_list)):
  492. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  493. sub_group_time.append(time_list[i])
  494. sub_group_frame.append(frame_list[i])
  495. else:
  496. group_time.append(sub_group_time)
  497. group_frame.append(sub_group_frame)
  498. sub_group_time = [time_list[i]]
  499. sub_group_frame = [frame_list[i]]
  500. group_time.append(sub_group_time)
  501. group_frame.append(sub_group_frame)
  502. group_time = [g for g in group_time if len(g) >= 2]
  503. group_frame = [g for g in group_frame if len(g) >= 2]
  504. # 输出图表值
  505. time = [[g[0], g[-1]] for g in group_time]
  506. frame = [[g[0], g[-1]] for g in group_frame]
  507. unfunc_time_df = pd.DataFrame(time, columns=["start_time", "end_time"])
  508. unfunc_frame_df = pd.DataFrame(frame, columns=["start_frame", "end_frame"])
  509. unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
  510. return unfunc_df
  511. # 统计最危险的指标
  512. def _safe_statistic_most_dangerous(self):
  513. min_list = ["TTC", "MTTC", "THW", "LatSD"]
  514. max_list = ["BTN", "collisionRisk", "collisionSeverity"] # 移除STN
  515. for metric in min_list:
  516. if metric in self.metric_list:
  517. if metric in self.df.columns:
  518. self.calculated_value[metric] = self.df[metric].min()
  519. else:
  520. self.calculated_value[metric] = 10.0
  521. for metric in max_list:
  522. if metric in self.metric_list:
  523. if metric in self.df.columns:
  524. self.calculated_value[metric] = self.df[metric].max()
  525. else:
  526. self.calculated_value[metric] = 10.0
  527. return self.calculated_value
  528. def _safe_no_obj_statistic(self):
  529. # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
  530. return self.calculated_value
  531. def report_statistic(
  532. self,
  533. ):
  534. safety_result = {}
  535. if len(self.obj_id_list) == 1:
  536. safety_result = self._safe_no_obj_statistic()
  537. else:
  538. safety_result = self._safe_statistic_most_dangerous()
  539. evaluator = Score(self.data_processed.safety_config)
  540. result = evaluator.evaluate(safety_result)
  541. # self.logger.info(f"安全性表现及得分情况:[{result}]")
  542. print("\n[安全性表现及得分情况]")
  543. return result