safe.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Functionality metrics
  13. """
  14. import os
  15. import math
  16. import numpy as np
  17. import pandas as pd
  18. from collections import defaultdict
  19. import scipy.integrate as spi
  20. from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
  21. OVERSPEED_THRESHOLD
  22. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100, _cal_max_min_avg
  23. from score_weight import cal_score_with_priority, cal_weight_from_80
  24. class Safe(object):
  25. """
  26. Class for achieving safe metrics for autonomous driving.
  27. Attributes:
  28. df: Vehicle driving data, stored in dataframe format.
  29. df_drivectrl: Vehicle driving control data, including brake pedal, throttle pedal and steering wheel.
  30. Methods:
  31. _safe_param_cal_new: Calculate parameters for evaluation.
  32. _cal_v_ego_projection: Calculate the projection of the ego car velocity onto the distance vector.
  33. _cal_v_projection: Calculate the projection of the relative velocity onto the distance vector.
  34. _cal_a_projection: Calculate the projection of the relative acceleration onto the distance vector.
  35. _calculate_derivative: Calculate derivative.
  36. _cal_relative_angular_v: Calculate relative angular velocity.
  37. _death_pr: Calculate death probability.
  38. _cal_collisionRisk_level: Calculate collisionRisk level.
  39. dist: Calculate distance of two cars.
  40. """
  41. def __init__(self, data_processed, scoreModel, resultPath):
  42. # self.eval_data = pd.DataFrame()
  43. # self.data_processed = data_processed
  44. self.scoreModel = scoreModel
  45. self.resultPath = resultPath
  46. self.df = data_processed.object_df.copy()
  47. self.ego_df = data_processed.ego_df
  48. # print("self.ego_df is", self.ego_df)
  49. self.obj_id_list = data_processed.obj_id_list
  50. # print("self.obj_id_list is", self.obj_id_list)
  51. #
  52. # # config infos for calculating score
  53. self.config = data_processed.config
  54. # print("self.config is", self.config)
  55. safe_config = self.config.config['safe']
  56. self.safe_config = safe_config
  57. # # common data
  58. self.bulitin_metric_list = self.config.builtinMetricList
  59. #
  60. # # dimension data
  61. self.weight_custom = safe_config['weightCustom']
  62. self.metric_list = safe_config['metric']
  63. self.type_list = safe_config['type']
  64. self.type_name_dict = safe_config['typeName']
  65. self.name_dict = safe_config['name']
  66. self.unit_dict = safe_config['unit']
  67. # # custom metric data
  68. # # self.customMetricParam = safe_config['customMetricParam']
  69. # # self.custom_metric_list = list(self.customMetricParam.keys())
  70. # # self.custom_data = {}
  71. # # self.custom_param_dict = {}
  72. #
  73. # # score data
  74. self.weight = safe_config['weightDimension']
  75. self.weight_type_dict = safe_config['typeWeight']
  76. self.weight_type_list = safe_config['typeWeightList']
  77. self.weight_dict = safe_config['weight']
  78. self.weight_list = safe_config['weightList']
  79. self.priority_dict = safe_config['priority']
  80. self.priority_list = safe_config['priorityList']
  81. self.kind1_dict = safe_config['kind']
  82. # self.kind1_dict = self.kind_dict[0]
  83. self.optimal1_dict = safe_config['optimal']
  84. # self.optimal1_dict = self.optimal_dict[0]
  85. self.multiple_dict = safe_config['multiple']
  86. self.kind_list = safe_config['kindList']
  87. self.optimal_list = safe_config['optimalList']
  88. self.multiple_list = safe_config['multipleList']
  89. self.metric_dict = safe_config['typeMetricDict']
  90. # # self.time_metric_list = self.metric_dict['safeTime']
  91. # # self.distance_metric_list = self.metric_dict['safeDistance']
  92. #
  93. # # lists of drving control info
  94. # self.time_list = data_processed.driver_ctrl_data['time_list']
  95. # self.frame_list = self.df['simFrame'].values.tolist()
  96. if self.df['simFrame'].values.tolist()[-1] <= self.ego_df['simFrame'].values.tolist()[-1]:
  97. self.frame_list = self.df['simFrame'].values.tolist()
  98. else:
  99. self.frame_list = self.ego_df['simFrame'].values.tolist()
  100. self.collision_dist_list = []
  101. self.collisionRisk_list = []
  102. self.collisionSeverity_list = []
  103. self.overSpeed_list = []
  104. self.collision_risk_dict = dict()
  105. self.collision_severity_dict = dict()
  106. self.over_speed_dict = dict()
  107. self.collisionCount = 0
  108. self.collisionRisk = 0
  109. self.collisionSeverity = 0
  110. self.overSpeed = 0
  111. self.empty_flag = True
  112. if len(self.obj_id_list) > 1:
  113. # self.most_dangerous = {}
  114. # self.pass_percent = {}
  115. self._safe_metric_cal()
  116. self._overspeed_cal()
  117. # # no car following scene
  118. # if len(self.obj_id_list) > 1:
  119. # self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  120. # self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  121. # self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  122. # # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  123. # self.unsafe_acce_drac_df = pd.DataFrame(
  124. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  125. # self.unsafe_acce_xtn_df = pd.DataFrame(
  126. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  127. # self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  128. # self.most_dangerous = {}
  129. # self.pass_percent = {}
  130. # self._safe_param_cal_new()
  131. def _overspeed_cal(self):
  132. """
  133. :return:
  134. """
  135. self.overSpeed_list = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]['v'].values.tolist()
  136. if self.overSpeed_list:
  137. max_speed = max(self.overSpeed_list)
  138. overspeed = (max_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD * 100
  139. self.overSpeed = round(overspeed, 2)
  140. else:
  141. self.overSpeed = 0
  142. def _safe_metric_cal(self):
  143. """
  144. """
  145. self.collisionRisk_list = []
  146. self.collision_dist_list = []
  147. self.collisionSeverity_list = []
  148. Tc = 0.3
  149. rho = 0.3 # 驾驶员制动反应时间
  150. ego_accel_max = 6 # 自车油门最大加速度
  151. obj_decel_max = 8 # 前车刹车最大减速度
  152. ego_decel_min = 1 # 自车刹车最小减速度 ug
  153. ego_decel_lon_max = 8
  154. ego_decel_lat_max = 1
  155. # 构建双层字典数据结构
  156. obj_dict = defaultdict(dict)
  157. obj_data_dict = self.df.to_dict('records')
  158. for item in obj_data_dict:
  159. obj_dict[item['simFrame']][item['playerId']] = item
  160. # print("obj_dict is", obj_dict)
  161. df_list = []
  162. EGO_PLAYER_ID = 1
  163. self.empty_flag = True
  164. # print("obj_dict[1] is", self.frame_list)
  165. for frame_num in self.frame_list:
  166. # ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  167. # ego_data = self.ego_df.iloc[frame_num]
  168. ego_data = self.ego_df[self.ego_df['simFrame'] == frame_num]
  169. # if frame_num == 1:
  170. # print("ego_data is", ego_data)
  171. v1 = ego_data['v'] # km/h to m/s
  172. # print("ego_data['posX'].values is", ego_data['posX'].values[0])
  173. x1 = ego_data['posX'].values[0]
  174. # print("x1 is", x1)
  175. y1 = ego_data['posY'].values[0]
  176. # print("x1 and y1 are", x1, y1)
  177. h1 = ego_data['yaw']
  178. # len1 = 2.3 # 随手录入的数值
  179. # width1 = ego_data['dimY'] #
  180. # o_x1 = ego_data['offX']
  181. v_x1 = ego_data['lon_v'].values[0] # km/h to m/s
  182. # v_y1 = ego_data['speedY'] / 3.6 # km/h to m/s
  183. v_y1 = 0 # 随手录入的数值
  184. a_x1 = ego_data['speedH'].values[0]
  185. a_y1 = 0 # 随手录入的数值
  186. # a_y1 = ego_data['accelY']
  187. # a1 = ego_data['accel']
  188. # print("calculate is beginning...")
  189. # print("self.obj_id_list is", self.obj_id_list)
  190. for playerId in self.obj_id_list:
  191. dist_list = []
  192. # print("playerId is", playerId)
  193. if playerId == EGO_PLAYER_ID:
  194. continue
  195. try:
  196. obj_data = obj_dict[frame_num][playerId]
  197. except KeyError:
  198. continue
  199. x2 = obj_data['posX']
  200. y2 = obj_data['posY']
  201. # print(" x2 and y2 are", x2, y2)
  202. # print("x2 is", y2)
  203. dist = self.dist(x1, y1, x2, y2)
  204. # print("dist is", dist)
  205. # DANGEROUS_DIST = 0.10 # 与障碍物距离小于此值即为危险
  206. if dist <= DANGEROUS_DIST:
  207. dist_list.append(0)
  208. else:
  209. dist_list.append(1)
  210. obj_data['dist'] = dist
  211. # print("obj_data['dist'] is", obj_data['dist'])
  212. v_x2 = obj_data['lon_v'] # km/h to m/s
  213. v_y2 = obj_data['lat_v'] # km/h to m/s
  214. v2 = math.sqrt(v_x2 ** 2 + v_y2 ** 2)
  215. # v2 = obj_data['v'] / 3.6 # km/h to m/s
  216. # h2 = obj_data['posH']
  217. # len2 = obj_data['dimX']
  218. # width2 = obj_data['dimY']
  219. # o_x2 = obj_data['offX']
  220. # a_x2 = obj_data['accelX']
  221. # a_y2 = obj_data['accelY']
  222. a_x2 = obj_data['lon_acc']
  223. a_y2 = obj_data['lat_acc']
  224. # a2 = obj_data['accel']
  225. dx, dy = x2 - x1, y2 - y1
  226. # 定义矢量A和x轴正向向量x
  227. A = np.array([dx, dy])
  228. # print("A is", A)
  229. x = np.array([1, 0])
  230. # 计算点积和向量长度
  231. dot_product = np.dot(A, x)
  232. vector_length_A = np.linalg.norm(A)
  233. vector_length_x = np.linalg.norm(x)
  234. # 计算夹角的余弦值
  235. cos_theta = dot_product / (vector_length_A * vector_length_x)
  236. # 将余弦值转换为角度值(弧度制)
  237. beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
  238. lon_d = dist * math.cos(beta - h1)
  239. lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi)
  240. obj_dict[frame_num][playerId]['lon_d'] = lon_d
  241. obj_dict[frame_num][playerId]['lat_d'] = lat_d
  242. if lon_d > 5 or lon_d < -2 or lat_d > 1:
  243. continue
  244. self.empty_flag = False
  245. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  246. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  247. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  248. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  249. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  250. arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
  251. v_x2, v_y2)
  252. obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
  253. obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
  254. obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
  255. obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
  256. # obj_type = obj_data['type']
  257. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  258. # MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
  259. # THW = self._cal_THW(dist, v_ego_p)
  260. # 单车道时可用
  261. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  262. # obj_decel_max)
  263. # lat_dist = 0.5
  264. # v_right = v1
  265. # v_left = v2
  266. # a_right_lat_brake_min = 1
  267. # a_left_lat_brake_min = 1
  268. # a_lat_max = 5
  269. # LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  270. # a_left_lat_brake_min,
  271. # a_lat_max)
  272. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  273. lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
  274. lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
  275. # lon_a = abs(lon_a1 - lon_a2)
  276. # lon_d = dist * abs(math.cos(beta - h1))
  277. # lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
  278. # BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  279. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  280. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  281. lat_a = abs(lat_a1 - lat_a2)
  282. lat_d = dist * abs(math.sin(beta - h1))
  283. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  284. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  285. obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
  286. obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
  287. # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
  288. # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
  289. TTC = None if (not TTC or TTC < 0) else TTC
  290. # MTTC = None if (not MTTC or MTTC < 0) else MTTC
  291. # THW = None if (not THW or THW < 0) else THW
  292. #
  293. # DRAC = 10 if DRAC >= 10 else DRAC
  294. # print("calculate is beginning...")
  295. if not TTC or TTC > 4000: # threshold = 4258.41
  296. collisionSeverity = 0
  297. pr_death = 0
  298. collisionRisk = 0
  299. else:
  300. result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
  301. collisionSeverity = 1 - result
  302. # print("collisionSeverity is", collisionSeverity)
  303. # pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
  304. pr_death = self._death_pr(vrel_projection_in_dist)
  305. collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
  306. # print("collinsionRisk is", collisionRisk)
  307. self.collisionRisk_list.append(collisionRisk)
  308. self.collisionSeverity_list.append(collisionSeverity)
  309. self.collision_dist_list.append(max(dist_list))
  310. # print("obj_data['dist'] is", collision_dist_list)
  311. prev_value = self.collision_dist_list[0] # 初始化前一个元素为列表的第一个元素
  312. # 遍历列表(从第二个元素开始)
  313. for i in range(1, len(self.collision_dist_list)):
  314. current_value = self.collision_dist_list[i] # 当前元素
  315. # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  316. if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  317. self.collisionCount += 1 # 增加0的段的计数
  318. self.collisionRisk = max(self.collisionRisk_list)
  319. self.collisionSeverity = max(self.collisionSeverity_list)
  320. print("collision count is", self.collisionCount)
  321. print("collisionRisk is", self.collisionRisk)
  322. print("collisionSeverity is", self.collisionSeverity)
  323. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  324. # 计算 AB 连线的向量 AB
  325. # dx = x2 - x1
  326. # dy = y2 - y1
  327. # 计算 AB 连线的模长 |AB|
  328. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  329. # 计算 AB 连线的单位向量 U_AB
  330. U_ABx = dx / AB_mod
  331. U_ABy = dy / AB_mod
  332. # 计算 A 在 AB 连线上的速度 V1_on_AB
  333. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  334. return V1_on_AB
  335. def _cal_v_projection(self, dx, dy, vx, vy):
  336. # 计算 AB 连线的向量 AB
  337. # dx = x2 - x1
  338. # dy = y2 - y1
  339. # 计算 AB 连线的模长 |AB|
  340. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  341. # 计算 AB 连线的单位向量 U_AB
  342. U_ABx = dx / AB_mod
  343. U_ABy = dy / AB_mod
  344. # 计算 A 相对于 B 的速度 V_relative
  345. # vx = vx1 - vx2
  346. # vy = vy1 - vy2
  347. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  348. V_on_AB = vx * U_ABx + vy * U_ABy
  349. return V_on_AB
  350. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  351. # 计算 AB 连线的向量 AB
  352. # dx = x2 - x1
  353. # dy = y2 - y1
  354. # 计算 θ
  355. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  356. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  357. if V_mod == 0 or AB_mod == 0:
  358. return 0
  359. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  360. theta = math.acos(cos_theta)
  361. # 计算 AB 连线的模长 |AB|
  362. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  363. # 计算 AB 连线的单位向量 U_AB
  364. U_ABx = dx / AB_mod
  365. U_ABy = dy / AB_mod
  366. # 计算 A 相对于 B 的加速度 a_relative
  367. # ax = ax1 - ax2
  368. # ay = ay1 - ay2
  369. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  370. a_on_AB = ax * U_ABx + ay * U_ABy
  371. VA = np.array([v_x1, v_y1])
  372. VB = np.array([v_x2, v_y2])
  373. D_A = np.array([x1, y1])
  374. D_B = np.array([x2, y2])
  375. V_r = VA - VB
  376. V = np.linalg.norm(V_r)
  377. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  378. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  379. return a_on_AB_back
  380. # 计算相对加速度
  381. def _calculate_derivative(self, a, w, V, theta):
  382. # 计算(V×cos(θ))'的值
  383. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  384. derivative = a - w * V * math.sin(theta)
  385. return derivative
  386. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  387. dx = A[0] - B[0]
  388. dy = A[1] - B[1]
  389. dvx = VA[0] - VB[0]
  390. dvy = VA[1] - VB[1]
  391. # (dx * dvy - dy * dvx)
  392. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  393. return angular_velocity
  394. # def _death_pr(self, obj_type, v_relative):
  395. def _death_pr(self, v_relative):
  396. # if obj_type == 5:
  397. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  398. # else:
  399. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  400. return p_death
  401. # def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  402. def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
  403. # if obj_type == 5:
  404. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  405. # else:
  406. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  407. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  408. return collisionRisk
  409. # 求两车之间当前距离
  410. def dist(self, x1, y1, x2, y2):
  411. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  412. return dist
  413. def _cal_count_(self, dist):
  414. count = 0
  415. # TTC (time to collision)
  416. def _cal_TTC(self, dist, vrel_projection_in_dist):
  417. if vrel_projection_in_dist == 0:
  418. return math.inf
  419. TTC = dist / vrel_projection_in_dist
  420. return TTC
  421. def _normal_distribution(self, x):
  422. mean = 1.32
  423. std_dev = 0.26
  424. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  425. def _safe_statistic(self):
  426. """
  427. """
  428. self.collision_risk_dict = _cal_max_min_avg(self.collisionRisk_list)
  429. self.collision_severity_dict = _cal_max_min_avg(self.collisionSeverity_list)
  430. self.over_speed_dict = _cal_max_min_avg(self.overSpeed_list)
  431. arr_safe = [[self.collisionCount, self.collisionRisk, self.collisionSeverity, self.overSpeed]]
  432. return arr_safe
  433. def safe_score_new(self):
  434. """
  435. """
  436. score_metric_dict = {}
  437. score_type_dict = {}
  438. arr_safe = self._safe_statistic()
  439. print("\n[安全性表现及得分情况]")
  440. print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
  441. if arr_safe:
  442. arr_safe = np.array(arr_safe)
  443. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
  444. score_sub = score_model.cal_score()
  445. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  446. # metric_num = len(metric_list)
  447. if len(self.obj_id_list) > 1 and not self.empty_flag:
  448. # score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]]
  449. pass
  450. else:
  451. score_sub = [80] * len(score_sub)
  452. score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub)) # 对None值做特判
  453. score_metric = score_sub
  454. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  455. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  456. score_metric = list(score_metric_dict.values())
  457. if self.weight_custom: # 用户自定义权重
  458. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  459. self.weight_dict}
  460. for type in self.type_list:
  461. type_score = sum(
  462. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  463. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  464. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  465. score_type_dict}
  466. score_safe = sum(score_type_with_weight_dict.values())
  467. else: # 动态客观赋权
  468. self.weight_list = cal_weight_from_80(score_metric)
  469. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  470. score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  471. for type in self.type_list:
  472. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  473. for key, value in self.weight_dict.items():
  474. if key in self.metric_dict[type]:
  475. # self.weight_dict[key] = round(value / type_weight, 4)
  476. self.weight_dict[key] = value / type_weight
  477. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  478. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  479. type_priority_list = [value for key, value in self.priority_dict.items() if
  480. key in self.metric_dict[type]]
  481. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  482. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  483. for key in self.weight_dict:
  484. self.weight_dict[key] = round(self.weight_dict[key], 4)
  485. score_type = list(score_type_dict.values())
  486. self.weight_type_list = cal_weight_from_80(score_type)
  487. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  488. score_safe = round(score_safe, 2)
  489. # score_type = [round(x, 2) for key, x in score_type_dict.items()]
  490. # score_metric = [round(x, 2) for key, x in score_metric_dict.items()]
  491. print("安全性各指标基准值:", self.optimal_list)
  492. print(f"安全性得分为:{score_safe:.2f}分。")
  493. print(f"安全性各类型得分为:{score_type_dict}。")
  494. print(f"安全性各指标得分为:{score_metric_dict}。")
  495. # return score_safe, score_type, score_metric
  496. return score_safe, score_type_dict, score_metric_dict
  497. def report_statistic(self):
  498. report_dict = {
  499. "name": "安全性",
  500. "weight": f"{self.weight * 100:.2f}%",
  501. # 'collisionRisk': self.collisionRisk,
  502. # "noObjectCar": True,
  503. }
  504. # No objects
  505. if self.obj_id_list == 0:
  506. score_safe = 80
  507. grade_safe = "良好"
  508. score_type_dict = {
  509. "safeCollision": 80,
  510. "safeSpeed": 80
  511. }
  512. score_metric_dict = {
  513. "collisionCount": 80,
  514. "collisionRisk": 80,
  515. "collisionSeverity": 80,
  516. "overSpeed": 80
  517. }
  518. description = "安全性在无目标车情况下表现良好。"
  519. description1 = "次数:0次"
  520. description2 = f"最大值:0%;" \
  521. f"最小值:0%;" \
  522. f"平均值:0%"
  523. description3 = f"最大值:0%;" \
  524. f"最小值:0%;" \
  525. f"平均值:0%"
  526. description4 = f"最大值:{100 * self.over_speed_dict['max']}%;" \
  527. f"最小值:{100 * self.over_speed_dict['min']}%;" \
  528. f"平均值:{100 * self.over_speed_dict['avg']}%"
  529. # with objects
  530. else:
  531. score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
  532. score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
  533. grade_safe = score_grade(score_safe)
  534. description = f"· 在安全性方面,得分{score_safe}分,表现{grade_safe},"
  535. is_good = True
  536. if self.collisionCount > 0:
  537. is_good = False
  538. description += f"出现{self.collisionCount}次碰撞,需重点优化。"
  539. if self.over_speed_dict['max'] > 0:
  540. is_good = False
  541. description += f"超速比例最大值{100 * self.over_speed_dict['max']}%,需重点优化。"
  542. if is_good:
  543. description += "速度控制良好。"
  544. description1 = f"次数:{self.collisionCount}次"
  545. description2 = f"最大值:{self.collision_risk_dict['max']}%;" \
  546. f"最小值:{self.collision_risk_dict['min']}%;" \
  547. f"平均值:{self.collision_risk_dict['avg']}%"
  548. description3 = f"最大值:{self.collision_severity_dict['max']}%;" \
  549. f"最小值:{self.collision_severity_dict['min']}%;" \
  550. f"平均值:{self.collision_severity_dict['avg']}%"
  551. description4 = f"最大值:{self.over_speed_dict['max']}%;" \
  552. f"最小值:{self.over_speed_dict['min']}%;" \
  553. f"平均值:{self.over_speed_dict['avg']}%"
  554. report_dict["score"] = score_safe
  555. report_dict["level"] = grade_safe
  556. report_dict["description"] = replace_key_with_value(description, self.name_dict)
  557. collisionCount_index = {
  558. "weight": self.weight_dict['collisionCount'],
  559. "score": score_metric_dict['collisionCount'],
  560. "description": description1
  561. }
  562. collisionRisk_index = {
  563. "weight": self.weight_dict['collisionRisk'],
  564. "score": score_metric_dict['collisionRisk'],
  565. "description": description2
  566. }
  567. collisionSeverity_index = {
  568. "weight": self.weight_dict['collisionSeverity'],
  569. "score": score_metric_dict['collisionSeverity'],
  570. "description": description3
  571. }
  572. overSpeed_index = {
  573. "weight": self.weight_dict['overSpeed'],
  574. "score": score_metric_dict['overSpeed'],
  575. "description": description4
  576. }
  577. indexes_dict = {
  578. "collisionCount": collisionCount_index,
  579. "collisionRisk": collisionRisk_index,
  580. "collisionSeverity": collisionSeverity_index,
  581. "overSpeed": overSpeed_index
  582. }
  583. report_dict["indexes"] = indexes_dict
  584. print(report_dict)
  585. return report_dict
  586. # if __name__ == "__main__":
  587. # pass
  588. #
  589. # print("---started---")
  590. # count = 0
  591. # dataPath = './task/case0613' # 设置路径
  592. # configPath = r"./config_safe.json"
  593. # config = ConfigParse(configPath)
  594. # data_processed = DataProcess(dataPath, config) # 数据处理,包括objectstate这些
  595. # print("data_processed is", data_processed.object_df.playerId) # dir(data_processed)可以查看data_processed的数据属性
  596. # safe_indicator = Safe(data_processed)
  597. # collisionRisk_list, collisionSeverity_list, collision_dist_count = safe_indicator._safe_param_cal_new()
  598. #
  599. # prev_value = collision_dist_count[0] # 初始化前一个元素为列表的第一个元素
  600. #
  601. # # 遍历列表(从第二个元素开始)
  602. # for i in range(1, len(collision_dist_count)):
  603. # current_value = collision_dist_count[i] # 当前元素
  604. #
  605. # # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  606. # if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  607. # count += 1 # 增加0的段的计数
  608. # print("collisionRisk is", max(collisionRisk_list))
  609. # print("collisionSeverity is", max(collisionSeverity_list))
  610. # print("collision count is", count)
  611. # print("---end---")