123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/08/03
- @Last Modified: 2023/08/03
- @Summary: Functionality metrics
- """
- import os
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import numpy as np
- import pandas as pd
- from data_info import DataInfoList
- from score_weight import cal_score_with_priority, cal_weight_from_80
- from common import score_grade, string_concatenate, replace_key_with_value
- import matplotlib.pyplot as plt
- threshold = 0.2
- class Efficient(object):
- """
- Class for achieving efficient metrics for autonomous driving.
- Attributes:
- df: Vehicle driving data, stored in dataframe format.
- """
- def __init__(self, data_processed, scoreModel, resultPath):
- self.eval_data = pd.DataFrame()
- self.data_processed = data_processed
- self.scoreModel = scoreModel
- self.resultPath = resultPath
- # self.data = data_processed.obj_data[1]
- self.data = data_processed.ego_df
- self.mileage = data_processed.report_info['mileage']
- self.df = pd.DataFrame()
- self.speed_dict = dict()
- self.config = data_processed.config
- efficient_config = self.config.config['efficient']
- print("efficient_config1 is", efficient_config)
- self.efficient_config = efficient_config
- # common data
- self.bulitin_metric_list = self.config.builtinMetricList
- # dimension data
- self.weight_custom = efficient_config['weightCustom']
- self.metric_list = efficient_config['metric']
- self.type_list = efficient_config['type']
- self.type_name_dict = efficient_config['typeName']
- self.name_dict = efficient_config['name']
- self.unit_dict = efficient_config['unit']
- # custom metric data
- self.customMetricParam = efficient_config['customMetricParam']
- self.custom_metric_list = list(self.customMetricParam.keys())
- # self.custom_data = custom_data
- self.custom_param_dict = {}
- # score data
- self.weight = efficient_config['weightDimension']
- self.weight_type_dict = efficient_config['typeWeight']
- self.weight_type_list = efficient_config['typeWeightList']
- self.weight_dict = efficient_config['weight']
- print("self.weight_dict is", self.weight_dict)
- self.weight_list = efficient_config['weightList']
- self.priority_dict = efficient_config['priority']
- self.priority_list = efficient_config['priorityList']
- self.kind_dict = efficient_config['kind']
- self.optimal_dict = efficient_config['optimal']
- self.multiple_dict = efficient_config['multiple']
- self.kind_list = efficient_config['kindList']
- print("efficient_config['optimalList'] is", efficient_config['optimalList'])
- self.optimal_list = efficient_config['optimalList']
- self.multiple_list = efficient_config['multipleList']
- # metric data
- self.metric_dict = efficient_config['typeMetricDict']
- self.drive_metric_list = self.metric_dict['efficientDrive']
- self.stop_metric_list = self.metric_dict['efficientStop']
- # self.drive_metric_list = ["averageSpeed"]
- # self.stop_metric_list = ["stopDuration", "stopCount"]
- self._get_data()
- self._effi_param_cal()
- self.time_list = self.data['simTime'].values.tolist()
- self.speed_list = list()
- self.value_dict = {}
- self.average_v = 0
- self.stop_count = 0
- self.stop_duration = 0
- self.sum_stop_time = 0
- self.start_stop_time = []
- self.obstacle_stop_count = 0
- self.obstacle_stop_time = 0
- self.obstacle_stop_duration = []
- self.obstacle_start_stop_time = []
- self.pass_junction_time = 0
- self.score = 0
- def _get_data(self):
- """
- Get the data required for efficient evaluation according to efficient_INFO in DataInfoList.
- Parameters:
- df: Dataframe containing the vehicle running data.
- Returns:
- peak_valley: List of indices representing peaks and valleys.
- """
- efficient_info_list = DataInfoList.EFFICIENT_INFO
- self.df = self.data[efficient_info_list].copy()
- def _effi_param_cal(self):
- """
- """
- self.df = self.df[self.df['playerId'] == 1]
- # self.df = self.df[self.df['id'] == 1]
- # self.df['v'] = self.df.apply(lambda row: self.velocity(row['speed_x'], row['speed_y']), axis=1)
- # self.df['v'] = self.df.apply(lambda row: self.velocity(row['speedX'], row['speedY']), axis=1)
- def _cal_max_min_avg(self, num_list):
- maxx = max(num_list) if num_list else "-"
- minn = min(num_list) if num_list else "-"
- avg = sum(num_list) / len(num_list) if num_list else "-"
- result = {
- "max": maxx,
- "min": minn,
- "avg": avg
- }
- return result
- def average_velocity(self):
- """
- 1.速度的平均值
- 2.平均速度 = 里程/时间
- 速度不为0为开始时间,速度为0为结束时间
- """
- self.average_v = self.df['v'].mean()
- self.speed_list = self.df['v'].values.tolist()
- self.speed_dict = self._cal_max_min_avg(self.df['v'].dropna().values.tolist())
- def stop_duration_and_count(self):
- """
- 百公里停车次数
- 平均每次停车时长
- 驾照科目二考试判定为停车的标准:在考试项目区域内,汽车停顿2秒以上都算中途停车。
- """
- stop_v_threshold = 0.01
- stop_time_threshold = 0.5 # 12.5帧
- without_obstacle = self.df[self.df['obstacle'] == 0].copy()
- stop_time_list = without_obstacle[without_obstacle['v'] <= stop_v_threshold]['simTime'].values.tolist()
- # stop_time_list = self.df[self.df['v'] == 0]['simTime'].values.tolist()
- stop_frame_list = without_obstacle[without_obstacle['v'] <= stop_v_threshold]['simFrame'].values.tolist()
- # stop_frame_list = self.df[self.df['v'] == 0]['simFrame'].values.tolist()
- stop_frame_group = []
- stop_time_group = []
- f1 = stop_frame_list[0] if stop_frame_list else 0
- t1 = stop_time_list[0] if stop_time_list else 0
- sum_stop_time = 0
- for i in range(1, len(stop_frame_list)):
- if stop_frame_list[i] - stop_frame_list[i - 1] != 1:
- f2 = stop_frame_list[i - 1]
- # 当帧数大于13时才确定为停车
- # 当时长大于0.5s才确定为停车
- if f2 - f1 >= 13:
- t2 = stop_time_list[i - 1]
- stop_frame_group.append((f1, f2))
- stop_time_group.append((t1, t2))
- self.start_stop_time.append(str("{:.02f}".format(t1)) + 's')
- sum_stop_time += (t2 - t1)
- self.stop_count += 1
- # update f1, t1
- f1 = stop_frame_list[i]
- t1 = stop_time_list[i]
- f2 = stop_frame_list[-1] if stop_frame_list else 0
- # 如果最后一段的帧数差大于13,且停车统计列表中的最后一帧不为用例的最后一帧
- if f2 - f1 >= 13 and f2 != self.df['simFrame'].values.tolist()[-1]:
- t2 = stop_time_list[-1]
- stop_frame_group.append((f1, f2))
- stop_time_group.append((t1, t2))
- self.start_stop_time.append(str(":.02f".format(t1)) + 's')
- sum_stop_time += (t2 - t1)
- self.stop_count += 1
- self.sum_stop_time = sum_stop_time
- self.stop_duration = sum_stop_time / self.stop_count if self.stop_count != 0 else 0
- # self.stop_count = self.stop_count / self.mileage * 1000000
- def obstacle_stop_duration_and_count(self):
- time_list = self.data[(self.data["obstacle"] == 1) & (abs(self.data["lon_v"]) < 0.01)]["simTime"].tolist()
- # 确保时间列表至少有两个元素
- if len(time_list) < 2:
- return [] # 或者可以抛出一个异常,因为没有足够的数据来比较
- # 初始化变量
- start_time = time_list[0] # 当前连续时间段的起始时间
- intervals = [] # 存储超过阈值的时间段信息
- # 遍历时间列表(从第二个元素开始)
- for i in range(1, len(time_list)):
- # 计算当前元素与前一个元素的差值
- difference = time_list[i] - time_list[i - 1]
- # 如果差值超过阈值,则记录前一个时间段的信息
- if difference > threshold:
- # 注意:这里我们返回的是前一个时间段的起始和终止时间(以及它们的差值)
- # 但按照题目要求,我们只返回差值
- # 终止时间是前一个元素,即 time_list[i-1]
- # 起始时间是之前记录的 start_time
- interval_length = time_list[i - 1] - start_time
- intervals.append((start_time, time_list[i - 1], interval_length))
- # 更新起始时间为当前元素,以便开始记录下一个时间段
- start_time = time_list[i]
- # 处理最后一个时间段(如果它存在且没有因为超过阈值而被记录)
- if time_list[-1] != start_time:
- # 注意:这里我们假设列表的最后一个元素之后没有更多的元素来比较
- # 因此,我们总是将最后一个元素视为前一个时间段的终止时间
- interval_length = time_list[-1] - start_time
- intervals.append((start_time, time_list[-1], interval_length))
- self.obstacle_stop_count = len(intervals)
- for start, end, length in intervals:
- self.obstacle_start_stop_time.append(str("{:.02f}".format(start)) + 's')
- self.obstacle_stop_duration.append(length)
- # 如果只需要差值,可以只返回差值的列表
- # return [interval[2] for interval in intervals]
- print("self.obstacle_start_stop_time is", self.obstacle_start_stop_time)
- self.obstacle_stop_time = sum(self.obstacle_stop_duration)/self.obstacle_stop_count if self.obstacle_stop_count > 0 else 0
- def efficient_statistic(self):
- arr_effi = []
- self.average_velocity()
- self.stop_duration_and_count()
- self.obstacle_stop_duration_and_count()
- if "averageSpeed" in self.metric_list:
- average_v = self.average_v
- arr_effi.append(average_v)
- self.value_dict["averageSpeed"] = average_v
- if "stopDuration" in self.metric_list:
- stop_duration = self.stop_duration
- arr_effi.append(stop_duration)
- self.value_dict["stopDuration"] = stop_duration
- if "obstaclestopDuration" in self.metric_list:
- obstacle_stop_duration = self.obstacle_stop_time
- arr_effi.append(obstacle_stop_duration)
- self.value_dict["obstaclestopDuration"] = obstacle_stop_duration
- if "stopCount" in self.metric_list:
- stop_count = self.stop_count
- arr_effi.append(stop_count)
- self.value_dict["stopCount"] = stop_count
- # arr_effi = [average_v, stop_count, stop_time]
- return arr_effi
- def custom_metric_param_parser(self, param_list):
- """
- param_dict = {
- "paramA" [
- {
- "kind": "-1",
- "optimal": "1",
- "multiple": ["0.5","5"],
- "spare1": null,
- "spare2": null
- }
- ]
- }
- """
- kind_list = []
- optimal_list = []
- multiple_list = []
- spare_list = []
- # spare1_list = []
- # spare2_list = []
- for i in range(len(param_list)):
- kind_list.append(int(param_list[i]['kind']))
- optimal_list.append(float(param_list[i]['optimal']))
- multiple_list.append([float(x) for x in param_list[i]['multiple']])
- spare_list.append([item["param"] for item in param_list[i]["spare"]])
- # spare1_list.append(param_list[i]['spare1'])
- # spare2_list.append(param_list[i]['spare2'])
- result = {
- "kind": kind_list,
- "optimal": optimal_list,
- "multiple": multiple_list,
- "spare": spare_list,
- # "spare1": spare1_list,
- # "spare2": spare2_list
- }
- return result
- def custom_metric_score(self, metric, value, param_list):
- """
- """
- param = self.custom_metric_param_parser(param_list)
- self.custom_param_dict[metric] = param
- score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
- score_sub = score_model.cal_score()
- score = sum(score_sub) / len(score_sub)
- return score
- def effi_score(self):
- arr_effi = self.efficient_statistic()
- print("\n[高效性表现及得分情况]")
- print("高效性各指标值:", [round(num, 2) for num in arr_effi])
- arr_effi = np.array([arr_effi])
- print("self.optimal_list is", self.optimal_list)
- print("arr_effi is", arr_effi)
- score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_effi)
- score_sub = score_model.cal_score()
- score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))
- score_metric = [round(num, 2) for num in score_sub]
- metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
- print("metric_list is", metric_list)
- score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
- print("score_metric_dict is", score_metric_dict)
- # custom_metric_list = list(self.customMetricParam.keys())
- # for metric in custom_metric_list:
- # value = self.custom_data[metric]['value']
- # param_list = self.customMetricParam[metric]
- # score = self.custom_metric_score(metric, value, param_list)
- # score_metric_dict[metric] = round(score, 2)
- #
- # score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
- # score_metric = list(score_metric_dict.values())
- score_type_dict = {}
- if self.weight_custom: # 自定义权重
- score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
- self.weight_dict}
- for type in self.type_list:
- type_score = sum(
- value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
- score_type_dict[type] = round(type_score, 2)
- score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
- score_type_dict}
- score_efficient = sum(score_type_with_weight_dict.values())
- else: # 客观赋权
- self.weight_list = cal_weight_from_80(score_metric)
- self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
- score_efficient = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
- for type in self.type_list:
- type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
- self.weight_dict = {key: round(value / type_weight, 4) for key, value in self.weight_dict.items() if
- key in self.metric_dict[type]}
- type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
- type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
- type_priority_list = [value for key, value in self.priority_dict.items() if
- key in self.metric_dict[type]]
- type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
- score_type_dict[type] = round(type_score, 2)
- score_efficient = round(score_efficient, 2)
- print("高效性各指标基准值:", self.optimal_list)
- print(f"高效性得分为:{score_efficient:.2f}分。")
- print(f"高效性各类型得分为:{score_type_dict}。")
- print(f"高效性各指标得分为:{score_metric_dict}。")
- return score_efficient, score_type_dict, score_metric_dict
- def effi_weight_distribution(self):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = "高效性"
- if "efficientDrive" in self.type_list:
- drive_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if
- key in self.drive_metric_list}
- weight_distribution_drive = {
- "driveWeight": f"行驶({self.weight_type_dict['efficientDrive'] * 100:.2f}%)",
- "indexes": drive_weight_indexes_dict
- }
- weight_distribution['efficientDrive'] = weight_distribution_drive
- if "efficientStop" in self.type_list:
- stop_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if
- key in self.stop_metric_list}
- weight_distribution_stop = {
- "stopWeight": f"停车({self.weight_type_dict['efficientStop'] * 100:.2f}%)",
- "indexes": stop_weight_indexes_dict
- }
- weight_distribution['efficientStop'] = weight_distribution_stop
- return weight_distribution
- def _get_weight_distribution(self, dimension):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = self.config.dimension_name[dimension]
- for type in self.type_list:
- type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
- self.weight_dict.items() if
- key in self.metric_dict[type]}
- weight_distribution_type = {
- "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
- "indexes": type_weight_indexes_dict
- }
- weight_distribution[type] = weight_distribution_type
- return weight_distribution
- def report_statistic(self):
- """
- Returns:
- """
- # report_dict = {
- # "name": "高效性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_efficient,
- # "level": grade_efficient,
- # 'score_type': score_type,
- # 'score_metric': score_metric,
- #
- # "description1": effi_description1,
- # "description2": effi_description2,
- # "description3": effi_description3,
- #
- # "efficientDrive": drive_dict,
- # "efficientStop": stop_dict,
- # }
- # brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
- # throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
- # steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
- #
- # # common parameter calculate
- # brake_vs_time = self.zip_time_pairs(brakePedal_list, 100)
- # throttle_vs_time = self.zip_time_pairs(throttlePedal_list, 100)
- # steering_vs_time = self.zip_time_pairs(steeringWheel_list)
- report_dict = {
- "name": "高效性",
- "weight": f"{self.weight * 100:.2f}%",
- }
- # len_time = len(self.time_list)
- # duration = self.time_list[-1]
- # duration_minute = round(duration / 60, 2)
- score_efficient, score_type_dict, score_metric_dict = self.effi_score()
- print("score_metric_dict is", score_metric_dict)
- # get weight distribution
- # report_dict["weightDistribution"] = self._get_weight_distribution("efficient")
- # score_efficient, score_metric = self.effi_score()
- score_efficient = int(score_efficient) if int(score_efficient) == score_efficient else round(score_efficient, 2)
- grade_efficient = score_grade(score_efficient)
- report_dict["score"] = score_efficient
- report_dict["level"] = grade_efficient
- # for description
- # good_type_list = []
- # bad_type_list = []
- # good_metric_list = []
- # bad_metric_list = []
- # str for description
- # str_over_optimal = ""
- # type_details_dict = {}
- # for type in self.type_list:
- # bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type)
- #
- # type_dict = {
- # "name": type,
- # }
- #
- # builtin_graph_dict = {}
- # custom_graph_dict = {}
- #
- # score_type = score_type_dict[type]
- # grade_type = score_grade(score_type)
- # type_dict["score"] = score_type
- # type_dict["level"] = grade_type
- #
- # type_dict_indexes = {}
- # for metric in self.metric_dict[type]:
- # bad_metric_list.append(metric) if score_metric_dict[metric] < 80 else good_metric_list.append(metric)
- #
- # if metric in self.bulitin_metric_list:
- # # for indexes
- # type_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- # "score": score_metric_dict[metric],
- # "value": f'{self.value_dict[metric]:.2f}',
- # # "range": f"[0, {self.optimal_dict['averageSpeed']})",
- # # "deviation": f"+{avv_deviation:.2f}%" if avv_deviation > 0 else f"{avv_deviation:.2f}%"
- # }
- #
- # if self.kind_dict[metric] == -1:
- # type_dict_indexes[metric]["range"] = f"[0, {self.optimal_dict[metric]}]"
- # metric_over_optimal = ((self.value_dict[metric] - self.optimal_dict[metric]) /
- # self.optimal_dict[
- # metric]) * 100
- # type_dict_indexes[metric][
- # "deviation"] = f"+{metric_over_optimal:.2f}%" if metric_over_optimal > 0 else f"0%"
- #
- # elif self.kind_dict[metric] == 1:
- # type_dict_indexes[metric]["range"] = f"[{self.optimal_dict[metric]}, inf)"
- # metric_over_optimal = ((self.value_dict[metric] - self.optimal_dict[metric]) /
- # self.optimal_dict[
- # metric]) * 100
- # type_dict_indexes[metric][
- # "deviation"] = f"0%" if metric_over_optimal > 0 else f"{metric_over_optimal:.2f}%"
- #
- # elif self.kind_dict[metric] == 0:
- # value = self.value_dict[metric]
- # minn = self.optimal_dict[metric] * self.multiple_dict[metric][0]
- # maxx = self.optimal_dict[metric] * self.multiple_dict[metric][1]
- #
- # type_dict_indexes[metric]["range"] = f"[{minn}, {maxx}]"
- #
- # if value < minn:
- # metric_over_optimal = (value - minn) / minn * 100
- # elif value > maxx:
- # metric_over_optimal = (value - maxx) / maxx * 100
- # else:
- # metric_over_optimal = 0
- #
- # type_dict_indexes[metric][
- # "deviation"] = f"+{metric_over_optimal:.2f}%" if metric_over_optimal > 0 else f"{metric_over_optimal:.2f}%"
- #
- # # metric_over_optimal = ((self.value_dict[metric] - self.optimal_dict[metric]) / self.optimal_dict[
- # # metric]) * 100
- # # type_dict_indexes[metric][
- # # "deviation"] = f"+{metric_over_optimal:.2f}%" if metric_over_optimal > 0 else f"{metric_over_optimal:.2f}%"
- #
- # if score_metric_dict[metric] < 80:
- # str_over_optimal += f'{self.name_dict[metric]}为{round(self.value_dict[metric], 2)}{self.unit_dict[metric]},超过合理范围{metric_over_optimal}%;'
- #
- # else:
- # # for indexes
- # type_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- # "score": score_metric_dict[metric],
- # "value": f'{self.custom_data[metric]["value"][0]:.2f}',
- # # "range": f"[0, {self.optimal_dict['averageSpeed']})",
- # # "deviation": f"+{avv_deviation}%" if avv_deviation > 0 else f"{avv_deviation}%"
- # }
- #
- # value = self.custom_data[metric]["value"][0]
- # optimal = self.custom_param_dict[metric]['optimal'][0]
- #
- # if self.custom_param_dict[metric]['kind'][0] == -1:
- # type_dict_indexes[metric][
- # "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- #
- # metric_over_optimal = (value - optimal) / optimal * 100
- # type_dict_indexes[metric][
- # "deviation"] = f"+{metric_over_optimal:.2f}%" if metric_over_optimal > 0 else f"0%"
- #
- # elif self.custom_param_dict[metric]['kind'][0] == 1:
- # type_dict_indexes[metric][
- # "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- #
- # metric_over_optimal = (value - optimal) / optimal * 100
- # type_dict_indexes[metric][
- # "deviation"] = f"0%" if metric_over_optimal > 0 else f"{metric_over_optimal:.2f}%"
- #
- # elif self.custom_param_dict[metric]['kind'][0] == 0:
- # minn = value * self.custom_param_dict[metric]['multiple'][0][0]
- # maxx = value * self.custom_param_dict[metric]['multiple'][0][1]
- #
- # type_dict_indexes[metric]["range"] = f"[{minn}, {maxx}]"
- #
- # if value < minn:
- # metric_over_optimal = (value - minn) / minn * 100
- # elif value > maxx:
- # metric_over_optimal = (value - maxx) / maxx * 100
- # else:
- # metric_over_optimal = 0
- #
- # type_dict_indexes[metric][
- # "deviation"] = f"+{metric_over_optimal:.2f}%" if metric_over_optimal > 0 else f"{metric_over_optimal:.2f}%"
- #
- # # metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] -
- # # self.custom_data[metric]["value"][0]) /
- # # self.custom_param_dict[metric]['optimal'][0]) * 100
- # # type_dict_indexes[metric][
- # # "deviation"] = f"+{metric_over_optimal:.2f}%" if metric_over_optimal > 0 else f"{metric_over_optimal:.2f}%"
- #
- # if score_metric_dict[metric] < 80:
- # str_over_optimal += f'{self.name_dict[metric]}为{self.custom_data[metric]["value"][0]:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal}%;'
- #
- # custom_graph_dict[metric] = self.custom_data[metric]['reportData']
- #
- # # str_uncomf_over_optimal = str_uncomf_over_optimal[:-1] + ";"
- # type_dict["indexes"] = type_dict_indexes
- # type_dict["builtin"] = builtin_graph_dict
- # type_dict["custom"] = custom_graph_dict
- #
- # type_details_dict[type] = type_dict
- # report_dict["details"] = type_details_dict
- # efficient description
- # str_over_optimal = str_over_optimal[:-1]
- # if grade_efficient == '优秀':
- # effi_description1 = '机器人行驶效率高;'
- # elif grade_efficient == '良好':
- # effi_description1 = '机器人在本轮测试中的表现满足设计指标要求;'
- # elif grade_efficient == '一般':
- # str_ineffi_type = string_concatenate(bad_metric_list)
- # effi_description1 = f'机器人需要在{str_ineffi_type}指标上进一步优化。其中,{str_over_optimal};'
- # elif grade_efficient == '较差':
- # str_ineffi_type = string_concatenate(bad_metric_list)
- # effi_description1 = f'需要提高机器人在{str_ineffi_type}指标上的表现。其中,{str_over_optimal};'
- #
- # if not bad_metric_list:
- # effi_description2 = '高效性在各个指标上的表现俱佳'
- # effi_description3 = "机器人的规划控制能力良好,通行效率高"
- # else:
- # str_effi_type = string_concatenate(good_metric_list)
- # str_ineffi_type = string_concatenate(bad_metric_list)
- #
- # effi_description2 = f"{str_effi_type}指标表现良好,{str_ineffi_type}指标表现不佳。其中,{str_over_optimal}"
- # effi_description3 = "应该优化机器人的规划控制逻辑,提高机器人的通行效率"
- # report_dict["description1"] = replace_key_with_value(effi_description1, self.name_dict)
- # report_dict["description2"] = replace_key_with_value(effi_description2, self.name_dict)
- # report_dict["description3"] = effi_description3
- description = f"· 在高效性方面,得分{score_efficient:.2f}分,表现{grade_efficient}。"
- if self.stop_count > 0:
- description += f"出现{self.stop_count}次无障碍物停止,需重点优化。" if score_efficient < 80 else f"出现{self.stop_count}次无障碍物停止,需注意。"
- else:
- description += f"平均速度{self.speed_dict['avg']:.02f}m/s,未出现无障碍物停止,表现{grade_efficient}。"
- report_dict["description"] = description
- str_temp_stop = "、".join(self.start_stop_time)
- description1 = f"次数:{self.stop_count}次\n"
- if self.stop_count == 0:
- description1 += "无障碍物时没有发生停车行为"
- else:
- description1 += (f"在{str_temp_stop}时刻发生无障碍物停车行为\n"
- f"停止总时长为{self.sum_stop_time:.02f}s,平均停止时长为{self.stop_duration:.02f}s")
- str_temp_obstaclestop = "、".join(self.obstacle_start_stop_time)
- description2 = f"次数:{self.obstacle_stop_count}次\n"
- if self.stop_count == 0:
- description2 += "没有发生遇到障碍物停止情况"
- else:
- description2 += (f"在{str_temp_obstaclestop}时刻发生遇障停止行为\n"
- f"停止总时长为{sum(self.obstacle_stop_duration):.02f}s,平均停止时长为{self.obstacle_stop_time:.02f}s")
- description3 = f"最大值:{self.speed_dict['max']:.4f}m/s;" \
- f"最小值:{self.speed_dict['min']:.4f}m/s;" \
- f"平均值:{self.speed_dict['avg']:.4f}m/s"
- stopDuration_index = {
- "weight": self.weight_dict['stopDuration'],
- "score": score_metric_dict['stopDuration'],
- "description": description1
- }
- obstaclestopDuration_index = {
- "weight": self.weight_dict['obstaclestopDuration'],
- "score": score_metric_dict['obstaclestopDuration'],
- "description": description2
- }
- averageSpeed_index = {
- "weight": self.weight_dict['averageSpeed'],
- "score": score_metric_dict['averageSpeed'],
- "description": description3
- }
- # report_dict["description1"] = description1
- # report_dict["description2"] = description2
- indexes_dict = {
- "efficientDrive": stopDuration_index,
- "averageSpeed": averageSpeed_index,
- "obstaclestopDuration": obstaclestopDuration_index
- }
- report_dict["indexes"] = indexes_dict
- # plt.figure(figsize=(12, 3))
- # plt.plot(self.time_list, self.speed_list, label='Speed')
- #
- # plt.xlabel('Time(s)')
- # plt.ylabel('Speed(m/s)')
- # plt.legend()
- #
- # # 调整布局,消除空白边界
- # plt.tight_layout()
- #
- # plt.savefig(os.path.join(self.resultPath, "Speed.png"))
- # plt.close()
- print(report_dict)
- return report_dict
- def get_eval_data(self):
- df = self.eval_data
- return df
|