safety.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2023/07/25
  11. @Last Modified: 2023/07/26
  12. @Summary: safe metrics
  13. """
  14. import math
  15. import numpy as np
  16. import pandas as pd
  17. from modules.lib.score import Score
  18. from modules.config import config
  19. from collections import defaultdict
  20. import scipy.integrate as spi
  21. class Safe(object):
  22. def __init__(self, data_processed):
  23. # self.logger = log.get_logger() # 使用时再初始化
  24. self.data_processed = data_processed
  25. self.df = data_processed.object_df.copy()
  26. self.ego_df = data_processed.ego_data.copy()
  27. self.obj_id_list = data_processed.obj_id_list
  28. self.metric_list = [
  29. "TTC",
  30. "MTTC",
  31. "THW",
  32. "LatSD",
  33. "BTN",
  34. "collisionRisk",
  35. "collisionSeverity",
  36. ]
  37. # score data
  38. self.calculated_value = {
  39. "TTC": 10.0,
  40. "MTTC": 10.0,
  41. "THW": 10.0,
  42. # 'LonSD': 0.0,
  43. "LatSD": 3.0,
  44. # 'DRAC': 0.0,
  45. "BTN": 1.0,
  46. # 'STN': 1.0,
  47. "collisionRisk": 0.0,
  48. "collisionSeverity": 0.0,
  49. }
  50. # lists of drving control info
  51. self.time_list = self.ego_df["simTime"].values.tolist()
  52. self.frame_list = self.ego_df["simFrame"].values.tolist()
  53. self.collisionRisk = 0
  54. self.empty_flag = True
  55. # no car following scene
  56. if len(self.obj_id_list) > 1:
  57. self.unsafe_df = pd.DataFrame(
  58. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  59. )
  60. self.unsafe_time_df = pd.DataFrame(
  61. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  62. )
  63. self.unsafe_dist_df = pd.DataFrame(
  64. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  65. )
  66. # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  67. self.unsafe_acce_drac_df = pd.DataFrame(
  68. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  69. )
  70. self.unsafe_acce_xtn_df = pd.DataFrame(
  71. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  72. )
  73. self.unsafe_prob_df = pd.DataFrame(
  74. columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
  75. )
  76. self.most_dangerous = {}
  77. self.pass_percent = {}
  78. self._safe_param_cal_new()
  79. def _safe_param_cal_new(self):
  80. Tc = 0.3 # 安全距离
  81. rho = Tc # 驾驶员制动反应时间(单位:秒)
  82. ego_accel_max = self.data_processed.vehicle_config[
  83. "EGO_ACCEL_MAX"
  84. ] # 自车油门最大加速度
  85. obj_decel_max = self.data_processed.vehicle_config[
  86. "OBJ_DECEL_MAX"
  87. ] # 前车刹车最大减速度
  88. ego_decel_min = self.data_processed.vehicle_config[
  89. "EGO_DECEL_MIN"
  90. ] # 自车刹车最小减速度 ug
  91. ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
  92. ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
  93. # 构建双层字典数据结构
  94. obj_dict = defaultdict(dict)
  95. # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对
  96. obj_data_dict = self.df.to_dict("records")
  97. for item in obj_data_dict:
  98. obj_dict[item["simFrame"]][item["playerId"]] = item
  99. df_list = []
  100. EGO_PLAYER_ID = 1
  101. # self.empty_flag = True
  102. for frame_num in self.frame_list:
  103. ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  104. v1 = ego_data["v"]
  105. x1 = ego_data["posX"]
  106. y1 = ego_data["posY"]
  107. h1 = ego_data["posH"]
  108. # 将角度转换为弧度
  109. # len1 = ego_data['dimX']
  110. # width1 = ego_data['dimY']
  111. # o_x1 = ego_data['offX']
  112. v_x1 = ego_data["speedX"]
  113. v_y1 = ego_data["speedY"]
  114. a_x1 = ego_data["accelX"]
  115. a_y1 = ego_data["accelY"]
  116. # a1 = ego_data['accel']
  117. for playerId in self.obj_id_list:
  118. if playerId == EGO_PLAYER_ID:
  119. continue
  120. try:
  121. obj_data = obj_dict[frame_num][playerId]
  122. except KeyError:
  123. continue
  124. x2 = obj_data["posX"]
  125. y2 = obj_data["posY"]
  126. dist = self.dist(x1, y1, x2, y2)
  127. obj_data["dist"] = dist
  128. v_x2 = obj_data["speedX"] # m/s
  129. v_y2 = obj_data["speedY"] # m/s
  130. v2 = obj_data["v"] # m/s
  131. # h2 = obj_data['posH']
  132. # len2 = obj_data['dimX']
  133. # width2 = obj_data['dimY']
  134. # o_x2 = obj_data['offX']
  135. a_x2 = obj_data["accelX"]
  136. a_y2 = obj_data["accelY"]
  137. # a2 = obj_data['accel']
  138. dx, dy = x2 - x1, y2 - y1
  139. # 计算目标车相对于自车的方位角
  140. beta = math.atan2(dy, dx) # 添加beta的计算
  141. # 将全局坐标系下的相对位置向量转换到自车坐标系
  142. # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
  143. h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
  144. # 坐标变换
  145. lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
  146. lat_d = -dx * math.sin(h1_rad) + dy * math.cos(h1_rad) # 横向距离(左为正,右为负)
  147. # 添加调试信息
  148. # if frame_num == self.frame_list[-1]: # 判断是否是最后一帧
  149. # print(f"最后一帧数据 frame_num={frame_num}:")
  150. # print(f"目标车ID: {playerId}")
  151. # print(f"自车位置: ({x1:.2f}, {y1:.2f}), 朝向角: {h1:.2f}度")
  152. # print(f"目标车位置: ({x2:.2f}, {y2:.2f})")
  153. # print(f"相对距离: dx={dx:.2f}, dy={dy:.2f}")
  154. # print(f"纵向距离: lon_d={lon_d:.2f}")
  155. # print(f"横向距离: lat_d={lat_d:.2f}")
  156. # print("------------------------")
  157. obj_dict[frame_num][playerId]["lon_d"] = lon_d
  158. obj_dict[frame_num][playerId]["lat_d"] = lat_d
  159. # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离
  160. # print(f"lon_d: {lon_d}, lat_d: {lat_d}")
  161. if lon_d > 100 or lon_d < -5 or abs(lat_d) > 4:
  162. continue
  163. self.empty_flag = False
  164. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  165. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  166. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  167. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  168. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  169. arel_projection_in_dist = self._cal_a_projection(
  170. dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
  171. )
  172. obj_dict[frame_num][playerId][
  173. "vrel_projection_in_dist"
  174. ] = vrel_projection_in_dist
  175. obj_dict[frame_num][playerId][
  176. "arel_projection_in_dist"
  177. ] = arel_projection_in_dist
  178. obj_dict[frame_num][playerId]["v_ego_projection_in_dist"] = v_ego_p
  179. obj_dict[frame_num][playerId]["v_obj_projection_in_dist"] = v_obj_p
  180. obj_type = obj_data["type"]
  181. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  182. MTTC = self._cal_MTTC(
  183. TTC, vrel_projection_in_dist, arel_projection_in_dist
  184. )
  185. THW = self._cal_THW(dist, v_ego_p)
  186. # 单车道时可用
  187. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  188. # obj_decel_max)
  189. lat_dist = 0.5
  190. v_right = v1
  191. v_left = v2
  192. a_right_lat_brake_min = 1
  193. a_left_lat_brake_min = 1
  194. a_lat_max = 5
  195. LatSD = self._cal_lateral_safe_dist(
  196. lat_dist,
  197. v_right,
  198. v_left,
  199. rho,
  200. a_right_lat_brake_min,
  201. a_left_lat_brake_min,
  202. a_lat_max,
  203. )
  204. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  205. # 将加速度和速度投影到车辆坐标系
  206. h1_rad_for_projection = math.radians(90 - h1) # 与L175保持一致
  207. # 计算纵向加速度
  208. lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection)
  209. lon_a2 = a_x2 * math.cos(h1_rad_for_projection) + a_y2 * math.sin(h1_rad_for_projection)
  210. lon_a = abs(lon_a1 - lon_a2)
  211. # 使用已计算的纵向距离或重新计算
  212. # 选项1: 使用已计算的lon_d (推荐,保持一致性)
  213. # lon_d = obj_dict[frame_num][playerId]["lon_d"]
  214. # 选项2: 重新计算,确保与L178一致
  215. # lon_d = dx * math.cos(h1_rad_for_projection) + dy * math.sin(h1_rad_for_projection)
  216. # 选项3: 使用简化公式,但确保角度单位一致
  217. lon_d = max(dist * abs(math.cos(beta - h1_rad_for_projection)), 0.1) # 设置最小值避免除零
  218. # 计算纵向速度
  219. lon_v = v_x1 * math.cos(h1_rad_for_projection) + v_y1 * math.sin(h1_rad_for_projection)
  220. # 计算BTN
  221. BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  222. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  223. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  224. lat_a = abs(lat_a1 - lat_a2)
  225. lat_d = dist * abs(math.sin(beta - h1_rad))
  226. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  227. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  228. obj_dict[frame_num][playerId]["lat_v_rel"] = v_x1 - v_x2
  229. obj_dict[frame_num][playerId]["lon_v_rel"] = v_y1 - v_y2
  230. TTC = None if (not TTC or TTC < 0) else TTC
  231. MTTC = None if (not MTTC or MTTC < 0) else MTTC
  232. THW = None if (not THW or THW < 0) else THW
  233. # print(f'TTC: {TTC if TTC is None else f"{TTC:.2f}"}, '
  234. # f'MTTC: {MTTC if MTTC is None else f"{MTTC:.2f}"}, '
  235. # f'THW: {THW if THW is None else f"{THW:.2f}"}')
  236. # 原DRAC计算已被注释,需移除相关代码引用
  237. # DRAC = 10 if DRAC >= 10 else DRAC # 删除这行
  238. # 修改后的指标赋值
  239. obj_dict[frame_num][playerId]["TTC"] = TTC
  240. obj_dict[frame_num][playerId]["MTTC"] = MTTC
  241. obj_dict[frame_num][playerId]["THW"] = THW
  242. # obj_dict[frame_num][playerId]['LonSD'] = LonSD
  243. obj_dict[frame_num][playerId]["LatSD"] = LatSD
  244. # obj_dict[frame_num][playerId]['DRAC'] = DRAC # 删除这行
  245. obj_dict[frame_num][playerId]["BTN"] = abs(BTN)
  246. # obj_dict[frame_num][playerId]['STN'] = abs(STN) # 删除这行
  247. # 确保以下变量已定义
  248. collisionSeverity = (
  249. collisionSeverity if "collisionSeverity" in locals() else 0
  250. )
  251. pr_death = pr_death if "pr_death" in locals() else 0
  252. collisionRisk = collisionRisk if "collisionRisk" in locals() else 0
  253. obj_dict[frame_num][playerId]["collisionSeverity"] = (
  254. collisionSeverity * 100
  255. )
  256. obj_dict[frame_num][playerId]["pr_death"] = pr_death * 100
  257. obj_dict[frame_num][playerId]["collisionRisk"] = collisionRisk * 100
  258. # print(
  259. # f"{playerId} {frame_num} {TTC} {MTTC} {THW} {LatSD} {BTN} {collisionSeverity} {pr_death} {collisionRisk}"
  260. # )
  261. df_fnum = pd.DataFrame(obj_dict[frame_num].values())
  262. df_list.append(df_fnum)
  263. df_safe = pd.concat(df_list)
  264. col_list = [
  265. "simTime",
  266. "simFrame",
  267. "playerId",
  268. "TTC",
  269. "MTTC",
  270. "THW",
  271. "LatSD",
  272. "BTN", # 移除STN
  273. "collisionSeverity",
  274. "pr_death",
  275. "collisionRisk",
  276. ]
  277. if not self.empty_flag:
  278. self.df = df_safe[col_list].reset_index(drop=True)
  279. # self.df_safe是有问题的,因为后边统计的函数里是没有self.df_safe这个变量的,只有self.df,
  280. # 需要增加如果是空的情况下,self.df_safe的值为默认值。
  281. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  282. # 计算 AB 连线的向量 AB
  283. # dx = x2 - x1
  284. # dy = y2 - y1
  285. # 计算 AB 连线的模长 |AB|
  286. AB_mod = math.sqrt(dx**2 + dy**2)
  287. # 计算 AB 连线的单位向量 U_AB
  288. U_ABx = dx / AB_mod
  289. U_ABy = dy / AB_mod
  290. # 计算 A 在 AB 连线上的速度 V1_on_AB
  291. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  292. return V1_on_AB
  293. def _cal_v_projection(self, dx, dy, vx, vy):
  294. # 计算 AB 连线的向量 AB
  295. # dx = x2 - x1
  296. # dy = y2 - y1
  297. # 计算 AB 连线的模长 |AB|
  298. AB_mod = math.sqrt(dx**2 + dy**2)
  299. # 计算 AB 连线的单位向量 U_AB
  300. U_ABx = dx / AB_mod
  301. U_ABy = dy / AB_mod
  302. # 计算 A 相对于 B 的速度 V_relative
  303. # vx = vx1 - vx2
  304. # vy = vy1 - vy2
  305. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  306. V_on_AB = vx * U_ABx + vy * U_ABy
  307. # print(f'_cal_v_projection:{V_on_AB}')
  308. return V_on_AB
  309. def _cal_a_projection(
  310. self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
  311. ):
  312. # 计算 AB 连线的向量 AB
  313. # dx = x2 - x1
  314. # dy = y2 - y1
  315. # 计算 θ
  316. V_mod = math.sqrt(vx**2 + vy**2)
  317. AB_mod = math.sqrt(dx**2 + dy**2)
  318. if V_mod == 0 or AB_mod == 0:
  319. return 0
  320. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  321. theta = math.acos(cos_theta)
  322. # 计算 AB 连线的模长 |AB|
  323. AB_mod = math.sqrt(dx**2 + dy**2)
  324. # 计算 AB 连线的单位向量 U_AB
  325. U_ABx = dx / AB_mod
  326. U_ABy = dy / AB_mod
  327. # 计算 A 相对于 B 的加速度 a_relative
  328. # ax = ax1 - ax2
  329. # ay = ay1 - ay2
  330. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  331. a_on_AB = ax * U_ABx + ay * U_ABy
  332. VA = np.array([v_x1, v_y1])
  333. VB = np.array([v_x2, v_y2])
  334. D_A = np.array([x1, y1])
  335. D_B = np.array([x2, y2])
  336. V_r = VA - VB
  337. V = np.linalg.norm(V_r)
  338. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  339. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  340. return a_on_AB_back
  341. # 计算相对加速度
  342. def _calculate_derivative(self, a, w, V, theta):
  343. # 计算(V×cos(θ))'的值
  344. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  345. derivative = a - w * V * math.sin(theta)
  346. return derivative
  347. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  348. dx = A[0] - B[0]
  349. dy = A[1] - B[1]
  350. dvx = VA[0] - VB[0]
  351. dvy = VA[1] - VB[1]
  352. # (dx * dvy - dy * dvx)
  353. angular_velocity = (
  354. math.sqrt(dvx**2 + dvy**2) * math.sin(theta) / math.sqrt(dx**2 + dy**2)
  355. )
  356. return angular_velocity
  357. def _death_pr(self, obj_type, v_relative):
  358. if obj_type == 5:
  359. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  360. else:
  361. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  362. return p_death
  363. def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  364. if obj_type == 5:
  365. p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  366. else:
  367. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  368. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  369. return collisionRisk
  370. # 求两车之间当前距离
  371. def dist(self, x1, y1, x2, y2):
  372. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  373. return dist
  374. # TTC (time to collision)
  375. def _cal_TTC(self, dist, vrel_projection_in_dist):
  376. # 如果相对速度接近0,视为相对静止或无法计算TTC
  377. if abs(vrel_projection_in_dist) < 0.02: # 使用之前讨论过的阈值判断
  378. return self.calculated_value["TTC"] # 返回默认值10.0
  379. TTC = dist / vrel_projection_in_dist
  380. # 如果计算结果为负,表示正在远离,也视为安全
  381. if TTC < 0:
  382. return self.calculated_value["TTC"] # 返回默认值10.0
  383. return TTC
  384. def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
  385. if arel_projection_in_dist != 0:
  386. tmp = vrel_projection_in_dist**2 + 2 * arel_projection_in_dist * dist
  387. if tmp < 0:
  388. return self.calculated_value["MTTC"] # 返回默认值10.0
  389. t1 = (
  390. -1 * vrel_projection_in_dist - math.sqrt(tmp)
  391. ) / arel_projection_in_dist
  392. t2 = (
  393. -1 * vrel_projection_in_dist + math.sqrt(tmp)
  394. ) / arel_projection_in_dist
  395. if t1 > 0 and t2 > 0:
  396. if t1 >= t2:
  397. MTTC = t2
  398. elif t1 < t2:
  399. MTTC = t1
  400. elif t1 > 0 and t2 <= 0:
  401. MTTC = t1
  402. elif t1 <= 0 and t2 > 0:
  403. MTTC = t2
  404. else:
  405. return self.calculated_value["MTTC"] # 返回默认值10.0
  406. elif arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
  407. MTTC = dist / vrel_projection_in_dist
  408. else:
  409. return self.calculated_value["MTTC"] # 返回默认值10.0
  410. return MTTC
  411. # THW (time headway)
  412. def _cal_THW(self, dist, v_ego_projection_in_dist):
  413. if not v_ego_projection_in_dist or abs(v_ego_projection_in_dist) < 0.02:
  414. return self.calculated_value["THW"] # 返回默认值10.0
  415. THW = dist / v_ego_projection_in_dist
  416. if THW < 0:
  417. return self.calculated_value["THW"] # 返回默认值10.0
  418. return THW
  419. def velocity(self, v_x, v_y):
  420. v = math.sqrt(v_x**2 + v_y**2) * 3.6
  421. return v
  422. def _cal_longitudinal_safe_dist(
  423. self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max
  424. ):
  425. lon_dist_min = (
  426. v_ego_p * rho
  427. + ego_accel_max * (rho**2) / 2
  428. + (v_ego_p + rho * ego_accel_max) ** 2 / (2 * ego_decel_min)
  429. - v_obj_p**2 / (2 * ego_decel_max)
  430. )
  431. return lon_dist_min
  432. def _cal_lateral_safe_dist(
  433. self,
  434. lat_dist,
  435. v_right,
  436. v_left,
  437. rho,
  438. a_right_lat_brake_min,
  439. a_left_lat_brake_min,
  440. a_lat_max,
  441. ):
  442. # 检查除数是否为零
  443. if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
  444. return self.calculated_value["LatSD"] # 返回默认值3.0
  445. v_right_rho = v_right + rho * a_lat_max
  446. v_left_rho = v_left + rho * a_lat_max
  447. dist_min = lat_dist + (
  448. (v_right + v_right_rho) * rho / 2
  449. + v_right_rho**2 / a_right_lat_brake_min / 2
  450. + ((v_left + v_right_rho) * rho / 2)
  451. + v_left_rho**2 / a_left_lat_brake_min / 2
  452. )
  453. return dist_min
  454. # DRAC (decelerate required avoid collision)
  455. def _cal_DRAC(
  456. self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2
  457. ):
  458. dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
  459. if dist_length < 0:
  460. dist_width = dist - (width2 / 2 + width1 / 2)
  461. if dist_width < 0:
  462. return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值
  463. else:
  464. d = dist_width
  465. else:
  466. d = dist_length
  467. # 避免除零
  468. if d <= 0:
  469. return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值
  470. DRAC = vrel_projection_in_dist**2 / (2 * d)
  471. return DRAC
  472. # BTN (brake threat number)
  473. def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
  474. # 安全检查,避免除零错误
  475. if lon_d <= 0 or ego_decel_lon_max == 0:
  476. return self.calculated_value["BTN"] # 返回默认值1.0
  477. BTN = (
  478. lon_a1 + lon_a - lon_v**2 / (2 * lon_d)
  479. ) / ego_decel_lon_max
  480. return BTN
  481. # STN (steer threat number)
  482. def _cal_STN_new(
  483. self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2
  484. ):
  485. STN = (
  486. lat_a1
  487. + lat_a
  488. + 2
  489. / ttc**2
  490. * (
  491. lat_d
  492. + abs(ego_decel_lat_max * lat_v) * (width1 + width2) / 2
  493. + abs(lat_v * ttc)
  494. )
  495. ) / ego_decel_lat_max
  496. return STN
  497. # BTN (brake threat number)
  498. def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
  499. BTN = (
  500. a_y1 + ay - vy**2 / (2 * dy)
  501. ) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
  502. return BTN
  503. # STN (steer threat number)
  504. def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
  505. STN = (
  506. a_x1
  507. + ax
  508. + 2
  509. / ttc**2
  510. * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)
  511. ) / max_ax
  512. return STN
  513. # 追尾碰撞风险
  514. def _normal_distribution(self, x):
  515. mean = 1.32
  516. std_dev = 0.26
  517. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(
  518. -0.5 * (x - mean) ** 2 / std_dev
  519. )
  520. def continuous_group(self, df):
  521. time_list = df["simTime"].values.tolist()
  522. frame_list = df["simFrame"].values.tolist()
  523. group_time = []
  524. group_frame = []
  525. sub_group_time = []
  526. sub_group_frame = []
  527. for i in range(len(frame_list)):
  528. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  529. sub_group_time.append(time_list[i])
  530. sub_group_frame.append(frame_list[i])
  531. else:
  532. group_time.append(sub_group_time)
  533. group_frame.append(sub_group_frame)
  534. sub_group_time = [time_list[i]]
  535. sub_group_frame = [frame_list[i]]
  536. group_time.append(sub_group_time)
  537. group_frame.append(sub_group_frame)
  538. group_time = [g for g in group_time if len(g) >= 2]
  539. group_frame = [g for g in group_frame if len(g) >= 2]
  540. # 输出图表值
  541. time = [[g[0], g[-1]] for g in group_time]
  542. frame = [[g[0], g[-1]] for g in group_frame]
  543. unfunc_time_df = pd.DataFrame(time, columns=["start_time", "end_time"])
  544. unfunc_frame_df = pd.DataFrame(frame, columns=["start_frame", "end_frame"])
  545. unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
  546. return unfunc_df
  547. # 统计最危险的指标
  548. def _safe_statistic_most_dangerous(self):
  549. min_list = ["TTC", "MTTC", "THW", "LatSD"]
  550. max_list = ["BTN", "collisionRisk", "collisionSeverity"] # 移除STN
  551. for metric in min_list:
  552. if metric in self.metric_list:
  553. if metric in self.df.columns:
  554. # 筛选出非NaN的值
  555. valid_values = self.df[metric].dropna()
  556. if not valid_values.empty:
  557. # 如果有非NaN值,计算最小值
  558. self.calculated_value[metric] = valid_values.min()
  559. print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}')
  560. else:
  561. # 如果全是NaN或列为空,赋默认值
  562. self.calculated_value[metric] = self.calculated_value[metric]
  563. else:
  564. # 列不存在,赋默认值
  565. self.calculated_value[metric] = self.calculated_value[metric]
  566. for metric in max_list:
  567. if metric in self.metric_list:
  568. if metric in self.df.columns:
  569. # 筛选出非NaN的值
  570. valid_values = self.df[metric].dropna()
  571. if not valid_values.empty:
  572. # 如果有非NaN值,计算最大值
  573. self.calculated_value[metric] = valid_values.max()
  574. print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}')
  575. else:
  576. # 如果全是NaN或列为空,赋默认值
  577. self.calculated_value[metric] = self.calculated_value[metric]
  578. else:
  579. # 列不存在,赋默认值
  580. self.calculated_value[metric] = self.calculated_value[metric]
  581. return self.calculated_value
  582. def _safe_no_obj_statistic(self):
  583. # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
  584. return self.calculated_value
  585. def report_statistic(
  586. self,
  587. ):
  588. safety_result = {}
  589. if len(self.obj_id_list) == 1:
  590. safety_result = self._safe_no_obj_statistic()
  591. else:
  592. safety_result = self._safe_statistic_most_dangerous()
  593. evaluator = Score(self.data_processed.safety_config)
  594. result = evaluator.evaluate(safety_result)
  595. # self.logger.info(f"安全性表现及得分情况:[{result}]")
  596. print("\n[安全性表现及得分情况]")
  597. return result