function0530.py 87 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Function metrics
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. sys.path.append('../results')
  18. import math
  19. import numpy as np
  20. import pandas as pd
  21. import scipy.signal as sg
  22. from score_weight import cal_score_with_priority, cal_weight_from_80
  23. from common import score_grade, string_concatenate, replace_key_with_value
  24. class Function(object):
  25. """
  26. Class for achieving function metrics for autonomous driving.
  27. Attributes:
  28. df: Vehicle driving data, stored in dataframe format.
  29. """
  30. def __init__(self, data_processed, custom_data, scoreModel):
  31. self.eval_data = pd.DataFrame()
  32. self.data_processed = data_processed
  33. self.scoreModel = scoreModel
  34. self.df = data_processed.object_df
  35. self.ego_df = data_processed.ego_data
  36. self.obj_id_list = data_processed.obj_id_list
  37. self.df_roadmark = data_processed.road_mark_df
  38. self.df_roadpos = data_processed.road_pos_df
  39. self.df_drivectrl = data_processed.driver_ctrl_df
  40. self.unfunc_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  41. self.unfunc_follow_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  42. self.unfunc_lane_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  43. self.unfunc_custom_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  44. self.custom_markline_list = []
  45. # config infos for calculating score
  46. self.config = data_processed.config
  47. function_config = self.config.config['function']
  48. self.function_config = function_config
  49. # common data
  50. self.bulitin_metric_list = self.config.builtinMetricList
  51. # dimension data
  52. self.weight_custom = function_config['weightCustom']
  53. self.metric_list = function_config['metric']
  54. self.type_list = function_config['type']
  55. self.type_name_dict = function_config['typeName']
  56. self.name_dict = function_config['name']
  57. self.unit_dict = function_config['unit']
  58. # custom metric data
  59. self.customMetricParam = function_config['customMetricParam']
  60. self.custom_metric_list = list(self.customMetricParam.keys())
  61. self.custom_data = custom_data
  62. self.custom_param_dict = {}
  63. # score data
  64. self.weight = function_config['weightDimension']
  65. self.weight_type_dict = function_config['typeWeight']
  66. self.weight_type_list = function_config['typeWeightList']
  67. self.weight_dict = function_config['weight']
  68. self.weight_list = function_config['weightList']
  69. self.priority_dict = function_config['priority']
  70. self.priority_list = function_config['priorityList']
  71. self.kind_dict = function_config['kind']
  72. self.optimal_dict = function_config['optimal']
  73. self.multiple_dict = function_config['multiple']
  74. self.kind_list = function_config['kindList']
  75. self.optimal_list = function_config['optimalList']
  76. self.multiple_list = function_config['multipleList']
  77. # self.unit_dict = function_config['unit']
  78. self.metric_dict = function_config['typeMetricDict']
  79. self.acc_metric_list = self.metric_dict['functionACC']
  80. self.lka_metric_list = self.metric_dict['functionLKA']
  81. # self.acc_metric_list = ['followSpeedDeviation', 'followDistanceDeviation', 'followStopDistance',
  82. # 'followResponseTime']
  83. # self.lka_metric_list = ['laneDistance', 'centerDistanceExpectation', 'centerDistanceStandardDeviation',
  84. # 'centerDistanceMax', 'centerDistanceMin', 'centerDistanceFrequency',
  85. # 'centerDistanceRange']
  86. # custom metric
  87. # lists of drving control info
  88. self.time_list = data_processed.driver_ctrl_data['time_list']
  89. self.frame_list = data_processed.driver_ctrl_data['frame_list']
  90. self.time_list_follow = []
  91. self.frame_list_follow = []
  92. self.dist_list = []
  93. self.v_deviation_list = []
  94. self.v_relative_list = []
  95. self.dist_deviation_list = []
  96. self.stop_distance_list = []
  97. self.follow_stop_time_start_list = [] # 起停跟车响应时长
  98. self.dist_list_full_time = list()
  99. self.dist_deviation_list_full_time = list()
  100. self.line_dist = []
  101. self.center_dist = []
  102. self.center_dist_with_nan = []
  103. self.center_time_list = []
  104. self.center_frame_list = []
  105. self.follow_stop_count = 0
  106. self.result = {}
  107. def _following(self, df):
  108. """
  109. 稳定跟车行驶:
  110. 1.稳态控制速度偏差:筛选出跟车行驶数据后,
  111. 2.稳态控制距离偏差
  112. ACC stastus
  113. 0x7: Stand-wait State
  114. 0x6: Stand-active State
  115. 0x5: Passive State
  116. 0x4: Standby State
  117. 0x3: Shut-off State
  118. 0x2: Override State
  119. 0x1: Active State
  120. 0x0: Off
  121. # 速度变化不能以0到非0为参考,要改为从0到0.05的变化,有一个阈值滤波,避免前车车辆抖动误差影响传感器
  122. 弯道行驶: 最小跟随弯道半径
  123. """
  124. # df = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
  125. # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
  126. col_list = ['simTime', 'simFrame', 'playerId', 'v', 'posX', 'posY'] # target_id
  127. df = df[col_list].copy()
  128. ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'posX', 'posY']]
  129. # 筛选目标车(同一车道内,距离最近的前车)
  130. # obj_df = df[df['playerId'] == df['target_id']]
  131. target_id = 2
  132. obj_df = df[df['playerId'] == target_id][['simTime', 'simFrame', 'v', 'posX', 'posY']] # 目标车
  133. obj_df = obj_df.rename(columns={'v': 'v_obj', 'posX': 'posX_obj', 'posY': 'posY_obj'})
  134. df_merge = pd.merge(ego_df, obj_df, on=['simTime', 'simFrame'], how='left')
  135. df_merge['v_relative'] = df_merge['v'] - df_merge['v_obj']
  136. df_merge['dist'] = df_merge.apply(
  137. lambda row: self.dist(row['posX'], row['posY'], row['posX_obj'], row['posY_obj']), axis=1)
  138. self.dist_list = df_merge['dist'].values.tolist()
  139. df_merge['time_gap'] = df_merge['dist'] / df_merge['v']
  140. safe_time_gap = 3
  141. df_merge['dist_deviation'] = df_merge['time_gap'].apply(
  142. lambda x: 0 if (x >= safe_time_gap) else (safe_time_gap - x))
  143. df_stop = df_merge[(df_merge['v'] == 0) & (df_merge['v_obj'] == 0)]
  144. stop_distance_list = df_stop['dist'].values.tolist()
  145. self.stop_distance_list = stop_distance_list
  146. t_list = df_stop['simTime'].values.tolist()
  147. stop_group = []
  148. sub_group = []
  149. for i in range(len(t_list)):
  150. if not sub_group or t_list[i] - t_list[i - 1] <= 1:
  151. sub_group.append(t_list[i])
  152. else:
  153. stop_group.append(sub_group)
  154. sub_group = [t_list[i]]
  155. stop_group.append(sub_group)
  156. stop_group = [g for g in stop_group if len(g) >= 13]
  157. self.follow_stop_count = len(stop_group)
  158. # solution 1: 跟车状态下,算法输出预设跟车速度
  159. # df['velocity_deviation'] = abs(df['v'] - df['set_velocity'])
  160. # max_velocity_deviation = df['velocity_deviation'].max()
  161. # solution 2: 跟车状态下,跟车设定速度为目标车速度
  162. # velocity_deviation = []
  163. # distance_deviation = []
  164. # stop_distance = []
  165. #
  166. # for f, data in df.groupby('simFrame'):
  167. # ego_data = data.iloc[0].copy()
  168. # # df.loc[len(df)] = ego_data
  169. #
  170. # if len(data) < 2:
  171. # continue
  172. # v1 = ego_data['v']
  173. # x1 = ego_data['posX']
  174. # y1 = ego_data['posY']
  175. #
  176. # for i in range(1, len(data)):
  177. # obj_data = data.iloc[i].copy()
  178. #
  179. # v2 = obj_data['v']
  180. # x2 = obj_data['posX']
  181. # y2 = obj_data['posY']
  182. #
  183. # v_delta = abs(v1 - v2)
  184. # velocity_deviation.append(v_delta)
  185. #
  186. # dist = self.dist(x1, y1, x2, y2)
  187. # distance_deviation.append(dist)
  188. #
  189. # if v2 == 0 and v1 == 0:
  190. # stop_distance.append(dist)
  191. # df.loc[len(df)] = obj_data
  192. # self.df = df
  193. df_merge.replace([np.inf, -np.inf], np.nan, inplace=True) # 异常值处理
  194. self.time_list_follow = df_merge['simTime'].values.tolist()
  195. self.frame_list_follow = df_merge['simFrame'].values.tolist()
  196. self.v_relative_list = df_merge['v_relative'].values.tolist()
  197. self.v_deviation_list = abs(df_merge['v_relative']).values.tolist()
  198. self.dist_deviation_list = df_merge['dist_deviation'].values.tolist()
  199. tmp_df = self.ego_df[['simTime', 'simFrame']].copy()
  200. v_rel_df = df_merge[['simTime', 'v_relative']].copy()
  201. df_merged1 = pd.merge(tmp_df, v_rel_df, on='simTime', how='left')
  202. self.v_relative_list_full_time = df_merged1['v_relative'].values.tolist()
  203. dist_deviation_df = df_merge[['simTime', 'dist_deviation']].copy()
  204. df_merged1 = pd.merge(tmp_df, dist_deviation_df, on='simTime', how='left')
  205. self.dist_deviation_list_full_time = df_merged1['dist_deviation'].values.tolist()
  206. dist_df = df_merge[['simTime', 'dist']].copy()
  207. df_merged1 = pd.merge(tmp_df, dist_df, on='simTime', how='left')
  208. self.dist_list_full_time = df_merged1['dist'].values.tolist()
  209. max_velocity_deviation = abs(df_merge['v_relative']).max()
  210. distance_deviation = df_merge['dist_deviation'].max()
  211. min_stop_distance = min(self.stop_distance_list) if self.stop_distance_list else 9999
  212. self.result['followSpeedDeviation'] = max_velocity_deviation
  213. self.result['followDistanceDeviation'] = distance_deviation
  214. self.result['followStopDistance'] = min_stop_distance
  215. def _stop_and_go(self, df):
  216. """
  217. 停走功能:
  218. 1.跟停距离3-4m,
  219. 2.启动跟车响应时间<=1.2s
  220. 3.停车跟车响应时间<=1.2s
  221. decisionType_target
  222. 0 无障碍物巡航
  223. 1 停车减速让行
  224. 2 跟车
  225. 3 绕行
  226. 4 到达终点?
  227. """
  228. # df = self.df[self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
  229. # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
  230. stop_v_threshold = 0.05
  231. df['v'] = df['v'].apply(lambda x: 0 if x <= stop_v_threshold else 1) # 区分速度为0或非0
  232. target_id = 2
  233. df_ego = df[df['playerId'] == 1].copy()
  234. df_obj = df[df['playerId'] == target_id].copy() # 目标车
  235. df_obj_time = df_obj['simTime'].values.tolist()
  236. df_ego = df_ego[df_ego['simTime'].isin(df_obj_time)].copy()
  237. df_ego = df_ego.drop_duplicates(["simTime", "simFrame"])
  238. df_obj = df_obj.drop_duplicates(["simTime", "simFrame"])
  239. df_ego['v_diff'] = df_ego['v'].diff()
  240. df_ego['v_start_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == 1 else 0) # 起步即为1
  241. df_ego['v_stop_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == -1 else 0) # 停车即为-1
  242. df_obj['v_diff'] = df_obj['v'].diff()
  243. obj_v_start_flag = df_obj['v_diff'].apply(lambda x: 1 if x == 1 else 0).values # 起步即为1
  244. obj_v_stop_flag = df_obj['v_diff'].apply(lambda x: 1 if x == -1 else 0).values # 停车即为1
  245. df_ego['obj_v_start_flag'] = obj_v_start_flag
  246. df_ego['obj_v_stop_flag'] = obj_v_stop_flag
  247. df_ego['flag_start'] = df_ego['obj_v_start_flag'] - df_ego['v_start_flag'] # 目标车起步即为1,自车起步即为-1
  248. df_ego['flag_stop'] = df_ego['obj_v_stop_flag'] - df_ego['v_stop_flag'] # 目标车停车即为1,自车停车即为-1
  249. flag_start_list = df_ego['flag_start'].values
  250. flag_stop_list = df_ego['flag_stop'].values
  251. time_list = df_ego['simTime'].values
  252. time_start_list = []
  253. time_stop_list = []
  254. for i, flag in enumerate(flag_start_list):
  255. if flag:
  256. t1 = time_list[i]
  257. if flag == -1:
  258. t2 = time_list[i]
  259. time_start_list.append(t2 - t1) # t2-t1即为自车起步响应时间
  260. for i, flag in enumerate(flag_stop_list):
  261. if flag:
  262. t1 = time_list[i]
  263. if flag == -1:
  264. t2 = time_list[i]
  265. time_stop_list.append(t2 - t1) # t2-t1即为自车停车响应时间
  266. time_start_list = [i for i in time_start_list if i != 0]
  267. self.result['followResponseTime'] = max(time_start_list) if time_start_list else 0
  268. self.follow_stop_time_start_list = time_start_list
  269. # self.result['跟车停止响应最长时间'] = max(time_stop_list)
  270. def velocity(self, v_x, v_y):
  271. v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
  272. return v
  273. def dist(self, x1, y1, x2, y2):
  274. dis = math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  275. return dis
  276. def _line_dist_merge(self):
  277. # line_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
  278. # line_dist_group = line_dist_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
  279. # line_dist_df['line_dist'] = line_dist_group
  280. # line_dist_df = line_dist_df[['simFrame', 'line_dist']]
  281. # line_dist_df.columns = ['simFrame', 'line_dist']
  282. # line_dist_df = line_dist_df.drop_duplicates()
  283. line_dist_df = self.df_roadmark[(self.df_roadmark['id'] == 0) | (self.df_roadmark['id'] == 2)].copy()
  284. # df = line_dist_df[line_dist_df['id'] == 2][['simTime', 'simFrame']].copy()
  285. # df['line_dist'] = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
  286. if line_dist_df.empty:
  287. self.ego_df['line_dist'] = pd.Series(dtype=float)
  288. else:
  289. df = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).reset_index()
  290. # df = df.rename(columns={'lateralDist': 'line_dist'})
  291. df.columns = ["simFrame", "line_dist"]
  292. self.ego_df = pd.merge(self.ego_df, df, on='simFrame', how='left')
  293. def _lane_transverse_distance(self, df):
  294. """
  295. 0x7:Reserved
  296. 0x6: Reserved
  297. 0x5: Reserved
  298. 0x4: Error
  299. 0x3: Active
  300. 0x2: Standby
  301. 0x1: Passive
  302. 0x0: Off
  303. """
  304. # 车道内偏移行驶时,与近侧车道线的横向距离
  305. # data_group = lane_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
  306. # data_group = self.df_roadmark.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
  307. # df = data_group.to_frame(name='line_dist')
  308. # df['simFrame'] = data_group.index.tolist()
  309. # df = self.df_roadmark[self.df_roadmark['id'] == 0][['simTime', 'simFrame']].copy()
  310. # df['line_dist'] = self.df_roadmark.groupby('simFrame').apply(
  311. # lambda t: abs(t['lateralDist']).min()).values.tolist()
  312. # self.ego_df = pd.merge(self.ego_df, df, on=['simTime', 'simFrame'], how='left')
  313. # self._line_dist_merge()
  314. # # df = self.df[self.df['LKA_status'] == 3]
  315. # df = self.ego_df[self.ego_df['LKA_status'] == "Active"]
  316. self.line_dist = df['line_dist'].to_list()
  317. self.line_dist = [x for x in self.line_dist if not np.isnan(x)]
  318. self.result['laneDistance'] = min(self.line_dist) if self.line_dist else self.optimal_dict['laneDistance']
  319. def _lateral_dist_of_center_line(self, df_laneoffset):
  320. # df_laneoffset = self.df[(self.df['LKA_status'] == "Active") & (self.df['playerId'] == 1)]
  321. self.center_dist_with_nan = df_laneoffset['laneOffset'].to_list()
  322. self.center_time_list = df_laneoffset['simTime'].to_list()
  323. self.center_frame_list = df_laneoffset['simFrame'].to_list()
  324. # self.center_dist_df = self.df_roadpos[self.df_roadpos["playerId"] == 1]
  325. # self.center_dist = self.center_dist_df["laneOffset"]
  326. # self.center_time_list = self.center_dist_df['simTime'].to_list()
  327. # self.center_frame_list = self.center_dist_df['simFrame'].to_list()
  328. self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)]
  329. if not self.center_dist:
  330. self.result['centerDistanceExpectation'] = 0
  331. self.result['centerDistanceStandardDeviation'] = 0
  332. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  333. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  334. self.result['centerDistanceMax'] = 0
  335. self.result['centerDistanceMin'] = 0
  336. self.result['centerDistanceFrequency'] = 0
  337. self.result['centerDistanceRange'] = 0
  338. else:
  339. center_dist = [abs(x) for x in self.center_dist]
  340. # 车道中心线横向距离分布期望与标准差
  341. E_lane_center_dist = np.mean(center_dist)
  342. D_lane_center_dist = np.std(center_dist)
  343. # 车道中心线横向距离分布极值
  344. center_dist = np.array(center_dist)
  345. extrmax_index = sg.argrelmax(center_dist)[0]
  346. extrmin_index = sg.argrelmin(center_dist)[0]
  347. extreme_max_value = center_dist[extrmax_index]
  348. extreme_min_value = center_dist[extrmin_index]
  349. # 横向相对位置震荡频率
  350. # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
  351. length1 = len(extrmax_index) # 极大值列表长度
  352. length2 = len(extrmin_index) # 极小值列表长度
  353. # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
  354. FRAME_RATE = 100
  355. time_diff = 1.0 / FRAME_RATE
  356. period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
  357. period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
  358. try:
  359. period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
  360. except ZeroDivisionError:
  361. period = 0
  362. frequency = 1 / period if period else 0
  363. # length = min(len(extreme_max_value), len(extreme_min_value))
  364. # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
  365. # frequency = 1 / period
  366. # 横向相对位置震荡极差
  367. length = min(len(extreme_max_value), len(extreme_min_value))
  368. extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
  369. self.result['centerDistanceExpectation'] = E_lane_center_dist
  370. self.result['centerDistanceStandardDeviation'] = D_lane_center_dist
  371. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  372. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  373. self.result['centerDistanceMax'] = center_dist.max()
  374. self.result['centerDistanceMin'] = center_dist.min()
  375. self.result['centerDistanceFrequency'] = abs(frequency)
  376. self.result['centerDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
  377. def _continous_judge(self, frame_list):
  378. if not frame_list:
  379. return 0
  380. cnt = 1
  381. for i in range(1, len(frame_list)):
  382. if frame_list[i] - frame_list[i - 1] <= 3:
  383. continue
  384. cnt += 1
  385. return cnt
  386. def _lane_departure_warning(self, ldw_df):
  387. """
  388. 工作车速范围:55/60-200/250 km/h
  389. 最小可用车道宽度:2.6m
  390. 误报警:距离安全,但是预警
  391. 漏报警:距离危险,但是没报警
  392. 灵敏度差:线外40cm
  393. 灵敏度零级:压线
  394. 灵敏度一级:线内20cm(20%)
  395. 灵敏度二级:线内40cm(40%)
  396. 灵敏度三级:线内60cm(60%)
  397. 偏离车速1.0m/s:线内60cm报警
  398. 偏离车速0.5m/s:线内40cm报警
  399. """
  400. # 自车, 车道线宽度>2.6m,转向灯没开
  401. # ego_df = self.df[self.df['playerId'] == 1].copy()
  402. # ego_df['line_dist'] = self.line_dist
  403. # lane_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
  404. # lane_dist_group = lane_dist_df.groupby("simFrame").apply(
  405. # lambda t: abs(t['lateralDist']).min()).values.tolist()
  406. # lane_dist_df['line_dist'] = lane_dist_group
  407. # lane_dist_df = lane_dist_df[['simFrame', 'line_dist']]
  408. # lane_dist_df.columns = ['simFrame', 'line_dist']
  409. # # lane_dist_df = lane_dist_df[['simTime', 'simFrame', 'line_dist']]
  410. # # lane_dist_df.columns = ['simTime', 'simFrame', 'line_dist']
  411. # lane_dist_df = lane_dist_df.drop_duplicates()
  412. # ldw_status_df = self.df[self.df['playerId'] == 1].copy()[['simTime', 'simFrame', 'LDW_status']].drop_duplicates()
  413. # ldw_status_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy().drop_duplicates()
  414. # ldw_df = pd.merge_asof(ldw_status_df, lane_dist_df, on='simFrame', direction='nearest')
  415. # ldw_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
  416. # calculate max DLC
  417. # warning_dist_max = ego_df[ego_df['LDW_status'] == "Active"]['line_dist'].max()
  418. warning_dist_max = ldw_df[ldw_df['LDW_status'] == 3]['line_dist'].max()
  419. self.result['预警时距离车道线最大距离'] = warning_dist_max
  420. # count miss warning
  421. miss_warning_df = ldw_df[(ldw_df['line_dist'] <= 0.4) & (ldw_df['LDW_status'] != "Active")]
  422. miss_warning_frame_list = miss_warning_df['simFrame'].values.tolist()
  423. miss_warning_count = self._continous_judge(miss_warning_frame_list)
  424. self.result['车道偏离漏预警次数'] = miss_warning_count
  425. # count false warning
  426. false_warning_df = ldw_df[(ldw_df['line_dist'] >= 0.4) & (ldw_df['LDW_status'] == "Active")]
  427. false_warning_frame_list = false_warning_df['simFrame'].values.tolist()
  428. false_warning_count = self._continous_judge(false_warning_frame_list)
  429. self.result['车道偏离误预警次数'] = false_warning_count
  430. def _autonomous_emergency_brake(self):
  431. """
  432. 刹得准、刹得稳
  433. 碰撞预警 + 紧急制动
  434. <=4s 预警>1s,制动<3s
  435. 触发目标类型
  436. 目标类型准确度
  437. 目标为车辆的速度降:
  438. 弱势目标的速度降:
  439. 触发时TTC<4s
  440. 一级制动减速度:4m/s^2
  441. 二级制动减速度:10m/s^2
  442. 刹停行驶距离:10-20m
  443. 刹停时前车距离:0.8-1.5m
  444. 误触发次数:频率 n/10万km
  445. 漏触发:
  446. 能绕开就没必要刹停:开始刹车时距离远近
  447. Returns:
  448. """
  449. # ego_df = self.df[self.df['playerId'] == 1]
  450. # df = ego_df[ego_df['Aeb_status'] == 'Active']
  451. df = self.ego_df[self.ego_df['Aeb_status'] == 0]
  452. # calculate brakeDecelerate
  453. df['lon_accel'] = df['accelX'] * np.cos(df['posH']) + df['accelY'] * np.sin(df['posH'])
  454. decelerate_max = df['lon_accel'].min()
  455. # calculate brake stop travelDist
  456. df_dist = df[df['lon_accel'] < 0]
  457. frame_list = df_dist['simFrame'].to_list()
  458. travelDist_list = df_dist['travelDist'].to_list()
  459. travelDist_group = []
  460. start = 0
  461. for i in range(1, len(frame_list)):
  462. if frame_list[i] - frame_list[i - 1] > 5:
  463. end = i - 1
  464. # travelDist_group.append(travelDist_list[start:end])
  465. travelDist_group.append(travelDist_list[end] - travelDist_list[start])
  466. start = i
  467. end = -1
  468. travelDist_group.append(travelDist_list[end] - travelDist_list[start])
  469. brake_distance_max = max(travelDist_group) if travelDist_group else np.inf
  470. self.result['brakeDecelerate'] = abs(round(decelerate_max, 4))
  471. self.result['brakeDistance'] = round(brake_distance_max, 4)
  472. def _integrated_cruise_assist(self, df):
  473. df_LCC = df[df['ICA_status'].isin(['LLC_follow_vehicle', 'LLC_follow_line'])].copy()
  474. self.LCC_line_dist = df_LCC['line_dist'].to_list()
  475. self.result['LCClaneDistance'] = min(self.LCC_line_dist) if self.LCC_line_dist else self.optimal_dict[
  476. 'LCClaneDistance']
  477. self.LCC_center_dist = df_LCC['laneOffset'].to_list()
  478. self.LCC_center_time_list = df_LCC['simTime'].to_list()
  479. self.LCC_center_frame_list = df_LCC['simFrame'].to_list()
  480. if not self.LCC_center_dist:
  481. self.result['LCCcenterDistanceExpectation'] = 0
  482. self.result['LCCcenterDistanceStandardDeviation'] = 0
  483. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  484. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  485. self.result['LCCcenterDistanceMax'] = 0
  486. self.result['LCCcenterDistanceMin'] = 0
  487. self.result['LCCcenterDistanceFrequency'] = 0
  488. self.result['LCCcenterDistanceRange'] = 0
  489. else:
  490. center_dist = [abs(x) for x in self.LCC_center_dist]
  491. # 车道中心线横向距离分布期望与标准差
  492. E_lane_center_dist = np.mean(center_dist)
  493. D_lane_center_dist = np.std(center_dist)
  494. # 车道中心线横向距离分布极值
  495. center_dist = np.array(center_dist)
  496. extrmax_index = sg.argrelmax(center_dist)[0]
  497. extrmin_index = sg.argrelmin(center_dist)[0]
  498. extreme_max_value = center_dist[extrmax_index]
  499. extreme_min_value = center_dist[extrmin_index]
  500. # 横向相对位置震荡频率
  501. # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
  502. length1 = len(extrmax_index) # 极大值列表长度
  503. length2 = len(extrmin_index) # 极小值列表长度
  504. # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
  505. FRAME_RATE = 100
  506. time_diff = 1.0 / FRAME_RATE
  507. period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
  508. period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
  509. try:
  510. period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
  511. except ZeroDivisionError:
  512. period = 0
  513. frequency = 1 / period if period else 0
  514. # length = min(len(extreme_max_value), len(extreme_min_value))
  515. # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
  516. # frequency = 1 / period
  517. # 横向相对位置震荡极差
  518. length = min(len(extreme_max_value), len(extreme_min_value))
  519. extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
  520. self.result['LCCcenterDistanceExpectation'] = E_lane_center_dist
  521. self.result['LCCcenterDistanceStandardDeviation'] = D_lane_center_dist
  522. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  523. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  524. self.result['LCCcenterDistanceMax'] = center_dist.max()
  525. self.result['LCCcenterDistanceMin'] = center_dist.min()
  526. self.result['LCCcenterDistanceFrequency'] = abs(frequency)
  527. self.result['LCCcenterDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
  528. df_LC = df[df['ICA_status'] == 'Only_Longitudinal_Control'].copy()
  529. v_max = df_LC['v'].max()
  530. v_min = df_LC['v'].min()
  531. self.result['OLC_v_deviation'] = v_max - v_min
  532. def _function_statistic(self):
  533. """
  534. """
  535. ACC_optimal_list = []
  536. if "functionACC" in self.type_list:
  537. if len(self.obj_id_list) > 1:
  538. df_follow = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
  539. self._following(df_follow)
  540. df_stop_and_go = self.df[
  541. self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
  542. self._stop_and_go(df_stop_and_go)
  543. else:
  544. ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
  545. if "functionLKA" in self.type_list:
  546. self._line_dist_merge()
  547. # df_line_dist = self.df[self.df['LKA_status'] == 3]
  548. df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"]
  549. self._lane_transverse_distance(df_lka)
  550. self._lateral_dist_of_center_line(df_lka)
  551. # if "functionLDW" in self.type_list:
  552. # df_ldw = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
  553. # self._lane_departure_warning(df_ldw)
  554. # if "functionAEB" in self.type_list:
  555. # self._autonomous_emergency_brake()
  556. if "functionICA" in self.type_list:
  557. self._integrated_cruise_assist(self.ego_df)
  558. arr_func = [value for key, value in self.result.items() if key in self.metric_list]
  559. # arr_func = list(self.result.values())
  560. if "functionACC" in self.type_list and len(self.obj_id_list) == 1:
  561. arr_func = ACC_optimal_list + arr_func
  562. return arr_func
  563. def custom_metric_param_parser(self, param_list):
  564. """
  565. param_dict = {
  566. "paramA" [
  567. {
  568. "kind": "-1",
  569. "optimal": "1",
  570. "multiple": ["0.5","5"],
  571. "spare1": null,
  572. "spare2": null
  573. }
  574. ]
  575. }
  576. """
  577. kind_list = []
  578. optimal_list = []
  579. multiple_list = []
  580. spare_list = []
  581. # spare1_list = []
  582. # spare2_list = []
  583. for i in range(len(param_list)):
  584. kind_list.append(int(param_list[i]['kind']))
  585. optimal_list.append(float(param_list[i]['optimal']))
  586. multiple_list.append([float(x) for x in param_list[i]['multiple']])
  587. spare_list.append([item["param"] for item in param_list[i]["spare"]])
  588. # spare1_list.append(param_list[i]['spare1'])
  589. # spare2_list.append(param_list[i]['spare2'])
  590. result = {
  591. "kind": kind_list,
  592. "optimal": optimal_list,
  593. "multiple": multiple_list,
  594. "spare": spare_list,
  595. # "spare1": spare1_list,
  596. # "spare2": spare2_list
  597. }
  598. return result
  599. def custom_metric_score(self, metric, value, param_list):
  600. """
  601. """
  602. param = self.custom_metric_param_parser(param_list)
  603. self.custom_param_dict[metric] = param
  604. score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
  605. score_sub = score_model.cal_score()
  606. score = sum(score_sub) / len(score_sub)
  607. return score
  608. def func_score(self):
  609. arr_func = self._function_statistic()
  610. print("\n[功能性表现及得分情况]")
  611. print("功能性各指标值:", [round(num, 2) for num in arr_func])
  612. arr_func = np.array([arr_func])
  613. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_func)
  614. score_sub = score_model.cal_score()
  615. score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))
  616. score_metric = [round(num, 2) for num in score_sub]
  617. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  618. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  619. custom_metric_list = list(self.customMetricParam.keys())
  620. for metric in custom_metric_list:
  621. value = self.custom_data[metric]['value']
  622. param_list = self.customMetricParam[metric]
  623. score = self.custom_metric_score(metric, value, param_list)
  624. score_metric_dict[metric] = round(score, 2)
  625. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  626. score_metric = list(score_metric_dict.values())
  627. score_type_dict = {}
  628. if self.weight_custom: # 自定义权重
  629. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  630. self.weight_dict}
  631. for type in self.type_list:
  632. type_score = sum(
  633. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  634. score_type_dict[type] = round(type_score, 2)
  635. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  636. score_type_dict}
  637. score_function = sum(score_type_with_weight_dict.values())
  638. else: # 客观赋权
  639. self.weight_list = cal_weight_from_80(score_metric)
  640. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  641. score_function = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  642. for type in self.type_list:
  643. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  644. for key, value in self.weight_dict.items():
  645. if key in self.metric_dict[type]:
  646. self.weight_dict[key] = round(value / type_weight, 4)
  647. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  648. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  649. type_priority_list = [value for key, value in self.priority_dict.items() if
  650. key in self.metric_dict[type]]
  651. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  652. score_type_dict[type] = round(type_score, 2)
  653. score_type = list(score_type_dict.values())
  654. self.weight_type_list = cal_weight_from_80(score_type)
  655. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  656. score_function = round(score_function, 2)
  657. print(f"功能性得分为:{score_function:.2f}分。")
  658. print(f"功能性各类型得分为:{score_type_dict}分。")
  659. print(f"功能性各指标得分为:{score_metric_dict}。")
  660. return score_function, score_type_dict, score_metric_dict
  def continuous_group(self, df):
      """Merge rows with consecutive frame numbers into start/end intervals.

      Rows are walked in their existing order.  A row joins the current run
      when its ``simFrame`` is at most 1 greater than the previous row's;
      otherwise the current run is closed and a new one begins.  Runs with
      fewer than two rows are dropped.

      Args:
          df: DataFrame providing ``simTime`` and ``simFrame`` columns.

      Returns:
          DataFrame with columns ``start_time``, ``end_time``,
          ``start_frame`` and ``end_frame``; one row per surviving run.
      """
      stamps = df['simTime'].values.tolist()
      frames = df['simFrame'].values.tolist()

      runs = []        # closed runs, each a list of (time, frame) pairs
      current = []     # run currently being extended
      last_frame = None
      for stamp, frame_no in zip(stamps, frames):
          # Same test as "diff <= 1 keeps the run going", expressed as the
          # condition for breaking it.
          if current and not (frame_no - last_frame <= 1):
              runs.append(current)
              current = []
          current.append((stamp, frame_no))
          last_frame = frame_no
      runs.append(current)

      # A run needs at least two samples to have a distinct start and end.
      kept = [run for run in runs if len(run) >= 2]

      # Values used for the report charts.
      time_spans = [[run[0][0], run[-1][0]] for run in kept]
      frame_spans = [[run[0][1], run[-1][1]] for run in kept]
      span_time_df = pd.DataFrame(time_spans, columns=['start_time', 'end_time'])
      span_frame_df = pd.DataFrame(frame_spans, columns=['start_frame', 'end_frame'])
      return pd.concat([span_time_df, span_frame_df], axis=1)
  688. def unfunctional_follow_v_deviation_df_statistic(self):
  689. unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
  690. 'v_deviation': self.v_deviation_list})
  691. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  692. v_df = unfunc_df[unfunc_df['v_deviation'] > self.optimal_dict['followSpeedDeviation']]
  693. v_df = v_df[['simTime', 'simFrame', 'v_deviation']]
  694. v_follow_df = self.continuous_group(v_df)
  695. # v_follow_df['type'] = 'follow'
  696. # v_follow_df['type'] = 'ACC'
  697. v_follow_df['type'] = f"{self.type_name_dict['functionACC']}"
  698. self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, v_follow_df], ignore_index=True)
  699. def unfunctional_follow_dist_deviation_df_statistic(self):
  700. unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
  701. 'dist_deviation': self.dist_deviation_list})
  702. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  703. dist_df = unfunc_df[unfunc_df['dist_deviation'] > self.optimal_dict['followDistanceDeviation']] # 阈值由1改为了3
  704. dist_df = dist_df[['simTime', 'simFrame', 'dist_deviation']]
  705. dist_follow_df = self.continuous_group(dist_df)
  706. # v_follow_df['type'] = 'follow'
  707. # dist_follow_df['type'] = 'ACC'
  708. dist_follow_df['type'] = f"{self.type_name_dict['functionACC']}"
  709. self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, dist_follow_df], ignore_index=True)
  710. def unfunctional_lane_df_statistic(self):
  711. unfunc_df = pd.DataFrame(
  712. {'simTime': self.center_time_list, 'simFrame': self.center_frame_list,
  713. 'center_dist': self.center_dist_with_nan})
  714. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  715. unfunc_df = unfunc_df.dropna(subset=['center_dist'])
  716. lane_df = unfunc_df[abs(unfunc_df['center_dist']) > self.optimal_dict['centerDistanceMax']]
  717. lane_df = lane_df[['simTime', 'simFrame', 'center_dist']]
  718. dist_lane_df = self.continuous_group(lane_df)
  719. # dist_lane_df['type'] = 'lane'
  720. # dist_lane_df['type'] = 'LKA'
  721. dist_lane_df['type'] = f"{self.type_name_dict['functionLKA']}"
  722. self.unfunc_lane_df = pd.concat([self.unfunc_lane_df, dist_lane_df], ignore_index=True)
  723. # def zip_time_pairs(self, zip_list, upper_limit=9999):
  724. # zip_time_pairs = zip(self.time_list, zip_list)
  725. # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
  726. # return zip_vs_time
  def zip_time_pairs(self, zip_list):
      """Pair each entry of ``self.time_list`` with the matching value.

      Args:
          zip_list: Values aligned index-by-index with ``self.time_list``.
              Each value is passed to ``math.isnan``.

      Returns:
          A list of ``[time, value]`` lists in which NaN values are replaced
          by an empty string.  Pairing stops at the shorter of the two
          sequences.
      """
      def blank_if_nan(value):
          # NaN samples are emitted as "" instead of a number.
          return "" if math.isnan(value) else value

      return [[stamp, blank_if_nan(value)]
              for stamp, value in zip(self.time_list, zip_list)]
  731. def _get_weight_distribution(self, dimension):
  732. # get weight distribution
  733. weight_distribution = {}
  734. weight_distribution["name"] = self.config.dimension_name[dimension]
  735. for type in self.type_list:
  736. type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
  737. self.weight_dict.items() if
  738. key in self.metric_dict[type]}
  739. weight_distribution_type = {
  740. "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
  741. "indexes": type_weight_indexes_dict
  742. }
  743. weight_distribution[type] = weight_distribution_type
  744. return weight_distribution
  745. def report_statistic(self):
  746. # report_dict = {
  747. # "name": "功能性",
  748. # "weight": f"{self.weight * 100:.2f}%",
  749. # "weightDistribution": weight_distribution,
  750. # "score": score_function,
  751. # "level": grade_function,
  752. # 'score_type': score_type,
  753. # 'score_metric': score_metric,
  754. # 'followStopCount': self.follow_stop_count,
  755. # "description1": func_description1,
  756. # "description2": func_description2,
  757. #
  758. # "functionACC": acc_dict,
  759. # "functionLKA": lka_dict,
  760. #
  761. # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
  762. # "accData": [lat_acc_vs_time, lon_acc_vs_time],
  763. #
  764. # }
  765. brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
  766. throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
  767. steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
  768. # common parameter calculate
  769. brake_vs_time = self.zip_time_pairs(brakePedal_list)
  770. throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
  771. steering_vs_time = self.zip_time_pairs(steeringWheel_list)
  772. report_dict = {
  773. "name": self.config.dimension_name["function"],
  774. "weight": f"{self.weight * 100:.2f}%",
  775. 'followStopCount': self.follow_stop_count,
  776. }
  777. # upper_limit = 40
  778. times_upper = 2
  779. # len_time = len(self.time_list)
  780. duration = self.time_list[-1]
  781. # score_function, score_type, score_metric = self.func_score()
  782. score_function, score_type_dict, score_metric_dict = self.func_score()
  783. # get weight distribution
  784. report_dict["weightDistribution"] = self._get_weight_distribution("function")
  785. score_function = int(score_function) if int(score_function) == score_function else round(
  786. score_function, 2)
  787. # score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()]
  788. # score_metric = [int(n) if int(n) == n else n for key, n in score_metric_dict.items()]
  789. grade_function = score_grade(score_function)
  790. report_dict["score"] = score_function
  791. report_dict["level"] = grade_function
  792. # report_dict["score_type"] = score_type
  793. # report_dict["score_metric"] = score_metric
  794. # speed data
  795. ego_speed_list = self.ego_df['v'].values.tolist()
  796. ego_speed_vs_time = self.zip_time_pairs(ego_speed_list)
  797. obj_speed_vs_time = []
  798. rel_speed_vs_time = []
  799. # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  800. # accData
  801. lat_acc_list = self.ego_df['lat_acc'].values.tolist()
  802. lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
  803. lon_acc_list = self.ego_df['lon_acc'].values.tolist()
  804. lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
  805. # acc data
  806. # lat_acc_list = self.ego_df['lat_acc'].values.tolist()
  807. # lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
  808. # lon_acc_list = self.ego_df['lon_acc'].values.tolist()
  809. # lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
  810. # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time]
  811. # statistic type report infos
  812. unfunc_metric_list = []
  813. func_over_optimal = []
  814. # function description
  815. func_type_list = []
  816. unfunc_type_list = []
  817. type_details_dict = {}
  818. for type in self.type_list:
  819. if type == "functionACC":
  820. builtin_graph_dict = {}
  821. custom_graph_dict = {}
  822. if len(self.obj_id_list) == 1:
  823. follow_description1 = "无目标车数据可计算;"
  824. follow_description2 = ""
  825. follow_description3 = "无目标车数据可计算;"
  826. follow_description4 = "无目标车数据可计算;"
  827. acc_dict_indexes = {}
  828. for metric in self.metric_dict[type]:
  829. acc_dict_indexes[metric] = {
  830. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  831. "name": f"{self.name_dict[metric]}",
  832. "score": 80,
  833. "avg": "-",
  834. "max": "-",
  835. "min": "-",
  836. }
  837. if self.kind_dict[metric] == -1:
  838. acc_dict_indexes[metric]["range"] = f"[0, {self.optimal_dict[metric]}]"
  839. elif self.kind_dict[metric] == 1:
  840. acc_dict_indexes[metric]["range"] = f"[{self.optimal_dict[metric]}, inf)"
  841. elif self.kind_dict[metric] == 0:
  842. acc_dict_indexes[metric][
  843. "range"] = f"[{self.optimal_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal_dict[metric] * self.multiple_dict[metric][1]}]"
  844. acc_dict = {
  845. "name": f"{self.type_name_dict[type]}",
  846. "score": 80,
  847. "level": "良好",
  848. "description1": follow_description1,
  849. "description2": follow_description2,
  850. "description3": follow_description3,
  851. "description4": follow_description4,
  852. "noObjectCar": True,
  853. "indexes": acc_dict_indexes,
  854. "builtin": {},
  855. "custom": {}
  856. }
  857. # follow cruise description
  858. func_follow_metric_list = []
  859. unfunc_follow_metric_list = []
  860. str_follow_over_optimal = ''
  861. distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
  862. else:
  863. acc_dict = {
  864. "name": f"{self.type_name_dict[type]}",
  865. "noObjectCar": False,
  866. }
  867. # follow dict
  868. score_follow = score_type_dict[type]
  869. grade_follow = score_grade(score_follow)
  870. acc_dict["score"] = score_follow
  871. acc_dict["level"] = grade_follow
  872. unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
  873. # follow cruise description
  874. func_follow_metric_list = []
  875. unfunc_follow_metric_list = []
  876. for metric in self.metric_dict[type]:
  877. unfunc_follow_metric_list.append(metric) if score_metric_dict[
  878. metric] < 80 else func_follow_metric_list.append(
  879. metric)
  880. str_follow_over_optimal = ''
  881. if not unfunc_follow_metric_list:
  882. str_func_follow_metric = string_concatenate(func_follow_metric_list)
  883. follow_description1 = f"{str_func_follow_metric}指标均表现良好"
  884. else:
  885. for metric in unfunc_follow_metric_list:
  886. if metric in self.bulitin_metric_list:
  887. value = self.result[metric]
  888. if self.kind_dict[metric] == -1:
  889. metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
  890. metric]) * 100
  891. elif self.kind_dict[metric] == 1:
  892. metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
  893. metric]) * 100
  894. elif self.kind_dict[metric] == 0:
  895. metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
  896. metric]) * 100
  897. else:
  898. value = self.custom_data[metric]["value"][0]
  899. if self.custom_param_dict[metric]['kind'][0] == -1:
  900. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  901. self.custom_param_dict[metric]['optimal'][0]) * 100
  902. elif self.custom_param_dict[metric]['kind'][0] == 1:
  903. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  904. self.custom_param_dict[metric]['optimal'][0]) * 100
  905. elif self.custom_param_dict[metric]['kind'][0] == 0:
  906. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  907. self.custom_param_dict[metric]['optimal'][0]) * 100
  908. # str_follow_over_optimal += f"{metric}为{value:.2f},超过合理范围{metric_over_optimal:.2f}%;"
  909. str_follow_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  910. str_follow_over_optimal = str_follow_over_optimal[:-1] if str_follow_over_optimal else ""
  911. str_func_follow_metric = string_concatenate(func_follow_metric_list)
  912. str_unfunc_follow_metric = string_concatenate(unfunc_follow_metric_list)
  913. if not func_follow_metric_list:
  914. follow_description1 = f"{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
  915. else:
  916. follow_description1 = f"{str_func_follow_metric}指标表现良好,{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
  917. # for follow_description2
  918. follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
  919. follow_description2 = f"自车在行驶时有{follow_duration:.2f}s处于跟车状态,"
  920. follow_description3 = ""
  921. if "followResponseTime" in self.metric_list:
  922. # for follow_description3
  923. cnt3_1 = len(self.follow_stop_time_start_list)
  924. tmp3_list = [x for x in self.follow_stop_time_start_list if
  925. x > self.optimal_dict['followResponseTime']]
  926. cnt3_2 = len(tmp3_list)
  927. percent3 = 0 if cnt3_1 == 0 else round(cnt3_2 / cnt3_1 * 100, 2)
  928. follow_description3 = f"起停跟车响应时间有{cnt3_2}次超出合理范围,占比为{percent3}%;"
  929. follow_description4 = ""
  930. if "followDistanceDeviation" in self.metric_list:
  931. # for follow_description4
  932. cnt4_1 = len(self.stop_distance_list)
  933. tmp4_list = [x for x in self.stop_distance_list if
  934. x < self.optimal_dict['followDistanceDeviation']]
  935. cnt4_2 = len(tmp4_list)
  936. percent4 = 0 if cnt4_1 == 0 else round(cnt4_2 / cnt4_1 * 100, 2)
  937. follow_description4 = f"在本次测试中,跟车状态下目标车共发生了{cnt4_1}次停车,有{cnt4_2}次超出合理范围,占比为{percent4}%;"
  938. # distance_list = obj_df['dist'].values.tolist()
  939. # distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
  940. #
  941. # acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
  942. # acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
  943. # acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
  944. # acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
  945. # common parameter calculate
  946. obj_df = self.df[self.df['playerId'] == 2]
  947. obj_speed_list = obj_df['v'].values.tolist()
  948. obj_speed_vs_time = self.zip_time_pairs(obj_speed_list)
  949. rel_speed_vs_time = self.zip_time_pairs(self.v_relative_list_full_time)
  950. distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
  951. # acc_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  952. # acc_dict["disData"] = distance_vs_time
  953. unfunc_follow_df = self.unfunc_follow_df.copy()
  954. unfunc_follow_df['type'] = "origin"
  955. unfunc_follow_slices = unfunc_follow_df.to_dict('records')
  956. # acc_dict["speMarkLine"] = unfunc_follow_slices
  957. # acc_dict["disMarkLine"] = unfunc_follow_slices
  958. distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
  959. # function ACC data
  960. acc_dict_indexes = {}
  961. for metric in self.metric_dict[type]:
  962. if metric == "followSpeedDeviation":
  963. self.unfunctional_follow_v_deviation_df_statistic()
  964. unfunc_v_df = self.unfunc_follow_df.copy()
  965. unfunc_v_df.loc[unfunc_v_df['type'] != 'ACC', 'type'] = "origin"
  966. # unfunc_v_df.loc[unfunc_v_df['type'] == 'time', 'type'] = "time"
  967. unfunc_v_slices = unfunc_v_df.to_dict('records')
  968. df1 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
  969. df1['v_time'] = df1['end_time'] - df1['start_time']
  970. devi_v_time = df1['v_time'].sum()
  971. follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
  972. percent2_1 = devi_v_time / follow_duration * 100 if follow_duration else 0
  973. if devi_v_time != 0:
  974. follow_description2 += f"跟车状态下自车和目标车的速度差共有{devi_v_time:.2f}s超出合理范围,占比为{percent2_1:.2f}%;"
  975. else:
  976. follow_description2 += "跟车状态下自车和目标车的速度差均在合理范围内;"
  977. v_deviation_list = [x for x in self.v_deviation_list if not np.isnan(x)]
  978. acc_dict_indexes["followSpeedDeviation"] = {
  979. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  980. "name": f"{self.name_dict[metric]}",
  981. "score": score_metric_dict[
  982. "followSpeedDeviation"],
  983. "avg": f'{np.mean(v_deviation_list):.2f}' if v_deviation_list else 0,
  984. "max": f'{max(v_deviation_list):.2f}' if v_deviation_list else 0,
  985. "min": f'{min(v_deviation_list):.2f}' if v_deviation_list else 0,
  986. # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]"
  987. "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]"
  988. }
  989. fsd_data = {
  990. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  991. "name": f"{self.name_dict[metric]}",
  992. "data": rel_speed_vs_time,
  993. "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]",
  994. # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]",
  995. # "markLine": unfunc_v_slices,
  996. # "markLine2": [-1 * self.optimal_dict['followSpeedDeviation'],
  997. # self.optimal_dict['followSpeedDeviation']],
  998. }
  999. builtin_graph_dict["followSpeedDeviation"] = fsd_data
  1000. if metric == "followDistanceDeviation":
  1001. self.unfunctional_follow_dist_deviation_df_statistic()
  1002. unfunc_dist_df = self.unfunc_follow_df.copy()
  1003. unfunc_dist_df.loc[unfunc_dist_df['type'] != 'ACC', 'type'] = "origin"
  1004. # unfunc_dist_df.loc[unfunc_dist_df['type'] == 'distance', 'type'] = "distance"
  1005. unfunc_dist_slices = unfunc_dist_df.to_dict('records')
  1006. df2 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
  1007. df2['dist_time'] = df2['end_time'] - df2['start_time']
  1008. devi_dist_time = df2['dist_time'].sum()
  1009. follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
  1010. percent2_2 = devi_dist_time / follow_duration * 100 if follow_duration else 0
  1011. if devi_dist_time != 0:
  1012. follow_description2 += f"跟车状态下自车和目标车的距离差共有{devi_dist_time:.2f}s超出合理范围,占比为{percent2_2:.2f}%;"
  1013. else:
  1014. follow_description2 += "跟车状态下自车和目标车的距离差均在合理范围内;"
  1015. dist_deviation_list = [x for x in self.dist_deviation_list if not np.isnan(x)]
  1016. acc_dict_indexes["followDistanceDeviation"] = {
  1017. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1018. "name": f"{self.name_dict[metric]}",
  1019. "score": score_metric_dict[
  1020. "followDistanceDeviation"],
  1021. "avg": f'{np.mean(dist_deviation_list):.2f}' if dist_deviation_list else 0,
  1022. "max": f'{max(dist_deviation_list):.2f}' if dist_deviation_list else 0,
  1023. "min": f'{min(dist_deviation_list):.2f}' if dist_deviation_list else 0,
  1024. "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]"
  1025. }
  1026. fdd_data = {
  1027. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1028. "name": f"{self.name_dict[metric]}",
  1029. "data": distance_deviation_vs_time,
  1030. "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]",
  1031. # "markLine": unfunc_dist_slices,
  1032. # "markLine2": [self.optimal_dict['followDistanceDeviation']],
  1033. }
  1034. builtin_graph_dict["followDistanceDeviation"] = fdd_data
  1035. if metric == "followStopDistance":
  1036. acc_dict_indexes["followStopDistance"] = {
  1037. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1038. "name": f"{self.name_dict[metric]}",
  1039. "score": score_metric_dict[
  1040. "followStopDistance"],
  1041. "avg": f'{np.mean(self.stop_distance_list):.2f}' if self.stop_distance_list else 0,
  1042. "max": f'{max(self.stop_distance_list):.2f}' if self.stop_distance_list else 0,
  1043. "min": f'{min(self.stop_distance_list):.2f}' if self.stop_distance_list else 0,
  1044. "range": f"[{self.optimal_dict['followStopDistance']}, inf)"
  1045. }
  1046. # sdd_data = {
  1047. # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1048. # "name": f"{self.name_dict[metric]}",
  1049. # "data": self.stop_distance_list,
  1050. # "range": f"[{self.optimal_dict['followStopDistance']}, inf)",
  1051. # # "ref": [self.optimal_dict['followStopDistance'] - 1,
  1052. # # self.optimal_dict['followStopDistance']],
  1053. # }
  1054. # builtin_graph_dict["followStopDistance"] = sdd_data
  1055. if metric == "followResponseTime":
  1056. acc_dict_indexes["followResponseTime"] = {
  1057. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1058. "name": f"{self.name_dict[metric]}",
  1059. "score": score_metric_dict["followResponseTime"],
  1060. "avg": f'{np.mean(self.follow_stop_time_start_list):.2f}' if self.follow_stop_time_start_list else 0,
  1061. "max": f'{max(self.follow_stop_time_start_list):.2f}' if self.follow_stop_time_start_list else 0,
  1062. "min": f'{min(self.follow_stop_time_start_list):.2f}' if self.follow_stop_time_start_list else 0,
  1063. "range": f"[0, {self.optimal_dict['followResponseTime']}]"
  1064. }
  1065. # rt_data = {
  1066. # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1067. # "name": f"{self.name_dict[metric]}",
  1068. # "data": self.follow_stop_time_start_list,
  1069. # # "range": f"[0, {self.optimal_dict['followResponseTime']})"
  1070. # }
  1071. # builtin_graph_dict["followResponseTime"] = rt_data
  1072. if metric not in self.bulitin_metric_list:
  1073. acc_dict_indexes[metric] = {
  1074. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1075. "name": f"{self.name_dict[metric]}",
  1076. "score": score_metric_dict[metric],
  1077. "avg": self.custom_data[metric]['tableData']['avg'],
  1078. "max": self.custom_data[metric]['tableData']['max'],
  1079. "min": self.custom_data[metric]['tableData']['min'],
  1080. }
  1081. if self.custom_param_dict[metric]['kind'][0] == -1:
  1082. acc_dict_indexes[metric][
  1083. "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
  1084. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1085. acc_dict_indexes[metric][
  1086. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
  1087. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1088. acc_dict_indexes[metric][
  1089. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
  1090. custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
  1091. self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
  1092. acc_dict["indexes"] = acc_dict_indexes
  1093. acc_dict["builtin"] = builtin_graph_dict
  1094. acc_dict["custom"] = custom_graph_dict
  1095. acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
  1096. acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
  1097. acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
  1098. acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
  1099. type_details_dict[type] = acc_dict
  1100. unfunc_metric_list += unfunc_follow_metric_list
  1101. func_over_optimal.append(str_follow_over_optimal)
  1102. elif type == "functionLKA":
  1103. lka_dict = {
  1104. "name": f"{self.type_name_dict[type]}",
  1105. }
  1106. builtin_graph_dict = {}
  1107. custom_graph_dict = {}
  1108. # get score and grade
  1109. score_lane = score_type_dict["functionLKA"]
  1110. grade_lane = score_grade(score_lane)
  1111. lka_dict['score'] = score_lane
  1112. lka_dict['level'] = grade_lane
  1113. # unfunc_type_list.append('车道保持') if score_type_dict["functionLKA"] < 80 else func_type_list.append(
  1114. # '车道保持')
  1115. unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
  1116. # lane keep description
  1117. func_lane_metric_list = []
  1118. unfunc_lane_metric_list = []
  1119. for metric in self.metric_dict[type]:
  1120. unfunc_lane_metric_list.append(metric) if score_metric_dict[
  1121. metric] < 80 else func_lane_metric_list.append(
  1122. metric)
  1123. lka_dict_indexes = {}
  1124. for metric in self.metric_dict[type]:
  1125. if metric == "laneDistance":
  1126. lka_dict_indexes["laneDistance"] = {
  1127. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1128. "name": f"{self.name_dict[metric]}",
  1129. "score": score_metric_dict["laneDistance"],
  1130. "avg": f'{np.mean(self.line_dist):.2f}' if self.line_dist else 0,
  1131. "max": f'{max(self.line_dist):.2f}' if self.line_dist else 0,
  1132. "min": f'{min(self.line_dist):.2f}' if self.line_dist else 0,
  1133. "range": f"[{self.optimal_dict['laneDistance']}, 1.875]"
  1134. }
  1135. if metric == "centerDistanceExpectation":
  1136. lka_dict_indexes["centerDistanceExpectation"] = {
  1137. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1138. "name": f"{self.name_dict[metric]}",
  1139. "score": score_metric_dict['centerDistanceExpectation'],
  1140. "avg": f'{self.result["centerDistanceExpectation"]:.2f}',
  1141. "max": "-",
  1142. "min": "-",
  1143. "range": f"[0, {self.optimal_dict['centerDistanceExpectation']}]"
  1144. }
  1145. if metric == "centerDistanceStandardDeviation":
  1146. lka_dict_indexes["centerDistanceStandardDeviation"] = {
  1147. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1148. "name": f"{self.name_dict[metric]}",
  1149. "score": score_metric_dict['centerDistanceStandardDeviation'],
  1150. "avg": f'{self.result["centerDistanceStandardDeviation"]:.2f}',
  1151. "max": "-",
  1152. "min": "-",
  1153. "range": f"[0, {self.optimal_dict['centerDistanceStandardDeviation']}]"
  1154. }
  1155. if metric == "centerDistanceMax":
  1156. lka_dict_indexes["centerDistanceMax"] = {
  1157. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1158. "name": f"{self.name_dict[metric]}",
  1159. "score": score_metric_dict['centerDistanceMax'],
  1160. "avg": "-",
  1161. "max": f'{self.result["centerDistanceMax"]:.2f}',
  1162. "min": "-",
  1163. "range": f"[0, {self.optimal_dict['centerDistanceMax']}]"
  1164. }
  1165. if metric == "centerDistanceMin":
  1166. lka_dict_indexes["centerDistanceMin"] = {
  1167. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1168. "name": f"{self.name_dict[metric]}",
  1169. "score": score_metric_dict['centerDistanceMin'],
  1170. "avg": "-",
  1171. "max": "-",
  1172. "min": f'{self.result["centerDistanceMin"]:.2f}',
  1173. "range": f"[0, {self.optimal_dict['centerDistanceMin']}]"
  1174. }
  1175. if metric == "centerDistanceFrequency":
  1176. lka_dict_indexes["centerDistanceFrequency"] = {
  1177. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1178. "name": f"{self.name_dict[metric]}",
  1179. "score": score_metric_dict['centerDistanceFrequency'],
  1180. "avg": f'{self.result["centerDistanceFrequency"]:.2f}',
  1181. "max": "-",
  1182. "min": "-",
  1183. "range": f"[0, {self.optimal_dict['centerDistanceFrequency']}]"
  1184. }
  1185. if metric == "centerDistanceRange":
  1186. lka_dict_indexes["centerDistanceRange"] = {
  1187. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1188. "name": f"{self.name_dict[metric]}",
  1189. "score": score_metric_dict['centerDistanceRange'],
  1190. "avg": f'{self.result["centerDistanceRange"]:.2f}',
  1191. "max": "-",
  1192. "min": "-",
  1193. "range": f"[0, {self.optimal_dict['centerDistanceRange']}]"
  1194. }
  1195. if metric not in self.bulitin_metric_list:
  1196. lka_dict_indexes[metric] = {
  1197. # "name": self.custom_data[metric]['name'],
  1198. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1199. "name": f"{self.name_dict[metric]}",
  1200. "score": score_metric_dict[metric],
  1201. "avg": self.custom_data[metric]['tableData']['avg'],
  1202. "max": self.custom_data[metric]['tableData']['max'],
  1203. "min": self.custom_data[metric]['tableData']['min'],
  1204. }
  1205. if self.custom_param_dict[metric]['kind'][0] == -1:
  1206. lka_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
  1207. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1208. lka_dict_indexes[metric][
  1209. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
  1210. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1211. lka_dict_indexes[metric][
  1212. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
  1213. custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
  1214. self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
  1215. lka_dict["indexes"] = lka_dict_indexes
  1216. str_lane_over_optimal = ''
  1217. if not unfunc_lane_metric_list:
  1218. str_func_lane_metric = string_concatenate(func_lane_metric_list)
  1219. lane_description1 = f"{str_func_lane_metric}指标均表现良好"
  1220. else:
  1221. for metric in unfunc_lane_metric_list:
  1222. if metric in self.bulitin_metric_list:
  1223. value = self.result[metric]
  1224. if self.kind_dict[metric] == -1:
  1225. metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
  1226. metric]) * 100
  1227. elif self.kind_dict[metric] == 1:
  1228. metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
  1229. metric]) * 100
  1230. elif self.kind_dict[metric] == 0:
  1231. metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
  1232. metric]) * 100
  1233. else:
  1234. value = self.custom_data[metric]["value"][0]
  1235. if self.custom_param_dict[metric]['kind'][0] == -1:
  1236. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  1237. self.custom_param_dict[metric]['optimal'][0]) * 100
  1238. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1239. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  1240. self.custom_param_dict[metric]['optimal'][0]) * 100
  1241. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1242. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  1243. self.custom_param_dict[metric]['optimal'][0]) * 100
  1244. str_lane_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  1245. str_lane_over_optimal = str_lane_over_optimal[:-1] if str_lane_over_optimal else ""
  1246. str_func_lane_metric = string_concatenate(func_lane_metric_list)
  1247. str_unfunc_lane_metric = string_concatenate(unfunc_lane_metric_list)
  1248. if not func_lane_metric_list:
  1249. lane_description1 = f"{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
  1250. else:
  1251. lane_description1 = f"{str_func_lane_metric}指标表现良好,{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
  1252. lane_description2 = ""
  1253. center_distance_vs_time = []
  1254. if "centerDistanceMax" in self.metric_list:
  1255. cnt2_1 = len(self.center_dist)
  1256. tmp2_list = [x for x in self.center_dist if x > self.optimal_dict['centerDistanceMax']]
  1257. cnt2_2 = len(tmp2_list)
  1258. percent2 = 0 if cnt2_1 == 0 else (cnt2_2 / cnt2_1 * 100)
  1259. dist_time = percent2 * duration / 100
  1260. if dist_time != 0:
  1261. lane_description2 = f"距离有{dist_time:.2f}秒超出合理范围,占比为{percent2:.2f}%"
  1262. else:
  1263. lane_description2 = f"距离均在合理范围内,算法车道保持表现良好"
  1264. # lane keep data for graph
  1265. # center_distance_vs_time = self.zip_time_pairs(self.center_dist)
  1266. center_distance_vs_time = self.zip_time_pairs(self.center_dist_with_nan)
  1267. self.unfunctional_lane_df_statistic()
  1268. #
  1269. # lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
  1270. # lka_dict["description2"] = lane_description2
  1271. unfunc_lane_df = self.unfunc_lane_df.copy()
  1272. unfunc_lane_df['type'] = "origin"
  1273. unfunc_lane_slices = unfunc_lane_df.to_dict('records')
  1274. unfunc_lane_dist_df = self.unfunc_lane_df.copy()
  1275. unfunc_lane_dist_df.loc[unfunc_lane_dist_df['type'] != 'LKA', 'type'] = "origin"
  1276. unfunc_lane_dist_slices = unfunc_lane_dist_df.to_dict('records')
  1277. lka_data = {
  1278. "name": "车辆中心线横向距离(m)",
  1279. "data": center_distance_vs_time,
  1280. # "markLine": unfunc_lane_dist_slices
  1281. }
  1282. if 'centerDistanceMax' in self.optimal_dict:
  1283. lka_range = f"[0, {self.optimal_dict['centerDistanceMax']}]"
  1284. elif 'centerDistanceMin' in self.optimal_dict:
  1285. lka_range = f"[0, {self.optimal_dict['centerDistanceMin']}]"
  1286. else:
  1287. lka_range = f"[0, 0.5]"
  1288. lka_data["range"] = lka_range
  1289. builtin_graph_dict["centerDistance"] = lka_data
  1290. lka_dict["builtin"] = builtin_graph_dict
  1291. lka_dict["custom"] = custom_graph_dict
  1292. lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
  1293. lka_dict["description2"] = lane_description2
  1294. # lka_dict["centerDisData"] = center_distance_vs_time
  1295. # lka_dict["centerDisMarkLine"] = unfunc_lane_dist_slices
  1296. type_details_dict[type] = lka_dict
  1297. unfunc_metric_list += unfunc_lane_metric_list
  1298. func_over_optimal.append(str_lane_over_optimal)
  1299. else:
  1300. type_dict = {
  1301. "name": f"{self.type_name_dict[type]}",
  1302. }
  1303. builtin_graph_dict = {}
  1304. custom_graph_dict = {}
  1305. # get score and grade
  1306. score_custom_type = score_type_dict[type]
  1307. grade_custom_type = score_grade(score_custom_type)
  1308. type_dict["score"] = score_custom_type
  1309. type_dict["level"] = grade_custom_type
  1310. # custom type description
  1311. good_custom_metric_list = []
  1312. bad_custom_metric_list = []
  1313. unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
  1314. type_dict_indexes = {}
  1315. for metric in self.metric_dict[type]:
  1316. bad_custom_metric_list.append(metric) if score_metric_dict[
  1317. metric] < 80 else good_custom_metric_list.append(
  1318. metric)
  1319. type_dict_indexes[metric] = {
  1320. # "name": self.custom_data[metric]['name'],
  1321. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1322. "name": f"{self.name_dict[metric]}",
  1323. "score": score_metric_dict[metric],
  1324. "avg": self.custom_data[metric]['tableData']['avg'],
  1325. "max": self.custom_data[metric]['tableData']['max'],
  1326. "min": self.custom_data[metric]['tableData']['min'],
  1327. }
  1328. if self.custom_param_dict[metric]['kind'][0] == -1:
  1329. type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
  1330. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1331. type_dict_indexes[metric]["range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
  1332. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1333. type_dict_indexes[metric][
  1334. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
  1335. custom_graph_dict[metric] = self.custom_data[metric]['reportData']
  1336. self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
  1337. type_dict["indexes"] = type_dict_indexes
  1338. str_type_over_optimal = ""
  1339. if not bad_custom_metric_list:
  1340. str_good_custom_metric = string_concatenate(good_custom_metric_list)
  1341. type_description = f"{str_good_custom_metric}指标均表现良好"
  1342. else:
  1343. for metric in bad_custom_metric_list:
  1344. value = self.custom_data[metric]["value"][0]
  1345. if self.custom_param_dict[metric]['kind'][0] == -1:
  1346. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  1347. self.custom_param_dict[metric]['optimal'][0]) * 100
  1348. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1349. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  1350. self.custom_param_dict[metric]['optimal'][0]) * 100
  1351. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1352. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  1353. self.custom_param_dict[metric]['optimal'][0]) * 100
  1354. str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  1355. str_type_over_optimal = str_type_over_optimal[:-1]
  1356. str_good_custom_metric = string_concatenate(good_custom_metric_list)
  1357. str_bad_custom_metric = string_concatenate(bad_custom_metric_list)
  1358. if not good_custom_metric_list:
  1359. type_description = f"{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
  1360. else:
  1361. type_description = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
  1362. type_dict["builtin"] = builtin_graph_dict
  1363. type_dict["custom"] = custom_graph_dict
  1364. type_dict["description"] = replace_key_with_value(type_description, self.name_dict)
  1365. type_details_dict[type] = type_dict
  1366. unfunc_metric_list += bad_custom_metric_list
  1367. func_over_optimal.append(str_type_over_optimal)
  1368. report_dict["details"] = type_details_dict
  1369. # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  1370. func_over_optimal = [s for s in func_over_optimal if s]
  1371. str_func_over_optimal = ';'.join(func_over_optimal)
  1372. if grade_function == '优秀':
  1373. str_func_type = string_concatenate(func_type_list)
  1374. func_description1 = f'算法在{str_func_type}功能上表现优秀;'
  1375. elif grade_function == '良好':
  1376. str_func_type = string_concatenate(func_type_list)
  1377. func_description1 = f'算法在{str_func_type}功能上总体表现良好,满足设计指标要求;'
  1378. elif grade_function == '一般':
  1379. str_unfunc_type = string_concatenate(unfunc_type_list)
  1380. str_unfunc_metric = string_concatenate(unfunc_metric_list)
  1381. str_unfunc_metric = replace_key_with_value(str_unfunc_metric, self.type_name_dict)
  1382. func_description1 = f'算法在{str_unfunc_type}功能上表现一般、需要在{str_unfunc_metric}指标上进一步优化。其中,{str_func_over_optimal};'
  1383. elif grade_function == '较差':
  1384. str_unfunc_type = string_concatenate(unfunc_type_list)
  1385. func_description1 = f'算法在{str_unfunc_type}功能上表现较差,需要提高算法的功能性表现。其中,{str_func_over_optimal};'
  1386. if not unfunc_type_list:
  1387. func_description2 = '功能性在各个功能上的表现俱佳。'
  1388. else:
  1389. str_unfunc_type = string_concatenate(unfunc_type_list)
  1390. func_description2 = f"算法在{str_unfunc_type}功能上需要重点优化。"
  1391. # report_dict["description1"] = replace_key_with_value(func_description1, self.name_dict)
  1392. report_dict["description1"] = replace_key_with_value(replace_key_with_value(func_description1, self.name_dict),
  1393. self.type_name_dict)
  1394. report_dict["description2"] = replace_key_with_value(func_description2, self.type_name_dict)
  1395. report_dict['commonData'] = {
  1396. "per": {
  1397. "name": "脚刹/油门踏板开度(百分比)",
  1398. "legend": ["刹车踏板开度", "油门踏板开度"],
  1399. "data": [brake_vs_time, throttle_vs_time]
  1400. },
  1401. "ang": {
  1402. "name": "方向盘转角(角度°)",
  1403. "data": steering_vs_time
  1404. },
  1405. "spe": {
  1406. "name": "速度(km/h)",
  1407. "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
  1408. "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  1409. },
  1410. "acc": {
  1411. "name": "加速度(m/s²)",
  1412. "legend": ["横向加速度", "纵向加速度"],
  1413. "data": [lat_acc_vs_time, lon_acc_vs_time]
  1414. },
  1415. "dis": {
  1416. "name": "前车距离(m)",
  1417. "data": distance_vs_time
  1418. }
  1419. }
  1420. self.unfunc_df = pd.concat([self.unfunc_df, self.unfunc_follow_df, self.unfunc_lane_df], ignore_index=True)
  1421. unfunc_df = self.unfunc_df.copy()
  1422. unfunc_slices = unfunc_df.to_dict('records')
  1423. unfunc_slices.extend(self.custom_markline_list)
  1424. report_dict["commonMarkLine"] = unfunc_slices
  1425. # report_dict = {
  1426. # "name": "功能性",
  1427. # "weight": f"{self.weight * 100:.2f}%",
  1428. # "weightDistribution": weight_distribution,
  1429. # "score": score_function,
  1430. # "level": grade_function,
  1431. # 'followStopCount': self.follow_stop_count,
  1432. # "description1": func_description1,
  1433. # "description2": func_description2,
  1434. #
  1435. # "functionACC": acc_dict,
  1436. # "functionLKA": lka_dict,
  1437. #
  1438. # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
  1439. # "accData": [lat_acc_vs_time, lon_acc_vs_time],
  1440. #
  1441. # }
  1442. self.eval_data = self.ego_df.copy()
  1443. return report_dict
  1444. def get_eval_data(self):
  1445. df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc', 'line_dist']].copy()
  1446. return df