#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Date:              2023/08/03
@Last Modified:     2023/08/03
@Summary:           Efficiency metrics
"""

import os
import sys

# The project-local modules imported below are resolved through these paths,
# so the appends must stay ahead of those imports.
sys.path.append('../common')
sys.path.append('../modules')
sys.path.append('../results')

import numpy as np
import pandas as pd

from data_info import DataInfoList
from score_weight import cal_score_with_priority, cal_weight_from_80
from common import score_grade, string_concatenate, replace_key_with_value
import matplotlib.pyplot as plt


class Efficient(object):
    """
    Compute and score the efficiency metrics of a driving run.

    The built-in metrics handled here are "averageSpeed", "stopDuration" and
    "stopCount"; additional custom metrics are scored from ``custom_data``.

    Attributes:
        df: Working dataframe holding the columns listed in
            ``DataInfoList.EFFICIENT_INFO``, restricted to ``playerId == 1``.
        speed_dict: max/min/avg of the ``v`` column (filled by
            ``average_velocity``).
        value_dict: Raw value of every built-in metric that was evaluated.
        stop_count: Number of detected stops without an obstacle.
        sum_stop_time: Total duration of those stops.
        stop_duration: Average duration per detected stop.
    """

    def __init__(self, data_processed, custom_data, scoreModel, resultPath):
        """
        Read the efficiency configuration and prepare the working dataframe.

        Parameters:
            data_processed: Pre-processed run data; this constructor reads its
                ``obj_data``, ``report_info``, ``config`` and
                ``efficient_config`` attributes.
            custom_data: Mapping of custom metric name to a dict that provides
                at least a 'value' entry.
            scoreModel: Callable taking (kind, optimal, multiple, values) and
                returning an object with a ``cal_score()`` method.
            resultPath: Directory in which the speed plot is saved.
        """
        self.eval_data = pd.DataFrame()
        self.data_processed = data_processed
        self.scoreModel = scoreModel
        self.resultPath = resultPath

        self.data = data_processed.obj_data[1]
        self.mileage = data_processed.report_info['mileage']
        self.df = pd.DataFrame()
        self.speed_dict = dict()

        self.config = data_processed.config
        efficient_config = data_processed.efficient_config
        self.efficient_config = efficient_config

        # common data
        # NOTE: the attribute name keeps its historical spelling ("bulitin")
        # because it is part of the instance's visible interface.
        self.bulitin_metric_list = self.config.builtinMetricList

        # dimension data
        self.weight_custom = efficient_config['weightCustom']
        self.metric_list = efficient_config['metric']
        self.type_list = efficient_config['type']
        self.type_name_dict = efficient_config['typeName']
        self.name_dict = efficient_config['name']
        self.unit_dict = efficient_config['unit']

        # custom metric data
        self.customMetricParam = efficient_config['customMetricParam']
        self.custom_metric_list = list(self.customMetricParam.keys())
        self.custom_data = custom_data
        self.custom_param_dict = {}

        # score data
        self.weight = efficient_config['weightDimension']
        self.weight_type_dict = efficient_config['typeWeight']
        self.weight_type_list = efficient_config['typeWeightList']
        self.weight_dict = efficient_config['weight']
        self.weight_list = efficient_config['weightList']
        self.priority_dict = efficient_config['priority']
        self.priority_list = efficient_config['priorityList']
        self.kind_dict = efficient_config['kind']
        self.optimal_dict = efficient_config['optimal']
        self.multiple_dict = efficient_config['multiple']
        self.kind_list = efficient_config['kindList']
        self.optimal_list = efficient_config['optimalList']
        self.multiple_list = efficient_config['multipleList']

        # metric data
        self.metric_dict = efficient_config['typeMetricDict']
        self.drive_metric_list = self.metric_dict['efficientDrive']
        self.stop_metric_list = self.metric_dict['efficientStop']

        self._get_data()
        self._effi_param_cal()

        self.time_list = self.data['simTime'].values.tolist()
        self.speed_list = list()
        self.value_dict = {}

        self.average_v = 0
        self.stop_count = 0
        self.stop_duration = 0
        self.sum_stop_time = 0
        self.pass_junction_time = 0
        self.score = 0

    def _get_data(self):
        """
        Copy the columns listed in ``DataInfoList.EFFICIENT_INFO`` from
        ``self.data`` into ``self.df``.
        """
        efficient_info_list = DataInfoList.EFFICIENT_INFO
        self.df = self.data[efficient_info_list].copy()

    def _effi_param_cal(self):
        """
        Keep only the rows of ``self.df`` whose ``playerId`` equals 1.
        """
        # NOTE(review): playerId 1 is presumably the vehicle under test -
        # confirm against the data producer.
        self.df = self.df[self.df['playerId'] == 1]

    def _cal_max_min_avg(self, num_list):
        """
        Return the max, min and mean of ``num_list``.

        Parameters:
            num_list: List of numbers; may be empty.

        Returns:
            Dict with keys "max", "min" and "avg". Every value is the
            placeholder string "-" when ``num_list`` is empty.
        """
        maxx = max(num_list) if num_list else "-"
        minn = min(num_list) if num_list else "-"
        avg = sum(num_list) / len(num_list) if num_list else "-"

        result = {
            "max": maxx,
            "min": minn,
            "avg": avg
        }
        return result

    @staticmethod
    def _format_number(value, digits=4):
        """
        Format ``value`` with ``digits`` decimals.

        Non-numeric placeholders (such as the "-" produced by
        ``_cal_max_min_avg`` for empty input) are returned as plain text
        instead of raising on the float format code.
        """
        if isinstance(value, (int, float, np.integer, np.floating)):
            return f"{value:.{digits}f}"
        return str(value)

    def average_velocity(self):
        """
        Compute the mean of the ``v`` column and its max/min/avg statistics.

        Updates ``self.average_v``, ``self.speed_list`` and
        ``self.speed_dict``. The statistics ignore missing values.
        """
        self.average_v = self.df['v'].mean()
        self.speed_list = self.df['v'].values.tolist()
        self.speed_dict = self._cal_max_min_avg(self.df['v'].dropna().values.tolist())

    def stop_duration_and_count(self):
        """
        Detect stops that happen without an obstacle and measure them.

        A stop is a run of consecutive frames (``simFrame`` increasing by
        exactly 1) with ``obstacle == 0`` and ``v <= 0.01`` whose first and
        last frame are at least 13 frames apart. A trailing run that ends on
        the last frame of the data is not counted.

        Updates ``self.stop_count`` (raw count, not normalised by mileage),
        ``self.sum_stop_time`` and ``self.stop_duration`` (average per stop,
        0 when there is no stop). The results are recomputed from scratch on
        every call, so calling this method repeatedly does not accumulate.
        """
        stop_v_threshold = 0.01
        # The original note maps a 0.5 s minimum stop time to 12.5 frames,
        # hence the 13-frame threshold (this implies 25 frames per second -
        # TODO confirm the sampling rate).
        min_stop_frames = 13

        without_obstacle = self.df[self.df['obstacle'] == 0]
        stopped = without_obstacle[without_obstacle['v'] <= stop_v_threshold]
        stop_time_list = stopped['simTime'].values.tolist()
        stop_frame_list = stopped['simFrame'].values.tolist()

        stop_count = 0
        sum_stop_time = 0

        if stop_frame_list:
            f1 = stop_frame_list[0]
            t1 = stop_time_list[0]

            for i in range(1, len(stop_frame_list)):
                # A gap in the frame numbers closes the current run.
                if stop_frame_list[i] - stop_frame_list[i - 1] != 1:
                    f2 = stop_frame_list[i - 1]
                    if f2 - f1 >= min_stop_frames:
                        sum_stop_time += stop_time_list[i - 1] - t1
                        stop_count += 1
                    # start the next run
                    f1 = stop_frame_list[i]
                    t1 = stop_time_list[i]

            # Trailing run: counted only when it is long enough and does not
            # end on the last frame of the data.
            f2 = stop_frame_list[-1]
            last_frame = self.df['simFrame'].values.tolist()[-1]
            if f2 - f1 >= min_stop_frames and f2 != last_frame:
                sum_stop_time += stop_time_list[-1] - t1
                stop_count += 1

        self.stop_count = stop_count
        self.sum_stop_time = sum_stop_time
        self.stop_duration = sum_stop_time / stop_count if stop_count != 0 else 0

    def efficient_statistic(self):
        """
        Compute the built-in metrics and collect the configured ones.

        Returns:
            List of metric values in the fixed order averageSpeed,
            stopDuration, stopCount, containing only the metrics present in
            ``self.metric_list``. The same values are stored in
            ``self.value_dict``.
        """
        self.average_velocity()
        self.stop_duration_and_count()

        # NOTE(review): the values are emitted in this fixed order; the
        # scoring lists from the configuration are presumably in the same
        # order - verify against the config producer.
        builtin_values = (
            ("averageSpeed", self.average_v),
            ("stopDuration", self.stop_duration),
            ("stopCount", self.stop_count),
        )

        arr_effi = []
        for metric, value in builtin_values:
            if metric in self.metric_list:
                arr_effi.append(value)
                self.value_dict[metric] = value

        return arr_effi

    def custom_metric_param_parser(self, param_list):
        """
        Convert the raw parameter entries of a custom metric into typed lists.

        Parameters:
            param_list: List of dicts, each with the keys 'kind' (converted
                with ``int``), 'optimal' (converted with ``float``),
                'multiple' (iterable converted element-wise with ``float``)
                and 'spare' (iterable of dicts that each have a "param" key).

        Returns:
            Dict with the keys "kind", "optimal", "multiple" and "spare",
            each holding one list entry per element of ``param_list``.
        """
        kind_list = []
        optimal_list = []
        multiple_list = []
        spare_list = []

        for param in param_list:
            kind_list.append(int(param['kind']))
            optimal_list.append(float(param['optimal']))
            multiple_list.append([float(x) for x in param['multiple']])
            spare_list.append([item["param"] for item in param["spare"]])

        result = {
            "kind": kind_list,
            "optimal": optimal_list,
            "multiple": multiple_list,
            "spare": spare_list,
        }
        return result

    def custom_metric_score(self, metric, value, param_list):
        """
        Score one custom metric.

        Parameters:
            metric: Name of the custom metric.
            value: Metric value(s) handed to the score model.
            param_list: Raw parameter entries, see
                ``custom_metric_param_parser``.

        Returns:
            Mean of the sub-scores returned by the score model.
        """
        param = self.custom_metric_param_parser(param_list)
        self.custom_param_dict[metric] = param

        score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
        score_sub = score_model.cal_score()
        score = sum(score_sub) / len(score_sub)
        return score

    def effi_score(self):
        """
        Score every metric, every metric type and the efficiency dimension.

        With ``self.weight_custom`` set, the configured metric and type
        weights are used. Otherwise the weights are derived from the scores
        by ``cal_weight_from_80`` and ``self.weight_dict`` is rewritten so
        that the weights of the metrics inside each type sum to 1.

        Returns:
            Tuple ``(score_efficient, score_type_dict, score_metric_dict)``.
        """
        arr_effi = self.efficient_statistic()
        print("\n[高效性表现及得分情况]")
        print("高效性各指标值:", [round(num, 2) for num in arr_effi])

        arr_effi = np.array([arr_effi])
        score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_effi)
        score_sub = score_model.cal_score()
        # A NaN sub-score falls back to 80.
        score_sub = [80 if np.isnan(x) else x for x in score_sub]
        score_metric = [round(num, 2) for num in score_sub]

        metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
        score_metric_dict = dict(zip(metric_list, score_metric))

        for metric, param_list in self.customMetricParam.items():
            value = self.custom_data[metric]['value']
            score = self.custom_metric_score(metric, value, param_list)
            score_metric_dict[metric] = round(score, 2)

        # Re-order the scores to follow the configured metric order.
        score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
        score_metric = list(score_metric_dict.values())

        score_type_dict = {}

        if self.weight_custom:  # user-defined weights
            score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
                                             self.weight_dict}

            for type_key in self.type_list:
                type_score = sum(
                    value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type_key])
                score_type_dict[type_key] = round(type_score, 2)

            score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
                                           score_type_dict}
            score_efficient = sum(score_type_with_weight_dict.values())
        else:  # objective weighting
            self.weight_list = cal_weight_from_80(score_metric)
            self.weight_dict = dict(zip(self.metric_list, self.weight_list))
            score_efficient = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)

            for type_key in self.type_list:
                type_metrics = self.metric_dict[type_key]
                type_weight = sum(value for key, value in self.weight_dict.items() if key in type_metrics)

                # Normalise the weights of this type in place. The dict must
                # keep the entries of the other types, otherwise they would
                # be lost for the following iterations.
                if type_weight:
                    for key in self.weight_dict:
                        if key in type_metrics:
                            self.weight_dict[key] = round(self.weight_dict[key] / type_weight, 4)

                type_score_metric = [value for key, value in score_metric_dict.items() if key in type_metrics]
                type_weight_list = [value for key, value in self.weight_dict.items() if key in type_metrics]
                type_priority_list = [value for key, value in self.priority_dict.items() if key in type_metrics]

                type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
                score_type_dict[type_key] = round(type_score, 2)

        score_efficient = round(score_efficient, 2)

        print("高效性各指标基准值:", self.optimal_list)
        print(f"高效性得分为:{score_efficient:.2f}分。")
        print(f"高效性各类型得分为:{score_type_dict}。")
        print(f"高效性各指标得分为:{score_metric_dict}。")
        return score_efficient, score_type_dict, score_metric_dict

    def effi_weight_distribution(self):
        """
        Describe the weights of the drive and stop types and their metrics.

        Returns:
            Dict with a "name" entry plus one entry per type present in
            ``self.type_list`` ('efficientDrive', 'efficientStop'); each type
            entry holds the formatted type weight and an "indexes" dict of
            formatted metric weights.
        """
        weight_distribution = {}
        weight_distribution["name"] = "高效性"

        if "efficientDrive" in self.type_list:
            drive_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if
                                         key in self.drive_metric_list}

            weight_distribution_drive = {
                "driveWeight": f"行驶({self.weight_type_dict['efficientDrive'] * 100:.2f}%)",
                "indexes": drive_weight_indexes_dict
            }
            weight_distribution['efficientDrive'] = weight_distribution_drive

        if "efficientStop" in self.type_list:
            stop_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if
                                        key in self.stop_metric_list}

            weight_distribution_stop = {
                "stopWeight": f"停车({self.weight_type_dict['efficientStop'] * 100:.2f}%)",
                "indexes": stop_weight_indexes_dict
            }
            weight_distribution['efficientStop'] = weight_distribution_stop

        return weight_distribution

    def _get_weight_distribution(self, dimension):
        """
        Describe the weights of every configured type and its metrics.

        Parameters:
            dimension: Key into ``self.config.dimension_name`` used for the
                "name" entry.

        Returns:
            Dict with a "name" entry plus one entry per type in
            ``self.type_list``, each holding the formatted type weight and an
            "indexes" dict of formatted metric weights.
        """
        weight_distribution = {}
        weight_distribution["name"] = self.config.dimension_name[dimension]

        for type_key in self.type_list:
            type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
                                        self.weight_dict.items() if key in self.metric_dict[type_key]}

            weight_distribution_type = {
                "weight": f"{self.type_name_dict[type_key]}({self.weight_type_dict[type_key] * 100:.2f}%)",
                "indexes": type_weight_indexes_dict
            }
            weight_distribution[type_key] = weight_distribution_type

        return weight_distribution

    def report_statistic(self):
        """
        Build the efficiency section of the report and save the speed plot.

        Runs ``effi_score``, grades the result with ``score_grade``, writes
        "Speed.png" into ``self.resultPath`` and prints the report dict.

        Returns:
            Dict with the keys "name", "score", "level", "description",
            "description1" and "description2".
        """
        report_dict = {
            "name": "高效性",
        }

        score_efficient, score_type_dict, score_metric_dict = self.effi_score()

        # Report whole-number scores as int, everything else with 2 decimals.
        score_efficient = int(score_efficient) if int(score_efficient) == score_efficient else round(score_efficient, 2)
        grade_efficient = score_grade(score_efficient)
        report_dict["score"] = score_efficient
        report_dict["level"] = grade_efficient

        # The speed statistics are "-" placeholders when no speed sample is
        # available, so they are formatted through the tolerant helper.
        speed_max = self._format_number(self.speed_dict['max'])
        speed_min = self._format_number(self.speed_dict['min'])
        speed_avg = self._format_number(self.speed_dict['avg'])

        description = "· 在高效性方面,"
        if self.stop_count > 0:
            description += f"出现{self.stop_count}次无障碍物停止,需重点优化。"
        else:
            description += f"平均速度{speed_avg}m/s,未出现无障碍物停止,表现{grade_efficient}。"

        report_dict["description"] = description

        description1 = f"次数:{self.stop_count}次;" \
                       f"总时长:{self.sum_stop_time:.4f}s;" \
                       f"平均时长:{self.stop_duration:.4f}s"

        description2 = f"最大值:{speed_max}m/s;" \
                       f"最小值:{speed_min}m/s;" \
                       f"平均值:{speed_avg}m/s"

        report_dict["description1"] = description1
        report_dict["description2"] = description2

        plt.figure(figsize=(12, 3))
        plt.plot(self.time_list, self.speed_list, label='Speed')
        plt.xlabel('Time(s)')
        plt.ylabel('Speed(m/s)')
        plt.legend()
        # Tighten the layout to remove the blank margins.
        plt.tight_layout()
        plt.savefig(os.path.join(self.resultPath, "Speed.png"))
        plt.close()

        print(report_dict)

        return report_dict

    def get_eval_data(self):
        """Return the evaluation dataframe ``self.eval_data``."""
        df = self.eval_data
        return df