safe_indecator.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Functionality metrics
  13. """
  14. import numpy as np
  15. import pandas as pd
  16. import math
  17. from collections import defaultdict
  18. import scipy.integrate as spi
  19. from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
  20. OVERSPEED_THRESHOLD
  21. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
  22. from score_weight import cal_score_with_priority, cal_weight_from_80
  23. class Safe(object):
  24. """
  25. Class for achieving safe metrics for autonomous driving.
  26. Attributes:
  27. df: Vehicle driving data, stored in dataframe format.
  28. df_drivectrl: Vehcile driving control data, including brakepedal, throttle pedal and steerignwheel.
  29. Methods:
  30. _safe_param_cal_new: Calculate parameters for evaluateion.
  31. _cal_v_ego_projection: Calculate ego car velocity project over distance.
  32. _cal_v_projection: Calculate relative velocity project over distance.
  33. _cal_a_projection: Calculate relative acceleration project over distance.
  34. _calculate_derivative: Calculate derivative.
  35. _cal_relative_angular_v: Calculate relative angular velocity.
  36. _death_pr: Calculate death probability.
  37. _cal_collisionRisk_level: Calculate collisionRisk level.
  38. dist: Calculate distance of two cars.
  39. """
  40. def __init__(self, data_processed, scoreModel, resultPath):
  41. # self.eval_data = pd.DataFrame()
  42. # self.data_processed = data_processed
  43. self.scoreModel = scoreModel
  44. self.resultPath = resultPath
  45. self.df = data_processed.object_df.copy()
  46. self.ego_df = data_processed.ego_df
  47. # print("self.ego_df is", self.ego_df)
  48. self.obj_id_list = data_processed.obj_id_list
  49. # print("self.obj_id_list is", self.obj_id_list)
  50. #
  51. # # config infos for calculating score
  52. self.config = data_processed.config
  53. # print("self.config is", self.config)
  54. safe_config = self.config.config['safe']
  55. self.safe_config = safe_config
  56. # # common data
  57. self.bulitin_metric_list = self.config.builtinMetricList
  58. #
  59. # # dimension data
  60. self.weight_custom = safe_config['weightCustom']
  61. self.metric_list = safe_config['metric']
  62. self.type_list = safe_config['type']
  63. self.type_name_dict = safe_config['typeName']
  64. self.name_dict = safe_config['name']
  65. self.unit_dict = safe_config['unit']
  66. # # custom metric data
  67. # # self.customMetricParam = safe_config['customMetricParam']
  68. # # self.custom_metric_list = list(self.customMetricParam.keys())
  69. # # self.custom_data = {}
  70. # # self.custom_param_dict = {}
  71. #
  72. # # score data
  73. self.weight = safe_config['weightDimension']
  74. self.weight_type_dict = safe_config['typeWeight']
  75. self.weight_type_list = safe_config['typeWeightList']
  76. self.weight_dict = safe_config['weight']
  77. self.weight_list = safe_config['weightList']
  78. self.priority_dict = safe_config['priority']
  79. self.priority_list = safe_config['priorityList']
  80. self.kind1_dict = safe_config['kind']
  81. # self.kind1_dict = self.kind_dict[0]
  82. self.optimal1_dict = safe_config['optimal']
  83. # self.optimal1_dict = self.optimal_dict[0]
  84. self.multiple_dict = safe_config['multiple']
  85. self.kind_list = safe_config['kindList']
  86. self.optimal_list = safe_config['optimalList']
  87. self.multiple_list = safe_config['multipleList']
  88. self.metric_dict = safe_config['typeMetricDict']
  89. # # self.time_metric_list = self.metric_dict['safeTime']
  90. # # self.distance_metric_list = self.metric_dict['safeDistance']
  91. #
  92. # # lists of drving control info
  93. # self.time_list = data_processed.driver_ctrl_data['time_list']
  94. self.frame_list = self.df['simFrame'].values.tolist()
  95. self.collisionRisk_list = []
  96. self.collision_dist_list = []
  97. self.collisionSeverity_list = []
  98. self.collision_risk_dict = dict()
  99. self.collision_severity_dict = dict()
  100. self.over_speed_dict = dict()
  101. self.collision_count = 0
  102. self.collisionRisk = 0
  103. self.collisionSeverity = 0
  104. self.overSpeed = 0
  105. self.empty_flag = True
  106. if len(self.obj_id_list) > 1:
  107. # self.most_dangerous = {}
  108. # self.pass_percent = {}
  109. self._safe_param_cal_new()
  110. #
  111. # # no car following scene
  112. # if len(self.obj_id_list) > 1:
  113. # self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  114. # self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  115. # self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  116. # # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  117. # self.unsafe_acce_drac_df = pd.DataFrame(
  118. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  119. # self.unsafe_acce_xtn_df = pd.DataFrame(
  120. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  121. # self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  122. # self.most_dangerous = {}
  123. # self.pass_percent = {}
  124. # self._safe_param_cal_new()
  125. def _overspeed_cal(self):
  126. speed_list = self.ego_df['v'].values.tolist()
  127. max_speed = max(speed_list)
  128. self.overSpeed = (max_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD
  129. def _safe_param_cal_new(self):
  130. """
  131. """
  132. self.collisionRisk_list = []
  133. self.collision_dist_list = []
  134. self.collisionSeverity_list = []
  135. Tc = 0.3
  136. rho = 0.3 # 驾驶员制动反应时间
  137. ego_accel_max = 6 # 自车油门最大加速度
  138. obj_decel_max = 8 # 前车刹车最大减速度
  139. ego_decel_min = 1 # 自车刹车最小减速度 ug
  140. ego_decel_lon_max = 8
  141. ego_decel_lat_max = 1
  142. # 构建双层字典数据结构
  143. obj_dict = defaultdict(dict)
  144. obj_data_dict = self.df.to_dict('records')
  145. for item in obj_data_dict:
  146. obj_dict[item['simFrame']][item['playerId']] = item
  147. # print("obj_dict is", obj_dict)
  148. df_list = []
  149. EGO_PLAYER_ID = 1
  150. self.empty_flag = True
  151. print("obj_dict[1] is", self.frame_list[-1])
  152. for frame_num in self.frame_list:
  153. # ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  154. ego_data = self.ego_df.iloc[frame_num]
  155. v1 = ego_data['v'] # km/h to m/s
  156. x1 = ego_data['posX']
  157. # print("x1 is", x1)
  158. y1 = ego_data['posY']
  159. h1 = ego_data['yaw']
  160. # len1 = 2.3 # 随手录入的数值
  161. # width1 = ego_data['dimY'] #
  162. # o_x1 = ego_data['offX']
  163. v_x1 = ego_data['lon_v'] # km/h to m/s
  164. # v_y1 = ego_data['speedY'] / 3.6 # km/h to m/s
  165. v_y1 = 0 # 随手录入的数值
  166. a_x1 = ego_data['speedH']
  167. a_y1 = 0 # 随手录入的数值
  168. # a_y1 = ego_data['accelY']
  169. # a1 = ego_data['accel']
  170. # print("calculate is beginning...")
  171. # print("self.obj_id_list is", self.obj_id_list)
  172. for playerId in self.obj_id_list:
  173. dist_list = []
  174. # print("playerId is", playerId)
  175. if playerId == EGO_PLAYER_ID:
  176. continue
  177. try:
  178. obj_data = obj_dict[frame_num][playerId]
  179. except KeyError:
  180. continue
  181. x2 = obj_data['posX']
  182. y2 = obj_data['posY']
  183. # print("x2 is", y2)
  184. dist = self.dist(x1, y1, x2, y2)
  185. # DANGEROUS_DIST = 0.10 # 与障碍物距离小于此值即为危险
  186. if dist <= DANGEROUS_DIST:
  187. dist_list.append(0)
  188. else:
  189. dist_list.append(1)
  190. obj_data['dist'] = dist
  191. # print("obj_data['dist'] is", obj_data['dist'])
  192. v_x2 = obj_data['velocity_linear_x'] # km/h to m/s
  193. v_y2 = obj_data['velocity_linear_y'] # km/h to m/s
  194. v2 = math.sqrt(v_x2 ** 2 + v_y2 ** 2)
  195. # v2 = obj_data['v'] / 3.6 # km/h to m/s
  196. # h2 = obj_data['posH']
  197. # len2 = obj_data['dimX']
  198. # width2 = obj_data['dimY']
  199. # o_x2 = obj_data['offX']
  200. # a_x2 = obj_data['accelX']
  201. # a_y2 = obj_data['accelY']
  202. a_x2 = obj_data['velocity_angular_x']
  203. a_y2 = obj_data['velocity_angular_y']
  204. # a2 = obj_data['accel']
  205. dx, dy = x2 - x1, y2 - y1
  206. # 定义矢量A和x轴正向向量x
  207. A = np.array([dx, dy])
  208. # print("A is", A)
  209. x = np.array([1, 0])
  210. # 计算点积和向量长度
  211. dot_product = np.dot(A, x)
  212. vector_length_A = np.linalg.norm(A)
  213. vector_length_x = np.linalg.norm(x)
  214. # 计算夹角的余弦值
  215. cos_theta = dot_product / (vector_length_A * vector_length_x)
  216. # 将余弦值转换为角度值(弧度制)
  217. beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
  218. lon_d = dist * math.cos(beta - h1)
  219. lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi)
  220. obj_dict[frame_num][playerId]['lon_d'] = lon_d
  221. obj_dict[frame_num][playerId]['lat_d'] = lat_d
  222. if lon_d > 5 or lon_d < -2 or lat_d > 1:
  223. continue
  224. self.empty_flag = False
  225. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  226. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  227. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  228. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  229. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  230. arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
  231. v_x2, v_y2)
  232. obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
  233. obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
  234. obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
  235. obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
  236. # obj_type = obj_data['type']
  237. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  238. # MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
  239. # THW = self._cal_THW(dist, v_ego_p)
  240. # 单车道时可用
  241. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  242. # obj_decel_max)
  243. # lat_dist = 0.5
  244. # v_right = v1
  245. # v_left = v2
  246. # a_right_lat_brake_min = 1
  247. # a_left_lat_brake_min = 1
  248. # a_lat_max = 5
  249. # LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  250. # a_left_lat_brake_min,
  251. # a_lat_max)
  252. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  253. lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
  254. lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
  255. # lon_a = abs(lon_a1 - lon_a2)
  256. # lon_d = dist * abs(math.cos(beta - h1))
  257. # lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
  258. # BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  259. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  260. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  261. lat_a = abs(lat_a1 - lat_a2)
  262. lat_d = dist * abs(math.sin(beta - h1))
  263. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  264. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  265. obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
  266. obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
  267. # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
  268. # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
  269. TTC = None if (not TTC or TTC < 0) else TTC
  270. # MTTC = None if (not MTTC or MTTC < 0) else MTTC
  271. # THW = None if (not THW or THW < 0) else THW
  272. #
  273. # DRAC = 10 if DRAC >= 10 else DRAC
  274. # print("calculate is beginning...")
  275. if not TTC or TTC > 4000: # threshold = 4258.41
  276. collisionSeverity = 0
  277. pr_death = 0
  278. collisionRisk = 0
  279. else:
  280. result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
  281. collisionSeverity = 1 - result
  282. # print("collisionSeverity is", collisionSeverity)
  283. # pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
  284. pr_death = self._death_pr(vrel_projection_in_dist)
  285. collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
  286. # print("collinsionRisk is", collisionRisk)
  287. self.collisionRisk_list.append(collisionRisk)
  288. self.collisionSeverity_list.append(collisionSeverity)
  289. self.collision_dist_list.append(max(dist_list))
  290. # print("obj_data['dist'] is", collision_dist_list)
  291. prev_value = self.collision_dist_list[0] # 初始化前一个元素为列表的第一个元素
  292. # 遍历列表(从第二个元素开始)
  293. for i in range(1, len(self.collision_dist_list)):
  294. current_value = self.collision_dist_list[i] # 当前元素
  295. # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  296. if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  297. self.collision_count += 1 # 增加0的段的计数
  298. self.collisionRisk = max(self.collisionRisk_list)
  299. self.collisionSeverity = max(self.collisionSeverity_list)
  300. print("collision count is", self.collision_count)
  301. print("collisionRisk is", self.collisionRisk)
  302. print("collisionSeverity is", self.collisionSeverity)
  303. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  304. # 计算 AB 连线的向量 AB
  305. # dx = x2 - x1
  306. # dy = y2 - y1
  307. # 计算 AB 连线的模长 |AB|
  308. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  309. # 计算 AB 连线的单位向量 U_AB
  310. U_ABx = dx / AB_mod
  311. U_ABy = dy / AB_mod
  312. # 计算 A 在 AB 连线上的速度 V1_on_AB
  313. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  314. return V1_on_AB
  315. def _cal_v_projection(self, dx, dy, vx, vy):
  316. # 计算 AB 连线的向量 AB
  317. # dx = x2 - x1
  318. # dy = y2 - y1
  319. # 计算 AB 连线的模长 |AB|
  320. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  321. # 计算 AB 连线的单位向量 U_AB
  322. U_ABx = dx / AB_mod
  323. U_ABy = dy / AB_mod
  324. # 计算 A 相对于 B 的速度 V_relative
  325. # vx = vx1 - vx2
  326. # vy = vy1 - vy2
  327. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  328. V_on_AB = vx * U_ABx + vy * U_ABy
  329. return V_on_AB
  330. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  331. # 计算 AB 连线的向量 AB
  332. # dx = x2 - x1
  333. # dy = y2 - y1
  334. # 计算 θ
  335. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  336. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  337. if V_mod == 0 or AB_mod == 0:
  338. return 0
  339. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  340. theta = math.acos(cos_theta)
  341. # 计算 AB 连线的模长 |AB|
  342. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  343. # 计算 AB 连线的单位向量 U_AB
  344. U_ABx = dx / AB_mod
  345. U_ABy = dy / AB_mod
  346. # 计算 A 相对于 B 的加速度 a_relative
  347. # ax = ax1 - ax2
  348. # ay = ay1 - ay2
  349. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  350. a_on_AB = ax * U_ABx + ay * U_ABy
  351. VA = np.array([v_x1, v_y1])
  352. VB = np.array([v_x2, v_y2])
  353. D_A = np.array([x1, y1])
  354. D_B = np.array([x2, y2])
  355. V_r = VA - VB
  356. V = np.linalg.norm(V_r)
  357. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  358. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  359. return a_on_AB_back
  360. # 计算相对加速度
  361. def _calculate_derivative(self, a, w, V, theta):
  362. # 计算(V×cos(θ))'的值
  363. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  364. derivative = a - w * V * math.sin(theta)
  365. return derivative
  366. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  367. dx = A[0] - B[0]
  368. dy = A[1] - B[1]
  369. dvx = VA[0] - VB[0]
  370. dvy = VA[1] - VB[1]
  371. # (dx * dvy - dy * dvx)
  372. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  373. return angular_velocity
  374. # def _death_pr(self, obj_type, v_relative):
  375. def _death_pr(self, v_relative):
  376. # if obj_type == 5:
  377. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  378. # else:
  379. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  380. return p_death
  381. # def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  382. def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
  383. # if obj_type == 5:
  384. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  385. # else:
  386. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  387. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  388. return collisionRisk
  389. # 求两车之间当前距离
  390. def dist(self, x1, y1, x2, y2):
  391. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  392. return dist
  393. def _cal_count_(self, dist):
  394. count = 0
  395. # TTC (time to collision)
  396. def _cal_TTC(self, dist, vrel_projection_in_dist):
  397. if vrel_projection_in_dist == 0:
  398. return math.inf
  399. TTC = dist / vrel_projection_in_dist
  400. return TTC
  401. def _normal_distribution(self, x):
  402. mean = 1.32
  403. std_dev = 0.26
  404. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  405. def safe_score_new(self):
  406. """
  407. """
  408. score_metric_dict = {}
  409. score_type_dict = {}
  410. if len(self.obj_id_list) == 1:
  411. arr_safe = self._safe_no_obj_statistic()
  412. else:
  413. arr_safe = self._safe_statistic()
  414. print("\n[安全性表现及得分情况]")
  415. print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
  416. if arr_safe:
  417. arr_safe = np.array(arr_safe)
  418. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
  419. score_sub = score_model.cal_score()
  420. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  421. metric_num = len(metric_list)
  422. if len(self.obj_id_list) > 1 and not self.empty_flag:
  423. score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]]
  424. else:
  425. score_sub = [80] * len(score_sub)
  426. score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub)) # 对None值做特判
  427. score_metric = []
  428. for i in range(metric_num):
  429. score_tmp = (score_sub[i] + score_sub[i + metric_num]) / 2
  430. score_metric.append(round(score_tmp, 2))
  431. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  432. # for metric in self.custom_metric_list:
  433. # value = self.custom_data[metric]['value']
  434. # param_list = self.customMetricParam[metric]
  435. # score = self.custom_metric_score(metric, value, param_list)
  436. # score_metric_dict[metric] = round(score, 2)
  437. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  438. score_metric = list(score_metric_dict.values())
  439. if self.weight_custom: # 用户自定义权重
  440. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  441. self.weight_dict}
  442. for type in self.type_list:
  443. type_score = sum(
  444. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  445. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  446. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  447. score_type_dict}
  448. score_safe = sum(score_type_with_weight_dict.values())
  449. else: # 动态客观赋权
  450. self.weight_list = cal_weight_from_80(score_metric)
  451. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  452. score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  453. for type in self.type_list:
  454. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  455. for key, value in self.weight_dict.items():
  456. if key in self.metric_dict[type]:
  457. # self.weight_dict[key] = round(value / type_weight, 4)
  458. self.weight_dict[key] = value / type_weight
  459. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  460. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  461. type_priority_list = [value for key, value in self.priority_dict.items() if
  462. key in self.metric_dict[type]]
  463. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  464. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  465. for key in self.weight_dict:
  466. self.weight_dict[key] = round(self.weight_dict[key], 4)
  467. score_type = list(score_type_dict.values())
  468. self.weight_type_list = cal_weight_from_80(score_type)
  469. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  470. score_safe = round(score_safe, 2)
  471. # score_type = [round(x, 2) for key, x in score_type_dict.items()]
  472. # score_metric = [round(x, 2) for key, x in score_metric_dict.items()]
  473. print("安全性各指标基准值:", self.optimal_list)
  474. print(f"安全性得分为:{score_safe:.2f}分。")
  475. print(f"安全性各类型得分为:{score_type_dict}。")
  476. print(f"安全性各指标得分为:{score_metric_dict}。")
  477. # return score_safe, score_type, score_metric
  478. return score_safe, score_type_dict, score_metric_dict
  479. def report_statistic(self):
  480. report_dict = {
  481. "name": "安全性",
  482. "weight": f"{self.weight * 100:.2f}%",
  483. # 'collisionRisk': self.collisionRisk,
  484. # "noObjectCar": True,
  485. }
  486. # No objects
  487. if self.obj_id_list == 0:
  488. score_safe = 80
  489. grade_safe = "良好"
  490. score_type_dict = {
  491. "safeCollision": 80,
  492. "safeSpeed": 80
  493. }
  494. score_metric_dict = {
  495. "collisionCount": 80,
  496. "collisionRisk": 80,
  497. "collisionSeverity": 80,
  498. "overSpeed": 80
  499. }
  500. description = "安全性在无目标车情况下表现良好。"
  501. description1 = "次数:0次"
  502. description2 = f"最大值:0%;" \
  503. f"最小值:0%;" \
  504. f"平均值:0%"
  505. description3 = f"最大值:0%;" \
  506. f"最小值:0%;" \
  507. f"平均值:0%"
  508. description4 = f"最大值:{self.over_speed_dict['max']:.4f}%;" \
  509. f"最小值:{self.over_speed_dict['min']:.4f}%;" \
  510. f"平均值:{self.over_speed_dict['avg']:.4f}%"
  511. # with objects
  512. else:
  513. score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
  514. score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
  515. grade_safe = score_grade(score_safe)
  516. description = f"· 在安全性方面,得分{score_safe}分,表现{grade_safe},"
  517. is_good = True
  518. if self.collision_count > 0:
  519. is_good = False
  520. description += f"出现{self.collision_count}次碰撞,需重点优化。"
  521. if self.over_speed_dict['max'] > 0:
  522. is_good = False
  523. description += f"超速比例最大值{self.over_speed_dict['max']},需重点优化。"
  524. if is_good:
  525. description += "速度控制良好。"
  526. description1 = f"次数:{self.collision_count}次"
  527. description2 = f"最大值:{self.collision_risk_dict['max']:.4f}%;" \
  528. f"最小值:{self.collision_risk_dict['min']:.4f}%;" \
  529. f"平均值:{self.collision_risk_dict['avg']:.4f}%"
  530. description3 = f"最大值:{self.collision_severity_dict['max']:.4f}%;" \
  531. f"最小值:{self.collision_severity_dict['min']:.4f}%;" \
  532. f"平均值:{self.collision_severity_dict['avg']:.4f}%"
  533. description4 = f"最大值:{self.over_speed_dict['max']:.4f}%;" \
  534. f"最小值:{self.over_speed_dict['min']:.4f}%;" \
  535. f"平均值:{self.over_speed_dict['avg']:.4f}%"
  536. report_dict["score"] = score_safe
  537. report_dict["level"] = grade_safe
  538. report_dict["description"] = replace_key_with_value(description, self.name_dict)
  539. collisionCount_index = {
  540. "weight": self.weight_dict['collisionCount'],
  541. "score": score_metric_dict['collisionCount'],
  542. "description": description1
  543. }
  544. collisionRisk_index = {
  545. "weight": self.weight_dict['collisionRisk'],
  546. "score": score_metric_dict['comfortLon'],
  547. "description": description2
  548. }
  549. collisionSeverity_index = {
  550. "weight": self.weight_dict['collisionSeverity'],
  551. "score": score_metric_dict['comfortLon'],
  552. "description": description3
  553. }
  554. overSpeed_index = {
  555. "weight": self.weight_dict['overSpeed'],
  556. "score": score_metric_dict['comfortLon'],
  557. "description": description4
  558. }
  559. indexes_dict = {
  560. "collisionCount": collisionCount_index,
  561. "collisionRisk": collisionRisk_index,
  562. "collisionSeverity": collisionSeverity_index,
  563. "overSpeed": overSpeed_index
  564. }
  565. report_dict["indexes"] = indexes_dict
  566. print(report_dict)
  567. return report_dict
  568. # if __name__ == "__main__":
  569. # pass
  570. #
  571. # print("---started---")
  572. # count = 0
  573. # dataPath = './task/case0613' # 设置路径
  574. # configPath = r"./config_safe.json"
  575. # config = ConfigParse(configPath)
  576. # data_processed = DataProcess(dataPath, config) # 数据处理,包括objectstate这些
  577. # print("data_processed is", data_processed.object_df.playerId) # dir(data_processed)可以查看data_processed的数据属性
  578. # safe_indicator = Safe(data_processed)
  579. # collisionRisk_list, collisionSeverity_list, collision_dist_count = safe_indicator._safe_param_cal_new()
  580. #
  581. # prev_value = collision_dist_count[0] # 初始化前一个元素为列表的第一个元素
  582. #
  583. # # 遍历列表(从第二个元素开始)
  584. # for i in range(1, len(collision_dist_count)):
  585. # current_value = collision_dist_count[i] # 当前元素
  586. #
  587. # # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  588. # if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  589. # count += 1 # 增加0的段的计数
  590. # print("collisionRisk is", max(collisionRisk_list))
  591. # print("collisionSeverity is", max(collisionSeverity_list))
  592. # print("collision count is", count)
  593. # print("---end---")