safe.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Functionality metrics
  13. """
  14. import os
  15. import math
  16. import numpy as np
  17. import pandas as pd
  18. from collections import defaultdict
  19. import scipy.integrate as spi
  20. from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
  21. OVERSPEED_THRESHOLD
  22. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
  23. from score_weight import cal_score_with_priority, cal_weight_from_80
  24. class Safe(object):
  25. """
  26. Class for achieving safe metrics for autonomous driving.
  27. Attributes:
  28. df: Vehicle driving data, stored in dataframe format.
  29. df_drivectrl: Vehcile driving control data, including brakepedal, throttle pedal and steerignwheel.
  30. Methods:
  31. _safe_param_cal_new: Calculate parameters for evaluateion.
  32. _cal_v_ego_projection: Calculate ego car velocity project over distance.
  33. _cal_v_projection: Calculate relative velocity project over distance.
  34. _cal_a_projection: Calculate relative acceleration project over distance.
  35. _calculate_derivative: Calculate derivative.
  36. _cal_relative_angular_v: Calculate relative angular velocity.
  37. _death_pr: Calculate death probability.
  38. _cal_collisionRisk_level: Calculate collisionRisk level.
  39. dist: Calculate distance of two cars.
  40. """
  41. def __init__(self, data_processed, scoreModel, resultPath):
  42. # self.eval_data = pd.DataFrame()
  43. # self.data_processed = data_processed
  44. self.scoreModel = scoreModel
  45. self.resultPath = resultPath
  46. self.df = data_processed.object_df.copy()
  47. self.ego_df = data_processed.ego_df
  48. # print("self.ego_df is", self.ego_df)
  49. self.obj_id_list = data_processed.obj_id_list
  50. # print("self.obj_id_list is", self.obj_id_list)
  51. #
  52. # # config infos for calculating score
  53. self.config = data_processed.config
  54. # print("self.config is", self.config)
  55. safe_config = self.config.config['safe']
  56. self.safe_config = safe_config
  57. # # common data
  58. self.bulitin_metric_list = self.config.builtinMetricList
  59. #
  60. # # dimension data
  61. self.weight_custom = safe_config['weightCustom']
  62. self.metric_list = safe_config['metric']
  63. self.type_list = safe_config['type']
  64. self.type_name_dict = safe_config['typeName']
  65. self.name_dict = safe_config['name']
  66. self.unit_dict = safe_config['unit']
  67. # # custom metric data
  68. # # self.customMetricParam = safe_config['customMetricParam']
  69. # # self.custom_metric_list = list(self.customMetricParam.keys())
  70. # # self.custom_data = {}
  71. # # self.custom_param_dict = {}
  72. #
  73. # # score data
  74. self.weight = safe_config['weightDimension']
  75. self.weight_type_dict = safe_config['typeWeight']
  76. self.weight_type_list = safe_config['typeWeightList']
  77. self.weight_dict = safe_config['weight']
  78. self.weight_list = safe_config['weightList']
  79. self.priority_dict = safe_config['priority']
  80. self.priority_list = safe_config['priorityList']
  81. self.kind1_dict = safe_config['kind']
  82. # self.kind1_dict = self.kind_dict[0]
  83. self.optimal1_dict = safe_config['optimal']
  84. # self.optimal1_dict = self.optimal_dict[0]
  85. self.multiple_dict = safe_config['multiple']
  86. self.kind_list = safe_config['kindList']
  87. self.optimal_list = safe_config['optimalList']
  88. self.multiple_list = safe_config['multipleList']
  89. self.metric_dict = safe_config['typeMetricDict']
  90. # # self.time_metric_list = self.metric_dict['safeTime']
  91. # # self.distance_metric_list = self.metric_dict['safeDistance']
  92. #
  93. # # lists of drving control info
  94. # self.time_list = data_processed.driver_ctrl_data['time_list']
  95. self.frame_list = self.df['simFrame'].values.tolist()
  96. self.collision_dist_list = []
  97. self.collisionRisk_list = []
  98. self.collisionSeverity_list = []
  99. self.overSpeed_list = []
  100. self.collision_risk_dict = dict()
  101. self.collision_severity_dict = dict()
  102. self.over_speed_dict = dict()
  103. self.collisionCount = 0
  104. self.collisionRisk = 0
  105. self.collisionSeverity = 0
  106. self.overSpeed = 0
  107. self.empty_flag = True
  108. if len(self.obj_id_list) > 1:
  109. # self.most_dangerous = {}
  110. # self.pass_percent = {}
  111. self._safe_metric_cal()
  112. self._overspeed_cal()
  113. # # no car following scene
  114. # if len(self.obj_id_list) > 1:
  115. # self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  116. # self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  117. # self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  118. # # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  119. # self.unsafe_acce_drac_df = pd.DataFrame(
  120. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  121. # self.unsafe_acce_xtn_df = pd.DataFrame(
  122. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  123. # self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  124. # self.most_dangerous = {}
  125. # self.pass_percent = {}
  126. # self._safe_param_cal_new()
  127. def _overspeed_cal(self):
  128. """
  129. :return:
  130. """
  131. self.overSpeed_list = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]['v'].values.tolist()
  132. if self.overSpeed_list:
  133. max_speed = max(self.overSpeed_list)
  134. overspeed = (max_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD * 100
  135. self.overSpeed = round(overspeed, 2)
  136. else:
  137. self.overSpeed = 0
  138. def _safe_metric_cal(self):
  139. """
  140. """
  141. self.collisionRisk_list = []
  142. self.collision_dist_list = []
  143. self.collisionSeverity_list = []
  144. Tc = 0.3
  145. rho = 0.3 # 驾驶员制动反应时间
  146. ego_accel_max = 6 # 自车油门最大加速度
  147. obj_decel_max = 8 # 前车刹车最大减速度
  148. ego_decel_min = 1 # 自车刹车最小减速度 ug
  149. ego_decel_lon_max = 8
  150. ego_decel_lat_max = 1
  151. # 构建双层字典数据结构
  152. obj_dict = defaultdict(dict)
  153. obj_data_dict = self.df.to_dict('records')
  154. for item in obj_data_dict:
  155. obj_dict[item['simFrame']][item['playerId']] = item
  156. # print("obj_dict is", obj_dict)
  157. df_list = []
  158. EGO_PLAYER_ID = 1
  159. self.empty_flag = True
  160. print("obj_dict[1] is", self.frame_list[-1])
  161. for frame_num in self.frame_list:
  162. # ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  163. ego_data = self.ego_df.iloc[frame_num]
  164. v1 = ego_data['v'] # km/h to m/s
  165. x1 = ego_data['posX']
  166. # print("x1 is", x1)
  167. y1 = ego_data['posY']
  168. h1 = ego_data['yaw']
  169. # len1 = 2.3 # 随手录入的数值
  170. # width1 = ego_data['dimY'] #
  171. # o_x1 = ego_data['offX']
  172. v_x1 = ego_data['lon_v'] # km/h to m/s
  173. # v_y1 = ego_data['speedY'] / 3.6 # km/h to m/s
  174. v_y1 = 0 # 随手录入的数值
  175. a_x1 = ego_data['speedH']
  176. a_y1 = 0 # 随手录入的数值
  177. # a_y1 = ego_data['accelY']
  178. # a1 = ego_data['accel']
  179. # print("calculate is beginning...")
  180. # print("self.obj_id_list is", self.obj_id_list)
  181. for playerId in self.obj_id_list:
  182. dist_list = []
  183. # print("playerId is", playerId)
  184. if playerId == EGO_PLAYER_ID:
  185. continue
  186. try:
  187. obj_data = obj_dict[frame_num][playerId]
  188. except KeyError:
  189. continue
  190. x2 = obj_data['posX']
  191. y2 = obj_data['posY']
  192. # print("x2 is", y2)
  193. dist = self.dist(x1, y1, x2, y2)
  194. # DANGEROUS_DIST = 0.10 # 与障碍物距离小于此值即为危险
  195. if dist <= DANGEROUS_DIST:
  196. dist_list.append(0)
  197. else:
  198. dist_list.append(1)
  199. obj_data['dist'] = dist
  200. # print("obj_data['dist'] is", obj_data['dist'])
  201. v_x2 = obj_data['lon_v'] # km/h to m/s
  202. v_y2 = obj_data['lat_v'] # km/h to m/s
  203. v2 = math.sqrt(v_x2 ** 2 + v_y2 ** 2)
  204. # v2 = obj_data['v'] / 3.6 # km/h to m/s
  205. # h2 = obj_data['posH']
  206. # len2 = obj_data['dimX']
  207. # width2 = obj_data['dimY']
  208. # o_x2 = obj_data['offX']
  209. # a_x2 = obj_data['accelX']
  210. # a_y2 = obj_data['accelY']
  211. a_x2 = obj_data['lon_acc']
  212. a_y2 = obj_data['lat_acc']
  213. # a2 = obj_data['accel']
  214. dx, dy = x2 - x1, y2 - y1
  215. # 定义矢量A和x轴正向向量x
  216. A = np.array([dx, dy])
  217. # print("A is", A)
  218. x = np.array([1, 0])
  219. # 计算点积和向量长度
  220. dot_product = np.dot(A, x)
  221. vector_length_A = np.linalg.norm(A)
  222. vector_length_x = np.linalg.norm(x)
  223. # 计算夹角的余弦值
  224. cos_theta = dot_product / (vector_length_A * vector_length_x)
  225. # 将余弦值转换为角度值(弧度制)
  226. beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
  227. lon_d = dist * math.cos(beta - h1)
  228. lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi)
  229. obj_dict[frame_num][playerId]['lon_d'] = lon_d
  230. obj_dict[frame_num][playerId]['lat_d'] = lat_d
  231. if lon_d > 5 or lon_d < -2 or lat_d > 1:
  232. continue
  233. self.empty_flag = False
  234. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  235. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  236. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  237. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  238. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  239. arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
  240. v_x2, v_y2)
  241. obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
  242. obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
  243. obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
  244. obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
  245. # obj_type = obj_data['type']
  246. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  247. # MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
  248. # THW = self._cal_THW(dist, v_ego_p)
  249. # 单车道时可用
  250. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  251. # obj_decel_max)
  252. # lat_dist = 0.5
  253. # v_right = v1
  254. # v_left = v2
  255. # a_right_lat_brake_min = 1
  256. # a_left_lat_brake_min = 1
  257. # a_lat_max = 5
  258. # LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  259. # a_left_lat_brake_min,
  260. # a_lat_max)
  261. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  262. lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
  263. lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
  264. # lon_a = abs(lon_a1 - lon_a2)
  265. # lon_d = dist * abs(math.cos(beta - h1))
  266. # lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
  267. # BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  268. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  269. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  270. lat_a = abs(lat_a1 - lat_a2)
  271. lat_d = dist * abs(math.sin(beta - h1))
  272. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  273. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  274. obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
  275. obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
  276. # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
  277. # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
  278. TTC = None if (not TTC or TTC < 0) else TTC
  279. # MTTC = None if (not MTTC or MTTC < 0) else MTTC
  280. # THW = None if (not THW or THW < 0) else THW
  281. #
  282. # DRAC = 10 if DRAC >= 10 else DRAC
  283. # print("calculate is beginning...")
  284. if not TTC or TTC > 4000: # threshold = 4258.41
  285. collisionSeverity = 0
  286. pr_death = 0
  287. collisionRisk = 0
  288. else:
  289. result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
  290. collisionSeverity = 1 - result
  291. # print("collisionSeverity is", collisionSeverity)
  292. # pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
  293. pr_death = self._death_pr(vrel_projection_in_dist)
  294. collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
  295. # print("collinsionRisk is", collisionRisk)
  296. self.collisionRisk_list.append(collisionRisk)
  297. self.collisionSeverity_list.append(collisionSeverity)
  298. self.collision_dist_list.append(max(dist_list))
  299. # print("obj_data['dist'] is", collision_dist_list)
  300. prev_value = self.collision_dist_list[0] # 初始化前一个元素为列表的第一个元素
  301. # 遍历列表(从第二个元素开始)
  302. for i in range(1, len(self.collision_dist_list)):
  303. current_value = self.collision_dist_list[i] # 当前元素
  304. # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  305. if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  306. self.collisionCount += 1 # 增加0的段的计数
  307. self.collisionRisk = max(self.collisionRisk_list)
  308. self.collisionSeverity = max(self.collisionSeverity_list)
  309. print("collision count is", self.collisionCount)
  310. print("collisionRisk is", self.collisionRisk)
  311. print("collisionSeverity is", self.collisionSeverity)
  312. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  313. # 计算 AB 连线的向量 AB
  314. # dx = x2 - x1
  315. # dy = y2 - y1
  316. # 计算 AB 连线的模长 |AB|
  317. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  318. # 计算 AB 连线的单位向量 U_AB
  319. U_ABx = dx / AB_mod
  320. U_ABy = dy / AB_mod
  321. # 计算 A 在 AB 连线上的速度 V1_on_AB
  322. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  323. return V1_on_AB
  324. def _cal_v_projection(self, dx, dy, vx, vy):
  325. # 计算 AB 连线的向量 AB
  326. # dx = x2 - x1
  327. # dy = y2 - y1
  328. # 计算 AB 连线的模长 |AB|
  329. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  330. # 计算 AB 连线的单位向量 U_AB
  331. U_ABx = dx / AB_mod
  332. U_ABy = dy / AB_mod
  333. # 计算 A 相对于 B 的速度 V_relative
  334. # vx = vx1 - vx2
  335. # vy = vy1 - vy2
  336. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  337. V_on_AB = vx * U_ABx + vy * U_ABy
  338. return V_on_AB
  339. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  340. # 计算 AB 连线的向量 AB
  341. # dx = x2 - x1
  342. # dy = y2 - y1
  343. # 计算 θ
  344. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  345. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  346. if V_mod == 0 or AB_mod == 0:
  347. return 0
  348. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  349. theta = math.acos(cos_theta)
  350. # 计算 AB 连线的模长 |AB|
  351. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  352. # 计算 AB 连线的单位向量 U_AB
  353. U_ABx = dx / AB_mod
  354. U_ABy = dy / AB_mod
  355. # 计算 A 相对于 B 的加速度 a_relative
  356. # ax = ax1 - ax2
  357. # ay = ay1 - ay2
  358. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  359. a_on_AB = ax * U_ABx + ay * U_ABy
  360. VA = np.array([v_x1, v_y1])
  361. VB = np.array([v_x2, v_y2])
  362. D_A = np.array([x1, y1])
  363. D_B = np.array([x2, y2])
  364. V_r = VA - VB
  365. V = np.linalg.norm(V_r)
  366. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  367. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  368. return a_on_AB_back
  369. # 计算相对加速度
  370. def _calculate_derivative(self, a, w, V, theta):
  371. # 计算(V×cos(θ))'的值
  372. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  373. derivative = a - w * V * math.sin(theta)
  374. return derivative
  375. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  376. dx = A[0] - B[0]
  377. dy = A[1] - B[1]
  378. dvx = VA[0] - VB[0]
  379. dvy = VA[1] - VB[1]
  380. # (dx * dvy - dy * dvx)
  381. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  382. return angular_velocity
  383. # def _death_pr(self, obj_type, v_relative):
  384. def _death_pr(self, v_relative):
  385. # if obj_type == 5:
  386. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  387. # else:
  388. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  389. return p_death
  390. # def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  391. def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
  392. # if obj_type == 5:
  393. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  394. # else:
  395. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  396. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  397. return collisionRisk
  398. # 求两车之间当前距离
  399. def dist(self, x1, y1, x2, y2):
  400. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  401. return dist
  402. def _cal_count_(self, dist):
  403. count = 0
  404. # TTC (time to collision)
  405. def _cal_TTC(self, dist, vrel_projection_in_dist):
  406. if vrel_projection_in_dist == 0:
  407. return math.inf
  408. TTC = dist / vrel_projection_in_dist
  409. return TTC
  410. def _normal_distribution(self, x):
  411. mean = 1.32
  412. std_dev = 0.26
  413. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  414. def _cal_max_min_avg(self, num_list):
  415. maxx = round(max(num_list), 4) if num_list else "-"
  416. minn = round(min(num_list), 4) if num_list else "-"
  417. avg = round(sum(num_list) / len(num_list), 4) if num_list else "-"
  418. result = {
  419. "max": maxx,
  420. "min": minn,
  421. "avg": avg
  422. }
  423. return result
  424. def _safe_statistic(self):
  425. """
  426. """
  427. self.collision_risk_dict = self._cal_max_min_avg(self.collisionRisk_list)
  428. self.collision_severity_dict = self._cal_max_min_avg(self.collisionSeverity_list)
  429. self.over_speed_dict = self._cal_max_min_avg(self.overSpeed_list)
  430. arr_safe = [[self.collisionCount, self.collisionRisk, self.collisionSeverity, self.overSpeed]]
  431. return arr_safe
  432. def safe_score_new(self):
  433. """
  434. """
  435. score_metric_dict = {}
  436. score_type_dict = {}
  437. arr_safe = self._safe_statistic()
  438. print("\n[安全性表现及得分情况]")
  439. print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
  440. if arr_safe:
  441. arr_safe = np.array(arr_safe)
  442. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
  443. score_sub = score_model.cal_score()
  444. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  445. # metric_num = len(metric_list)
  446. if len(self.obj_id_list) > 1 and not self.empty_flag:
  447. # score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]]
  448. pass
  449. else:
  450. score_sub = [80] * len(score_sub)
  451. score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub)) # 对None值做特判
  452. score_metric = score_sub
  453. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  454. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  455. score_metric = list(score_metric_dict.values())
  456. if self.weight_custom: # 用户自定义权重
  457. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  458. self.weight_dict}
  459. for type in self.type_list:
  460. type_score = sum(
  461. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  462. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  463. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  464. score_type_dict}
  465. score_safe = sum(score_type_with_weight_dict.values())
  466. else: # 动态客观赋权
  467. self.weight_list = cal_weight_from_80(score_metric)
  468. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  469. score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  470. for type in self.type_list:
  471. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  472. for key, value in self.weight_dict.items():
  473. if key in self.metric_dict[type]:
  474. # self.weight_dict[key] = round(value / type_weight, 4)
  475. self.weight_dict[key] = value / type_weight
  476. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  477. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  478. type_priority_list = [value for key, value in self.priority_dict.items() if
  479. key in self.metric_dict[type]]
  480. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  481. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  482. for key in self.weight_dict:
  483. self.weight_dict[key] = round(self.weight_dict[key], 4)
  484. score_type = list(score_type_dict.values())
  485. self.weight_type_list = cal_weight_from_80(score_type)
  486. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  487. score_safe = round(score_safe, 2)
  488. # score_type = [round(x, 2) for key, x in score_type_dict.items()]
  489. # score_metric = [round(x, 2) for key, x in score_metric_dict.items()]
  490. print("安全性各指标基准值:", self.optimal_list)
  491. print(f"安全性得分为:{score_safe:.2f}分。")
  492. print(f"安全性各类型得分为:{score_type_dict}。")
  493. print(f"安全性各指标得分为:{score_metric_dict}。")
  494. # return score_safe, score_type, score_metric
  495. return score_safe, score_type_dict, score_metric_dict
  496. def report_statistic(self):
  497. report_dict = {
  498. "name": "安全性",
  499. "weight": f"{self.weight * 100:.2f}%",
  500. # 'collisionRisk': self.collisionRisk,
  501. # "noObjectCar": True,
  502. }
  503. # No objects
  504. if self.obj_id_list == 0:
  505. score_safe = 80
  506. grade_safe = "良好"
  507. score_type_dict = {
  508. "safeCollision": 80,
  509. "safeSpeed": 80
  510. }
  511. score_metric_dict = {
  512. "collisionCount": 80,
  513. "collisionRisk": 80,
  514. "collisionSeverity": 80,
  515. "overSpeed": 80
  516. }
  517. description = "安全性在无目标车情况下表现良好。"
  518. description1 = "次数:0次"
  519. description2 = f"最大值:0%;" \
  520. f"最小值:0%;" \
  521. f"平均值:0%"
  522. description3 = f"最大值:0%;" \
  523. f"最小值:0%;" \
  524. f"平均值:0%"
  525. description4 = f"最大值:{self.over_speed_dict['max']:.4f}%;" \
  526. f"最小值:{self.over_speed_dict['min']:.4f}%;" \
  527. f"平均值:{self.over_speed_dict['avg']:.4f}%"
  528. # with objects
  529. else:
  530. score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
  531. score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
  532. grade_safe = score_grade(score_safe)
  533. description = f"· 在安全性方面,得分{score_safe}分,表现{grade_safe},"
  534. is_good = True
  535. if self.collisionCount > 0:
  536. is_good = False
  537. description += f"出现{self.collisionCount}次碰撞,需重点优化。"
  538. if self.over_speed_dict['max'] > 0:
  539. is_good = False
  540. description += f"超速比例最大值{self.over_speed_dict['max']},需重点优化。"
  541. if is_good:
  542. description += "速度控制良好。"
  543. description1 = f"次数:{self.collisionCount}次"
  544. description2 = f"最大值:{self.collision_risk_dict['max']}%;" \
  545. f"最小值:{self.collision_risk_dict['min']}%;" \
  546. f"平均值:{self.collision_risk_dict['avg']}%"
  547. description3 = f"最大值:{self.collision_severity_dict['max']}%;" \
  548. f"最小值:{self.collision_severity_dict['min']}%;" \
  549. f"平均值:{self.collision_severity_dict['avg']}%"
  550. description4 = f"最大值:{self.over_speed_dict['max']}%;" \
  551. f"最小值:{self.over_speed_dict['min']}%;" \
  552. f"平均值:{self.over_speed_dict['avg']}%"
  553. report_dict["score"] = score_safe
  554. report_dict["level"] = grade_safe
  555. report_dict["description"] = replace_key_with_value(description, self.name_dict)
  556. collisionCount_index = {
  557. "weight": self.weight_dict['collisionCount'],
  558. "score": score_metric_dict['collisionCount'],
  559. "description": description1
  560. }
  561. collisionRisk_index = {
  562. "weight": self.weight_dict['collisionRisk'],
  563. "score": score_metric_dict['collisionRisk'],
  564. "description": description2
  565. }
  566. collisionSeverity_index = {
  567. "weight": self.weight_dict['collisionSeverity'],
  568. "score": score_metric_dict['collisionSeverity'],
  569. "description": description3
  570. }
  571. overSpeed_index = {
  572. "weight": self.weight_dict['overSpeed'],
  573. "score": score_metric_dict['overSpeed'],
  574. "description": description4
  575. }
  576. indexes_dict = {
  577. "collisionCount": collisionCount_index,
  578. "collisionRisk": collisionRisk_index,
  579. "collisionSeverity": collisionSeverity_index,
  580. "overSpeed": overSpeed_index
  581. }
  582. report_dict["indexes"] = indexes_dict
  583. print(report_dict)
  584. return report_dict
  585. # if __name__ == "__main__":
  586. # pass
  587. #
  588. # print("---started---")
  589. # count = 0
  590. # dataPath = './task/case0613' # 设置路径
  591. # configPath = r"./config_safe.json"
  592. # config = ConfigParse(configPath)
  593. # data_processed = DataProcess(dataPath, config) # 数据处理,包括objectstate这些
  594. # print("data_processed is", data_processed.object_df.playerId) # dir(data_processed)可以查看data_processed的数据属性
  595. # safe_indicator = Safe(data_processed)
  596. # collisionRisk_list, collisionSeverity_list, collision_dist_count = safe_indicator._safe_param_cal_new()
  597. #
  598. # prev_value = collision_dist_count[0] # 初始化前一个元素为列表的第一个元素
  599. #
  600. # # 遍历列表(从第二个元素开始)
  601. # for i in range(1, len(collision_dist_count)):
  602. # current_value = collision_dist_count[i] # 当前元素
  603. #
  604. # # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  605. # if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  606. # count += 1 # 增加0的段的计数
  607. # print("collisionRisk is", max(collisionRisk_list))
  608. # print("collisionSeverity is", max(collisionSeverity_list))
  609. # print("collision count is", count)
  610. # print("---end---")