function.py 93 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Function metrics
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. sys.path.append('../results')
  18. import math
  19. import numpy as np
  20. import pandas as pd
  21. import scipy.signal as sg
  22. from score_weight import cal_score_with_priority, cal_weight_from_80
  23. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
  24. class Function(object):
  25. """
  26. Class for achieving function metrics for autonomous driving.
  27. Attributes:
  28. df: Vehicle driving data, stored in dataframe format.
  29. """
  30. def __init__(self, data_processed, custom_data, scoreModel):
  31. self.eval_data = pd.DataFrame()
  32. self.data_processed = data_processed
  33. self.scoreModel = scoreModel
  34. self.status_trigger_dict = data_processed.status_trigger_dict
  35. self.df = data_processed.object_df
  36. self.ego_df = data_processed.ego_data
  37. self.obj_id_list = data_processed.obj_id_list
  38. self.df_roadmark = data_processed.road_mark_df
  39. self.df_roadpos = data_processed.road_pos_df
  40. self.df_drivectrl = data_processed.driver_ctrl_df
  41. self.unfunc_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  42. self.unfunc_follow_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  43. self.unfunc_lane_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  44. self.unfunc_custom_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  45. self.custom_markline_list = []
  46. # config infos for calculating score
  47. self.config = data_processed.config
  48. function_config = self.config.config['function']
  49. self.function_config = function_config
  50. # common data
  51. self.bulitin_metric_list = self.config.builtinMetricList
  52. # dimension data
  53. self.weight_custom = function_config['weightCustom']
  54. self.metric_list = function_config['metric']
  55. self.type_list = function_config['type']
  56. self.type_name_dict = function_config['typeName']
  57. self.name_dict = function_config['name']
  58. self.unit_dict = function_config['unit']
  59. # custom metric data
  60. self.customMetricParam = function_config['customMetricParam']
  61. self.custom_metric_list = list(self.customMetricParam.keys())
  62. self.custom_data = custom_data
  63. self.custom_param_dict = {}
  64. # score data
  65. self.weight = function_config['weightDimension']
  66. self.weight_type_dict = function_config['typeWeight']
  67. self.weight_type_list = function_config['typeWeightList']
  68. self.weight_dict = function_config['weight']
  69. self.weight_list = function_config['weightList']
  70. self.priority_dict = function_config['priority']
  71. self.priority_list = function_config['priorityList']
  72. self.kind_dict = function_config['kind']
  73. self.optimal_dict = function_config['optimal']
  74. self.multiple_dict = function_config['multiple']
  75. self.kind_list = function_config['kindList']
  76. self.optimal_list = function_config['optimalList']
  77. self.multiple_list = function_config['multipleList']
  78. # self.unit_dict = function_config['unit']
  79. self.metric_dict = function_config['typeMetricDict']
  80. if "function_ACC" in self.metric_dict.keys():
  81. self.acc_metric_list = self.metric_dict['function_ACC']
  82. if "functionLKA" in self.metric_dict.keys():
  83. self.lka_metric_list = self.metric_dict['functionLKA']
  84. # self.acc_metric_list = ['followSpeedDeviation', 'followDistanceDeviation', 'followStopDistance',
  85. # 'followResponseTime']
  86. # self.lka_metric_list = ['laneDistance', 'centerDistanceExpectation', 'centerDistanceStandardDeviation',
  87. # 'centerDistanceMax', 'centerDistanceMin', 'centerDistanceFrequency',
  88. # 'centerDistanceRange']
  89. # custom metric
  90. self.flag_type_dict = {}
  91. # lists of drving control info
  92. self.time_list = data_processed.driver_ctrl_data['time_list']
  93. self.frame_list = data_processed.driver_ctrl_data['frame_list']
  94. self.time_list_follow = []
  95. self.frame_list_follow = []
  96. self.dist_list = []
  97. self.v_deviation_list = []
  98. self.v_relative_list = []
  99. self.dist_deviation_list = []
  100. self.stop_distance_list = []
  101. self.follow_stop_time_start_list = [] # 起停跟车响应时长
  102. self.dist_list_full_time = list()
  103. self.dist_deviation_list_full_time = list()
  104. self.line_dist = []
  105. self.center_dist = []
  106. self.center_dist_with_nan = []
  107. self.center_time_list = []
  108. self.center_frame_list = []
  109. self.ICA_FLAG = False
  110. self.follow_stop_count = 0
  111. self.result = {}
  112. def _following(self, df):
  113. """
  114. 稳定跟车行驶:
  115. 1.稳态控制速度偏差:筛选出跟车行驶数据后,
  116. 2.稳态控制距离偏差
  117. ACC stastus
  118. 0x7: Stand-wait State
  119. 0x6: Stand-active State
  120. 0x5: Passive State
  121. 0x4: Standby State
  122. 0x3: Shut-off State
  123. 0x2: Override State
  124. 0x1: Active State
  125. 0x0: Off
  126. # 速度变化不能以0到非0为参考,要改为从0到0.05的变化,有一个阈值滤波,避免前车车辆抖动误差影响传感器
  127. 弯道行驶: 最小跟随弯道半径
  128. """
  129. # df = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
  130. # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
  131. col_list = ['simTime', 'simFrame', 'playerId', 'v', 'posX', 'posY'] # target_id
  132. df = df[col_list].copy()
  133. ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'posX', 'posY']]
  134. # 筛选目标车(同一车道内,距离最近的前车)
  135. # obj_df = df[df['playerId'] == df['target_id']]
  136. target_id = 2
  137. obj_df = df[df['playerId'] == target_id][['simTime', 'simFrame', 'v', 'posX', 'posY']] # 目标车
  138. obj_df = obj_df.rename(columns={'v': 'v_obj', 'posX': 'posX_obj', 'posY': 'posY_obj'})
  139. df_merge = pd.merge(ego_df, obj_df, on=['simTime', 'simFrame'], how='left')
  140. df_merge['v_relative'] = df_merge['v'] - df_merge['v_obj']
  141. df_merge['dist'] = df_merge.apply(
  142. lambda row: self.dist(row['posX'], row['posY'], row['posX_obj'], row['posY_obj']), axis=1)
  143. self.dist_list = df_merge['dist'].values.tolist()
  144. df_merge['time_gap'] = df_merge['dist'] / df_merge['v']
  145. SAFE_TIME_GAP = 3
  146. df_merge['dist_deviation'] = df_merge['time_gap'].apply(
  147. lambda x: 0 if (x >= SAFE_TIME_GAP) else (SAFE_TIME_GAP - x))
  148. df_stop = df_merge[(df_merge['v'] == 0) & (df_merge['v_obj'] == 0)]
  149. stop_distance_list = df_stop['dist'].values.tolist()
  150. self.stop_distance_list = stop_distance_list
  151. t_list = df_stop['simTime'].values.tolist()
  152. stop_group = []
  153. sub_group = []
  154. CONTINUOUS_TIME_PERIOD = 1
  155. CONTINUOUS_FRAME_PERIOD = 13
  156. for i in range(len(t_list)):
  157. if not sub_group or t_list[i] - t_list[i - 1] <= CONTINUOUS_TIME_PERIOD:
  158. sub_group.append(t_list[i])
  159. else:
  160. stop_group.append(sub_group)
  161. sub_group = [t_list[i]]
  162. stop_group.append(sub_group)
  163. stop_group = [g for g in stop_group if len(g) >= CONTINUOUS_FRAME_PERIOD]
  164. self.follow_stop_count = len(stop_group)
  165. # solution 1: 跟车状态下,算法输出预设跟车速度
  166. # df['velocity_deviation'] = abs(df['v'] - df['set_velocity'])
  167. # max_velocity_deviation = df['velocity_deviation'].max()
  168. # solution 2: 跟车状态下,跟车设定速度为目标车速度
  169. # velocity_deviation = []
  170. # distance_deviation = []
  171. # stop_distance = []
  172. #
  173. # for f, data in df.groupby('simFrame'):
  174. # ego_data = data.iloc[0].copy()
  175. # # df.loc[len(df)] = ego_data
  176. #
  177. # if len(data) < 2:
  178. # continue
  179. # v1 = ego_data['v']
  180. # x1 = ego_data['posX']
  181. # y1 = ego_data['posY']
  182. #
  183. # for i in range(1, len(data)):
  184. # obj_data = data.iloc[i].copy()
  185. #
  186. # v2 = obj_data['v']
  187. # x2 = obj_data['posX']
  188. # y2 = obj_data['posY']
  189. #
  190. # v_delta = abs(v1 - v2)
  191. # velocity_deviation.append(v_delta)
  192. #
  193. # dist = self.dist(x1, y1, x2, y2)
  194. # distance_deviation.append(dist)
  195. #
  196. # if v2 == 0 and v1 == 0:
  197. # stop_distance.append(dist)
  198. # df.loc[len(df)] = obj_data
  199. # self.df = df
  200. df_merge.replace([np.inf, -np.inf], np.nan, inplace=True) # 异常值处理
  201. self.time_list_follow = df_merge['simTime'].values.tolist()
  202. self.frame_list_follow = df_merge['simFrame'].values.tolist()
  203. self.v_relative_list = df_merge['v_relative'].values.tolist()
  204. self.v_deviation_list = abs(df_merge['v_relative']).values.tolist()
  205. self.dist_deviation_list = df_merge['dist_deviation'].values.tolist()
  206. tmp_df = self.ego_df[['simTime', 'simFrame']].copy()
  207. v_rel_df = df_merge[['simTime', 'v_relative']].copy()
  208. df_merged1 = pd.merge(tmp_df, v_rel_df, on='simTime', how='left')
  209. self.v_relative_list_full_time = df_merged1['v_relative'].values.tolist()
  210. dist_deviation_df = df_merge[['simTime', 'dist_deviation']].copy()
  211. df_merged1 = pd.merge(tmp_df, dist_deviation_df, on='simTime', how='left')
  212. self.dist_deviation_list_full_time = df_merged1['dist_deviation'].values.tolist()
  213. dist_df = df_merge[['simTime', 'dist']].copy()
  214. df_merged1 = pd.merge(tmp_df, dist_df, on='simTime', how='left')
  215. self.dist_list_full_time = df_merged1['dist'].values.tolist()
  216. max_velocity_deviation = abs(df_merge['v_relative']).max()
  217. distance_deviation = df_merge['dist_deviation'].max()
  218. min_stop_distance = min(self.stop_distance_list) if self.stop_distance_list else 9999
  219. self.result['followSpeedDeviation'] = max_velocity_deviation
  220. self.result['followDistanceDeviation'] = distance_deviation
  221. self.result['followStopDistance'] = min_stop_distance
  222. def _stop_and_go(self, df):
  223. """
  224. 停走功能:
  225. 1.跟停距离3-4m,
  226. 2.启动跟车响应时间<=1.2s
  227. 3.停车跟车响应时间<=1.2s
  228. decisionType_target
  229. 0 无障碍物巡航
  230. 1 停车减速让行
  231. 2 跟车
  232. 3 绕行
  233. 4 到达终点?
  234. """
  235. # df = self.df[self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
  236. # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
  237. df = df.dropna(subset=['type'])
  238. STOP_SPEED_THRESHOLD = 0.05
  239. df['v'] = df['v'].apply(lambda x: 0 if x <= STOP_SPEED_THRESHOLD else 1) # 区分速度为0或非0
  240. target_id = 2
  241. df_ego = df[df['playerId'] == 1].copy()
  242. df_obj = df[df['playerId'] == target_id].copy() # 目标车
  243. df_obj_time = df_obj['simTime'].values.tolist()
  244. df_ego = df_ego[df_ego['simTime'].isin(df_obj_time)].copy()
  245. df_ego = df_ego.drop_duplicates(["simTime", "simFrame"])
  246. df_obj = df_obj.drop_duplicates(["simTime", "simFrame"])
  247. df_ego['v_diff'] = df_ego['v'].diff()
  248. df_ego['v_start_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == 1 else 0) # 起步即为1
  249. df_ego['v_stop_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == -1 else 0) # 停车即为-1
  250. df_obj['v_diff'] = df_obj['v'].diff()
  251. obj_v_start_flag = df_obj['v_diff'].apply(lambda x: 1 if x == 1 else 0).values # 起步即为1
  252. obj_v_stop_flag = df_obj['v_diff'].apply(lambda x: 1 if x == -1 else 0).values # 停车即为1
  253. df_ego['obj_v_start_flag'] = obj_v_start_flag
  254. df_ego['obj_v_stop_flag'] = obj_v_stop_flag
  255. df_ego['flag_start'] = df_ego['obj_v_start_flag'] - df_ego['v_start_flag'] # 目标车起步即为1,自车起步即为-1
  256. df_ego['flag_stop'] = df_ego['obj_v_stop_flag'] - df_ego['v_stop_flag'] # 目标车停车即为1,自车停车即为-1
  257. flag_start_list = df_ego['flag_start'].values
  258. flag_stop_list = df_ego['flag_stop'].values
  259. time_list = df_ego['simTime'].values
  260. time_start_list = []
  261. time_stop_list = []
  262. for i, flag in enumerate(flag_start_list):
  263. if flag:
  264. t1 = time_list[i]
  265. if flag == -1:
  266. t2 = time_list[i]
  267. time_start_list.append(t2 - t1) # t2-t1即为自车起步响应时间
  268. for i, flag in enumerate(flag_stop_list):
  269. if flag:
  270. t1 = time_list[i]
  271. if flag == -1:
  272. t2 = time_list[i]
  273. time_stop_list.append(t2 - t1) # t2-t1即为自车停车响应时间
  274. time_start_list = [i for i in time_start_list if i != 0]
  275. self.result['followResponseTime'] = max(time_start_list) if time_start_list else 0
  276. self.follow_stop_time_start_list = time_start_list
  277. # self.result['跟车停止响应最长时间'] = max(time_stop_list)
  278. def dist(self, x1, y1, x2, y2):
  279. dis = math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
  280. return dis
  281. def _line_dist_merge(self):
  282. # line_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
  283. # line_dist_group = line_dist_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
  284. # line_dist_df['line_dist'] = line_dist_group
  285. # line_dist_df = line_dist_df[['simFrame', 'line_dist']]
  286. # line_dist_df.columns = ['simFrame', 'line_dist']
  287. # line_dist_df = line_dist_df.drop_duplicates()
  288. line_dist_df = self.df_roadmark[(self.df_roadmark['id'] == 0) | (self.df_roadmark['id'] == 2)].copy()
  289. # df = line_dist_df[line_dist_df['id'] == 2][['simTime', 'simFrame']].copy()
  290. # df['line_dist'] = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
  291. if line_dist_df.empty:
  292. self.ego_df['line_dist'] = pd.Series(dtype=float)
  293. else:
  294. df = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).reset_index()
  295. # df = df.rename(columns={'lateralDist': 'line_dist'})
  296. df.columns = ["simFrame", "line_dist"]
  297. self.ego_df = pd.merge(self.ego_df, df, on='simFrame', how='left')
  298. def _lane_transverse_distance(self, df):
  299. """
  300. 0x7:Reserved
  301. 0x6: Reserved
  302. 0x5: Reserved
  303. 0x4: Error
  304. 0x3: Active
  305. 0x2: Standby
  306. 0x1: Passive
  307. 0x0: Off
  308. """
  309. # 车道内偏移行驶时,与近侧车道线的横向距离
  310. # data_group = lane_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
  311. # data_group = self.df_roadmark.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
  312. # df = data_group.to_frame(name='line_dist')
  313. # df['simFrame'] = data_group.index.tolist()
  314. # df = self.df_roadmark[self.df_roadmark['id'] == 0][['simTime', 'simFrame']].copy()
  315. # df['line_dist'] = self.df_roadmark.groupby('simFrame').apply(
  316. # lambda t: abs(t['lateralDist']).min()).values.tolist()
  317. # self.ego_df = pd.merge(self.ego_df, df, on=['simTime', 'simFrame'], how='left')
  318. # self._line_dist_merge()
  319. # # df = self.df[self.df['LKA_status'] == 3]
  320. # df = self.ego_df[self.ego_df['LKA_status'] == "Active"]
  321. self.line_dist = df['line_dist'].to_list()
  322. self.line_dist = [x for x in self.line_dist if not np.isnan(x)]
  323. self.result['laneDistance'] = min(self.line_dist) if self.line_dist else self.optimal_dict['laneDistance']
  324. def _lateral_dist_of_center_line(self, df_laneoffset):
  325. # df_laneoffset = self.df[(self.df['LKA_status'] == "Active") & (self.df['playerId'] == 1)]
  326. self.center_dist_with_nan = df_laneoffset['laneOffset'].to_list()
  327. self.center_time_list = df_laneoffset['simTime'].to_list()
  328. self.center_frame_list = df_laneoffset['simFrame'].to_list()
  329. # self.center_dist_df = self.df_roadpos[self.df_roadpos["playerId"] == 1]
  330. # self.center_dist = self.center_dist_df["laneOffset"]
  331. # self.center_time_list = self.center_dist_df['simTime'].to_list()
  332. # self.center_frame_list = self.center_dist_df['simFrame'].to_list()
  333. self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)]
  334. if not self.center_dist:
  335. self.result['centerDistanceExpectation'] = 0
  336. self.result['centerDistanceStandardDeviation'] = 0
  337. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  338. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  339. self.result['centerDistanceMax'] = 0
  340. self.result['centerDistanceMin'] = 0
  341. self.result['centerDistanceFrequency'] = 0
  342. self.result['centerDistanceRange'] = 0
  343. else:
  344. center_dist = [abs(x) for x in self.center_dist]
  345. # 车道中心线横向距离分布期望与标准差
  346. E_lane_center_dist = np.mean(center_dist)
  347. D_lane_center_dist = np.std(center_dist)
  348. # 车道中心线横向距离分布极值
  349. center_dist = np.array(center_dist)
  350. extrmax_index = sg.argrelmax(center_dist)[0]
  351. extrmin_index = sg.argrelmin(center_dist)[0]
  352. extreme_max_value = center_dist[extrmax_index]
  353. extreme_min_value = center_dist[extrmin_index]
  354. # 横向相对位置震荡频率
  355. # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
  356. length1 = len(extrmax_index) # 极大值列表长度
  357. length2 = len(extrmin_index) # 极小值列表长度
  358. # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
  359. FREQUENCY = 100
  360. time_diff = 1.0 / FREQUENCY
  361. period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
  362. period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
  363. try:
  364. period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
  365. except ZeroDivisionError:
  366. period = 0
  367. frequency = 1 / period if period else 0
  368. # length = min(len(extreme_max_value), len(extreme_min_value))
  369. # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
  370. # frequency = 1 / period
  371. # 横向相对位置震荡极差
  372. length = min(len(extreme_max_value), len(extreme_min_value))
  373. extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
  374. self.result['centerDistanceExpectation'] = E_lane_center_dist
  375. self.result['centerDistanceStandardDeviation'] = D_lane_center_dist
  376. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  377. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  378. self.result['centerDistanceMax'] = center_dist.max()
  379. self.result['centerDistanceMin'] = center_dist.min()
  380. self.result['centerDistanceFrequency'] = abs(frequency)
  381. self.result['centerDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
  382. def _continous_judge(self, frame_list):
  383. CONTINUOUS_FRAME_PERIOD = 3
  384. if not frame_list:
  385. return 0
  386. cnt = 1
  387. for i in range(1, len(frame_list)):
  388. if frame_list[i] - frame_list[i - 1] <= CONTINUOUS_FRAME_PERIOD:
  389. continue
  390. cnt += 1
  391. return cnt
  392. def _lane_departure_warning(self, ldw_df):
  393. """
  394. 工作车速范围:55/60-200/250 km/h
  395. 最小可用车道宽度:2.6m
  396. 误报警:距离安全,但是预警
  397. 漏报警:距离危险,但是没报警
  398. 灵敏度差:线外40cm
  399. 灵敏度零级:压线
  400. 灵敏度一级:线内20cm(20%)
  401. 灵敏度二级:线内40cm(40%)
  402. 灵敏度三级:线内60cm(60%)
  403. 偏离车速1.0m/s:线内60cm报警
  404. 偏离车速0.5m/s:线内40cm报警
  405. """
  406. # 自车, 车道线宽度>2.6m,转向灯没开
  407. # ego_df = self.df[self.df['playerId'] == 1].copy()
  408. # ego_df['line_dist'] = self.line_dist
  409. # lane_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
  410. # lane_dist_group = lane_dist_df.groupby("simFrame").apply(
  411. # lambda t: abs(t['lateralDist']).min()).values.tolist()
  412. # lane_dist_df['line_dist'] = lane_dist_group
  413. # lane_dist_df = lane_dist_df[['simFrame', 'line_dist']]
  414. # lane_dist_df.columns = ['simFrame', 'line_dist']
  415. # # lane_dist_df = lane_dist_df[['simTime', 'simFrame', 'line_dist']]
  416. # # lane_dist_df.columns = ['simTime', 'simFrame', 'line_dist']
  417. # lane_dist_df = lane_dist_df.drop_duplicates()
  418. # ldw_status_df = self.df[self.df['playerId'] == 1].copy()[['simTime', 'simFrame', 'LDW_status']].drop_duplicates()
  419. # ldw_status_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy().drop_duplicates()
  420. # ldw_df = pd.merge_asof(ldw_status_df, lane_dist_df, on='simFrame', direction='nearest')
  421. # ldw_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
  422. # calculate max DLC
  423. warning_dist_max = ldw_df[ldw_df['LDW_status'] == "Active"]['line_dist'].max()
  424. # warning_dist_max = ldw_df[ldw_df['LDW_status'] == 3]['line_dist'].max()
  425. self.result['预警时距离车道线最大距离'] = warning_dist_max
  426. # count miss warning
  427. miss_warning_df = ldw_df[(ldw_df['line_dist'] <= 0.4) & (ldw_df['LDW_status'] != "Active")]
  428. miss_warning_frame_list = miss_warning_df['simFrame'].values.tolist()
  429. miss_warning_count = self._continous_judge(miss_warning_frame_list)
  430. self.result['车道偏离漏预警次数'] = miss_warning_count
  431. # count false warning
  432. false_warning_df = ldw_df[(ldw_df['line_dist'] >= 0.4) & (ldw_df['LDW_status'] == "Active")]
  433. false_warning_frame_list = false_warning_df['simFrame'].values.tolist()
  434. false_warning_count = self._continous_judge(false_warning_frame_list)
  435. self.result['车道偏离误预警次数'] = false_warning_count
  436. def _autonomous_emergency_brake(self):
  437. """
  438. 刹得准、刹得稳
  439. 碰撞预警 + 紧急制动
  440. <=4s 预警>1s,制动<3s
  441. 触发目标类型
  442. 目标类型准确度
  443. 目标为车辆的速度降:
  444. 弱势目标的速度降:
  445. 触发时TTC<4s
  446. 一级制动减速度:4m/s^2
  447. 二级制动减速度:10m/s^2
  448. 刹停行驶距离:10-20m
  449. 刹停时前车距离:0.8-1.5m
  450. 误触发次数:频率 n/10万km
  451. 漏触发:
  452. 能绕开就没必要刹停:开始刹车时距离远近
  453. Returns:
  454. """
  455. # ego_df = self.df[self.df['playerId'] == 1]
  456. df = self.ego_df[self.ego_df['Aeb_status'] == 'Active']
  457. # df = self.ego_df[self.ego_df['Aeb_status'] == 0]
  458. # calculate brakeDecelerate
  459. df['lon_accel'] = df['accelX'] * np.cos(df['posH']) + df['accelY'] * np.sin(df['posH'])
  460. decelerate_max = df['lon_accel'].min()
  461. # calculate brake stop travelDist
  462. df_dist = df[df['lon_accel'] < 0]
  463. frame_list = df_dist['simFrame'].to_list()
  464. travelDist_list = df_dist['travelDist'].to_list()
  465. travelDist_group = []
  466. start = 0
  467. for i in range(1, len(frame_list)):
  468. if frame_list[i] - frame_list[i - 1] > 5:
  469. end = i - 1
  470. # travelDist_group.append(travelDist_list[start:end])
  471. travelDist_group.append(travelDist_list[end] - travelDist_list[start])
  472. start = i
  473. end = -1
  474. travelDist_group.append(travelDist_list[end] - travelDist_list[start])
  475. brake_distance_max = max(travelDist_group) if travelDist_group else np.inf
  476. self.result['brakeDecelerate'] = abs(round(decelerate_max, 4))
  477. self.result['brakeDistance'] = round(brake_distance_max, 4)
  478. def _integrated_cruise_assist(self, df):
  479. df_LCC = df[df['ICA_status'].isin(['LLC_follow_vehicle', 'LLC_follow_line'])].copy()
  480. self.LCC_line_dist = df_LCC['line_dist'].to_list()
  481. self.result['LCClaneDistance'] = min(self.LCC_line_dist) if self.LCC_line_dist else self.optimal_dict[
  482. 'LCClaneDistance']
  483. self.LCC_center_dist = df_LCC['laneOffset'].to_list()
  484. self.LCC_center_time_list = df_LCC['simTime'].to_list()
  485. self.LCC_center_frame_list = df_LCC['simFrame'].to_list()
  486. if not self.LCC_center_dist:
  487. self.result['LCCcenterDistanceExpectation'] = 0
  488. self.result['LCCcenterDistanceStandardDeviation'] = 0
  489. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  490. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  491. self.result['LCCcenterDistanceMax'] = 0
  492. self.result['LCCcenterDistanceMin'] = 0
  493. self.result['LCCcenterDistanceFrequency'] = 0
  494. self.result['LCCcenterDistanceRange'] = 0
  495. else:
  496. center_dist = [abs(x) for x in self.LCC_center_dist]
  497. # 车道中心线横向距离分布期望与标准差
  498. E_lane_center_dist = np.mean(center_dist)
  499. D_lane_center_dist = np.std(center_dist)
  500. # 车道中心线横向距离分布极值
  501. center_dist = np.array(center_dist)
  502. extrmax_index = sg.argrelmax(center_dist)[0]
  503. extrmin_index = sg.argrelmin(center_dist)[0]
  504. extreme_max_value = center_dist[extrmax_index]
  505. extreme_min_value = center_dist[extrmin_index]
  506. # 横向相对位置震荡频率
  507. # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
  508. length1 = len(extrmax_index) # 极大值列表长度
  509. length2 = len(extrmin_index) # 极小值列表长度
  510. # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
  511. FREQUENCY = 100
  512. time_diff = 1.0 / FREQUENCY
  513. period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
  514. period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
  515. try:
  516. period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
  517. except ZeroDivisionError:
  518. period = 0
  519. frequency = 1 / period if period else 0
  520. # length = min(len(extreme_max_value), len(extreme_min_value))
  521. # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
  522. # frequency = 1 / period
  523. # 横向相对位置震荡极差
  524. length = min(len(extreme_max_value), len(extreme_min_value))
  525. extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
  526. self.result['LCCcenterDistanceExpectation'] = E_lane_center_dist
  527. self.result['LCCcenterDistanceStandardDeviation'] = D_lane_center_dist
  528. # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
  529. # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
  530. self.result['LCCcenterDistanceMax'] = center_dist.max()
  531. self.result['LCCcenterDistanceMin'] = center_dist.min()
  532. self.result['LCCcenterDistanceFrequency'] = abs(frequency)
  533. self.result['LCCcenterDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
  534. df_LC = df[df['ICA_status'] == 'Only_Longitudinal_Control'].copy()
  535. v_max = df_LC['v'].max()
  536. v_min = df_LC['v'].min()
  537. self.result['OLC_v_deviation'] = v_max - v_min
  538. def _function_statistic(self):
  539. """
  540. """
  541. ACC_optimal_list = []
  542. if "function_ACC" in self.type_list:
  543. for function_type in self.type_list:
  544. if "ICA" in function_type:
  545. continue
  546. if len(self.obj_id_list) > 1:
  547. # 给定的双层列表
  548. follow_time_ranges = self.status_trigger_dict['ACC']['ACC_active_time']
  549. # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
  550. # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
  551. filtered_df = pd.DataFrame(columns=self.df.columns)
  552. for start, end in follow_time_ranges:
  553. mask = (self.df['simTime'] >= start) & (self.df['simTime'] <= end)
  554. if filtered_df is None:
  555. filtered_df = self.df[mask]
  556. else:
  557. filtered_df = pd.concat([filtered_df, self.df[mask]])
  558. # 上述方式可能引入重复行,使用drop_duplicates去重
  559. df_follow = filtered_df.drop_duplicates()
  560. df_stop_and_go = df_follow.copy()
  561. # 重置索引(如果需要)
  562. # filtered_df.reset_index(drop=True, inplace=True)
  563. # df_follow111 = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
  564. self._following(df_follow)
  565. # df_stop_and_go = self.df[
  566. # self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
  567. self._stop_and_go(df_stop_and_go)
  568. if df_follow.empty and df_stop_and_go.empty:
  569. self.flag_type_dict['function_ACC'] = False
  570. else:
  571. self.flag_type_dict['function_ACC'] = True
  572. else:
  573. ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
  574. if "functionLKA" in self.type_list:
  575. self._line_dist_merge()
  576. # df_line_dist = self.df[self.df['LKA_status'] == 3]
  577. # 给定的双层列表
  578. follow_time_ranges = self.status_trigger_dict['LKA']['LKA_active_time']
  579. # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
  580. # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
  581. filtered_df = pd.DataFrame(columns=self.ego_df.columns)
  582. for start, end in follow_time_ranges:
  583. mask = (self.ego_df['simTime'] >= start) & (self.ego_df['simTime'] <= end)
  584. if filtered_df is None:
  585. filtered_df = self.ego_df[mask]
  586. else:
  587. filtered_df = pd.concat([filtered_df, self.ego_df[mask]])
  588. # 上述方式可能引入重复行,使用drop_duplicates去重
  589. df_lka = filtered_df.drop_duplicates()
  590. # df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"]
  591. if df_lka.empty:
  592. self.flag_type_dict['functionLKA'] = False
  593. else:
  594. self.flag_type_dict['functionLKA'] = True
  595. self._lane_transverse_distance(df_lka)
  596. self._lateral_dist_of_center_line(df_lka)
  597. # if "functionLDW" in self.type_list:
  598. # df_ldw = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
  599. # self._lane_departure_warning(df_ldw)
  600. # if "functionAEB" in self.type_list:
  601. # self._autonomous_emergency_brake()
  602. # if self.df['ICA_status'].nunique() != 1:
  603. # self.ICA_FLAG = True
  604. # # if "functionICA" in self.type_list:
  605. # # self._integrated_cruise_assist(self.ego_df)
  606. # # ACC
  607. # if len(self.obj_id_list) > 1:
  608. # df_follow = self.df[self.df['ACC_status'] == "Shut_off"].copy() # 数字3,对应Active
  609. # self._following(df_follow)
  610. #
  611. # df_stop_and_go = self.df[
  612. # self.df['ACC_status'].isin(["Shut_off", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
  613. # self._stop_and_go(df_stop_and_go)
  614. #
  615. # else:
  616. # ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
  617. #
  618. # # LKA
  619. # self._line_dist_merge()
  620. # # df_line_dist = self.df[self.df['LKA_status'] == 3]
  621. # df_lka = self.ego_df[self.ego_df['ICA_status'] == "LLC_Follow_Line"]
  622. # self._lane_transverse_distance(df_lka)
  623. # self._lateral_dist_of_center_line(df_lka)
  624. arr_func = [value for key, value in self.result.items() if key in self.metric_list]
  625. # arr_func = list(self.result.values())
  626. if "function_ACC" in self.type_list and len(self.obj_id_list) == 1:
  627. arr_func = ACC_optimal_list + arr_func
  628. return arr_func
  629. def custom_metric_param_parser(self, param_list):
  630. """
  631. param_dict = {
  632. "paramA" [
  633. {
  634. "kind": "-1",
  635. "optimal": "1",
  636. "multiple": ["0.5","5"],
  637. "spare1": null,
  638. "spare2": null
  639. }
  640. ]
  641. }
  642. """
  643. kind_list = []
  644. optimal_list = []
  645. multiple_list = []
  646. spare_list = []
  647. # spare1_list = []
  648. # spare2_list = []
  649. for i in range(len(param_list)):
  650. kind_list.append(int(param_list[i]['kind']))
  651. optimal_list.append(float(param_list[i]['optimal']))
  652. multiple_list.append([float(x) for x in param_list[i]['multiple']])
  653. spare_list.append([item["param"] for item in param_list[i]["spare"]])
  654. # spare1_list.append(param_list[i]['spare1'])
  655. # spare2_list.append(param_list[i]['spare2'])
  656. result = {
  657. "kind": kind_list,
  658. "optimal": optimal_list,
  659. "multiple": multiple_list,
  660. "spare": spare_list,
  661. # "spare1": spare1_list,
  662. # "spare2": spare2_list
  663. }
  664. return result
  665. def custom_metric_score(self, metric, value, param_list):
  666. """
  667. """
  668. param = self.custom_metric_param_parser(param_list)
  669. self.custom_param_dict[metric] = param
  670. score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
  671. score_sub = score_model.cal_score()
  672. score = sum(score_sub) / len(score_sub)
  673. return score
  674. def func_score(self):
  675. score_metric_dict = {}
  676. score_type_dict = {}
  677. arr_func = self._function_statistic()
  678. print("\n[功能性表现及得分情况]", arr_func)
  679. print("功能性各指标值:", [round(num, 2) for num in arr_func])
  680. if arr_func:
  681. arr_func = np.array([arr_func])
  682. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_func)
  683. score_sub = score_model.cal_score()
  684. score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))
  685. score_metric = [round(num, 2) for num in score_sub]
  686. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  687. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  688. flag_custom_type_dict = {}
  689. for metric in self.custom_metric_list:
  690. value = self.custom_data[metric]['value']
  691. param_list = self.customMetricParam[metric]
  692. score = self.custom_metric_score(metric, value, param_list)
  693. score_metric_dict[metric] = round(score, 2)
  694. if 'statusFlag' in self.custom_data[metric].keys():
  695. custom_type, flag_custom_type = next(iter(self.custom_data[metric]['statusFlag'].items()))
  696. if (custom_type in flag_custom_type_dict) and (flag_custom_type == True):
  697. flag_custom_type_dict[custom_type] = True
  698. else:
  699. flag_custom_type_dict[custom_type] = flag_custom_type
  700. #
  701. #
  702. # update metric_list and metric_dict
  703. #
  704. #
  705. flag_custom_type_dict1 = {k: v for k, v in flag_custom_type_dict.items() if v == True}
  706. flag_builtin_type_dict2 = {k: v for k, v in self.flag_type_dict.items() if v == True}
  707. flag_builtin_type_dict2.update(flag_custom_type_dict1)
  708. self.type_list = list(flag_builtin_type_dict2.keys())
  709. print("self.type_list is", self.type_list)
  710. self.metric_list = []
  711. print("self.metric_dict is", self.metric_dict)
  712. for type in self.type_list:
  713. self.metric_list.extend(self.metric_dict[type])
  714. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  715. score_metric = list(score_metric_dict.values())
  716. # no score in list
  717. if not score_metric:
  718. return 0, {}, {}
  719. if self.weight_custom: # 自定义权重
  720. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  721. self.weight_dict}
  722. for type in self.type_list:
  723. type_score = sum(
  724. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  725. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  726. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  727. score_type_dict}
  728. score_function = sum(score_type_with_weight_dict.values())
  729. else: # 客观赋权
  730. self.weight_list = cal_weight_from_80(score_metric)
  731. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  732. score_function = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  733. for type in self.type_list:
  734. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  735. for key, value in self.weight_dict.items():
  736. if key in self.metric_dict[type]:
  737. # self.weight_dict[key] = round(value / type_weight, 4)
  738. self.weight_dict[key] = value / type_weight
  739. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  740. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  741. type_priority_list = [value for key, value in self.priority_dict.items() if
  742. key in self.metric_dict[type]]
  743. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  744. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  745. for key in self.weight_dict:
  746. self.weight_dict[key] = round(self.weight_dict[key], 4)
  747. score_type = list(score_type_dict.values())
  748. self.weight_type_list = cal_weight_from_80(score_type)
  749. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  750. score_function = round(score_function, 2)
  751. print(f"功能性得分为:{score_function:.2f}分。")
  752. print(f"功能性各类型得分为:{score_type_dict}分。")
  753. print(f"功能性各指标得分为:{score_metric_dict}。")
  754. return score_function, score_type_dict, score_metric_dict
  755. def continuous_group(self, df):
  756. time_list = df['simTime'].values.tolist()
  757. frame_list = df['simFrame'].values.tolist()
  758. group_time = []
  759. group_frame = []
  760. sub_group_time = []
  761. sub_group_frame = []
  762. for i in range(len(frame_list)):
  763. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  764. sub_group_time.append(time_list[i])
  765. sub_group_frame.append(frame_list[i])
  766. else:
  767. group_time.append(sub_group_time)
  768. group_frame.append(sub_group_frame)
  769. sub_group_time = [time_list[i]]
  770. sub_group_frame = [frame_list[i]]
  771. group_time.append(sub_group_time)
  772. group_frame.append(sub_group_frame)
  773. group_time = [g for g in group_time if len(g) >= 2]
  774. group_frame = [g for g in group_frame if len(g) >= 2]
  775. # 输出图表值
  776. time = [[g[0], g[-1]] for g in group_time]
  777. frame = [[g[0], g[-1]] for g in group_frame]
  778. unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  779. unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  780. unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
  781. return unfunc_df
  782. def unfunctional_follow_v_deviation_df_statistic(self):
  783. unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
  784. 'v_deviation': self.v_deviation_list})
  785. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  786. v_df = unfunc_df[unfunc_df['v_deviation'] > self.optimal_dict['followSpeedDeviation']]
  787. v_df = v_df[['simTime', 'simFrame', 'v_deviation']]
  788. v_follow_df = self.continuous_group(v_df)
  789. # v_follow_df['type'] = 'follow'
  790. # v_follow_df['type'] = 'ACC'
  791. v_follow_df['type'] = f"{self.type_name_dict['function_ACC']}"
  792. self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, v_follow_df], ignore_index=True)
  793. def unfunctional_follow_dist_deviation_df_statistic(self):
  794. unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
  795. 'dist_deviation': self.dist_deviation_list})
  796. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  797. dist_df = unfunc_df[unfunc_df['dist_deviation'] > self.optimal_dict['followDistanceDeviation']] # 阈值由1改为了3
  798. dist_df = dist_df[['simTime', 'simFrame', 'dist_deviation']]
  799. dist_follow_df = self.continuous_group(dist_df)
  800. # v_follow_df['type'] = 'follow'
  801. # dist_follow_df['type'] = 'ACC'
  802. dist_follow_df['type'] = f"{self.type_name_dict['function_ACC']}"
  803. self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, dist_follow_df], ignore_index=True)
  804. def unfunctional_lane_df_statistic(self):
  805. unfunc_df = pd.DataFrame(
  806. {'simTime': self.center_time_list, 'simFrame': self.center_frame_list,
  807. 'center_dist': self.center_dist_with_nan})
  808. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  809. unfunc_df = unfunc_df.dropna(subset=['center_dist'])
  810. lane_df = unfunc_df[abs(unfunc_df['center_dist']) > self.optimal_dict['centerDistanceMax']]
  811. lane_df = lane_df[['simTime', 'simFrame', 'center_dist']]
  812. dist_lane_df = self.continuous_group(lane_df)
  813. # dist_lane_df['type'] = 'lane'
  814. # dist_lane_df['type'] = 'LKA'
  815. dist_lane_df['type'] = f"{self.type_name_dict['functionLKA']}"
  816. self.unfunc_lane_df = pd.concat([self.unfunc_lane_df, dist_lane_df], ignore_index=True)
  817. # def zip_time_pairs(self, zip_list, upper_limit=9999):
  818. # zip_time_pairs = zip(self.time_list, zip_list)
  819. # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
  820. # return zip_vs_time
  821. def zip_time_pairs(self, zip_list):
  822. zip_time_pairs = zip(self.time_list, zip_list)
  823. zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
  824. return zip_vs_time
  825. def _get_weight_distribution(self, dimension):
  826. # get weight distribution
  827. weight_distribution = {}
  828. weight_distribution["name"] = self.config.dimension_name[dimension]
  829. for type in self.type_list:
  830. type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
  831. self.weight_dict.items() if
  832. key in self.metric_dict[type]}
  833. weight_distribution_type = {
  834. "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
  835. "indexes": type_weight_indexes_dict
  836. }
  837. weight_distribution[type] = weight_distribution_type
  838. return weight_distribution
  839. def report_statistic(self):
  840. # report_dict = {
  841. # "name": "功能性",
  842. # "weight": f"{self.weight * 100:.2f}%",
  843. # "weightDistribution": weight_distribution,
  844. # "score": score_function,
  845. # "level": grade_function,
  846. # 'score_type': score_type,
  847. # 'score_metric': score_metric,
  848. # 'followStopCount': self.follow_stop_count,
  849. # "description1": func_description1,
  850. # "description2": func_description2,
  851. #
  852. # "functionACC": acc_dict,
  853. # "functionLKA": lka_dict,
  854. #
  855. # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
  856. # "accData": [lat_acc_vs_time, lon_acc_vs_time],
  857. #
  858. # }
  859. brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
  860. throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
  861. steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
  862. # common parameter calculate
  863. brake_vs_time = self.zip_time_pairs(brakePedal_list)
  864. throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
  865. steering_vs_time = self.zip_time_pairs(steeringWheel_list)
  866. report_dict = {
  867. "name": self.config.dimension_name["function"],
  868. "weight": f"{self.weight * 100:.2f}%",
  869. 'followStopCount': self.follow_stop_count,
  870. }
  871. # upper_limit = 40
  872. times_upper = 2
  873. # len_time = len(self.time_list)
  874. duration = self.time_list[-1]
  875. # score_function, score_type, score_metric = self.func_score()
  876. score_function, score_type_dict, score_metric_dict = self.func_score()
  877. # no metric
  878. if not score_metric_dict:
  879. return {}
  880. # get weight distribution
  881. report_dict["weightDistribution"] = self._get_weight_distribution("function")
  882. score_function = int(score_function) if int(score_function) == score_function else round(
  883. score_function, 2)
  884. # score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()]
  885. # score_metric = [int(n) if int(n) == n else n for key, n in score_metric_dict.items()]
  886. grade_function = score_grade(score_function)
  887. report_dict["score"] = score_function
  888. report_dict["level"] = grade_function
  889. # report_dict["score_type"] = score_type
  890. # report_dict["score_metric"] = score_metric
  891. # speed data
  892. ego_speed_list = self.ego_df['v'].values.tolist()
  893. ego_speed_vs_time = self.zip_time_pairs(ego_speed_list)
  894. obj_speed_vs_time = []
  895. rel_speed_vs_time = []
  896. # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  897. # accData
  898. lat_acc_list = self.ego_df['lat_acc'].values.tolist()
  899. lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
  900. lon_acc_list = self.ego_df['lon_acc'].values.tolist()
  901. lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
  902. # acc data
  903. # lat_acc_list = self.ego_df['lat_acc'].values.tolist()
  904. # lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
  905. # lon_acc_list = self.ego_df['lon_acc'].values.tolist()
  906. # lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
  907. # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time]
  908. # statistic type report infos
  909. unfunc_metric_list = []
  910. func_over_optimal = []
  911. # function description
  912. func_type_list = []
  913. unfunc_type_list = []
  914. type_details_dict = {}
  915. for type in self.type_list:
  916. if type == "function_ACC":
  917. builtin_graph_dict = {}
  918. custom_graph_dict = {}
  919. if len(self.obj_id_list) == 1:
  920. follow_description1 = "无目标车数据可计算;"
  921. follow_description2 = ""
  922. follow_description3 = "无目标车数据可计算;"
  923. follow_description4 = "无目标车数据可计算;"
  924. acc_dict_indexes = {}
  925. for metric in self.metric_dict[type]:
  926. acc_dict_indexes[metric] = {
  927. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  928. "name": f"{self.name_dict[metric]}",
  929. "score": 80,
  930. "avg": "-",
  931. "max": "-",
  932. "min": "-",
  933. }
  934. if self.kind_dict[metric] == -1:
  935. acc_dict_indexes[metric]["range"] = f"[0, {self.optimal_dict[metric]}]"
  936. elif self.kind_dict[metric] == 1:
  937. acc_dict_indexes[metric]["range"] = f"[{self.optimal_dict[metric]}, inf)"
  938. elif self.kind_dict[metric] == 0:
  939. acc_dict_indexes[metric][
  940. "range"] = f"[{self.optimal_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal_dict[metric] * self.multiple_dict[metric][1]}]"
  941. acc_dict = {
  942. "name": f"{self.type_name_dict[type]}",
  943. "score": 80,
  944. "level": "良好",
  945. "description1": follow_description1,
  946. "description2": follow_description2,
  947. "description3": follow_description3,
  948. "description4": follow_description4,
  949. "noObjectCar": True,
  950. "indexes": acc_dict_indexes,
  951. "builtin": {},
  952. "custom": {}
  953. }
  954. # follow cruise description
  955. func_follow_metric_list = []
  956. unfunc_follow_metric_list = []
  957. str_follow_over_optimal = ''
  958. distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
  959. else:
  960. acc_dict = {
  961. "name": f"{self.type_name_dict[type]}",
  962. "noObjectCar": False,
  963. }
  964. # follow dict
  965. score_follow = score_type_dict[type]
  966. grade_follow = score_grade(score_follow)
  967. acc_dict["score"] = score_follow
  968. acc_dict["level"] = grade_follow
  969. unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
  970. # follow cruise description
  971. func_follow_metric_list = []
  972. unfunc_follow_metric_list = []
  973. for metric in self.metric_dict[type]:
  974. unfunc_follow_metric_list.append(metric) if score_metric_dict[
  975. metric] < 80 else func_follow_metric_list.append(
  976. metric)
  977. str_follow_over_optimal = ''
  978. if not unfunc_follow_metric_list:
  979. str_func_follow_metric = string_concatenate(func_follow_metric_list)
  980. follow_description1 = f"{str_func_follow_metric}指标均表现良好"
  981. else:
  982. for metric in unfunc_follow_metric_list:
  983. if metric in self.bulitin_metric_list:
  984. value = self.result[metric]
  985. if self.kind_dict[metric] == -1:
  986. metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
  987. metric]) * 100
  988. elif self.kind_dict[metric] == 1:
  989. metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
  990. metric]) * 100
  991. elif self.kind_dict[metric] == 0:
  992. metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
  993. metric]) * 100
  994. else:
  995. value = self.custom_data[metric]["value"][0]
  996. if self.custom_param_dict[metric]['kind'][0] == -1:
  997. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  998. self.custom_param_dict[metric]['optimal'][0]) * 100
  999. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1000. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  1001. self.custom_param_dict[metric]['optimal'][0]) * 100
  1002. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1003. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  1004. self.custom_param_dict[metric]['optimal'][0]) * 100
  1005. # str_follow_over_optimal += f"{metric}为{value:.2f},超过合理范围{metric_over_optimal:.2f}%;"
  1006. # str_follow_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  1007. str_follow_over_optimal += f"{metric}为{round(value, 2)}{self.unit_dict[metric]},超过合理范围{round(metric_over_optimal, 2)}%;"
  1008. str_follow_over_optimal = str_follow_over_optimal[:-1] if str_follow_over_optimal else ""
  1009. str_func_follow_metric = string_concatenate(func_follow_metric_list)
  1010. str_unfunc_follow_metric = string_concatenate(unfunc_follow_metric_list)
  1011. if not func_follow_metric_list:
  1012. follow_description1 = f"{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
  1013. else:
  1014. follow_description1 = f"{str_func_follow_metric}指标表现良好,{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
  1015. # for follow_description2
  1016. follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
  1017. follow_description2 = f"自车在行驶时有{round(follow_duration, 2)}s处于跟车状态,"
  1018. follow_description3 = ""
  1019. if "followResponseTime" in self.metric_list:
  1020. # for follow_description3
  1021. cnt3_1 = len(self.follow_stop_time_start_list)
  1022. tmp3_list = [x for x in self.follow_stop_time_start_list if
  1023. x > self.optimal_dict['followResponseTime']]
  1024. cnt3_2 = len(tmp3_list)
  1025. percent3 = 0 if cnt3_1 == 0 else round(cnt3_2 / cnt3_1 * 100, 2)
  1026. follow_description3 = f"起停跟车响应时间有{cnt3_2}次超出合理范围,占比为{percent3}%;"
  1027. follow_description4 = ""
  1028. if "followDistanceDeviation" in self.metric_list:
  1029. # for follow_description4
  1030. cnt4_1 = len(self.stop_distance_list)
  1031. tmp4_list = [x for x in self.stop_distance_list if
  1032. x < self.optimal_dict['followDistanceDeviation']]
  1033. cnt4_2 = len(tmp4_list)
  1034. percent4 = 0 if cnt4_1 == 0 else round(cnt4_2 / cnt4_1 * 100, 2)
  1035. follow_description4 = f"在本次测试中,跟车状态下目标车共发生了{cnt4_1}次停车,有{cnt4_2}次超出合理范围,占比为{percent4}%;"
  1036. # distance_list = obj_df['dist'].values.tolist()
  1037. # distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
  1038. #
  1039. # acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
  1040. # acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
  1041. # acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
  1042. # acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
  1043. # common parameter calculate
  1044. obj_df = self.df[self.df['playerId'] == 2]
  1045. obj_speed_list = obj_df['v'].values.tolist()
  1046. obj_speed_vs_time = self.zip_time_pairs(obj_speed_list)
  1047. rel_speed_vs_time = self.zip_time_pairs(self.v_relative_list_full_time)
  1048. distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
  1049. # acc_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  1050. # acc_dict["disData"] = distance_vs_time
  1051. unfunc_follow_df = self.unfunc_follow_df.copy()
  1052. unfunc_follow_df['type'] = "origin"
  1053. unfunc_follow_slices = unfunc_follow_df.to_dict('records')
  1054. # acc_dict["speMarkLine"] = unfunc_follow_slices
  1055. # acc_dict["disMarkLine"] = unfunc_follow_slices
  1056. distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
  1057. # function ACC data
  1058. acc_dict_indexes = {}
  1059. for metric in self.metric_dict[type]:
  1060. if metric == "followSpeedDeviation":
  1061. self.unfunctional_follow_v_deviation_df_statistic()
  1062. unfunc_v_df = self.unfunc_follow_df.copy()
  1063. unfunc_v_df.loc[unfunc_v_df['type'] != 'ACC', 'type'] = "origin"
  1064. # unfunc_v_df.loc[unfunc_v_df['type'] == 'time', 'type'] = "time"
  1065. unfunc_v_slices = unfunc_v_df.to_dict('records')
  1066. df1 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
  1067. df1['v_time'] = df1['end_time'] - df1['start_time']
  1068. devi_v_time = df1['v_time'].sum()
  1069. follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
  1070. percent2_1 = devi_v_time / follow_duration * 100 if follow_duration else 0
  1071. if devi_v_time != 0:
  1072. follow_description2 += f"跟车状态下自车和目标车的速度差共有{round(devi_v_time, 2)}s超出合理范围,占比为{round(percent2_1, 2)}%;"
  1073. else:
  1074. follow_description2 += "跟车状态下自车和目标车的速度差均在合理范围内;"
  1075. v_deviation_list = [x for x in self.v_deviation_list if not np.isnan(x)]
  1076. acc_dict_indexes["followSpeedDeviation"] = {
  1077. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1078. "name": f"{self.name_dict[metric]}",
  1079. "score": score_metric_dict[
  1080. "followSpeedDeviation"],
  1081. "avg": round(np.mean(v_deviation_list), 2) if v_deviation_list else '-',
  1082. "max": round(max(v_deviation_list), 2) if v_deviation_list else '-',
  1083. "min": round(min(v_deviation_list), 2) if v_deviation_list else '-',
  1084. # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]"
  1085. "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]"
  1086. }
  1087. fsd_data = {
  1088. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1089. "name": f"{self.name_dict[metric]}",
  1090. "data": rel_speed_vs_time,
  1091. "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]",
  1092. # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]",
  1093. # "markLine": unfunc_v_slices,
  1094. # "markLine2": [-1 * self.optimal_dict['followSpeedDeviation'],
  1095. # self.optimal_dict['followSpeedDeviation']],
  1096. }
  1097. builtin_graph_dict["followSpeedDeviation"] = fsd_data
  1098. if metric == "followDistanceDeviation":
  1099. self.unfunctional_follow_dist_deviation_df_statistic()
  1100. unfunc_dist_df = self.unfunc_follow_df.copy()
  1101. unfunc_dist_df.loc[unfunc_dist_df['type'] != 'ACC', 'type'] = "origin"
  1102. # unfunc_dist_df.loc[unfunc_dist_df['type'] == 'distance', 'type'] = "distance"
  1103. unfunc_dist_slices = unfunc_dist_df.to_dict('records')
  1104. df2 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
  1105. df2['dist_time'] = df2['end_time'] - df2['start_time']
  1106. devi_dist_time = df2['dist_time'].sum()
  1107. follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
  1108. percent2_2 = devi_dist_time / follow_duration * 100 if follow_duration else 0
  1109. if devi_dist_time != 0:
  1110. follow_description2 += f"跟车状态下自车和目标车的距离差共有{round(devi_dist_time, 2)}s超出合理范围,占比为{round(percent2_2, 2)}%;"
  1111. else:
  1112. follow_description2 += "跟车状态下自车和目标车的距离差均在合理范围内;"
  1113. dist_deviation_list = [x for x in self.dist_deviation_list if not np.isnan(x)]
  1114. acc_dict_indexes["followDistanceDeviation"] = {
  1115. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1116. "name": f"{self.name_dict[metric]}",
  1117. "score": score_metric_dict["followDistanceDeviation"],
  1118. "avg": round(np.mean(dist_deviation_list), 2) if dist_deviation_list else '-',
  1119. "max": round(max(dist_deviation_list), 2) if dist_deviation_list else '-',
  1120. "min": round(min(dist_deviation_list), 2) if dist_deviation_list else '-',
  1121. "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]"
  1122. }
  1123. fdd_data = {
  1124. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1125. "name": f"{self.name_dict[metric]}",
  1126. "data": distance_deviation_vs_time,
  1127. "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]",
  1128. # "markLine": unfunc_dist_slices,
  1129. # "markLine2": [self.optimal_dict['followDistanceDeviation']],
  1130. }
  1131. builtin_graph_dict["followDistanceDeviation"] = fdd_data
  1132. if metric == "followStopDistance":
  1133. acc_dict_indexes["followStopDistance"] = {
  1134. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1135. "name": f"{self.name_dict[metric]}",
  1136. "score": score_metric_dict["followStopDistance"],
  1137. "avg": round(np.mean(self.stop_distance_list), 2) if self.stop_distance_list else '-',
  1138. "max": round(max(self.stop_distance_list), 2) if self.stop_distance_list else '-',
  1139. "min": round(min(self.stop_distance_list), 2) if self.stop_distance_list else '-',
  1140. "range": f"[{self.optimal_dict['followStopDistance']}, inf)"
  1141. }
  1142. # sdd_data = {
  1143. # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1144. # "name": f"{self.name_dict[metric]}",
  1145. # "data": self.stop_distance_list,
  1146. # "range": f"[{self.optimal_dict['followStopDistance']}, inf)",
  1147. # # "ref": [self.optimal_dict['followStopDistance'] - 1,
  1148. # # self.optimal_dict['followStopDistance']],
  1149. # }
  1150. # builtin_graph_dict["followStopDistance"] = sdd_data
  1151. if metric == "followResponseTime":
  1152. acc_dict_indexes["followResponseTime"] = {
  1153. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1154. "name": f"{self.name_dict[metric]}",
  1155. "score": score_metric_dict["followResponseTime"],
  1156. "avg": round(np.mean(self.follow_stop_time_start_list),
  1157. 2) if self.follow_stop_time_start_list else '-',
  1158. "max": round(max(self.follow_stop_time_start_list),
  1159. 2) if self.follow_stop_time_start_list else '-',
  1160. "min": round(min(self.follow_stop_time_start_list),
  1161. 2) if self.follow_stop_time_start_list else '-',
  1162. "range": f"[0, {self.optimal_dict['followResponseTime']}]"
  1163. }
  1164. # rt_data = {
  1165. # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1166. # "name": f"{self.name_dict[metric]}",
  1167. # "data": self.follow_stop_time_start_list,
  1168. # # "range": f"[0, {self.optimal_dict['followResponseTime']})"
  1169. # }
  1170. # builtin_graph_dict["followResponseTime"] = rt_data
  1171. if metric not in self.bulitin_metric_list:
  1172. acc_dict_indexes[metric] = {
  1173. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1174. "name": f"{self.name_dict[metric]}",
  1175. "score": score_metric_dict[metric],
  1176. "avg": self.custom_data[metric]['tableData']['avg'],
  1177. "max": self.custom_data[metric]['tableData']['max'],
  1178. "min": self.custom_data[metric]['tableData']['min'],
  1179. }
  1180. if self.custom_param_dict[metric]['kind'][0] == -1:
  1181. acc_dict_indexes[metric][
  1182. "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
  1183. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1184. acc_dict_indexes[metric][
  1185. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
  1186. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1187. acc_dict_indexes[metric][
  1188. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
  1189. custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
  1190. self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
  1191. acc_dict["indexes"] = acc_dict_indexes
  1192. acc_dict["builtin"] = builtin_graph_dict
  1193. acc_dict["custom"] = custom_graph_dict
  1194. acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
  1195. acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
  1196. acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
  1197. acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
  1198. type_details_dict[type] = acc_dict
  1199. unfunc_metric_list += unfunc_follow_metric_list
  1200. func_over_optimal.append(str_follow_over_optimal)
  1201. elif type == "functionLKA":
  1202. lka_dict = {
  1203. "name": f"{self.type_name_dict[type]}",
  1204. }
  1205. builtin_graph_dict = {}
  1206. custom_graph_dict = {}
  1207. # get score and grade
  1208. score_lane = score_type_dict["functionLKA"]
  1209. grade_lane = score_grade(score_lane)
  1210. lka_dict['score'] = score_lane
  1211. lka_dict['level'] = grade_lane
  1212. # unfunc_type_list.append('车道保持') if score_type_dict["functionLKA"] < 80 else func_type_list.append(
  1213. # '车道保持')
  1214. unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
  1215. # lane keep description
  1216. func_lane_metric_list = []
  1217. unfunc_lane_metric_list = []
  1218. for metric in self.metric_dict[type]:
  1219. unfunc_lane_metric_list.append(metric) if score_metric_dict[
  1220. metric] < 80 else func_lane_metric_list.append(
  1221. metric)
  1222. lka_dict_indexes = {}
  1223. for metric in self.metric_dict[type]:
  1224. if metric == "laneDistance":
  1225. lka_dict_indexes["laneDistance"] = {
  1226. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1227. "name": f"{self.name_dict[metric]}",
  1228. "score": score_metric_dict["laneDistance"],
  1229. "avg": round(np.mean(self.line_dist), 2) if self.line_dist else "-",
  1230. "max": round(max(self.line_dist), 2) if self.line_dist else "-",
  1231. "min": round(min(self.line_dist), 2) if self.line_dist else "-",
  1232. "range": f"[{self.optimal_dict['laneDistance']}, 1.875]"
  1233. }
  1234. if metric == "centerDistanceExpectation":
  1235. lka_dict_indexes["centerDistanceExpectation"] = {
  1236. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1237. "name": f"{self.name_dict[metric]}",
  1238. "score": score_metric_dict['centerDistanceExpectation'],
  1239. "avg": round(self.result["centerDistanceExpectation"], 2) if self.center_dist else "-",
  1240. "max": "-",
  1241. "min": "-",
  1242. "range": f"[0, {self.optimal_dict['centerDistanceExpectation']}]"
  1243. }
  1244. if metric == "centerDistanceStandardDeviation":
  1245. lka_dict_indexes["centerDistanceStandardDeviation"] = {
  1246. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1247. "name": f"{self.name_dict[metric]}",
  1248. "score": score_metric_dict['centerDistanceStandardDeviation'],
  1249. "avg": round(self.result["centerDistanceStandardDeviation"],
  1250. 2) if self.center_dist else "-",
  1251. "max": "-",
  1252. "min": "-",
  1253. "range": f"[0, {self.optimal_dict['centerDistanceStandardDeviation']}]"
  1254. }
  1255. if metric == "centerDistanceMax":
  1256. lka_dict_indexes["centerDistanceMax"] = {
  1257. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1258. "name": f"{self.name_dict[metric]}",
  1259. "score": score_metric_dict['centerDistanceMax'],
  1260. "avg": "-",
  1261. "max": round(self.result["centerDistanceMax"], 2) if self.center_dist else "-",
  1262. "min": "-",
  1263. "range": f"[0, {self.optimal_dict['centerDistanceMax']}]"
  1264. }
  1265. if metric == "centerDistanceMin":
  1266. lka_dict_indexes["centerDistanceMin"] = {
  1267. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1268. "name": f"{self.name_dict[metric]}",
  1269. "score": score_metric_dict['centerDistanceMin'],
  1270. "avg": "-",
  1271. "max": "-",
  1272. "min": round(self.result["centerDistanceMin"], 2) if self.center_dist else "-",
  1273. "range": f"[0, {self.optimal_dict['centerDistanceMin']}]"
  1274. }
  1275. if metric == "centerDistanceFrequency":
  1276. lka_dict_indexes["centerDistanceFrequency"] = {
  1277. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1278. "name": f"{self.name_dict[metric]}",
  1279. "score": score_metric_dict['centerDistanceFrequency'],
  1280. "avg": round(self.result["centerDistanceFrequency"], 2) if self.center_dist else "-",
  1281. "max": "-",
  1282. "min": "-",
  1283. "range": f"[0, {self.optimal_dict['centerDistanceFrequency']}]"
  1284. }
  1285. if metric == "centerDistanceRange":
  1286. lka_dict_indexes["centerDistanceRange"] = {
  1287. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1288. "name": f"{self.name_dict[metric]}",
  1289. "score": score_metric_dict['centerDistanceRange'],
  1290. "avg": round(self.result["centerDistanceRange"], 2) if self.center_dist else "-",
  1291. "max": "-",
  1292. "min": "-",
  1293. "range": f"[0, {self.optimal_dict['centerDistanceRange']}]"
  1294. }
  1295. if metric not in self.bulitin_metric_list:
  1296. lka_dict_indexes[metric] = {
  1297. # "name": self.custom_data[metric]['name'],
  1298. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1299. "name": f"{self.name_dict[metric]}",
  1300. "score": score_metric_dict[metric],
  1301. "avg": self.custom_data[metric]['tableData']['avg'],
  1302. "max": self.custom_data[metric]['tableData']['max'],
  1303. "min": self.custom_data[metric]['tableData']['min'],
  1304. }
  1305. if self.custom_param_dict[metric]['kind'][0] == -1:
  1306. lka_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
  1307. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1308. lka_dict_indexes[metric][
  1309. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
  1310. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1311. lka_dict_indexes[metric][
  1312. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
  1313. custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
  1314. self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
  1315. lka_dict["indexes"] = lka_dict_indexes
  1316. str_lane_over_optimal = ''
  1317. if not unfunc_lane_metric_list:
  1318. str_func_lane_metric = string_concatenate(func_lane_metric_list)
  1319. lane_description1 = f"{str_func_lane_metric}指标均表现良好"
  1320. else:
  1321. for metric in unfunc_lane_metric_list:
  1322. if metric in self.bulitin_metric_list:
  1323. value = self.result[metric]
  1324. if self.kind_dict[metric] == -1:
  1325. metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
  1326. metric]) * 100
  1327. elif self.kind_dict[metric] == 1:
  1328. metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
  1329. metric]) * 100
  1330. elif self.kind_dict[metric] == 0:
  1331. metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
  1332. metric]) * 100
  1333. else:
  1334. value = self.custom_data[metric]["value"][0]
  1335. if self.custom_param_dict[metric]['kind'][0] == -1:
  1336. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  1337. self.custom_param_dict[metric]['optimal'][0]) * 100
  1338. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1339. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  1340. self.custom_param_dict[metric]['optimal'][0]) * 100
  1341. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1342. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  1343. self.custom_param_dict[metric]['optimal'][0]) * 100
  1344. str_lane_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  1345. str_lane_over_optimal = str_lane_over_optimal[:-1] if str_lane_over_optimal else ""
  1346. str_func_lane_metric = string_concatenate(func_lane_metric_list)
  1347. str_unfunc_lane_metric = string_concatenate(unfunc_lane_metric_list)
  1348. if not func_lane_metric_list:
  1349. lane_description1 = f"{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
  1350. else:
  1351. lane_description1 = f"{str_func_lane_metric}指标表现良好,{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
  1352. lane_description2 = ""
  1353. center_distance_vs_time = []
  1354. if "centerDistanceMax" in self.metric_list:
  1355. cnt2_1 = len(self.center_dist)
  1356. tmp2_list = [x for x in self.center_dist if x > self.optimal_dict['centerDistanceMax']]
  1357. cnt2_2 = len(tmp2_list)
  1358. percent2 = 0 if cnt2_1 == 0 else (cnt2_2 / cnt2_1 * 100)
  1359. dist_time = percent2 * duration / 100
  1360. if dist_time != 0:
  1361. lane_description2 = f"距离有{dist_time:.2f}秒超出合理范围,占比为{percent2:.2f}%"
  1362. else:
  1363. lane_description2 = f"距离均在合理范围内,算法车道保持表现良好"
  1364. # lane keep data for graph
  1365. # center_distance_vs_time = self.zip_time_pairs(self.center_dist)
  1366. center_distance_vs_time = self.zip_time_pairs(self.center_dist_with_nan)
  1367. self.unfunctional_lane_df_statistic()
  1368. #
  1369. # lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
  1370. # lka_dict["description2"] = lane_description2
  1371. unfunc_lane_df = self.unfunc_lane_df.copy()
  1372. unfunc_lane_df['type'] = "origin"
  1373. unfunc_lane_slices = unfunc_lane_df.to_dict('records')
  1374. unfunc_lane_dist_df = self.unfunc_lane_df.copy()
  1375. unfunc_lane_dist_df.loc[unfunc_lane_dist_df['type'] != 'LKA', 'type'] = "origin"
  1376. unfunc_lane_dist_slices = unfunc_lane_dist_df.to_dict('records')
  1377. lka_data = {
  1378. "name": "车辆中心线横向距离(m)",
  1379. "data": center_distance_vs_time,
  1380. # "markLine": unfunc_lane_dist_slices
  1381. }
  1382. if 'centerDistanceMax' in self.optimal_dict:
  1383. lka_range = f"[0, {self.optimal_dict['centerDistanceMax']}]"
  1384. elif 'centerDistanceMin' in self.optimal_dict:
  1385. lka_range = f"[0, {self.optimal_dict['centerDistanceMin']}]"
  1386. else:
  1387. lka_range = f"[0, 0.5]"
  1388. lka_data["range"] = lka_range
  1389. builtin_graph_dict["centerDistance"] = lka_data
  1390. lka_dict["builtin"] = builtin_graph_dict
  1391. lka_dict["custom"] = custom_graph_dict
  1392. lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
  1393. lka_dict["description2"] = lane_description2
  1394. # lka_dict["centerDisData"] = center_distance_vs_time
  1395. # lka_dict["centerDisMarkLine"] = unfunc_lane_dist_slices
  1396. type_details_dict[type] = lka_dict
  1397. unfunc_metric_list += unfunc_lane_metric_list
  1398. func_over_optimal.append(str_lane_over_optimal)
  1399. else:
  1400. type_dict = {
  1401. "name": f"{self.type_name_dict[type]}",
  1402. }
  1403. builtin_graph_dict = {}
  1404. custom_graph_dict = {}
  1405. # get score and grade
  1406. score_custom_type = score_type_dict[type]
  1407. grade_custom_type = score_grade(score_custom_type)
  1408. type_dict["score"] = score_custom_type
  1409. type_dict["level"] = grade_custom_type
  1410. # custom type description
  1411. good_custom_metric_list = []
  1412. bad_custom_metric_list = []
  1413. unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
  1414. type_dict_indexes = {}
  1415. for metric in self.metric_dict[type]:
  1416. bad_custom_metric_list.append(metric) if score_metric_dict[
  1417. metric] < 80 else good_custom_metric_list.append(
  1418. metric)
  1419. type_dict_indexes[metric] = {
  1420. # "name": self.custom_data[metric]['name'],
  1421. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  1422. "name": f"{self.name_dict[metric]}",
  1423. "score": score_metric_dict[metric],
  1424. "avg": self.custom_data[metric]['tableData']['avg'],
  1425. "max": self.custom_data[metric]['tableData']['max'],
  1426. "min": self.custom_data[metric]['tableData']['min'],
  1427. }
  1428. if self.custom_param_dict[metric]['kind'][0] == -1:
  1429. type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]:.3f}]"
  1430. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1431. type_dict_indexes[metric][
  1432. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]:.3f}, inf)"
  1433. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1434. type_dict_indexes[metric][
  1435. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]:.3f}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]:.3f}]"
  1436. custom_graph_dict[metric] = self.custom_data[metric]['reportData']
  1437. self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
  1438. type_dict["indexes"] = type_dict_indexes
  1439. str_type_over_optimal = ""
  1440. if not bad_custom_metric_list:
  1441. str_good_custom_metric = string_concatenate(good_custom_metric_list)
  1442. type_description1 = f"{str_good_custom_metric}指标均表现良好"
  1443. type_description2 = f"指标均在合理范围内,算法表现良好"
  1444. else:
  1445. for metric in bad_custom_metric_list:
  1446. value = self.custom_data[metric]["value"][0]
  1447. if self.custom_param_dict[metric]['kind'][0] == -1:
  1448. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  1449. self.custom_param_dict[metric]['optimal'][0]) * 100
  1450. elif self.custom_param_dict[metric]['kind'][0] == 1:
  1451. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  1452. self.custom_param_dict[metric]['optimal'][0]) * 100
  1453. elif self.custom_param_dict[metric]['kind'][0] == 0:
  1454. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  1455. self.custom_param_dict[metric]['optimal'][0]) * 100
  1456. str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  1457. str_type_over_optimal = str_type_over_optimal[:-1]
  1458. str_good_custom_metric = string_concatenate(good_custom_metric_list)
  1459. str_bad_custom_metric = string_concatenate(bad_custom_metric_list)
  1460. if not good_custom_metric_list:
  1461. type_description1 = f"{str_bad_custom_metric}指标表现不佳"
  1462. type_description2 = f"{str_type_over_optimal}"
  1463. else:
  1464. type_description1 = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳"
  1465. type_description2 = f"{str_type_over_optimal}"
  1466. type_dict["builtin"] = builtin_graph_dict
  1467. type_dict["custom"] = custom_graph_dict
  1468. type_dict["description1"] = replace_key_with_value(type_description1, self.name_dict)
  1469. type_dict["description2"] = replace_key_with_value(type_description2, self.name_dict)
  1470. type_details_dict[type] = type_dict
  1471. unfunc_metric_list += bad_custom_metric_list
  1472. func_over_optimal.append(str_type_over_optimal)
  1473. report_dict["details"] = type_details_dict
  1474. # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  1475. func_over_optimal = [s for s in func_over_optimal if s]
  1476. str_func_over_optimal = ';'.join(func_over_optimal)
  1477. if grade_function == '优秀':
  1478. str_func_type = string_concatenate(func_type_list)
  1479. func_description1 = f'算法在{str_func_type}功能上表现优秀;'
  1480. elif grade_function == '良好':
  1481. str_func_type = string_concatenate(func_type_list)
  1482. func_description1 = f'算法在{str_func_type}功能上总体表现良好,满足设计指标要求;'
  1483. elif grade_function == '一般':
  1484. str_unfunc_type = string_concatenate(unfunc_type_list)
  1485. str_unfunc_metric = string_concatenate(unfunc_metric_list)
  1486. str_unfunc_metric = replace_key_with_value(str_unfunc_metric, self.type_name_dict)
  1487. func_description1 = f'算法在{str_unfunc_type}功能上表现一般、需要在{str_unfunc_metric}指标上进一步优化。其中,{str_func_over_optimal};'
  1488. elif grade_function == '较差':
  1489. str_unfunc_type = string_concatenate(unfunc_type_list)
  1490. func_description1 = f'算法在{str_unfunc_type}功能上表现较差,需要提高算法的功能性表现。其中,{str_func_over_optimal};'
  1491. if not unfunc_type_list:
  1492. func_description2 = '功能性在各个功能上的表现俱佳。'
  1493. else:
  1494. str_unfunc_type = string_concatenate(unfunc_type_list)
  1495. func_description2 = f"算法在{str_unfunc_type}功能上需要重点优化。"
  1496. # report_dict["description1"] = replace_key_with_value(func_description1, self.name_dict)
  1497. report_dict["description1"] = replace_key_with_value(replace_key_with_value(func_description1, self.name_dict),
  1498. self.type_name_dict)
  1499. report_dict["description2"] = replace_key_with_value(func_description2, self.type_name_dict)
  1500. report_dict['commonData'] = {
  1501. "per": {
  1502. "name": "脚刹/油门踏板开度(百分比)",
  1503. "legend": ["刹车踏板开度", "油门踏板开度"],
  1504. "data": [brake_vs_time, throttle_vs_time]
  1505. },
  1506. "ang": {
  1507. "name": "方向盘转角(角度°)",
  1508. "data": steering_vs_time
  1509. },
  1510. "spe": {
  1511. "name": "速度(km/h)",
  1512. "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
  1513. "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  1514. },
  1515. "acc": {
  1516. "name": "加速度(m/s²)",
  1517. "legend": ["横向加速度", "纵向加速度"],
  1518. "data": [lat_acc_vs_time, lon_acc_vs_time]
  1519. },
  1520. # "dis": {
  1521. # "name": "前车距离(m)",
  1522. # "data": distance_vs_time
  1523. # }
  1524. }
  1525. if "function_ACC" in self.type_list:
  1526. report_dict['commonData']["dis"] = {
  1527. "name": "前车距离(m)",
  1528. "data": distance_vs_time
  1529. }
  1530. self.unfunc_df = pd.concat([self.unfunc_df, self.unfunc_follow_df, self.unfunc_lane_df], ignore_index=True)
  1531. unfunc_df = self.unfunc_df.copy().dropna()
  1532. unfunc_slices = unfunc_df.to_dict('records')
  1533. unfunc_slices.extend(self.custom_markline_list)
  1534. report_dict["commonMarkLine"] = unfunc_slices
  1535. # report_dict = {
  1536. # "name": "功能性",
  1537. # "weight": f"{self.weight * 100:.2f}%",
  1538. # "weightDistribution": weight_distribution,
  1539. # "score": score_function,
  1540. # "level": grade_function,
  1541. # 'followStopCount': self.follow_stop_count,
  1542. # "description1": func_description1,
  1543. # "description2": func_description2,
  1544. #
  1545. # "functionACC": acc_dict,
  1546. # "functionLKA": lka_dict,
  1547. #
  1548. # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
  1549. # "accData": [lat_acc_vs_time, lon_acc_vs_time],
  1550. #
  1551. # }
  1552. self.eval_data = self.ego_df.copy()
  1553. return report_dict
  1554. def get_eval_data(self):
  1555. if 'line_dist' in self.eval_data.columns:
  1556. df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc', 'line_dist']].copy()
  1557. else:
  1558. df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc']].copy()
  1559. self.df['line_dist'] = pd.Series(dtype=float)
  1560. return df