#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           yangzihao(yangzihao@china-icv.cn)
@Date:              2023/11/28
@Last Modified:     2023/11/28
@Summary:           Evaluate single case.
"""

import os
import sys
import json
import traceback

import log
from common import score_grade, mileage_format, duration_format, string_concatenate, replace_key_with_value, \
    import_class
from config_parser import ConfigParse
from data_process import DataProcess
from report_generate import report_generate
# NOTE: the dimension classes below are not referenced by name in this module;
# single_case_evaluate() looks them up dynamically through globals(), so these
# imports must stay even though linters may report them as unused.
from safe import Safe
from comfort import Comfort
from accurate import Accurate

# A dimension whose score is below this value is reported as needing optimization.
_BAD_SCORE_THRESHOLD = 80


def single_case_eval(configPath, dataPath, resultPath, trackPath, case_name):
    """
    Evaluate one case and write its report files.

    The function parses the configuration file, evaluates the case data,
    writes the resulting dictionary to ``report.json`` inside ``resultPath``
    and then calls ``report_generate`` to produce ``report.pdf`` in the same
    directory. Every failure is logged and terminates the process through
    ``sys.exit(-1)``.

    :param configPath: path of the configuration file
    :param dataPath: path of the directory holding the evaluation data
    :param resultPath: path of the directory receiving the evaluation results
    :param trackPath: path of the trajectory picture (forwarded to report_generate)
    :param case_name: name of the case, used as a prefix in log messages
    :return: None
    """
    logger = log.get_logger()

    # A missing or unreadable data directory is reported through the same
    # log-and-exit path as every other failure instead of escaping as a raw OSError.
    try:
        data_files = os.listdir(dataPath)
    except OSError as e:
        print("Cannot read data_path!", e)
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Cannot read data_path: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # An empty data directory cannot be evaluated.
    if not data_files:
        print("No files in data_path!")
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: No files in data_path!")
        sys.exit(-1)

    # Parse the configuration file (metric weights and scoring coefficients).
    try:
        config = ConfigParse(configPath)
    except Exception as e:
        print('Config file parsing ERROR!', e)
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Config file parsing ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # Evaluate the case and generate the report files.
    try:
        reportDict = single_case_evaluate(config, dataPath, resultPath, case_name)

        # Write the intermediate file report.json.
        reportJson = os.path.join(resultPath, 'report.json')
        with open(reportJson, 'w', encoding='utf-8') as f:
            f.write(json.dumps(reportDict, ensure_ascii=False))

        # Generate the PDF report from the same dictionary.
        reportPdf = os.path.join(resultPath, 'report.pdf')
        report_generate(reportDict, reportPdf, trackPath)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Evaluate single case ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)


def single_case_evaluate(config, dataPath, resultPath, case_name):
    """
    Evaluate a single case on every configured dimension and build its report dict.

    For each entry of ``config.dimension_list`` the class whose name is the
    capitalized dimension name is instantiated and its ``report_statistic()``
    result is collected. The per-dimension scores are combined with
    ``config.dimension_weight`` into the overall case score and grade.
    Failures during data processing or dimension evaluation are logged and
    terminate the process through ``sys.exit(-1)``.

    Arguments:
        config: Parsed configuration object. This function reads its
            ``dimension_list``, ``dimension_weight`` and ``dimension_name``
            attributes and forwards it to ``DataProcess``.
        dataPath: A str of a single case's csv files path.
        resultPath: A str of report path.
        case_name: A str of case name.

    Returns:
        case_report_dict: A dict containing the evaluation results, with the keys
            ``name``, ``details``, ``algorithmComprehensiveScore``,
            ``algorithmLevel``, ``testMileage``, ``testDuration``,
            ``algorithmResultDescription`` and ``graphPath``.
    """
    logger = log.get_logger()

    # Data processing.
    try:
        data_processed = DataProcess(dataPath, config)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Data processed ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # Select the scoring model (the built-in model by default).
    scorePath = r"./"
    scoreFile = "score_weight"
    scoreModel = import_class(scorePath, scoreFile)

    case_report_dict = {
        "name": case_name
    }

    dimension_list = config.dimension_list
    score_dimension_dict = {}
    bad_dimension_list = []
    dimension_dict = {}

    # Evaluate dimension by dimension.
    try:
        logger.info(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate start: ")
        for dimension in dimension_list:
            # Resolve the dimension class dynamically from the module namespace,
            # e.g. "safe" -> Safe. An unknown name raises KeyError, handled below.
            dimension_class = globals()[dimension.capitalize()]
            dimension_instance = dimension_class(data_processed, scoreModel, resultPath)

            # Run the dimension's analysis and collect its report content.
            dimension_report_dict = dimension_instance.report_statistic()
            dimension_dict[dimension] = dimension_report_dict

            dimension_score = dimension_report_dict['score']
            score_dimension_dict[dimension] = dimension_score

            # Remember the dimensions that performed poorly.
            if dimension_score < _BAD_SCORE_THRESHOLD:
                bad_dimension_list.append(dimension)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # Store the per-dimension results in the report dictionary.
    case_report_dict["details"] = dimension_dict

    # Weighted sum of the dimension scores gives the case score.
    weight_dimension_dict = config.dimension_weight
    score_dimension = {key: score_dimension_dict[key] * weight_dimension_dict[key] for key in score_dimension_dict}

    score_case = round(sum(score_dimension.values()), 2)
    grade_case = score_grade(score_case)
    case_report_dict['algorithmComprehensiveScore'] = score_case
    case_report_dict['algorithmLevel'] = grade_case

    # Mileage and duration of the case, passed through the formatting helpers.
    case_report_dict['testMileage'] = mileage_format(data_processed.report_info['mileage'])
    case_report_dict['testDuration'] = duration_format(data_processed.report_info['duration'])

    # Second half of the textual summary: either overall praise or the list of
    # dimensions that should be optimized.
    if not bad_dimension_list:
        algorithmResultDescription2 = '算法在各个维度的表现俱佳。'
    else:
        str_bad_dimension = string_concatenate(bad_dimension_list)
        algorithmResultDescription2 = f'建议算法优化在{str_bad_dimension}指标上的表现。'
        # Substitute the dimension keys using the config.dimension_name mapping
        # (English to Chinese dimension names, per the original author's note).
        algorithmResultDescription2 = replace_key_with_value(algorithmResultDescription2, config.dimension_name)

    algorithmResultDescription = f"算法得分{score_case}分,总体表现{grade_case}。{algorithmResultDescription2}"
    case_report_dict['algorithmResultDescription'] = algorithmResultDescription

    # Paths of the pictures under resultPath, recorded for the report generation.
    graph_dict = {
        "速度": os.path.join(resultPath, "Speed.png"),
        "控制指令速度": os.path.join(resultPath, "CommandSpeed.png"),
        "线加速度": os.path.join(resultPath, "LinearAccelerate.png"),
        "角加速度": os.path.join(resultPath, "AngularAccelerate.png"),
        "行驶轨迹": os.path.join(resultPath, "track.png")
    }
    case_report_dict['graphPath'] = graph_dict

    return case_report_dict