safe.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Functionality metrics
  13. """
  14. import os
  15. import math
  16. import numpy as np
  17. import pandas as pd
  18. from collections import defaultdict
  19. import scipy.integrate as spi
  20. from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
  21. OVERSPEED_THRESHOLD
  22. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100, _cal_max_min_avg
  23. from score_weight import cal_score_with_priority, cal_weight_from_80
  24. class Safe(object):
  25. """
  26. Class for achieving safe metrics for autonomous driving.
  27. Attributes:
  28. df: Vehicle driving data, stored in dataframe format.
  29. df_drivectrl: Vehcile driving control data, including brakepedal, throttle pedal and steerignwheel.
  30. Methods:
  31. _safe_param_cal_new: Calculate parameters for evaluateion.
  32. _cal_v_ego_projection: Calculate ego car velocity project over distance.
  33. _cal_v_projection: Calculate relative velocity project over distance.
  34. _cal_a_projection: Calculate relative acceleration project over distance.
  35. _calculate_derivative: Calculate derivative.
  36. _cal_relative_angular_v: Calculate relative angular velocity.
  37. _death_pr: Calculate death probability.
  38. _cal_collisionRisk_level: Calculate collisionRisk level.
  39. dist: Calculate distance of two cars.
  40. """
  41. def __init__(self, data_processed, scoreModel, resultPath):
  42. # self.eval_data = pd.DataFrame()
  43. # self.data_processed = data_processed
  44. self.scoreModel = scoreModel
  45. self.resultPath = resultPath
  46. self.df = data_processed.object_df.copy()
  47. self.ego_df = data_processed.ego_df
  48. # print("self.ego_df is", self.ego_df)
  49. self.obj_id_list = data_processed.obj_id_list
  50. # print("self.obj_id_list is", self.obj_id_list)
  51. #
  52. # # config infos for calculating score
  53. self.config = data_processed.config
  54. # print("self.config is", self.config)
  55. safe_config = self.config.config['safe']
  56. self.safe_config = safe_config
  57. # # common data
  58. self.bulitin_metric_list = self.config.builtinMetricList
  59. #
  60. # # dimension data
  61. self.weight_custom = safe_config['weightCustom']
  62. self.metric_list = safe_config['metric']
  63. self.type_list = safe_config['type']
  64. self.type_name_dict = safe_config['typeName']
  65. self.name_dict = safe_config['name']
  66. self.unit_dict = safe_config['unit']
  67. # # custom metric data
  68. # # self.customMetricParam = safe_config['customMetricParam']
  69. # # self.custom_metric_list = list(self.customMetricParam.keys())
  70. # # self.custom_data = {}
  71. # # self.custom_param_dict = {}
  72. #
  73. # # score data
  74. self.weight = safe_config['weightDimension']
  75. self.weight_type_dict = safe_config['typeWeight']
  76. self.weight_type_list = safe_config['typeWeightList']
  77. self.weight_dict = safe_config['weight']
  78. self.weight_list = safe_config['weightList']
  79. self.priority_dict = safe_config['priority']
  80. self.priority_list = safe_config['priorityList']
  81. self.kind1_dict = safe_config['kind']
  82. # self.kind1_dict = self.kind_dict[0]
  83. self.optimal1_dict = safe_config['optimal']
  84. # self.optimal1_dict = self.optimal_dict[0]
  85. self.multiple_dict = safe_config['multiple']
  86. self.kind_list = safe_config['kindList']
  87. self.optimal_list = safe_config['optimalList']
  88. self.multiple_list = safe_config['multipleList']
  89. self.metric_dict = safe_config['typeMetricDict']
  90. # # self.time_metric_list = self.metric_dict['safeTime']
  91. # # self.distance_metric_list = self.metric_dict['safeDistance']
  92. #
  93. # # lists of drving control info
  94. # self.time_list = data_processed.driver_ctrl_data['time_list']
  95. # self.frame_list = self.df['simFrame'].values.tolist()
  96. self.frame_list = self.df['simFrame'].values.tolist()
  97. self.collision_dist_list = []
  98. self.collisionRisk_list = []
  99. self.collisionSeverity_list = []
  100. self.overSpeed_list = []
  101. self.collision_risk_dict = dict()
  102. self.collision_severity_dict = dict()
  103. self.over_speed_dict = dict()
  104. self.collisionCount = 0
  105. self.collisionRisk = 0
  106. self.collisionSeverity = 0
  107. self.overSpeed = 0
  108. self.empty_flag = True
  109. if len(self.obj_id_list) > 1:
  110. # self.most_dangerous = {}
  111. # self.pass_percent = {}
  112. self._safe_metric_cal()
  113. self._overspeed_cal()
  114. # # no car following scene
  115. # if len(self.obj_id_list) > 1:
  116. # self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  117. # self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  118. # self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  119. # # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  120. # self.unsafe_acce_drac_df = pd.DataFrame(
  121. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  122. # self.unsafe_acce_xtn_df = pd.DataFrame(
  123. # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  124. # self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  125. # self.most_dangerous = {}
  126. # self.pass_percent = {}
  127. # self._safe_param_cal_new()
  128. def _overspeed_cal(self):
  129. """
  130. :return:
  131. """
  132. self.overSpeed_list = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]['v'].values.tolist()
  133. if self.overSpeed_list:
  134. max_speed = max(self.overSpeed_list)
  135. overspeed = (max_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD * 100
  136. self.overSpeed = round(overspeed, 2)
  137. else:
  138. self.overSpeed = 0
  139. def _safe_metric_cal(self):
  140. """
  141. """
  142. self.collisionRisk_list = []
  143. self.collision_dist_list = []
  144. self.collisionSeverity_list = []
  145. Tc = 0.3
  146. rho = 0.3 # 驾驶员制动反应时间
  147. ego_accel_max = 6 # 自车油门最大加速度
  148. obj_decel_max = 8 # 前车刹车最大减速度
  149. ego_decel_min = 1 # 自车刹车最小减速度 ug
  150. ego_decel_lon_max = 8
  151. ego_decel_lat_max = 1
  152. # 构建双层字典数据结构
  153. obj_dict = defaultdict(dict)
  154. obj_data_dict = self.df.to_dict('records')
  155. for item in obj_data_dict:
  156. obj_dict[item['simFrame']][item['playerId']] = item
  157. # print("obj_dict is", obj_dict)
  158. df_list = []
  159. EGO_PLAYER_ID = 1
  160. self.empty_flag = True
  161. # print("obj_dict[1] is", self.frame_list)
  162. for frame_num in self.frame_list:
  163. # ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
  164. # ego_data = self.ego_df.iloc[frame_num]
  165. ego_data = self.ego_df[self.ego_df['simFrame'] == frame_num]
  166. # if frame_num == 1:
  167. # print("ego_data is", ego_data)
  168. v1 = ego_data['v'] # km/h to m/s
  169. x1 = ego_data['posX'].values[0]
  170. # print("x1 is", x1)
  171. y1 = ego_data['posY'].values[0]
  172. # print("x1 and y1 are", x1, y1)
  173. h1 = ego_data['yaw']
  174. # len1 = 2.3 # 随手录入的数值
  175. # width1 = ego_data['dimY'] #
  176. # o_x1 = ego_data['offX']
  177. v_x1 = ego_data['lon_v'].values[0] # km/h to m/s
  178. # v_y1 = ego_data['speedY'] / 3.6 # km/h to m/s
  179. v_y1 = 0 # 随手录入的数值
  180. a_x1 = ego_data['speedH'].values[0]
  181. a_y1 = 0 # 随手录入的数值
  182. # a_y1 = ego_data['accelY']
  183. # a1 = ego_data['accel']
  184. # print("calculate is beginning...")
  185. # print("self.obj_id_list is", self.obj_id_list)
  186. for playerId in self.obj_id_list:
  187. dist_list = []
  188. # print("playerId is", playerId)
  189. if playerId == EGO_PLAYER_ID:
  190. continue
  191. try:
  192. obj_data = obj_dict[frame_num][playerId]
  193. except KeyError:
  194. continue
  195. x2 = obj_data['posX']
  196. y2 = obj_data['posY']
  197. # print(" x2 and y2 are", x2, y2)
  198. # print("x2 is", y2)
  199. dist = self.dist(x1, y1, x2, y2)
  200. # print("dist is", dist)
  201. # DANGEROUS_DIST = 0.10 # 与障碍物距离小于此值即为危险
  202. if dist <= DANGEROUS_DIST:
  203. dist_list.append(0)
  204. else:
  205. dist_list.append(1)
  206. obj_data['dist'] = dist
  207. # print("obj_data['dist'] is", obj_data['dist'])
  208. v_x2 = obj_data['lon_v'] # km/h to m/s
  209. v_y2 = obj_data['lat_v'] # km/h to m/s
  210. v2 = math.sqrt(v_x2 ** 2 + v_y2 ** 2)
  211. # v2 = obj_data['v'] / 3.6 # km/h to m/s
  212. # h2 = obj_data['posH']
  213. # len2 = obj_data['dimX']
  214. # width2 = obj_data['dimY']
  215. # o_x2 = obj_data['offX']
  216. # a_x2 = obj_data['accelX']
  217. # a_y2 = obj_data['accelY']
  218. a_x2 = obj_data['lon_acc']
  219. a_y2 = obj_data['lat_acc']
  220. # a2 = obj_data['accel']
  221. dx, dy = x2 - x1, y2 - y1
  222. # 定义矢量A和x轴正向向量x
  223. A = np.array([dx, dy])
  224. # print("A is", A)
  225. x = np.array([1, 0])
  226. # 计算点积和向量长度
  227. dot_product = np.dot(A, x)
  228. vector_length_A = np.linalg.norm(A)
  229. vector_length_x = np.linalg.norm(x)
  230. # 计算夹角的余弦值
  231. cos_theta = dot_product / (vector_length_A * vector_length_x)
  232. # 将余弦值转换为角度值(弧度制)
  233. beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
  234. lon_d = dist * math.cos(beta - h1)
  235. lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi)
  236. obj_dict[frame_num][playerId]['lon_d'] = lon_d
  237. obj_dict[frame_num][playerId]['lat_d'] = lat_d
  238. if lon_d > 5 or lon_d < -2 or lat_d > 1:
  239. continue
  240. self.empty_flag = False
  241. vx, vy = v_x1 - v_x2, v_y1 - v_y2
  242. ax, ay = a_x2 - a_x1, a_y2 - a_y1
  243. v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
  244. v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
  245. vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
  246. arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
  247. v_x2, v_y2)
  248. obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
  249. obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
  250. obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
  251. obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
  252. # obj_type = obj_data['type']
  253. TTC = self._cal_TTC(dist, vrel_projection_in_dist)
  254. # MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
  255. # THW = self._cal_THW(dist, v_ego_p)
  256. # 单车道时可用
  257. # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
  258. # obj_decel_max)
  259. # lat_dist = 0.5
  260. # v_right = v1
  261. # v_left = v2
  262. # a_right_lat_brake_min = 1
  263. # a_left_lat_brake_min = 1
  264. # a_lat_max = 5
  265. # LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
  266. # a_left_lat_brake_min,
  267. # a_lat_max)
  268. # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
  269. lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
  270. lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
  271. # lon_a = abs(lon_a1 - lon_a2)
  272. # lon_d = dist * abs(math.cos(beta - h1))
  273. # lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
  274. # BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
  275. lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
  276. lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
  277. lat_a = abs(lat_a1 - lat_a2)
  278. lat_d = dist * abs(math.sin(beta - h1))
  279. lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
  280. # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
  281. obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
  282. obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
  283. # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
  284. # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
  285. TTC = None if (not TTC or TTC < 0) else TTC
  286. # MTTC = None if (not MTTC or MTTC < 0) else MTTC
  287. # THW = None if (not THW or THW < 0) else THW
  288. #
  289. # DRAC = 10 if DRAC >= 10 else DRAC
  290. # print("calculate is beginning...")
  291. if not TTC or TTC > 4000: # threshold = 4258.41
  292. collisionSeverity = 0
  293. pr_death = 0
  294. collisionRisk = 0
  295. else:
  296. result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
  297. collisionSeverity = 1 - result
  298. # print("collisionSeverity is", collisionSeverity)
  299. # pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
  300. pr_death = self._death_pr(vrel_projection_in_dist)
  301. collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
  302. # print("collinsionRisk is", collisionRisk)
  303. self.collisionRisk_list.append(collisionRisk)
  304. self.collisionSeverity_list.append(collisionSeverity)
  305. self.collision_dist_list.append(max(dist_list))
  306. # print("obj_data['dist'] is", collision_dist_list)
  307. prev_value = self.collision_dist_list[0] # 初始化前一个元素为列表的第一个元素
  308. # 遍历列表(从第二个元素开始)
  309. for i in range(1, len(self.collision_dist_list)):
  310. current_value = self.collision_dist_list[i] # 当前元素
  311. # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  312. if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  313. self.collisionCount += 1 # 增加0的段的计数
  314. self.collisionRisk = max(self.collisionRisk_list)
  315. self.collisionSeverity = max(self.collisionSeverity_list)
  316. print("collision count is", self.collisionCount)
  317. print("collisionRisk is", self.collisionRisk)
  318. print("collisionSeverity is", self.collisionSeverity)
  319. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  320. # 计算 AB 连线的向量 AB
  321. # dx = x2 - x1
  322. # dy = y2 - y1
  323. # 计算 AB 连线的模长 |AB|
  324. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  325. # 计算 AB 连线的单位向量 U_AB
  326. U_ABx = dx / AB_mod
  327. U_ABy = dy / AB_mod
  328. # 计算 A 在 AB 连线上的速度 V1_on_AB
  329. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  330. return V1_on_AB
  331. def _cal_v_projection(self, dx, dy, vx, vy):
  332. # 计算 AB 连线的向量 AB
  333. # dx = x2 - x1
  334. # dy = y2 - y1
  335. # 计算 AB 连线的模长 |AB|
  336. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  337. # 计算 AB 连线的单位向量 U_AB
  338. U_ABx = dx / AB_mod
  339. U_ABy = dy / AB_mod
  340. # 计算 A 相对于 B 的速度 V_relative
  341. # vx = vx1 - vx2
  342. # vy = vy1 - vy2
  343. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  344. V_on_AB = vx * U_ABx + vy * U_ABy
  345. return V_on_AB
  346. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  347. # 计算 AB 连线的向量 AB
  348. # dx = x2 - x1
  349. # dy = y2 - y1
  350. # 计算 θ
  351. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  352. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  353. if V_mod == 0 or AB_mod == 0:
  354. return 0
  355. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  356. theta = math.acos(cos_theta)
  357. # 计算 AB 连线的模长 |AB|
  358. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  359. # 计算 AB 连线的单位向量 U_AB
  360. U_ABx = dx / AB_mod
  361. U_ABy = dy / AB_mod
  362. # 计算 A 相对于 B 的加速度 a_relative
  363. # ax = ax1 - ax2
  364. # ay = ay1 - ay2
  365. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  366. a_on_AB = ax * U_ABx + ay * U_ABy
  367. VA = np.array([v_x1, v_y1])
  368. VB = np.array([v_x2, v_y2])
  369. D_A = np.array([x1, y1])
  370. D_B = np.array([x2, y2])
  371. V_r = VA - VB
  372. V = np.linalg.norm(V_r)
  373. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  374. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  375. return a_on_AB_back
  376. # 计算相对加速度
  377. def _calculate_derivative(self, a, w, V, theta):
  378. # 计算(V×cos(θ))'的值
  379. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  380. derivative = a - w * V * math.sin(theta)
  381. return derivative
  382. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  383. dx = A[0] - B[0]
  384. dy = A[1] - B[1]
  385. dvx = VA[0] - VB[0]
  386. dvy = VA[1] - VB[1]
  387. # (dx * dvy - dy * dvx)
  388. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  389. return angular_velocity
  390. # def _death_pr(self, obj_type, v_relative):
  391. def _death_pr(self, v_relative):
  392. # if obj_type == 5:
  393. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  394. # else:
  395. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  396. return p_death
  397. # def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  398. def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
  399. # if obj_type == 5:
  400. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  401. # else:
  402. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  403. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  404. return collisionRisk
  405. # 求两车之间当前距离
  406. def dist(self, x1, y1, x2, y2):
  407. dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  408. return dist
  409. def _cal_count_(self, dist):
  410. count = 0
  411. # TTC (time to collision)
  412. def _cal_TTC(self, dist, vrel_projection_in_dist):
  413. if vrel_projection_in_dist == 0:
  414. return math.inf
  415. TTC = dist / vrel_projection_in_dist
  416. return TTC
  417. def _normal_distribution(self, x):
  418. mean = 1.32
  419. std_dev = 0.26
  420. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  421. def _safe_statistic(self):
  422. """
  423. """
  424. self.collision_risk_dict = _cal_max_min_avg(self.collisionRisk_list)
  425. self.collision_severity_dict = _cal_max_min_avg(self.collisionSeverity_list)
  426. self.over_speed_dict = _cal_max_min_avg(self.overSpeed_list)
  427. arr_safe = [[self.collisionCount, self.collisionRisk, self.collisionSeverity, self.overSpeed]]
  428. return arr_safe
  429. def safe_score_new(self):
  430. """
  431. """
  432. score_metric_dict = {}
  433. score_type_dict = {}
  434. arr_safe = self._safe_statistic()
  435. print("\n[安全性表现及得分情况]")
  436. print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
  437. if arr_safe:
  438. arr_safe = np.array(arr_safe)
  439. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
  440. score_sub = score_model.cal_score()
  441. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  442. # metric_num = len(metric_list)
  443. if len(self.obj_id_list) > 1 and not self.empty_flag:
  444. # score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]]
  445. pass
  446. else:
  447. score_sub = [80] * len(score_sub)
  448. score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub)) # 对None值做特判
  449. score_metric = score_sub
  450. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  451. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  452. score_metric = list(score_metric_dict.values())
  453. if self.weight_custom: # 用户自定义权重
  454. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  455. self.weight_dict}
  456. for type in self.type_list:
  457. type_score = sum(
  458. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  459. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  460. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  461. score_type_dict}
  462. score_safe = sum(score_type_with_weight_dict.values())
  463. else: # 动态客观赋权
  464. self.weight_list = cal_weight_from_80(score_metric)
  465. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  466. score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  467. for type in self.type_list:
  468. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  469. for key, value in self.weight_dict.items():
  470. if key in self.metric_dict[type]:
  471. # self.weight_dict[key] = round(value / type_weight, 4)
  472. self.weight_dict[key] = value / type_weight
  473. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  474. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  475. type_priority_list = [value for key, value in self.priority_dict.items() if
  476. key in self.metric_dict[type]]
  477. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  478. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  479. for key in self.weight_dict:
  480. self.weight_dict[key] = round(self.weight_dict[key], 4)
  481. score_type = list(score_type_dict.values())
  482. self.weight_type_list = cal_weight_from_80(score_type)
  483. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  484. score_safe = round(score_safe, 2)
  485. # score_type = [round(x, 2) for key, x in score_type_dict.items()]
  486. # score_metric = [round(x, 2) for key, x in score_metric_dict.items()]
  487. print("安全性各指标基准值:", self.optimal_list)
  488. print(f"安全性得分为:{score_safe:.2f}分。")
  489. print(f"安全性各类型得分为:{score_type_dict}。")
  490. print(f"安全性各指标得分为:{score_metric_dict}。")
  491. # return score_safe, score_type, score_metric
  492. return score_safe, score_type_dict, score_metric_dict
  493. def report_statistic(self):
  494. report_dict = {
  495. "name": "安全性",
  496. "weight": f"{self.weight * 100:.2f}%",
  497. # 'collisionRisk': self.collisionRisk,
  498. # "noObjectCar": True,
  499. }
  500. # No objects
  501. if self.obj_id_list == 0:
  502. score_safe = 80
  503. grade_safe = "良好"
  504. score_type_dict = {
  505. "safeCollision": 80,
  506. "safeSpeed": 80
  507. }
  508. score_metric_dict = {
  509. "collisionCount": 80,
  510. "collisionRisk": 80,
  511. "collisionSeverity": 80,
  512. "overSpeed": 80
  513. }
  514. description = "安全性在无目标车情况下表现良好。"
  515. description1 = "次数:0次"
  516. description2 = f"最大值:0%;" \
  517. f"最小值:0%;" \
  518. f"平均值:0%"
  519. description3 = f"最大值:0%;" \
  520. f"最小值:0%;" \
  521. f"平均值:0%"
  522. description4 = f"最大值:{100 * self.over_speed_dict['max']}%;" \
  523. f"最小值:{100 * self.over_speed_dict['min']}%;" \
  524. f"平均值:{100 * self.over_speed_dict['avg']}%"
  525. # with objects
  526. else:
  527. score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
  528. score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
  529. grade_safe = score_grade(score_safe)
  530. description = f"· 在安全性方面,得分{score_safe}分,表现{grade_safe},"
  531. is_good = True
  532. if self.collisionCount > 0:
  533. is_good = False
  534. description += f"出现{self.collisionCount}次碰撞,需重点优化。"
  535. if self.over_speed_dict['max'] > 0:
  536. is_good = False
  537. description += f"超速比例最大值{100 * self.over_speed_dict['max']}%,需重点优化。"
  538. if is_good:
  539. description += "速度控制良好。"
  540. description1 = f"次数:{self.collisionCount}次"
  541. description2 = f"最大值:{self.collision_risk_dict['max']}%;" \
  542. f"最小值:{self.collision_risk_dict['min']}%;" \
  543. f"平均值:{self.collision_risk_dict['avg']}%"
  544. description3 = f"最大值:{self.collision_severity_dict['max']}%;" \
  545. f"最小值:{self.collision_severity_dict['min']}%;" \
  546. f"平均值:{self.collision_severity_dict['avg']}%"
  547. description4 = f"最大值:{self.over_speed_dict['max']}%;" \
  548. f"最小值:{self.over_speed_dict['min']}%;" \
  549. f"平均值:{self.over_speed_dict['avg']}%"
  550. report_dict["score"] = score_safe
  551. report_dict["level"] = grade_safe
  552. report_dict["description"] = replace_key_with_value(description, self.name_dict)
  553. collisionCount_index = {
  554. "weight": self.weight_dict['collisionCount'],
  555. "score": score_metric_dict['collisionCount'],
  556. "description": description1
  557. }
  558. collisionRisk_index = {
  559. "weight": self.weight_dict['collisionRisk'],
  560. "score": score_metric_dict['collisionRisk'],
  561. "description": description2
  562. }
  563. collisionSeverity_index = {
  564. "weight": self.weight_dict['collisionSeverity'],
  565. "score": score_metric_dict['collisionSeverity'],
  566. "description": description3
  567. }
  568. overSpeed_index = {
  569. "weight": self.weight_dict['overSpeed'],
  570. "score": score_metric_dict['overSpeed'],
  571. "description": description4
  572. }
  573. indexes_dict = {
  574. "collisionCount": collisionCount_index,
  575. "collisionRisk": collisionRisk_index,
  576. "collisionSeverity": collisionSeverity_index,
  577. "overSpeed": overSpeed_index
  578. }
  579. report_dict["indexes"] = indexes_dict
  580. print(report_dict)
  581. return report_dict
  582. # if __name__ == "__main__":
  583. # pass
  584. #
  585. # print("---started---")
  586. # count = 0
  587. # dataPath = './task/case0613' # 设置路径
  588. # configPath = r"./config_safe.json"
  589. # config = ConfigParse(configPath)
  590. # data_processed = DataProcess(dataPath, config) # 数据处理,包括objectstate这些
  591. # print("data_processed is", data_processed.object_df.playerId) # dir(data_processed)可以查看data_processed的数据属性
  592. # safe_indicator = Safe(data_processed)
  593. # collisionRisk_list, collisionSeverity_list, collision_dist_count = safe_indicator._safe_param_cal_new()
  594. #
  595. # prev_value = collision_dist_count[0] # 初始化前一个元素为列表的第一个元素
  596. #
  597. # # 遍历列表(从第二个元素开始)
  598. # for i in range(1, len(collision_dist_count)):
  599. # current_value = collision_dist_count[i] # 当前元素
  600. #
  601. # # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
  602. # if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  603. # count += 1 # 增加0的段的计数
  604. # print("collisionRisk is", max(collisionRisk_list))
  605. # print("collisionSeverity is", max(collisionSeverity_list))
  606. # print("collision count is", count)
  607. # print("---end---")