#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors: yangzihao(yangzihao@china-icv.cn)
@Date: 2023/11/28
@Last Modified: 2023/11/28
@Summary: Evaluate a single case.
"""
import os
import sys
import json
import time
import pathlib
import traceback

import pandas as pd
import requests

import log
from common import score_grade, mileage_format, duration_format, string_concatenate, \
    replace_key_with_value, import_class
from data_process import DataProcess
# Optional dimensions, currently disabled:
# from customDimension import CustomDimension
# from custom_run import custom_run
# from safe import Safe
# from function import Function
# from compliance import Compliance
from comfort import Comfort
from efficient import Efficient


def single_case_evaluate(config, path, resultPath, case_name):
    """
    Evaluate a single case given its data path and a dict of config infos.
    The function runs each configured dimension script to analyze and evaluate the case.

    Arguments:
        config: A dict of config infos, which contains the algorithm info, index info and so on.
        path: A str of the path to a single case's csv files.
        resultPath: A str of the directory where result graphs are stored.
        case_name: A str of the case name.

    Returns:
        case_dict: A dict containing the evaluation results: basic infos, scores and descriptions of each dimension.
    """
    logger = log.get_logger()

    try:
        data_processed = DataProcess(path, config)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Data process ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)  # data_processed is required below, so abort instead of continuing with an undefined name
    # custom metrics evaluate
    # customMetricPath = '../custom'
    # custom_metric_list = [metric for metric in config.metric_list if metric not in config.builtinMetricList]
    custom_data = dict()
    # run custom metrics only when custom_metric_list is not empty:
    # try:
    #     if custom_metric_list:
    #         custom_data = custom_run(customMetricPath, data_processed, custom_metric_list, case_name)
    #     else:
    #         custom_data = dict()
    # except Exception as e:
    #     traceback.print_exc()
    #     logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Custom metric evaluate ERROR: {repr(e)}!", exc_info=True)

    # custom score model selection, currently fixed to the builtin weight table:
    # scoreFile = 'linear_score'
    # if config.scoreModel == "builtin":
    #     scorePath = r"./"
    #     scoreFile = "score_weight"
    #     scoreModel = import_class(scorePath, scoreFile)
    # else:
    #     # customScorePath = r"C:\Users\cicv\Desktop\ads_evaluate_V4.1.0\customScore"
    #     scoreFile = config.scoreModel
    #     scoreModel = import_class(customScorePath, scoreFile)
    scorePath = r"./"
    scoreFile = "score_weight"
    scoreModel = import_class(scorePath, scoreFile)  # load the scoring model from score_weight.py
    builtin_dimension_list = config.builtinDimensionList
    dimension_name_dict = config.dimension_name

    # initialization
    case_dict = {}
    case_dict["name"] = case_name

    # score info
    dimension_list = config.dimension_list
    score_dimension_dict = {}
    bad_dimension_list = []

    # indicators grouped by good or bad score
    good_indicator_list = []
    bad_indicator_list = []
    # bad_indicator_count_list = []

    dimension_dict = {}
    # eval_data_list = []
    try:
        logger.info(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate start: ")
        for dimension in dimension_list:
            # resolve the dimension class (e.g. Comfort, Efficient) by its capitalized name
            dimension_instance = globals()[dimension.capitalize()](data_processed, custom_data, scoreModel, resultPath)
            # if dimension in builtin_dimension_list:
            #     dimension_instance = globals()[dimension.capitalize()](data_processed, custom_data, scoreModel)
            # else:
            #     dimension_instance = CustomDimension(dimension, data_processed, custom_data, scoreModel)
            dimension_report_dict = dimension_instance.report_statistic()
            dimension_dict[dimension] = dimension_report_dict
            score_dimension_dict[dimension] = dimension_report_dict['score']

            # dimension_eval_data = dimension_instance.get_eval_data()
            # if not dimension_eval_data.empty:
            #     eval_data_list.append(dimension_eval_data)
            #     eval_data_dict[dimension] = dimension_eval_data

            if score_dimension_dict[dimension] < 80:
                bad_dimension_list.append(dimension)

            # (buckets follow-stop, rule-violation and discomfort counts into the good/bad indicator lists)
            # if dimension == "function":
            #     followStopCount = dimension_report_dict['followStopCount']
            #     bad_indicator_list.append("跟停行为") if followStopCount != 0 else good_indicator_list.append("跟停行为")
            # elif dimension == "compliance":
            #     illegalCount = dimension_report_dict['illegalCount']
            #     bad_indicator_list.append("违反交通规则行为") if illegalCount != 0 else good_indicator_list.append("违反交通规则行为")
            # elif dimension == "comfort":
            #     discomfortCount = dimension_report_dict['discomfortCount']
            #     bad_indicator_list.append("不舒适行为") if discomfortCount != 0 else good_indicator_list.append("不舒适行为")
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)
- case_dict["details"] = dimension_dict
- # calculate the score and grade of the case
- weight_dimension_dict = config.dimension_weight
- score_dimension = {key: score_dimension_dict[key] * weight_dimension_dict[key] for key in score_dimension_dict}
- score_case = round(sum(score_dimension.values()), 2)
- grade_case = score_grade(score_case)
- case_dict['algorithmComprehensiveScore'] = score_case
- case_dict['algorithmLevel'] = grade_case
- case_dict['testMileage'] = data_processed.report_info['mileage']
- case_dict['testDuration'] = data_processed.report_info['duration']
    # generate algorithmResultDescription1 according to the actual case
    # (concatenates the per-indicator counts into a Chinese summary sentence)
    # if bad_indicator_list:
    #     for type in bad_indicator_list:
    #         if type == '不舒适行为':
    #             bad_indicator_count_list.append(f'{discomfortCount}次不舒适行为')
    #         if type == '跟停行为':
    #             bad_indicator_count_list.append(f'{followStopCount}次跟停行为')
    #         if type == '违反交通规则行为':
    #             bad_indicator_count_list.append(f'违反交通规则{illegalCount}次')
    # str_bad_indicator_count = string_concatenate(bad_indicator_count_list) if bad_indicator_count_list else ""
    # str_good_indicator = string_concatenate(good_indicator_list) if good_indicator_list else ""
    # algorithmResultDescription1 = "车辆在本轮测试中,"
    # if str_bad_indicator_count:
    #     algorithmResultDescription1 += f"共出现{str_bad_indicator_count},"
    # if str_good_indicator:
    #     algorithmResultDescription1 += f"未出现{str_good_indicator}。"
    # if not str_good_indicator and not str_bad_indicator_count:
    #     algorithmResultDescription1 += "【未进行舒适性、跟车及合规性评测】。"

    # generate algorithmResultDescription2
    if not bad_dimension_list:
        # "In summary, the algorithm performs well in every dimension."
        algorithmResultDescription2 = '综上所述,算法在各个维度的表现俱佳。'
    else:
        str_bad_dimension = string_concatenate(bad_dimension_list)
        # replace English dimension names with Chinese ones before display
        # "In summary, it is recommended to optimize the algorithm's performance on the {str_bad_dimension} metrics."
        algorithmResultDescription2 = f'综上所述,建议算法优化在{str_bad_dimension}指标上的表现。'

    # description = f"算法总体表现{grade_case},高效性表现{dimension_dict['efficient']['level']},平顺性表现{dimension_dict['comfort']['level']}。"
    description = f"算法总体表现{grade_case}。"  # "Overall algorithm performance: {grade_case}."

    # case_dict['algorithmResultDescription1'] = algorithmResultDescription1
    # case_dict['algorithmResultDescription2'] = algorithmResultDescription2
    # case_dict['algorithmResultDescription1'] = replace_key_with_value(algorithmResultDescription1, dimension_name_dict)
    # case_dict['algorithmResultDescription'] = replace_key_with_value(algorithmResultDescription2, dimension_name_dict)
    case_dict['algorithmResultDescription'] = description

    graph_dict = {
        "linearAccelerate": os.path.join(resultPath, "LinearAccelerate.png"),
        "angularAccelerate": os.path.join(resultPath, "AngularAccelerate.png"),
        "speed": os.path.join(resultPath, "Speed.png")
    }
    case_dict['graphPath'] = graph_dict

    # for df in eval_data_list:
    #     eval_data_df = pd.merge(eval_data_df, df, on=['simTime', 'simFrame', 'playerId'], how="left")
    # eval_data_df = eval_data_df[eval_data_df['simFrame'] > 0]
    # case_dict['evalData'] = eval_data_df
    # case_dict['playbackData'] = playback(eval_data_df)

    # report.json layout kept for reference:
    # case_dict = {
    #     'testMileage': mileage,
    #     'testDuration': duration,
    #     'algorithmResultDescription1': algorithmResultDescription1,
    #     'algorithmResultDescription2': algorithmResultDescription2,
    #     'algorithmComprehensiveScore': score_case,
    #     'algorithmLevel': grade_case,
    #     'safe': safe_report_dict,
    #     'function': func_report_dict,
    #     'compliance': comp_report_dict,
    #     'comfort': comf_report_dict,
    #     'efficient': effi_report_dict,
    #     'commonData': common_report_dict
    # }

    return case_dict
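

# Illustrative example of the weighted aggregation above (the dimension names,
# scores and weights are hypothetical, chosen only to show the arithmetic):
#
#     score_dimension_dict = {"comfort": 85.0, "efficient": 70.0}
#     weight_dimension_dict = {"comfort": 0.6, "efficient": 0.4}
#     score_case = round(sum(score_dimension_dict[k] * weight_dimension_dict[k]
#                            for k in score_dimension_dict), 2)
#     # 85.0 * 0.6 + 70.0 * 0.4 = 79.0; the grade then comes from score_grade()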


def playback(df):
    """
    Build the playback table for the report from the merged evaluation data.

    Args:
        df: A DataFrame with per-player rows containing at least simTime,
            simFrame, playerId, v, curvHor, rollRel and pitchRel.

    Returns:
        A DataFrame of ego speed plus, when available, the relative speed and
        distance columns of the target vehicle; otherwise those columns are
        filled with "-" placeholders.
    """
    target_id = 2
    # choose_df = df[["simTime", "simFrame", "v", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d", "curvHor", "rollRel", "pitchRel"]].copy()
    ego_df = df[df['playerId'] == 1][["simTime", "simFrame", "v", "curvHor", "rollRel", "pitchRel"]].copy()

    if "lat_v_rel" in df.columns:
        rel_df = df[df['playerId'] == target_id][
            ["simTime", "simFrame", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d"]].copy()
        result = pd.merge(ego_df, rel_df, on=['simTime', 'simFrame'], how='left')
    else:
        result = ego_df.copy()
        result["lat_v_rel"] = ["-"] * len(result)
        result["lon_v_rel"] = ["-"] * len(result)
        result["lat_d"] = ["-"] * len(result)
        result["lon_d"] = ["-"] * len(result)

    result.rename(
        columns={"v": "speed", "lat_v_rel": "latSpeedRel", "lon_v_rel": "lonSpeedRel", "lat_d": "latDistanceRel",
                 "lon_d": "lonDistanceRel"}, inplace=True)
    result = result[result['simFrame'] > 0]

    # columns: simTime, simFrame, speed, latSpeedRel, lonSpeedRel,
    #          latDistanceRel, lonDistanceRel, curvHor, rollRel, pitchRel
    return result
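

# Minimal usage sketch for playback() (illustrative only; the single-row frame
# below is hypothetical, real input comes from the merged evaluation data):
#
#     frame = pd.DataFrame({
#         "simTime": [0.1], "simFrame": [1], "playerId": [1],
#         "v": [5.2], "curvHor": [0.0], "rollRel": [0.0], "pitchRel": [0.0],
#     })
#     playback(frame)  # no lat_v_rel column, so relative columns become "-"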


def single_case_statistic(case_dict):
    """
    Format the basic infos of the case_dict for display.

    Arguments:
        case_dict: A dict of single case scores and descriptions.

    Returns:
        single_case_dict: A full dict of the single case's performance, with
        mileage and duration formatted for the report.
    """
    single_case_dict = case_dict.copy()
    single_case_dict['testMileage'] = mileage_format(single_case_dict['testMileage'])
    single_case_dict['testDuration'] = duration_format(single_case_dict['testDuration'])

    # report layout kept for reference:
    # single_case_dict = {
    #     'testMileage': mileage,
    #     'testDuration': duration,
    #     'algorithmResultDescription1': algorithmResultDescription1,
    #     'algorithmResultDescription2': algorithmResultDescription2,
    #     'algorithmComprehensiveScore': score_case,
    #     'algorithmLevel': grade_case,
    #     'safe': safe_report_dict,
    #     'function': func_report_dict,
    #     'compliance': comp_report_dict,
    #     'comfort': comf_report_dict,
    #     'efficient': effi_report_dict
    # }

    return single_case_dict
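

# Illustrative only: single_case_statistic() leaves scores untouched and merely
# formats mileage and duration (the exact output strings are defined by
# mileage_format and duration_format in common.py; the values are hypothetical):
#
#     case = {"testMileage": 1523.4, "testDuration": 312.0}
#     pretty = single_case_statistic(case)  # formatted copy; `case` is unchanged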


def single_report_post(single_case_dict, case_path):
    """
    Generate the single case report by posting single_case_dict to the report service.

    Arguments:
        single_case_dict: A dict of single case scores and descriptions.
        case_path: A str of the directory in which the generated report is stored.

    Returns:
        None
    """
    print('准备发送请求:')  # "Preparing to send the request:"
    report_name = case_path.split('\\')[-2]  # use the case directory name as the report name (expects a Windows-style path)
    url_json = 'http://36.110.106.156:18080/report/generate'
    data_json = json.dumps(single_case_dict)

    response = requests.post(url_json, data=data_json, headers={'Content-Type': 'application/json; charset=utf-8'})
    response.raise_for_status()  # abort on HTTP errors instead of writing an error body into the PDF
    print("返回报告结果:", response)  # "Report generation response:"

    runtime = time.strftime('%Y%m%d%H%M%S', time.localtime())
    p = pathlib.Path(rf'{case_path}\{report_name}_{runtime}.pdf')
    p.write_bytes(response.content)
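

# Minimal end-to-end sketch (illustrative only). `config` is built by the
# surrounding pipeline and is not constructed here; the paths and case name
# are hypothetical placeholders:
#
#     case_dict = single_case_evaluate(config, "./data/case01",
#                                      "./result/case01", "case01")
#     single_case_dict = single_case_statistic(case_dict)
#     single_report_post(single_case_dict, "./result/case01")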