#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn) @Data: 2023/08/03 @Last Modified: 2023/08/03 @Summary: Function metrics """ import sys sys.path.append('../common') sys.path.append('../modules') sys.path.append('../results') import math import numpy as np import pandas as pd import scipy.signal as sg from score_weight import cal_score_with_priority, cal_weight_from_80 from common import score_grade, string_concatenate, replace_key_with_value, score_over_100 class Function(object): """ Class for achieving function metrics for autonomous driving. Attributes: df: Vehicle driving data, stored in dataframe format. """ def __init__(self, data_processed, custom_data, scoreModel): self.eval_data = pd.DataFrame() self.data_processed = data_processed self.scoreModel = scoreModel self.status_trigger_dict = data_processed.status_trigger_dict self.df = data_processed.object_df self.ego_df = data_processed.ego_data self.obj_id_list = data_processed.obj_id_list self.df_roadmark = data_processed.road_mark_df self.df_roadpos = data_processed.road_pos_df self.df_drivectrl = data_processed.driver_ctrl_df self.unfunc_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unfunc_follow_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unfunc_lane_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unfunc_custom_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.custom_markline_list = [] # config infos for calculating score self.config = data_processed.config function_config = self.config.config['function'] self.function_config = function_config # common data self.bulitin_metric_list = self.config.builtinMetricList # dimension data self.weight_custom = function_config['weightCustom'] self.metric_list = function_config['metric'] self.type_list = function_config['type'] self.type_name_dict = function_config['typeName'] self.name_dict = function_config['name'] self.unit_dict = function_config['unit'] # custom metric data self.customMetricParam = function_config['customMetricParam'] self.custom_metric_list = list(self.customMetricParam.keys()) self.custom_data = custom_data self.custom_param_dict = {} # score data self.weight = function_config['weightDimension'] self.weight_type_dict = function_config['typeWeight'] self.weight_type_list = function_config['typeWeightList'] self.weight_dict = function_config['weight'] self.weight_list = function_config['weightList'] self.priority_dict = function_config['priority'] self.priority_list = function_config['priorityList'] self.kind_dict = function_config['kind'] self.optimal_dict = function_config['optimal'] self.multiple_dict = function_config['multiple'] self.kind_list = function_config['kindList'] self.optimal_list = function_config['optimalList'] self.multiple_list = function_config['multipleList'] # self.unit_dict = function_config['unit'] self.metric_dict = function_config['typeMetricDict'] if "functionACC" in self.metric_dict.keys(): self.acc_metric_list = self.metric_dict['functionACC'] if "functionLKA" in self.metric_dict.keys(): self.lka_metric_list = self.metric_dict['functionLKA'] # self.acc_metric_list = ['followSpeedDeviation', 'followDistanceDeviation', 'followStopDistance', # 'followResponseTime'] # self.lka_metric_list = ['laneDistance', 'centerDistanceExpectation', 'centerDistanceStandardDeviation', # 'centerDistanceMax', 'centerDistanceMin', 'centerDistanceFrequency', # 'centerDistanceRange'] # custom metric self.flag_type_dict = {} # lists of drving control info self.time_list = data_processed.driver_ctrl_data['time_list'] self.frame_list = data_processed.driver_ctrl_data['frame_list'] self.time_list_follow = [] self.frame_list_follow = [] self.dist_list = [] self.v_deviation_list = [] self.v_relative_list = [] self.dist_deviation_list = [] self.stop_distance_list = [] self.follow_stop_time_start_list = [] # 起停跟车响应时长 self.dist_list_full_time = list() self.dist_deviation_list_full_time = list() self.line_dist = [] self.center_dist = [] self.center_dist_with_nan = [] self.center_time_list = [] self.center_frame_list = [] self.ICA_FLAG = False self.follow_stop_count = 0 self.result = {} def _following(self, df): """ 稳定跟车行驶: 1.稳态控制速度偏差:筛选出跟车行驶数据后, 2.稳态控制距离偏差 ACC stastus 0x7: Stand-wait State 0x6: Stand-active State 0x5: Passive State 0x4: Standby State 0x3: Shut-off State 0x2: Override State 0x1: Active State 0x0: Off # 速度变化不能以0到非0为参考,要改为从0到0.05的变化,有一个阈值滤波,避免前车车辆抖动误差影响传感器 弯道行驶: 最小跟随弯道半径 """ # df = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下 # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下 col_list = ['simTime', 'simFrame', 'playerId', 'v', 'posX', 'posY'] # target_id df = df[col_list].copy() ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'posX', 'posY']] # 筛选目标车(同一车道内,距离最近的前车) # obj_df = df[df['playerId'] == df['target_id']] target_id = 2 obj_df = df[df['playerId'] == target_id][['simTime', 'simFrame', 'v', 'posX', 'posY']] # 目标车 obj_df = obj_df.rename(columns={'v': 'v_obj', 'posX': 'posX_obj', 'posY': 'posY_obj'}) df_merge = pd.merge(ego_df, obj_df, on=['simTime', 'simFrame'], how='left') df_merge['v_relative'] = df_merge['v'] - df_merge['v_obj'] df_merge['dist'] = df_merge.apply( lambda row: self.dist(row['posX'], row['posY'], row['posX_obj'], row['posY_obj']), axis=1) self.dist_list = df_merge['dist'].values.tolist() df_merge['time_gap'] = df_merge['dist'] / df_merge['v'] SAFE_TIME_GAP = 3 df_merge['dist_deviation'] = df_merge['time_gap'].apply( lambda x: 0 if (x >= SAFE_TIME_GAP) else (SAFE_TIME_GAP - x)) df_stop = df_merge[(df_merge['v'] == 0) & (df_merge['v_obj'] == 0)] stop_distance_list = df_stop['dist'].values.tolist() self.stop_distance_list = stop_distance_list t_list = df_stop['simTime'].values.tolist() stop_group = [] sub_group = [] CONTINUOUS_TIME_PERIOD = 1 CONTINUOUS_FRAME_PERIOD = 13 for i in range(len(t_list)): if not sub_group or t_list[i] - t_list[i - 1] <= CONTINUOUS_TIME_PERIOD: sub_group.append(t_list[i]) else: stop_group.append(sub_group) sub_group = [t_list[i]] stop_group.append(sub_group) stop_group = [g for g in stop_group if len(g) >= CONTINUOUS_FRAME_PERIOD] self.follow_stop_count = len(stop_group) # solution 1: 跟车状态下,算法输出预设跟车速度 # df['velocity_deviation'] = abs(df['v'] - df['set_velocity']) # max_velocity_deviation = df['velocity_deviation'].max() # solution 2: 跟车状态下,跟车设定速度为目标车速度 # velocity_deviation = [] # distance_deviation = [] # stop_distance = [] # # for f, data in df.groupby('simFrame'): # ego_data = data.iloc[0].copy() # # df.loc[len(df)] = ego_data # # if len(data) < 2: # continue # v1 = ego_data['v'] # x1 = ego_data['posX'] # y1 = ego_data['posY'] # # for i in range(1, len(data)): # obj_data = data.iloc[i].copy() # # v2 = obj_data['v'] # x2 = obj_data['posX'] # y2 = obj_data['posY'] # # v_delta = abs(v1 - v2) # velocity_deviation.append(v_delta) # # dist = self.dist(x1, y1, x2, y2) # distance_deviation.append(dist) # # if v2 == 0 and v1 == 0: # stop_distance.append(dist) # df.loc[len(df)] = obj_data # self.df = df df_merge.replace([np.inf, -np.inf], np.nan, inplace=True) # 异常值处理 self.time_list_follow = df_merge['simTime'].values.tolist() self.frame_list_follow = df_merge['simFrame'].values.tolist() self.v_relative_list = df_merge['v_relative'].values.tolist() self.v_deviation_list = abs(df_merge['v_relative']).values.tolist() self.dist_deviation_list = df_merge['dist_deviation'].values.tolist() tmp_df = self.ego_df[['simTime', 'simFrame']].copy() v_rel_df = df_merge[['simTime', 'v_relative']].copy() df_merged1 = pd.merge(tmp_df, v_rel_df, on='simTime', how='left') self.v_relative_list_full_time = df_merged1['v_relative'].values.tolist() dist_deviation_df = df_merge[['simTime', 'dist_deviation']].copy() df_merged1 = pd.merge(tmp_df, dist_deviation_df, on='simTime', how='left') self.dist_deviation_list_full_time = df_merged1['dist_deviation'].values.tolist() dist_df = df_merge[['simTime', 'dist']].copy() df_merged1 = pd.merge(tmp_df, dist_df, on='simTime', how='left') self.dist_list_full_time = df_merged1['dist'].values.tolist() max_velocity_deviation = abs(df_merge['v_relative']).max() distance_deviation = df_merge['dist_deviation'].max() min_stop_distance = min(self.stop_distance_list) if self.stop_distance_list else 9999 self.result['followSpeedDeviation'] = max_velocity_deviation self.result['followDistanceDeviation'] = distance_deviation self.result['followStopDistance'] = min_stop_distance def _stop_and_go(self, df): """ 停走功能: 1.跟停距离3-4m, 2.启动跟车响应时间<=1.2s 3.停车跟车响应时间<=1.2s decisionType_target 0 无障碍物巡航 1 停车减速让行 2 跟车 3 绕行 4 到达终点? """ # df = self.df[self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下 # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下 df = df.dropna(subset=['type']) STOP_SPEED_THRESHOLD = 0.05 df['v'] = df['v'].apply(lambda x: 0 if x <= STOP_SPEED_THRESHOLD else 1) # 区分速度为0或非0 target_id = 2 df_ego = df[df['playerId'] == 1].copy() df_obj = df[df['playerId'] == target_id].copy() # 目标车 df_obj_time = df_obj['simTime'].values.tolist() df_ego = df_ego[df_ego['simTime'].isin(df_obj_time)].copy() df_ego = df_ego.drop_duplicates(["simTime", "simFrame"]) df_obj = df_obj.drop_duplicates(["simTime", "simFrame"]) df_ego['v_diff'] = df_ego['v'].diff() df_ego['v_start_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == 1 else 0) # 起步即为1 df_ego['v_stop_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == -1 else 0) # 停车即为-1 df_obj['v_diff'] = df_obj['v'].diff() obj_v_start_flag = df_obj['v_diff'].apply(lambda x: 1 if x == 1 else 0).values # 起步即为1 obj_v_stop_flag = df_obj['v_diff'].apply(lambda x: 1 if x == -1 else 0).values # 停车即为1 df_ego['obj_v_start_flag'] = obj_v_start_flag df_ego['obj_v_stop_flag'] = obj_v_stop_flag df_ego['flag_start'] = df_ego['obj_v_start_flag'] - df_ego['v_start_flag'] # 目标车起步即为1,自车起步即为-1 df_ego['flag_stop'] = df_ego['obj_v_stop_flag'] - df_ego['v_stop_flag'] # 目标车停车即为1,自车停车即为-1 flag_start_list = df_ego['flag_start'].values flag_stop_list = df_ego['flag_stop'].values time_list = df_ego['simTime'].values time_start_list = [] time_stop_list = [] for i, flag in enumerate(flag_start_list): if flag: t1 = time_list[i] if flag == -1: t2 = time_list[i] time_start_list.append(t2 - t1) # t2-t1即为自车起步响应时间 for i, flag in enumerate(flag_stop_list): if flag: t1 = time_list[i] if flag == -1: t2 = time_list[i] time_stop_list.append(t2 - t1) # t2-t1即为自车停车响应时间 time_start_list = [i for i in time_start_list if i != 0] self.result['followResponseTime'] = max(time_start_list) if time_start_list else 0 self.follow_stop_time_start_list = time_start_list # self.result['跟车停止响应最长时间'] = max(time_stop_list) def dist(self, x1, y1, x2, y2): dis = math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) return dis def _line_dist_merge(self): # line_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy() # line_dist_group = line_dist_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()).values.tolist() # line_dist_df['line_dist'] = line_dist_group # line_dist_df = line_dist_df[['simFrame', 'line_dist']] # line_dist_df.columns = ['simFrame', 'line_dist'] # line_dist_df = line_dist_df.drop_duplicates() line_dist_df = self.df_roadmark[(self.df_roadmark['id'] == 0) | (self.df_roadmark['id'] == 2)].copy() # df = line_dist_df[line_dist_df['id'] == 2][['simTime', 'simFrame']].copy() # df['line_dist'] = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).values.tolist() if line_dist_df.empty: self.ego_df['line_dist'] = pd.Series(dtype=float) else: df = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).reset_index() # df = df.rename(columns={'lateralDist': 'line_dist'}) df.columns = ["simFrame", "line_dist"] self.ego_df = pd.merge(self.ego_df, df, on='simFrame', how='left') def _lane_transverse_distance(self, df): """ 0x7:Reserved 0x6: Reserved 0x5: Reserved 0x4: Error 0x3: Active 0x2: Standby 0x1: Passive 0x0: Off """ # 车道内偏移行驶时,与近侧车道线的横向距离 # data_group = lane_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()) # data_group = self.df_roadmark.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()) # df = data_group.to_frame(name='line_dist') # df['simFrame'] = data_group.index.tolist() # df = self.df_roadmark[self.df_roadmark['id'] == 0][['simTime', 'simFrame']].copy() # df['line_dist'] = self.df_roadmark.groupby('simFrame').apply( # lambda t: abs(t['lateralDist']).min()).values.tolist() # self.ego_df = pd.merge(self.ego_df, df, on=['simTime', 'simFrame'], how='left') # self._line_dist_merge() # # df = self.df[self.df['LKA_status'] == 3] # df = self.ego_df[self.ego_df['LKA_status'] == "Active"] self.line_dist = df['line_dist'].to_list() self.line_dist = [x for x in self.line_dist if not np.isnan(x)] self.result['laneDistance'] = min(self.line_dist) if self.line_dist else self.optimal_dict['laneDistance'] def _lateral_dist_of_center_line(self, df_laneoffset): # df_laneoffset = self.df[(self.df['LKA_status'] == "Active") & (self.df['playerId'] == 1)] self.center_dist_with_nan = df_laneoffset['laneOffset'].to_list() self.center_time_list = df_laneoffset['simTime'].to_list() self.center_frame_list = df_laneoffset['simFrame'].to_list() # self.center_dist_df = self.df_roadpos[self.df_roadpos["playerId"] == 1] # self.center_dist = self.center_dist_df["laneOffset"] # self.center_time_list = self.center_dist_df['simTime'].to_list() # self.center_frame_list = self.center_dist_df['simFrame'].to_list() self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)] if not self.center_dist: self.result['centerDistanceExpectation'] = 0 self.result['centerDistanceStandardDeviation'] = 0 # self.result['centerDistanceMax'] = abs(extreme_max_value).max() # self.result['centerDistanceMin'] = abs(extreme_min_value).min() self.result['centerDistanceMax'] = 0 self.result['centerDistanceMin'] = 0 self.result['centerDistanceFrequency'] = 0 self.result['centerDistanceRange'] = 0 else: center_dist = [abs(x) for x in self.center_dist] # 车道中心线横向距离分布期望与标准差 E_lane_center_dist = np.mean(center_dist) D_lane_center_dist = np.std(center_dist) # 车道中心线横向距离分布极值 center_dist = np.array(center_dist) extrmax_index = sg.argrelmax(center_dist)[0] extrmin_index = sg.argrelmin(center_dist)[0] extreme_max_value = center_dist[extrmax_index] extreme_min_value = center_dist[extrmin_index] # 横向相对位置震荡频率 # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度 length1 = len(extrmax_index) # 极大值列表长度 length2 = len(extrmin_index) # 极小值列表长度 # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔 FREQUENCY = 100 time_diff = 1.0 / FREQUENCY period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1]) period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1]) try: period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期 except ZeroDivisionError: period = 0 frequency = 1 / period if period else 0 # length = min(len(extreme_max_value), len(extreme_min_value)) # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length]) # frequency = 1 / period # 横向相对位置震荡极差 length = min(len(extreme_max_value), len(extreme_min_value)) extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length] self.result['centerDistanceExpectation'] = E_lane_center_dist self.result['centerDistanceStandardDeviation'] = D_lane_center_dist # self.result['centerDistanceMax'] = abs(extreme_max_value).max() # self.result['centerDistanceMin'] = abs(extreme_min_value).min() self.result['centerDistanceMax'] = center_dist.max() self.result['centerDistanceMin'] = center_dist.min() self.result['centerDistanceFrequency'] = abs(frequency) self.result['centerDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0 def _continous_judge(self, frame_list): CONTINUOUS_FRAME_PERIOD = 3 if not frame_list: return 0 cnt = 1 for i in range(1, len(frame_list)): if frame_list[i] - frame_list[i - 1] <= CONTINUOUS_FRAME_PERIOD: continue cnt += 1 return cnt def _lane_departure_warning(self, ldw_df): """ 工作车速范围:55/60-200/250 km/h 最小可用车道宽度:2.6m 误报警:距离安全,但是预警 漏报警:距离危险,但是没报警 灵敏度差:线外40cm 灵敏度零级:压线 灵敏度一级:线内20cm(20%) 灵敏度二级:线内40cm(40%) 灵敏度三级:线内60cm(60%) 偏离车速1.0m/s:线内60cm报警 偏离车速0.5m/s:线内40cm报警 """ # 自车, 车道线宽度>2.6m,转向灯没开 # ego_df = self.df[self.df['playerId'] == 1].copy() # ego_df['line_dist'] = self.line_dist # lane_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy() # lane_dist_group = lane_dist_df.groupby("simFrame").apply( # lambda t: abs(t['lateralDist']).min()).values.tolist() # lane_dist_df['line_dist'] = lane_dist_group # lane_dist_df = lane_dist_df[['simFrame', 'line_dist']] # lane_dist_df.columns = ['simFrame', 'line_dist'] # # lane_dist_df = lane_dist_df[['simTime', 'simFrame', 'line_dist']] # # lane_dist_df.columns = ['simTime', 'simFrame', 'line_dist'] # lane_dist_df = lane_dist_df.drop_duplicates() # ldw_status_df = self.df[self.df['playerId'] == 1].copy()[['simTime', 'simFrame', 'LDW_status']].drop_duplicates() # ldw_status_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy().drop_duplicates() # ldw_df = pd.merge_asof(ldw_status_df, lane_dist_df, on='simFrame', direction='nearest') # ldw_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy() # calculate max DLC warning_dist_max = ldw_df[ldw_df['LDW_status'] == "Active"]['line_dist'].max() # warning_dist_max = ldw_df[ldw_df['LDW_status'] == 3]['line_dist'].max() self.result['预警时距离车道线最大距离'] = warning_dist_max # count miss warning miss_warning_df = ldw_df[(ldw_df['line_dist'] <= 0.4) & (ldw_df['LDW_status'] != "Active")] miss_warning_frame_list = miss_warning_df['simFrame'].values.tolist() miss_warning_count = self._continous_judge(miss_warning_frame_list) self.result['车道偏离漏预警次数'] = miss_warning_count # count false warning false_warning_df = ldw_df[(ldw_df['line_dist'] >= 0.4) & (ldw_df['LDW_status'] == "Active")] false_warning_frame_list = false_warning_df['simFrame'].values.tolist() false_warning_count = self._continous_judge(false_warning_frame_list) self.result['车道偏离误预警次数'] = false_warning_count def _autonomous_emergency_brake(self): """ 刹得准、刹得稳 碰撞预警 + 紧急制动 <=4s 预警>1s,制动<3s 触发目标类型 目标类型准确度 目标为车辆的速度降: 弱势目标的速度降: 触发时TTC<4s 一级制动减速度:4m/s^2 二级制动减速度:10m/s^2 刹停行驶距离:10-20m 刹停时前车距离:0.8-1.5m 误触发次数:频率 n/10万km 漏触发: 能绕开就没必要刹停:开始刹车时距离远近 Returns: """ # ego_df = self.df[self.df['playerId'] == 1] df = self.ego_df[self.ego_df['Aeb_status'] == 'Active'] # df = self.ego_df[self.ego_df['Aeb_status'] == 0] # calculate brakeDecelerate df['lon_accel'] = df['accelX'] * np.cos(df['posH']) + df['accelY'] * np.sin(df['posH']) decelerate_max = df['lon_accel'].min() # calculate brake stop travelDist df_dist = df[df['lon_accel'] < 0] frame_list = df_dist['simFrame'].to_list() travelDist_list = df_dist['travelDist'].to_list() travelDist_group = [] start = 0 for i in range(1, len(frame_list)): if frame_list[i] - frame_list[i - 1] > 5: end = i - 1 # travelDist_group.append(travelDist_list[start:end]) travelDist_group.append(travelDist_list[end] - travelDist_list[start]) start = i end = -1 travelDist_group.append(travelDist_list[end] - travelDist_list[start]) brake_distance_max = max(travelDist_group) if travelDist_group else np.inf self.result['brakeDecelerate'] = abs(round(decelerate_max, 4)) self.result['brakeDistance'] = round(brake_distance_max, 4) def _integrated_cruise_assist(self, df): df_LCC = df[df['ICA_status'].isin(['LLC_follow_vehicle', 'LLC_follow_line'])].copy() self.LCC_line_dist = df_LCC['line_dist'].to_list() self.result['LCClaneDistance'] = min(self.LCC_line_dist) if self.LCC_line_dist else self.optimal_dict[ 'LCClaneDistance'] self.LCC_center_dist = df_LCC['laneOffset'].to_list() self.LCC_center_time_list = df_LCC['simTime'].to_list() self.LCC_center_frame_list = df_LCC['simFrame'].to_list() if not self.LCC_center_dist: self.result['LCCcenterDistanceExpectation'] = 0 self.result['LCCcenterDistanceStandardDeviation'] = 0 # self.result['centerDistanceMax'] = abs(extreme_max_value).max() # self.result['centerDistanceMin'] = abs(extreme_min_value).min() self.result['LCCcenterDistanceMax'] = 0 self.result['LCCcenterDistanceMin'] = 0 self.result['LCCcenterDistanceFrequency'] = 0 self.result['LCCcenterDistanceRange'] = 0 else: center_dist = [abs(x) for x in self.LCC_center_dist] # 车道中心线横向距离分布期望与标准差 E_lane_center_dist = np.mean(center_dist) D_lane_center_dist = np.std(center_dist) # 车道中心线横向距离分布极值 center_dist = np.array(center_dist) extrmax_index = sg.argrelmax(center_dist)[0] extrmin_index = sg.argrelmin(center_dist)[0] extreme_max_value = center_dist[extrmax_index] extreme_min_value = center_dist[extrmin_index] # 横向相对位置震荡频率 # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度 length1 = len(extrmax_index) # 极大值列表长度 length2 = len(extrmin_index) # 极小值列表长度 # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔 FREQUENCY = 100 time_diff = 1.0 / FREQUENCY period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1]) period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1]) try: period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期 except ZeroDivisionError: period = 0 frequency = 1 / period if period else 0 # length = min(len(extreme_max_value), len(extreme_min_value)) # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length]) # frequency = 1 / period # 横向相对位置震荡极差 length = min(len(extreme_max_value), len(extreme_min_value)) extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length] self.result['LCCcenterDistanceExpectation'] = E_lane_center_dist self.result['LCCcenterDistanceStandardDeviation'] = D_lane_center_dist # self.result['centerDistanceMax'] = abs(extreme_max_value).max() # self.result['centerDistanceMin'] = abs(extreme_min_value).min() self.result['LCCcenterDistanceMax'] = center_dist.max() self.result['LCCcenterDistanceMin'] = center_dist.min() self.result['LCCcenterDistanceFrequency'] = abs(frequency) self.result['LCCcenterDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0 df_LC = df[df['ICA_status'] == 'Only_Longitudinal_Control'].copy() v_max = df_LC['v'].max() v_min = df_LC['v'].min() self.result['OLC_v_deviation'] = v_max - v_min def _function_statistic(self): """ """ ACC_optimal_list = [] if "functionACC" in self.type_list: for function_type in self.type_list: if "ICA" in function_type: continue if len(self.obj_id_list) > 1: # 给定的双层列表 follow_time_ranges = self.status_trigger_dict['ACC']['ACC_active_time'] # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤 # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并 filtered_df = pd.DataFrame(columns=self.df.columns) for start, end in follow_time_ranges: mask = (self.df['simTime'] >= start) & (self.df['simTime'] <= end) if filtered_df is None: filtered_df = self.df[mask] else: filtered_df = pd.concat([filtered_df, self.df[mask]]) # 上述方式可能引入重复行,使用drop_duplicates去重 df_follow = filtered_df.drop_duplicates() df_stop_and_go = df_follow.copy() # 重置索引(如果需要) # filtered_df.reset_index(drop=True, inplace=True) # df_follow111 = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下 self._following(df_follow) # df_stop_and_go = self.df[ # self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下 self._stop_and_go(df_stop_and_go) if df_follow.empty and df_stop_and_go.empty: self.flag_type_dict['functionACC'] = False else: self.flag_type_dict['functionACC'] = True else: ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list] if "functionLKA" in self.type_list: self._line_dist_merge() # df_line_dist = self.df[self.df['LKA_status'] == 3] # 给定的双层列表 follow_time_ranges = self.status_trigger_dict['LKA']['LKA_active_time'] # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤 # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并 filtered_df = pd.DataFrame(columns=self.ego_df.columns) for start, end in follow_time_ranges: mask = (self.ego_df['simTime'] >= start) & (self.ego_df['simTime'] <= end) if filtered_df is None: filtered_df = self.ego_df[mask] else: filtered_df = pd.concat([filtered_df, self.ego_df[mask]]) # 上述方式可能引入重复行,使用drop_duplicates去重 df_lka = filtered_df.drop_duplicates() # df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"] if df_lka.empty: self.flag_type_dict['functionLKA'] = False else: self.flag_type_dict['functionLKA'] = True self._lane_transverse_distance(df_lka) self._lateral_dist_of_center_line(df_lka) # if "functionLDW" in self.type_list: # df_ldw = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy() # self._lane_departure_warning(df_ldw) # if "functionAEB" in self.type_list: # self._autonomous_emergency_brake() # if self.df['ICA_status'].nunique() != 1: # self.ICA_FLAG = True # # if "functionICA" in self.type_list: # # self._integrated_cruise_assist(self.ego_df) # # ACC # if len(self.obj_id_list) > 1: # df_follow = self.df[self.df['ACC_status'] == "Shut_off"].copy() # 数字3,对应Active # self._following(df_follow) # # df_stop_and_go = self.df[ # self.df['ACC_status'].isin(["Shut_off", "Stand-active", "Stand-wait"])].copy() # 跟车状态下 # self._stop_and_go(df_stop_and_go) # # else: # ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list] # # # LKA # self._line_dist_merge() # # df_line_dist = self.df[self.df['LKA_status'] == 3] # df_lka = self.ego_df[self.ego_df['ICA_status'] == "LLC_Follow_Line"] # self._lane_transverse_distance(df_lka) # self._lateral_dist_of_center_line(df_lka) arr_func = [value for key, value in self.result.items() if key in self.metric_list] # arr_func = list(self.result.values()) if "functionACC" in self.type_list and len(self.obj_id_list) == 1: arr_func = ACC_optimal_list + arr_func return arr_func def custom_metric_param_parser(self, param_list): """ param_dict = { "paramA" [ { "kind": "-1", "optimal": "1", "multiple": ["0.5","5"], "spare1": null, "spare2": null } ] } """ kind_list = [] optimal_list = [] multiple_list = [] spare_list = [] # spare1_list = [] # spare2_list = [] for i in range(len(param_list)): kind_list.append(int(param_list[i]['kind'])) optimal_list.append(float(param_list[i]['optimal'])) multiple_list.append([float(x) for x in param_list[i]['multiple']]) spare_list.append([item["param"] for item in param_list[i]["spare"]]) # spare1_list.append(param_list[i]['spare1']) # spare2_list.append(param_list[i]['spare2']) result = { "kind": kind_list, "optimal": optimal_list, "multiple": multiple_list, "spare": spare_list, # "spare1": spare1_list, # "spare2": spare2_list } return result def custom_metric_score(self, metric, value, param_list): """ """ param = self.custom_metric_param_parser(param_list) self.custom_param_dict[metric] = param score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value])) score_sub = score_model.cal_score() score = sum(score_sub) / len(score_sub) return score def func_score(self): score_metric_dict = {} score_type_dict = {} arr_func = self._function_statistic() print("\n[功能性表现及得分情况]") print("功能性各指标值:", [round(num, 2) for num in arr_func]) if arr_func: arr_func = np.array([arr_func]) score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_func) score_sub = score_model.cal_score() score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub)) score_metric = [round(num, 2) for num in score_sub] metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList] score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)} flag_custom_type_dict = {} for metric in self.custom_metric_list: value = self.custom_data[metric]['value'] param_list = self.customMetricParam[metric] score = self.custom_metric_score(metric, value, param_list) score_metric_dict[metric] = round(score, 2) if 'statusFlag' in self.custom_data[metric].keys(): custom_type, flag_custom_type = next(iter(self.custom_data[metric]['statusFlag'].items())) if (custom_type in flag_custom_type_dict) and (flag_custom_type == True): flag_custom_type_dict[custom_type] = True else: flag_custom_type_dict[custom_type] = flag_custom_type # # # update metric_list and metric_dict # # flag_custom_type_dict1 = {k: v for k, v in flag_custom_type_dict.items() if v == True} flag_builtin_type_dict2 = {k: v for k, v in self.flag_type_dict.items() if v == True} flag_builtin_type_dict2.update(flag_custom_type_dict1) self.type_list = list(flag_builtin_type_dict2.keys()) self.metric_list = [] for type in self.type_list: self.metric_list.extend(self.metric_dict[type]) score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list} score_metric = list(score_metric_dict.values()) # no score in list if not score_metric: return 0, {}, {} if self.weight_custom: # 自定义权重 score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in self.weight_dict} for type in self.type_list: type_score = sum( value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type]) score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100 score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in score_type_dict} score_function = sum(score_type_with_weight_dict.values()) else: # 客观赋权 self.weight_list = cal_weight_from_80(score_metric) self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)} score_function = cal_score_with_priority(score_metric, self.weight_list, self.priority_list) for type in self.type_list: type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type]) for key, value in self.weight_dict.items(): if key in self.metric_dict[type]: # self.weight_dict[key] = round(value / type_weight, 4) self.weight_dict[key] = value / type_weight type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]] type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]] type_priority_list = [value for key, value in self.priority_dict.items() if key in self.metric_dict[type]] type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list) score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100 for key in self.weight_dict: self.weight_dict[key] = round(self.weight_dict[key], 4) score_type = list(score_type_dict.values()) self.weight_type_list = cal_weight_from_80(score_type) self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)} score_function = round(score_function, 2) print(f"功能性得分为:{score_function:.2f}分。") print(f"功能性各类型得分为:{score_type_dict}分。") print(f"功能性各指标得分为:{score_metric_dict}。") return score_function, score_type_dict, score_metric_dict def continuous_group(self, df): time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time']) unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame']) unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1) return unfunc_df def unfunctional_follow_v_deviation_df_statistic(self): unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow, 'v_deviation': self.v_deviation_list}) unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1] v_df = unfunc_df[unfunc_df['v_deviation'] > self.optimal_dict['followSpeedDeviation']] v_df = v_df[['simTime', 'simFrame', 'v_deviation']] v_follow_df = self.continuous_group(v_df) # v_follow_df['type'] = 'follow' # v_follow_df['type'] = 'ACC' v_follow_df['type'] = f"{self.type_name_dict['functionACC']}" self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, v_follow_df], ignore_index=True) def unfunctional_follow_dist_deviation_df_statistic(self): unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow, 'dist_deviation': self.dist_deviation_list}) unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1] dist_df = unfunc_df[unfunc_df['dist_deviation'] > self.optimal_dict['followDistanceDeviation']] # 阈值由1改为了3 dist_df = dist_df[['simTime', 'simFrame', 'dist_deviation']] dist_follow_df = self.continuous_group(dist_df) # v_follow_df['type'] = 'follow' # dist_follow_df['type'] = 'ACC' dist_follow_df['type'] = f"{self.type_name_dict['functionACC']}" self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, dist_follow_df], ignore_index=True) def unfunctional_lane_df_statistic(self): unfunc_df = pd.DataFrame( {'simTime': self.center_time_list, 'simFrame': self.center_frame_list, 'center_dist': self.center_dist_with_nan}) unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1] unfunc_df = unfunc_df.dropna(subset=['center_dist']) lane_df = unfunc_df[abs(unfunc_df['center_dist']) > self.optimal_dict['centerDistanceMax']] lane_df = lane_df[['simTime', 'simFrame', 'center_dist']] dist_lane_df = self.continuous_group(lane_df) # dist_lane_df['type'] = 'lane' # dist_lane_df['type'] = 'LKA' dist_lane_df['type'] = f"{self.type_name_dict['functionLKA']}" self.unfunc_lane_df = pd.concat([self.unfunc_lane_df, dist_lane_df], ignore_index=True) # def zip_time_pairs(self, zip_list, upper_limit=9999): # zip_time_pairs = zip(self.time_list, zip_list) # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)] # return zip_vs_time def zip_time_pairs(self, zip_list): zip_time_pairs = zip(self.time_list, zip_list) zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs] return zip_vs_time def _get_weight_distribution(self, dimension): # get weight distribution weight_distribution = {} weight_distribution["name"] = self.config.dimension_name[dimension] for type in self.type_list: type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if key in self.metric_dict[type]} weight_distribution_type = { "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)", "indexes": type_weight_indexes_dict } weight_distribution[type] = weight_distribution_type return weight_distribution def report_statistic(self): # report_dict = { # "name": "功能性", # "weight": f"{self.weight * 100:.2f}%", # "weightDistribution": weight_distribution, # "score": score_function, # "level": grade_function, # 'score_type': score_type, # 'score_metric': score_metric, # 'followStopCount': self.follow_stop_count, # "description1": func_description1, # "description2": func_description2, # # "functionACC": acc_dict, # "functionLKA": lka_dict, # # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time], # "accData": [lat_acc_vs_time, lon_acc_vs_time], # # } brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list'] throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list'] steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list'] # common parameter calculate brake_vs_time = self.zip_time_pairs(brakePedal_list) throttle_vs_time = self.zip_time_pairs(throttlePedal_list) steering_vs_time = self.zip_time_pairs(steeringWheel_list) report_dict = { "name": self.config.dimension_name["function"], "weight": f"{self.weight * 100:.2f}%", 'followStopCount': self.follow_stop_count, } # upper_limit = 40 times_upper = 2 # len_time = len(self.time_list) duration = self.time_list[-1] # score_function, score_type, score_metric = self.func_score() score_function, score_type_dict, score_metric_dict = self.func_score() # no metric if not score_metric_dict: return {} # get weight distribution report_dict["weightDistribution"] = self._get_weight_distribution("function") score_function = int(score_function) if int(score_function) == score_function else round( score_function, 2) # score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()] # score_metric = [int(n) if int(n) == n else n for key, n in score_metric_dict.items()] grade_function = score_grade(score_function) report_dict["score"] = score_function report_dict["level"] = grade_function # report_dict["score_type"] = score_type # report_dict["score_metric"] = score_metric # speed data ego_speed_list = self.ego_df['v'].values.tolist() ego_speed_vs_time = self.zip_time_pairs(ego_speed_list) obj_speed_vs_time = [] rel_speed_vs_time = [] # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time] # accData lat_acc_list = self.ego_df['lat_acc'].values.tolist() lat_acc_vs_time = self.zip_time_pairs(lat_acc_list) lon_acc_list = self.ego_df['lon_acc'].values.tolist() lon_acc_vs_time = self.zip_time_pairs(lon_acc_list) # acc data # lat_acc_list = self.ego_df['lat_acc'].values.tolist() # lat_acc_vs_time = self.zip_time_pairs(lat_acc_list) # lon_acc_list = self.ego_df['lon_acc'].values.tolist() # lon_acc_vs_time = self.zip_time_pairs(lon_acc_list) # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time] # statistic type report infos unfunc_metric_list = [] func_over_optimal = [] # function description func_type_list = [] unfunc_type_list = [] type_details_dict = {} for type in self.type_list: if type == "functionACC": builtin_graph_dict = {} custom_graph_dict = {} if len(self.obj_id_list) == 1: follow_description1 = "无目标车数据可计算;" follow_description2 = "" follow_description3 = "无目标车数据可计算;" follow_description4 = "无目标车数据可计算;" acc_dict_indexes = {} for metric in self.metric_dict[type]: acc_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": 80, "avg": "-", "max": "-", "min": "-", } if self.kind_dict[metric] == -1: acc_dict_indexes[metric]["range"] = f"[0, {self.optimal_dict[metric]}]" elif self.kind_dict[metric] == 1: acc_dict_indexes[metric]["range"] = f"[{self.optimal_dict[metric]}, inf)" elif self.kind_dict[metric] == 0: acc_dict_indexes[metric][ "range"] = f"[{self.optimal_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal_dict[metric] * self.multiple_dict[metric][1]}]" acc_dict = { "name": f"{self.type_name_dict[type]}", "score": 80, "level": "良好", "description1": follow_description1, "description2": follow_description2, "description3": follow_description3, "description4": follow_description4, "noObjectCar": True, "indexes": acc_dict_indexes, "builtin": {}, "custom": {} } # follow cruise description func_follow_metric_list = [] unfunc_follow_metric_list = [] str_follow_over_optimal = '' distance_vs_time = self.zip_time_pairs(self.dist_list_full_time) else: acc_dict = { "name": f"{self.type_name_dict[type]}", "noObjectCar": False, } # follow dict score_follow = score_type_dict[type] grade_follow = score_grade(score_follow) acc_dict["score"] = score_follow acc_dict["level"] = grade_follow unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type) # follow cruise description func_follow_metric_list = [] unfunc_follow_metric_list = [] for metric in self.metric_dict[type]: unfunc_follow_metric_list.append(metric) if score_metric_dict[ metric] < 80 else func_follow_metric_list.append( metric) str_follow_over_optimal = '' if not unfunc_follow_metric_list: str_func_follow_metric = string_concatenate(func_follow_metric_list) follow_description1 = f"{str_func_follow_metric}指标均表现良好" else: for metric in unfunc_follow_metric_list: if metric in self.bulitin_metric_list: value = self.result[metric] if self.kind_dict[metric] == -1: metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[ metric]) * 100 elif self.kind_dict[metric] == 1: metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[ metric]) * 100 elif self.kind_dict[metric] == 0: metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[ metric]) * 100 else: value = self.custom_data[metric]["value"][0] if self.custom_param_dict[metric]['kind'][0] == -1: metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 elif self.custom_param_dict[metric]['kind'][0] == 1: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) / self.custom_param_dict[metric]['optimal'][0]) * 100 elif self.custom_param_dict[metric]['kind'][0] == 0: metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) / self.custom_param_dict[metric]['optimal'][0]) * 100 # str_follow_over_optimal += f"{metric}为{value:.2f},超过合理范围{metric_over_optimal:.2f}%;" # str_follow_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;" str_follow_over_optimal += f"{metric}为{round(value, 2)}{self.unit_dict[metric]},超过合理范围{round(metric_over_optimal, 2)}%;" str_follow_over_optimal = str_follow_over_optimal[:-1] if str_follow_over_optimal else "" str_func_follow_metric = string_concatenate(func_follow_metric_list) str_unfunc_follow_metric = string_concatenate(unfunc_follow_metric_list) if not func_follow_metric_list: follow_description1 = f"{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}" else: follow_description1 = f"{str_func_follow_metric}指标表现良好,{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}" # for follow_description2 follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0 follow_description2 = f"自车在行驶时有{round(follow_duration, 2)}s处于跟车状态," follow_description3 = "" if "followResponseTime" in self.metric_list: # for follow_description3 cnt3_1 = len(self.follow_stop_time_start_list) tmp3_list = [x for x in self.follow_stop_time_start_list if x > self.optimal_dict['followResponseTime']] cnt3_2 = len(tmp3_list) percent3 = 0 if cnt3_1 == 0 else round(cnt3_2 / cnt3_1 * 100, 2) follow_description3 = f"起停跟车响应时间有{cnt3_2}次超出合理范围,占比为{percent3}%;" follow_description4 = "" if "followDistanceDeviation" in self.metric_list: # for follow_description4 cnt4_1 = len(self.stop_distance_list) tmp4_list = [x for x in self.stop_distance_list if x < self.optimal_dict['followDistanceDeviation']] cnt4_2 = len(tmp4_list) percent4 = 0 if cnt4_1 == 0 else round(cnt4_2 / cnt4_1 * 100, 2) follow_description4 = f"在本次测试中,跟车状态下目标车共发生了{cnt4_1}次停车,有{cnt4_2}次超出合理范围,占比为{percent4}%;" # distance_list = obj_df['dist'].values.tolist() # distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time) # # acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict) # acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict) # acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict) # acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict) # common parameter calculate obj_df = self.df[self.df['playerId'] == 2] obj_speed_list = obj_df['v'].values.tolist() obj_speed_vs_time = self.zip_time_pairs(obj_speed_list) rel_speed_vs_time = self.zip_time_pairs(self.v_relative_list_full_time) distance_vs_time = self.zip_time_pairs(self.dist_list_full_time) # acc_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time] # acc_dict["disData"] = distance_vs_time unfunc_follow_df = self.unfunc_follow_df.copy() unfunc_follow_df['type'] = "origin" unfunc_follow_slices = unfunc_follow_df.to_dict('records') # acc_dict["speMarkLine"] = unfunc_follow_slices # acc_dict["disMarkLine"] = unfunc_follow_slices distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time) # function ACC data acc_dict_indexes = {} for metric in self.metric_dict[type]: if metric == "followSpeedDeviation": self.unfunctional_follow_v_deviation_df_statistic() unfunc_v_df = self.unfunc_follow_df.copy() unfunc_v_df.loc[unfunc_v_df['type'] != 'ACC', 'type'] = "origin" # unfunc_v_df.loc[unfunc_v_df['type'] == 'time', 'type'] = "time" unfunc_v_slices = unfunc_v_df.to_dict('records') df1 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC'] df1['v_time'] = df1['end_time'] - df1['start_time'] devi_v_time = df1['v_time'].sum() follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0 percent2_1 = devi_v_time / follow_duration * 100 if follow_duration else 0 if devi_v_time != 0: follow_description2 += f"跟车状态下自车和目标车的速度差共有{round(devi_v_time, 2)}s超出合理范围,占比为{round(percent2_1, 2)}%;" else: follow_description2 += "跟车状态下自车和目标车的速度差均在合理范围内;" v_deviation_list = [x for x in self.v_deviation_list if not np.isnan(x)] acc_dict_indexes["followSpeedDeviation"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict[ "followSpeedDeviation"], "avg": round(np.mean(v_deviation_list), 2) if v_deviation_list else '-', "max": round(max(v_deviation_list), 2) if v_deviation_list else '-', "min": round(min(v_deviation_list), 2) if v_deviation_list else '-', # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]" "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]" } fsd_data = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "data": rel_speed_vs_time, "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]", # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]", # "markLine": unfunc_v_slices, # "markLine2": [-1 * self.optimal_dict['followSpeedDeviation'], # self.optimal_dict['followSpeedDeviation']], } builtin_graph_dict["followSpeedDeviation"] = fsd_data if metric == "followDistanceDeviation": self.unfunctional_follow_dist_deviation_df_statistic() unfunc_dist_df = self.unfunc_follow_df.copy() unfunc_dist_df.loc[unfunc_dist_df['type'] != 'ACC', 'type'] = "origin" # unfunc_dist_df.loc[unfunc_dist_df['type'] == 'distance', 'type'] = "distance" unfunc_dist_slices = unfunc_dist_df.to_dict('records') df2 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC'] df2['dist_time'] = df2['end_time'] - df2['start_time'] devi_dist_time = df2['dist_time'].sum() follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0 percent2_2 = devi_dist_time / follow_duration * 100 if follow_duration else 0 if devi_dist_time != 0: follow_description2 += f"跟车状态下自车和目标车的距离差共有{round(devi_dist_time, 2)}s超出合理范围,占比为{round(percent2_2, 2)}%;" else: follow_description2 += "跟车状态下自车和目标车的距离差均在合理范围内;" dist_deviation_list = [x for x in self.dist_deviation_list if not np.isnan(x)] acc_dict_indexes["followDistanceDeviation"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict["followDistanceDeviation"], "avg": round(np.mean(dist_deviation_list), 2) if dist_deviation_list else '-', "max": round(max(dist_deviation_list), 2) if dist_deviation_list else '-', "min": round(min(dist_deviation_list), 2) if dist_deviation_list else '-', "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]" } fdd_data = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "data": distance_deviation_vs_time, "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]", # "markLine": unfunc_dist_slices, # "markLine2": [self.optimal_dict['followDistanceDeviation']], } builtin_graph_dict["followDistanceDeviation"] = fdd_data if metric == "followStopDistance": acc_dict_indexes["followStopDistance"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict["followStopDistance"], "avg": round(np.mean(self.stop_distance_list), 2) if self.stop_distance_list else '-', "max": round(max(self.stop_distance_list), 2) if self.stop_distance_list else '-', "min": round(min(self.stop_distance_list), 2) if self.stop_distance_list else '-', "range": f"[{self.optimal_dict['followStopDistance']}, inf)" } # sdd_data = { # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", # "name": f"{self.name_dict[metric]}", # "data": self.stop_distance_list, # "range": f"[{self.optimal_dict['followStopDistance']}, inf)", # # "ref": [self.optimal_dict['followStopDistance'] - 1, # # self.optimal_dict['followStopDistance']], # } # builtin_graph_dict["followStopDistance"] = sdd_data if metric == "followResponseTime": acc_dict_indexes["followResponseTime"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict["followResponseTime"], "avg": round(np.mean(self.follow_stop_time_start_list), 2) if self.follow_stop_time_start_list else '-', "max": round(max(self.follow_stop_time_start_list), 2) if self.follow_stop_time_start_list else '-', "min": round(min(self.follow_stop_time_start_list), 2) if self.follow_stop_time_start_list else '-', "range": f"[0, {self.optimal_dict['followResponseTime']}]" } # rt_data = { # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", # "name": f"{self.name_dict[metric]}", # "data": self.follow_stop_time_start_list, # # "range": f"[0, {self.optimal_dict['followResponseTime']})" # } # builtin_graph_dict["followResponseTime"] = rt_data if metric not in self.bulitin_metric_list: acc_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "avg": self.custom_data[metric]['tableData']['avg'], "max": self.custom_data[metric]['tableData']['max'], "min": self.custom_data[metric]['tableData']['min'], } if self.custom_param_dict[metric]['kind'][0] == -1: acc_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: acc_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: acc_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" custom_graph_dict[metric] = self.custom_data[metric]["reportData"] self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine']) acc_dict["indexes"] = acc_dict_indexes acc_dict["builtin"] = builtin_graph_dict acc_dict["custom"] = custom_graph_dict acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict) acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict) acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict) acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict) type_details_dict[type] = acc_dict unfunc_metric_list += unfunc_follow_metric_list func_over_optimal.append(str_follow_over_optimal) elif type == "functionLKA": lka_dict = { "name": f"{self.type_name_dict[type]}", } builtin_graph_dict = {} custom_graph_dict = {} # get score and grade score_lane = score_type_dict["functionLKA"] grade_lane = score_grade(score_lane) lka_dict['score'] = score_lane lka_dict['level'] = grade_lane # unfunc_type_list.append('车道保持') if score_type_dict["functionLKA"] < 80 else func_type_list.append( # '车道保持') unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type) # lane keep description func_lane_metric_list = [] unfunc_lane_metric_list = [] for metric in self.metric_dict[type]: unfunc_lane_metric_list.append(metric) if score_metric_dict[ metric] < 80 else func_lane_metric_list.append( metric) lka_dict_indexes = {} for metric in self.metric_dict[type]: if metric == "laneDistance": lka_dict_indexes["laneDistance"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict["laneDistance"], "avg": round(np.mean(self.line_dist), 2) if self.line_dist else "-", "max": round(max(self.line_dist), 2) if self.line_dist else "-", "min": round(min(self.line_dist), 2) if self.line_dist else "-", "range": f"[{self.optimal_dict['laneDistance']}, 1.875]" } if metric == "centerDistanceExpectation": lka_dict_indexes["centerDistanceExpectation"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict['centerDistanceExpectation'], "avg": round(self.result["centerDistanceExpectation"], 2) if self.center_dist else "-", "max": "-", "min": "-", "range": f"[0, {self.optimal_dict['centerDistanceExpectation']}]" } if metric == "centerDistanceStandardDeviation": lka_dict_indexes["centerDistanceStandardDeviation"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict['centerDistanceStandardDeviation'], "avg": round(self.result["centerDistanceStandardDeviation"], 2) if self.center_dist else "-", "max": "-", "min": "-", "range": f"[0, {self.optimal_dict['centerDistanceStandardDeviation']}]" } if metric == "centerDistanceMax": lka_dict_indexes["centerDistanceMax"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict['centerDistanceMax'], "avg": "-", "max": round(self.result["centerDistanceMax"], 2) if self.center_dist else "-", "min": "-", "range": f"[0, {self.optimal_dict['centerDistanceMax']}]" } if metric == "centerDistanceMin": lka_dict_indexes["centerDistanceMin"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict['centerDistanceMin'], "avg": "-", "max": "-", "min": round(self.result["centerDistanceMin"], 2) if self.center_dist else "-", "range": f"[0, {self.optimal_dict['centerDistanceMin']}]" } if metric == "centerDistanceFrequency": lka_dict_indexes["centerDistanceFrequency"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict['centerDistanceFrequency'], "avg": round(self.result["centerDistanceFrequency"], 2) if self.center_dist else "-", "max": "-", "min": "-", "range": f"[0, {self.optimal_dict['centerDistanceFrequency']}]" } if metric == "centerDistanceRange": lka_dict_indexes["centerDistanceRange"] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict['centerDistanceRange'], "avg": round(self.result["centerDistanceRange"], 2) if self.center_dist else "-", "max": "-", "min": "-", "range": f"[0, {self.optimal_dict['centerDistanceRange']}]" } if metric not in self.bulitin_metric_list: lka_dict_indexes[metric] = { # "name": self.custom_data[metric]['name'], # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "avg": self.custom_data[metric]['tableData']['avg'], "max": self.custom_data[metric]['tableData']['max'], "min": self.custom_data[metric]['tableData']['min'], } if self.custom_param_dict[metric]['kind'][0] == -1: lka_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: lka_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: lka_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" custom_graph_dict[metric] = self.custom_data[metric]["reportData"] self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine']) lka_dict["indexes"] = lka_dict_indexes str_lane_over_optimal = '' if not unfunc_lane_metric_list: str_func_lane_metric = string_concatenate(func_lane_metric_list) lane_description1 = f"{str_func_lane_metric}指标均表现良好" else: for metric in unfunc_lane_metric_list: if metric in self.bulitin_metric_list: value = self.result[metric] if self.kind_dict[metric] == -1: metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[ metric]) * 100 elif self.kind_dict[metric] == 1: metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[ metric]) * 100 elif self.kind_dict[metric] == 0: metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[ metric]) * 100 else: value = self.custom_data[metric]["value"][0] if self.custom_param_dict[metric]['kind'][0] == -1: metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 elif self.custom_param_dict[metric]['kind'][0] == 1: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) / self.custom_param_dict[metric]['optimal'][0]) * 100 elif self.custom_param_dict[metric]['kind'][0] == 0: metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_lane_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;" str_lane_over_optimal = str_lane_over_optimal[:-1] if str_lane_over_optimal else "" str_func_lane_metric = string_concatenate(func_lane_metric_list) str_unfunc_lane_metric = string_concatenate(unfunc_lane_metric_list) if not func_lane_metric_list: lane_description1 = f"{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}" else: lane_description1 = f"{str_func_lane_metric}指标表现良好,{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}" lane_description2 = "" center_distance_vs_time = [] if "centerDistanceMax" in self.metric_list: cnt2_1 = len(self.center_dist) tmp2_list = [x for x in self.center_dist if x > self.optimal_dict['centerDistanceMax']] cnt2_2 = len(tmp2_list) percent2 = 0 if cnt2_1 == 0 else (cnt2_2 / cnt2_1 * 100) dist_time = percent2 * duration / 100 if dist_time != 0: lane_description2 = f"距离有{dist_time:.2f}秒超出合理范围,占比为{percent2:.2f}%" else: lane_description2 = f"距离均在合理范围内,算法车道保持表现良好" # lane keep data for graph # center_distance_vs_time = self.zip_time_pairs(self.center_dist) center_distance_vs_time = self.zip_time_pairs(self.center_dist_with_nan) self.unfunctional_lane_df_statistic() # # lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict) # lka_dict["description2"] = lane_description2 unfunc_lane_df = self.unfunc_lane_df.copy() unfunc_lane_df['type'] = "origin" unfunc_lane_slices = unfunc_lane_df.to_dict('records') unfunc_lane_dist_df = self.unfunc_lane_df.copy() unfunc_lane_dist_df.loc[unfunc_lane_dist_df['type'] != 'LKA', 'type'] = "origin" unfunc_lane_dist_slices = unfunc_lane_dist_df.to_dict('records') lka_data = { "name": "车辆中心线横向距离(m)", "data": center_distance_vs_time, # "markLine": unfunc_lane_dist_slices } if 'centerDistanceMax' in self.optimal_dict: lka_range = f"[0, {self.optimal_dict['centerDistanceMax']}]" elif 'centerDistanceMin' in self.optimal_dict: lka_range = f"[0, {self.optimal_dict['centerDistanceMin']}]" else: lka_range = f"[0, 0.5]" lka_data["range"] = lka_range builtin_graph_dict["centerDistance"] = lka_data lka_dict["builtin"] = builtin_graph_dict lka_dict["custom"] = custom_graph_dict lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict) lka_dict["description2"] = lane_description2 # lka_dict["centerDisData"] = center_distance_vs_time # lka_dict["centerDisMarkLine"] = unfunc_lane_dist_slices type_details_dict[type] = lka_dict unfunc_metric_list += unfunc_lane_metric_list func_over_optimal.append(str_lane_over_optimal) else: type_dict = { "name": f"{self.type_name_dict[type]}", } builtin_graph_dict = {} custom_graph_dict = {} # get score and grade score_custom_type = score_type_dict[type] grade_custom_type = score_grade(score_custom_type) type_dict["score"] = score_custom_type type_dict["level"] = grade_custom_type # custom type description good_custom_metric_list = [] bad_custom_metric_list = [] unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type) type_dict_indexes = {} for metric in self.metric_dict[type]: bad_custom_metric_list.append(metric) if score_metric_dict[ metric] < 80 else good_custom_metric_list.append( metric) type_dict_indexes[metric] = { # "name": self.custom_data[metric]['name'], # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "avg": self.custom_data[metric]['tableData']['avg'], "max": self.custom_data[metric]['tableData']['max'], "min": self.custom_data[metric]['tableData']['min'], } if self.custom_param_dict[metric]['kind'][0] == -1: type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]:.3f}]" elif self.custom_param_dict[metric]['kind'][0] == 1: type_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]:.3f}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: type_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]:.3f}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]:.3f}]" custom_graph_dict[metric] = self.custom_data[metric]['reportData'] self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine']) type_dict["indexes"] = type_dict_indexes str_type_over_optimal = "" if not bad_custom_metric_list: str_good_custom_metric = string_concatenate(good_custom_metric_list) type_description1 = f"{str_good_custom_metric}指标均表现良好" type_description2 = f"指标均在合理范围内,算法表现良好" else: for metric in bad_custom_metric_list: value = self.custom_data[metric]["value"][0] if self.custom_param_dict[metric]['kind'][0] == -1: metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 elif self.custom_param_dict[metric]['kind'][0] == 1: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) / self.custom_param_dict[metric]['optimal'][0]) * 100 elif self.custom_param_dict[metric]['kind'][0] == 0: metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;" str_type_over_optimal = str_type_over_optimal[:-1] str_good_custom_metric = string_concatenate(good_custom_metric_list) str_bad_custom_metric = string_concatenate(bad_custom_metric_list) if not good_custom_metric_list: type_description1 = f"{str_bad_custom_metric}指标表现不佳" type_description2 = f"{str_type_over_optimal}" else: type_description1 = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳" type_description2 = f"{str_type_over_optimal}" type_dict["builtin"] = builtin_graph_dict type_dict["custom"] = custom_graph_dict type_dict["description1"] = replace_key_with_value(type_description1, self.name_dict) type_dict["description2"] = replace_key_with_value(type_description2, self.name_dict) type_details_dict[type] = type_dict unfunc_metric_list += bad_custom_metric_list func_over_optimal.append(str_type_over_optimal) report_dict["details"] = type_details_dict # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time] func_over_optimal = [s for s in func_over_optimal if s] str_func_over_optimal = ';'.join(func_over_optimal) if grade_function == '优秀': str_func_type = string_concatenate(func_type_list) func_description1 = f'算法在{str_func_type}功能上表现优秀;' elif grade_function == '良好': str_func_type = string_concatenate(func_type_list) func_description1 = f'算法在{str_func_type}功能上总体表现良好,满足设计指标要求;' elif grade_function == '一般': str_unfunc_type = string_concatenate(unfunc_type_list) str_unfunc_metric = string_concatenate(unfunc_metric_list) str_unfunc_metric = replace_key_with_value(str_unfunc_metric, self.type_name_dict) func_description1 = f'算法在{str_unfunc_type}功能上表现一般、需要在{str_unfunc_metric}指标上进一步优化。其中,{str_func_over_optimal};' elif grade_function == '较差': str_unfunc_type = string_concatenate(unfunc_type_list) func_description1 = f'算法在{str_unfunc_type}功能上表现较差,需要提高算法的功能性表现。其中,{str_func_over_optimal};' if not unfunc_type_list: func_description2 = '功能性在各个功能上的表现俱佳。' else: str_unfunc_type = string_concatenate(unfunc_type_list) func_description2 = f"算法在{str_unfunc_type}功能上需要重点优化。" # report_dict["description1"] = replace_key_with_value(func_description1, self.name_dict) report_dict["description1"] = replace_key_with_value(replace_key_with_value(func_description1, self.name_dict), self.type_name_dict) report_dict["description2"] = replace_key_with_value(func_description2, self.type_name_dict) report_dict['commonData'] = { "per": { "name": "脚刹/油门踏板开度(百分比)", "legend": ["刹车踏板开度", "油门踏板开度"], "data": [brake_vs_time, throttle_vs_time] }, "ang": { "name": "方向盘转角(角度°)", "data": steering_vs_time }, "spe": { "name": "速度(km/h)", "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"], "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time] }, "acc": { "name": "加速度(m/s²)", "legend": ["横向加速度", "纵向加速度"], "data": [lat_acc_vs_time, lon_acc_vs_time] }, # "dis": { # "name": "前车距离(m)", # "data": distance_vs_time # } } if "functionACC" in self.type_list: report_dict['commonData']["dis"] = { "name": "前车距离(m)", "data": distance_vs_time } self.unfunc_df = pd.concat([self.unfunc_df, self.unfunc_follow_df, self.unfunc_lane_df], ignore_index=True) unfunc_df = self.unfunc_df.copy().dropna() unfunc_slices = unfunc_df.to_dict('records') unfunc_slices.extend(self.custom_markline_list) report_dict["commonMarkLine"] = unfunc_slices # report_dict = { # "name": "功能性", # "weight": f"{self.weight * 100:.2f}%", # "weightDistribution": weight_distribution, # "score": score_function, # "level": grade_function, # 'followStopCount': self.follow_stop_count, # "description1": func_description1, # "description2": func_description2, # # "functionACC": acc_dict, # "functionLKA": lka_dict, # # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time], # "accData": [lat_acc_vs_time, lon_acc_vs_time], # # } self.eval_data = self.ego_df.copy() return report_dict def get_eval_data(self): if 'line_dist' in self.eval_data.columns: df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc', 'line_dist']].copy() else: df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc']].copy() self.df['line_dist'] = pd.Series(dtype=float) return df