#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           yangzihao(yangzihao@china-icv.cn)
@Date:              2024/03/06
@Last Modified:     2024/03/06
@Summary:           Custom metrics
"""

import sys

# The project-local modules imported below are resolved through these
# relative search paths, so they must be registered before those imports.
sys.path.append('../common')
sys.path.append('../modules')
sys.path.append('../results')

import math
import numpy as np
import pandas as pd

from score_weight import cal_score_with_priority, cal_weight_from_80
from common import score_grade, string_concatenate, replace_key_with_value, score_over_100


class CustomDimension(object):
    """Score and report the user-defined (custom) metrics of one dimension.

    The class reads the dimension's configuration from
    ``data_processed.config.config[dimension]``, scores every metric listed in
    ``customMetricParam`` with the injected ``scoreModel`` and aggregates the
    metric scores into per-type scores and one dimension score.

    Attributes:
        dimension: Key of the dimension inside the configuration.
        data_processed: Object exposing ``config`` and ``driver_ctrl_data``.
        custom_data: Mapping ``metric -> {"value", "tableData", "reportData"}``.
        scoreModel: Class instantiated as
            ``scoreModel(kind, optimal, multiple, values)``; the instance must
            provide ``cal_score()``.
        custom_param_dict: Parsed parameters per metric, filled while scoring.
        eval_data: Empty ``DataFrame`` returned by ``get_eval_data``.
    """

    # Scores strictly below this value count as under-performing.
    _GOOD_SCORE_THRESHOLD = 80
    # Type scores are capped at this value.
    _MAX_SCORE = 100

    def __init__(self, dimension, data_processed, custom_data, scoreModel):
        """Cache the configuration entries the scoring and the report need.

        Args:
            dimension: Key of the dimension inside ``config.config``.
            data_processed: Object exposing ``config`` and ``driver_ctrl_data``.
            custom_data: Per-metric values and report payloads.
            scoreModel: Scoring class, see the class docstring.

        Raises:
            KeyError: If a required configuration entry is missing.
        """
        self.eval_data = pd.DataFrame()
        self.data_processed = data_processed
        self.scoreModel = scoreModel

        self.dimension = dimension
        self.custom_data = custom_data

        # config infos for calculating score
        self.config = data_processed.config
        self.dimension_config = self.config.config[dimension]

        # dimension data
        self.weight_custom = self.dimension_config['weightCustom']
        self.metric_list = self.dimension_config['metric']
        self.type_list = self.dimension_config['type']
        self.type_name_dict = self.dimension_config['typeName']
        self.name_dict = self.dimension_config['name']
        self.unit_dict = self.dimension_config['unit']

        # custom metric
        self.customMetricParam = self.dimension_config['customMetricParam']
        self.custom_metric_list = list(self.customMetricParam.keys())
        self.custom_param_dict = {}

        self.weight = self.dimension_config['weightDimension']

        self.weight_type_dict = self.dimension_config['typeWeight']
        self.weight_type_list = self.dimension_config['typeWeightList']

        self.weight_dict = self.dimension_config['weight']
        self.weight_list = self.dimension_config['weightList']

        self.priority_dict = self.dimension_config['priority']
        self.priority_list = self.dimension_config['priorityList']

        self.metric_dict = self.dimension_config['typeMetricDict']

        self.time_list = self.data_processed.driver_ctrl_data['time_list']

    def _custom_metric_param_parser(self, param_list):
        """Convert the raw parameter entries of one metric into typed lists.

        Each entry of ``param_list`` is expected to look like::

            {
                "kind": "-1",
                "optimal": "1",
                "multiple": ["0.5", "5"],
                "spare": [{"param": ...}, ...]
            }

        Args:
            param_list: List of raw parameter dicts for one metric.

        Returns:
            dict with the keys ``kind`` (list of int), ``optimal`` (list of
            float), ``multiple`` (list of lists of float) and ``spare`` (list
            of lists holding each spare item's ``"param"`` value). All lists
            are aligned with ``param_list``.
        """
        kind_list = []
        optimal_list = []
        multiple_list = []
        spare_list = []

        for param in param_list:
            kind_list.append(int(param['kind']))
            optimal_list.append(float(param['optimal']))
            multiple_list.append([float(x) for x in param['multiple']])
            spare_list.append([item["param"] for item in param["spare"]])

        return {
            "kind": kind_list,
            "optimal": optimal_list,
            "multiple": multiple_list,
            "spare": spare_list,
        }

    def _custom_metric_score(self, metric, value, param_list):
        """Score one metric and remember its parsed parameters.

        Args:
            metric: Metric key; the parsed parameters are stored under it in
                ``self.custom_param_dict``.
            value: Metric value, wrapped into a numpy array for the model.
            param_list: Raw parameter entries of the metric.

        Returns:
            Mean of the sub-scores returned by the model's ``cal_score()``.
        """
        param = self._custom_metric_param_parser(param_list)
        self.custom_param_dict[metric] = param

        score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
        score_sub = score_model.cal_score()
        return sum(score_sub) / len(score_sub)

    @classmethod
    def _cap_type_score(cls, type_score):
        """Round a type score to 2 decimals, capping it at the maximum score."""
        return round(type_score, 2) if type_score < cls._MAX_SCORE else cls._MAX_SCORE

    def _score_with_custom_weights(self, score_metric_dict):
        """Aggregate metric scores using the weights given in the configuration.

        Returns:
            Tuple ``(score_dimension, score_type_dict)``.
        """
        weighted_metric_scores = {key: score_metric_dict[key] * self.weight_dict[key]
                                  for key in self.weight_dict}

        score_type_dict = {}
        for type_key in self.type_list:
            type_metrics = self.metric_dict[type_key]
            type_score = sum(value for key, value in weighted_metric_scores.items() if key in type_metrics)
            score_type_dict[type_key] = self._cap_type_score(type_score)

        score_dimension = sum(score_type_dict[key] * self.weight_type_dict[key] for key in score_type_dict)
        return score_dimension, score_type_dict

    def _score_with_objective_weights(self, score_metric_dict):
        """Aggregate metric scores using weights derived from the scores.

        Overwrites ``weight_list``, ``weight_dict``, ``weight_type_list`` and
        ``weight_type_dict`` with the derived weights.

        Returns:
            Tuple ``(score_dimension, score_type_dict)``.
        """
        score_metric = list(score_metric_dict.values())

        self.weight_list = cal_weight_from_80(score_metric)
        # NOTE(review): pairing by position assumes ``metric_list`` has the same
        # order as the keys of ``customMetricParam`` - confirm against the config.
        self.weight_dict = dict(zip(self.metric_list, self.weight_list))
        score_dimension = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)

        score_type_dict = {}
        for type_key in self.type_list:
            type_metrics = self.metric_dict[type_key]

            # Re-normalise the metric weights so that they sum to 1 inside the type.
            type_weight = sum(value for key, value in self.weight_dict.items() if key in type_metrics)
            for key in self.weight_dict:
                if key in type_metrics:
                    self.weight_dict[key] = self.weight_dict[key] / type_weight

            type_score_metric = [value for key, value in score_metric_dict.items() if key in type_metrics]
            type_weight_list = [value for key, value in self.weight_dict.items() if key in type_metrics]
            type_priority_list = [value for key, value in self.priority_dict.items() if key in type_metrics]

            type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
            score_type_dict[type_key] = self._cap_type_score(type_score)

        # Round only after every type has been normalised with full precision.
        for key in self.weight_dict:
            self.weight_dict[key] = round(self.weight_dict[key], 4)

        score_type = list(score_type_dict.values())
        self.weight_type_list = cal_weight_from_80(score_type)
        self.weight_type_dict = dict(zip(self.type_list, self.weight_type_list))

        return score_dimension, score_type_dict

    def _cal_score(self):
        """Score every custom metric, every type and the whole dimension.

        Uses the configured weights when ``weight_custom`` is truthy, otherwise
        weights derived from the scores. Prints a short summary to stdout.

        Returns:
            Tuple ``(score_dimension, score_type_dict, score_metric_dict)``.
        """
        score_metric_dict = {}
        for metric in self.custom_metric_list:
            value = self.custom_data[metric]['value']
            param_list = self.customMetricParam[metric]
            score = self._custom_metric_score(metric, value, param_list)
            score_metric_dict[metric] = round(score, 2)

        if self.weight_custom:  # user-defined weights
            score_dimension, score_type_dict = self._score_with_custom_weights(score_metric_dict)
        else:  # objective weighting
            score_dimension, score_type_dict = self._score_with_objective_weights(score_metric_dict)

        print(f"\n[{self.dimension}表现及得分情况]")
        print(f"{self.dimension}得分为:{score_dimension:.2f}分。")
        print(f"{self.dimension}各类型得分为:{score_type_dict}")
        print(f"{self.dimension}各指标得分为:{score_metric_dict}。")
        return score_dimension, score_type_dict, score_metric_dict

    def zip_time_pairs(self, zip_list):
        """Pair ``self.time_list`` with ``zip_list`` element by element.

        Args:
            zip_list: Sequence of numeric samples.

        Returns:
            List of ``[time, value]`` pairs in which NaN values are replaced by
            an empty string. The result is as long as the shorter input.
        """
        return [[x, "" if math.isnan(y) else y] for x, y in zip(self.time_list, zip_list)]

    def _get_weight_distribution(self):
        """Build the weight overview of the dimension for the report.

        Returns:
            dict with ``"name"`` plus, per type, a dict holding the formatted
            type weight (``"weight"``) and metric weights (``"indexes"``).
        """
        weight_distribution = {"name": self.config.dimension_name[self.dimension]}

        for type_key in self.type_list:
            type_metrics = self.metric_dict[type_key]
            type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)"
                                        for key, value in self.weight_dict.items()
                                        if key in type_metrics}

            weight_distribution[type_key] = {
                "weight": f"{self.type_name_dict[type_key]}({self.weight_type_dict[type_key] * 100:.2f}%)",
                "indexes": type_weight_indexes_dict
            }

        return weight_distribution

    def _metric_range(self, metric):
        """Return the reasonable-range text of a metric, or None for an unknown kind.

        The bounds of a ``kind == 0`` metric come from the metric's own parsed
        ``multiple`` pair in ``custom_param_dict``.
        """
        param = self.custom_param_dict[metric]
        kind = param['kind'][0]
        optimal = param['optimal'][0]

        if kind == -1:
            return f"[0, {optimal}]"
        if kind == 1:
            return f"[{optimal}, inf)"
        if kind == 0:
            multiple = param['multiple'][0]
            return f"[{optimal * multiple[0]}, {optimal * multiple[1]}]"
        return None

    @staticmethod
    def _over_optimal_percent(kind, optimal, value):
        """Return by how many percent ``value`` deviates from ``optimal``.

        Raises:
            ValueError: If ``kind`` is not one of -1, 0 or 1.
        """
        # NOTE(review): an ``optimal`` of 0 raises ZeroDivisionError here.
        if kind == -1:
            return ((value - optimal) / optimal) * 100
        if kind == 1:
            return ((optimal - value) / optimal) * 100
        if kind == 0:
            return (abs(optimal - value) / optimal) * 100
        raise ValueError(f"unsupported metric kind: {kind!r}")

    def _describe_over_optimal(self, bad_metric_keys):
        """Describe, for each under-performing metric, how far it is off."""
        parts = []
        for metric in bad_metric_keys:
            param = self.custom_param_dict[metric]
            value = self.custom_data[metric]["value"][0]
            metric_over_optimal = self._over_optimal_percent(param['kind'][0], param['optimal'][0], value)
            parts.append(
                f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%")
        return ";".join(parts)

    def _build_common_data(self):
        """Build the pedal and steering time series shown in every report."""
        driver_ctrl_data = self.data_processed.driver_ctrl_data
        brake_vs_time = self.zip_time_pairs(driver_ctrl_data['brakePedal_list'])
        throttle_vs_time = self.zip_time_pairs(driver_ctrl_data['throttlePedal_list'])
        steering_vs_time = self.zip_time_pairs(driver_ctrl_data['steeringWheel_list'])

        return {
            "per": {
                "name": "脚刹/油门踏板开度(百分比)",
                "legend": ["刹车踏板开度", "油门踏板开度"],
                "data": [brake_vs_time, throttle_vs_time]
            },
            "ang": {
                "name": "方向盘转角(角度°)",
                "data": steering_vs_time
            },
        }

    def report_statistic(self):
        """Score the dimension and assemble its report.

        Returns:
            dict with the keys ``name``, ``weight``, ``score``, ``level``,
            ``weightDistribution``, ``details`` (one entry per type),
            ``description1``, ``description2`` and ``commonData``.

        Raises:
            ValueError: If an under-performing metric has a kind other than
                -1, 0 or 1.
        """
        report_dict = {
            "name": self.dimension,
            "weight": f"{self.weight * 100:.2f}%",
        }

        score_dimension, score_type_dict, score_metric_dict = self._cal_score()
        # Report whole-number scores as int, everything else with 2 decimals.
        if int(score_dimension) == score_dimension:
            score_dimension = int(score_dimension)
        else:
            score_dimension = round(score_dimension, 2)
        grade_dimension = score_grade(score_dimension)

        report_dict["score"] = score_dimension
        report_dict["level"] = grade_dimension
        report_dict["weightDistribution"] = self._get_weight_distribution()

        # Collected over all types for the dimension-level description.
        bad_metric_list = []
        dimension_over_optimal = []
        good_custom_type_list = []
        bad_custom_type_list = []

        type_details_dict = {}

        for type_key in self.type_list:
            score_custom_type = score_type_dict[type_key]
            if score_custom_type < self._GOOD_SCORE_THRESHOLD:
                bad_custom_type_list.append(type_key)
            else:
                good_custom_type_list.append(type_key)

            type_dict = {
                "name": f"{self.type_name_dict[type_key]}",
                "score": score_custom_type,
                "level": score_grade(score_custom_type),
            }

            builtin_graph_dict = {}
            custom_graph_dict = {}
            good_custom_metric_list = []
            bad_custom_metric_list = []
            type_dict_indexes = {}

            for metric in self.metric_dict[type_key]:
                if score_metric_dict[metric] < self._GOOD_SCORE_THRESHOLD:
                    bad_custom_metric_list.append(metric)
                else:
                    good_custom_metric_list.append(metric)

                table_data = self.custom_data[metric]['tableData']
                type_dict_indexes[metric] = {
                    "name": f"{self.name_dict[metric]}",
                    "score": score_metric_dict[metric],
                    "avg": table_data['avg'],
                    "max": table_data['max'],
                    "min": table_data['min'],
                }

                metric_range = self._metric_range(metric)
                if metric_range is not None:
                    type_dict_indexes[metric]["range"] = metric_range

                custom_graph_dict[metric] = self.custom_data[metric]['reportData']

            type_dict["indexes"] = type_dict_indexes
            type_dict["builtin"] = builtin_graph_dict
            type_dict["custom"] = custom_graph_dict

            str_type_over_optimal = ""
            if not bad_custom_metric_list:
                str_good_custom_metric = string_concatenate(good_custom_metric_list)
                type_description = f"{str_good_custom_metric}指标均表现良好"
            else:
                str_type_over_optimal = self._describe_over_optimal(bad_custom_metric_list)
                str_good_custom_metric = string_concatenate(good_custom_metric_list)
                str_bad_custom_metric = string_concatenate(bad_custom_metric_list)

                if not good_custom_metric_list:
                    type_description = f"{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"
                else:
                    type_description = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳。{str_type_over_optimal}"

            type_dict["description"] = replace_key_with_value(type_description, self.name_dict)
            type_details_dict[type_key] = type_dict

            bad_metric_list.extend(bad_custom_metric_list)
            dimension_over_optimal.append(str_type_over_optimal)

        report_dict["details"] = type_details_dict

        str_dimension_over_optimal = ";".join(s for s in dimension_over_optimal if s)
        str_dimension_over_optimal = replace_key_with_value(str_dimension_over_optimal, self.name_dict)

        # Stays empty when the grade is none of the four handled literals.
        dimension_description1 = ""
        if grade_dimension == '优秀':
            str_good_type = string_concatenate(good_custom_type_list)
            dimension_description1 = f'算法在{str_good_type}类型上表现优秀;'
        elif grade_dimension == '良好':
            str_good_type = string_concatenate(good_custom_type_list)
            dimension_description1 = f'算法在{str_good_type}类型上总体表现良好,满足设计指标要求;'
        elif grade_dimension == '一般':
            str_bad_type = string_concatenate(bad_custom_type_list)
            # Name the weak metrics of every type, not only of the last type visited.
            str_bad_metric = string_concatenate(bad_metric_list)
            str_bad_metric = replace_key_with_value(str_bad_metric, self.name_dict)
            dimension_description1 = f'算法在{str_bad_type}类型上表现一般、需要在{str_bad_metric}指标上进一步优化。其中,{str_dimension_over_optimal};'
        elif grade_dimension == '较差':
            str_bad_type = string_concatenate(bad_custom_type_list)
            dimension_description1 = f'算法在{str_bad_type}类型上表现较差,需要提高算法的类型性表现。其中,{str_dimension_over_optimal};'

        if not bad_custom_type_list:
            dimension_description2 = f'{self.dimension}在各个类型上的表现俱佳'
        else:
            str_bad_type = string_concatenate(bad_custom_type_list)
            dimension_description2 = f"算法在{str_bad_type}类型上需要重点优化"

        report_dict["description1"] = replace_key_with_value(dimension_description1, self.type_name_dict)
        report_dict["description2"] = replace_key_with_value(dimension_description2, self.type_name_dict)

        report_dict['commonData'] = self._build_common_data()

        return report_dict

    def get_eval_data(self):
        """Return the evaluation ``DataFrame`` created in ``__init__``."""
        return self.eval_data