safe.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Functionality metrics
  13. """
  14. import os
  15. import math
  16. import numpy as np
  17. import pandas as pd
  18. from collections import defaultdict
  19. import scipy.integrate as spi
  20. from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
  21. OVERSPEED_THRESHOLD
  22. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100, _cal_max_min_avg
  23. from score_weight import cal_score_with_priority, cal_weight_from_80
  24. class Safe(object):
  25. """
  26. Class for achieving safe metrics for autonomous driving.
  27. Attributes:
  28. df: Vehicle driving data, stored in dataframe format.
  29. df_drivectrl: Vehicle driving control data, including brake pedal, throttle pedal and steering wheel.
  30. Methods:
  31. _safe_param_cal_new: Calculate parameters for evaluation.
  32. _cal_v_ego_projection: Calculate ego car velocity project over distance.
  33. _cal_v_projection: Calculate relative velocity project over distance.
  34. _cal_a_projection: Calculate relative acceleration project over distance.
  35. _calculate_derivative: Calculate derivative.
  36. _cal_relative_angular_v: Calculate relative angular velocity.
  37. _death_pr: Calculate death probability.
  38. _cal_collisionRisk_level: Calculate collisionRisk level.
  39. dist: Calculate distance of two cars.
  40. """
  def __init__(self, data_processed, scoreModel, resultPath):
      """
      Store the inputs, read the 'safe' section of the config and run the metric calculations.

      Args:
          data_processed: Object exposing ``object_df``, ``ego_df``, ``obj_id_list`` and ``config``.
          scoreModel: Callable invoked later by ``safe_score_new`` to build the scoring model.
          resultPath: Stored on the instance; not used inside this method.
      """
      self.scoreModel = scoreModel
      self.resultPath = resultPath

      # Input data: object rows are copied, the ego frame is referenced as-is.
      self.df = data_processed.object_df.copy()
      self.ego_df = data_processed.ego_df
      self.obj_id_list = data_processed.obj_id_list

      # Config infos for calculating the score.
      self.config = data_processed.config
      safe_config = self.config.config['safe']
      self.safe_config = safe_config

      # Common data (attribute name kept as-is because other code may read it).
      self.bulitin_metric_list = self.config.builtinMetricList

      # Dimension data.
      self.weight_custom = safe_config['weightCustom']
      self.metric_list = safe_config['metric']
      self.type_list = safe_config['type']
      self.type_name_dict = safe_config['typeName']
      self.name_dict = safe_config['name']
      self.unit_dict = safe_config['unit']

      # Score data.
      self.weight = safe_config['weightDimension']
      self.weight_type_dict = safe_config['typeWeight']
      self.weight_type_list = safe_config['typeWeightList']
      self.weight_dict = safe_config['weight']
      self.weight_list = safe_config['weightList']
      self.priority_dict = safe_config['priority']
      self.priority_list = safe_config['priorityList']
      self.kind1_dict = safe_config['kind']
      self.optimal1_dict = safe_config['optimal']
      self.multiple_dict = safe_config['multiple']
      self.kind_list = safe_config['kindList']
      self.optimal_list = safe_config['optimalList']
      self.multiple_list = safe_config['multipleList']
      self.metric_dict = safe_config['typeMetricDict']

      # Frames to evaluate: the object frames when their last frame does not exceed the last
      # ego frame, otherwise the ego frames; empty when there are no object rows at all.
      if len(self.df) > 0:
          object_frames = self.df['simFrame'].values.tolist()
          ego_frames = self.ego_df['simFrame'].values.tolist()
          self.frame_list = object_frames if object_frames[-1] <= ego_frames[-1] else ego_frames
      else:
          self.frame_list = []

      # Result containers, initialised so later readers never hit a missing attribute.
      self.collision_dist_list = []
      self.collisionRisk_list = []
      self.collisionSeverity_list = []
      self.overSpeed_list = []
      # Read by report_statistic; previously only created inside _overspeed_cal.
      self.overSpeed_time_list = []
      self.simFrame_list = []
      self.simtime_risk_max = []
      self.simtime_risk_min = []
      self.simFrame_severity_list = []
      self.simtime_severity_max = []
      self.simtime_severity_min = []
      self.simtime_collisioncount = []
      self.collision_risk_dict = dict()
      self.collision_severity_dict = dict()
      self.over_speed_dict = dict()
      self.collisionCount = 0
      self.collisionRisk = 0
      self.collisionSeverity = 0
      self.overSpeed = 0
      self.simFrame_collisionCount_list = []
      self.empty_flag = True

      # Collision metrics run only when obj_id_list has more than one entry
      # (presumably the ego plus at least one target - TODO confirm).
      if len(self.obj_id_list) > 1:
          self._safe_metric_cal()
      # Overspeed is derived from ego_df only, so it is evaluated unconditionally.
      # NOTE(review): the pasted source lost its indentation; confirm this call was not
      # meant to sit inside the branch above.
      self._overspeed_cal()
  def _overspeed_cal(self):
      """
      Collect the ego samples whose ``v`` exceeds OVERSPEED_THRESHOLD and derive ``overSpeed``.

      ``overSpeed_list`` receives the offending ``v`` values and ``overSpeed_time_list`` the
      matching ``simTime`` values. ``overSpeed`` is the largest excess over the threshold as a
      percentage of the threshold, rounded to 2 decimals, or 0 when nothing exceeds it.

      :return: None
      """
      speeding_rows = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]
      self.overSpeed_list = speeding_rows['v'].values.tolist()
      self.overSpeed_time_list = speeding_rows['simTime'].values.tolist()

      if not self.overSpeed_list:
          self.overSpeed = 0
          return

      peak_speed = max(self.overSpeed_list)
      excess_percent = (peak_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD * 100
      self.overSpeed = round(excess_percent, 2)
  def _safe_metric_cal(self):
      """
      Compute collision count, collision risk and collision severity from ego/object data.

      For every frame in ``self.frame_list`` and every non-ego id in ``self.obj_id_list`` the
      object's offset from the ego is split, using the ego yaw, into a longitudinal part
      (``lon_d``) and an absolute lateral part (``lat_d``). Objects with ``lon_d > 5``,
      ``lon_d < -2`` or ``lat_d > 1`` are skipped. For the remaining ones a TTC is derived from
      the distance and the relative velocity projected on the connecting line; severity is
      ``1 - integral(_normal_distribution, 0, TTC - Tc)`` and risk is
      ``0.4 * _death_pr(...) + 0.6 * severity``. A zero, negative or > 4000 TTC yields zero
      severity and risk.

      Side effects:
          Fills ``collisionRisk_list``, ``collisionSeverity_list``, ``collision_dist_list`` and
          ``simFrame_list`` (index-aligned, one entry per evaluated frame/object pair), updates
          ``empty_flag``, ``collisionCount``, ``simFrame_collisionCount_list``,
          ``collisionRisk`` and ``collisionSeverity``, and prints the three summary values.
      """
      # Reset every accumulator of this run together so the parallel lists stay index-aligned.
      self.collisionRisk_list = []
      self.collision_dist_list = []
      self.collisionSeverity_list = []
      self.simFrame_list = []
      self.simFrame_collisionCount_list = []
      self.collisionCount = 0

      Tc = 0.3  # subtracted from the TTC to form the upper integration bound
      EGO_PLAYER_ID = 1

      # Two-level lookup: obj_dict[simFrame][playerId] -> row record (local to this call).
      obj_dict = defaultdict(dict)
      for item in self.df.to_dict('records'):
          obj_dict[item['simFrame']][item['playerId']] = item

      self.empty_flag = True
      for frame_num in self.frame_list:
          ego_data = self.ego_df[self.ego_df['simFrame'] == frame_num]
          x1 = ego_data['posX'].values[0]
          y1 = ego_data['posY'].values[0]
          # Scalar yaw, read like the other ego fields (a Series would rely on implicit
          # float conversion inside math.cos / math.sin).
          h1 = ego_data['yaw'].values[0]
          v_x1 = ego_data['lon_v'].values[0]
          v_y1 = 0  # placeholder value carried over from the original implementation
          # NOTE(review): 'speedH' is used as the ego x-acceleration here - confirm the column.
          a_x1 = ego_data['speedH'].values[0]
          a_y1 = 0  # placeholder value carried over from the original implementation

          for playerId in self.obj_id_list:
              if playerId == EGO_PLAYER_ID:
                  continue
              try:
                  obj_data = obj_dict[frame_num][playerId]
              except KeyError:
                  # This object has no record in this frame.
                  continue

              x2 = obj_data['posX']
              y2 = obj_data['posY']
              dist = self.dist(x1, y1, x2, y2)
              # 0 marks a sample at or below DANGEROUS_DIST (counted as collision), 1 otherwise.
              collision_flag = 0 if dist <= DANGEROUS_DIST else 1
              obj_data['dist'] = dist

              v_x2 = obj_data['lon_v']
              v_y2 = obj_data['lat_v']
              a_x2 = obj_data['lon_acc']
              a_y2 = obj_data['lat_acc']

              dx, dy = x2 - x1, y2 - y1
              # Angle between the ego->object vector and the +x axis. arccos returns a value
              # in [0, pi], so the sign of dy is not recovered here.
              A = np.array([dx, dy])
              x = np.array([1, 0])
              cos_theta = np.dot(A, x) / (np.linalg.norm(A) * np.linalg.norm(x))
              beta = np.arccos(cos_theta)

              lon_d = dist * math.cos(beta - h1)
              lat_d = abs(dist * math.sin(beta - h1))
              obj_data['lon_d'] = lon_d
              obj_data['lat_d'] = lat_d
              # Only objects inside this window around the ego are evaluated.
              if lon_d > 5 or lon_d < -2 or lat_d > 1:
                  continue
              self.empty_flag = False

              vx, vy = v_x1 - v_x2, v_y1 - v_y2
              ax, ay = a_x2 - a_x1, a_y2 - a_y1
              v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
              v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
              vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
              arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2,
                                                               v_x1, v_y1, v_x2, v_y2)
              obj_data['vrel_projection_in_dist'] = vrel_projection_in_dist
              obj_data['arel_projection_in_dist'] = arel_projection_in_dist
              obj_data['v_ego_projection_in_dist'] = v_ego_p
              obj_data['v_obj_projection_in_dist'] = v_obj_p
              # Key names kept exactly as in the original implementation.
              obj_data['lat_v_rel'] = v_x1 - v_x2
              obj_data['lon_v_rel'] = v_y1 - v_y2

              TTC = self._cal_TTC(dist, vrel_projection_in_dist)
              # A zero or negative TTC is treated as "no TTC".
              TTC = None if (not TTC or TTC < 0) else TTC

              if not TTC or TTC > 4000:
                  collisionSeverity = 0
                  collisionRisk = 0
              else:
                  result, _ = spi.quad(self._normal_distribution, 0, TTC - Tc)
                  collisionSeverity = 1 - result
                  pr_death = self._death_pr(vrel_projection_in_dist)
                  collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity

              self.collisionRisk_list.append(collisionRisk)
              self.simFrame_list.append(frame_num)
              self.collisionSeverity_list.append(collisionSeverity)
              self.collision_dist_list.append(collision_flag)

      if self.collision_dist_list:
          # Count each contiguous run of collision samples once. The state before the first
          # sample is "no collision", so a run that starts at index 0 is counted as well.
          # NOTE: samples of different objects are interleaved in this list.
          previous_flag = 1
          for idx, current_flag in enumerate(self.collision_dist_list):
              if current_flag == 0 and previous_flag != 0:
                  self.collisionCount += 1
                  self.simFrame_collisionCount_list.append(self.simFrame_list[idx])
              previous_flag = current_flag
          self.collisionRisk = max(self.collisionRisk_list)
          self.collisionSeverity = max(self.collisionSeverity_list)
      else:
          self.collisionRisk = 0
          self.collisionSeverity = 0

      print("collision count is", self.collisionCount)
      print("collisionRisk is", self.collisionRisk)
      print("collisionSeverity is", self.collisionSeverity)
  339. def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
  340. # 计算 AB 连线的向量 AB
  341. # dx = x2 - x1
  342. # dy = y2 - y1
  343. # 计算 AB 连线的模长 |AB|
  344. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  345. # 计算 AB 连线的单位向量 U_AB
  346. U_ABx = dx / AB_mod
  347. U_ABy = dy / AB_mod
  348. # 计算 A 在 AB 连线上的速度 V1_on_AB
  349. V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
  350. return V1_on_AB
  351. def _cal_v_projection(self, dx, dy, vx, vy):
  352. # 计算 AB 连线的向量 AB
  353. # dx = x2 - x1
  354. # dy = y2 - y1
  355. # 计算 AB 连线的模长 |AB|
  356. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  357. # 计算 AB 连线的单位向量 U_AB
  358. U_ABx = dx / AB_mod
  359. U_ABy = dy / AB_mod
  360. # 计算 A 相对于 B 的速度 V_relative
  361. # vx = vx1 - vx2
  362. # vy = vy1 - vy2
  363. # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
  364. V_on_AB = vx * U_ABx + vy * U_ABy
  365. return V_on_AB
  366. def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
  367. # 计算 AB 连线的向量 AB
  368. # dx = x2 - x1
  369. # dy = y2 - y1
  370. # 计算 θ
  371. V_mod = math.sqrt(vx ** 2 + vy ** 2)
  372. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  373. if V_mod == 0 or AB_mod == 0:
  374. return 0
  375. cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
  376. theta = math.acos(cos_theta)
  377. # 计算 AB 连线的模长 |AB|
  378. AB_mod = math.sqrt(dx ** 2 + dy ** 2)
  379. # 计算 AB 连线的单位向量 U_AB
  380. U_ABx = dx / AB_mod
  381. U_ABy = dy / AB_mod
  382. # 计算 A 相对于 B 的加速度 a_relative
  383. # ax = ax1 - ax2
  384. # ay = ay1 - ay2
  385. # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
  386. a_on_AB = ax * U_ABx + ay * U_ABy
  387. VA = np.array([v_x1, v_y1])
  388. VB = np.array([v_x2, v_y2])
  389. D_A = np.array([x1, y1])
  390. D_B = np.array([x2, y2])
  391. V_r = VA - VB
  392. V = np.linalg.norm(V_r)
  393. w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
  394. a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
  395. return a_on_AB_back
  396. # 计算相对加速度
  397. def _calculate_derivative(self, a, w, V, theta):
  398. # 计算(V×cos(θ))'的值
  399. # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
  400. derivative = a - w * V * math.sin(theta)
  401. return derivative
  402. def _cal_relative_angular_v(self, theta, A, B, VA, VB):
  403. dx = A[0] - B[0]
  404. dy = A[1] - B[1]
  405. dvx = VA[0] - VB[0]
  406. dvy = VA[1] - VB[1]
  407. # (dx * dvy - dy * dvx)
  408. angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
  409. return angular_velocity
  410. # def _death_pr(self, obj_type, v_relative):
  411. def _death_pr(self, v_relative):
  412. # if obj_type == 5:
  413. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  414. # else:
  415. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  416. return p_death
  417. # def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
  418. def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
  419. # if obj_type == 5:
  420. # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
  421. # else:
  422. p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
  423. collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
  424. return collisionRisk
  425. # 求两车之间当前距离
  def dist(self, x1, y1, x2, y2):
      """Return the Euclidean distance between the points (x1, y1) and (x2, y2)."""
      delta_x = x1 - x2
      delta_y = y1 - y2
      return np.sqrt(delta_x ** 2 + delta_y ** 2)
  429. def _cal_count_(self, dist):
  430. count = 0
  431. # TTC (time to collision)
  432. def _cal_TTC(self, dist, vrel_projection_in_dist):
  433. if vrel_projection_in_dist == 0:
  434. return math.inf
  435. TTC = dist / vrel_projection_in_dist
  436. return TTC
  437. def _normal_distribution(self, x):
  438. mean = 1.32
  439. std_dev = 0.26
  440. return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
  def _safe_statistic(self):
      """
      Summarise the collected samples and return the metric row used for scoring.

      ``_cal_max_min_avg`` is applied to the collision-risk, collision-severity and overspeed
      sample lists and the results are stored in ``collision_risk_dict``,
      ``collision_severity_dict`` and ``over_speed_dict``.

      Returns:
          A one-row nested list: [[collisionCount, collisionRisk, collisionSeverity, overSpeed]].
      """
      summaries = (
          ('collision_risk_dict', self.collisionRisk_list),
          ('collision_severity_dict', self.collisionSeverity_list),
          ('over_speed_dict', self.overSpeed_list),
      )
      for attr_name, samples in summaries:
          setattr(self, attr_name, _cal_max_min_avg(samples))

      metric_row = [self.collisionCount, self.collisionRisk, self.collisionSeverity, self.overSpeed]
      return [metric_row]
  def safe_score_new(self):
      """
      Score the safety dimension and return the overall, per-type and per-metric scores.

      The metric row from ``_safe_statistic`` is scored by ``self.scoreModel``. All metric
      scores are forced to 100 unless ``obj_id_list`` has more than one entry and
      ``empty_flag`` is False. NaN scores are replaced by 100, and the "collisionCount" score
      is overwritten with 0 when any collision was counted, otherwise with 100. The scores are
      then aggregated either with the configured weights (``weight_custom`` truthy) or with
      weights derived through ``cal_weight_from_80`` and ``cal_score_with_priority``; the
      latter path also overwrites ``weight_list``, ``weight_dict``, ``weight_type_list`` and
      ``weight_type_dict`` on the instance.

      Returns:
          tuple: (score_safe rounded to 2 decimals, score_type_dict, score_metric_dict).
      """
      score_metric_dict = {}
      score_type_dict = {}
      arr_safe = self._safe_statistic()
      print("\n[安全性表现及得分情况]")
      print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
      # arr_safe is a non-empty nested list here, so this branch is always taken.
      if arr_safe:
          arr_safe = np.array(arr_safe)
          score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
          score_sub = score_model.cal_score()
          # Only metrics that are also listed as built-in metrics receive a model score.
          metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
          if len(self.obj_id_list) > 1 and not self.empty_flag:
              # Objects were evaluated: keep the model scores unchanged.
              pass
          else:
              # No object was evaluated: every metric gets the full score.
              score_sub = [100] * len(score_sub)
          score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub))  # special-case NaN scores
          score_metric = score_sub
          score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
          # Re-key in the configured metric order (raises KeyError for a metric without a score).
          score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
          # Any counted collision zeroes this metric regardless of the model score.
          if self.collisionCount > 0:
              score_metric_dict["collisionCount"] = 0
          else:
              score_metric_dict["collisionCount"] = 100
          score_metric = list(score_metric_dict.values())
      if self.weight_custom:  # user-defined weights
          score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
                                           self.weight_dict}
          for type in self.type_list:
              type_score = sum(
                  value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
              print("safe type_score is", type_score)
              score_type_dict[type] = round(type_score, 2)
          score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
                                         score_type_dict}
          score_safe = sum(score_type_with_weight_dict.values())
      else:  # dynamic objective weighting
          self.weight_list = cal_weight_from_80(score_metric)
          self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
          score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
          for type in self.type_list:
              # Renormalise the weights of this type's metrics so they sum to 1 within the type.
              type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
              for key, value in self.weight_dict.items():
                  if key in self.metric_dict[type]:
                      self.weight_dict[key] = value / type_weight
              type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
              type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
              type_priority_list = [value for key, value in self.priority_dict.items() if
                                    key in self.metric_dict[type]]
              type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
              # Type scores are capped at 100.
              score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
          # Rounding happens only after every type has been processed.
          for key in self.weight_dict:
              self.weight_dict[key] = round(self.weight_dict[key], 4)
          score_type = list(score_type_dict.values())
          self.weight_type_list = cal_weight_from_80(score_type)
          self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
      score_safe = round(score_safe, 2)
      print("安全性各指标基准值:", self.optimal_list)
      print(f"安全性得分为:{score_safe:.2f}分。")
      print(f"安全性各类型得分为:{score_type_dict}。")
      print(f"安全性各指标得分为:{score_metric_dict}。")
      return score_safe, score_type_dict, score_metric_dict
  519. def report_statistic(self):
  520. simTime_time_list_max = []
  521. simTime_time_list_min = []
  522. report_dict = {
  523. "name": "安全性",
  524. "weight": f"{self.weight * 100:.2f}%",
  525. # 'collisionRisk': self.collisionRisk,
  526. # "noObjectCar": True,
  527. }
  528. # No objects
  529. if self.obj_id_list == 0:
  530. score_safe = 80
  531. grade_safe = "良好"
  532. score_type_dict = {
  533. "safeCollision": 80,
  534. "safeSpeed": 80
  535. }
  536. score_metric_dict = {
  537. "collisionCount": 80,
  538. "collisionRisk": 80,
  539. "collisionSeverity": 80,
  540. "overSpeed": 80
  541. }
  542. description = "安全性在无目标车情况下表现良好。"
  543. description1 = "次数:0次"
  544. description2 = f"最大值0.00%;" \
  545. f"最小值0.00%;" \
  546. f"平均值0.00%"
  547. description3 = f"最大值0.00%;" \
  548. f"最小值0.00%;" \
  549. f"平均值0.00%"
  550. description4 = f"最大值{100 * self.over_speed_dict['max']}%;" \
  551. f"最小值{100 * self.over_speed_dict['min']}%;" \
  552. f"平均值{100 * self.over_speed_dict['avg']}%"
  553. # with objects
  554. else:
  555. score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
  556. score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
  557. grade_safe = score_grade(score_safe)
  558. description = f"· 在安全性方面,得分{score_safe:.2f}分,表现{grade_safe}。"
  559. is_good = True
  560. is_semicolon = False
  561. if self.collisionCount > 0:
  562. is_good = False
  563. is_semicolon = True
  564. description += f"出现{self.collisionCount}次碰撞,需重点优化" if score_safe < 80 else f"出现{self.collisionCount}次碰撞,需注意"
  565. if self.over_speed_dict['max'] != '-':
  566. if self.over_speed_dict['max'] > 0:
  567. is_good = False
  568. if is_semicolon:
  569. description += ";"
  570. is_semicolon = False
  571. description += f"超速比例最大值{self.over_speed_dict['max']:.2f}%,需重点优化。" if score_safe < 80 else f"超速比例最大值{self.over_speed_dict['max']:.2f}%,需注意"
  572. else:
  573. if is_semicolon:
  574. description += f"。"
  575. if is_good:
  576. description += "速度控制良好。"
  577. if self.collisionCount == 0:
  578. description1 = f"次数:{self.collisionCount}次\n没有发生碰撞"
  579. else:
  580. self.simtime_collisioncount = \
  581. self.ego_df.loc[self.ego_df['simFrame'].isin(list(set(self.simFrame_collisionCount_list)))][
  582. 'simTime'].tolist()
  583. self.simtime_collisioncount = [str("{:.02f}".format(s)) for s in self.simtime_collisioncount]
  584. str_temp_collisioncount = "、".join(self.simtime_collisioncount)
  585. description1 = f"次数:{self.collisionCount}次\n在{str_temp_collisioncount}时刻发生碰撞"
  586. if all(x == 0 for x in self.collisionRisk_list):
  587. pass
  588. else:
  589. index_risk_max = [i for i, x in enumerate(self.collisionRisk_list) if x == max(self.collisionRisk_list)]
  590. index_risk_min = [i for i, x in enumerate(self.collisionRisk_list) if x == min(self.collisionRisk_list)]
  591. print("self.simFrame_list[index_risk_max] is", self.simFrame_list)
  592. simframe_risk_max = list(set([self.simFrame_list[x] for x in index_risk_max]))
  593. simframe_risk_min = list(set([self.simFrame_list[y] for y in index_risk_min]))
  594. print("simframe_risk_max is", simframe_risk_max)
  595. self.simtime_risk_max = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_risk_max)][
  596. 'simTime'].tolist()
  597. self.simtime_risk_min = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_risk_min)][
  598. 'simTime'].tolist()
  599. self.simtime_risk_max = [str("{:.02f}".format(s)) + "s" for s in self.simtime_risk_max]
  600. self.simtime_risk_min = [str("{:.02f}".format(ss)) + "s" for ss in self.simtime_risk_min[:2]]
  601. str_temp_risk_max = "、".join(self.simtime_risk_max)
  602. str_temp_risk_min = "、".join(self.simtime_risk_min)
  603. if len(str_temp_risk_max) > 0:
  604. description2 = f"在{str_temp_risk_max}时刻达到"
  605. else:
  606. description2 = ""
  607. description2 += f"最大值{self.collision_risk_dict['max']:.2f}%\n" if (
  608. self.collision_risk_dict['max'] != '-') else "最大值0.00%\n"
  609. if self.collision_risk_dict['min'] != '-' and self.collision_risk_dict['min'] > 0.1:
  610. if len(str_temp_risk_min) > 0:
  611. description2 += f"在{str_temp_risk_min}时刻达到"
  612. description2 += f"最小值{self.collision_risk_dict['min']:.2f}%\n"
  613. description2 += f"平均值{self.collision_risk_dict['avg']:.2f}%" if (
  614. self.collision_risk_dict['avg'] != '-') else "平均值0.00%"
  615. if all(x == 0 for x in self.collisionSeverity_list):
  616. pass
  617. else:
  618. index_severity_max = [i for i, x in enumerate(self.collisionSeverity_list) if
  619. x == max(self.collisionSeverity_list)]
  620. index_severity_min = [i for i, x in enumerate(self.collisionSeverity_list) if
  621. x == min(self.collisionSeverity_list)]
  622. simframe_severity_max = list(set([self.simFrame_list[x] for x in index_severity_max]))
  623. simframe_severity_min = list(set([self.simFrame_list[y] for y in index_severity_min]))
  624. self.simtime_severity_max = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_severity_max)][
  625. 'simTime'].tolist()
  626. self.simtime_severity_min = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_severity_min)][
  627. 'simTime'].tolist()
  628. if len(self.simtime_severity_max) > 2:
  629. self.simtime_severity_max = [str("{:.02f}".format(s)) + "s" for s in self.simtime_severity_max[:2]]
  630. else:
  631. self.simtime_severity_max = [str("{:.02f}".format(s)) + "s" for s in self.simtime_severity_max]
  632. self.simtime_severity_min = [str("{:.02f}".format(ss)) + "s" for ss in self.simtime_severity_min[:2]]
  633. str_temp_severity_max = "、".join(self.simtime_severity_max)
  634. str_temp_severity_min = "、".join(self.simtime_severity_min)
  635. if len(str_temp_severity_max) > 0:
  636. description3 = f"在{str_temp_severity_max}时刻达到"
  637. else:
  638. description3 = ""
  639. description3 += f"最大值{self.collision_severity_dict['max']:.2f}%\n" if (
  640. self.collision_severity_dict['max'] != '-') else "最大值0.00%\n"
  641. if self.collision_severity_dict['min'] != '-' and self.collision_severity_dict['min'] > 0.1:
  642. if len(str_temp_severity_min) > 0:
  643. description3 += f"在{str_temp_severity_min}时刻达到"
  644. description3 += f"最小值{self.collision_severity_dict['min']:.2f}%\n"
  645. description3 += f"平均值{self.collision_severity_dict['avg']:.2f}%" if (
  646. self.collision_severity_dict['avg'] != '-') else "平均值0.00%"
  647. index_speed_max = [i for i, x in enumerate(self.overSpeed_list) if x == max(self.overSpeed_list)]
  648. index_speed_min = [i for i, x in enumerate(self.overSpeed_list) if x == min(self.overSpeed_list)]
  649. if len(index_speed_max) > 2:
  650. for ii in index_speed_max[:2]:
  651. simTime_time_list_max.append(str("{:.02f}".format(self.overSpeed_time_list[ii])) + "s")
  652. else:
  653. for ii in index_speed_max:
  654. simTime_time_list_max.append(str("{:.02f}".format(self.overSpeed_time_list[ii])) + "s")
  655. if len(index_speed_min) > 2:
  656. for xx in index_speed_min[:2]:
  657. simTime_time_list_min.append(str("{:.02f}".format(self.overSpeed_time_list[xx])) + "s")
  658. else:
  659. for xx in index_speed_min:
  660. simTime_time_list_min.append(str("{:.02f}".format(self.overSpeed_time_list[xx])) + "s")
  661. str_temp_overspeed_max = "、".join(simTime_time_list_max)
  662. str_temp_overspeed_min = "、".join(simTime_time_list_min)
  663. if len(simTime_time_list_max) > 0:
  664. description4 = f"在{str_temp_overspeed_max}时刻达到"
  665. else:
  666. description4 = ""
  667. description4 += f"最大值{self.over_speed_dict['max']:.2f}%\n" if (
  668. self.over_speed_dict['max'] != '-') else "最大值0.00%\n"
  669. if self.over_speed_dict['min'] != '-':
  670. if self.over_speed_dict['min'] > 0:
  671. if len(simTime_time_list_min) > 0:
  672. description4 += f"在{str_temp_overspeed_min}时刻达到"
  673. description4 += f"最小值{self.over_speed_dict['min']:.2f}%\n"
  674. else:
  675. description4 += f"最小值0.00%\n"
  676. description4 += f"平均值{self.over_speed_dict['avg']:.2f}%" if (
  677. self.over_speed_dict['avg'] != '-') else "平均值0.00%"
  678. report_dict["score"] = score_safe
  679. report_dict["level"] = grade_safe
  680. report_dict["description"] = replace_key_with_value(description, self.name_dict)
  681. collisionCount_index = {
  682. "weight": self.weight_dict['collisionCount'],
  683. "score": score_metric_dict['collisionCount'],
  684. "description": description1
  685. }
  686. collisionRisk_index = {
  687. "weight": self.weight_dict['collisionRisk'],
  688. "score": score_metric_dict['collisionRisk'],
  689. "description": description2
  690. }
  691. collisionSeverity_index = {
  692. "weight": self.weight_dict['collisionSeverity'],
  693. "score": score_metric_dict['collisionSeverity'],
  694. "description": description3
  695. }
  696. overSpeed_index = {
  697. "weight": self.weight_dict['overSpeed'],
  698. "score": score_metric_dict['overSpeed'],
  699. "description": description4
  700. }
  701. indexes_dict = {
  702. "collisionCount": collisionCount_index,
  703. "collisionRisk": collisionRisk_index,
  704. "collisionSeverity": collisionSeverity_index,
  705. "overSpeed": overSpeed_index
  706. }
  707. report_dict["indexes"] = indexes_dict
  708. print(report_dict)
  709. return report_dict
  710. # if __name__ == "__main__":
  711. # pass
  712. #
  713. # print("---started---")
  714. # count = 0
  715. # dataPath = './task/case0613' # set the data path
  716. # configPath = r"./config_safe.json"
  717. # config = ConfigParse(configPath)
  718. # data_processed = DataProcess(dataPath, config) # data processing, including objectstate and the like
  719. # print("data_processed is", data_processed.object_df.playerId) # dir(data_processed) lists the data attributes of data_processed
  720. # safe_indicator = Safe(data_processed)
  721. # collisionRisk_list, collisionSeverity_list, collision_dist_count = safe_indicator._safe_param_cal_new()
  722. #
  723. # prev_value = collision_dist_count[0] # initialise the previous element to the first element of the list
  724. #
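  725. # # traverse the list (starting from the second element)
  726. # for i in range(1, len(collision_dist_count)):
  727. # current_value = collision_dist_count[i] # current element
  728. #
  729. # # if the current element is 0 and differs from the previous element, or this is the first element and it is 0 (NOTE(review): i starts at 1, so the i == 0 clause never fires - confirm intent)
  730. # if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
  731. # count += 1 # increment the count of zero-valued segments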
  732. # print("collisionRisk is", max(collisionRisk_list))
  733. # print("collisionSeverity is", max(collisionSeverity_list))
  734. # print("collision count is", count)
  735. # print("---end---")