12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/08/03
- @Last Modified: 2023/08/03
- @Summary: Function metrics
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import math
- import numpy as np
- import pandas as pd
- import scipy.signal as sg
- from score_weight import cal_score_with_priority, cal_weight_from_80
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
- class Function(object):
- """
- Class for achieving function metrics for autonomous driving.
- Attributes:
- df: Vehicle driving data, stored in dataframe format.
- """
- def __init__(self, data_processed, custom_data, scoreModel):
- self.eval_data = pd.DataFrame()
- self.data_processed = data_processed
- self.scoreModel = scoreModel
- self.status_trigger_dict = data_processed.status_trigger_dict
- self.df = data_processed.object_df
- self.ego_df = data_processed.ego_data
- self.obj_id_list = data_processed.obj_id_list
- self.df_roadmark = data_processed.road_mark_df
- self.df_roadpos = data_processed.road_pos_df
- self.df_drivectrl = data_processed.driver_ctrl_df
- self.unfunc_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unfunc_follow_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unfunc_lane_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unfunc_custom_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.custom_markline_list = []
- # config infos for calculating score
- self.config = data_processed.config
- function_config = self.config.config['function']
- self.function_config = function_config
- # common data
- self.bulitin_metric_list = self.config.builtinMetricList
- # dimension data
- self.weight_custom = function_config['weightCustom']
- self.metric_list = function_config['metric']
- self.type_list = function_config['type']
- self.type_name_dict = function_config['typeName']
- self.name_dict = function_config['name']
- self.unit_dict = function_config['unit']
- # custom metric data
- self.customMetricParam = function_config['customMetricParam']
- self.custom_metric_list = list(self.customMetricParam.keys())
- self.custom_data = custom_data
- self.custom_param_dict = {}
- # score data
- self.weight = function_config['weightDimension']
- self.weight_type_dict = function_config['typeWeight']
- self.weight_type_list = function_config['typeWeightList']
- self.weight_dict = function_config['weight']
- self.weight_list = function_config['weightList']
- self.priority_dict = function_config['priority']
- self.priority_list = function_config['priorityList']
- self.kind_dict = function_config['kind']
- self.optimal_dict = function_config['optimal']
- self.multiple_dict = function_config['multiple']
- self.kind_list = function_config['kindList']
- self.optimal_list = function_config['optimalList']
- self.multiple_list = function_config['multipleList']
- # self.unit_dict = function_config['unit']
- self.metric_dict = function_config['typeMetricDict']
- if "function_ACC" in self.metric_dict.keys():
- self.acc_metric_list = self.metric_dict['function_ACC']
- if "functionLKA" in self.metric_dict.keys():
- self.lka_metric_list = self.metric_dict['functionLKA']
- # self.acc_metric_list = ['followSpeedDeviation', 'followDistanceDeviation', 'followStopDistance',
- # 'followResponseTime']
- # self.lka_metric_list = ['laneDistance', 'centerDistanceExpectation', 'centerDistanceStandardDeviation',
- # 'centerDistanceMax', 'centerDistanceMin', 'centerDistanceFrequency',
- # 'centerDistanceRange']
- # custom metric
- self.flag_type_dict = {}
- # lists of drving control info
- self.time_list = data_processed.driver_ctrl_data['time_list']
- self.frame_list = data_processed.driver_ctrl_data['frame_list']
- self.time_list_follow = []
- self.frame_list_follow = []
- self.dist_list = []
- self.v_deviation_list = []
- self.v_relative_list = []
- self.dist_deviation_list = []
- self.stop_distance_list = []
- self.follow_stop_time_start_list = [] # 起停跟车响应时长
- self.dist_list_full_time = list()
- self.dist_deviation_list_full_time = list()
- self.line_dist = []
- self.center_dist = []
- self.center_dist_with_nan = []
- self.center_time_list = []
- self.center_frame_list = []
- self.ICA_FLAG = False
- self.follow_stop_count = 0
- self.result = {}
- def _following(self, df):
- """
- 稳定跟车行驶:
- 1.稳态控制速度偏差:筛选出跟车行驶数据后,
- 2.稳态控制距离偏差
- ACC stastus
- 0x7: Stand-wait State
- 0x6: Stand-active State
- 0x5: Passive State
- 0x4: Standby State
- 0x3: Shut-off State
- 0x2: Override State
- 0x1: Active State
- 0x0: Off
- # 速度变化不能以0到非0为参考,要改为从0到0.05的变化,有一个阈值滤波,避免前车车辆抖动误差影响传感器
- 弯道行驶: 最小跟随弯道半径
- """
- # df = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
- # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
- col_list = ['simTime', 'simFrame', 'playerId', 'v', 'posX', 'posY'] # target_id
- df = df[col_list].copy()
- ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'posX', 'posY']]
- # 筛选目标车(同一车道内,距离最近的前车)
- # obj_df = df[df['playerId'] == df['target_id']]
- target_id = 2
- obj_df = df[df['playerId'] == target_id][['simTime', 'simFrame', 'v', 'posX', 'posY']] # 目标车
- obj_df = obj_df.rename(columns={'v': 'v_obj', 'posX': 'posX_obj', 'posY': 'posY_obj'})
- df_merge = pd.merge(ego_df, obj_df, on=['simTime', 'simFrame'], how='left')
- df_merge['v_relative'] = df_merge['v'] - df_merge['v_obj']
- df_merge['dist'] = df_merge.apply(
- lambda row: self.dist(row['posX'], row['posY'], row['posX_obj'], row['posY_obj']), axis=1)
- self.dist_list = df_merge['dist'].values.tolist()
- df_merge['time_gap'] = df_merge['dist'] / df_merge['v']
- SAFE_TIME_GAP = 3
- df_merge['dist_deviation'] = df_merge['time_gap'].apply(
- lambda x: 0 if (x >= SAFE_TIME_GAP) else (SAFE_TIME_GAP - x))
- df_stop = df_merge[(df_merge['v'] == 0) & (df_merge['v_obj'] == 0)]
- stop_distance_list = df_stop['dist'].values.tolist()
- self.stop_distance_list = stop_distance_list
- t_list = df_stop['simTime'].values.tolist()
- stop_group = []
- sub_group = []
- CONTINUOUS_TIME_PERIOD = 1
- CONTINUOUS_FRAME_PERIOD = 13
- for i in range(len(t_list)):
- if not sub_group or t_list[i] - t_list[i - 1] <= CONTINUOUS_TIME_PERIOD:
- sub_group.append(t_list[i])
- else:
- stop_group.append(sub_group)
- sub_group = [t_list[i]]
- stop_group.append(sub_group)
- stop_group = [g for g in stop_group if len(g) >= CONTINUOUS_FRAME_PERIOD]
- self.follow_stop_count = len(stop_group)
- # solution 1: 跟车状态下,算法输出预设跟车速度
- # df['velocity_deviation'] = abs(df['v'] - df['set_velocity'])
- # max_velocity_deviation = df['velocity_deviation'].max()
- # solution 2: 跟车状态下,跟车设定速度为目标车速度
- # velocity_deviation = []
- # distance_deviation = []
- # stop_distance = []
- #
- # for f, data in df.groupby('simFrame'):
- # ego_data = data.iloc[0].copy()
- # # df.loc[len(df)] = ego_data
- #
- # if len(data) < 2:
- # continue
- # v1 = ego_data['v']
- # x1 = ego_data['posX']
- # y1 = ego_data['posY']
- #
- # for i in range(1, len(data)):
- # obj_data = data.iloc[i].copy()
- #
- # v2 = obj_data['v']
- # x2 = obj_data['posX']
- # y2 = obj_data['posY']
- #
- # v_delta = abs(v1 - v2)
- # velocity_deviation.append(v_delta)
- #
- # dist = self.dist(x1, y1, x2, y2)
- # distance_deviation.append(dist)
- #
- # if v2 == 0 and v1 == 0:
- # stop_distance.append(dist)
- # df.loc[len(df)] = obj_data
- # self.df = df
- df_merge.replace([np.inf, -np.inf], np.nan, inplace=True) # 异常值处理
- self.time_list_follow = df_merge['simTime'].values.tolist()
- self.frame_list_follow = df_merge['simFrame'].values.tolist()
- self.v_relative_list = df_merge['v_relative'].values.tolist()
- self.v_deviation_list = abs(df_merge['v_relative']).values.tolist()
- self.dist_deviation_list = df_merge['dist_deviation'].values.tolist()
- tmp_df = self.ego_df[['simTime', 'simFrame']].copy()
- v_rel_df = df_merge[['simTime', 'v_relative']].copy()
- df_merged1 = pd.merge(tmp_df, v_rel_df, on='simTime', how='left')
- self.v_relative_list_full_time = df_merged1['v_relative'].values.tolist()
- dist_deviation_df = df_merge[['simTime', 'dist_deviation']].copy()
- df_merged1 = pd.merge(tmp_df, dist_deviation_df, on='simTime', how='left')
- self.dist_deviation_list_full_time = df_merged1['dist_deviation'].values.tolist()
- dist_df = df_merge[['simTime', 'dist']].copy()
- df_merged1 = pd.merge(tmp_df, dist_df, on='simTime', how='left')
- self.dist_list_full_time = df_merged1['dist'].values.tolist()
- max_velocity_deviation = abs(df_merge['v_relative']).max()
- distance_deviation = df_merge['dist_deviation'].max()
- min_stop_distance = min(self.stop_distance_list) if self.stop_distance_list else 9999
- self.result['followSpeedDeviation'] = max_velocity_deviation
- self.result['followDistanceDeviation'] = distance_deviation
- self.result['followStopDistance'] = min_stop_distance
- def _stop_and_go(self, df):
- """
- 停走功能:
- 1.跟停距离3-4m,
- 2.启动跟车响应时间<=1.2s
- 3.停车跟车响应时间<=1.2s
- decisionType_target
- 0 无障碍物巡航
- 1 停车减速让行
- 2 跟车
- 3 绕行
- 4 到达终点?
- """
- # df = self.df[self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
- # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
- df = df.dropna(subset=['type'])
- STOP_SPEED_THRESHOLD = 0.05
- df['v'] = df['v'].apply(lambda x: 0 if x <= STOP_SPEED_THRESHOLD else 1) # 区分速度为0或非0
- target_id = 2
- df_ego = df[df['playerId'] == 1].copy()
- df_obj = df[df['playerId'] == target_id].copy() # 目标车
- df_obj_time = df_obj['simTime'].values.tolist()
- df_ego = df_ego[df_ego['simTime'].isin(df_obj_time)].copy()
- df_ego = df_ego.drop_duplicates(["simTime", "simFrame"])
- df_obj = df_obj.drop_duplicates(["simTime", "simFrame"])
- df_ego['v_diff'] = df_ego['v'].diff()
- df_ego['v_start_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == 1 else 0) # 起步即为1
- df_ego['v_stop_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == -1 else 0) # 停车即为-1
- df_obj['v_diff'] = df_obj['v'].diff()
- obj_v_start_flag = df_obj['v_diff'].apply(lambda x: 1 if x == 1 else 0).values # 起步即为1
- obj_v_stop_flag = df_obj['v_diff'].apply(lambda x: 1 if x == -1 else 0).values # 停车即为1
- df_ego['obj_v_start_flag'] = obj_v_start_flag
- df_ego['obj_v_stop_flag'] = obj_v_stop_flag
- df_ego['flag_start'] = df_ego['obj_v_start_flag'] - df_ego['v_start_flag'] # 目标车起步即为1,自车起步即为-1
- df_ego['flag_stop'] = df_ego['obj_v_stop_flag'] - df_ego['v_stop_flag'] # 目标车停车即为1,自车停车即为-1
- flag_start_list = df_ego['flag_start'].values
- flag_stop_list = df_ego['flag_stop'].values
- time_list = df_ego['simTime'].values
- time_start_list = []
- time_stop_list = []
- for i, flag in enumerate(flag_start_list):
- if flag:
- t1 = time_list[i]
- if flag == -1:
- t2 = time_list[i]
- time_start_list.append(t2 - t1) # t2-t1即为自车起步响应时间
- for i, flag in enumerate(flag_stop_list):
- if flag:
- t1 = time_list[i]
- if flag == -1:
- t2 = time_list[i]
- time_stop_list.append(t2 - t1) # t2-t1即为自车停车响应时间
- time_start_list = [i for i in time_start_list if i != 0]
- self.result['followResponseTime'] = max(time_start_list) if time_start_list else 0
- self.follow_stop_time_start_list = time_start_list
- # self.result['跟车停止响应最长时间'] = max(time_stop_list)
- def dist(self, x1, y1, x2, y2):
- dis = math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
- return dis
- def _line_dist_merge(self):
- # line_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
- # line_dist_group = line_dist_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
- # line_dist_df['line_dist'] = line_dist_group
- # line_dist_df = line_dist_df[['simFrame', 'line_dist']]
- # line_dist_df.columns = ['simFrame', 'line_dist']
- # line_dist_df = line_dist_df.drop_duplicates()
- line_dist_df = self.df_roadmark[(self.df_roadmark['id'] == 0) | (self.df_roadmark['id'] == 2)].copy()
- # df = line_dist_df[line_dist_df['id'] == 2][['simTime', 'simFrame']].copy()
- # df['line_dist'] = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
- if line_dist_df.empty:
- self.ego_df['line_dist'] = pd.Series(dtype=float)
- else:
- df = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).reset_index()
- # df = df.rename(columns={'lateralDist': 'line_dist'})
- df.columns = ["simFrame", "line_dist"]
- self.ego_df = pd.merge(self.ego_df, df, on='simFrame', how='left')
- def _lane_transverse_distance(self, df):
- """
- 0x7:Reserved
- 0x6: Reserved
- 0x5: Reserved
- 0x4: Error
- 0x3: Active
- 0x2: Standby
- 0x1: Passive
- 0x0: Off
- """
- # 车道内偏移行驶时,与近侧车道线的横向距离
- # data_group = lane_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
- # data_group = self.df_roadmark.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
- # df = data_group.to_frame(name='line_dist')
- # df['simFrame'] = data_group.index.tolist()
- # df = self.df_roadmark[self.df_roadmark['id'] == 0][['simTime', 'simFrame']].copy()
- # df['line_dist'] = self.df_roadmark.groupby('simFrame').apply(
- # lambda t: abs(t['lateralDist']).min()).values.tolist()
- # self.ego_df = pd.merge(self.ego_df, df, on=['simTime', 'simFrame'], how='left')
- # self._line_dist_merge()
- # # df = self.df[self.df['LKA_status'] == 3]
- # df = self.ego_df[self.ego_df['LKA_status'] == "Active"]
- self.line_dist = df['line_dist'].to_list()
- self.line_dist = [x for x in self.line_dist if not np.isnan(x)]
- self.result['laneDistance'] = min(self.line_dist) if self.line_dist else self.optimal_dict['laneDistance']
- def _lateral_dist_of_center_line(self, df_laneoffset):
- # df_laneoffset = self.df[(self.df['LKA_status'] == "Active") & (self.df['playerId'] == 1)]
- self.center_dist_with_nan = df_laneoffset['laneOffset'].to_list()
- self.center_time_list = df_laneoffset['simTime'].to_list()
- self.center_frame_list = df_laneoffset['simFrame'].to_list()
- # self.center_dist_df = self.df_roadpos[self.df_roadpos["playerId"] == 1]
- # self.center_dist = self.center_dist_df["laneOffset"]
- # self.center_time_list = self.center_dist_df['simTime'].to_list()
- # self.center_frame_list = self.center_dist_df['simFrame'].to_list()
- self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)]
- if not self.center_dist:
- self.result['centerDistanceExpectation'] = 0
- self.result['centerDistanceStandardDeviation'] = 0
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['centerDistanceMax'] = 0
- self.result['centerDistanceMin'] = 0
- self.result['centerDistanceFrequency'] = 0
- self.result['centerDistanceRange'] = 0
- else:
- center_dist = [abs(x) for x in self.center_dist]
- # 车道中心线横向距离分布期望与标准差
- E_lane_center_dist = np.mean(center_dist)
- D_lane_center_dist = np.std(center_dist)
- # 车道中心线横向距离分布极值
- center_dist = np.array(center_dist)
- extrmax_index = sg.argrelmax(center_dist)[0]
- extrmin_index = sg.argrelmin(center_dist)[0]
- extreme_max_value = center_dist[extrmax_index]
- extreme_min_value = center_dist[extrmin_index]
- # 横向相对位置震荡频率
- # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
- length1 = len(extrmax_index) # 极大值列表长度
- length2 = len(extrmin_index) # 极小值列表长度
- # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
- FREQUENCY = 100
- time_diff = 1.0 / FREQUENCY
- period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
- period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
- try:
- period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
- except ZeroDivisionError:
- period = 0
- frequency = 1 / period if period else 0
- # length = min(len(extreme_max_value), len(extreme_min_value))
- # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
- # frequency = 1 / period
- # 横向相对位置震荡极差
- length = min(len(extreme_max_value), len(extreme_min_value))
- extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
- self.result['centerDistanceExpectation'] = E_lane_center_dist
- self.result['centerDistanceStandardDeviation'] = D_lane_center_dist
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['centerDistanceMax'] = center_dist.max()
- self.result['centerDistanceMin'] = center_dist.min()
- self.result['centerDistanceFrequency'] = abs(frequency)
- self.result['centerDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
- def _continous_judge(self, frame_list):
- CONTINUOUS_FRAME_PERIOD = 3
- if not frame_list:
- return 0
- cnt = 1
- for i in range(1, len(frame_list)):
- if frame_list[i] - frame_list[i - 1] <= CONTINUOUS_FRAME_PERIOD:
- continue
- cnt += 1
- return cnt
- def _lane_departure_warning(self, ldw_df):
- """
- 工作车速范围:55/60-200/250 km/h
- 最小可用车道宽度:2.6m
- 误报警:距离安全,但是预警
- 漏报警:距离危险,但是没报警
- 灵敏度差:线外40cm
- 灵敏度零级:压线
- 灵敏度一级:线内20cm(20%)
- 灵敏度二级:线内40cm(40%)
- 灵敏度三级:线内60cm(60%)
- 偏离车速1.0m/s:线内60cm报警
- 偏离车速0.5m/s:线内40cm报警
- """
- # 自车, 车道线宽度>2.6m,转向灯没开
- # ego_df = self.df[self.df['playerId'] == 1].copy()
- # ego_df['line_dist'] = self.line_dist
- # lane_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
- # lane_dist_group = lane_dist_df.groupby("simFrame").apply(
- # lambda t: abs(t['lateralDist']).min()).values.tolist()
- # lane_dist_df['line_dist'] = lane_dist_group
- # lane_dist_df = lane_dist_df[['simFrame', 'line_dist']]
- # lane_dist_df.columns = ['simFrame', 'line_dist']
- # # lane_dist_df = lane_dist_df[['simTime', 'simFrame', 'line_dist']]
- # # lane_dist_df.columns = ['simTime', 'simFrame', 'line_dist']
- # lane_dist_df = lane_dist_df.drop_duplicates()
- # ldw_status_df = self.df[self.df['playerId'] == 1].copy()[['simTime', 'simFrame', 'LDW_status']].drop_duplicates()
- # ldw_status_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy().drop_duplicates()
- # ldw_df = pd.merge_asof(ldw_status_df, lane_dist_df, on='simFrame', direction='nearest')
- # ldw_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
- # calculate max DLC
- warning_dist_max = ldw_df[ldw_df['LDW_status'] == "Active"]['line_dist'].max()
- # warning_dist_max = ldw_df[ldw_df['LDW_status'] == 3]['line_dist'].max()
- self.result['预警时距离车道线最大距离'] = warning_dist_max
- # count miss warning
- miss_warning_df = ldw_df[(ldw_df['line_dist'] <= 0.4) & (ldw_df['LDW_status'] != "Active")]
- miss_warning_frame_list = miss_warning_df['simFrame'].values.tolist()
- miss_warning_count = self._continous_judge(miss_warning_frame_list)
- self.result['车道偏离漏预警次数'] = miss_warning_count
- # count false warning
- false_warning_df = ldw_df[(ldw_df['line_dist'] >= 0.4) & (ldw_df['LDW_status'] == "Active")]
- false_warning_frame_list = false_warning_df['simFrame'].values.tolist()
- false_warning_count = self._continous_judge(false_warning_frame_list)
- self.result['车道偏离误预警次数'] = false_warning_count
- def _autonomous_emergency_brake(self):
- """
- 刹得准、刹得稳
- 碰撞预警 + 紧急制动
- <=4s 预警>1s,制动<3s
- 触发目标类型
- 目标类型准确度
- 目标为车辆的速度降:
- 弱势目标的速度降:
- 触发时TTC<4s
- 一级制动减速度:4m/s^2
- 二级制动减速度:10m/s^2
- 刹停行驶距离:10-20m
- 刹停时前车距离:0.8-1.5m
- 误触发次数:频率 n/10万km
- 漏触发:
- 能绕开就没必要刹停:开始刹车时距离远近
- Returns:
- """
- # ego_df = self.df[self.df['playerId'] == 1]
- df = self.ego_df[self.ego_df['Aeb_status'] == 'Active']
- # df = self.ego_df[self.ego_df['Aeb_status'] == 0]
- # calculate brakeDecelerate
- df['lon_accel'] = df['accelX'] * np.cos(df['posH']) + df['accelY'] * np.sin(df['posH'])
- decelerate_max = df['lon_accel'].min()
- # calculate brake stop travelDist
- df_dist = df[df['lon_accel'] < 0]
- frame_list = df_dist['simFrame'].to_list()
- travelDist_list = df_dist['travelDist'].to_list()
- travelDist_group = []
- start = 0
- for i in range(1, len(frame_list)):
- if frame_list[i] - frame_list[i - 1] > 5:
- end = i - 1
- # travelDist_group.append(travelDist_list[start:end])
- travelDist_group.append(travelDist_list[end] - travelDist_list[start])
- start = i
- end = -1
- travelDist_group.append(travelDist_list[end] - travelDist_list[start])
- brake_distance_max = max(travelDist_group) if travelDist_group else np.inf
- self.result['brakeDecelerate'] = abs(round(decelerate_max, 4))
- self.result['brakeDistance'] = round(brake_distance_max, 4)
- def _integrated_cruise_assist(self, df):
- df_LCC = df[df['ICA_status'].isin(['LLC_follow_vehicle', 'LLC_follow_line'])].copy()
- self.LCC_line_dist = df_LCC['line_dist'].to_list()
- self.result['LCClaneDistance'] = min(self.LCC_line_dist) if self.LCC_line_dist else self.optimal_dict[
- 'LCClaneDistance']
- self.LCC_center_dist = df_LCC['laneOffset'].to_list()
- self.LCC_center_time_list = df_LCC['simTime'].to_list()
- self.LCC_center_frame_list = df_LCC['simFrame'].to_list()
- if not self.LCC_center_dist:
- self.result['LCCcenterDistanceExpectation'] = 0
- self.result['LCCcenterDistanceStandardDeviation'] = 0
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['LCCcenterDistanceMax'] = 0
- self.result['LCCcenterDistanceMin'] = 0
- self.result['LCCcenterDistanceFrequency'] = 0
- self.result['LCCcenterDistanceRange'] = 0
- else:
- center_dist = [abs(x) for x in self.LCC_center_dist]
- # 车道中心线横向距离分布期望与标准差
- E_lane_center_dist = np.mean(center_dist)
- D_lane_center_dist = np.std(center_dist)
- # 车道中心线横向距离分布极值
- center_dist = np.array(center_dist)
- extrmax_index = sg.argrelmax(center_dist)[0]
- extrmin_index = sg.argrelmin(center_dist)[0]
- extreme_max_value = center_dist[extrmax_index]
- extreme_min_value = center_dist[extrmin_index]
- # 横向相对位置震荡频率
- # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
- length1 = len(extrmax_index) # 极大值列表长度
- length2 = len(extrmin_index) # 极小值列表长度
- # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
- FREQUENCY = 100
- time_diff = 1.0 / FREQUENCY
- period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
- period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
- try:
- period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
- except ZeroDivisionError:
- period = 0
- frequency = 1 / period if period else 0
- # length = min(len(extreme_max_value), len(extreme_min_value))
- # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
- # frequency = 1 / period
- # 横向相对位置震荡极差
- length = min(len(extreme_max_value), len(extreme_min_value))
- extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
- self.result['LCCcenterDistanceExpectation'] = E_lane_center_dist
- self.result['LCCcenterDistanceStandardDeviation'] = D_lane_center_dist
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['LCCcenterDistanceMax'] = center_dist.max()
- self.result['LCCcenterDistanceMin'] = center_dist.min()
- self.result['LCCcenterDistanceFrequency'] = abs(frequency)
- self.result['LCCcenterDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
- df_LC = df[df['ICA_status'] == 'Only_Longitudinal_Control'].copy()
- v_max = df_LC['v'].max()
- v_min = df_LC['v'].min()
- self.result['OLC_v_deviation'] = v_max - v_min
- def _function_statistic(self):
- """
- """
- ACC_optimal_list = []
- if "function_ACC" in self.type_list:
- for function_type in self.type_list:
- if "ICA" in function_type:
- continue
- if len(self.obj_id_list) > 1:
- # 给定的双层列表
- follow_time_ranges = self.status_trigger_dict['ACC']['ACC_active_time']
- # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
- # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
- filtered_df = pd.DataFrame(columns=self.df.columns)
- for start, end in follow_time_ranges:
- mask = (self.df['simTime'] >= start) & (self.df['simTime'] <= end)
- if filtered_df is None:
- filtered_df = self.df[mask]
- else:
- filtered_df = pd.concat([filtered_df, self.df[mask]])
- # 上述方式可能引入重复行,使用drop_duplicates去重
- df_follow = filtered_df.drop_duplicates()
- df_stop_and_go = df_follow.copy()
- # 重置索引(如果需要)
- # filtered_df.reset_index(drop=True, inplace=True)
- # df_follow111 = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
- self._following(df_follow)
- # df_stop_and_go = self.df[
- # self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
- self._stop_and_go(df_stop_and_go)
- if df_follow.empty and df_stop_and_go.empty:
- self.flag_type_dict['function_ACC'] = False
- else:
- self.flag_type_dict['function_ACC'] = True
- else:
- ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
- if "functionLKA" in self.type_list:
- self._line_dist_merge()
- # df_line_dist = self.df[self.df['LKA_status'] == 3]
- # 给定的双层列表
- follow_time_ranges = self.status_trigger_dict['LKA']['LKA_active_time']
- # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
- # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
- filtered_df = pd.DataFrame(columns=self.ego_df.columns)
- for start, end in follow_time_ranges:
- mask = (self.ego_df['simTime'] >= start) & (self.ego_df['simTime'] <= end)
- if filtered_df is None:
- filtered_df = self.ego_df[mask]
- else:
- filtered_df = pd.concat([filtered_df, self.ego_df[mask]])
- # 上述方式可能引入重复行,使用drop_duplicates去重
- df_lka = filtered_df.drop_duplicates()
- # df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"]
- if df_lka.empty:
- self.flag_type_dict['functionLKA'] = False
- else:
- self.flag_type_dict['functionLKA'] = True
- self._lane_transverse_distance(df_lka)
- self._lateral_dist_of_center_line(df_lka)
- # if "functionLDW" in self.type_list:
- # df_ldw = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
- # self._lane_departure_warning(df_ldw)
- # if "functionAEB" in self.type_list:
- # self._autonomous_emergency_brake()
- # if self.df['ICA_status'].nunique() != 1:
- # self.ICA_FLAG = True
- # # if "functionICA" in self.type_list:
- # # self._integrated_cruise_assist(self.ego_df)
- # # ACC
- # if len(self.obj_id_list) > 1:
- # df_follow = self.df[self.df['ACC_status'] == "Shut_off"].copy() # 数字3,对应Active
- # self._following(df_follow)
- #
- # df_stop_and_go = self.df[
- # self.df['ACC_status'].isin(["Shut_off", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
- # self._stop_and_go(df_stop_and_go)
- #
- # else:
- # ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
- #
- # # LKA
- # self._line_dist_merge()
- # # df_line_dist = self.df[self.df['LKA_status'] == 3]
- # df_lka = self.ego_df[self.ego_df['ICA_status'] == "LLC_Follow_Line"]
- # self._lane_transverse_distance(df_lka)
- # self._lateral_dist_of_center_line(df_lka)
- arr_func = [value for key, value in self.result.items() if key in self.metric_list]
- # arr_func = list(self.result.values())
- if "function_ACC" in self.type_list and len(self.obj_id_list) == 1:
- arr_func = ACC_optimal_list + arr_func
- return arr_func
- def custom_metric_param_parser(self, param_list):
- """
- param_dict = {
- "paramA" [
- {
- "kind": "-1",
- "optimal": "1",
- "multiple": ["0.5","5"],
- "spare1": null,
- "spare2": null
- }
- ]
- }
- """
- kind_list = []
- optimal_list = []
- multiple_list = []
- spare_list = []
- # spare1_list = []
- # spare2_list = []
- for i in range(len(param_list)):
- kind_list.append(int(param_list[i]['kind']))
- optimal_list.append(float(param_list[i]['optimal']))
- multiple_list.append([float(x) for x in param_list[i]['multiple']])
- spare_list.append([item["param"] for item in param_list[i]["spare"]])
- # spare1_list.append(param_list[i]['spare1'])
- # spare2_list.append(param_list[i]['spare2'])
- result = {
- "kind": kind_list,
- "optimal": optimal_list,
- "multiple": multiple_list,
- "spare": spare_list,
- # "spare1": spare1_list,
- # "spare2": spare2_list
- }
- return result
- def custom_metric_score(self, metric, value, param_list):
- """
- """
- param = self.custom_metric_param_parser(param_list)
- self.custom_param_dict[metric] = param
- score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value]))
- score_sub = score_model.cal_score()
- score = sum(score_sub) / len(score_sub)
- return score
- def func_score(self):
- score_metric_dict = {}
- score_type_dict = {}
- arr_func = self._function_statistic()
- print("\n[功能性表现及得分情况]", arr_func)
- print("功能性各指标值:", [round(num, 2) for num in arr_func])
- if arr_func:
- arr_func = np.array([arr_func])
- score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_func)
- score_sub = score_model.cal_score()
- score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))
- score_metric = [round(num, 2) for num in score_sub]
- metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
- score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
- flag_custom_type_dict = {}
- for metric in self.custom_metric_list:
- value = self.custom_data[metric]['value']
- param_list = self.customMetricParam[metric]
- score = self.custom_metric_score(metric, value, param_list)
- score_metric_dict[metric] = round(score, 2)
- if 'statusFlag' in self.custom_data[metric].keys():
- custom_type, flag_custom_type = next(iter(self.custom_data[metric]['statusFlag'].items()))
- if (custom_type in flag_custom_type_dict) and (flag_custom_type == True):
- flag_custom_type_dict[custom_type] = True
- else:
- flag_custom_type_dict[custom_type] = flag_custom_type
- #
- #
- # update metric_list and metric_dict
- #
- #
- flag_custom_type_dict1 = {k: v for k, v in flag_custom_type_dict.items() if v == True}
- flag_builtin_type_dict2 = {k: v for k, v in self.flag_type_dict.items() if v == True}
- flag_builtin_type_dict2.update(flag_custom_type_dict1)
- self.type_list = list(flag_builtin_type_dict2.keys())
- print("self.type_list is", self.type_list)
- self.metric_list = []
- print("self.metric_dict is", self.metric_dict)
- for type in self.type_list:
- self.metric_list.extend(self.metric_dict[type])
- score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
- score_metric = list(score_metric_dict.values())
- # no score in list
- if not score_metric:
- return 0, {}, {}
- if self.weight_custom: # 自定义权重
- score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
- self.weight_dict}
- for type in self.type_list:
- type_score = sum(
- value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
- score_type_dict}
- score_function = sum(score_type_with_weight_dict.values())
- else: # 客观赋权
- self.weight_list = cal_weight_from_80(score_metric)
- self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
- score_function = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
- for type in self.type_list:
- type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
- for key, value in self.weight_dict.items():
- if key in self.metric_dict[type]:
- # self.weight_dict[key] = round(value / type_weight, 4)
- self.weight_dict[key] = value / type_weight
- type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
- type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
- type_priority_list = [value for key, value in self.priority_dict.items() if
- key in self.metric_dict[type]]
- type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- for key in self.weight_dict:
- self.weight_dict[key] = round(self.weight_dict[key], 4)
- score_type = list(score_type_dict.values())
- self.weight_type_list = cal_weight_from_80(score_type)
- self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
- score_function = round(score_function, 2)
- print(f"功能性得分为:{score_function:.2f}分。")
- print(f"功能性各类型得分为:{score_type_dict}分。")
- print(f"功能性各指标得分为:{score_metric_dict}。")
- return score_function, score_type_dict, score_metric_dict
- def continuous_group(self, df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
- unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
- return unfunc_df
- def unfunctional_follow_v_deviation_df_statistic(self):
- unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
- 'v_deviation': self.v_deviation_list})
- unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
- v_df = unfunc_df[unfunc_df['v_deviation'] > self.optimal_dict['followSpeedDeviation']]
- v_df = v_df[['simTime', 'simFrame', 'v_deviation']]
- v_follow_df = self.continuous_group(v_df)
- # v_follow_df['type'] = 'follow'
- # v_follow_df['type'] = 'ACC'
- v_follow_df['type'] = f"{self.type_name_dict['function_ACC']}"
- self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, v_follow_df], ignore_index=True)
- def unfunctional_follow_dist_deviation_df_statistic(self):
- unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
- 'dist_deviation': self.dist_deviation_list})
- unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
- dist_df = unfunc_df[unfunc_df['dist_deviation'] > self.optimal_dict['followDistanceDeviation']] # 阈值由1改为了3
- dist_df = dist_df[['simTime', 'simFrame', 'dist_deviation']]
- dist_follow_df = self.continuous_group(dist_df)
- # v_follow_df['type'] = 'follow'
- # dist_follow_df['type'] = 'ACC'
- dist_follow_df['type'] = f"{self.type_name_dict['function_ACC']}"
- self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, dist_follow_df], ignore_index=True)
- def unfunctional_lane_df_statistic(self):
- unfunc_df = pd.DataFrame(
- {'simTime': self.center_time_list, 'simFrame': self.center_frame_list,
- 'center_dist': self.center_dist_with_nan})
- unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
- unfunc_df = unfunc_df.dropna(subset=['center_dist'])
- lane_df = unfunc_df[abs(unfunc_df['center_dist']) > self.optimal_dict['centerDistanceMax']]
- lane_df = lane_df[['simTime', 'simFrame', 'center_dist']]
- dist_lane_df = self.continuous_group(lane_df)
- # dist_lane_df['type'] = 'lane'
- # dist_lane_df['type'] = 'LKA'
- dist_lane_df['type'] = f"{self.type_name_dict['functionLKA']}"
- self.unfunc_lane_df = pd.concat([self.unfunc_lane_df, dist_lane_df], ignore_index=True)
- # def zip_time_pairs(self, zip_list, upper_limit=9999):
- # zip_time_pairs = zip(self.time_list, zip_list)
- # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
- # return zip_vs_time
- def zip_time_pairs(self, zip_list):
- zip_time_pairs = zip(self.time_list, zip_list)
- zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
- return zip_vs_time
- def _get_weight_distribution(self, dimension):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = self.config.dimension_name[dimension]
- for type in self.type_list:
- type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
- self.weight_dict.items() if
- key in self.metric_dict[type]}
- weight_distribution_type = {
- "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
- "indexes": type_weight_indexes_dict
- }
- weight_distribution[type] = weight_distribution_type
- return weight_distribution
- def report_statistic(self):
- # report_dict = {
- # "name": "功能性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_function,
- # "level": grade_function,
- # 'score_type': score_type,
- # 'score_metric': score_metric,
- # 'followStopCount': self.follow_stop_count,
- # "description1": func_description1,
- # "description2": func_description2,
- #
- # "functionACC": acc_dict,
- # "functionLKA": lka_dict,
- #
- # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
- # "accData": [lat_acc_vs_time, lon_acc_vs_time],
- #
- # }
- brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
- throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
- steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
- # common parameter calculate
- brake_vs_time = self.zip_time_pairs(brakePedal_list)
- throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
- steering_vs_time = self.zip_time_pairs(steeringWheel_list)
- report_dict = {
- "name": self.config.dimension_name["function"],
- "weight": f"{self.weight * 100:.2f}%",
- 'followStopCount': self.follow_stop_count,
- }
- # upper_limit = 40
- times_upper = 2
- # len_time = len(self.time_list)
- duration = self.time_list[-1]
- # score_function, score_type, score_metric = self.func_score()
- score_function, score_type_dict, score_metric_dict = self.func_score()
- # no metric
- if not score_metric_dict:
- return {}
- # get weight distribution
- report_dict["weightDistribution"] = self._get_weight_distribution("function")
- score_function = int(score_function) if int(score_function) == score_function else round(
- score_function, 2)
- # score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()]
- # score_metric = [int(n) if int(n) == n else n for key, n in score_metric_dict.items()]
- grade_function = score_grade(score_function)
- report_dict["score"] = score_function
- report_dict["level"] = grade_function
- # report_dict["score_type"] = score_type
- # report_dict["score_metric"] = score_metric
- # speed data
- ego_speed_list = self.ego_df['v'].values.tolist()
- ego_speed_vs_time = self.zip_time_pairs(ego_speed_list)
- obj_speed_vs_time = []
- rel_speed_vs_time = []
- # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- # accData
- lat_acc_list = self.ego_df['lat_acc'].values.tolist()
- lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
- lon_acc_list = self.ego_df['lon_acc'].values.tolist()
- lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
- # acc data
- # lat_acc_list = self.ego_df['lat_acc'].values.tolist()
- # lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
- # lon_acc_list = self.ego_df['lon_acc'].values.tolist()
- # lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
- # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time]
- # statistic type report infos
- unfunc_metric_list = []
- func_over_optimal = []
- # function description
- func_type_list = []
- unfunc_type_list = []
- type_details_dict = {}
- for type in self.type_list:
- if type == "function_ACC":
- builtin_graph_dict = {}
- custom_graph_dict = {}
- if len(self.obj_id_list) == 1:
- follow_description1 = "无目标车数据可计算;"
- follow_description2 = ""
- follow_description3 = "无目标车数据可计算;"
- follow_description4 = "无目标车数据可计算;"
- acc_dict_indexes = {}
- for metric in self.metric_dict[type]:
- acc_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": 80,
- "avg": "-",
- "max": "-",
- "min": "-",
- }
- if self.kind_dict[metric] == -1:
- acc_dict_indexes[metric]["range"] = f"[0, {self.optimal_dict[metric]}]"
- elif self.kind_dict[metric] == 1:
- acc_dict_indexes[metric]["range"] = f"[{self.optimal_dict[metric]}, inf)"
- elif self.kind_dict[metric] == 0:
- acc_dict_indexes[metric][
- "range"] = f"[{self.optimal_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal_dict[metric] * self.multiple_dict[metric][1]}]"
- acc_dict = {
- "name": f"{self.type_name_dict[type]}",
- "score": 80,
- "level": "良好",
- "description1": follow_description1,
- "description2": follow_description2,
- "description3": follow_description3,
- "description4": follow_description4,
- "noObjectCar": True,
- "indexes": acc_dict_indexes,
- "builtin": {},
- "custom": {}
- }
- # follow cruise description
- func_follow_metric_list = []
- unfunc_follow_metric_list = []
- str_follow_over_optimal = ''
- distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
- else:
- acc_dict = {
- "name": f"{self.type_name_dict[type]}",
- "noObjectCar": False,
- }
- # follow dict
- score_follow = score_type_dict[type]
- grade_follow = score_grade(score_follow)
- acc_dict["score"] = score_follow
- acc_dict["level"] = grade_follow
- unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
- # follow cruise description
- func_follow_metric_list = []
- unfunc_follow_metric_list = []
- for metric in self.metric_dict[type]:
- unfunc_follow_metric_list.append(metric) if score_metric_dict[
- metric] < 80 else func_follow_metric_list.append(
- metric)
- str_follow_over_optimal = ''
- if not unfunc_follow_metric_list:
- str_func_follow_metric = string_concatenate(func_follow_metric_list)
- follow_description1 = f"{str_func_follow_metric}指标均表现良好"
- else:
- for metric in unfunc_follow_metric_list:
- if metric in self.bulitin_metric_list:
- value = self.result[metric]
- if self.kind_dict[metric] == -1:
- metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 1:
- metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 0:
- metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- else:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- # str_follow_over_optimal += f"{metric}为{value:.2f},超过合理范围{metric_over_optimal:.2f}%;"
- # str_follow_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_follow_over_optimal += f"{metric}为{round(value, 2)}{self.unit_dict[metric]},超过合理范围{round(metric_over_optimal, 2)}%;"
- str_follow_over_optimal = str_follow_over_optimal[:-1] if str_follow_over_optimal else ""
- str_func_follow_metric = string_concatenate(func_follow_metric_list)
- str_unfunc_follow_metric = string_concatenate(unfunc_follow_metric_list)
- if not func_follow_metric_list:
- follow_description1 = f"{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
- else:
- follow_description1 = f"{str_func_follow_metric}指标表现良好,{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
- # for follow_description2
- follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
- follow_description2 = f"自车在行驶时有{round(follow_duration, 2)}s处于跟车状态,"
- follow_description3 = ""
- if "followResponseTime" in self.metric_list:
- # for follow_description3
- cnt3_1 = len(self.follow_stop_time_start_list)
- tmp3_list = [x for x in self.follow_stop_time_start_list if
- x > self.optimal_dict['followResponseTime']]
- cnt3_2 = len(tmp3_list)
- percent3 = 0 if cnt3_1 == 0 else round(cnt3_2 / cnt3_1 * 100, 2)
- follow_description3 = f"起停跟车响应时间有{cnt3_2}次超出合理范围,占比为{percent3}%;"
- follow_description4 = ""
- if "followDistanceDeviation" in self.metric_list:
- # for follow_description4
- cnt4_1 = len(self.stop_distance_list)
- tmp4_list = [x for x in self.stop_distance_list if
- x < self.optimal_dict['followDistanceDeviation']]
- cnt4_2 = len(tmp4_list)
- percent4 = 0 if cnt4_1 == 0 else round(cnt4_2 / cnt4_1 * 100, 2)
- follow_description4 = f"在本次测试中,跟车状态下目标车共发生了{cnt4_1}次停车,有{cnt4_2}次超出合理范围,占比为{percent4}%;"
- # distance_list = obj_df['dist'].values.tolist()
- # distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
- #
- # acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
- # acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
- # acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
- # acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
- # common parameter calculate
- obj_df = self.df[self.df['playerId'] == 2]
- obj_speed_list = obj_df['v'].values.tolist()
- obj_speed_vs_time = self.zip_time_pairs(obj_speed_list)
- rel_speed_vs_time = self.zip_time_pairs(self.v_relative_list_full_time)
- distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
- # acc_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- # acc_dict["disData"] = distance_vs_time
- unfunc_follow_df = self.unfunc_follow_df.copy()
- unfunc_follow_df['type'] = "origin"
- unfunc_follow_slices = unfunc_follow_df.to_dict('records')
- # acc_dict["speMarkLine"] = unfunc_follow_slices
- # acc_dict["disMarkLine"] = unfunc_follow_slices
- distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
- # function ACC data
- acc_dict_indexes = {}
- for metric in self.metric_dict[type]:
- if metric == "followSpeedDeviation":
- self.unfunctional_follow_v_deviation_df_statistic()
- unfunc_v_df = self.unfunc_follow_df.copy()
- unfunc_v_df.loc[unfunc_v_df['type'] != 'ACC', 'type'] = "origin"
- # unfunc_v_df.loc[unfunc_v_df['type'] == 'time', 'type'] = "time"
- unfunc_v_slices = unfunc_v_df.to_dict('records')
- df1 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
- df1['v_time'] = df1['end_time'] - df1['start_time']
- devi_v_time = df1['v_time'].sum()
- follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
- percent2_1 = devi_v_time / follow_duration * 100 if follow_duration else 0
- if devi_v_time != 0:
- follow_description2 += f"跟车状态下自车和目标车的速度差共有{round(devi_v_time, 2)}s超出合理范围,占比为{round(percent2_1, 2)}%;"
- else:
- follow_description2 += "跟车状态下自车和目标车的速度差均在合理范围内;"
- v_deviation_list = [x for x in self.v_deviation_list if not np.isnan(x)]
- acc_dict_indexes["followSpeedDeviation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[
- "followSpeedDeviation"],
- "avg": round(np.mean(v_deviation_list), 2) if v_deviation_list else '-',
- "max": round(max(v_deviation_list), 2) if v_deviation_list else '-',
- "min": round(min(v_deviation_list), 2) if v_deviation_list else '-',
- # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]"
- "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]"
- }
- fsd_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": rel_speed_vs_time,
- "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]",
- # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]",
- # "markLine": unfunc_v_slices,
- # "markLine2": [-1 * self.optimal_dict['followSpeedDeviation'],
- # self.optimal_dict['followSpeedDeviation']],
- }
- builtin_graph_dict["followSpeedDeviation"] = fsd_data
- if metric == "followDistanceDeviation":
- self.unfunctional_follow_dist_deviation_df_statistic()
- unfunc_dist_df = self.unfunc_follow_df.copy()
- unfunc_dist_df.loc[unfunc_dist_df['type'] != 'ACC', 'type'] = "origin"
- # unfunc_dist_df.loc[unfunc_dist_df['type'] == 'distance', 'type'] = "distance"
- unfunc_dist_slices = unfunc_dist_df.to_dict('records')
- df2 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
- df2['dist_time'] = df2['end_time'] - df2['start_time']
- devi_dist_time = df2['dist_time'].sum()
- follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
- percent2_2 = devi_dist_time / follow_duration * 100 if follow_duration else 0
- if devi_dist_time != 0:
- follow_description2 += f"跟车状态下自车和目标车的距离差共有{round(devi_dist_time, 2)}s超出合理范围,占比为{round(percent2_2, 2)}%;"
- else:
- follow_description2 += "跟车状态下自车和目标车的距离差均在合理范围内;"
- dist_deviation_list = [x for x in self.dist_deviation_list if not np.isnan(x)]
- acc_dict_indexes["followDistanceDeviation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["followDistanceDeviation"],
- "avg": round(np.mean(dist_deviation_list), 2) if dist_deviation_list else '-',
- "max": round(max(dist_deviation_list), 2) if dist_deviation_list else '-',
- "min": round(min(dist_deviation_list), 2) if dist_deviation_list else '-',
- "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]"
- }
- fdd_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": distance_deviation_vs_time,
- "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]",
- # "markLine": unfunc_dist_slices,
- # "markLine2": [self.optimal_dict['followDistanceDeviation']],
- }
- builtin_graph_dict["followDistanceDeviation"] = fdd_data
- if metric == "followStopDistance":
- acc_dict_indexes["followStopDistance"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["followStopDistance"],
- "avg": round(np.mean(self.stop_distance_list), 2) if self.stop_distance_list else '-',
- "max": round(max(self.stop_distance_list), 2) if self.stop_distance_list else '-',
- "min": round(min(self.stop_distance_list), 2) if self.stop_distance_list else '-',
- "range": f"[{self.optimal_dict['followStopDistance']}, inf)"
- }
- # sdd_data = {
- # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- # "name": f"{self.name_dict[metric]}",
- # "data": self.stop_distance_list,
- # "range": f"[{self.optimal_dict['followStopDistance']}, inf)",
- # # "ref": [self.optimal_dict['followStopDistance'] - 1,
- # # self.optimal_dict['followStopDistance']],
- # }
- # builtin_graph_dict["followStopDistance"] = sdd_data
- if metric == "followResponseTime":
- acc_dict_indexes["followResponseTime"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["followResponseTime"],
- "avg": round(np.mean(self.follow_stop_time_start_list),
- 2) if self.follow_stop_time_start_list else '-',
- "max": round(max(self.follow_stop_time_start_list),
- 2) if self.follow_stop_time_start_list else '-',
- "min": round(min(self.follow_stop_time_start_list),
- 2) if self.follow_stop_time_start_list else '-',
- "range": f"[0, {self.optimal_dict['followResponseTime']}]"
- }
- # rt_data = {
- # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- # "name": f"{self.name_dict[metric]}",
- # "data": self.follow_stop_time_start_list,
- # # "range": f"[0, {self.optimal_dict['followResponseTime']})"
- # }
- # builtin_graph_dict["followResponseTime"] = rt_data
- if metric not in self.bulitin_metric_list:
- acc_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- acc_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- acc_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- acc_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
- self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
- acc_dict["indexes"] = acc_dict_indexes
- acc_dict["builtin"] = builtin_graph_dict
- acc_dict["custom"] = custom_graph_dict
- acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
- acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
- acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
- acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
- type_details_dict[type] = acc_dict
- unfunc_metric_list += unfunc_follow_metric_list
- func_over_optimal.append(str_follow_over_optimal)
- elif type == "functionLKA":
- lka_dict = {
- "name": f"{self.type_name_dict[type]}",
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # get score and grade
- score_lane = score_type_dict["functionLKA"]
- grade_lane = score_grade(score_lane)
- lka_dict['score'] = score_lane
- lka_dict['level'] = grade_lane
- # unfunc_type_list.append('车道保持') if score_type_dict["functionLKA"] < 80 else func_type_list.append(
- # '车道保持')
- unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
- # lane keep description
- func_lane_metric_list = []
- unfunc_lane_metric_list = []
- for metric in self.metric_dict[type]:
- unfunc_lane_metric_list.append(metric) if score_metric_dict[
- metric] < 80 else func_lane_metric_list.append(
- metric)
- lka_dict_indexes = {}
- for metric in self.metric_dict[type]:
- if metric == "laneDistance":
- lka_dict_indexes["laneDistance"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["laneDistance"],
- "avg": round(np.mean(self.line_dist), 2) if self.line_dist else "-",
- "max": round(max(self.line_dist), 2) if self.line_dist else "-",
- "min": round(min(self.line_dist), 2) if self.line_dist else "-",
- "range": f"[{self.optimal_dict['laneDistance']}, 1.875]"
- }
- if metric == "centerDistanceExpectation":
- lka_dict_indexes["centerDistanceExpectation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceExpectation'],
- "avg": round(self.result["centerDistanceExpectation"], 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceExpectation']}]"
- }
- if metric == "centerDistanceStandardDeviation":
- lka_dict_indexes["centerDistanceStandardDeviation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceStandardDeviation'],
- "avg": round(self.result["centerDistanceStandardDeviation"],
- 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceStandardDeviation']}]"
- }
- if metric == "centerDistanceMax":
- lka_dict_indexes["centerDistanceMax"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceMax'],
- "avg": "-",
- "max": round(self.result["centerDistanceMax"], 2) if self.center_dist else "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceMax']}]"
- }
- if metric == "centerDistanceMin":
- lka_dict_indexes["centerDistanceMin"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceMin'],
- "avg": "-",
- "max": "-",
- "min": round(self.result["centerDistanceMin"], 2) if self.center_dist else "-",
- "range": f"[0, {self.optimal_dict['centerDistanceMin']}]"
- }
- if metric == "centerDistanceFrequency":
- lka_dict_indexes["centerDistanceFrequency"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceFrequency'],
- "avg": round(self.result["centerDistanceFrequency"], 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceFrequency']}]"
- }
- if metric == "centerDistanceRange":
- lka_dict_indexes["centerDistanceRange"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceRange'],
- "avg": round(self.result["centerDistanceRange"], 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceRange']}]"
- }
- if metric not in self.bulitin_metric_list:
- lka_dict_indexes[metric] = {
- # "name": self.custom_data[metric]['name'],
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- lka_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- lka_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- lka_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
- self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
- lka_dict["indexes"] = lka_dict_indexes
- str_lane_over_optimal = ''
- if not unfunc_lane_metric_list:
- str_func_lane_metric = string_concatenate(func_lane_metric_list)
- lane_description1 = f"{str_func_lane_metric}指标均表现良好"
- else:
- for metric in unfunc_lane_metric_list:
- if metric in self.bulitin_metric_list:
- value = self.result[metric]
- if self.kind_dict[metric] == -1:
- metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 1:
- metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 0:
- metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- else:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_lane_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_lane_over_optimal = str_lane_over_optimal[:-1] if str_lane_over_optimal else ""
- str_func_lane_metric = string_concatenate(func_lane_metric_list)
- str_unfunc_lane_metric = string_concatenate(unfunc_lane_metric_list)
- if not func_lane_metric_list:
- lane_description1 = f"{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
- else:
- lane_description1 = f"{str_func_lane_metric}指标表现良好,{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
- lane_description2 = ""
- center_distance_vs_time = []
- if "centerDistanceMax" in self.metric_list:
- cnt2_1 = len(self.center_dist)
- tmp2_list = [x for x in self.center_dist if x > self.optimal_dict['centerDistanceMax']]
- cnt2_2 = len(tmp2_list)
- percent2 = 0 if cnt2_1 == 0 else (cnt2_2 / cnt2_1 * 100)
- dist_time = percent2 * duration / 100
- if dist_time != 0:
- lane_description2 = f"距离有{dist_time:.2f}秒超出合理范围,占比为{percent2:.2f}%"
- else:
- lane_description2 = f"距离均在合理范围内,算法车道保持表现良好"
- # lane keep data for graph
- # center_distance_vs_time = self.zip_time_pairs(self.center_dist)
- center_distance_vs_time = self.zip_time_pairs(self.center_dist_with_nan)
- self.unfunctional_lane_df_statistic()
- #
- # lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
- # lka_dict["description2"] = lane_description2
- unfunc_lane_df = self.unfunc_lane_df.copy()
- unfunc_lane_df['type'] = "origin"
- unfunc_lane_slices = unfunc_lane_df.to_dict('records')
- unfunc_lane_dist_df = self.unfunc_lane_df.copy()
- unfunc_lane_dist_df.loc[unfunc_lane_dist_df['type'] != 'LKA', 'type'] = "origin"
- unfunc_lane_dist_slices = unfunc_lane_dist_df.to_dict('records')
- lka_data = {
- "name": "车辆中心线横向距离(m)",
- "data": center_distance_vs_time,
- # "markLine": unfunc_lane_dist_slices
- }
- if 'centerDistanceMax' in self.optimal_dict:
- lka_range = f"[0, {self.optimal_dict['centerDistanceMax']}]"
- elif 'centerDistanceMin' in self.optimal_dict:
- lka_range = f"[0, {self.optimal_dict['centerDistanceMin']}]"
- else:
- lka_range = f"[0, 0.5]"
- lka_data["range"] = lka_range
- builtin_graph_dict["centerDistance"] = lka_data
- lka_dict["builtin"] = builtin_graph_dict
- lka_dict["custom"] = custom_graph_dict
- lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
- lka_dict["description2"] = lane_description2
- # lka_dict["centerDisData"] = center_distance_vs_time
- # lka_dict["centerDisMarkLine"] = unfunc_lane_dist_slices
- type_details_dict[type] = lka_dict
- unfunc_metric_list += unfunc_lane_metric_list
- func_over_optimal.append(str_lane_over_optimal)
- else:
- type_dict = {
- "name": f"{self.type_name_dict[type]}",
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # get score and grade
- score_custom_type = score_type_dict[type]
- grade_custom_type = score_grade(score_custom_type)
- type_dict["score"] = score_custom_type
- type_dict["level"] = grade_custom_type
- # custom type description
- good_custom_metric_list = []
- bad_custom_metric_list = []
- unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
- type_dict_indexes = {}
- for metric in self.metric_dict[type]:
- bad_custom_metric_list.append(metric) if score_metric_dict[
- metric] < 80 else good_custom_metric_list.append(
- metric)
- type_dict_indexes[metric] = {
- # "name": self.custom_data[metric]['name'],
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]:.3f}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]:.3f}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]:.3f}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]:.3f}]"
- custom_graph_dict[metric] = self.custom_data[metric]['reportData']
- self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
- type_dict["indexes"] = type_dict_indexes
- str_type_over_optimal = ""
- if not bad_custom_metric_list:
- str_good_custom_metric = string_concatenate(good_custom_metric_list)
- type_description1 = f"{str_good_custom_metric}指标均表现良好"
- type_description2 = f"指标均在合理范围内,算法表现良好"
- else:
- for metric in bad_custom_metric_list:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_type_over_optimal = str_type_over_optimal[:-1]
- str_good_custom_metric = string_concatenate(good_custom_metric_list)
- str_bad_custom_metric = string_concatenate(bad_custom_metric_list)
- if not good_custom_metric_list:
- type_description1 = f"{str_bad_custom_metric}指标表现不佳"
- type_description2 = f"{str_type_over_optimal}"
- else:
- type_description1 = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳"
- type_description2 = f"{str_type_over_optimal}"
- type_dict["builtin"] = builtin_graph_dict
- type_dict["custom"] = custom_graph_dict
- type_dict["description1"] = replace_key_with_value(type_description1, self.name_dict)
- type_dict["description2"] = replace_key_with_value(type_description2, self.name_dict)
- type_details_dict[type] = type_dict
- unfunc_metric_list += bad_custom_metric_list
- func_over_optimal.append(str_type_over_optimal)
- report_dict["details"] = type_details_dict
- # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- func_over_optimal = [s for s in func_over_optimal if s]
- str_func_over_optimal = ';'.join(func_over_optimal)
- if grade_function == '优秀':
- str_func_type = string_concatenate(func_type_list)
- func_description1 = f'算法在{str_func_type}功能上表现优秀;'
- elif grade_function == '良好':
- str_func_type = string_concatenate(func_type_list)
- func_description1 = f'算法在{str_func_type}功能上总体表现良好,满足设计指标要求;'
- elif grade_function == '一般':
- str_unfunc_type = string_concatenate(unfunc_type_list)
- str_unfunc_metric = string_concatenate(unfunc_metric_list)
- str_unfunc_metric = replace_key_with_value(str_unfunc_metric, self.type_name_dict)
- func_description1 = f'算法在{str_unfunc_type}功能上表现一般、需要在{str_unfunc_metric}指标上进一步优化。其中,{str_func_over_optimal};'
- elif grade_function == '较差':
- str_unfunc_type = string_concatenate(unfunc_type_list)
- func_description1 = f'算法在{str_unfunc_type}功能上表现较差,需要提高算法的功能性表现。其中,{str_func_over_optimal};'
- if not unfunc_type_list:
- func_description2 = '功能性在各个功能上的表现俱佳。'
- else:
- str_unfunc_type = string_concatenate(unfunc_type_list)
- func_description2 = f"算法在{str_unfunc_type}功能上需要重点优化。"
- # report_dict["description1"] = replace_key_with_value(func_description1, self.name_dict)
- report_dict["description1"] = replace_key_with_value(replace_key_with_value(func_description1, self.name_dict),
- self.type_name_dict)
- report_dict["description2"] = replace_key_with_value(func_description2, self.type_name_dict)
- report_dict['commonData'] = {
- "per": {
- "name": "脚刹/油门踏板开度(百分比)",
- "legend": ["刹车踏板开度", "油门踏板开度"],
- "data": [brake_vs_time, throttle_vs_time]
- },
- "ang": {
- "name": "方向盘转角(角度°)",
- "data": steering_vs_time
- },
- "spe": {
- "name": "速度(km/h)",
- "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
- "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- },
- "acc": {
- "name": "加速度(m/s²)",
- "legend": ["横向加速度", "纵向加速度"],
- "data": [lat_acc_vs_time, lon_acc_vs_time]
- },
- # "dis": {
- # "name": "前车距离(m)",
- # "data": distance_vs_time
- # }
- }
- if "function_ACC" in self.type_list:
- report_dict['commonData']["dis"] = {
- "name": "前车距离(m)",
- "data": distance_vs_time
- }
- self.unfunc_df = pd.concat([self.unfunc_df, self.unfunc_follow_df, self.unfunc_lane_df], ignore_index=True)
- unfunc_df = self.unfunc_df.copy().dropna()
- unfunc_slices = unfunc_df.to_dict('records')
- unfunc_slices.extend(self.custom_markline_list)
- report_dict["commonMarkLine"] = unfunc_slices
- # report_dict = {
- # "name": "功能性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_function,
- # "level": grade_function,
- # 'followStopCount': self.follow_stop_count,
- # "description1": func_description1,
- # "description2": func_description2,
- #
- # "functionACC": acc_dict,
- # "functionLKA": lka_dict,
- #
- # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
- # "accData": [lat_acc_vs_time, lon_acc_vs_time],
- #
- # }
- self.eval_data = self.ego_df.copy()
- return report_dict
- def get_eval_data(self):
- if 'line_dist' in self.eval_data.columns:
- df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc', 'line_dist']].copy()
- else:
- df = self.eval_data[['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc']].copy()
- self.df['line_dist'] = pd.Series(dtype=float)
- return df
|