accurate.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/08/03
  11. @Last Modified: 2023/08/03
  12. @Summary: Accuracy metrics
  13. """
  14. import math
  15. import numpy as np
  16. import pandas as pd
  17. from score_weight import cal_score_with_priority, cal_weight_from_80
  18. from common import score_grade, string_concatenate, replace_key_with_value
  19. class Accurate(object):
  20. """
  21. Class for achieving accurate metrics for autonomous driving.
  22. Attributes:
  23. df: Vehicle driving data, stored in dataframe format.
  24. """
  25. def __init__(self, data_processed, scoreModel, resultPath):
  26. # self.eval_data = pd.DataFrame()
  27. # self.data_processed = data_processed
  28. self.scoreModel = scoreModel
  29. self.resultPath = resultPath
  30. self.df = data_processed.ego_df
  31. self.df_trajectory = data_processed.trajectory_df # 读取轨迹的数据
  32. self.config = data_processed.config
  33. accurate_config = data_processed.accurate_config
  34. self.accurate_config = accurate_config
  35. # common data
  36. self.builtin_metric_list = self.config.builtinMetricList
  37. # dimension data
  38. self.weight_custom = accurate_config['weightCustom']
  39. self.metric_list = accurate_config['metric']
  40. self.type_list = accurate_config['type']
  41. self.type_name_dict = accurate_config['typeName']
  42. self.name_dict = accurate_config['name']
  43. self.unit_dict = accurate_config['unit']
  44. # custom metric data
  45. self.customMetricParam = accurate_config['customMetricParam']
  46. self.custom_metric_list = list(self.customMetricParam.keys())
  47. self.custom_data = {}
  48. self.custom_param_dict = {}
  49. # score data
  50. self.weight = accurate_config['weightDimension']
  51. self.weight_type_dict = accurate_config['typeWeight']
  52. self.weight_type_list = accurate_config['typeWeightList']
  53. self.weight_dict = accurate_config['weight']
  54. self.weight_list = accurate_config['weightList']
  55. self.priority_dict = accurate_config['priority']
  56. self.priority_list = accurate_config['priorityList']
  57. self.kind_dict = accurate_config['kind']
  58. self.optimal_dict = accurate_config['optimal']
  59. self.multiple_dict = accurate_config['multiple']
  60. self.kind_list = accurate_config['kindList']
  61. self.optimal_list = accurate_config['optimalList']
  62. self.multiple_list = accurate_config['multipleList']
  63. # metric data
  64. self.metric_dict = accurate_config['typeMetricDict']
  65. # self.drive_metric_list = self.metric_dict['accurateDrive']
  66. # self.stop_metric_list = self.metric_dict['accurateStop']
  67. # self.drive_metric_list = ["averageSpeed"]
  68. # self.stop_metric_list = ["stopDuration", "stopCount"]
  69. # metric value
  70. self.positionError_list = []
  71. self.executeAccurateError_list = []
  72. self.positionError_dict = {}
  73. self.positionError_sum = 0
  74. # self.executeAccurateError_count = 0
  75. self.positionError = 0
  76. self.executeAccurateError = 0
  77. def _accurate_metric_cal(self):
  78. self._positionError_cal()
  79. self._executeAccurateError_cal()
  80. def _executeAccurateError_cal(self):
  81. # 用于记录段数的变量
  82. error_segment_count = 0
  83. # 标记当前是否在目标数字的段中
  84. in_segment = False
  85. targets = [21201000300, 21201000001, 21201000002, 21202000000, 21203000200, 21203000300]
  86. task_error_code_list = self.df.task_error_code.tolist()
  87. for target in targets:
  88. for number in task_error_code_list:
  89. # 如果当前数字是目标数字,并且我们之前不在段中
  90. if number == target and not in_segment:
  91. # 开始一个新的段
  92. error_segment_count += 1
  93. in_segment = True
  94. # 如果当前数字不是目标数字,并且我们之前在段中
  95. elif number != target and in_segment:
  96. # 结束当前的段
  97. in_segment = False
  98. # 注意:如果列表以目标数字结束,并且没有额外的非目标数字来结束段,
  99. # 则上面的循环将不会将最后一个段计数。我们需要在这里检查它。
  100. if task_error_code_list and (task_error_code_list[-1] == target) and in_segment:
  101. error_segment_count += 1
  102. self.executeAccurateError = error_segment_count
  103. def _positionError_cal(self):
  104. self.df_trajectory['ego_pos'] = self.df_trajectory.apply(lambda row: (row['ego_posX'], row['ego_posY']), axis=1)
  105. self.df_trajectory['target_pos'] = self.df_trajectory.apply(lambda row: (row['TargetX'], row['TargetY']),
  106. axis=1)
  107. # 初始化存储最小距离的列表
  108. min_distances = []
  109. # 遍历第一组坐标
  110. for P in self.df_trajectory['ego_pos']:
  111. min_dist = float('inf') # 初始化为正无穷大
  112. # 遍历第二组坐标
  113. for Q in self.df_trajectory['target_pos']:
  114. # 计算两点间的距离
  115. dist = math.sqrt((P[0] - Q[0]) ** 2 + (P[1] - Q[1]) ** 2)
  116. # 更新最小距离
  117. if dist < min_dist:
  118. min_dist = dist
  119. # 存储最小距离
  120. min_distances.append(min_dist)
  121. # print("min_distances is", min_distances)
  122. self.positionError = np.std(np.array(min_distances))
  123. def _cal_max_min_avg(self, num_list):
  124. maxx = round(max(num_list), 4) if num_list else "-"
  125. minn = round(min(num_list), 4) if num_list else "-"
  126. avg = round(sum(num_list) / len(num_list), 4) if num_list else "-"
  127. result = {
  128. "max": maxx,
  129. "min": minn,
  130. "avg": avg
  131. }
  132. return result
  133. def _accurate_statistic(self):
  134. """
  135. """
  136. self._accurate_metric_cal()
  137. self.positionError_dict = self._cal_max_min_avg(self.positionError_list)
  138. arr_accurate = [[self.positionError, self.executeAccurateError]]
  139. return arr_accurate
  140. def _score_cal(self):
  141. """
  142. """
  143. arr_accurate = self._accurate_statistic()
  144. print("\n[准确性表现及得分情况]")
  145. print("准确性各指标值:", [[round(num, 2) for num in row] for row in arr_accurate])
  146. arr_accurate = np.array(arr_accurate)
  147. score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_accurate)
  148. score_sub = score_model.cal_score()
  149. score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))
  150. score_metric = [round(num, 2) for num in score_sub]
  151. metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
  152. score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
  153. score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  154. score_metric = list(score_metric_dict.values())
  155. score_type_dict = {}
  156. if self.weight_custom: # 自定义权重
  157. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  158. self.weight_dict}
  159. for type in self.type_list:
  160. type_score = sum(
  161. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  162. score_type_dict[type] = round(type_score, 2)
  163. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  164. score_type_dict}
  165. score_accurate = sum(score_type_with_weight_dict.values())
  166. else: # 客观赋权
  167. self.weight_list = cal_weight_from_80(score_metric)
  168. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  169. score_accurate = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  170. for type in self.type_list:
  171. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  172. self.weight_dict = {key: round(value / type_weight, 4) for key, value in self.weight_dict.items() if
  173. key in self.metric_dict[type]}
  174. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  175. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  176. type_priority_list = [value for key, value in self.priority_dict.items() if
  177. key in self.metric_dict[type]]
  178. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  179. score_type_dict[type] = round(type_score, 2)
  180. score_accurate = round(score_accurate, 2)
  181. print("准确性各指标基准值:", self.optimal_list)
  182. print(f"准确性得分为:{score_accurate:.2f}分。")
  183. print(f"准确性各类型得分为:{score_type_dict}。")
  184. print(f"准确性各指标得分为:{score_metric_dict}。")
  185. return score_accurate, score_type_dict, score_metric_dict
  186. def report_statistic(self):
  187. """
  188. Returns:
  189. """
  190. report_dict = {
  191. "name": "准确性",
  192. "weight": f"{self.weight * 100:.2f}%",
  193. }
  194. score_accurate, score_type_dict, score_metric_dict = self._score_cal()
  195. # score_accurate, score_metric = self.effi_score()
  196. score_accurate = int(score_accurate) if int(score_accurate) == score_accurate else round(score_accurate, 2)
  197. grade_accurate = score_grade(score_accurate)
  198. report_dict["score"] = score_accurate
  199. report_dict["level"] = grade_accurate
  200. description = "· 在准确性方面,"
  201. is_good = True
  202. if self.positionError_sum > 1:
  203. is_good = False
  204. description += f"行驶过程中,位置偏移总误差为{self.positionError_sum}米,需重点优化。"
  205. if self.executeAccurateError > 0:
  206. is_good = False
  207. description += f"出现{self.executeAccurateError}次任务执行状态错误,需重点优化。"
  208. if is_good:
  209. description += f"行驶准确且任务执行准确,算法表现优秀。"
  210. report_dict["description"] = description
  211. description1 = f"最大值:{self.positionError_dict['max']}m/s;" \
  212. f"最小值:{self.positionError_dict['min']}m/s;" \
  213. f"平均值:{self.positionError_dict['avg']}m/s"
  214. description2 = f"次数:{self.executeAccurateError}次;"
  215. positionError_index = {
  216. "weight": self.weight_dict['positionError'],
  217. "score": score_metric_dict['positionError'],
  218. "description": description1
  219. }
  220. executeAccurateError_index = {
  221. "weight": self.weight_dict['executeAccurateError'],
  222. "score": score_metric_dict['executeAccurateError'],
  223. "description": description2
  224. }
  225. indexes_dict = {
  226. "positionError": positionError_index,
  227. "executeAccurateError": executeAccurateError_index
  228. }
  229. report_dict["indexes"] = indexes_dict
  230. print(report_dict)
  231. return report_dict