single_case_evaluate.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors: yangzihao(yangzihao@china-icv.cn)
@Date: 2023/11/28
@Last Modified: 2023/11/28
@Summary: Evaluate single case.
"""
import sys
import pandas as pd
import numpy as np
import requests
import json
import pathlib
import time
import traceback
from collections import defaultdict

import log
from common import score_grade, mileage_format, duration_format, string_concatenate, replace_key_with_value, \
    import_score_class
from data_process import DataProcess
from customDimension import CustomDimension
from custom_run import custom_run
# the built-in dimension classes are imported so that the globals() lookup by
# dimension name below can find them
from safe import Safe
from function import Function
from compliance import Compliance
from comfort import Comfort
from efficient import Efficient


def single_case_evaluate(dataPath, config, customMetricPath, customScorePath, case_name):
    """
    Evaluate a single case.

    This function takes the path of a single case's data and a config object,
    and uses the scripts of the five dimensions to analyze and evaluate the case.

    Arguments:
        dataPath: A str, path of the single case's csv files.
        config: A config object containing the algorithm info, index info and so on.
        customMetricPath: A str, path of the custom metric files.
        customScorePath: A str, path of the custom score model files.
        case_name: A str, name of the case.

    Returns:
        case_dict: A dict of evaluation results: basic info, plus scores and
        descriptions of the five dimensions.
    """
    logger = log.get_logger()
    eval_data_df = pd.DataFrame()

    try:
        data_processed = DataProcess(dataPath, config, case_name)
        eval_data_df = data_processed.object_df
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Data processed ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # custom metrics evaluate
    custom_metric_list = [metric for metric in config.metric_list if metric not in config.builtinMetricList]
    custom_data = process_custom_metrics(customMetricPath, data_processed, custom_metric_list, case_name, logger)
    # custom score model
    if config.scoreModel == "builtin":
        scorePath = r"./"
        scoreFile = "score_weight"
        scoreModel = import_score_class(scorePath, scoreFile)
    else:
        scoreFile = config.scoreModel
        scoreModel = import_score_class(customScorePath, scoreFile)

    builtin_dimension_list = config.builtinDimensionList
    dimension_name_dict = config.dimension_name

    # initialization
    case_dict = {}

    # score info
    dimension_list = config.dimension_list
    score_dimension_dict = {}
    bad_dimension_list = []

    # group the indicators by good or bad score
    good_indicator_list = []
    bad_indicator_list = []
    bad_indicator_count_list = []

    dimension_dict = {}
    eval_data_list = []

    BENCHMARK = 80
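    # Dimensions scoring below BENCHMARK are collected in bad_dimension_list and
    # called out in algorithmResultDescription2 as needing optimization.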
    logger.info(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate start: ")

    # evaluate each dimension (first-level indicator) in the dimension list
    for dimension in dimension_list:
        try:
            # instantiate the dimension evaluator
            if dimension in builtin_dimension_list:
                dimension_instance = globals()[dimension.capitalize()](data_processed, custom_data, scoreModel)
            else:
                dimension_instance = CustomDimension(dimension, data_processed, custom_data, scoreModel)

            dimension_report_dict = dimension_instance.report_statistic()
            if not dimension_report_dict:
                logger.error(
                    f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension {dimension} evaluate ERROR: No metric score!")
                continue

            dimension_dict[dimension] = dimension_report_dict
            score_dimension_dict[dimension] = dimension_report_dict['score']

            dimension_eval_data = dimension_instance.get_eval_data()
            if not dimension_eval_data.empty:
                eval_data_list.append(dimension_eval_data)
                # eval_data_dict[dimension] = dimension_eval_data

            if score_dimension_dict[dimension] < BENCHMARK:
                bad_dimension_list.append(dimension)
            # count good / bad behaviour indicators; the Chinese literals are the
            # indicator names used in the generated report
            if dimension == "function":
                followStopCount = dimension_report_dict['followStopCount']
                if followStopCount != 0:
                    bad_indicator_list.append("跟停行为")  # follow-to-stop behavior
                else:
                    good_indicator_list.append("跟停行为")
            elif dimension == "compliance":
                illegalCount = dimension_report_dict['illegalCount']
                if illegalCount != 0:
                    bad_indicator_list.append("违反交通规则行为")  # traffic-rule violations
                else:
                    good_indicator_list.append("违反交通规则行为")
            elif dimension == "comfort":
                discomfortCount = dimension_report_dict['discomfortCount']
                if discomfortCount != 0:
                    bad_indicator_list.append("不舒适行为")  # uncomfortable behavior
                else:
                    good_indicator_list.append("不舒适行为")
        except Exception as e:
            traceback.print_exc()
            logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate ERROR: {repr(e)}!", exc_info=True)
            continue
    case_dict["details"] = dimension_dict

    # calculate the score and grade of the case
    weight_dimension_dict = config.dimension_weight
    score_dimension = {key: score_dimension_dict[key] * weight_dimension_dict[key] for key in score_dimension_dict}
    score_case = round(sum(score_dimension.values()), 2)
    grade_case = score_grade(score_case)

    case_dict['algorithmComprehensiveScore'] = score_case
    case_dict['algorithmLevel'] = grade_case
    case_dict['testMileage'] = data_processed.report_info['mileage']
    case_dict['testDuration'] = data_processed.report_info['duration']
    # generate algorithmResultDescription1 according to the actual case
    # (the description strings are generated in Chinese for the report)
    if bad_indicator_list:
        for indicator in bad_indicator_list:
            if indicator == '不舒适行为':
                bad_indicator_count_list.append(f'{discomfortCount}次不舒适行为')
            if indicator == '跟停行为':
                bad_indicator_count_list.append(f'{followStopCount}次跟停行为')
            if indicator == '违反交通规则行为':
                bad_indicator_count_list.append(f'违反交通规则{illegalCount}次')

    str_bad_indicator_count = string_concatenate(bad_indicator_count_list) if bad_indicator_count_list else ""
    str_good_indicator = string_concatenate(good_indicator_list) if good_indicator_list else ""

    # build the description; more error types and information can be added here
    algorithmResultDescription1 = "车辆在本轮测试中,"
    if str_bad_indicator_count:
        algorithmResultDescription1 += f"共出现{str_bad_indicator_count},"
    if str_good_indicator:
        algorithmResultDescription1 += f"未出现{str_good_indicator}。"
    if not str_good_indicator and not str_bad_indicator_count:
        algorithmResultDescription1 += "【未进行舒适性、跟车及合规性评测】。"
    # generate algorithmResultDescription2
    if not bad_dimension_list:
        algorithmResultDescription2 = '综上所述,算法在各个维度的表现俱佳。'
    else:
        str_bad_dimension = string_concatenate(bad_dimension_list)
        algorithmResultDescription2 = f'综上所述,建议算法优化在{str_bad_dimension}指标上的表现。'

    # replace the English dimension names with their Chinese display names
    # case_dict['algorithmResultDescription1'] = algorithmResultDescription1
    # case_dict['algorithmResultDescription2'] = algorithmResultDescription2
    case_dict['algorithmResultDescription1'] = replace_key_with_value(algorithmResultDescription1, dimension_name_dict)
    case_dict['algorithmResultDescription2'] = replace_key_with_value(algorithmResultDescription2, dimension_name_dict)

    case_dict['evalData'] = getEvalData(eval_data_df, eval_data_list)
    case_dict['playbackData'] = getPlaybackData(eval_data_df)

    return case_dict


def process_custom_metrics(customMetricPath, data_processed, custom_metric_list, case_name, logger):
    """
    Run the custom metrics through custom_run and return their results.
    Returns an empty defaultdict(dict) when there are no custom metrics or when
    the custom metric evaluation fails.
    """
    if custom_metric_list:
        try:
            return custom_run(customMetricPath, data_processed, custom_metric_list, case_name)
        except Exception as e:
            logger.error(f"[case:{case_name}] Custom metric evaluate ERROR: {repr(e)}!", exc_info=True)
            return defaultdict(dict)
    return defaultdict(dict)


def getEvalData(eval_data_df, eval_data_list):
    """Merge each dimension's evaluation data into the object dataframe on
    ['simTime', 'simFrame', 'playerId'], then drop rows with simFrame <= 0."""
    for df in eval_data_list:
        eval_data_df = pd.merge(eval_data_df, df, on=['simTime', 'simFrame', 'playerId'], how="left")
    eval_data_df = eval_data_df[eval_data_df['simFrame'] > 0]
    return eval_data_df


def getPlaybackData(df):
    """
    Build the playback dataframe for the report.

    Args:
        df: The processed object dataframe (data_processed.object_df).

    Returns:
        A dataframe with the ego columns (speed, curvHor, rollRel, pitchRel) and,
        if available, the relative speed/distance columns of the target object.
    """
    EGO_ID = 1  # playerId of the ego vehicle
    OBJ_ID = 2  # playerId of the object used for the relative columns
    PLAYBACK_ACCURACY = 3  # decimal places kept in the playback output

    # choose_df = df[["simTime", "simFrame", "v", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d", "curvHor", "rollRel", "pitchRel"]].copy()
    column_list = ["simTime", "simFrame", "v", "curvHor", "rollRel", "pitchRel"]
    ego_df = df[df['playerId'] == EGO_ID][column_list].copy()

    if "lat_v_rel" in df.columns:
        column_list = ["simTime", "simFrame", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d"]
        rel_df = df[df['playerId'] == OBJ_ID][column_list].copy()
        result = pd.merge(ego_df, rel_df, on=['simTime', 'simFrame'], how='left')
    else:
        result = ego_df.copy()
        result["lat_v_rel"] = ["-"] * len(result)
        result["lon_v_rel"] = ["-"] * len(result)
        result["lat_d"] = ["-"] * len(result)
        result["lon_d"] = ["-"] * len(result)

    rename_dict = {"v": "speed", "lat_v_rel": "latSpeedRel", "lon_v_rel": "lonSpeedRel", "lat_d": "latDistanceRel",
                   "lon_d": "lonDistanceRel"}
    result.rename(columns=rename_dict, inplace=True)

    result = result[result['simFrame'] > 0]
    result = result.round(PLAYBACK_ACCURACY).copy()

    # output columns: simTime, simFrame, speed, latSpeedRel, lonSpeedRel,
    # latDistanceRel, lonDistanceRel, curvHor, rollRel, pitchRel
    return result


def single_case_statistic(case_dict):
    """
    This function adds the formatted basic infos to the case_dict.

    Arguments:
        case_dict: A dict of single case scores and descriptions.

    Returns:
        single_case_dict: A full dict of the single case's performance, with
        testMileage and testDuration formatted for the report.
    """
    single_case_dict = case_dict.copy()
    single_case_dict['testMileage'] = mileage_format(single_case_dict['testMileage'])
    single_case_dict['testDuration'] = duration_format(single_case_dict['testDuration'])

    # single_case_dict = {
    #     'testMileage': mileage,
    #     'testDuration': duration,
    #     'algorithmResultDescription1': algorithmResultDescription1,
    #     'algorithmResultDescription2': algorithmResultDescription2,
    #     'algorithmComprehensiveScore': score_case,
    #     'algorithmLevel': grade_case,
    #     'safe': safe_report_dict,
    #     'function': func_report_dict,
    #     'compliance': comp_report_dict,
    #     'comfort': comf_report_dict,
    #     'efficient': effi_report_dict
    # }
    return single_case_dict


def single_report_post(single_case_dict, case_path):
    """
    This function generates the single case report based on single_case_dict.

    Arguments:
        single_case_dict: A dict of single case scores and descriptions.
        case_path: A str, path of the directory where the generated report is stored.

    Returns:
        None
    """
    print('准备发送请求:')  # "preparing to send the report request"
    report_name = case_path.split('\\')[-2]  # get the case name to use as the report name
    url_json = 'http://36.110.106.156:18080/report/generate'
    data_json = json.dumps(single_case_dict)
    response = requests.post(url_json, data_json, headers={'Content-Type': 'application/json; charset=utf-8'})
    print("返回报告结果:", response)  # report generation response
    runtime = time.strftime('%Y%m%d%H%M%S', time.localtime())
    # the response body is written directly to a PDF; no status check is performed
    p = pathlib.Path(rf'{case_path}\{report_name}_{runtime}.pdf')
    p.write_bytes(response.content)
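
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). The paths and the `config` object
# are assumptions here; in the real pipeline they are supplied by the caller of
# this module:
#
#   case_dict = single_case_evaluate(dataPath, config, customMetricPath,
#                                    customScorePath, case_name)
#   single_case_dict = single_case_statistic(case_dict)
#   single_report_post(single_case_dict, case_path)
# ---------------------------------------------------------------------------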