- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: yangzihao(yangzihao@china-icv.cn)
- @Date: 2024/03/06
- @Last Modified: 2024/03/06
- @Summary: Custom metrics
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import math
- import numpy as np
- import pandas as pd
- from score_weight import cal_score_with_priority, cal_weight_from_80
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
- class CustomDimension(object):
- """
- Class for evaluating custom metrics for autonomous driving.
- Attributes:
- eval_data: Evaluation results, stored in DataFrame format.
- data_processed: Pre-processed vehicle driving data.
- """
- def __init__(self, dimension, data_processed, custom_data, scoreModel):
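- """
- Args:
- dimension: Name of the dimension being evaluated (key into the config).
- data_processed: Pre-processed driving data; provides config and driver_ctrl_data.
- custom_data: Per-metric custom data (values, table data, report data).
- scoreModel: Scoring model used to score metric values against their parameters.
- """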
- self.eval_data = pd.DataFrame()
- self.data_processed = data_processed
- self.scoreModel = scoreModel
- # self.df = data_processed.object_df
- self.dimension = dimension
- self.custom_data = custom_data
- # config info for score calculation
- self.config = data_processed.config
- self.dimension_config = self.config.config[dimension]
- # common data
- # self.bulitin_metric_list = self.config.builtinMetricList
- # dimension data
- self.weight_custom = self.dimension_config['weightCustom']
- self.metric_list = self.dimension_config['metric']
- self.type_list = self.dimension_config['type']
- self.type_name_dict = self.dimension_config['typeName']
- self.name_dict = self.dimension_config['name']
- self.unit_dict = self.dimension_config['unit']
- # custom metric
- self.customMetricParam = self.dimension_config['customMetricParam']
- self.custom_metric_list = list(self.customMetricParam.keys())
- self.custom_param_dict = {}
- self.weight = self.dimension_config['weightDimension']
- self.weight_type_dict = self.dimension_config['typeWeight']
- self.weight_type_list = self.dimension_config['typeWeightList']
- self.weight_dict = self.dimension_config['weight']
- self.weight_list = self.dimension_config['weightList']
- self.priority_dict = self.dimension_config['priority']
- self.priority_list = self.dimension_config['priorityList']
- self.metric_dict = self.dimension_config['typeMetricDict']
- # self.unit_dict = self.dimension_config['unit']
- # self.kind_dict = self.dimension_config['kind']
- # self.optimal_dict = self.dimension_config['optimal']
- # self.multiple_dict = self.dimension_config['multiple']
- # self.kind_list = self.dimension_config['kindList']
- # self.optimal_list = self.dimension_config['optimalList']
- # self.multiple_list = self.dimension_config['multipleList']
- self.time_list = self.data_processed.driver_ctrl_data['time_list']
- def _custom_metric_param_parser(self, param_list):
- """
- param_dict = {
- "paramA" [
- {
- "kind": "-1",
- "optimal": "1",
- "multiple": ["0.5","5"],
- "spare1": null,
- "spare2": null
- }
- ]
- }
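- The example above parses to (illustrative):
- {"kind": [-1], "optimal": [1.0], "multiple": [[0.5, 5.0]], "spare": [[None]]}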
- """
- kind_list = []
- optimal_list = []
- multiple_list = []
- spare_list = []
- # spare1_list = []
- # spare2_list = []
- for param in param_list:
- kind_list.append(int(param['kind']))
- optimal_list.append(float(param['optimal']))
- multiple_list.append([float(x) for x in param['multiple']])
- spare_list.append([item["param"] for item in param["spare"]])
- # spare1_list.append(param['spare1'])
- # spare2_list.append(param['spare2'])
- result = {
- "kind": kind_list,
- "optimal": optimal_list,
- "multiple": multiple_list,
- "spare": spare_list,
- # "spare1": spare1_list,
- # "spare2": spare2_list
- }
- return result
- def _custom_metric_score(self, metric, value, param_list):
- """
- """
- param = self._custom_metric_param_parser(param_list)
- self.custom_param_dict[metric] = param
- score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
- score_sub = score_model.cal_score()
- score = sum(score_sub) / len(score_sub)
- return score
- def _cal_score(self):
- """
- """
- score_metric_dict = {}
- score_type_dict = {}
- # score_dimension = 0
- # custom_metric_list = list(self.customMetricParam.keys())
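- # Score each custom metric first; the scores are then aggregated into
- # per-type scores and the dimension score below.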
- for metric in self.custom_metric_list:
- value = self.custom_data[metric]['value']
- param_list = self.customMetricParam[metric]
- score = self._custom_metric_score(metric, value, param_list)
- score_metric_dict[metric] = round(score, 2)
- # score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
- score_metric = list(score_metric_dict.values())
- if self.weight_custom: # user-defined weights
- score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
- self.weight_dict}
- for type in self.type_list:
- type_score = sum(
- value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
- score_type_dict}
- score_dimension = sum(score_type_with_weight_dict.values())
- else: # objective weighting: derive weights from the scores
- self.weight_list = cal_weight_from_80(score_metric)
- self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
- score_dimension = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
- for type in self.type_list:
- type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
- for key, value in self.weight_dict.items():
- if key in self.metric_dict[type]:
- # self.weight_dict[key] = round(value / type_weight, 4)
- self.weight_dict[key] = value / type_weight
- type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
- type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
- type_priority_list = [value for key, value in self.priority_dict.items() if
- key in self.metric_dict[type]]
- type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- for key in self.weight_dict:
- self.weight_dict[key] = round(self.weight_dict[key], 4)
- score_type = list(score_type_dict.values())
- self.weight_type_list = cal_weight_from_80(score_type)
- self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
- print(f"\n[{self.dimension}表现及得分情况]")
- print(f"{self.dimension}得分为:{score_dimension:.2f}分。")
- print(f"{self.dimension}各类型得分为:{score_type_dict}")
- print(f"{self.dimension}各指标得分为:{score_metric_dict}。")
- return score_dimension, score_type_dict, score_metric_dict
- # def zip_time_pairs(self, zip_list, upper_limit=9999):
- # zip_time_pairs = zip(self.time_list, zip_list)
- # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
- # return zip_vs_time
- def zip_time_pairs(self, zip_list):
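- """
- Pair each timestamp in self.time_list with the corresponding value in
- zip_list, replacing NaN values with an empty string,
- e.g. [[0.0, 1.2], [0.1, ""]].
- """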
- zip_time_pairs = zip(self.time_list, zip_list)
- zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
- return zip_vs_time
- def _get_weight_distribution(self):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = self.config.dimension_name[self.dimension]
- for type in self.type_list:
- type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
- self.weight_dict.items() if
- key in self.metric_dict[type]}
- weight_distribution_type = {
- "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
- "indexes": type_weight_indexes_dict
- }
- weight_distribution[type] = weight_distribution_type
- return weight_distribution
- def report_statistic(self):
- """
- """
- report_dict = {
- "name": self.dimension,
- "weight": f"{self.weight * 100:.2f}%",
- }
- score_dimension, score_type_dict, score_metric_dict = self._cal_score()
- score_dimension = int(score_dimension) if int(score_dimension) == score_dimension else round(score_dimension, 2)
- grade_dimension = score_grade(score_dimension)
- report_dict["score"] = score_dimension
- report_dict["level"] = grade_dimension
- report_dict["weightDistribution"] = self._get_weight_distribution()
- # for description
- bad_metric_list = []
- dimension_over_optimal = []
- # for description
- good_custom_type_list = []
- bad_custom_type_list = []
- type_details_dict = {}
- for type in self.type_list:
- if score_type_dict[type] < 80:
- bad_custom_type_list.append(type)
- else:
- good_custom_type_list.append(type)
- type_dict = {
- "name": f"{self.type_name_dict[type]}",
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # get score and grade
- score_custom_type = score_type_dict[type]
- grade_custom_type = score_grade(score_custom_type)
- type_dict["score"] = score_custom_type
- type_dict["level"] = grade_custom_type
- # custom type description
- good_custom_metric_list = []
- bad_custom_metric_list = []
- type_dict_indexes = {}
- for metric in self.metric_dict[type]:
- if score_metric_dict[metric] < 80:
- bad_custom_metric_list.append(metric)
- else:
- good_custom_metric_list.append(metric)
- type_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
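- # kind semantics: -1 = smaller is better (stay below optimal),
- # 1 = larger is better (stay above optimal),
- # 0 = stay within [optimal * multiple[0], optimal * multiple[1]]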
- if self.custom_param_dict[metric]['kind'][0] == -1:
- type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- type_dict_indexes[metric]["range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- optimal = self.custom_param_dict[metric]['optimal'][0]
- multiple = self.custom_param_dict[metric]['multiple'][0]
- type_dict_indexes[metric]["range"] = f"[{optimal * multiple[0]}, {optimal * multiple[1]}]"
- custom_graph_dict[metric] = self.custom_data[metric]['reportData']
- type_dict["indexes"] = type_dict_indexes
- type_dict["builtin"] = builtin_graph_dict
- type_dict["custom"] = custom_graph_dict
- str_type_over_optimal = ""
- if not bad_custom_metric_list:
- str_good_custom_metric = string_concatenate(good_custom_metric_list)
- type_description = f"{str_good_custom_metric}指标均表现良好"
- else:
- for metric in bad_custom_metric_list:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_type_over_optimal = str_type_over_optimal[:-1]
- str_good_custom_metric = string_concatenate(good_custom_metric_list)
- str_bad_custom_metric = string_concatenate(bad_custom_metric_list)
- if not good_custom_metric_list:
- type_description = f"{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
- else:
- type_description = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
- type_dict["description"] = replace_key_with_value(type_description, self.name_dict)
- type_details_dict[type] = type_dict
- bad_metric_list.extend(bad_custom_metric_list)
- dimension_over_optimal.append(str_type_over_optimal)
- report_dict["details"] = type_details_dict
- dimension_over_optimal = [s for s in dimension_over_optimal if s]
- str_dimension_over_optimal = ";".join(dimension_over_optimal)
- str_dimension_over_optimal = replace_key_with_value(str_dimension_over_optimal, self.name_dict)
- if grade_dimension == '优秀':
- str_good_type = string_concatenate(good_custom_type_list)
- dimension_description1 = f'算法在{str_good_type}类型上表现优秀;'
- elif grade_dimension == '良好':
- str_good_type = string_concatenate(good_custom_type_list)
- dimension_description1 = f'算法在{str_good_type}类型上总体表现良好,满足设计指标要求;'
- elif grade_dimension == '一般':
- str_bad_type = string_concatenate(bad_custom_type_list)
- str_bad_metric = string_concatenate(bad_metric_list)
- str_bad_metric = replace_key_with_value(str_bad_metric, self.name_dict)
- dimension_description1 = f'算法在{str_bad_type}类型上表现一般、需要在{str_bad_metric}指标上进一步优化。其中,{str_dimension_over_optimal};'
- elif grade_dimension == '较差':
- str_bad_type = string_concatenate(bad_custom_type_list)
- dimension_description1 = f'算法在{str_bad_type}类型上表现较差,需要提高算法的类型性表现。其中,{str_dimension_over_optimal};'
- if not bad_custom_type_list:
- dimension_description2 = f'{self.dimension}在各个类型上的表现俱佳'
- else:
- str_bad_type = string_concatenate(bad_custom_type_list)
- dimension_description2 = f"算法在{str_bad_type}类型上需要重点优化"
- report_dict["description1"] = replace_key_with_value(dimension_description1, self.type_name_dict)
- report_dict["description2"] = replace_key_with_value(dimension_description2, self.type_name_dict)
- brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
- throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
- steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
- # calculate common parameters (pedal and steering curves over time)
- brake_vs_time = self.zip_time_pairs(brakePedal_list)
- throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
- steering_vs_time = self.zip_time_pairs(steeringWheel_list)
- report_dict['commonData'] = {
- "per": {
- "name": "脚刹/油门踏板开度(百分比)",
- "legend": ["刹车踏板开度", "油门踏板开度"],
- "data": [brake_vs_time, throttle_vs_time]
- },
- "ang": {
- "name": "方向盘转角(角度°)",
- "data": steering_vs_time
- },
- # "spe": {
- # "name": "速度(km/h)",
- # "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
- # "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- #
- # },
- # "acc": {
- # "name": "加速度(m/s²)",
- # "legend": ["横向加速度", "纵向加速度"],
- # "data": [lat_acc_vs_time, lon_acc_vs_time]
- #
- # },
- # "dis": {
- # "name": "前车距离(m)",
- # "data": distance_vs_time
- # }
- }
- return report_dict
- def get_eval_data(self):
- df = self.eval_data
- return df