accurate.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Accuracy metrics
  13. """
  14. import math
  15. import numpy as np
  16. import pandas as pd
  17. from scipy.spatial import KDTree
  18. from score_weight import cal_score_with_priority, cal_weight_from_80
  19. from common import score_grade, string_concatenate, replace_key_with_value, _cal_max_min_avg
  20. class Accurate(object):
  21. """
  22. Class for achieving accurate metrics for autonomous driving.
  23. Attributes:
  24. df: Vehicle driving data, stored in dataframe format.
  25. """
  26. def __init__(self, data_processed, scoreModel, resultPath):
  27. # self.eval_data = pd.DataFrame()
  28. # self.data_processed = data_processed
  29. self.scoreModel = scoreModel
  30. self.resultPath = resultPath
  31. self.df = data_processed.ego_df
  32. self.df_trajectory = data_processed.trajectory_df # 读取轨迹的数据
  33. self.config = data_processed.config
  34. accurate_config = data_processed.accurate_config
  35. self.accurate_config = accurate_config
  36. # common data
  37. self.builtin_metric_list = self.config.builtinMetricList
  38. # dimension data
  39. self.weight_custom = accurate_config['weightCustom']
  40. self.metric_list = accurate_config['metric']
  41. self.type_list = accurate_config['type']
  42. self.type_name_dict = accurate_config['typeName']
  43. self.name_dict = accurate_config['name']
  44. self.unit_dict = accurate_config['unit']
  45. # custom metric data
  46. self.customMetricParam = accurate_config['customMetricParam']
  47. self.custom_metric_list = list(self.customMetricParam.keys())
  48. self.custom_data = {}
  49. self.custom_param_dict = {}
  50. # score data
  51. self.weight = accurate_config['weightDimension']
  52. self.weight_type_dict = accurate_config['typeWeight']
  53. self.weight_type_list = accurate_config['typeWeightList']
  54. self.weight_dict = accurate_config['weight']
  55. self.weight_list = accurate_config['weightList']
  56. self.priority_dict = accurate_config['priority']
  57. self.priority_list = accurate_config['priorityList']
  58. self.kind_dict = accurate_config['kind']
  59. self.optimal_dict = accurate_config['optimal']
  60. self.multiple_dict = accurate_config['multiple']
  61. self.kind_list = accurate_config['kindList']
  62. self.optimal_list = accurate_config['optimalList']
  63. self.multiple_list = accurate_config['multipleList']
  64. # metric data
  65. self.metric_dict = accurate_config['typeMetricDict']
  66. # self.drive_metric_list = self.metric_dict['accurateDrive']
  67. # self.stop_metric_list = self.metric_dict['accurateStop']
  68. # self.drive_metric_list = ["averageSpeed"]
  69. # self.stop_metric_list = ["stopDuration", "stopCount"]
  70. # metric value
  71. self.positionError_list = []
  72. self.executeAccurateError_list = []
  73. self.positionError_time_max = []
  74. self.positionError_time_min = []
  75. self.positionError_dict = {}
  76. self.positionError_sum = 0
  77. # self.executeAccurateError_count = 0
  78. self.positionError = 0
  79. self.executeAccurateError = 0
  80. self.error_segment_time_list = 0
  81. def _accurate_metric_cal(self):
  82. self._positionError_cal()
  83. self._executeAccurateError_cal()
  84. def detect_discontinuities(self, target_error_codes):
  85. error_periods = {error_code: [] for error_code in target_error_codes}
  86. # 遍历DataFrame,检测目标错误码并统计时间段
  87. current_error_code = None
  88. start_time = None
  89. for index, row in self.df.iterrows():
  90. if row['task_error_code'] in target_error_codes:
  91. if current_error_code is None or current_error_code != row['task_error_code']:
  92. # 如果当前错误码与前一个不同,或者这是第一个错误码,则记录新的时间段开始
  93. current_error_code = row['task_error_code']
  94. start_time = row['simTime']
  95. else:
  96. # 如果当前行不是目标错误码,并且我们之前记录了一个错误码,则记录时间段结束
  97. if current_error_code is not None:
  98. # 注意:这里我们假设下一个非匹配行的时间不是当前错误码的时间段的一部分
  99. # 但为了包含整个错误码持续的时间,我们不减去任何时间
  100. error_periods[current_error_code].append((start_time, row['simTime']))
  101. current_error_code = None
  102. # 处理最后一个错误码的时间段(如果DataFrame以错误码结束)
  103. if current_error_code is not None:
  104. # 由于我们想要包含最后一个错误码的时间点,所以我们需要检查是否有下一个时间点
  105. # 如果没有(即这是DataFrame的最后一行),我们就直接使用最后一个时间点
  106. end_time = self.df['simTime'].iloc[-1] if current_error_code == self.df['task_error_code'].iloc[-1] else self.df['simTime'].iloc[-2]
  107. error_periods[current_error_code].append((start_time, end_time))
  108. return error_periods
  109. # 对每个错误码进行分组处理
  110. def _executeAccurateError_cal(self):
  111. # 用于记录段数的变量
  112. error_segment_count = 0
  113. error_segment_time_list = []
  114. # 标记当前是否在目标数字的段中
  115. in_segment = False
  116. targets = [21201000300, 21201000001, 21201000002, 21202000000, 21203000200, 21203000300]
  117. error_periods = self.detect_discontinuities(targets)
  118. print("error_periods is", error_periods)
  119. for key, periods in error_periods.items():
  120. if periods:
  121. error_segment_count += 1
  122. for start, end in periods:
  123. error_segment_time_list.append(start)
  124. self.executeAccurateError = error_segment_count
  125. self.error_segment_time_list = [str("{:.02f}".format(e)) + "s" for e in error_segment_time_list]
  126. print("self.error_segment_time_list is", self.error_segment_time_list)
  127. def _positionError_cal(self):
  128. # self.df_trajectory['ego_pos'] = self.df_trajectory.apply(lambda row: (row['ego_posX'], row['ego_posY']),
  129. self.df_trajectory['ego_pos'] = self.df_trajectory.apply(lambda row: (row['posX'], row['posY']),
  130. axis=1).tolist()
  131. self.df_trajectory['target_pos'] = self.df_trajectory.apply(lambda row: (row['TargetX'], row['TargetY']),
  132. axis=1).tolist()
  133. print("self.df_trajectory['target_pos'] is", self.df_trajectory['target_pos'])
  134. ego_pos = np.array(self.df_trajectory['ego_pos'].tolist())
  135. target_pos = KDTree(np.array(self.df_trajectory['target_pos'].tolist()))
  136. min_distances = target_pos.query(ego_pos, k=1)[0].ravel()
  137. # 初始化存储最小距离的列表
  138. min_distance = min_distances
  139. self.positionError_list = min_distance
  140. # indice_max = [idx for idx, val in enumerate(self.positionError_list) if val == max(self.positionError_list)]
  141. # indice_min = [idx for idx, val in enumerate(self.positionError_list) if val == min(self.positionError_list)]
  142. indice_max = np.where(self.positionError_list == np.max(self.positionError_list))
  143. indice_min = np.where(self.positionError_list == np.min(self.positionError_list))
  144. self.positionError_time_max = self.df_trajectory["simTime"].loc[indice_max].tolist()
  145. if len(self.positionError_time_max) > 2:
  146. positionError_time1 = self.positionError_time_max[0:2]
  147. self.positionError_time_max = set([str("{:.02f}".format(m)) + "s" for m in positionError_time1])
  148. self.positionError_time_min = self.df_trajectory["simTime"].loc[indice_min].tolist()
  149. if len(self.positionError_time_min) > 2:
  150. positionError_time2 = self.positionError_time_min[0:2]
  151. self.positionError_time_min = set([str("{:.02f}".format(m)) + "s" for m in positionError_time2])
  152. print("self.positionError_time_max is", self.positionError_time_max)
  153. self.positionError = np.std(np.array(min_distance))
  154. def _accurate_statistic(self):
  155. """
  156. """
  157. self._accurate_metric_cal()
  158. self.positionError_dict = _cal_max_min_avg(self.positionError_list) if len(self.positionError_list)>0 else {}
  159. arr_accurate = [[self.positionError, self.executeAccurateError]]
  160. return arr_accurate
  161. def _score_cal(self):
  162. """
  163. """
  164. arr_accurate = self._accurate_statistic()
  165. print("\n[准确性表现及得分情况]")
  166. print("准确性各指标值:", [[round(num, 2) for num in row] for row in arr_accurate])
  167. arr_accurate = np.array(arr_accurate)
  168. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_accurate)
  169. score_sub = score_model.cal_score()
  170. score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))
  171. score_metric = [round(num, 2) for num in score_sub]
  172. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  173. print("score_sub is", metric_list)
  174. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  175. if self.executeAccurateError > 0:
  176. score_metric_dict['executeAccurateError'] = 0
  177. else:
  178. score_metric_dict['executeAccurateError'] = 100
  179. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  180. score_metric = list(score_metric_dict.values())
  181. score_type_dict = {}
  182. if self.weight_custom: # 自定义权重
  183. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  184. self.weight_dict}
  185. for type in self.type_list:
  186. type_score = sum(
  187. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  188. score_type_dict[type] = round(type_score, 2)
  189. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  190. score_type_dict}
  191. score_accurate = sum(score_type_with_weight_dict.values())
  192. else: # 客观赋权
  193. self.weight_list = cal_weight_from_80(score_metric)
  194. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  195. score_accurate = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  196. for type in self.type_list:
  197. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  198. self.weight_dict = {key: round(value / type_weight, 4) for key, value in self.weight_dict.items() if
  199. key in self.metric_dict[type]}
  200. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  201. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  202. type_priority_list = [value for key, value in self.priority_dict.items() if
  203. key in self.metric_dict[type]]
  204. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  205. score_type_dict[type] = round(type_score, 2)
  206. score_accurate = round(score_accurate, 2)
  207. print("准确性各指标基准值:", self.optimal_list)
  208. print(f"准确性得分为:{score_accurate:.2f}分。")
  209. print(f"准确性各类型得分为:{score_type_dict}。")
  210. print(f"准确性各指标得分为:{score_metric_dict}。")
  211. return score_accurate, score_type_dict, score_metric_dict
  212. def report_statistic(self):
  213. """
  214. Returns:
  215. """
  216. report_dict = {
  217. "name": "准确性",
  218. "weight": f"{self.weight * 100:.2f}%",
  219. }
  220. score_accurate, score_type_dict, score_metric_dict = self._score_cal()
  221. # score_accurate, score_metric = self.effi_score()
  222. score_accurate = int(score_accurate) if int(score_accurate) == score_accurate else round(score_accurate, 2)
  223. grade_accurate = score_grade(score_accurate)
  224. report_dict["score"] = score_accurate
  225. report_dict["level"] = grade_accurate
  226. description = f"· 在准确性方面,得分{score_accurate:.2f}分,表现{grade_accurate}。"
  227. is_good = True
  228. is_semicolon = False
  229. if self.positionError_sum > 1:
  230. is_good = False
  231. is_semicolon = True
  232. description += f"行驶过程中,位置偏移总误差为{self.positionError_sum:.2f}米,需重点优化"
  233. if self.executeAccurateError > 0:
  234. is_good = False
  235. if is_semicolon:
  236. description += ";"
  237. is_semicolon = False
  238. description += f"出现{self.executeAccurateError}次任务执行状态错误,需重点优化。"
  239. else:
  240. if is_semicolon:
  241. description += f"。"
  242. if is_good:
  243. description += f"行驶准确且任务执行准确,算法表现优秀。"
  244. report_dict["description"] = description
  245. str_tmp_positionError_max = "、".join(self.positionError_time_max)
  246. str_tmp_positionError_min = "、".join(self.positionError_time_min)
  247. description1 = f"在{str_tmp_positionError_max}时刻达到最大值{self.positionError_dict['max']:.2f}m\n"
  248. if self.positionError_dict['min'] > 0.3:
  249. description1 += f"在{str_tmp_positionError_min}时刻达到最小值{self.positionError_dict['min']:.2f}m\n"
  250. description1 += f"平均值:{self.positionError_dict['avg']:.2f}m" if self.positionError_dict else "位置偏移无误差"
  251. str_temp_error_segment = "、".join(self.error_segment_time_list)
  252. description2 = f"次数:{self.executeAccurateError}次"
  253. if len(str_temp_error_segment) > 0:
  254. description2 += f"\n在{str_temp_error_segment}时刻任务执行错误"
  255. else:
  256. description2 += f"\n任务执行过程中没有发生错误"
  257. positionError_index = {
  258. "weight": self.weight_dict['positionError'],
  259. "score": score_metric_dict['positionError'],
  260. "description": description1
  261. }
  262. executeAccurateError_index = {
  263. "weight": self.weight_dict['executeAccurateError'],
  264. "score": score_metric_dict['executeAccurateError'],
  265. "description": description2
  266. }
  267. indexes_dict = {
  268. "positionError": positionError_index,
  269. "executeAccurateError": executeAccurateError_index
  270. }
  271. report_dict["indexes"] = indexes_dict
  272. print(report_dict)
  273. return report_dict