- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: yangzihao(yangzihao@china-icv.cn)
- @Date: 2023/11/28
- @Last Modified: 2023/11/28
- @Summary: Evaluate single case.
- """
- import sys
- import pandas as pd
- import numpy as np
- import requests
- import json
- import pathlib
- import time
- import traceback
- from collections import defaultdict
- import log
- from common import score_grade, mileage_format, duration_format, string_concatenate, replace_key_with_value, \
- import_score_class
- from data_process import DataProcess
- from customDimension import CustomDimension
- from custom_run import custom_run
- from safe import Safe
- from function import Function
- from compliance import Compliance
- from comfort import Comfort
- from efficient import Efficient
- def single_case_evaluate(dataPath, config, customMetricPath, customScorePath, case_name):
- """
- This function takes the path of a single case and a dict of config info.
- It uses the scripts of the five dimensions to analyze and evaluate the case.
- Arguments:
- dataPath: A str of the single case's csv files path.
- config: A dict of config info, which contains the algorithm info, index info and so on.
- customMetricPath: A str of the custom metric scripts path.
- customScorePath: A str of the custom score model scripts path.
- case_name: A str of the case name.
- Returns:
- case_dict: A dict of evaluation results: basic info, scores and descriptions of the five dimensions.
- """
- logger = log.get_logger()
- eval_data_df = pd.DataFrame()
- try:
- data_processed = DataProcess(dataPath, config, case_name)
- eval_data_df = data_processed.object_df
- except Exception as e:
- traceback.print_exc()
- logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Data processed ERROR: {repr(e)}!", exc_info=True)
- sys.exit(-1)
- # custom metrics evaluate
- custom_metric_list = [metric for metric in config.metric_list if metric not in config.builtinMetricList]
- custom_data = process_custom_metrics(customMetricPath, data_processed, custom_metric_list, case_name, logger)
- # select the score model (builtin or custom)
- if config.scoreModel == "builtin":
- scorePath = r"./"
- scoreFile = "score_weight"
- scoreModel = import_score_class(scorePath, scoreFile)
- else:
- scoreFile = config.scoreModel
- scoreModel = import_score_class(customScorePath, scoreFile)
- builtin_dimension_list = config.builtinDimensionList
- dimension_name_dict = config.dimension_name
- # initialization
- case_dict = {}
- # score info
- dimension_list = config.dimension_list
- score_dimension_dict = {}
- bad_dimension_list = []
- # classify indicators as good or bad for the result description
- good_indicator_list = []
- bad_indicator_list = []
- bad_indicator_count_list = []
- dimension_dict = {}
- eval_data_list = []
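- # dimensions scoring below this benchmark are flagged in the improvement suggestion (algorithmResultDescription2)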
- BENCHMARK = 80
- logger.info(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate start: ")
- # evaluate each dimension in the first-level indicator list
- for dimension in dimension_list:
- try:
- # instantiate the dimension evaluator
- if dimension in builtin_dimension_list:
- dimension_instance = globals()[dimension.capitalize()](data_processed, custom_data, scoreModel)
- else:
- dimension_instance = CustomDimension(dimension, data_processed, custom_data, scoreModel)
- dimension_report_dict = dimension_instance.report_statistic()
- if not dimension_report_dict:
- logger.error(
- f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension {dimension} evaluate ERROR: No metric score!")
- continue
- dimension_dict[dimension] = dimension_report_dict
- score_dimension_dict[dimension] = dimension_report_dict['score']
- dimension_eval_data = dimension_instance.get_eval_data()
- if not dimension_eval_data.empty:
- eval_data_list.append(dimension_eval_data)
- # eval_data_dict[dimension] = dimension_eval_data
- if score_dimension_dict[dimension] < BENCHMARK:
- bad_dimension_list.append(dimension)
- if dimension == "function":
- followStopCount = dimension_report_dict['followStopCount']
- if followStopCount != 0:
- bad_indicator_list.append("跟停行为")
- else:
- good_indicator_list.append("跟停行为")
- elif dimension == "compliance":
- illegalCount = dimension_report_dict['illegalCount']
- if illegalCount != 0:
- bad_indicator_list.append("违反交通规则行为")
- else:
- good_indicator_list.append("违反交通规则行为")
- elif dimension == "comfort":
- discomfortCount = dimension_report_dict['discomfortCount']
- if discomfortCount != 0:
- bad_indicator_list.append("不舒适行为")
- else:
- good_indicator_list.append("不舒适行为")
- except Exception as e:
- traceback.print_exc()
- logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate ERROR: {repr(e)}!", exc_info=True)
- continue
- case_dict["details"] = dimension_dict
- # calculate the score and grade of the case
- weight_dimension_dict = config.dimension_weight
- score_dimension = {key: score_dimension_dict[key] * weight_dimension_dict[key] for key in score_dimension_dict}
- score_case = round(sum(score_dimension.values()), 2)
- grade_case = score_grade(score_case)
- case_dict['algorithmComprehensiveScore'] = score_case
- case_dict['algorithmLevel'] = grade_case
- case_dict['testMileage'] = data_processed.report_info['mileage']
- case_dict['testDuration'] = data_processed.report_info['duration']
- # generate algorithmResultDescription1 according to the actual case
- if bad_indicator_list:
- for indicator in bad_indicator_list:
- if indicator == '不舒适行为':
- bad_indicator_count_list.append(f'{discomfortCount}次不舒适行为')
- elif indicator == '跟停行为':
- bad_indicator_count_list.append(f'{followStopCount}次跟停行为')
- elif indicator == '违反交通规则行为':
- bad_indicator_count_list.append(f'违反交通规则{illegalCount}次')
- str_bad_indicator_count = string_concatenate(bad_indicator_count_list) if bad_indicator_count_list else ""
- str_good_indicator = string_concatenate(good_indicator_list) if good_indicator_list else ""
- # build the result description; extend here when more error information is added
- algorithmResultDescription1 = "车辆在本轮测试中,"
- if str_bad_indicator_count:
- algorithmResultDescription1 += f"共出现{str_bad_indicator_count},"
- if str_good_indicator:
- algorithmResultDescription1 += f"未出现{str_good_indicator}。"
- if not str_good_indicator and not str_bad_indicator_count:
- algorithmResultDescription1 += "【未进行舒适性、跟车及合规性评测】。"
- # generate algorithmResultDescription2
- if not bad_dimension_list:
- algorithmResultDescription2 = '综上所述,算法在各个维度的表现俱佳。'
- else:
- str_bad_dimension = string_concatenate(bad_dimension_list)
- # English dimension names are mapped to Chinese by replace_key_with_value below
- algorithmResultDescription2 = f'综上所述,建议算法优化在{str_bad_dimension}指标上的表现。'
- case_dict['algorithmResultDescription1'] = replace_key_with_value(algorithmResultDescription1, dimension_name_dict)
- case_dict['algorithmResultDescription2'] = replace_key_with_value(algorithmResultDescription2, dimension_name_dict)
- case_dict['evalData'] = getEvalData(eval_data_df, eval_data_list)
- case_dict['playbackData'] = getPlaybackData(eval_data_df)
- return case_dict
- def process_custom_metrics(customMetricPath, data_processed, custom_metric_list, case_name, logger):
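- """
- Run the custom metric scripts when custom metrics are configured.
- Falls back to an empty defaultdict(dict) if there are no custom metrics
- or if the custom evaluation raises an exception.
- """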
- if custom_metric_list:
- try:
- return custom_run(customMetricPath, data_processed, custom_metric_list, case_name)
- except Exception as e:
- logger.error(f"[case:{case_name}] Custom metric evaluate ERROR: {repr(e)}!", exc_info=True)
- return defaultdict(dict)
- return defaultdict(dict)
- def getEvalData(eval_data_df, eval_data_list):
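- """
- Merge each dimension's evaluation DataFrame into the base object DataFrame
- on ['simTime', 'simFrame', 'playerId'] and drop frames with simFrame <= 0.
- """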
- for df in eval_data_list:
- eval_data_df = pd.merge(eval_data_df, df, on=['simTime', 'simFrame', 'playerId'], how="left")
- eval_data_df = eval_data_df[eval_data_df['simFrame'] > 0]
- return eval_data_df
- def getPlaybackData(df):
- """
- Args:
- df:
- Returns:
- """
- EGO_ID = 1
- OBJ_ID = 2
- PLAYBACK_ACCURACY = 3
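- # playerId 1 is the ego vehicle; playerId 2 is the object whose relative kinematics are reported; values are rounded to PLAYBACK_ACCURACY decimals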
- column_list = ["simTime", "simFrame", "v", "curvHor", "rollRel", "pitchRel"]
- ego_df = df[df['playerId'] == EGO_ID][column_list].copy()
- if "lat_v_rel" in df.columns:
- column_list = ["simTime", "simFrame", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d"]
- rel_df = df[df['playerId'] == OBJ_ID][column_list].copy()
- result = pd.merge(ego_df, rel_df, on=['simTime', 'simFrame'], how='left')
- else:
- result = ego_df.copy()
- result["lat_v_rel"] = ["-"] * len(result)
- result["lon_v_rel"] = ["-"] * len(result)
- result["lat_d"] = ["-"] * len(result)
- result["lon_d"] = ["-"] * len(result)
- rename_dict = {"v": "speed", "lat_v_rel": "latSpeedRel", "lon_v_rel": "lonSpeedRel", "lat_d": "latDistanceRel",
- "lon_d": "lonDistanceRel"}
- result.rename(columns=rename_dict, inplace=True)
- result = result[result['simFrame'] > 0]
- result = result.round(PLAYBACK_ACCURACY).copy()
- # simTime, simFrame, speed, latSpeedRel, lonSpeedRel,
- # latDistanceRel, lonDistanceRel, curvHor, rollRel, pitchRel
- return result
- def single_case_statistic(case_dict):
- """
- This function formats the basic info (test mileage and duration) in the case_dict.
- Arguments:
- case_dict: A dict of single case scores and descriptions.
- Returns:
- single_case_dict: A full dict of the single case's performance.
- """
- single_case_dict = case_dict.copy()
- single_case_dict['testMileage'] = mileage_format(single_case_dict['testMileage'])
- single_case_dict['testDuration'] = duration_format(single_case_dict['testDuration'])
- # single_case_dict keys: details, algorithmComprehensiveScore, algorithmLevel,
- # testMileage, testDuration, algorithmResultDescription1/2, evalData, playbackData
- return single_case_dict
- def single_report_post(single_case_dict, case_path):
- """
- This function generates the single case report based on single_case_dict.
- Arguments:
- single_case_dict: A dict of single case scores and descriptions.
- case_path: A str of the directory in which the generated report is stored.
- Returns:
- None
- """
- print('Sending report generation request:')
- report_name = case_path.split('\\')[-2]  # get case name for report name
- url_json = 'http://36.110.106.156:18080/report/generate'
- data_json = json.dumps(single_case_dict)
- response = requests.post(url_json, data_json, headers={'Content-Type': 'application/json; charset=utf-8'})
- response.raise_for_status()  # fail early instead of writing an error page into the PDF
- print("Report generation response:", response)
- runtime = time.strftime('%Y%m%d%H%M%S', time.localtime())
- p = pathlib.Path(case_path) / f'{report_name}_{runtime}.pdf'
- p.write_bytes(response.content)
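- # Minimal usage sketch (illustrative only; assumes a `config` object exposing the
- # attributes referenced above, e.g. metric_list, dimension_list, dimension_weight,
- # scoreModel, and that the data/metric/score paths below exist):
- #
- # case_dict = single_case_evaluate(r"./data/case_01", config, r"./customMetric", r"./customScore", "case_01")
- # single_case_dict = single_case_statistic(case_dict)
- # single_report_post(single_case_dict, r"./data/case_01")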