#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn) @Data: 2023/08/03 @Last Modified: 2023/08/03 @Summary: Functionality metrics """ import math import numpy as np import pandas as pd from scipy.spatial import KDTree from score_weight import cal_score_with_priority, cal_weight_from_80 from common import score_grade, string_concatenate, replace_key_with_value, _cal_max_min_avg class Accurate(object): """ Class for achieving accurate metrics for autonomous driving. Attributes: df: Vehicle driving data, stored in dataframe format. """ def __init__(self, data_processed, scoreModel, resultPath): # self.eval_data = pd.DataFrame() # self.data_processed = data_processed self.scoreModel = scoreModel self.resultPath = resultPath self.df = data_processed.ego_df self.df_trajectory = data_processed.trajectory_df # 读取轨迹的数据 self.config = data_processed.config accurate_config = data_processed.accurate_config self.accurate_config = accurate_config # common data self.builtin_metric_list = self.config.builtinMetricList # dimension data self.weight_custom = accurate_config['weightCustom'] self.metric_list = accurate_config['metric'] self.type_list = accurate_config['type'] self.type_name_dict = accurate_config['typeName'] self.name_dict = accurate_config['name'] self.unit_dict = accurate_config['unit'] # custom metric data self.customMetricParam = accurate_config['customMetricParam'] self.custom_metric_list = list(self.customMetricParam.keys()) self.custom_data = {} self.custom_param_dict = {} # score data self.weight = accurate_config['weightDimension'] self.weight_type_dict = accurate_config['typeWeight'] self.weight_type_list = accurate_config['typeWeightList'] self.weight_dict = accurate_config['weight'] self.weight_list = accurate_config['weightList'] self.priority_dict = accurate_config['priority'] self.priority_list = accurate_config['priorityList'] self.kind_dict = accurate_config['kind'] self.optimal_dict = accurate_config['optimal'] self.multiple_dict = accurate_config['multiple'] self.kind_list = accurate_config['kindList'] self.optimal_list = accurate_config['optimalList'] self.multiple_list = accurate_config['multipleList'] # metric data self.metric_dict = accurate_config['typeMetricDict'] # self.drive_metric_list = self.metric_dict['accurateDrive'] # self.stop_metric_list = self.metric_dict['accurateStop'] # self.drive_metric_list = ["averageSpeed"] # self.stop_metric_list = ["stopDuration", "stopCount"] # metric value self.positionError_list = [] self.executeAccurateError_list = [] self.positionError_dict = {} self.positionError_sum = 0 # self.executeAccurateError_count = 0 self.positionError = 0 self.executeAccurateError = 0 def _accurate_metric_cal(self): self._positionError_cal() self._executeAccurateError_cal() def _executeAccurateError_cal(self): # 用于记录段数的变量 error_segment_count = 0 # 标记当前是否在目标数字的段中 in_segment = False targets = [21201000300, 21201000001, 21201000002, 21202000000, 21203000200, 21203000300] task_error_code_list = self.df.task_error_code.tolist() for target in targets: for number in task_error_code_list: # 如果当前数字是目标数字,并且我们之前不在段中 if number == target and not in_segment: # 开始一个新的段 error_segment_count += 1 in_segment = True # 如果当前数字不是目标数字,并且我们之前在段中 elif number != target and in_segment: # 结束当前的段 in_segment = False # 注意:如果列表以目标数字结束,并且没有额外的非目标数字来结束段, # 则上面的循环将不会将最后一个段计数。我们需要在这里检查它。 if task_error_code_list and (task_error_code_list[-1] == target) and in_segment: error_segment_count += 1 self.executeAccurateError = error_segment_count def _positionError_cal(self): self.df_trajectory['ego_pos'] = self.df_trajectory.apply(lambda row: (row['ego_posX'], row['ego_posY']), axis=1).tolist() self.df_trajectory['target_pos'] = self.df_trajectory.apply(lambda row: (row['TargetX'], row['TargetY']), axis=1).tolist() print("self.df_trajectory['target_pos'] is", type(self.df_trajectory['target_pos'].tolist())) ego_pos = np.array(self.df_trajectory['ego_pos'].tolist()) target_pos = KDTree(np.array(self.df_trajectory['target_pos'].tolist())) print("target_pos is", target_pos) min_distances = target_pos.query(ego_pos, k=1)[0].ravel() print("min_distances is", min_distances) # 初始化存储最小距离的列表 min_distance = min_distances # 遍历第一组坐标 # for P in self.df_trajectory['ego_pos']: # min_dist = float('inf') # 初始化为正无穷大 # # 遍历第二组坐标 # for Q in self.df_trajectory['target_pos']: # # 计算两点间的距离 # dist = math.sqrt((P[0] - Q[0]) ** 2 + (P[1] - Q[1]) ** 2) # # 更新最小距离 # if dist < min_dist: # min_dist = dist # # 存储最小距离 # min_distances.append(min_dist) # print("min_distances is", min_distances) self.positionError_list = min_distance self.positionError = np.std(np.array(min_distance)) def _accurate_statistic(self): """ """ self._accurate_metric_cal() self.positionError_dict = _cal_max_min_avg(self.positionError_list) if len(self.positionError_list)>0 else {} arr_accurate = [[self.positionError, self.executeAccurateError]] return arr_accurate def _score_cal(self): """ """ arr_accurate = self._accurate_statistic() print("\n[准确性表现及得分情况]") print("准确性各指标值:", [[round(num, 2) for num in row] for row in arr_accurate]) arr_accurate = np.array(arr_accurate) score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_accurate) score_sub = score_model.cal_score() score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub)) score_metric = [round(num, 2) for num in score_sub] metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList] score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)} score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list} score_metric = list(score_metric_dict.values()) score_type_dict = {} if self.weight_custom: # 自定义权重 score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in self.weight_dict} for type in self.type_list: type_score = sum( value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type]) score_type_dict[type] = round(type_score, 2) score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in score_type_dict} score_accurate = sum(score_type_with_weight_dict.values()) else: # 客观赋权 self.weight_list = cal_weight_from_80(score_metric) self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)} score_accurate = cal_score_with_priority(score_metric, self.weight_list, self.priority_list) for type in self.type_list: type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type]) self.weight_dict = {key: round(value / type_weight, 4) for key, value in self.weight_dict.items() if key in self.metric_dict[type]} type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]] type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]] type_priority_list = [value for key, value in self.priority_dict.items() if key in self.metric_dict[type]] type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list) score_type_dict[type] = round(type_score, 2) score_accurate = round(score_accurate, 2) print("准确性各指标基准值:", self.optimal_list) print(f"准确性得分为:{score_accurate:.2f}分。") print(f"准确性各类型得分为:{score_type_dict}。") print(f"准确性各指标得分为:{score_metric_dict}。") return score_accurate, score_type_dict, score_metric_dict def report_statistic(self): """ Returns: """ report_dict = { "name": "准确性", "weight": f"{self.weight * 100:.2f}%", } score_accurate, score_type_dict, score_metric_dict = self._score_cal() # score_accurate, score_metric = self.effi_score() score_accurate = int(score_accurate) if int(score_accurate) == score_accurate else round(score_accurate, 2) grade_accurate = score_grade(score_accurate) report_dict["score"] = score_accurate report_dict["level"] = grade_accurate description = f"· 在准确性方面,得分{score_accurate}分,表现{grade_accurate}," is_good = True if self.positionError_sum > 1: is_good = False description += f"行驶过程中,位置偏移总误差为{self.positionError_sum}米,需重点优化。" if self.executeAccurateError > 0: is_good = False description += f"出现{self.executeAccurateError}次任务执行状态错误,需重点优化。" if is_good: description += f"行驶准确且任务执行准确,算法表现优秀。" report_dict["description"] = description description1 = f"最大值:{self.positionError_dict['max']}m;" \ f"最小值:{self.positionError_dict['min']}m;" \ f"平均值:{self.positionError_dict['avg']}m" if self.positionError_dict else "位置偏移无误差" description2 = f"次数:{self.executeAccurateError}次" positionError_index = { "weight": self.weight_dict['positionError'], "score": score_metric_dict['positionError'], "description": description1 } executeAccurateError_index = { "weight": self.weight_dict['executeAccurateError'], "score": score_metric_dict['executeAccurateError'], "description": description2 } indexes_dict = { "positionError": positionError_index, "executeAccurateError": executeAccurateError_index } report_dict["indexes"] = indexes_dict print(report_dict) return report_dict