#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: yangzihao(yangzihao@china-icv.cn) @Data: 2023/11/28 @Last Modified: 2023/11/28 @Summary: Evaluate single case. """ import os import sys import pandas as pd import numpy as np import requests import json import pathlib import time import traceback import log from common import score_grade, mileage_format, duration_format, string_concatenate, replace_key_with_value, \ import_class from data_process import DataProcess # from customDimension import CustomDimension # from custom_run import custom_run # from safe import Safe # from function import Function # from compliance import Compliance from comfort import Comfort from efficient import Efficient def single_case_evaluate(config, path, resultPath, case_name): """ This function takes a str of single case' path and a dict of config infos. The function uses the scripts of 5 dimensions to analyze and evaluate the single case. Arguments: path: A str of a single case's csv files path. config: A dict of config infos, which contains the algorithm info, index info and so on. case_name: A str of case name. Returns: case_dict: A dict containing evaluation results. Basic infos, scores and descriptions of 5 dimensions. """ logger = log.get_logger() try: data_processed = DataProcess(path, config) except Exception as e: traceback.print_exc() logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Data processed ERROR: {repr(e)}!", exc_info=True) # custom metrics evaluate # customMetricPath = '../custom' # custom_metric_list = [metric for metric in config.metric_list if metric not in config.builtinMetricList] custom_data = dict() # if customMetricPath if empty # try: # if custom_metric_list: # custom_data = custom_run(customMetricPath, data_processed, custom_metric_list, case_name) # else: # custom_data = dict() # # except Exception as e: # traceback.print_exc() # logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Custom metric evaluate ERROR: {repr(e)}!", exc_info=True) # scoreFile = 'linear_score' # custom score # if config.scoreModel == "builtin": # scorePath = r"./" # scoreFile = "score_weight" # scoreModel = import_class(scorePath, scoreFile) # else: # # customScorePath = r"C:\Users\cicv\Desktop\ads_evaluate_V4.1.0\customScore" # scoreFile = config.scoreModel # scoreModel = import_class(customScorePath, scoreFile) scorePath = r"./" scoreFile = "score_weight" scoreModel = import_class(scorePath, scoreFile) builtin_dimension_list = config.builtinDimensionList dimension_name_dict = config.dimension_name # initialization case_dict = {} case_dict["name"] = case_name # score info dimension_list = config.dimension_list score_dimension_dict = {} bad_dimension_list = [] # statistic the index by good or bad score good_indicator_list = [] bad_indicator_list = [] # bad_indicator_count_list = [] dimension_dict = {} # eval_data_list = [] try: logger.info(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate start: ") for dimension in dimension_list: dimension_instance = globals()[dimension.capitalize()](data_processed, custom_data, scoreModel, resultPath) # if dimension in builtin_dimension_list: # dimension_instance = globals()[dimension.capitalize()](data_processed, custom_data, scoreModel) # else: # dimension_instance = CustomDimension(dimension, data_processed, custom_data, scoreModel) dimension_report_dict = dimension_instance.report_statistic() dimension_dict[dimension] = dimension_report_dict score_dimension_dict[dimension] = dimension_report_dict['score'] # dimension_eval_data = dimension_instance.get_eval_data() # if not dimension_eval_data.empty: # eval_data_list.append(dimension_eval_data) # eval_data_dict[dimension] = dimension_eval_data if score_dimension_dict[dimension] < 80: bad_dimension_list.append(dimension) # if dimension == "function": # followStopCount = dimension_report_dict['followStopCount'] # bad_indicator_list.append("跟停行为") if followStopCount != 0 else good_indicator_list.append("跟停行为") # elif dimension == "compliance": # illegalCount = dimension_report_dict['illegalCount'] # bad_indicator_list.append("违反交通规则行为") if illegalCount != 0 else good_indicator_list.append("违反交通规则行为") # elif dimension == "comfort": # discomfortCount = dimension_report_dict['discomfortCount'] # bad_indicator_list.append("不舒适行为") if discomfortCount != 0 else good_indicator_list.append("不舒适行为") except Exception as e: traceback.print_exc() logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate ERROR: {repr(e)}!", exc_info=True) sys.exit(-1) case_dict["details"] = dimension_dict # calculate the score and grade of the case weight_dimension_dict = config.dimension_weight score_dimension = {key: score_dimension_dict[key] * weight_dimension_dict[key] for key in score_dimension_dict} score_case = round(sum(score_dimension.values()), 2) grade_case = score_grade(score_case) case_dict['algorithmComprehensiveScore'] = score_case case_dict['algorithmLevel'] = grade_case case_dict['testMileage'] = data_processed.report_info['mileage'] case_dict['testDuration'] = data_processed.report_info['duration'] # generate algorithmResultDescription1 according to the actual case # if bad_indicator_list: # for type in bad_indicator_list: # if type == '不舒适行为': # bad_indicator_count_list.append(f'{discomfortCount}次不舒适行为') # if type == '跟停行为': # bad_indicator_count_list.append(f'{followStopCount}次跟停行为') # if type == '违反交通规则行为': # bad_indicator_count_list.append(f'违反交通规则{illegalCount}次') # str_bad_indicator_count = string_concatenate(bad_indicator_count_list) if bad_indicator_count_list else "" # str_good_indicator = string_concatenate(good_indicator_list) if good_indicator_list else "" # algorithmResultDescription1 = "车辆在本轮测试中," # if str_bad_indicator_count: # algorithmResultDescription1 += f"共出现{str_bad_indicator_count}," # if str_good_indicator: # algorithmResultDescription1 += f"未出现{str_good_indicator}。" # # if not str_good_indicator and not str_bad_indicator_count: # algorithmResultDescription1 += "【未进行舒适性、跟车及合规性评测】。" # generate algorithmResultDescription2 if not bad_dimension_list: algorithmResultDescription2 = '综上所述,算法在各个维度的表现俱佳。' else: str_bad_dimension = string_concatenate(bad_dimension_list) # replace English to Chinese dimension name algorithmResultDescription2 = f'综上所述,建议算法优化在{str_bad_dimension}指标上的表现。' # description = f"算法总体表现{grade_case},高效性表现{dimension_dict['efficient']['level']},平顺性表现{dimension_dict['comfort']['level']}。" description = f"算法总体表现{grade_case}。" # case_dict['algorithmResultDescription1'] = algorithmResultDescription1 # case_dict['algorithmResultDescription2'] = algorithmResultDescription2 # case_dict['algorithmResultDescription1'] = replace_key_with_value(algorithmResultDescription1, dimension_name_dict) # case_dict['algorithmResultDescription'] = replace_key_with_value(algorithmResultDescription2, dimension_name_dict) case_dict['algorithmResultDescription'] = description graph_dict = { "linearAccelerate": os.path.join(resultPath, "LinearAccelerate.png"), "angularAccelerate": os.path.join(resultPath, "AngularAccelerate.png"), "speed": os.path.join(resultPath, "Speed.png") } case_dict['graphPath'] = graph_dict # for df in eval_data_list: # eval_data_df = pd.merge(eval_data_df, df, on=['simTime', 'simFrame', 'playerId'], how="left") # eval_data_df = eval_data_df[eval_data_df['simFrame'] > 0] # # case_dict['evalData'] = eval_data_df # case_dict['playbackData'] = playback(eval_data_df) # report.json # case_dict = { # 'testMileage': mileage, # 'testDuration': duration, # 'algorithmResultDescription1': algorithmResultDescription1, # 'algorithmResultDescription2': algorithmResultDescription2, # 'algorithmComprehensiveScore': score_case, # 'algorithmLevel': grade_case, # # 'safe': safe_report_dict, # 'function': func_report_dict, # 'compliance': comp_report_dict, # 'comfort': comf_report_dict, # 'efficient': effi_report_dict, # # 'commonData': common_report_dict # # } return case_dict def playback(df): """ Args: df: Returns: """ target_id = 2 # choose_df = df[["simTime", "simFrame", "v", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d", "curvHor", "rollRel", "pitchRel"]].copy() ego_df = df[df['playerId'] == 1][["simTime", "simFrame", "v", "curvHor", "rollRel", "pitchRel"]].copy() if "lat_v_rel" in df.columns: rel_df = df[df['playerId'] == target_id][ ["simTime", "simFrame", "lat_v_rel", "lon_v_rel", "lat_d", "lon_d"]].copy() result = pd.merge(ego_df, rel_df, on=['simTime', 'simFrame'], how='left') else: result = ego_df.copy() result["lat_v_rel"] = ["-"] * len(result) result["lon_v_rel"] = ["-"] * len(result) result["lat_d"] = ["-"] * len(result) result["lon_d"] = ["-"] * len(result) result.rename( columns={"v": "speed", "lat_v_rel": "latSpeedRel", "lon_v_rel": "lonSpeedRel", "lat_d": "latDistanceRel", "lon_d": "lonDistanceRel"}, inplace=True) result = result[result['simFrame'] > 0] # simTime, simFrame, speed, latSpeedRel, lonSpeedRel, # latDistanceRel, lonDistanceRel, curvHor, rollRel, pitchRel return result def single_case_statistic(case_dict): """ This function add the basic infos to the case_dict. Arguments: case_dict: A dict of single case scores and descriptions. config: A dict of config infos, contains basic infos. Returns: single_case_dict: A full dict of single case's performance. """ single_case_dict = case_dict.copy() single_case_dict['testMileage'] = mileage_format(single_case_dict['testMileage']) single_case_dict['testDuration'] = duration_format(single_case_dict['testDuration']) # single_case_dict = { # 'testMileage': mileage, # 'testDuration': duration, # 'algorithmResultDescription1': algorithmResultDescription1, # 'algorithmResultDescription2': algorithmResultDescription2, # 'algorithmComprehensiveScore': score_case, # 'algorithmLevel': grade_case, # 'safe': safe_report_dict, # 'function': func_report_dict, # 'compliance': comp_report_dict, # 'comfort': comf_report_dict, # 'efficient': effi_report_dict # } return single_case_dict def single_report_post(single_case_dict, case_path): """ This function generate the single case report based on single_case_dict. Arguments: single_case_dict: A dict of single case scores and descriptions. case_path: A str of path of files, which says where the generated report is to be stored. Returns: None """ print('准备发送请求:') report_name = case_path.split('\\')[-2] # get case name for report name url_json = 'http://36.110.106.156:18080/report/generate' data_json = json.dumps(single_case_dict) response = requests.post(url_json, data_json, headers={'Content-Type': 'application/json; charset=utf-8'}) print("返回报告结果:", response) runtime = time.strftime('%Y%m%d%H%M%S', time.localtime()) p = pathlib.Path(rf'{case_path}\{report_name}_{runtime}.pdf') p.write_bytes(response.content)