#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           zhanghaiwen(zhanghaiwen@china-icv.cn), xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Data:              2023/07/25
@Last Modified:     2023/07/26
@Summary:           safe metrics
"""

import sys

sys.path.append('../common')
sys.path.append('../modules')
sys.path.append('../results')

import math
import numpy as np
import pandas as pd
from collections import defaultdict
import scipy.integrate as spi

from score_weight import cal_score_with_priority, cal_weight_from_80
from common import score_grade, string_concatenate, replace_key_with_value, score_over_100


class Safe(object):
    """
    Class for computing safety metrics and scores for autonomous driving.

    Attributes:
        df: Object records in DataFrame form. After `_safe_param_cal_new` has
            run it holds the computed metric columns plus `flag`/`unsafe_flag`.
        ego_df: Ego data taken from `data_processed.ego_data`.
        eval_data: Copy of the metric table built by `_safe_param_cal_new`.
        empty_flag: True until some object lies inside the evaluated region
            around the ego vehicle.

    Methods:
        _safe_param_cal_new: Calculate parameters for evaluation.
        _cal_v_ego_projection: Project a velocity onto the line between two cars.
        _cal_v_projection: Project the relative velocity onto that line.
        _cal_a_projection: Project the relative acceleration onto that line.
        _calculate_derivative: Calculate derivative.
        _cal_relative_angular_v: Calculate relative angular velocity.
        _death_pr: Calculate death probability.
        _cal_collisionRisk_level: Calculate collisionRisk level.
        dist: Calculate distance of two cars.
    """

    def __init__(self, data_processed, custom_data, scoreModel):
        """
        Read configuration and input data, then compute the metric table.

        Args:
            data_processed: Provides `object_df`, `ego_data`, `lane_info_df`,
                `obj_id_list`, `config` and `driver_ctrl_data`.
            custom_data: Mapping of custom metric name to a dict with a
                'value' entry (read later by `safe_score_new`).
            scoreModel: Scoring class; instantiated with
                (kind, optimal, multiple, values) and queried via `cal_score()`.
        """
        self.eval_data = pd.DataFrame()
        self.data_processed = data_processed
        self.scoreModel = scoreModel

        self.df = data_processed.object_df.copy()
        self.ego_df = data_processed.ego_data
        self.laneinfo_df = data_processed.lane_info_df
        self.obj_id_list = data_processed.obj_id_list

        # config infos for calculating score
        self.config = data_processed.config
        safe_config = self.config.config['safe']
        self.safe_config = safe_config

        # common data
        self.bulitin_metric_list = self.config.builtinMetricList

        # dimension data
        self.weight_custom = safe_config['weightCustom']
        self.metric_list = safe_config['metric']
        self.type_list = safe_config['type']
        self.type_name_dict = safe_config['typeName']
        self.name_dict = safe_config['name']
        self.unit_dict = safe_config['unit']

        # custom metric data
        self.customMetricParam = safe_config['customMetricParam']
        self.custom_metric_list = list(self.customMetricParam.keys())
        self.custom_data = custom_data
        self.custom_param_dict = {}

        # score data
        self.weight = safe_config['weightDimension']
        self.weight_type_dict = safe_config['typeWeight']
        self.weight_type_list = safe_config['typeWeightList']
        self.weight_dict = safe_config['weight']
        self.weight_list = safe_config['weightList']
        self.priority_dict = safe_config['priority']
        self.priority_list = safe_config['priorityList']
        self.kind_dict = safe_config['kind']
        self.kind1_dict = self.kind_dict[0]
        self.optimal_dict = safe_config['optimal']
        self.optimal1_dict = self.optimal_dict[0]
        self.multiple_dict = safe_config['multiple']
        self.kind_list = safe_config['kindList']
        self.optimal_list = safe_config['optimalList']
        self.multiple_list = safe_config['multipleList']
        self.metric_dict = safe_config['typeMetricDict']

        # lists of driving control info
        self.time_list = data_processed.driver_ctrl_data['time_list']
        self.frame_list = self.ego_df['simFrame'].values.tolist()

        self.collisionRisk = 0
        self.empty_flag = True

        # Result containers are created unconditionally so that later readers
        # never hit AttributeError in the ego-only case.
        interval_columns = ['start_time', 'end_time', 'start_frame', 'end_frame', 'type']
        self.unsafe_df = pd.DataFrame(columns=interval_columns)
        self.unsafe_time_df = pd.DataFrame(columns=interval_columns)
        self.unsafe_dist_df = pd.DataFrame(columns=interval_columns)
        self.unsafe_acce_drac_df = pd.DataFrame(columns=interval_columns)
        self.unsafe_acce_xtn_df = pd.DataFrame(columns=interval_columns)
        self.unsafe_prob_df = pd.DataFrame(columns=interval_columns)
        self.most_dangerous = {}
        self.pass_percent = {}

        # With only the ego vehicle present there is nothing to evaluate
        # (no car-following scene), so the metric table is not built.
        if len(self.obj_id_list) > 1:
            self._safe_param_cal_new()

    def _safe_param_cal_new(self):
        """
        Compute the safety metrics for every (frame, object) pair.

        For each frame in `self.frame_list` the ego record (playerId 1) is
        compared with every other id in `self.obj_id_list`. Objects with
        lon_d > 100, lon_d < -5 or lat_d > 4 are skipped. For the remaining
        ones TTC, MTTC, THW, LonSD, LatSD, DRAC, BTN, STN, collisionSeverity,
        pr_death and collisionRisk are written into the record.

        Side effects: replaces `self.df`, sets `self.eval_data`, may clear
        `self.empty_flag`, and adds the `tmp`, `flag` and `unsafe_flag`
        columns to `self.df`.
        """
        Tc = 0.3  # subtracted from TTC for the severity integral below
        rho = 0.3  # driver brake reaction time
        ego_accel_max = 6  # maximum ego acceleration under throttle
        obj_decel_max = 8  # maximum braking deceleration of the lead vehicle
        ego_decel_min = 1  # minimum braking deceleration of the ego vehicle
        ego_decel_lon_max = 8
        ego_decel_lat_max = 1

        # Metrics whose unsafe direction is "above the optimum"; the per-metric
        # statistic methods of this class compare these with `>`.
        larger_is_worse = ('DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity')

        # Two-level lookup: simFrame -> playerId -> record dict.
        obj_dict = defaultdict(dict)
        for item in self.df.to_dict('records'):
            obj_dict[item['simFrame']][item['playerId']] = item

        df_list = []
        EGO_PLAYER_ID = 1

        for frame_num in self.frame_list:
            ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
            v1 = ego_data['v'] / 3.6  # km/h to m/s
            x1 = ego_data['posX']
            y1 = ego_data['posY']
            h1 = ego_data['posH']
            len1 = ego_data['dimX']
            width1 = ego_data['dimY']
            o_x1 = ego_data['offX']
            v_x1 = ego_data['speedX'] / 3.6  # km/h to m/s
            v_y1 = ego_data['speedY'] / 3.6  # km/h to m/s
            a_x1 = ego_data['accelX']
            a_y1 = ego_data['accelY']

            for playerId in self.obj_id_list:
                if playerId == EGO_PLAYER_ID:
                    continue

                try:
                    obj_data = obj_dict[frame_num][playerId]
                    lane_width = self.laneinfo_df[self.laneinfo_df['simFrame'].isin([frame_num])]['width'].values / 2
                except KeyError:
                    # Object absent in this frame (or lane columns missing).
                    continue

                x2 = obj_data['posX']
                y2 = obj_data['posY']
                dist = self.dist(x1, y1, x2, y2)
                obj_data['dist'] = dist
                v_x2 = obj_data['speedX'] / 3.6  # km/h to m/s
                v_y2 = obj_data['speedY'] / 3.6  # km/h to m/s
                v2 = obj_data['v'] / 3.6  # km/h to m/s
                len2 = obj_data['dimX']
                width2 = obj_data['dimY']
                o_x2 = obj_data['offX']
                a_x2 = obj_data['accelX']
                a_y2 = obj_data['accelY']

                dx, dy = x2 - x1, y2 - y1

                # Angle between the ego->object vector A and the +x axis.
                A = np.array([dx, dy])
                x = np.array([1, 0])
                dot_product = np.dot(A, x)
                vector_length_A = np.linalg.norm(A)
                vector_length_x = np.linalg.norm(x)
                cos_theta = dot_product / (vector_length_A * vector_length_x)
                # NOTE(review): arccos yields an angle in [0, pi], so the sign
                # of dy is lost; the original author left this as an open
                # question (left-positive / right-negative still to be added).
                beta = np.arccos(cos_theta)

                lon_d = dist * math.cos(beta - h1)
                lat_d = abs(dist * math.sin(beta - h1))
                obj_data['lon_d'] = lon_d
                obj_data['lat_d'] = lat_d

                # Only objects inside this window around the ego are evaluated.
                if lon_d > 100 or lon_d < -5 or lat_d > 4:
                    continue

                self.empty_flag = False

                vx, vy = v_x1 - v_x2, v_y1 - v_y2
                ax, ay = a_x2 - a_x1, a_y2 - a_y1

                v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
                v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
                vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
                arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
                                                                 v_x2, v_y2)

                obj_data['vrel_projection_in_dist'] = vrel_projection_in_dist
                obj_data['arel_projection_in_dist'] = arel_projection_in_dist
                obj_data['v_ego_projection_in_dist'] = v_ego_p
                obj_data['v_obj_projection_in_dist'] = v_obj_p

                obj_type = obj_data['type']

                if abs(dist * cos_theta) <= lane_width[0]:
                    TTC = self._cal_TTC(dist, vrel_projection_in_dist)
                else:
                    TTC = 10000

                # NOTE(review): `_cal_MTTC` names its first parameter `dist`
                # but receives TTC here; kept as-is, confirm the intent.
                MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
                THW = self._cal_THW(dist, v_ego_p)

                # Usable for the single-lane case.
                LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
                                                         obj_decel_max)

                lat_dist = 0.5
                v_right = v1
                v_left = v2
                a_right_lat_brake_min = 1
                a_left_lat_brake_min = 1
                a_lat_max = 5
                LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
                                                    a_left_lat_brake_min, a_lat_max)

                DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)

                # Longitudinal components, rotated by the ego heading h1.
                lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
                lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
                lon_a = abs(lon_a1 - lon_a2)
                lon_d = dist * abs(math.cos(beta - h1))
                lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
                BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)

                # Lateral components, rotated by the ego heading h1.
                lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
                lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
                lat_a = abs(lat_a1 - lat_a2)
                lat_d = dist * abs(math.sin(beta - h1))
                lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
                STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)

                obj_data['lat_v_rel'] = v_x1 - v_x2
                obj_data['lon_v_rel'] = v_y1 - v_y2

                # Zero/negative times are stored as missing; DRAC is capped at 10.
                TTC = None if (not TTC or TTC < 0) else TTC
                MTTC = None if (not MTTC or MTTC < 0) else MTTC
                THW = None if (not THW or THW < 0) else THW
                DRAC = 10 if DRAC >= 10 else DRAC

                if not TTC or TTC > 4000:  # threshold = 4258.41
                    collisionSeverity = 0
                    pr_death = 0
                    collisionRisk = 0
                else:
                    result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
                    collisionSeverity = 1 - result
                    pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
                    collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity

                obj_data['TTC'] = TTC
                obj_data['MTTC'] = MTTC
                obj_data['THW'] = THW
                obj_data['LonSD'] = LonSD
                obj_data['LatSD'] = LatSD
                obj_data['DRAC'] = DRAC
                obj_data['BTN'] = abs(BTN)
                obj_data['STN'] = abs(STN)
                obj_data['collisionSeverity'] = collisionSeverity * 100
                obj_data['pr_death'] = pr_death * 100
                obj_data['collisionRisk'] = collisionRisk * 100

            df_fnum = pd.DataFrame(obj_dict[frame_num].values())
            df_list.append(df_fnum)

        df_safe = pd.concat(df_list)
        col_list = ['simTime', 'simFrame', 'playerId',
                    'v', 'accel', 'lon_acc', 'lat_acc',
                    'dist', 'lon_d', 'lat_d',
                    'lat_v_rel', 'lon_v_rel',
                    'v_ego_projection_in_dist', 'v_obj_projection_in_dist',
                    'vrel_projection_in_dist', 'arel_projection_in_dist',
                    'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'DRAC', 'BTN', 'STN',
                    'collisionSeverity', 'pr_death', 'collisionRisk']
        if not self.empty_flag:
            # The metric columns only exist when some object was evaluated.
            df_safe = df_safe[col_list].reset_index(drop=True)
        self.eval_data = df_safe.copy()
        self.df = df_safe
        self.df['flag'] = 0
        if not self.empty_flag:
            for metric in self.metric_list:
                if metric not in self.bulitin_metric_list:
                    continue
                # Missing values (None/NaN) compare False, so they never count
                # as violations.
                values = pd.to_numeric(self.df[metric], errors='coerce')
                optimal = self.optimal1_dict[metric]
                if metric in larger_is_worse:
                    violated = values > optimal
                else:
                    violated = values < optimal
                self.df['tmp'] = violated.astype(int)
                self.df['flag'] = self.df['flag'] + self.df['tmp']
        self.df['unsafe_flag'] = (self.df['flag'] > 0).astype(int)

    def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
        """
        Project velocity (v_x1, v_y1) onto the unit vector along (dx, dy).

        Returns 0 when (dx, dy) has zero length, matching the guard in
        `_cal_a_projection`.
        """
        AB_mod = math.sqrt(dx ** 2 + dy ** 2)
        if AB_mod == 0:
            return 0
        U_ABx = dx / AB_mod
        U_ABy = dy / AB_mod
        V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
        return V1_on_AB

    def _cal_v_projection(self, dx, dy, vx, vy):
        """
        Project the relative velocity (vx, vy) onto the unit vector along (dx, dy).

        Same projection as `_cal_v_ego_projection`, including the zero-length
        guard.
        """
        return self._cal_v_ego_projection(dx, dy, vx, vy)

    def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
        """
        Relative acceleration along the line joining the two vehicles.

        Projects (ax, ay) onto the unit vector along (dx, dy) and subtracts the
        rotational term w * V * sin(theta), where theta is the angle between
        (vx, vy) and (dx, dy). Returns 0 when either vector has zero length.
        """
        V_mod = math.sqrt(vx ** 2 + vy ** 2)
        AB_mod = math.sqrt(dx ** 2 + dy ** 2)
        if V_mod == 0 or AB_mod == 0:
            return 0

        cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
        # Rounding can push the cosine of (anti)parallel vectors slightly
        # outside [-1, 1], which would make math.acos raise ValueError.
        if cos_theta > 1.0:
            cos_theta = 1.0
        elif cos_theta < -1.0:
            cos_theta = -1.0
        theta = math.acos(cos_theta)

        U_ABx = dx / AB_mod
        U_ABy = dy / AB_mod
        a_on_AB = ax * U_ABx + ay * U_ABy

        VA = np.array([v_x1, v_y1])
        VB = np.array([v_x2, v_y2])
        D_A = np.array([x1, y1])
        D_B = np.array([x2, y2])
        V_r = VA - VB
        V = np.linalg.norm(V_r)
        w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
        a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
        return a_on_AB_back

    def _calculate_derivative(self, a, w, V, theta):
        """Return a - w * V * sin(theta), used as the derivative of V*cos(theta)."""
        derivative = a - w * V * math.sin(theta)
        return derivative

    def _cal_relative_angular_v(self, theta, A, B, VA, VB):
        """
        Return |VA - VB| * sin(theta) / |A - B|.

        Raises ZeroDivisionError when A and B coincide; `_cal_a_projection`
        returns before calling this in that case.
        """
        dx = A[0] - B[0]
        dy = A[1] - B[1]
        dvx = VA[0] - VB[0]
        dvy = VA[1] - VB[1]
        angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
        return angular_velocity

    def _death_pr(self, obj_type, v_relative):
        """
        Logistic death probability as a function of relative speed.

        Object type 5 uses its own coefficient pair; every other type shares
        the second pair.
        """
        if obj_type == 5:
            p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
        else:
            p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
        return p_death

    def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
        """Return 0.4 * death probability + 0.6 * collisionSeverity."""
        p_death = self._death_pr(obj_type, v_relative)
        collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
        return collisionRisk

    def dist(self, x1, y1, x2, y2):
        """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
        dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
        return dist

    # TTC
(time to collision) def _cal_TTC(self, dist, vrel_projection_in_dist): if vrel_projection_in_dist == 0: return math.inf TTC = dist / vrel_projection_in_dist return TTC def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist): MTTC = math.nan if arel_projection_in_dist != 0: tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist # 投影为负数,不会发生碰撞风险 if tmp < 0: return math.nan t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist if t1 > 0 and t2 > 0: if t1 >= t2: MTTC = t2 elif t1 < t2: MTTC = t1 elif t1 > 0 and t2 <= 0: MTTC = t1 elif t1 <= 0 and t2 > 0: MTTC = t2 if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0: MTTC = dist / vrel_projection_in_dist return MTTC # THW (time headway) def _cal_THW(self, dist, v_ego_projection_in_dist): if not v_ego_projection_in_dist: THW = None else: THW = dist / v_ego_projection_in_dist return THW def velocity(self, v_x, v_y): v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6 return v def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max): lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / ( 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max) return lon_dist_min def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max): v_right_rho = v_right + rho * a_lat_max v_left_rho = v_left + rho * a_lat_max dist_min = lat_dist + ((v_right + v_right_rho) * rho / 2 + v_right_rho ** 2 / a_right_lat_brake_min / 2 + ( (v_left + v_right_rho) * rho / 2) + v_left_rho ** 2 / a_left_lat_brake_min / 2) return dist_min # DRAC (decelerate required avoid collision) def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2): dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671 if dist_length < 0: dist_width 
= dist - (width2 / 2 + width1 / 2) if dist_width < 0: return math.inf else: d = dist_width else: d = dist_length DRAC = vrel_projection_in_dist ** 2 / (2 * d) return DRAC # BTN (brake threat number) def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max): BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2): STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * ( width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max return STN # BTN (brake threat number) def cal_BTN(self, a_y1, ay, dy, vy, max_ay): BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2): STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax return STN # 追尾碰撞风险 def _normal_distribution(self, x): mean = 1.32 std_dev = 0.26 return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev) def continuous_group(self, df): time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] 
unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time']) unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame']) unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1) return unfunc_df # def continuous_group_old(self, df): # time_list = df['simTime'].values.tolist() # frame_list = df['simFrame'].values.tolist() # # group = [] # sub_group = [] # # for i in range(len(frame_list)): # if not sub_group or frame_list[i] - frame_list[i - 1] <= 1: # sub_group.append(time_list[i]) # else: # group.append(sub_group) # sub_group = [time_list[i]] # # group.append(sub_group) # group = [g for g in group if len(g) >= 2] # # # 输出图表值 # time = [[g[0], g[-1]] for g in group] # unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time']) # # return unsafe_df def unsafe_time_ttc_df_statistic(self, obj_df): ttc_df = obj_df[obj_df['TTC'] < self.optimal1_dict['TTC']] ttc_df = ttc_df[['simTime', 'simFrame', 'TTC']] ttc_time_df = self.continuous_group(ttc_df) ttc_time_df['type'] = 'TTC' # ttc_time_df['type'] = 'time' self.unsafe_time_df = pd.concat([self.unsafe_time_df, ttc_time_df], ignore_index=True) def unsafe_time_mttc_df_statistic(self, obj_df): mttc_df = obj_df[obj_df['MTTC'] < self.optimal1_dict['MTTC']] mttc_df = mttc_df[['simTime', 'simFrame', 'MTTC']] mttc_time_df = self.continuous_group(mttc_df) mttc_time_df['type'] = 'MTTC' # mttc_time_df['type'] = 'time' self.unsafe_time_df = pd.concat([self.unsafe_time_df, mttc_time_df], ignore_index=True) def unsafe_time_thw_df_statistic(self, obj_df): thw_df = obj_df[obj_df['THW'] < self.optimal1_dict['THW']] thw_df = thw_df[['simTime', 'simFrame', 'THW']] thw_time_df = self.continuous_group(thw_df) thw_time_df['type'] = 'THW' # thw_time_df['type'] = 'time' self.unsafe_time_df = pd.concat([self.unsafe_time_df, thw_time_df], ignore_index=True) def unsafe_distance_lonsd_df_statistic(self, obj_df): lonsd_df = obj_df[obj_df['LonSD'] < self.optimal1_dict['LonSD']] lonsd_df = lonsd_df[['simTime', 
'simFrame', 'LonSD']] lonsd_dist_df = self.continuous_group(lonsd_df) lonsd_dist_df['type'] = 'LonSD' # lonsd_dist_df['type'] = 'distance' self.unsafe_dist_df = pd.concat([self.unsafe_dist_df, lonsd_dist_df], ignore_index=True) def unsafe_distance_latsd_df_statistic(self, obj_df): latsd_df = obj_df[obj_df['LatSD'] < self.optimal1_dict['LatSD']] latsd_df = latsd_df[['simTime', 'simFrame', 'LatSD']] latsd_dist_df = self.continuous_group(latsd_df) latsd_dist_df['type'] = 'LatSD' # latsd_dist_df['type'] = 'distance' self.unsafe_dist_df = pd.concat([self.unsafe_dist_df, latsd_dist_df], ignore_index=True) def unsafe_acceleration_drac_df_statistic(self, obj_df): drac_df = obj_df[obj_df['DRAC'] > self.optimal1_dict['DRAC']] drac_df = drac_df[['simTime', 'simFrame', 'DRAC']] drac_acce_df = self.continuous_group(drac_df) drac_acce_df['type'] = 'DRAC' # drac_acce_df['type'] = 'acceleration' self.unsafe_acce_drac_df = pd.concat([self.unsafe_acce_drac_df, drac_acce_df], ignore_index=True) def unsafe_acceleration_btn_df_statistic(self, obj_df): btn_df = obj_df[obj_df['BTN'] > self.optimal1_dict['BTN']] btn_df = btn_df[['simTime', 'simFrame', 'BTN']] btn_acce_df = self.continuous_group(btn_df) btn_acce_df['type'] = 'BTN' # btn_acce_df['type'] = 'acceleration' self.unsafe_acce_xtn_df = pd.concat([self.unsafe_acce_xtn_df, btn_acce_df], ignore_index=True) def unsafe_acceleration_stn_df_statistic(self, obj_df): stn_df = obj_df[obj_df['STN'] > self.optimal1_dict['STN']] stn_df = stn_df[['simTime', 'simFrame', 'STN']] stn_acce_df = self.continuous_group(stn_df) stn_acce_df['type'] = 'STN' # stn_acce_df['type'] = 'acceleration' self.unsafe_acce_xtn_df = pd.concat([self.unsafe_acce_xtn_df, stn_acce_df], ignore_index=True) def unsafe_probability_cr_df_statistic(self, obj_df): cr_df = obj_df[obj_df['collisionRisk'] > self.optimal1_dict['collisionRisk']] cr_df = cr_df[['simTime', 'simFrame', 'collisionRisk']] cr_prob_df = self.continuous_group(cr_df) cr_prob_df['type'] = 'collisionRisk' # 
cr_prob_df['type'] = 'probability' self.unsafe_prob_df = pd.concat([self.unsafe_prob_df, cr_prob_df], ignore_index=True) def unsafe_probability_cs_df_statistic(self, obj_df): cs_df = obj_df[obj_df['collisionSeverity'] > self.optimal1_dict['collisionSeverity']] cs_df = cs_df[['simTime', 'simFrame', 'collisionSeverity']] cs_prob_df = self.continuous_group(cs_df) cs_prob_df['type'] = 'collisionSeverity' # cs_prob_df['type'] = 'probability' self.unsafe_prob_df = pd.concat([self.unsafe_prob_df, cs_prob_df], ignore_index=True) def _safe_statistic_most_dangerous(self): min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD'] max_list = ['DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity'] for metric in min_list: if metric in self.metric_list: if metric in self.df.columns: self.most_dangerous[metric] = self.df[metric].min() else: self.most_dangerous[metric] = self.optimal1_dict[metric] if np.isnan(self.most_dangerous[metric]): self.most_dangerous[metric] = self.optimal1_dict[metric] for metric in max_list: if metric in self.metric_list: if metric in self.df.columns: self.most_dangerous[metric] = self.df[metric].max() else: self.most_dangerous[metric] = self.optimal1_dict[metric] # self.most_dangerous[metric] = self.df[metric].max() if np.isnan(self.most_dangerous[metric]): self.most_dangerous[metric] = self.optimal1_dict[metric] def _safe_statistic_pass_percent(self): greater_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD'] lesser_list = ['DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity'] for metric in greater_list: if metric in self.metric_list: if metric in self.df.columns: self.pass_percent[metric] = self.df[self.df[metric] >= self.optimal1_dict[metric]][metric].count() / \ self.df[metric].count() else: self.pass_percent[metric] = 0.8 if np.isnan(self.pass_percent[metric]): self.pass_percent[metric] = 0.8 for metric in lesser_list: if metric in self.metric_list: if metric in self.df.columns: self.pass_percent[metric] = self.df[self.df[metric] <= 
self.optimal1_dict[metric]][metric].count() / \ self.df[metric].count() else: self.pass_percent[metric] = 0.8 if np.isnan(self.pass_percent[metric]): self.pass_percent[metric] = 0.8 if "collisionSeverity" in self.metric_list: self.collisionRisk = 1 - self.pass_percent["collisionSeverity"] def _safe_statistic(self): # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"] self._safe_statistic_most_dangerous() self._safe_statistic_pass_percent() most_dangerous_list = [abs(value) for key, value in self.most_dangerous.items() if key in self.metric_list] pass_percent_list = [abs(value) for key, value in self.pass_percent.items() if key in self.metric_list] arr_safe = [most_dangerous_list + pass_percent_list] return arr_safe def custom_metric_param_parser(self, param_list): """ param_dict = { "paramA" [ { "kind": "-1", "optimal": "1", "multiple": ["0.5","5"], "spare1": null, "spare2": null } ] } """ kind_list = [] optimal_list = [] multiple_list = [] spare_list = [] # spare1_list = [] # spare2_list = [] for i in range(len(param_list)): kind_list.append(int(param_list[i]['kind'])) optimal_list.append(float(param_list[i]['optimal'])) multiple_list.append([float(x) for x in param_list[i]['multiple']]) spare_list.append([item["param"] for item in param_list[i]["spare"]]) # spare1_list.append(param_list[i]['spare1']) # spare2_list.append(param_list[i]['spare2']) result = { "kind": kind_list, "optimal": optimal_list, "multiple": multiple_list, "spare": spare_list, # "spare1": spare1_list, # "spare2": spare2_list } return result def custom_metric_score(self, metric, value, param_list): """ """ param = self.custom_metric_param_parser(param_list) self.custom_param_dict[metric] = param score_model = self.scoreModel(param['kind'], param['optimal'], param['multiple'], np.array([value])) score_sub = score_model.cal_score() score = sum(score_sub) / len(score_sub) return score def _safe_no_obj_statistic(self): # list_metric = 
["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"] most_dangerous_list = [abs(value) for key, value in self.optimal1_dict.items() if key in self.metric_list] pass_percent_list = [1.0] * len(self.optimal1_dict.keys()) arr_safe = [most_dangerous_list + pass_percent_list] return arr_safe def safe_score_new(self): """ """ score_metric_dict = {} score_type_dict = {} if len(self.obj_id_list) == 1: arr_safe = self._safe_no_obj_statistic() else: arr_safe = self._safe_statistic() print("\n[安全性表现及得分情况]") print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe]) if arr_safe: arr_safe = np.array(arr_safe) score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe) score_sub = score_model.cal_score() metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList] metric_num = len(metric_list) if len(self.obj_id_list) > 1 and not self.empty_flag: score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]] else: score_sub = [80] * len(score_sub) score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub)) # 对None值做特判 score_metric = [] for i in range(metric_num): score_tmp = (score_sub[i] + score_sub[i + metric_num]) / 2 score_metric.append(round(score_tmp, 2)) score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)} for metric in self.custom_metric_list: value = self.custom_data[metric]['value'] param_list = self.customMetricParam[metric] score = self.custom_metric_score(metric, value, param_list) score_metric_dict[metric] = round(score, 2) score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list} score_metric = list(score_metric_dict.values()) if self.weight_custom: # 用户自定义权重 score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in self.weight_dict} for type in self.type_list: type_score = sum( value for key, value in score_metric_with_weight_dict.items() if key in 
self.metric_dict[type]) score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100 score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in score_type_dict} score_safe = sum(score_type_with_weight_dict.values()) else: # 动态客观赋权 self.weight_list = cal_weight_from_80(score_metric) self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)} score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list) for type in self.type_list: type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type]) for key, value in self.weight_dict.items(): if key in self.metric_dict[type]: # self.weight_dict[key] = round(value / type_weight, 4) self.weight_dict[key] = value / type_weight type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]] type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]] type_priority_list = [value for key, value in self.priority_dict.items() if key in self.metric_dict[type]] type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list) score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100 for key in self.weight_dict: self.weight_dict[key] = round(self.weight_dict[key], 4) score_type = list(score_type_dict.values()) self.weight_type_list = cal_weight_from_80(score_type) self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)} score_safe = round(score_safe, 2) # score_type = [round(x, 2) for key, x in score_type_dict.items()] # score_metric = [round(x, 2) for key, x in score_metric_dict.items()] print("安全性各指标基准值:", self.optimal_list) print(f"安全性得分为:{score_safe:.2f}分。") print(f"安全性各类型得分为:{score_type_dict}。") print(f"安全性各指标得分为:{score_metric_dict}。") # return score_safe, score_type, score_metric return score_safe, score_type_dict, 
score_metric_dict # def zip_time_pairs(self, zip_list, upper_limit=9999): # zip_time_pairs = zip(self.time_list, zip_list) # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)] # return zip_vs_time def zip_time_pairs(self, zip_list): zip_time_pairs = zip(self.time_list, zip_list) zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs] return zip_vs_time def _get_weight_distribution(self, dimension): # get weight distribution weight_distribution = {} weight_distribution["name"] = self.config.dimension_name[dimension] for type in self.type_list: type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if key in self.metric_dict[type]} weight_distribution_type = { "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)", "indexes": type_weight_indexes_dict } weight_distribution[type] = weight_distribution_type return weight_distribution def safe_weight_distribution(self): # get weight distribution weight_distribution = {} weight_distribution["name"] = "安全性" if "safeTime" in self.type_list: time_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if key in ['TTC', 'MTTC', 'THW']} weight_distribution_time = { "timeWeight": f"时间指标({self.weight_type_dict['safeTime'] * 100:.2f}%)", "indexes": time_weight_indexes_dict } weight_distribution["safeTime"] = weight_distribution_time if "safeDistance" in self.type_list: distance_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if key in ['LonSD', 'LatSD']} weight_distribution_distance = { "distanceWeight": f"距离指标({self.weight_type_dict['safeDistance'] * 100:.2f}%)", "indexes": distance_weight_indexes_dict } weight_distribution["safeDistance"] = weight_distribution_distance if "safeAcceleration" in self.type_list: acceleration_weight_indexes_dict = {key: f"{key}({value * 
100:.2f}%)" for key, value in self.weight_dict.items() if key in ['DRAC', 'BTN', 'STN']} weight_distribution_acceleration = { "accelerationWeight": f"加速度指标({self.weight_type_dict['safeAcceleration'] * 100:.2f}%)", "indexes": acceleration_weight_indexes_dict } weight_distribution["safeAcceleration"] = weight_distribution_acceleration if "safeProbability" in self.type_list: probability_weight_indexes_dict = {key: f"{key}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if key in ['collisionRisk', 'collisionSeverity']} weight_distribution_probability = { "probabilityWeight": f"概率指标({self.weight_type_dict['safeProbability'] * 100:.2f}%)", "indexes": probability_weight_indexes_dict } weight_distribution["safeProbability"] = weight_distribution_probability return weight_distribution def normalize_dict_values(self, dictionary): # 计算字典中键的数量 n = len(dictionary) # 初始化总和为0 total_sum = 0 # 遍历字典,对每个值进行赋值,并累加总和 for key, value in dictionary.items(): # 计算当前值,除了最后一个值外都四舍五入到两位小数 if key != list(dictionary.keys())[-1]: new_value = round(1 / n, 2) else: # 最后一个值的计算:1减去之前所有值的和 new_value = round(1 - total_sum, 2) # 更新字典的值 dictionary[key] = new_value # 累加当前值到总和 total_sum += new_value return dictionary def report_statistic(self): # time_list = self.data_processed.driver_ctrl_data['time_list'] brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list'] throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list'] steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list'] # common parameter calculate brake_vs_time = self.zip_time_pairs(brakePedal_list) throttle_vs_time = self.zip_time_pairs(throttlePedal_list) steering_vs_time = self.zip_time_pairs(steeringWheel_list) # if len(self.obj_id_list) == 1: if self.empty_flag: # report_dict = { # "name": "安全性", # "weight": f"{self.weight * 100:.2f}%", # "weightDistribution": weight_distribution, # "score": 80, # "level": "良好", # # 'collisionRisk': self.collisionRisk, # 
"description1": safe_description1, # "description2": safe_description2, # "noObjectCar": True, # # "safeTime": time_dict, # "safeDistance": distance_dict, # "safeAcceleration": acceleration_dict, # "safeProbability": probability_dict # # } self.weight_dict = self.normalize_dict_values(self.weight_dict) self.weight_type_dict = self.normalize_dict_values(self.weight_type_dict) score_safe, score_type_dict, score_metric_dict = self.safe_score_new() # format score score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2) grade_safe = score_grade(score_safe) report_dict = { "name": "安全性", "weight": f"{self.weight * 100:.2f}%", "score": score_safe, "level": grade_safe, 'collisionRisk': self.collisionRisk, "noObjectCar": True, "weightDistribution": self._get_weight_distribution("safe") } # get weight distribution safe_description1 = "算法在无目标车情况下安全性表现良好,无碰撞风险;" safe_description2 = "安全性在无目标车情况下表现良好。" report_dict["description1"] = safe_description1 report_dict["description2"] = safe_description2 type_details_dict = {} for type in self.type_list: score_type = score_type_dict[type] grade_type = score_grade(score_type) type_dict = { "name":self.type_name_dict[type], "score": score_type, "level": grade_type, "description1": "无目标车数据可计算", "description2": "表现良好", } builtin_graph_dict = {} custom_graph_dict = {} type_dict_indexes = {} for metric in self.metric_dict[type]: type_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": 80, "extremum": "-", # "range": f"[{self.optimal1_dict['TTC']}, inf)", "rate": "-" } if metric in self.bulitin_metric_list: if self.kind1_dict[metric] == -1: type_dict_indexes[metric]["range"] = f"[0, {self.optimal1_dict[metric]}]" elif self.kind1_dict[metric] == 1: type_dict_indexes[metric]["range"] = f"[{self.optimal1_dict[metric]}, inf)" elif self.kind1_dict[metric] == 0: type_dict_indexes[metric][ "range"] 
= f"[{self.optimal1_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal1_dict[metric] * self.multiple_dict[metric][1]}]" else: if self.custom_param_dict[metric]['kind'][0] == -1: type_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: type_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: type_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" type_dict["indexes"] = type_dict_indexes type_dict["builtin"] = builtin_graph_dict type_dict["custom"] = custom_graph_dict type_details_dict[type] = type_dict report_dict["details"] = type_details_dict report_dict['commonData'] = { "per": { "name": "脚刹/油门踏板开度(百分比)", "legend": ["刹车踏板开度", "油门踏板开度"], "data": [brake_vs_time, throttle_vs_time] }, "ang": { "name": "方向盘转角(角度°)", "data": steering_vs_time } # "spe": { # "name": "速度(km/h)", # "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"], # "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time] # # }, # "acc": { # "name": "加速度(m/s²)", # "legend": ["横向加速度", "纵向加速度"], # "data": [lat_acc_vs_time, lon_acc_vs_time] # # }, # "dis": { # "name": "前车距离(m)", # "data": distance_vs_time # } } return report_dict # report_dict = { # "name": "安全性", # "weight": f"{self.weight * 100:.2f}%", # "weightDistribution": weight_distribution, # "score": score_safe, # "level": grade_safe, # 'collisionRisk': self.collisionRisk, # "description1": safe_description1, # "description2": safe_description2, # "noObjectCar": False, # # "safeTime": time_dict, # "safeDistance": distance_dict, # "safeAcceleration": acceleration_dict, # "safeProbability": probability_dict, # # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time], # 
"accData": [lat_acc_vs_time, lon_acc_vs_time], # # } report_dict = { "name": "安全性", "weight": f"{self.weight * 100:.2f}%", 'collisionRisk': self.collisionRisk, "noObjectCar": False, } upper_limit = 40 times_upper = 2 len_time = len(self.time_list) duration = self.time_list[-1] score_safe, score_type_dict, score_metric_dict = self.safe_score_new() # format score score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2) grade_safe = score_grade(score_safe) report_dict["score"] = score_safe report_dict["level"] = grade_safe # get weight distribution report_dict['weightDistribution'] = self._get_weight_distribution("safe") # speData ego_speed_list = self.ego_df['v'].values.tolist() ego_speed_vs_time = self.zip_time_pairs(ego_speed_list) obj_id = 2 obj_df = self.df[self.df['playerId'] == obj_id] obj_speed_list = obj_df['v'].values.tolist() obj_speed_vs_time = self.zip_time_pairs(obj_speed_list) rel_speed_list = obj_df['vrel_projection_in_dist'].values.tolist() rel_speed_vs_time = self.zip_time_pairs(rel_speed_list) # accData lon_acc_list = self.ego_df['lon_acc'].values.tolist() lon_acc_vs_time = self.zip_time_pairs(lon_acc_list) lat_acc_list = self.ego_df['lat_acc'].values.tolist() lat_acc_vs_time = self.zip_time_pairs(lat_acc_list) # disData distance_list = obj_df['dist'].values.tolist() distance_vs_time = self.zip_time_pairs(distance_list) # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time] # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time] # report_dict["disData"] = distance_vs_time # ttcData ttc_list = obj_df['TTC'].values.tolist() ttc_list = [9999 if math.isinf(x) else x for x in ttc_list] ttc_vs_time = self.zip_time_pairs(ttc_list) # for description good_type_list = [] bad_type_list = [] # good_metric_list = [] # bad_metric_list = [] # str for description # str_over_optimal = "" safe_over_optimal_dict = {} type_details_dict = {} for type in self.type_list: if type == "safeTime": time_dict = 
{ "name": self.type_name_dict[type], } builtin_graph_dict = {} custom_graph_dict = {} # ------------------------------ bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type) # time metric dict score_time = score_type_dict['safeTime'] grade_time = score_grade(score_time) time_dict["score"] = score_time time_dict["level"] = grade_time # safeTime description unsafe_time_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value < 80)] safe_time_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value >= 80)] str_time_over_optimal = '' str_time_over_optimal_time = '' if not unsafe_time_type_list: str_safe_time_type = string_concatenate(safe_time_type_list) time_description1 = f'{str_safe_time_type}指标均表现良好' time_description2 = f'{str_safe_time_type}指标均在合理范围内,表现良好' else: for metric in unsafe_time_type_list: if metric in self.bulitin_metric_list: metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) / self.optimal1_dict[metric]) * 100 str_time_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,' metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[ metric].count() / len_time str_time_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;' else: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - self.custom_data[metric]["value"][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_time_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;' str_time_over_optimal = str_time_over_optimal[:-1] str_time_over_optimal_time = str_time_over_optimal_time[:-1] str_safe_time_type = string_concatenate(safe_time_type_list) str_unsafe_time_type = string_concatenate(unsafe_time_type_list) if not safe_time_type_list: time_description1 = f"{str_unsafe_time_type}指标表现不佳,{str_time_over_optimal}" time_description2 
= f"{str_time_over_optimal_time},算法应加强在该时间段对跟车距离的控制" else: time_description1 = f"{str_safe_time_type}指标表现良好,{str_unsafe_time_type}指标表现不佳,{str_time_over_optimal}" time_description2 = f"{str_safe_time_type}指标均在合理范围内,表现良好,{str_time_over_optimal_time},算法应加强在该时间段对跟车距离的控制" time_dict["description1"] = replace_key_with_value(time_description1, self.name_dict) time_dict["description2"] = replace_key_with_value(time_description2, self.name_dict) safe_over_optimal_dict['safeTime'] = str_time_over_optimal_time # safeTime extremum time_dict_indexes = {} if "TTC" in self.metric_list: self.unsafe_time_ttc_df_statistic(obj_df) if "MTTC" in self.metric_list: self.unsafe_time_mttc_df_statistic(obj_df) if "THW" in self.metric_list: self.unsafe_time_thw_df_statistic(obj_df) unsafe_time_df = self.unsafe_time_df.copy() unsafe_time_df['type'] = "origin" unsafe_time_slices = unsafe_time_df.to_dict('records') for metric in self.metric_dict[type]: if metric in self.bulitin_metric_list: metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \ self.most_dangerous[ metric] metric_list = obj_df[metric].values.tolist() metric_vs_time = self.zip_time_pairs(metric_list) unsafe_metric_df = self.unsafe_time_df.copy() unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin" unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "time" unsafe_metric_slices = unsafe_metric_df.to_dict('records') time_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": str( int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) == self.pass_percent[metric] * 100 else round( self.pass_percent[metric] * 100, 2)) + '%' } if metric != 'TTC': # TTC in commonData metric_data = { # "name": 
f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "data": metric_vs_time, "range": f"[{self.optimal1_dict[metric]}, inf)", # "markLine": unsafe_metric_slices, # "markLine2": [self.optimal1_dict[metric]] } builtin_graph_dict[metric] = metric_data else: value = self.custom_data[metric]["value"][0] metric_extremum = upper_limit if value > upper_limit else value time_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', # "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": "-" } if self.custom_param_dict[metric]['kind'][0] == -1: time_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: time_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: time_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" metric_data = self.custom_data[metric]["reportData"] custom_graph_dict[metric] = metric_data time_dict["indexes"] = time_dict_indexes time_dict["builtin"] = builtin_graph_dict time_dict["custom"] = custom_graph_dict # time_dict["disData"] = distance_vs_time # time_dict["disMarkLine"] = unsafe_time_slices # time_dict = { # "score": score_time, # "level": grade_time, # "description1": time_description1, # "description2": time_description2, # "indexes": time_dict_indexes, # # "disData": distance_vs_time, # "disMarkLine": unsafe_time_slices, # # "TTC": ttc_data, # "MTTC": mttc_data, # "THW": thw_data # } type_details_dict[type] = time_dict elif type == "safeDistance": distance_dict = { "name": 
self.type_name_dict[type], } builtin_graph_dict = {} custom_graph_dict = {} bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type) # ------------------------------ # distance metric dict score_distance = score_type_dict['safeDistance'] grade_distance = score_grade(score_distance) distance_dict["score"] = score_distance distance_dict["level"] = grade_distance # safeDistance description unsafe_dist_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value < 80)] safe_dist_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value >= 80)] str_dist_over_optimal = '' str_dist_over_optimal_time = '' if not unsafe_dist_type_list: str_safe_dist_type = string_concatenate(safe_dist_type_list) distance_description1 = f"{str_safe_dist_type}指标均表现良好" distance_description2 = f"{str_safe_dist_type}指标均在合理范围内,表现良好" else: for metric in unsafe_dist_type_list: if metric in self.bulitin_metric_list: metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) / self.optimal1_dict[metric]) * 100 str_dist_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,' metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[ metric].count() / len_time str_dist_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;' else: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - self.custom_data[metric]["value"][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_dist_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;' str_dist_over_optimal = str_dist_over_optimal[:-1] str_dist_over_optimal_time = str_dist_over_optimal_time[:-1] str_safe_dist_type = string_concatenate(safe_dist_type_list) str_unsafe_dist_type = string_concatenate(unsafe_dist_type_list) if not safe_dist_type_list: distance_description1 = 
f"{str_unsafe_dist_type}指标表现不佳,{str_dist_over_optimal}" distance_description2 = f"{str_dist_over_optimal_time}" else: distance_description1 = f"{str_safe_dist_type}指标表现良好,{str_unsafe_dist_type}指标表现不佳,{str_dist_over_optimal}" distance_description2 = f"{str_safe_dist_type}指标均在合理范围内,表现良好,{str_dist_over_optimal_time}" distance_dict["description1"] = replace_key_with_value(distance_description1, self.name_dict) distance_dict["description2"] = replace_key_with_value(distance_description2, self.name_dict) safe_over_optimal_dict['safeDistance'] = str_dist_over_optimal_time # safeDistance extremum distance_dict_indexes = {} if "LonSD" in self.metric_list: self.unsafe_distance_lonsd_df_statistic(obj_df) if "LatSD" in self.metric_list: self.unsafe_distance_latsd_df_statistic(obj_df) unsafe_dist_df = self.unsafe_dist_df.copy() unsafe_dist_df['type'] = "origin" # unsafe_dist_slices = unsafe_dist_df.to_dict('records') for metric in self.metric_dict[type]: if metric in self.bulitin_metric_list: metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \ self.most_dangerous[metric] metric_list = obj_df[metric].values.tolist() metric_vs_time = self.zip_time_pairs(metric_list) unsafe_metric_df = self.unsafe_dist_df.copy().dropna() unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin" unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "distance" unsafe_metric_slices = unsafe_metric_df.to_dict('records') distance_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": str( int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) == self.pass_percent[metric] * 100 else round( self.pass_percent[metric] * 100, 2)) + '%' } metric_data = { # "name": 
f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "data": metric_vs_time, "range": f"[{self.optimal1_dict[metric]}, inf)", # "markLine": unsafe_metric_slices, # "markLine2": [self.optimal1_dict[metric]] } builtin_graph_dict[metric] = metric_data else: value = self.custom_data[metric]["value"][0] metric_extremum = upper_limit if value > upper_limit else value distance_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', # "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": "-" } if self.custom_param_dict[metric]['kind'][0] == -1: distance_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: distance_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: distance_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" metric_data = self.custom_data[metric]["reportData"] custom_graph_dict[metric] = metric_data distance_dict["indexes"] = distance_dict_indexes distance_dict["builtin"] = builtin_graph_dict distance_dict["custom"] = custom_graph_dict type_details_dict[type] = distance_dict # distance_dict = { # "score": score_distance, # "level": grade_distance, # "description1": distance_description1, # "description2": distance_description2, # "indexes": distance_dict_indexes, # # "LonSD": lonsd_data, # "LatSD": latsd_data # } elif type == "safeAcceleration": acceleration_dict = { "name": self.type_name_dict[type], } builtin_graph_dict = {} custom_graph_dict = {} bad_type_list.append(type) if 
score_type_dict[type] < 80 else good_type_list.append(type) # ------------------------------ # acceleration metric dict score_acceleration = score_type_dict['safeAcceleration'] grade_acceleration = score_grade(score_acceleration) acceleration_dict["score"] = score_acceleration acceleration_dict["level"] = grade_acceleration # safeAcceleration data for graph lat_dist_list = obj_df['lat_d'].values.tolist() lat_dist_vs_time = self.zip_time_pairs(lat_dist_list) lon_dist_list = obj_df['lon_d'].values.tolist() lon_dist_vs_time = self.zip_time_pairs(lon_dist_list) # safeAcceleration description unsafe_acce_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value < 80)] safe_acce_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value >= 80)] str_acce_over_optimal = '' str_acce_over_optimal_time = '' if not unsafe_acce_type_list: str_safe_acce_type = string_concatenate(safe_acce_type_list) acceleration_description1 = f"{str_safe_acce_type}指标均表现良好" acceleration_description2 = f"{str_safe_acce_type}指标均在合理范围内,表现良好" else: for metric in unsafe_acce_type_list: if metric in self.bulitin_metric_list: metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) / self.optimal1_dict[metric]) * 100 str_acce_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,' metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[ metric].count() / len_time str_acce_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;' else: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - self.custom_data[metric]["value"][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_acce_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;' str_acce_over_optimal = str_acce_over_optimal[:-1] str_acce_over_optimal_time = str_acce_over_optimal_time[:-1] str_safe_acce_type = 
string_concatenate(safe_acce_type_list) str_unsafe_acce_type = string_concatenate(unsafe_acce_type_list) if not safe_acce_type_list: acceleration_description1 = f"{str_unsafe_acce_type}指标表现不佳,{str_acce_over_optimal}" acceleration_description2 = f"{str_acce_over_optimal_time}" else: acceleration_description1 = f"{str_safe_acce_type}指标表现良好,{str_unsafe_acce_type}指标表现不佳,{str_acce_over_optimal}" acceleration_description2 = f"{str_safe_acce_type}指标均在合理范围内,表现良好,{str_acce_over_optimal_time}" acceleration_dict["description1"] = replace_key_with_value(acceleration_description1, self.name_dict) acceleration_dict["description2"] = replace_key_with_value(acceleration_description2, self.name_dict) safe_over_optimal_dict['safeAcceleration'] = str_acce_over_optimal_time # safeAcceleration extremum acceleration_dict_indexes = {} if "DRAC" in self.metric_list: self.unsafe_acceleration_drac_df_statistic(obj_df) if "BTN" in self.metric_list: self.unsafe_acceleration_btn_df_statistic(obj_df) if "STN" in self.metric_list: self.unsafe_acceleration_stn_df_statistic(obj_df) unsafe_acce_drac_df = self.unsafe_acce_drac_df.copy().dropna() unsafe_acce_drac_df['type'] = "origin" unsafe_acce_drac_slices = unsafe_acce_drac_df.to_dict('records') unsafe_acce_xtn_df = self.unsafe_acce_xtn_df.copy() unsafe_acce_xtn_df['type'] = "origin" unsafe_acce_xtn_slices = unsafe_acce_xtn_df.to_dict('records') for metric in self.metric_dict[type]: if metric in self.bulitin_metric_list: metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \ self.most_dangerous[metric] metric_list = obj_df[metric].values.tolist() metric_vs_time = self.zip_time_pairs(metric_list) if metric == "DRAC": unsafe_metric_df = self.unsafe_acce_drac_df.copy().dropna() else: unsafe_metric_df = self.unsafe_acce_xtn_df.copy().dropna() unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin" unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "time" unsafe_metric_slices = 
unsafe_metric_df.to_dict('records') acceleration_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', "range": f"[0, {self.optimal1_dict[metric]}]", "rate": str( int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) == self.pass_percent[metric] * 100 else round( self.pass_percent[metric] * 100, 2)) + '%' } metric_data = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "data": metric_vs_time, "range": f"[0, {self.optimal1_dict[metric]}]", # "markLine": unsafe_metric_slices, # "markLine2": [self.optimal1_dict[metric]] } builtin_graph_dict[metric] = metric_data else: value = self.custom_data[metric]["value"][0] metric_extremum = upper_limit if value > upper_limit else value acceleration_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', # "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": "-" } if self.custom_param_dict[metric]['kind'][0] == -1: acceleration_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: acceleration_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: acceleration_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" metric_data = self.custom_data[metric]["reportData"] custom_graph_dict[metric] = metric_data acceleration_dict["indexes"] = 
acceleration_dict_indexes acceleration_dict["builtin"] = builtin_graph_dict acceleration_dict["custom"] = custom_graph_dict # acceleration_dict["disData"] = distance_vs_time # acceleration_dict["disMarkLine"] = unsafe_acce_drac_slices # acceleration_dict["dis2Data"] = [lat_dist_vs_time, lon_dist_vs_time] # acceleration_dict["dis2MarkLine"] = unsafe_acce_xtn_slices type_details_dict[type] = acceleration_dict # acceleration_dict = { # "score": score_acceleration, # "level": grade_acceleration, # "description1": acceleration_description1, # "description2": acceleration_description2, # "indexes": acceleration_dict_indexes, # # "disData": distance_vs_time, # "disMarkLine": unsafe_acce_drac_slices, # # "dis2Data": [lat_dist_vs_time, lon_dist_vs_time], # "dis2MarkLine": unsafe_acce_xtn_slices, # # "DRAC": drac_data, # "BTN": btn_data, # "STN": stn_data # } elif type == "safeProbability": probability_dict = { "name": self.type_name_dict[type], } builtin_graph_dict = {} custom_graph_dict = {} bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type) # ------------------------------ # probability metric dict score_probability = score_type_dict['safeProbability'] grade_probability = score_grade(score_probability) probability_dict["score"] = score_probability probability_dict["level"] = grade_probability # safeProbability description unsafe_prob_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value < 80)] safe_prob_type_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value >= 80)] str_prob_over_optimal = '' str_prob_over_optimal_time = '' if not unsafe_prob_type_list: str_safe_prob_type = string_concatenate(safe_prob_type_list) probability_description1 = f"{str_safe_prob_type}指标均表现良好" probability_description2 = f"{str_safe_prob_type}指标均在合理范围内,表现良好" else: for metric in unsafe_prob_type_list: if metric in self.bulitin_metric_list: 
metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) / self.optimal1_dict[metric]) * 100 str_prob_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,' metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[ metric].count() / len_time str_prob_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;' else: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - self.custom_data[metric]["value"][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_prob_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;' str_prob_over_optimal = str_prob_over_optimal[:-1] str_prob_over_optimal_time = str_prob_over_optimal_time[:-1] str_safe_prob_type = string_concatenate(safe_prob_type_list) str_unsafe_prob_type = string_concatenate(unsafe_prob_type_list) if not safe_prob_type_list: probability_description1 = f"{str_unsafe_prob_type}指标表现不佳,{str_prob_over_optimal}" probability_description2 = f"{str_prob_over_optimal_time}" else: probability_description1 = f"{str_safe_prob_type}指标表现良好,{str_unsafe_prob_type}指标表现不佳,{str_prob_over_optimal}" probability_description2 = f"{str_safe_prob_type}指标均在合理范围内,表现良好,{str_prob_over_optimal_time}" probability_dict["description1"] = replace_key_with_value(probability_description1, self.name_dict) probability_dict["description2"] = replace_key_with_value(probability_description2, self.name_dict) safe_over_optimal_dict['safeProbability'] = str_prob_over_optimal_time # safeProbability extremum probability_dict_indexes = {} if "collisionRisk" in self.metric_list: self.unsafe_probability_cr_df_statistic(obj_df) if "collisionSeverity" in self.metric_list: self.unsafe_probability_cs_df_statistic(obj_df) unsafe_prob_df = self.unsafe_prob_df.copy().dropna() unsafe_prob_df['type'] = "origin" unsafe_prob_slices = unsafe_prob_df.to_dict('records') for metric in self.metric_dict[type]: if metric in self.bulitin_metric_list: 
metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \ self.most_dangerous[ metric] metric_list = obj_df[metric].values.tolist() metric_vs_time = self.zip_time_pairs(metric_list) unsafe_metric_df = self.unsafe_prob_df.copy().dropna() unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin" unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "time" unsafe_metric_slices = unsafe_metric_df.to_dict('records') probability_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', "range": f"[0, {self.optimal1_dict[metric]}]", "rate": str( int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) == self.pass_percent[metric] * 100 else round( self.pass_percent[metric] * 100, 2)) + '%' } metric_data = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "data": metric_vs_time, "range": f"[0, {self.optimal1_dict[metric]}]", # "markLine": unsafe_metric_slices, # "markLine2": [self.optimal1_dict[metric]] } builtin_graph_dict[metric] = metric_data else: value = self.custom_data[metric]["value"][0] metric_extremum = upper_limit if value > upper_limit else value probability_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', # "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": "-" } if self.custom_param_dict[metric]['kind'][0] == -1: probability_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: probability_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, 
inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: probability_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" metric_data = self.custom_data[metric]["reportData"] custom_graph_dict[metric] = metric_data probability_dict["indexes"] = probability_dict_indexes probability_dict["builtin"] = builtin_graph_dict probability_dict["custom"] = custom_graph_dict # probability_dict["ttcData"] = ttc_vs_time # probability_dict["ttcMarkLine"] = unsafe_prob_slices type_details_dict[type] = probability_dict # probability_dict = { # "score": score_probability, # "level": grade_probability, # "description1": probability_description1, # "description2": probability_description2, # "indexes": probability_dict_indexes, # # "ttcData": ttc_vs_time, # "ttcMarkLine": unsafe_prob_slices, # # "collisionRisk": cr_data, # "collisionSeverity": cs_data # } else: bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type) type_dict = { "name": self.type_name_dict[type], } builtin_graph_dict = {} custom_graph_dict = {} # get score and grade score_custom_type = score_type_dict[type] grade_custom_type = score_grade(score_custom_type) type_dict["score"] = score_custom_type type_dict["level"] = grade_custom_type # custom type description bad_custom_metric_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value < 80)] good_custom_metric_list = [key for key, value in score_metric_dict.items() if (key in self.metric_dict[type]) and (value >= 80)] str_type_over_optimal = '' str_type_over_optimal_time = '' if not bad_custom_metric_list: str_safe_type_type = string_concatenate(good_custom_metric_list) type_description1 = f'{str_safe_type_type}指标均表现良好' type_description2 = f'{str_safe_type_type}指标均在合理范围内,表现良好' else: for metric in 
bad_custom_metric_list: metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] - self.custom_data[metric]["value"][0]) / self.custom_param_dict[metric]['optimal'][0]) * 100 str_type_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;' str_type_over_optimal = str_type_over_optimal[:-1] str_type_over_optimal_time = str_type_over_optimal_time[:-1] str_safe_type_type = string_concatenate(good_custom_metric_list) str_unsafe_type_type = string_concatenate(bad_custom_metric_list) if not good_custom_metric_list: type_description1 = f"{str_unsafe_type_type}指标表现不佳,{str_type_over_optimal}" type_description2 = f"{str_type_over_optimal_time},算法应加强在该时间段对跟车距离的控制" else: type_description1 = f"{str_safe_type_type}指标表现良好,{str_unsafe_type_type}指标表现不佳,{str_type_over_optimal}" type_description2 = f"{str_safe_type_type}指标均在合理范围内,表现良好,{str_type_over_optimal_time},算法应加强在该时间段对跟车距离的控制" type_dict["description1"] = replace_key_with_value(type_description1, self.name_dict) type_dict["description2"] = replace_key_with_value(type_description2, self.name_dict) type_dict_indexes = {} for metric in self.metric_dict[type]: value = self.custom_data[metric]["value"][0] metric_extremum = upper_limit if value > upper_limit else value type_dict_indexes[metric] = { # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})", "name": f"{self.name_dict[metric]}", "meaning": f"{self.name_dict[metric]}", "score": score_metric_dict[metric], "extremum": f'{metric_extremum:.2f}', # "range": f"[{self.optimal1_dict[metric]}, inf)", "rate": "-" } if self.custom_param_dict[metric]['kind'][0] == -1: type_dict_indexes[metric][ "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]" elif self.custom_param_dict[metric]['kind'][0] == 1: type_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)" elif self.custom_param_dict[metric]['kind'][0] == 0: type_dict_indexes[metric][ "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * 
self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]" metric_data = self.custom_data[metric]["reportData"] custom_graph_dict[metric] = metric_data type_dict["indexes"] = type_dict_indexes type_dict["builtin"] = builtin_graph_dict type_dict["custom"] = custom_graph_dict type_details_dict[type] = type_dict report_dict["details"] = type_details_dict # good_type_list = [] # bad_type_list = [] # # for type in self.type_list: # bad_type_list.append(type) if any(i < 80 for key, i in score_metric_dict.items() if # key in self.metric_dict[type]) else good_type_list.append(type) unsafe_time = self.df['unsafe_flag'].sum() * duration / len_time unsafe_time = round(unsafe_time, 2) safe_over_optimal = [value for key, value in safe_over_optimal_dict.items() if value] # 解决空字符串多逗号问题 str_safe_over_optimal = ';'.join(safe_over_optimal) if grade_safe == '优秀': safe_description1 = '车辆在本轮测试中无碰撞风险;' elif grade_safe == '良好': safe_description1 = '算法在本轮测试中的表现满足设计指标要求;' elif grade_safe == '一般': str_unsafe_type = string_concatenate(bad_type_list) safe_description1 = f'未满足设计指标要求。算法需要在{str_unsafe_type}上进一步优化。在仿真过程中共有{unsafe_time}s的时间处于危险状态。' \ f'在{str_unsafe_type}中,{str_safe_over_optimal};' elif grade_safe == '较差': str_unsafe_type = string_concatenate(bad_type_list) safe_description1 = f'未满足设计指标要求。算法在本轮测试中有碰撞风险,需要提高算法在{str_unsafe_type}上的表现。在{str_unsafe_type}中,' \ f'{str_safe_over_optimal};' if not bad_type_list: safe_description2 = '安全性在各个指标上的表现俱佳。' else: str_unsafe_type = string_concatenate(bad_type_list) safe_description2 = f"安全性在{str_unsafe_type}上存在严重风险,需要重点优化。" report_dict["description1"] = replace_key_with_value(safe_description1, self.name_dict) report_dict["description1"] = replace_key_with_value(safe_description1, self.type_name_dict) report_dict["description2"] = replace_key_with_value(safe_description2, self.type_name_dict) report_dict['commonData'] = { "per": { "name": "脚刹/油门踏板开度(百分比)", 
def get_eval_data(self):
    """Return a copy of ``self.eval_data`` narrowed to a known column set.

    Three cases are handled, in this order:

    1. ``self.eval_data`` has rows and ``self.empty_flag`` is falsy:
       the positional columns plus every safety metric column are returned.
    2. Otherwise, if a ``'dist'`` column exists: only the time/frame/player
       and distance columns are returned.
    3. Otherwise: the whole frame is copied unchanged.

    ``self.empty_flag`` is only read when ``self.eval_data`` is non-empty.
    NOTE(review): ``empty_flag`` presumably marks runs without a target
    object -- confirm against where it is assigned.

    Returns:
        pandas.DataFrame: an independent copy; ``self.eval_data`` itself is
        never handed back to the caller.
    """
    # Columns shared by the full and the reduced selection.
    position_columns = ['simTime', 'simFrame', 'playerId', 'dist', 'lon_d', 'lat_d']
    # Additional relative-motion and safety-metric columns for the full selection.
    metric_columns = [
        'lat_v_rel', 'lon_v_rel',
        'vrel_projection_in_dist', 'arel_projection_in_dist',
        'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD',
        'DRAC', 'BTN', 'STN',
        'collisionSeverity', 'pr_death', 'collisionRisk',
    ]

    frame = self.eval_data

    # `or` short-circuits, so empty_flag is not touched for an empty frame.
    if not (frame.empty or self.empty_flag):
        return frame[position_columns + metric_columns].copy()

    if 'dist' in frame.columns:
        return frame[position_columns].copy()

    return frame.copy()