#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors: yangzihao(yangzihao@china-icv.cn)
@Date: 2023/11/28
@Last Modified: 2023/11/28
@Summary: Evaluate single case.
"""
import json
import os
import sys
import traceback

import log
from accurate import Accurate
from comfort import Comfort
from common import score_grade, mileage_format, duration_format, string_concatenate, replace_key_with_value, \
    import_class
from config_parser import ConfigParse
from data_process import DataProcess
from efficient import Efficient
from report_generate import report_generate
from safe import Safe
def single_case_eval(configPath, dataPath, resultPath, trackPath, case_name):
    """
    Evaluate a single case end-to-end and generate its PDF report.

    :param configPath: path of the evaluation config file
    :param dataPath: path of the directory holding the case's data files
    :param resultPath: path where report.json / report.pdf are written
    :param trackPath: path of the trajectory image used by the report
    :param case_name: case name, used in log messages
    :return: None; exits the process with status -1 on any failure
    """
    logger = log.get_logger()

    # Abort early when there is no data to evaluate.
    if not os.listdir(dataPath):
        print("No files in data_path!")
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: No files in data_path!")
        sys.exit(-1)

    # Parse the config file to obtain per-dimension weights and scoring coefficients.
    try:
        config = ConfigParse(configPath)
    except Exception as e:
        print('Config file parsing ERROR!', e)
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Config file parsing ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # Evaluate the case and render the report.
    try:
        reportDict = single_case_evaluate(config, dataPath, resultPath, case_name)
        # Intermediate artifact report.json, consumed by the PDF generator.
        with open(os.path.join(resultPath, 'report.json'), 'w', encoding='utf-8') as f:
            json.dump(reportDict, f, ensure_ascii=False)
        # Render report.pdf from the report dict.
        reportPdf = os.path.join(resultPath, 'report.pdf')
        report_generate(reportDict, reportPdf, trackPath)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Evaluate single case ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)
def single_case_evaluate(config, dataPath, resultPath, case_name):
    """
    Evaluate one case across all configured dimensions and build the report dict.

    Arguments:
        config: ConfigParse instance holding the dimension list, weights and
            display names.
        dataPath: A str of a single case's csv files path.
        resultPath: A str of the path where per-dimension graphs are written.
        case_name: A str of case name.

    Returns:
        case_report_dict: A dict containing evaluation results — basic info,
        per-dimension details, overall score/grade, mileage/duration and the
        graph paths used by the report.
    """
    logger = log.get_logger()

    # Pre-process raw data (ego_df; obj_df; trajectory_df).
    try:
        data_processed = DataProcess(dataPath, config)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Data processed ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    # Load the scoring model (built-in default).
    scorePath = r"./"
    scoreFile = "score_weight"
    scoreModel = import_class(scorePath, scoreFile)

    case_report_dict = {
        "name": case_name
    }

    dimension_list = config.dimension_list
    score_dimension_dict = {}
    bad_dimension_list = []  # dimensions scoring below 80
    dimension_dict = {}

    # Evaluate each configured dimension.
    try:
        logger.info(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate start: ")
        for dimension in dimension_list:
            # Resolve the evaluator class (Safe/Comfort/Accurate/Efficient)
            # from the dimension name via this module's globals.
            dimension_instance = globals()[dimension.capitalize()](data_processed, scoreModel, resultPath)
            # Run the dimension's evaluation and collect its report content.
            dimension_report_dict = dimension_instance.report_statistic()
            dimension_dict[dimension] = dimension_report_dict
            score_dimension_dict[dimension] = dimension_report_dict['score']
            # Flag poorly performing dimensions for the summary text.
            if score_dimension_dict[dimension] < 80:
                bad_dimension_list.append(dimension)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Dimension evaluate ERROR: {repr(e)}!", exc_info=True)
        sys.exit(-1)

    case_report_dict["details"] = dimension_dict

    # Weighted overall score and grade of the case.
    weight_dimension_dict = config.dimension_weight
    score_dimension = {key: score_dimension_dict[key] * weight_dimension_dict[key] for key in score_dimension_dict}
    score_case = round(sum(score_dimension.values()), 2)
    grade_case = score_grade(score_case)
    case_report_dict['algorithmComprehensiveScore'] = score_case
    case_report_dict['algorithmLevel'] = grade_case

    # Case mileage and duration, formatted for display.
    case_report_dict['testMileage'] = mileage_format(data_processed.report_info['mileage'])
    case_report_dict['testDuration'] = duration_format(data_processed.report_info['duration'])

    # Human-readable summary; English dimension names are mapped to their
    # display names via config.dimension_name.
    if not bad_dimension_list:
        algorithmResultDescription2 = '算法在各个维度的表现俱佳。'
    else:
        str_bad_dimension = string_concatenate(bad_dimension_list)
        algorithmResultDescription2 = f'建议算法优化在{str_bad_dimension}方面的表现。'
        algorithmResultDescription2 = replace_key_with_value(algorithmResultDescription2, config.dimension_name)
    algorithmResultDescription = f"算法得分{score_case}分,总体表现{grade_case}。{algorithmResultDescription2}"
    case_report_dict['algorithmResultDescription'] = algorithmResultDescription

    # Paths of the graphs produced during evaluation, used by the PDF report.
    graph_dict = {
        "速度": os.path.join(resultPath, "Speed.png"),
        "控制指令速度": os.path.join(resultPath, "CommandSpeed.png"),
        "线加速度": os.path.join(resultPath, "LinearAccelerate.png"),
        "角加速度": os.path.join(resultPath, "AngularAccelerate.png"),
        "行驶轨迹": os.path.join(resultPath, "track.png")
    }
    case_report_dict['graphPath'] = graph_dict

    return case_report_dict