12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Date: 2023/08/03
- @Last Modified: 2023/08/03
- @Summary: Function metrics
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import math
- import numpy as np
- import pandas as pd
- import scipy.signal as sg
- from score_weight import cal_score_with_priority, cal_weight_from_80
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
- class Function(object):
- """
- Class for achieving function metrics for autonomous driving.
- Attributes:
- df: Vehicle driving data, stored in dataframe format.
- """
def __init__(self, data_processed, custom_data, scoreModel):
    """Collect the input data frames and the ``function`` scoring config.

    Args:
        data_processed: Pre-processed simulation data. The attributes read
            here are ``status_trigger_dict``, ``object_df``, ``ego_data``,
            ``obj_id_list``, ``road_mark_df``, ``road_pos_df``,
            ``driver_ctrl_df``, ``driver_ctrl_data`` and ``config``.
        custom_data: Stored unchanged as ``self.custom_data``.
        scoreModel: Stored unchanged as ``self.scoreModel``.

    Raises:
        KeyError: If a required key is missing from the ``function``
            section of the configuration or from ``driver_ctrl_data``.
    """
    self.eval_data = pd.DataFrame()
    self.data_processed = data_processed
    self.scoreModel = scoreModel

    # Input data.
    self.status_trigger_dict = data_processed.status_trigger_dict
    self.df = data_processed.object_df
    self.ego_df = data_processed.ego_data
    self.obj_id_list = data_processed.obj_id_list
    self.df_roadmark = data_processed.road_mark_df
    self.df_roadpos = data_processed.road_pos_df
    self.df_drivectrl = data_processed.driver_ctrl_df

    # Interval tables; each one gets its own empty frame.
    interval_columns = ['start_time', 'end_time', 'start_frame', 'end_frame', 'type']
    self.unfunc_df = pd.DataFrame(columns=interval_columns)
    self.unfunc_follow_df = pd.DataFrame(columns=interval_columns)
    self.unfunc_lane_df = pd.DataFrame(columns=interval_columns)
    self.unfunc_custom_df = pd.DataFrame(columns=interval_columns)
    self.custom_markline_list = []

    # Config infos for calculating score.
    self.config = data_processed.config
    function_config = self.config.config['function']
    self.function_config = function_config

    # Common data.
    self.bulitin_metric_list = self.config.builtinMetricList

    # Dimension data.
    self.weight_custom = function_config['weightCustom']
    self.metric_list = function_config['metric']
    self.type_list = function_config['type']
    self.type_name_dict = function_config['typeName']
    self.name_dict = function_config['name']
    self.unit_dict = function_config['unit']

    # Custom metric data.
    self.customMetricParam = function_config['customMetricParam']
    self.custom_metric_list = list(self.customMetricParam.keys())
    self.custom_data = custom_data
    self.custom_param_dict = {}

    # Score data.
    self.weight = function_config['weightDimension']
    self.weight_type_dict = function_config['typeWeight']
    self.weight_type_list = function_config['typeWeightList']
    self.weight_dict = function_config['weight']
    self.weight_list = function_config['weightList']
    self.priority_dict = function_config['priority']
    self.priority_list = function_config['priorityList']
    self.kind_dict = function_config['kind']
    self.optimal_dict = function_config['optimal']
    self.multiple_dict = function_config['multiple']
    self.kind_list = function_config['kindList']
    self.optimal_list = function_config['optimalList']
    self.multiple_list = function_config['multipleList']

    # Per-type metric names. Default to an empty list so that later reads
    # never fail with AttributeError when a type is absent from the config.
    self.metric_dict = function_config['typeMetricDict']
    self.acc_metric_list = self.metric_dict.get('functionACC', [])
    self.lka_metric_list = self.metric_dict.get('functionLKA', [])

    # Custom metric.
    self.flag_type_dict = {}

    # Lists of driving control info.
    self.time_list = data_processed.driver_ctrl_data['time_list']
    self.frame_list = data_processed.driver_ctrl_data['frame_list']

    # Following (ACC) intermediate results, filled by _following /
    # _stop_and_go.
    self.time_list_follow = []
    self.frame_list_follow = []
    self.dist_list = []
    self.v_deviation_list = []
    self.v_relative_list = []
    self.dist_deviation_list = []
    self.stop_distance_list = []
    self.follow_stop_time_start_list = []  # start-up response durations
    self.dist_list_full_time = []
    self.dist_deviation_list_full_time = []
    # Initialised here as well so it exists even if _following never runs.
    self.v_relative_list_full_time = []

    # Lane keeping intermediate results.
    self.line_dist = []
    self.center_dist = []
    self.center_dist_with_nan = []
    self.center_time_list = []
    self.center_frame_list = []

    self.ICA_FLAG = False
    self.follow_stop_count = 0
    self.result = {}
- def _following(self, df):
- """
- 稳定跟车行驶:
- 1.稳态控制速度偏差:筛选出跟车行驶数据后,
- 2.稳态控制距离偏差
- ACC stastus
- 0x7: Stand-wait State
- 0x6: Stand-active State
- 0x5: Passive State
- 0x4: Standby State
- 0x3: Shut-off State
- 0x2: Override State
- 0x1: Active State
- 0x0: Off
- # 速度变化不能以0到非0为参考,要改为从0到0.05的变化,有一个阈值滤波,避免前车车辆抖动误差影响传感器
- 弯道行驶: 最小跟随弯道半径
- """
- # df = self.df[self.df['ACC_status'] == "Active"].copy() # 跟车状态下
- # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
- col_list = ['simTime', 'simFrame', 'playerId', 'v', 'posX', 'posY'] # target_id
- df = df[col_list].copy()
- ego_df = df[df['playerId'] == 1][['simTime', 'simFrame', 'v', 'posX', 'posY']]
- # 筛选目标车(同一车道内,距离最近的前车)
- # obj_df = df[df['playerId'] == df['target_id']]
- target_id = 2
- obj_df = df[df['playerId'] == target_id][['simTime', 'simFrame', 'v', 'posX', 'posY']] # 目标车
- obj_df = obj_df.rename(columns={'v': 'v_obj', 'posX': 'posX_obj', 'posY': 'posY_obj'})
- df_merge = pd.merge(ego_df, obj_df, on=['simTime', 'simFrame'], how='left')
- df_merge['v_relative'] = df_merge['v'] - df_merge['v_obj']
- df_merge['dist'] = df_merge.apply(
- lambda row: self.dist(row['posX'], row['posY'], row['posX_obj'], row['posY_obj']), axis=1)
- self.dist_list = df_merge['dist'].values.tolist()
- df_merge['time_gap'] = df_merge['dist'] / df_merge['v']
- SAFE_TIME_GAP = 3
- df_merge['dist_deviation'] = df_merge['time_gap'].apply(
- lambda x: 0 if (x >= SAFE_TIME_GAP) else (SAFE_TIME_GAP - x))
- df_stop = df_merge[(df_merge['v'] == 0) & (df_merge['v_obj'] == 0)]
- stop_distance_list = df_stop['dist'].values.tolist()
- self.stop_distance_list = stop_distance_list
- t_list = df_stop['simTime'].values.tolist()
- stop_group = []
- sub_group = []
- CONTINUOUS_TIME_PERIOD = 1
- CONTINUOUS_FRAME_PERIOD = 13
- for i in range(len(t_list)):
- if not sub_group or t_list[i] - t_list[i - 1] <= CONTINUOUS_TIME_PERIOD:
- sub_group.append(t_list[i])
- else:
- stop_group.append(sub_group)
- sub_group = [t_list[i]]
- stop_group.append(sub_group)
- stop_group = [g for g in stop_group if len(g) >= CONTINUOUS_FRAME_PERIOD]
- self.follow_stop_count = len(stop_group)
- # solution 1: 跟车状态下,算法输出预设跟车速度
- # df['velocity_deviation'] = abs(df['v'] - df['set_velocity'])
- # max_velocity_deviation = df['velocity_deviation'].max()
- # solution 2: 跟车状态下,跟车设定速度为目标车速度
- # velocity_deviation = []
- # distance_deviation = []
- # stop_distance = []
- #
- # for f, data in df.groupby('simFrame'):
- # ego_data = data.iloc[0].copy()
- # # df.loc[len(df)] = ego_data
- #
- # if len(data) < 2:
- # continue
- # v1 = ego_data['v']
- # x1 = ego_data['posX']
- # y1 = ego_data['posY']
- #
- # for i in range(1, len(data)):
- # obj_data = data.iloc[i].copy()
- #
- # v2 = obj_data['v']
- # x2 = obj_data['posX']
- # y2 = obj_data['posY']
- #
- # v_delta = abs(v1 - v2)
- # velocity_deviation.append(v_delta)
- #
- # dist = self.dist(x1, y1, x2, y2)
- # distance_deviation.append(dist)
- #
- # if v2 == 0 and v1 == 0:
- # stop_distance.append(dist)
- # df.loc[len(df)] = obj_data
- # self.df = df
- df_merge.replace([np.inf, -np.inf], np.nan, inplace=True) # 异常值处理
- self.time_list_follow = df_merge['simTime'].values.tolist()
- self.frame_list_follow = df_merge['simFrame'].values.tolist()
- self.v_relative_list = df_merge['v_relative'].values.tolist()
- self.v_deviation_list = abs(df_merge['v_relative']).values.tolist()
- self.dist_deviation_list = df_merge['dist_deviation'].values.tolist()
- tmp_df = self.ego_df[['simTime', 'simFrame']].copy()
- v_rel_df = df_merge[['simTime', 'v_relative']].copy()
- df_merged1 = pd.merge(tmp_df, v_rel_df, on='simTime', how='left')
- self.v_relative_list_full_time = df_merged1['v_relative'].values.tolist()
- dist_deviation_df = df_merge[['simTime', 'dist_deviation']].copy()
- df_merged1 = pd.merge(tmp_df, dist_deviation_df, on='simTime', how='left')
- self.dist_deviation_list_full_time = df_merged1['dist_deviation'].values.tolist()
- dist_df = df_merge[['simTime', 'dist']].copy()
- df_merged1 = pd.merge(tmp_df, dist_df, on='simTime', how='left')
- self.dist_list_full_time = df_merged1['dist'].values.tolist()
- max_velocity_deviation = abs(df_merge['v_relative']).max()
- distance_deviation = df_merge['dist_deviation'].max()
- min_stop_distance = min(self.stop_distance_list) if self.stop_distance_list else 9999
- self.result['followSpeedDeviation'] = max_velocity_deviation
- self.result['followDistanceDeviation'] = distance_deviation
- self.result['followStopDistance'] = min_stop_distance
- def _stop_and_go(self, df):
- """
- 停走功能:
- 1.跟停距离3-4m,
- 2.启动跟车响应时间<=1.2s
- 3.停车跟车响应时间<=1.2s
- decisionType_target
- 0 无障碍物巡航
- 1 停车减速让行
- 2 跟车
- 3 绕行
- 4 到达终点?
- """
- # df = self.df[self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # 跟车状态下
- # df = self.df[self.df['ACC_status'] == 1].copy() # 跟车状态下
- df = df.dropna(subset=['type'])
- STOP_SPEED_THRESHOLD = 0.05
- df['v'] = df['v'].apply(lambda x: 0 if x <= STOP_SPEED_THRESHOLD else 1) # 区分速度为0或非0
- target_id = 2
- df_ego = df[df['playerId'] == 1].copy()
- df_obj = df[df['playerId'] == target_id].copy() # 目标车
- df_obj_time = df_obj['simTime'].values.tolist()
- df_ego = df_ego[df_ego['simTime'].isin(df_obj_time)].copy()
- df_ego = df_ego.drop_duplicates(["simTime", "simFrame"])
- df_obj = df_obj.drop_duplicates(["simTime", "simFrame"])
- df_ego['v_diff'] = df_ego['v'].diff()
- df_ego['v_start_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == 1 else 0) # 起步即为1
- df_ego['v_stop_flag'] = df_ego['v_diff'].apply(lambda x: 1 if x == -1 else 0) # 停车即为-1
- df_obj['v_diff'] = df_obj['v'].diff()
- obj_v_start_flag = df_obj['v_diff'].apply(lambda x: 1 if x == 1 else 0).values # 起步即为1
- obj_v_stop_flag = df_obj['v_diff'].apply(lambda x: 1 if x == -1 else 0).values # 停车即为1
- df_ego['obj_v_start_flag'] = obj_v_start_flag
- df_ego['obj_v_stop_flag'] = obj_v_stop_flag
- df_ego['flag_start'] = df_ego['obj_v_start_flag'] - df_ego['v_start_flag'] # 目标车起步即为1,自车起步即为-1
- df_ego['flag_stop'] = df_ego['obj_v_stop_flag'] - df_ego['v_stop_flag'] # 目标车停车即为1,自车停车即为-1
- flag_start_list = df_ego['flag_start'].values
- flag_stop_list = df_ego['flag_stop'].values
- time_list = df_ego['simTime'].values
- time_start_list = []
- time_stop_list = []
- for i, flag in enumerate(flag_start_list):
- if flag:
- t1 = time_list[i]
- if flag == -1:
- t2 = time_list[i]
- time_start_list.append(t2 - t1) # t2-t1即为自车起步响应时间
- for i, flag in enumerate(flag_stop_list):
- if flag:
- t1 = time_list[i]
- if flag == -1:
- t2 = time_list[i]
- time_stop_list.append(t2 - t1) # t2-t1即为自车停车响应时间
- time_start_list = [i for i in time_start_list if i != 0]
- self.result['followResponseTime'] = max(time_start_list) if time_start_list else 0
- self.follow_stop_time_start_list = time_start_list
- # self.result['跟车停止响应最长时间'] = max(time_stop_list)
def dist(self, x1, y1, x2, y2):
    """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
    delta_x = x1 - x2
    delta_y = y1 - y2
    return math.sqrt(delta_x ** 2 + delta_y ** 2)
- def _line_dist_merge(self):
- # line_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
- # line_dist_group = line_dist_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
- # line_dist_df['line_dist'] = line_dist_group
- # line_dist_df = line_dist_df[['simFrame', 'line_dist']]
- # line_dist_df.columns = ['simFrame', 'line_dist']
- # line_dist_df = line_dist_df.drop_duplicates()
- line_dist_df = self.df_roadmark[(self.df_roadmark['id'] == 0) | (self.df_roadmark['id'] == 2)].copy()
- # df = line_dist_df[line_dist_df['id'] == 2][['simTime', 'simFrame']].copy()
- # df['line_dist'] = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).values.tolist()
- if line_dist_df.empty:
- self.ego_df['line_dist'] = pd.Series(dtype=float)
- else:
- df = line_dist_df.groupby('simFrame').apply(lambda t: abs(t['lateralDist']).min()).reset_index()
- # df = df.rename(columns={'lateralDist': 'line_dist'})
- df.columns = ["simFrame", "line_dist"]
- self.ego_df = pd.merge(self.ego_df, df, on='simFrame', how='left')
- def _lane_transverse_distance(self, df):
- """
- 0x7:Reserved
- 0x6: Reserved
- 0x5: Reserved
- 0x4: Error
- 0x3: Active
- 0x2: Standby
- 0x1: Passive
- 0x0: Off
- """
- # 车道内偏移行驶时,与近侧车道线的横向距离
- # data_group = lane_df.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
- # data_group = self.df_roadmark.groupby("simFrame").apply(lambda t: abs(t['lateralDist']).min())
- # df = data_group.to_frame(name='line_dist')
- # df['simFrame'] = data_group.index.tolist()
- # df = self.df_roadmark[self.df_roadmark['id'] == 0][['simTime', 'simFrame']].copy()
- # df['line_dist'] = self.df_roadmark.groupby('simFrame').apply(
- # lambda t: abs(t['lateralDist']).min()).values.tolist()
- # self.ego_df = pd.merge(self.ego_df, df, on=['simTime', 'simFrame'], how='left')
- # self._line_dist_merge()
- # # df = self.df[self.df['LKA_status'] == 3]
- # df = self.ego_df[self.ego_df['LKA_status'] == "Active"]
- self.line_dist = df['line_dist'].to_list()
- self.line_dist = [x for x in self.line_dist if not np.isnan(x)]
- self.result['laneDistance'] = min(self.line_dist) if self.line_dist else self.optimal_dict['laneDistance']
- def _lateral_dist_of_center_line(self, df_laneoffset):
- # df_laneoffset = self.df[(self.df['LKA_status'] == "Active") & (self.df['playerId'] == 1)]
- self.center_dist_with_nan = df_laneoffset['laneOffset'].to_list()
- self.center_time_list = df_laneoffset['simTime'].to_list()
- self.center_frame_list = df_laneoffset['simFrame'].to_list()
- # self.center_dist_df = self.df_roadpos[self.df_roadpos["playerId"] == 1]
- # self.center_dist = self.center_dist_df["laneOffset"]
- # self.center_time_list = self.center_dist_df['simTime'].to_list()
- # self.center_frame_list = self.center_dist_df['simFrame'].to_list()
- self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)]
- if not self.center_dist:
- self.result['centerDistanceExpectation'] = 0
- self.result['centerDistanceStandardDeviation'] = 0
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['centerDistanceMax'] = 0
- self.result['centerDistanceMin'] = 0
- self.result['centerDistanceFrequency'] = 0
- self.result['centerDistanceRange'] = 0
- else:
- center_dist = [abs(x) for x in self.center_dist]
- # 车道中心线横向距离分布期望与标准差
- E_lane_center_dist = np.mean(center_dist)
- D_lane_center_dist = np.std(center_dist)
- # 车道中心线横向距离分布极值
- center_dist = np.array(center_dist)
- extrmax_index = sg.argrelmax(center_dist)[0]
- extrmin_index = sg.argrelmin(center_dist)[0]
- extreme_max_value = center_dist[extrmax_index]
- extreme_min_value = center_dist[extrmin_index]
- # 横向相对位置震荡频率
- # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
- length1 = len(extrmax_index) # 极大值列表长度
- length2 = len(extrmin_index) # 极小值列表长度
- # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
- FREQUENCY = 100
- time_diff = 1.0 / FREQUENCY
- period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
- period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
- try:
- period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
- except ZeroDivisionError:
- period = 0
- frequency = 1 / period if period else 0
- # length = min(len(extreme_max_value), len(extreme_min_value))
- # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
- # frequency = 1 / period
- # 横向相对位置震荡极差
- length = min(len(extreme_max_value), len(extreme_min_value))
- extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
- self.result['centerDistanceExpectation'] = E_lane_center_dist
- self.result['centerDistanceStandardDeviation'] = D_lane_center_dist
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['centerDistanceMax'] = center_dist.max()
- self.result['centerDistanceMin'] = center_dist.min()
- self.result['centerDistanceFrequency'] = abs(frequency)
- self.result['centerDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
- def _continous_judge(self, frame_list):
- CONTINUOUS_FRAME_PERIOD = 3
- if not frame_list:
- return 0
- cnt = 1
- for i in range(1, len(frame_list)):
- if frame_list[i] - frame_list[i - 1] <= CONTINUOUS_FRAME_PERIOD:
- continue
- cnt += 1
- return cnt
- def _lane_departure_warning(self, ldw_df):
- """
- 工作车速范围:55/60-200/250 km/h
- 最小可用车道宽度:2.6m
- 误报警:距离安全,但是预警
- 漏报警:距离危险,但是没报警
- 灵敏度差:线外40cm
- 灵敏度零级:压线
- 灵敏度一级:线内20cm(20%)
- 灵敏度二级:线内40cm(40%)
- 灵敏度三级:线内60cm(60%)
- 偏离车速1.0m/s:线内60cm报警
- 偏离车速0.5m/s:线内40cm报警
- """
- # 自车, 车道线宽度>2.6m,转向灯没开
- # ego_df = self.df[self.df['playerId'] == 1].copy()
- # ego_df['line_dist'] = self.line_dist
- # lane_dist_df = self.df_roadmark[self.df_roadmark['id'] == 0 & self.df_roadmark['id'] == 2].copy()
- # lane_dist_group = lane_dist_df.groupby("simFrame").apply(
- # lambda t: abs(t['lateralDist']).min()).values.tolist()
- # lane_dist_df['line_dist'] = lane_dist_group
- # lane_dist_df = lane_dist_df[['simFrame', 'line_dist']]
- # lane_dist_df.columns = ['simFrame', 'line_dist']
- # # lane_dist_df = lane_dist_df[['simTime', 'simFrame', 'line_dist']]
- # # lane_dist_df.columns = ['simTime', 'simFrame', 'line_dist']
- # lane_dist_df = lane_dist_df.drop_duplicates()
- # ldw_status_df = self.df[self.df['playerId'] == 1].copy()[['simTime', 'simFrame', 'LDW_status']].drop_duplicates()
- # ldw_status_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy().drop_duplicates()
- # ldw_df = pd.merge_asof(ldw_status_df, lane_dist_df, on='simFrame', direction='nearest')
- # ldw_df = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
- # calculate max DLC
- warning_dist_max = ldw_df[ldw_df['LDW_status'] == "Active"]['line_dist'].max()
- # warning_dist_max = ldw_df[ldw_df['LDW_status'] == 3]['line_dist'].max()
- self.result['预警时距离车道线最大距离'] = warning_dist_max
- # count miss warning
- miss_warning_df = ldw_df[(ldw_df['line_dist'] <= 0.4) & (ldw_df['LDW_status'] != "Active")]
- miss_warning_frame_list = miss_warning_df['simFrame'].values.tolist()
- miss_warning_count = self._continous_judge(miss_warning_frame_list)
- self.result['车道偏离漏预警次数'] = miss_warning_count
- # count false warning
- false_warning_df = ldw_df[(ldw_df['line_dist'] >= 0.4) & (ldw_df['LDW_status'] == "Active")]
- false_warning_frame_list = false_warning_df['simFrame'].values.tolist()
- false_warning_count = self._continous_judge(false_warning_frame_list)
- self.result['车道偏离误预警次数'] = false_warning_count
- def _autonomous_emergency_brake(self):
- """
- 刹得准、刹得稳
- 碰撞预警 + 紧急制动
- <=4s 预警>1s,制动<3s
- 触发目标类型
- 目标类型准确度
- 目标为车辆的速度降:
- 弱势目标的速度降:
- 触发时TTC<4s
- 一级制动减速度:4m/s^2
- 二级制动减速度:10m/s^2
- 刹停行驶距离:10-20m
- 刹停时前车距离:0.8-1.5m
- 误触发次数:频率 n/10万km
- 漏触发:
- 能绕开就没必要刹停:开始刹车时距离远近
- Returns:
- """
- # ego_df = self.df[self.df['playerId'] == 1]
- df = self.ego_df[self.ego_df['Aeb_status'] == 'Active']
- # df = self.ego_df[self.ego_df['Aeb_status'] == 0]
- # calculate brakeDecelerate
- df['lon_accel'] = df['accelX'] * np.cos(df['posH']) + df['accelY'] * np.sin(df['posH'])
- decelerate_max = df['lon_accel'].min()
- # calculate brake stop travelDist
- df_dist = df[df['lon_accel'] < 0]
- frame_list = df_dist['simFrame'].to_list()
- travelDist_list = df_dist['travelDist'].to_list()
- travelDist_group = []
- start = 0
- for i in range(1, len(frame_list)):
- if frame_list[i] - frame_list[i - 1] > 5:
- end = i - 1
- # travelDist_group.append(travelDist_list[start:end])
- travelDist_group.append(travelDist_list[end] - travelDist_list[start])
- start = i
- end = -1
- travelDist_group.append(travelDist_list[end] - travelDist_list[start])
- brake_distance_max = max(travelDist_group) if travelDist_group else np.inf
- self.result['brakeDecelerate'] = abs(round(decelerate_max, 4))
- self.result['brakeDistance'] = round(brake_distance_max, 4)
- def _integrated_cruise_assist(self, df):
- df_LCC = df[df['ICA_status'].isin(['LLC_follow_vehicle', 'LLC_follow_line'])].copy()
- self.LCC_line_dist = df_LCC['line_dist'].to_list()
- self.result['LCClaneDistance'] = min(self.LCC_line_dist) if self.LCC_line_dist else self.optimal_dict[
- 'LCClaneDistance']
- self.LCC_center_dist = df_LCC['laneOffset'].to_list()
- self.LCC_center_time_list = df_LCC['simTime'].to_list()
- self.LCC_center_frame_list = df_LCC['simFrame'].to_list()
- if not self.LCC_center_dist:
- self.result['LCCcenterDistanceExpectation'] = 0
- self.result['LCCcenterDistanceStandardDeviation'] = 0
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['LCCcenterDistanceMax'] = 0
- self.result['LCCcenterDistanceMin'] = 0
- self.result['LCCcenterDistanceFrequency'] = 0
- self.result['LCCcenterDistanceRange'] = 0
- else:
- center_dist = [abs(x) for x in self.LCC_center_dist]
- # 车道中心线横向距离分布期望与标准差
- E_lane_center_dist = np.mean(center_dist)
- D_lane_center_dist = np.std(center_dist)
- # 车道中心线横向距离分布极值
- center_dist = np.array(center_dist)
- extrmax_index = sg.argrelmax(center_dist)[0]
- extrmin_index = sg.argrelmin(center_dist)[0]
- extreme_max_value = center_dist[extrmax_index]
- extreme_min_value = center_dist[extrmin_index]
- # 横向相对位置震荡频率
- # 极值即函数局部的最大值或最小值,周期表示两个极大值之间的时间长度或者两个极小值之间的时间长度
- length1 = len(extrmax_index) # 极大值列表长度
- length2 = len(extrmin_index) # 极小值列表长度
- # 极值下标之间的时间间隔列表乘以每两个下标之间的时间间隔
- FREQUENCY = 100
- time_diff = 1.0 / FREQUENCY
- period1 = time_diff * (extrmax_index[1:length1] - extrmax_index[0:length1 - 1])
- period2 = time_diff * (extrmin_index[1:length2] - extrmin_index[0:length2 - 1])
- try:
- period = (sum(period1) + sum(period2)) / (length1 + length2) # 所有时间间隔(即周期)的均值作为最后的周期
- except ZeroDivisionError:
- period = 0
- frequency = 1 / period if period else 0
- # length = min(len(extreme_max_value), len(extreme_min_value))
- # period = np.mean(extrmax_index[0:length] - extrmin_index[0:length])
- # frequency = 1 / period
- # 横向相对位置震荡极差
- length = min(len(extreme_max_value), len(extreme_min_value))
- extr_diff = extreme_max_value[0:length] - extreme_min_value[0:length]
- self.result['LCCcenterDistanceExpectation'] = E_lane_center_dist
- self.result['LCCcenterDistanceStandardDeviation'] = D_lane_center_dist
- # self.result['centerDistanceMax'] = abs(extreme_max_value).max()
- # self.result['centerDistanceMin'] = abs(extreme_min_value).min()
- self.result['LCCcenterDistanceMax'] = center_dist.max()
- self.result['LCCcenterDistanceMin'] = center_dist.min()
- self.result['LCCcenterDistanceFrequency'] = abs(frequency)
- self.result['LCCcenterDistanceRange'] = abs(extr_diff).max() if list(extr_diff) else 0
- df_LC = df[df['ICA_status'] == 'Only_Longitudinal_Control'].copy()
- v_max = df_LC['v'].max()
- v_min = df_LC['v'].min()
- self.result['OLC_v_deviation'] = v_max - v_min
- def _function_statistic(self):
- """
- Gather the raw functional metric values that are later handed to scoring.
- For "functionACC" the rows of self.df that fall inside the ACC active time
- ranges are passed to self._following and self._stop_and_go, and
- self.flag_type_dict['functionACC'] records whether any such rows exist;
- the else branch instead collects the optimal values of the ACC metrics.
- For "functionLKA" self._line_dist_merge() is called, the rows of
- self.ego_df inside the LKA active time ranges are selected,
- self.flag_type_dict['functionLKA'] records whether any exist, and the
- selection is passed to self._lane_transverse_distance and
- self._lateral_dist_of_center_line.
- :return: list of the self.result values whose keys are in
- self.metric_list, preceded by the ACC optimal values when "functionACC"
- is requested and len(self.obj_id_list) == 1.
- """
- ACC_optimal_list = []
- if "functionACC" in self.type_list:
- for function_type in self.type_list:  # NOTE(review): this loop only skips "ICA" entries; confirm what it is meant to guard
- if "ICA" in function_type:
- continue
- if len(self.obj_id_list) > 1:
- # Nested list of [start, end] time ranges read from the ACC status triggers
- follow_time_ranges = self.status_trigger_dict['ACC']['ACC_active_time']
- # Keep the rows whose simTime lies inside a range; both bounds are inclusive.
- # Every range is visited and the per-range selections are concatenated.
- filtered_df = pd.DataFrame(columns=self.df.columns)
- for start, end in follow_time_ranges:
- mask = (self.df['simTime'] >= start) & (self.df['simTime'] <= end)
- if filtered_df is None:  # NOTE(review): filtered_df is always a DataFrame here, so this branch looks unreachable
- filtered_df = self.df[mask]
- else:
- filtered_df = pd.concat([filtered_df, self.df[mask]])
- # The selection above may contain repeated rows; drop_duplicates removes them
- df_follow = filtered_df.drop_duplicates()
- df_stop_and_go = df_follow.copy()
- # Reset the index (if needed)
- # filtered_df.reset_index(drop=True, inplace=True)
- # df_follow111 = self.df[self.df['ACC_status'] == "Active"].copy() # while following
- self._following(df_follow)
- # df_stop_and_go = self.df[
- # self.df['ACC_status'].isin(["Active", "Stand-active", "Stand-wait"])].copy() # while following
- self._stop_and_go(df_stop_and_go)
- if df_follow.empty and df_stop_and_go.empty:
- self.flag_type_dict['functionACC'] = False
- else:
- self.flag_type_dict['functionACC'] = True
- else:
- ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
- if "functionLKA" in self.type_list:
- self._line_dist_merge()
- # df_line_dist = self.df[self.df['LKA_status'] == 3]
- # Nested list of [start, end] time ranges read from the LKA status triggers
- follow_time_ranges = self.status_trigger_dict['LKA']['LKA_active_time']
- # Keep the rows whose simTime lies inside a range; both bounds are inclusive.
- # Every range is visited and the per-range selections are concatenated.
- filtered_df = pd.DataFrame(columns=self.ego_df.columns)
- for start, end in follow_time_ranges:
- mask = (self.ego_df['simTime'] >= start) & (self.ego_df['simTime'] <= end)
- if filtered_df is None:  # NOTE(review): filtered_df is always a DataFrame here, so this branch looks unreachable
- filtered_df = self.ego_df[mask]
- else:
- filtered_df = pd.concat([filtered_df, self.ego_df[mask]])
- # The selection above may contain repeated rows; drop_duplicates removes them
- df_lka = filtered_df.drop_duplicates()
- # df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"]
- if df_lka.empty:
- self.flag_type_dict['functionLKA'] = False
- else:
- self.flag_type_dict['functionLKA'] = True
- self._lane_transverse_distance(df_lka)
- self._lateral_dist_of_center_line(df_lka)
- # if "functionLDW" in self.type_list:
- # df_ldw = self.ego_df[['simTime', 'simFrame', 'LDW_status', 'line_dist']].copy()
- # self._lane_departure_warning(df_ldw)
- # if "functionAEB" in self.type_list:
- # self._autonomous_emergency_brake()
- # if self.df['ICA_status'].nunique() != 1:
- # self.ICA_FLAG = True
- # # if "functionICA" in self.type_list:
- # # self._integrated_cruise_assist(self.ego_df)
- # # ACC
- # if len(self.obj_id_list) > 1:
- # df_follow = self.df[self.df['ACC_status'] == "Shut_off"].copy() # numeric 3 corresponds to Active
- # self._following(df_follow)
- #
- # df_stop_and_go = self.df[
- # self.df['ACC_status'].isin(["Shut_off", "Stand-active", "Stand-wait"])].copy() # while following
- # self._stop_and_go(df_stop_and_go)
- #
- # else:
- # ACC_optimal_list = [value for key, value in self.optimal_dict.items() if key in self.acc_metric_list]
- #
- # # LKA
- # self._line_dist_merge()
- # # df_line_dist = self.df[self.df['LKA_status'] == 3]
- # df_lka = self.ego_df[self.ego_df['ICA_status'] == "LLC_Follow_Line"]
- # self._lane_transverse_distance(df_lka)
- # self._lateral_dist_of_center_line(df_lka)
- arr_func = [value for key, value in self.result.items() if key in self.metric_list]
- # arr_func = list(self.result.values())
- if "functionACC" in self.type_list and len(self.obj_id_list) == 1:
- arr_func = ACC_optimal_list + arr_func
- return arr_func
def custom_metric_param_parser(self, param_list):
    """
    Convert a custom metric's raw parameter entries into parallel lists.

    Each entry of ``param_list`` is a mapping from which these keys are read::

        {
            "kind": "-1",
            "optimal": "1",
            "multiple": ["0.5", "5"],
            "spare": [{"param": ...}, ...]
        }

    :param param_list: sequence of parameter mappings shaped as above.
    :return: dict with the keys "kind" (ints), "optimal" (floats),
        "multiple" (lists of floats) and "spare" (lists holding the "param"
        value of every spare item); each list has one element per entry,
        in input order.
    """
    parsed = {"kind": [], "optimal": [], "multiple": [], "spare": []}
    for entry in param_list:
        parsed["kind"].append(int(entry['kind']))
        parsed["optimal"].append(float(entry['optimal']))
        parsed["multiple"].append([float(factor) for factor in entry['multiple']])
        parsed["spare"].append([spare_item["param"] for spare_item in entry["spare"]])
    return parsed
def custom_metric_score(self, metric, value, param_list):
    """
    Score a single custom metric and remember its parsed parameters.

    The parameters are parsed with ``self.custom_metric_param_parser`` and
    stored in ``self.custom_param_dict[metric]``; ``value`` is wrapped in a
    NumPy array and handed to ``self.scoreModel`` together with the parsed
    "kind", "optimal" and "multiple" lists.

    :param metric: key under which the parsed parameters are cached.
    :param value: measured value(s) of the metric.
    :param param_list: raw parameter entries for the metric.
    :return: arithmetic mean of the sub-scores returned by ``cal_score()``.
    """
    parsed = self.custom_metric_param_parser(param_list)
    self.custom_param_dict[metric] = parsed
    scorer = self.scoreModel(
        parsed['kind'],
        parsed['optimal'],
        parsed['multiple'],
        np.array([value]),
    )
    sub_scores = scorer.cal_score()
    return sum(sub_scores) / len(sub_scores)
- def func_score(self):
- """
- Score the functional dimension from the built-in and the custom metrics.
- Built-in values come from self._function_statistic() and are scored with
- self.scoreModel (NaN sub-scores are replaced by 80); every metric in
- self.custom_metric_list is scored with self.custom_metric_score.
- self.type_list and self.metric_list are then rebuilt from the types whose
- flag is True, and the scores are combined either with the configured
- weights (when self.weight_custom is truthy) or with weights produced by
- cal_weight_from_80 and cal_score_with_priority. Type scores are capped at 100.
- :return: (score_function, score_type_dict, score_metric_dict), or
- (0, {}, {}) when no metric score is left after the rebuild.
- """
- score_metric_dict = {}
- score_type_dict = {}
- arr_func = self._function_statistic()
- print("\n[功能性表现及得分情况]")
- print("功能性各指标值:", [round(num, 2) for num in arr_func])
- if arr_func:
- arr_func = np.array([arr_func])
- score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_func)
- score_sub = score_model.cal_score()
- score_sub = list(map(lambda x: 80 if np.isnan(x) else x, score_sub))  # NaN sub-scores fall back to 80
- score_metric = [round(num, 2) for num in score_sub]
- metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
- score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
- flag_custom_type_dict = {}
- for metric in self.custom_metric_list:
- value = self.custom_data[metric]['value']
- param_list = self.customMetricParam[metric]
- score = self.custom_metric_score(metric, value, param_list)
- score_metric_dict[metric] = round(score, 2)
- if 'statusFlag' in self.custom_data[metric].keys():
- custom_type, flag_custom_type = next(iter(self.custom_data[metric]['statusFlag'].items()))  # only the first statusFlag item is used
- if (custom_type in flag_custom_type_dict) and (flag_custom_type == True):  # NOTE(review): both branches store the current flag, so the last metric's flag wins; confirm whether a sticky True was intended
- flag_custom_type_dict[custom_type] = True
- else:
- flag_custom_type_dict[custom_type] = flag_custom_type
- #
- #
- # update type_list and metric_list from the types whose flag is True
- #
- #
- flag_custom_type_dict1 = {k: v for k, v in flag_custom_type_dict.items() if v == True}
- flag_builtin_type_dict2 = {k: v for k, v in self.flag_type_dict.items() if v == True}
- flag_builtin_type_dict2.update(flag_custom_type_dict1)
- self.type_list = list(flag_builtin_type_dict2.keys())
- self.metric_list = []
- for type in self.type_list:
- self.metric_list.extend(self.metric_dict[type])
- score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
- score_metric = list(score_metric_dict.values())
- # no score in list
- if not score_metric:
- return 0, {}, {}
- if self.weight_custom: # user-defined weights
- score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
- self.weight_dict}
- for type in self.type_list:
- type_score = sum(
- value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
- score_type_dict}
- score_function = sum(score_type_with_weight_dict.values())
- else: # objective weighting
- self.weight_list = cal_weight_from_80(score_metric)
- self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
- score_function = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
- for type in self.type_list:
- type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
- for key, value in self.weight_dict.items():
- if key in self.metric_dict[type]:
- # self.weight_dict[key] = round(value / type_weight, 4)
- self.weight_dict[key] = value / type_weight  # rescale each metric weight by its type's total weight
- type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
- type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
- type_priority_list = [value for key, value in self.priority_dict.items() if
- key in self.metric_dict[type]]
- type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- for key in self.weight_dict:
- self.weight_dict[key] = round(self.weight_dict[key], 4)
- score_type = list(score_type_dict.values())
- self.weight_type_list = cal_weight_from_80(score_type)
- self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
- score_function = round(score_function, 2)
- print(f"功能性得分为:{score_function:.2f}分。")
- print(f"功能性各类型得分为:{score_type_dict}分。")
- print(f"功能性各指标得分为:{score_metric_dict}。")
- return score_function, score_type_dict, score_metric_dict
def continuous_group(self, df):
    """
    Collapse runs of consecutive samples into start/end intervals.

    Rows are walked in order; a row stays in the current run when its
    ``simFrame`` exceeds the previous row's by at most 1, otherwise it opens
    a new run. Runs that contain a single row are discarded.

    :param df: DataFrame providing the columns 'simTime' and 'simFrame'.
    :return: DataFrame with the columns 'start_time', 'end_time',
        'start_frame' and 'end_frame', one row per remaining run.
    """
    times = df['simTime'].values.tolist()
    frames = df['simFrame'].values.tolist()

    # Every run is tracked as [first_index, last_index] into both lists.
    runs = []
    for idx, frame in enumerate(frames):
        if runs and frame - frames[idx - 1] <= 1:
            runs[-1][1] = idx
        else:
            runs.append([idx, idx])

    # A run needs at least two samples to be reported.
    spans = [(first, last) for first, last in runs if last > first]

    # Values used by the report charts.
    time_bounds = [[times[first], times[last]] for first, last in spans]
    frame_bounds = [[frames[first], frames[last]] for first, last in spans]
    time_part = pd.DataFrame(time_bounds, columns=['start_time', 'end_time'])
    frame_part = pd.DataFrame(frame_bounds, columns=['start_frame', 'end_frame'])
    return pd.concat([time_part, frame_part], axis=1)
def unfunctional_follow_v_deviation_df_statistic(self):
    """
    Record the intervals in which the following speed deviation is too large.

    Samples are built from ``self.time_list_follow``,
    ``self.frame_list_follow`` and ``self.v_deviation_list``; rows with
    ``simFrame`` <= 1 are ignored. Rows whose deviation is greater than
    ``self.optimal_dict['followSpeedDeviation']`` are grouped with
    ``self.continuous_group``, labelled with the ACC type name and appended
    to ``self.unfunc_follow_df``.
    """
    samples = pd.DataFrame({
        'simTime': self.time_list_follow,
        'simFrame': self.frame_list_follow,
        'v_deviation': self.v_deviation_list,
    })
    samples = samples[samples['simFrame'] > 1]
    limit = self.optimal_dict['followSpeedDeviation']
    exceeding = samples.loc[samples['v_deviation'] > limit,
                            ['simTime', 'simFrame', 'v_deviation']]
    intervals = self.continuous_group(exceeding)
    intervals['type'] = f"{self.type_name_dict['functionACC']}"
    self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, intervals], ignore_index=True)
def unfunctional_follow_dist_deviation_df_statistic(self):
    """
    Record the intervals in which the following distance deviation is too large.

    Samples are built from ``self.time_list_follow``,
    ``self.frame_list_follow`` and ``self.dist_deviation_list``; rows with
    ``simFrame`` <= 1 are ignored. Rows whose deviation is greater than
    ``self.optimal_dict['followDistanceDeviation']`` are grouped with
    ``self.continuous_group``, labelled with the ACC type name and appended
    to ``self.unfunc_follow_df``.
    """
    samples = pd.DataFrame({
        'simTime': self.time_list_follow,
        'simFrame': self.frame_list_follow,
        'dist_deviation': self.dist_deviation_list,
    })
    samples = samples[samples['simFrame'] > 1]
    # The threshold comes from the configured optimum (historically changed from 1 to 3).
    limit = self.optimal_dict['followDistanceDeviation']
    exceeding = samples.loc[samples['dist_deviation'] > limit,
                            ['simTime', 'simFrame', 'dist_deviation']]
    intervals = self.continuous_group(exceeding)
    intervals['type'] = f"{self.type_name_dict['functionACC']}"
    self.unfunc_follow_df = pd.concat([self.unfunc_follow_df, intervals], ignore_index=True)
def unfunctional_lane_df_statistic(self):
    """
    Record the intervals in which the centre-line distance is too large.

    Samples are built from ``self.center_time_list``,
    ``self.center_frame_list`` and ``self.center_dist_with_nan``; rows with
    ``simFrame`` <= 1 or a missing ``center_dist`` are ignored. Rows whose
    absolute distance is greater than ``self.optimal_dict['centerDistanceMax']``
    are grouped with ``self.continuous_group``, labelled with the LKA type
    name and appended to ``self.unfunc_lane_df``.
    """
    samples = pd.DataFrame({
        'simTime': self.center_time_list,
        'simFrame': self.center_frame_list,
        'center_dist': self.center_dist_with_nan,
    })
    samples = samples[samples['simFrame'] > 1]
    samples = samples.dropna(subset=['center_dist'])
    magnitude = samples['center_dist'].abs()
    too_far = magnitude > self.optimal_dict['centerDistanceMax']
    exceeding = samples.loc[too_far, ['simTime', 'simFrame', 'center_dist']]
    intervals = self.continuous_group(exceeding)
    intervals['type'] = f"{self.type_name_dict['functionLKA']}"
    self.unfunc_lane_df = pd.concat([self.unfunc_lane_df, intervals], ignore_index=True)
- # def zip_time_pairs(self, zip_list, upper_limit=9999):
- # zip_time_pairs = zip(self.time_list, zip_list)
- # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
- # return zip_vs_time
def zip_time_pairs(self, zip_list):
    """
    Pair every value of ``zip_list`` with the matching entry of ``self.time_list``.

    NaN values are replaced by an empty string. Pairing stops at the end of
    the shorter of the two sequences.

    :param zip_list: numeric values aligned with ``self.time_list``.
    :return: list of ``[time, value]`` two-element lists.
    """
    paired = []
    for stamp, sample in zip(self.time_list, zip_list):
        if math.isnan(sample):
            paired.append([stamp, ""])
        else:
            paired.append([stamp, sample])
    return paired
- def _get_weight_distribution(self, dimension):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = self.config.dimension_name[dimension]
- for type in self.type_list:
- type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
- self.weight_dict.items() if
- key in self.metric_dict[type]}
- weight_distribution_type = {
- "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
- "indexes": type_weight_indexes_dict
- }
- weight_distribution[type] = weight_distribution_type
- return weight_distribution
- def report_statistic(self):
- # report_dict = {
- # "name": "功能性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_function,
- # "level": grade_function,
- # 'score_type': score_type,
- # 'score_metric': score_metric,
- # 'followStopCount': self.follow_stop_count,
- # "description1": func_description1,
- # "description2": func_description2,
- #
- # "functionACC": acc_dict,
- # "functionLKA": lka_dict,
- #
- # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
- # "accData": [lat_acc_vs_time, lon_acc_vs_time],
- #
- # }
- brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
- throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
- steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
- # common parameter calculate
- brake_vs_time = self.zip_time_pairs(brakePedal_list)
- throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
- steering_vs_time = self.zip_time_pairs(steeringWheel_list)
- report_dict = {
- "name": self.config.dimension_name["function"],
- "weight": f"{self.weight * 100:.2f}%",
- 'followStopCount': self.follow_stop_count,
- }
- # upper_limit = 40
- times_upper = 2
- # len_time = len(self.time_list)
- duration = self.time_list[-1]
- # score_function, score_type, score_metric = self.func_score()
- score_function, score_type_dict, score_metric_dict = self.func_score()
- # no metric
- if not score_metric_dict:
- return {}
- # get weight distribution
- report_dict["weightDistribution"] = self._get_weight_distribution("function")
- score_function = int(score_function) if int(score_function) == score_function else round(
- score_function, 2)
- # score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()]
- # score_metric = [int(n) if int(n) == n else n for key, n in score_metric_dict.items()]
- grade_function = score_grade(score_function)
- report_dict["score"] = score_function
- report_dict["level"] = grade_function
- # report_dict["score_type"] = score_type
- # report_dict["score_metric"] = score_metric
- # speed data
- ego_speed_list = self.ego_df['v'].values.tolist()
- ego_speed_vs_time = self.zip_time_pairs(ego_speed_list)
- obj_speed_vs_time = []
- rel_speed_vs_time = []
- # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- # accData
- lat_acc_list = self.ego_df['lat_acc'].values.tolist()
- lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
- lon_acc_list = self.ego_df['lon_acc'].values.tolist()
- lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
- # acc data
- # lat_acc_list = self.ego_df['lat_acc'].values.tolist()
- # lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
- # lon_acc_list = self.ego_df['lon_acc'].values.tolist()
- # lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
- # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time]
- # statistic type report infos
- unfunc_metric_list = []
- func_over_optimal = []
- # function description
- func_type_list = []
- unfunc_type_list = []
- type_details_dict = {}
- for type in self.type_list:
- if type == "functionACC":
- builtin_graph_dict = {}
- custom_graph_dict = {}
- if len(self.obj_id_list) == 1:
- follow_description1 = "无目标车数据可计算;"
- follow_description2 = ""
- follow_description3 = "无目标车数据可计算;"
- follow_description4 = "无目标车数据可计算;"
- acc_dict_indexes = {}
- for metric in self.metric_dict[type]:
- acc_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": 80,
- "avg": "-",
- "max": "-",
- "min": "-",
- }
- if self.kind_dict[metric] == -1:
- acc_dict_indexes[metric]["range"] = f"[0, {self.optimal_dict[metric]}]"
- elif self.kind_dict[metric] == 1:
- acc_dict_indexes[metric]["range"] = f"[{self.optimal_dict[metric]}, inf)"
- elif self.kind_dict[metric] == 0:
- acc_dict_indexes[metric][
- "range"] = f"[{self.optimal_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal_dict[metric] * self.multiple_dict[metric][1]}]"
- acc_dict = {
- "name": f"{self.type_name_dict[type]}",
- "score": 80,
- "level": "良好",
- "description1": follow_description1,
- "description2": follow_description2,
- "description3": follow_description3,
- "description4": follow_description4,
- "noObjectCar": True,
- "indexes": acc_dict_indexes,
- "builtin": {},
- "custom": {}
- }
- # follow cruise description
- func_follow_metric_list = []
- unfunc_follow_metric_list = []
- str_follow_over_optimal = ''
- distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
- else:
- acc_dict = {
- "name": f"{self.type_name_dict[type]}",
- "noObjectCar": False,
- }
- # follow dict
- score_follow = score_type_dict[type]
- grade_follow = score_grade(score_follow)
- acc_dict["score"] = score_follow
- acc_dict["level"] = grade_follow
- unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
- # follow cruise description
- func_follow_metric_list = []
- unfunc_follow_metric_list = []
- for metric in self.metric_dict[type]:
- unfunc_follow_metric_list.append(metric) if score_metric_dict[
- metric] < 80 else func_follow_metric_list.append(
- metric)
- str_follow_over_optimal = ''
- if not unfunc_follow_metric_list:
- str_func_follow_metric = string_concatenate(func_follow_metric_list)
- follow_description1 = f"{str_func_follow_metric}指标均表现良好"
- else:
- for metric in unfunc_follow_metric_list:
- if metric in self.bulitin_metric_list:
- value = self.result[metric]
- if self.kind_dict[metric] == -1:
- metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 1:
- metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 0:
- metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- else:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- # str_follow_over_optimal += f"{metric}为{value:.2f},超过合理范围{metric_over_optimal:.2f}%;"
- # str_follow_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_follow_over_optimal += f"{metric}为{round(value, 2)}{self.unit_dict[metric]},超过合理范围{round(metric_over_optimal, 2)}%;"
- str_follow_over_optimal = str_follow_over_optimal[:-1] if str_follow_over_optimal else ""
- str_func_follow_metric = string_concatenate(func_follow_metric_list)
- str_unfunc_follow_metric = string_concatenate(unfunc_follow_metric_list)
- if not func_follow_metric_list:
- follow_description1 = f"{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
- else:
- follow_description1 = f"{str_func_follow_metric}指标表现良好,{str_unfunc_follow_metric}指标表现不佳。{str_follow_over_optimal}"
- # for follow_description2
- follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
- follow_description2 = f"自车在行驶时有{round(follow_duration, 2)}s处于跟车状态,"
- follow_description3 = ""
- if "followResponseTime" in self.metric_list:
- # for follow_description3
- cnt3_1 = len(self.follow_stop_time_start_list)
- tmp3_list = [x for x in self.follow_stop_time_start_list if
- x > self.optimal_dict['followResponseTime']]
- cnt3_2 = len(tmp3_list)
- percent3 = 0 if cnt3_1 == 0 else round(cnt3_2 / cnt3_1 * 100, 2)
- follow_description3 = f"起停跟车响应时间有{cnt3_2}次超出合理范围,占比为{percent3}%;"
- follow_description4 = ""
- if "followDistanceDeviation" in self.metric_list:
- # for follow_description4
- cnt4_1 = len(self.stop_distance_list)
- tmp4_list = [x for x in self.stop_distance_list if
- x < self.optimal_dict['followDistanceDeviation']]
- cnt4_2 = len(tmp4_list)
- percent4 = 0 if cnt4_1 == 0 else round(cnt4_2 / cnt4_1 * 100, 2)
- follow_description4 = f"在本次测试中,跟车状态下目标车共发生了{cnt4_1}次停车,有{cnt4_2}次超出合理范围,占比为{percent4}%;"
- # distance_list = obj_df['dist'].values.tolist()
- # distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
- #
- # acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
- # acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
- # acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
- # acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
- # common parameter calculate
- obj_df = self.df[self.df['playerId'] == 2]
- obj_speed_list = obj_df['v'].values.tolist()
- obj_speed_vs_time = self.zip_time_pairs(obj_speed_list)
- rel_speed_vs_time = self.zip_time_pairs(self.v_relative_list_full_time)
- distance_vs_time = self.zip_time_pairs(self.dist_list_full_time)
- # acc_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- # acc_dict["disData"] = distance_vs_time
- unfunc_follow_df = self.unfunc_follow_df.copy()
- unfunc_follow_df['type'] = "origin"
- unfunc_follow_slices = unfunc_follow_df.to_dict('records')
- # acc_dict["speMarkLine"] = unfunc_follow_slices
- # acc_dict["disMarkLine"] = unfunc_follow_slices
- distance_deviation_vs_time = self.zip_time_pairs(self.dist_deviation_list_full_time)
- # function ACC data
- acc_dict_indexes = {}
- for metric in self.metric_dict[type]:
- if metric == "followSpeedDeviation":
- self.unfunctional_follow_v_deviation_df_statistic()
- unfunc_v_df = self.unfunc_follow_df.copy()
- unfunc_v_df.loc[unfunc_v_df['type'] != 'ACC', 'type'] = "origin"
- # unfunc_v_df.loc[unfunc_v_df['type'] == 'time', 'type'] = "time"
- unfunc_v_slices = unfunc_v_df.to_dict('records')
- df1 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
- df1['v_time'] = df1['end_time'] - df1['start_time']
- devi_v_time = df1['v_time'].sum()
- follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
- percent2_1 = devi_v_time / follow_duration * 100 if follow_duration else 0
- if devi_v_time != 0:
- follow_description2 += f"跟车状态下自车和目标车的速度差共有{round(devi_v_time, 2)}s超出合理范围,占比为{round(percent2_1, 2)}%;"
- else:
- follow_description2 += "跟车状态下自车和目标车的速度差均在合理范围内;"
- v_deviation_list = [x for x in self.v_deviation_list if not np.isnan(x)]
- acc_dict_indexes["followSpeedDeviation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[
- "followSpeedDeviation"],
- "avg": round(np.mean(v_deviation_list), 2) if v_deviation_list else '-',
- "max": round(max(v_deviation_list), 2) if v_deviation_list else '-',
- "min": round(min(v_deviation_list), 2) if v_deviation_list else '-',
- # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]"
- "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]"
- }
- fsd_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": rel_speed_vs_time,
- "range": f"[-{self.optimal_dict['followSpeedDeviation']}, {self.optimal_dict['followSpeedDeviation']}]",
- # "range": f"[0, {self.optimal_dict['followSpeedDeviation']}]",
- # "markLine": unfunc_v_slices,
- # "markLine2": [-1 * self.optimal_dict['followSpeedDeviation'],
- # self.optimal_dict['followSpeedDeviation']],
- }
- builtin_graph_dict["followSpeedDeviation"] = fsd_data
- if metric == "followDistanceDeviation":
- self.unfunctional_follow_dist_deviation_df_statistic()
- unfunc_dist_df = self.unfunc_follow_df.copy()
- unfunc_dist_df.loc[unfunc_dist_df['type'] != 'ACC', 'type'] = "origin"
- # unfunc_dist_df.loc[unfunc_dist_df['type'] == 'distance', 'type'] = "distance"
- unfunc_dist_slices = unfunc_dist_df.to_dict('records')
- df2 = self.unfunc_follow_df[self.unfunc_follow_df['type'] == 'ACC']
- df2['dist_time'] = df2['end_time'] - df2['start_time']
- devi_dist_time = df2['dist_time'].sum()
- follow_duration = (len(self.time_list_follow) - 2) * 0.04 if self.time_list_follow else 0
- percent2_2 = devi_dist_time / follow_duration * 100 if follow_duration else 0
- if devi_dist_time != 0:
- follow_description2 += f"跟车状态下自车和目标车的距离差共有{round(devi_dist_time, 2)}s超出合理范围,占比为{round(percent2_2, 2)}%;"
- else:
- follow_description2 += "跟车状态下自车和目标车的距离差均在合理范围内;"
- dist_deviation_list = [x for x in self.dist_deviation_list if not np.isnan(x)]
- acc_dict_indexes["followDistanceDeviation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["followDistanceDeviation"],
- "avg": round(np.mean(dist_deviation_list), 2) if dist_deviation_list else '-',
- "max": round(max(dist_deviation_list), 2) if dist_deviation_list else '-',
- "min": round(min(dist_deviation_list), 2) if dist_deviation_list else '-',
- "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]"
- }
- fdd_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": distance_deviation_vs_time,
- "range": f"[0, {self.optimal_dict['followDistanceDeviation']}]",
- # "markLine": unfunc_dist_slices,
- # "markLine2": [self.optimal_dict['followDistanceDeviation']],
- }
- builtin_graph_dict["followDistanceDeviation"] = fdd_data
- if metric == "followStopDistance":
- acc_dict_indexes["followStopDistance"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["followStopDistance"],
- "avg": round(np.mean(self.stop_distance_list), 2) if self.stop_distance_list else '-',
- "max": round(max(self.stop_distance_list), 2) if self.stop_distance_list else '-',
- "min": round(min(self.stop_distance_list), 2) if self.stop_distance_list else '-',
- "range": f"[{self.optimal_dict['followStopDistance']}, inf)"
- }
- # sdd_data = {
- # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- # "name": f"{self.name_dict[metric]}",
- # "data": self.stop_distance_list,
- # "range": f"[{self.optimal_dict['followStopDistance']}, inf)",
- # # "ref": [self.optimal_dict['followStopDistance'] - 1,
- # # self.optimal_dict['followStopDistance']],
- # }
- # builtin_graph_dict["followStopDistance"] = sdd_data
- if metric == "followResponseTime":
- acc_dict_indexes["followResponseTime"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["followResponseTime"],
- "avg": round(np.mean(self.follow_stop_time_start_list),
- 2) if self.follow_stop_time_start_list else '-',
- "max": round(max(self.follow_stop_time_start_list),
- 2) if self.follow_stop_time_start_list else '-',
- "min": round(min(self.follow_stop_time_start_list),
- 2) if self.follow_stop_time_start_list else '-',
- "range": f"[0, {self.optimal_dict['followResponseTime']}]"
- }
- # rt_data = {
- # # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- # "name": f"{self.name_dict[metric]}",
- # "data": self.follow_stop_time_start_list,
- # # "range": f"[0, {self.optimal_dict['followResponseTime']})"
- # }
- # builtin_graph_dict["followResponseTime"] = rt_data
- if metric not in self.bulitin_metric_list:
- acc_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- acc_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- acc_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- acc_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
- self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
- acc_dict["indexes"] = acc_dict_indexes
- acc_dict["builtin"] = builtin_graph_dict
- acc_dict["custom"] = custom_graph_dict
- acc_dict["description1"] = replace_key_with_value(follow_description1, self.name_dict)
- acc_dict["description2"] = replace_key_with_value(follow_description2, self.name_dict)
- acc_dict["description3"] = replace_key_with_value(follow_description3, self.name_dict)
- acc_dict["description4"] = replace_key_with_value(follow_description4, self.name_dict)
- type_details_dict[type] = acc_dict
- unfunc_metric_list += unfunc_follow_metric_list
- func_over_optimal.append(str_follow_over_optimal)
- elif type == "functionLKA":
- lka_dict = {
- "name": f"{self.type_name_dict[type]}",
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # get score and grade
- score_lane = score_type_dict["functionLKA"]
- grade_lane = score_grade(score_lane)
- lka_dict['score'] = score_lane
- lka_dict['level'] = grade_lane
- # unfunc_type_list.append('车道保持') if score_type_dict["functionLKA"] < 80 else func_type_list.append(
- # '车道保持')
- unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
- # lane keep description
- func_lane_metric_list = []
- unfunc_lane_metric_list = []
- for metric in self.metric_dict[type]:
- unfunc_lane_metric_list.append(metric) if score_metric_dict[
- metric] < 80 else func_lane_metric_list.append(
- metric)
- lka_dict_indexes = {}
- for metric in self.metric_dict[type]:
- if metric == "laneDistance":
- lka_dict_indexes["laneDistance"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict["laneDistance"],
- "avg": round(np.mean(self.line_dist), 2) if self.line_dist else "-",
- "max": round(max(self.line_dist), 2) if self.line_dist else "-",
- "min": round(min(self.line_dist), 2) if self.line_dist else "-",
- "range": f"[{self.optimal_dict['laneDistance']}, 1.875]"
- }
- if metric == "centerDistanceExpectation":
- lka_dict_indexes["centerDistanceExpectation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceExpectation'],
- "avg": round(self.result["centerDistanceExpectation"], 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceExpectation']}]"
- }
- if metric == "centerDistanceStandardDeviation":
- lka_dict_indexes["centerDistanceStandardDeviation"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceStandardDeviation'],
- "avg": round(self.result["centerDistanceStandardDeviation"],
- 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceStandardDeviation']}]"
- }
- if metric == "centerDistanceMax":
- lka_dict_indexes["centerDistanceMax"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceMax'],
- "avg": "-",
- "max": round(self.result["centerDistanceMax"], 2) if self.center_dist else "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceMax']}]"
- }
- if metric == "centerDistanceMin":
- lka_dict_indexes["centerDistanceMin"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceMin'],
- "avg": "-",
- "max": "-",
- "min": round(self.result["centerDistanceMin"], 2) if self.center_dist else "-",
- "range": f"[0, {self.optimal_dict['centerDistanceMin']}]"
- }
- if metric == "centerDistanceFrequency":
- lka_dict_indexes["centerDistanceFrequency"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceFrequency'],
- "avg": round(self.result["centerDistanceFrequency"], 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceFrequency']}]"
- }
- if metric == "centerDistanceRange":
- lka_dict_indexes["centerDistanceRange"] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict['centerDistanceRange'],
- "avg": round(self.result["centerDistanceRange"], 2) if self.center_dist else "-",
- "max": "-",
- "min": "-",
- "range": f"[0, {self.optimal_dict['centerDistanceRange']}]"
- }
- if metric not in self.bulitin_metric_list:
- lka_dict_indexes[metric] = {
- # "name": self.custom_data[metric]['name'],
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- lka_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- lka_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- lka_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- custom_graph_dict[metric] = self.custom_data[metric]["reportData"]
- self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
- lka_dict["indexes"] = lka_dict_indexes
- str_lane_over_optimal = ''
- if not unfunc_lane_metric_list:
- str_func_lane_metric = string_concatenate(func_lane_metric_list)
- lane_description1 = f"{str_func_lane_metric}指标均表现良好"
- else:
- for metric in unfunc_lane_metric_list:
- if metric in self.bulitin_metric_list:
- value = self.result[metric]
- if self.kind_dict[metric] == -1:
- metric_over_optimal = ((value - self.optimal_dict[metric]) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 1:
- metric_over_optimal = ((self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- elif self.kind_dict[metric] == 0:
- metric_over_optimal = (abs(self.optimal_dict[metric] - value) / self.optimal_dict[
- metric]) * 100
- else:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_lane_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_lane_over_optimal = str_lane_over_optimal[:-1] if str_lane_over_optimal else ""
- str_func_lane_metric = string_concatenate(func_lane_metric_list)
- str_unfunc_lane_metric = string_concatenate(unfunc_lane_metric_list)
- if not func_lane_metric_list:
- lane_description1 = f"{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
- else:
- lane_description1 = f"{str_func_lane_metric}指标表现良好,{str_unfunc_lane_metric}指标表现不佳。{str_lane_over_optimal}"
- lane_description2 = ""
- center_distance_vs_time = []
- if "centerDistanceMax" in self.metric_list:
- cnt2_1 = len(self.center_dist)
- tmp2_list = [x for x in self.center_dist if x > self.optimal_dict['centerDistanceMax']]
- cnt2_2 = len(tmp2_list)
- percent2 = 0 if cnt2_1 == 0 else (cnt2_2 / cnt2_1 * 100)
- dist_time = percent2 * duration / 100
- if dist_time != 0:
- lane_description2 = f"距离有{dist_time:.2f}秒超出合理范围,占比为{percent2:.2f}%"
- else:
- lane_description2 = f"距离均在合理范围内,算法车道保持表现良好"
- # lane keep data for graph
- # center_distance_vs_time = self.zip_time_pairs(self.center_dist)
- center_distance_vs_time = self.zip_time_pairs(self.center_dist_with_nan)
- self.unfunctional_lane_df_statistic()
- #
- # lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
- # lka_dict["description2"] = lane_description2
- unfunc_lane_df = self.unfunc_lane_df.copy()
- unfunc_lane_df['type'] = "origin"
- unfunc_lane_slices = unfunc_lane_df.to_dict('records')
- unfunc_lane_dist_df = self.unfunc_lane_df.copy()
- unfunc_lane_dist_df.loc[unfunc_lane_dist_df['type'] != 'LKA', 'type'] = "origin"
- unfunc_lane_dist_slices = unfunc_lane_dist_df.to_dict('records')
- lka_data = {
- "name": "车辆中心线横向距离(m)",
- "data": center_distance_vs_time,
- # "markLine": unfunc_lane_dist_slices
- }
- if 'centerDistanceMax' in self.optimal_dict:
- lka_range = f"[0, {self.optimal_dict['centerDistanceMax']}]"
- elif 'centerDistanceMin' in self.optimal_dict:
- lka_range = f"[0, {self.optimal_dict['centerDistanceMin']}]"
- else:
- lka_range = f"[0, 0.5]"
- lka_data["range"] = lka_range
- builtin_graph_dict["centerDistance"] = lka_data
- lka_dict["builtin"] = builtin_graph_dict
- lka_dict["custom"] = custom_graph_dict
- lka_dict["description1"] = replace_key_with_value(lane_description1, self.name_dict)
- lka_dict["description2"] = lane_description2
- # lka_dict["centerDisData"] = center_distance_vs_time
- # lka_dict["centerDisMarkLine"] = unfunc_lane_dist_slices
- type_details_dict[type] = lka_dict
- unfunc_metric_list += unfunc_lane_metric_list
- func_over_optimal.append(str_lane_over_optimal)
- else:
- type_dict = {
- "name": f"{self.type_name_dict[type]}",
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # get score and grade
- score_custom_type = score_type_dict[type]
- grade_custom_type = score_grade(score_custom_type)
- type_dict["score"] = score_custom_type
- type_dict["level"] = grade_custom_type
- # custom type description
- good_custom_metric_list = []
- bad_custom_metric_list = []
- unfunc_type_list.append(type) if score_type_dict[type] < 80 else func_type_list.append(type)
- type_dict_indexes = {}
- for metric in self.metric_dict[type]:
- bad_custom_metric_list.append(metric) if score_metric_dict[
- metric] < 80 else good_custom_metric_list.append(
- metric)
- type_dict_indexes[metric] = {
- # "name": self.custom_data[metric]['name'],
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "avg": self.custom_data[metric]['tableData']['avg'],
- "max": self.custom_data[metric]['tableData']['max'],
- "min": self.custom_data[metric]['tableData']['min'],
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- type_dict_indexes[metric]["range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]:.3f}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]:.3f}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]:.3f}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]:.3f}]"
- custom_graph_dict[metric] = self.custom_data[metric]['reportData']
- self.custom_markline_list.extend(self.custom_data[metric]["reportData"]['markLine'])
- type_dict["indexes"] = type_dict_indexes
- str_type_over_optimal = ""
- if not bad_custom_metric_list:
- str_good_custom_metric = string_concatenate(good_custom_metric_list)
- type_description1 = f"{str_good_custom_metric}指标均表现良好"
- type_description2 = f"指标均在合理范围内,算法表现良好"
- else:
- for metric in bad_custom_metric_list:
- value = self.custom_data[metric]["value"][0]
- if self.custom_param_dict[metric]['kind'][0] == -1:
- metric_over_optimal = ((value - self.custom_param_dict[metric]['optimal'][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- metric_over_optimal = (abs(self.custom_param_dict[metric]['optimal'][0] - value) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_type_over_optimal += f"{metric}为{value:.2f}{self.unit_dict[metric]},超过合理范围{metric_over_optimal:.2f}%;"
- str_type_over_optimal = str_type_over_optimal[:-1]
- str_good_custom_metric = string_concatenate(good_custom_metric_list)
- str_bad_custom_metric = string_concatenate(bad_custom_metric_list)
- if not good_custom_metric_list:
- type_description1 = f"{str_bad_custom_metric}指标表现不佳"
- type_description2 = f"{str_type_over_optimal}"
- else:
- type_description1 = f"{str_good_custom_metric}指标表现良好,{str_bad_custom_metric}指标表现不佳"
- type_description2 = f"{str_type_over_optimal}"
- type_dict["builtin"] = builtin_graph_dict
- type_dict["custom"] = custom_graph_dict
- type_dict["description1"] = replace_key_with_value(type_description1, self.name_dict)
- type_dict["description2"] = replace_key_with_value(type_description2, self.name_dict)
- type_details_dict[type] = type_dict
- unfunc_metric_list += bad_custom_metric_list
- func_over_optimal.append(str_type_over_optimal)
- report_dict["details"] = type_details_dict
- # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- func_over_optimal = [s for s in func_over_optimal if s]
- str_func_over_optimal = ';'.join(func_over_optimal)
- if grade_function == '优秀':
- str_func_type = string_concatenate(func_type_list)
- func_description1 = f'算法在{str_func_type}功能上表现优秀;'
- elif grade_function == '良好':
- str_func_type = string_concatenate(func_type_list)
- func_description1 = f'算法在{str_func_type}功能上总体表现良好,满足设计指标要求;'
- elif grade_function == '一般':
- str_unfunc_type = string_concatenate(unfunc_type_list)
- str_unfunc_metric = string_concatenate(unfunc_metric_list)
- str_unfunc_metric = replace_key_with_value(str_unfunc_metric, self.type_name_dict)
- func_description1 = f'算法在{str_unfunc_type}功能上表现一般、需要在{str_unfunc_metric}指标上进一步优化。其中,{str_func_over_optimal};'
- elif grade_function == '较差':
- str_unfunc_type = string_concatenate(unfunc_type_list)
- func_description1 = f'算法在{str_unfunc_type}功能上表现较差,需要提高算法的功能性表现。其中,{str_func_over_optimal};'
- if not unfunc_type_list:
- func_description2 = '功能性在各个功能上的表现俱佳。'
- else:
- str_unfunc_type = string_concatenate(unfunc_type_list)
- func_description2 = f"算法在{str_unfunc_type}功能上需要重点优化。"
- # report_dict["description1"] = replace_key_with_value(func_description1, self.name_dict)
- report_dict["description1"] = replace_key_with_value(replace_key_with_value(func_description1, self.name_dict),
- self.type_name_dict)
- report_dict["description2"] = replace_key_with_value(func_description2, self.type_name_dict)
- report_dict['commonData'] = {
- "per": {
- "name": "脚刹/油门踏板开度(百分比)",
- "legend": ["刹车踏板开度", "油门踏板开度"],
- "data": [brake_vs_time, throttle_vs_time]
- },
- "ang": {
- "name": "方向盘转角(角度°)",
- "data": steering_vs_time
- },
- "spe": {
- "name": "速度(km/h)",
- "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
- "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- },
- "acc": {
- "name": "加速度(m/s²)",
- "legend": ["横向加速度", "纵向加速度"],
- "data": [lat_acc_vs_time, lon_acc_vs_time]
- },
- # "dis": {
- # "name": "前车距离(m)",
- # "data": distance_vs_time
- # }
- }
- if "functionACC" in self.type_list:
- report_dict['commonData']["dis"] = {
- "name": "前车距离(m)",
- "data": distance_vs_time
- }
- self.unfunc_df = pd.concat([self.unfunc_df, self.unfunc_follow_df, self.unfunc_lane_df], ignore_index=True)
- unfunc_df = self.unfunc_df.copy().dropna()
- unfunc_slices = unfunc_df.to_dict('records')
- unfunc_slices.extend(self.custom_markline_list)
- report_dict["commonMarkLine"] = unfunc_slices
- # report_dict = {
- # "name": "功能性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_function,
- # "level": grade_function,
- # 'followStopCount': self.follow_stop_count,
- # "description1": func_description1,
- # "description2": func_description2,
- #
- # "functionACC": acc_dict,
- # "functionLKA": lka_dict,
- #
- # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
- # "accData": [lat_acc_vs_time, lon_acc_vs_time],
- #
- # }
- self.eval_data = self.ego_df.copy()
- return report_dict
def get_eval_data(self):
    """Return a copy of the evaluation columns taken from ``self.eval_data``.

    The result always contains the columns ``simTime``, ``simFrame``,
    ``playerId``, ``lat_acc_roc``, ``lon_acc_roc`` and ``line_dist``, in
    that order.  When ``self.eval_data`` has no ``line_dist`` column, the
    returned frame gets a float ``line_dist`` column filled with NaN.

    Returns:
        pandas.DataFrame: an independent copy; modifying it does not
        affect ``self.eval_data``.

    Raises:
        KeyError: if any of the five mandatory columns is missing from
            ``self.eval_data``.
    """
    base_columns = ['simTime', 'simFrame', 'playerId', 'lat_acc_roc', 'lon_acc_roc']

    if 'line_dist' in self.eval_data.columns:
        return self.eval_data[base_columns + ['line_dist']].copy()

    df = self.eval_data[base_columns].copy()
    # Attach the placeholder to the frame that is actually returned, so both
    # branches expose the same columns.  An empty float Series aligns on the
    # index and therefore yields NaN for every row.
    df['line_dist'] = pd.Series(dtype=float)
    return df
|