customDimension.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2024/03/06
  11. @Last Modified: 2024/03/06
  12. @Summary: Custom metrics
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. sys.path.append('../results')
  18. import math
  19. import numpy as np
  20. import pandas as pd
  21. from score_weight import cal_score_with_priority, cal_weight_from_80
  22. from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
  23. class CustomDimension(object):
  24. """
  25. Class for achieving custom metrics for autonomous driving.
  26. Attributes:
  27. df: Vehicle driving data, stored in dataframe format.
  28. """
  29. def __init__(self, dimension, data_processed, custom_data, scoreModel):
  30. self.eval_data = pd.DataFrame()
  31. self.data_processed = data_processed
  32. self.scoreModel = scoreModel
  33. # self.df = data_processed.object_df
  34. self.dimension = dimension
  35. self.custom_data = custom_data
  36. # config infos for calculating score
  37. self.config = data_processed.config
  38. self.dimension_config = self.config.config[dimension]
  39. # common data
  40. # self.bulitin_metric_list = self.config.builtinMetricList
  41. # dimension data
  42. self.weight_custom = self.dimension_config['weightCustom']
  43. self.metric_list = self.dimension_config['metric']
  44. self.type_list = self.dimension_config['type']
  45. self.type_name_dict = self.dimension_config['typeName']
  46. self.name_dict = self.dimension_config['name']
  47. self.unit_dict = self.dimension_config['unit']
  48. # custom metric
  49. self.customMetricParam = self.dimension_config['customMetricParam']
  50. self.custom_metric_list = list(self.customMetricParam.keys())
  51. self.custom_param_dict = {}
  52. self.weight = self.dimension_config['weightDimension']
  53. self.weight_type_dict = self.dimension_config['typeWeight']
  54. self.weight_type_list = self.dimension_config['typeWeightList']
  55. self.weight_dict = self.dimension_config['weight']
  56. self.weight_list = self.dimension_config['weightList']
  57. self.priority_dict = self.dimension_config['priority']
  58. self.priority_list = self.dimension_config['priorityList']
  59. self.metric_dict = self.dimension_config['typeMetricDict']
  60. # self.unit_dict = self.dimension_config['unit']
  61. # self.kind_dict = self.dimension_config['kind']
  62. # self.optimal_dict = self.dimension_config['optimal']
  63. # self.multiple_dict = self.dimension_config['multiple']
  64. # self.kind_list = self.dimension_config['kindList']
  65. # self.optimal_list = self.dimension_config['optimalList']
  66. # self.multiple_list = self.dimension_config['multipleList']
  67. self.time_list = self.data_processed.driver_ctrl_data['time_list']
  68. def _custom_metric_param_parser(self, param_list):
  69. """
  70. param_dict = {
  71. "paramA" [
  72. {
  73. "kind": "-1",
  74. "optimal": "1",
  75. "multiple": ["0.5","5"],
  76. "spare1": null,
  77. "spare2": null
  78. }
  79. ]
  80. }
  81. """
  82. kind_list = []
  83. optimal_list = []
  84. multiple_list = []
  85. spare_list = []
  86. # spare1_list = []
  87. # spare2_list = []
  88. for i in range(len(param_list)):
  89. kind_list.append(int(param_list[i]['kind']))
  90. optimal_list.append(float(param_list[i]['optimal']))
  91. multiple_list.append([float(x) for x in param_list[i]['multiple']])
  92. spare_list.append([item["param"] for item in param_list[i]["spare"]])
  93. # spare1_list.append(param_list[i]['spare1'])
  94. # spare2_list.append(param_list[i]['spare2'])
  95. result = {
  96. "kind": kind_list,
  97. "optimal": optimal_list,
  98. "multiple": multiple_list,
  99. "spare": spare_list,
  100. # "spare1": spare1_list,
  101. # "spare2": spare2_list
  102. }
  103. return result
  104. def _custom_metric_score(self, metric, value, param_list):
  105. """
  106. """
  107. param = self._custom_metric_param_parser(param_list)
  108. self.custom_param_dict[metric] = param
  109. score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
  110. score_sub = score_model.cal_score()
  111. score = sum(score_sub) / len(score_sub)
  112. return score
  113. def _cal_score(self):
  114. """
  115. """
  116. score_metric_dict = {}
  117. score_type_dict = {}
  118. # score_dimension = 0
  119. # custom_metric_list = list(self.customMetricParam.keys())
  120. for metric in self.custom_metric_list:
  121. value = self.custom_data[metric]['value']
  122. param_list = self.customMetricParam[metric]
  123. score = self._custom_metric_score(metric, value, param_list)
  124. score_metric_dict[metric] = round(score, 2)
  125. # score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
  126. score_metric = list(score_metric_dict.values())
  127. if self.weight_custom: # 自定义权重
  128. score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
  129. self.weight_dict}
  130. for type in self.type_list:
  131. type_score = sum(
  132. value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
  133. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  134. score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
  135. score_type_dict}
  136. score_dimension = sum(score_type_with_weight_dict.values())
  137. else: # 客观赋权
  138. self.weight_list = cal_weight_from_80(score_metric)
  139. self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
  140. score_dimension = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
  141. for type in self.type_list:
  142. type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
  143. for key, value in self.weight_dict.items():
  144. if key in self.metric_dict[type]:
  145. # self.weight_dict[key] = round(value / type_weight, 4)
  146. self.weight_dict[key] = value / type_weight
  147. type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
  148. type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
  149. type_priority_list = [value for key, value in self.priority_dict.items() if
  150. key in self.metric_dict[type]]
  151. type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
  152. score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
  153. for key in self.weight_dict:
  154. self.weight_dict[key] = round(self.weight_dict[key], 4)
  155. score_type = list(score_type_dict.values())
  156. self.weight_type_list = cal_weight_from_80(score_type)
  157. self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
  158. print(f"\n[{self.dimension}表现及得分情况]")
  159. print(f"{self.dimension}得分为:{score_dimension:.2f}分。")
  160. print(f"{self.dimension}各类型得分为:{score_type_dict}")
  161. print(f"{self.dimension}各指标得分为:{score_metric_dict}。")
  162. return score_dimension, score_type_dict, score_metric_dict
  163. # def zip_time_pairs(self, zip_list, upper_limit=9999):
  164. # zip_time_pairs = zip(self.time_list, zip_list)
  165. # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
  166. # return zip_vs_time
  167. def zip_time_pairs(self, zip_list):
  168. zip_time_pairs = zip(self.time_list, zip_list)
  169. zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
  170. return zip_vs_time
  171. def _get_weight_distribution(self):
  172. # get weight distribution
  173. weight_distribution = {}
  174. weight_distribution["name"] = self.config.dimension_name[self.dimension]
  175. for type in self.type_list:
  176. type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
  177. self.weight_dict.items() if
  178. key in self.metric_dict[type]}
  179. weight_distribution_type = {
  180. "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
  181. "indexes": type_weight_indexes_dict
  182. }
  183. weight_distribution[type] = weight_distribution_type
  184. return weight_distribution
  185. def report_statistic(self):
  186. """
  187. """
  188. report_dict = {
  189. "name": self.dimension,
  190. "weight": f"{self.weight * 100:.2f}%",
  191. }
  192. score_dimension, score_type_dict, score_metric_dict = self._cal_score()
  193. score_dimension = int(score_dimension) if int(score_dimension) == score_dimension else round(score_dimension, 2)
  194. grade_dimension = score_grade(score_dimension)
  195. report_dict["score"] = score_dimension
  196. report_dict["level"] = grade_dimension
  197. report_dict["weightDistribution"] = self._get_weight_distribution()
  198. # for description
  199. bad_metric_list = []
  200. dimension_over_optimal = []
  201. # for description
  202. good_custom_type_list = []
  203. bad_custom_type_list = []
  204. type_details_dict = {}
  205. for type in self.type_list:
  206. bad_custom_type_list.append(type) if score_type_dict[type] < 80 else good_custom_type_list.append(type)
  207. type_dict = {
  208. "name": f"{self.type_name_dict[type]}",
  209. }
  210. builtin_graph_dict = {}
  211. custom_graph_dict = {}
  212. # get score and grade
  213. score_custom_type = score_type_dict[type]
  214. grade_custom_type = score_grade(score_custom_type)
  215. type_dict["score"] = score_custom_type
  216. type_dict["level"] = grade_custom_type
  217. # custom type description
  218. good_custom_metric_list = []
  219. bad_custom_metric_list = []
  220. type_dict_indexes = {}
  221. for metric in self.metric_dict[type]:
  222. bad_custom_metric_list.append(metric) if score_metric_dict[
  223. metric] < 80 else good_custom_metric_list.append(metric)
  224. type_dict_indexes[metric] = {
  225. # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
  226. "name": f"{self.name_dict[metric]}",
  227. "score": score_metric_dict[metric],
  228. "avg": self.custom_data[metric]['tableData']['avg'],
  229. "max": self.custom_data[metric]['tableData']['max'],
  230. "min": self.custom_data[metric]['tableData']['min'],
  231. }
  232. if self.custom_param_dict[metric]['kind'][0] == -1:
  233. type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
  234. elif self.custom_param_dict[metric]['kind'][0] == 1:
  235. type_dict_indexes[metric]["range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
  236. elif self.custom_param_dict[metric]['kind'][0] == 0:
  237. type_dict_indexes[metric][
  238. "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.multiple_dict[metric][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.multiple_dict[metric][1]}]"
  239. custom_graph_dict[metric] = self.custom_data[metric]['reportData']
  240. type_dict["indexes"] = type_dict_indexes
  241. type_dict["builtin"] = builtin_graph_dict
  242. type_dict["custom"] = custom_graph_dict
  243. str_type_over_optimal = ""
  244. if not bad_custom_metric_list:
  245. str_good_custom_metric = string_concatenate(good_custom_metric_list)
  246. type_description = f"{str_good_custom_metric}指标均表现良好"
  247. else:
  248. for metric in bad_custom_metric_list:
  249. value = self.custom_data[metric]["value"][0]
  250. if self.custom_param_dict[metric]['kind'][0] == -1:
  251. metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
  252. self.custom_param_dict[metric]['optimal'][0]) * 100
  253. elif self.custom_param_dict[metric]['kind'][0] == 1:
  254. metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
  255. self.custom_param_dict[metric]['optimal'][0]) * 100
  256. elif self.custom_param_dict[metric]['kind'][0] == 0:
  257. metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
  258. self.custom_param_dict[metric]['optimal'][0]) * 100
  259. str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
  260. str_type_over_optimal = str_type_over_optimal[:-1]
  261. str_good_custom_metric = string_concatenate(good_custom_metric_list)
  262. str_bad_custom_metric = string_concatenate(bad_custom_metric_list)
  263. if not good_custom_metric_list:
  264. type_description = f"{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
  265. else:
  266. type_description = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
  267. type_dict["description"] = replace_key_with_value(type_description, self.name_dict)
  268. type_details_dict[type] = type_dict
  269. bad_metric_list.extend(bad_custom_metric_list)
  270. dimension_over_optimal.append(str_type_over_optimal)
  271. report_dict["details"] = type_details_dict
  272. dimension_over_optimal = [s for s in dimension_over_optimal if s]
  273. str_dimension_over_optimal = ";".join(dimension_over_optimal)
  274. str_dimension_over_optimal = replace_key_with_value(str_dimension_over_optimal, self.name_dict)
  275. if grade_dimension == '优秀':
  276. str_good_type = string_concatenate(good_custom_type_list)
  277. dimension_description1 = f'算法在{str_good_type}类型上表现优秀;'
  278. elif grade_dimension == '良好':
  279. str_good_type = string_concatenate(good_custom_type_list)
  280. dimension_description1 = f'算法在{str_good_type}类型上总体表现良好,满足设计指标要求;'
  281. elif grade_dimension == '一般':
  282. str_bad_type = string_concatenate(bad_custom_type_list)
  283. str_bad_metric = string_concatenate(bad_custom_metric_list)
  284. str_bad_metric = replace_key_with_value(str_bad_metric, self.name_dict)
  285. dimension_description1 = f'算法在{str_bad_type}类型上表现一般、需要在{str_bad_metric}指标上进一步优化。其中,{str_dimension_over_optimal};'
  286. elif grade_dimension == '较差':
  287. str_bad_type = string_concatenate(bad_custom_type_list)
  288. dimension_description1 = f'算法在{str_bad_type}类型上表现较差,需要提高算法的类型性表现。其中,{str_dimension_over_optimal};'
  289. if not bad_custom_type_list:
  290. dimension_description2 = f'{self.dimension}在各个类型上的表现俱佳'
  291. else:
  292. str_bad_type = string_concatenate(bad_custom_type_list)
  293. dimension_description2 = f"算法在{str_bad_type}类型上需要重点优化"
  294. report_dict["description1"] = replace_key_with_value(dimension_description1, self.type_name_dict)
  295. report_dict["description2"] = replace_key_with_value(dimension_description2, self.type_name_dict)
  296. brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
  297. throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
  298. steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
  299. # common parameter calculate
  300. brake_vs_time = self.zip_time_pairs(brakePedal_list)
  301. throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
  302. steering_vs_time = self.zip_time_pairs(steeringWheel_list)
  303. report_dict['commonData'] = {
  304. "per": {
  305. "name": "脚刹/油门踏板开度(百分比)",
  306. "legend": ["刹车踏板开度", "油门踏板开度"],
  307. "data": [brake_vs_time, throttle_vs_time]
  308. },
  309. "ang": {
  310. "name": "方向盘转角(角度°)",
  311. "data": steering_vs_time
  312. },
  313. # "spe": {
  314. # "name": "速度(km/h)",
  315. # "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
  316. # "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
  317. #
  318. # },
  319. # "acc": {
  320. # "name": "加速度(m/s²)",
  321. # "legend": ["横向加速度", "纵向加速度"],
  322. # "data": [lat_acc_vs_time, lon_acc_vs_time]
  323. #
  324. # },
  325. # "dis": {
  326. # "name": "前车距离(m)",
  327. # "data": distance_vs_time
  328. # }
  329. }
  330. return report_dict
  331. def get_eval_data(self):
  332. df = self.eval_data
  333. return df