|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/07/25
- @Last Modified: 2023/07/26
- @Summary: safe metrics
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import math
- import numpy as np
- import pandas as pd
- from collections import defaultdict
- import scipy.integrate as spi
- from score_weight import cal_score_with_priority, cal_weight_from_80
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
- class Safe(object):
- """
- Class for achieving safe metrics for autonomous driving.
- Attributes:
- df: Vehicle driving data, stored in dataframe format.
- df_drivectrl: Vehicle driving control data, including brake pedal, throttle pedal and steering wheel.
- Methods:
- _safe_param_cal_new: Calculate parameters for evaluation.
- _cal_v_ego_projection: Calculate ego car velocity project over distance.
- _cal_v_projection: Calculate relative velocity project over distance.
- _cal_a_projection: Calculate relative acceleration project over distance.
- _calculate_derivative: Calculate derivative.
- _cal_relative_angular_v: Calculate relative angular velocity.
- _death_pr: Calculate death probability.
- _cal_collisionRisk_level: Calculate collisionRisk level.
- dist: Calculate distance of two cars.
- """
def __init__(self, data_processed, custom_data, scoreModel):
    """
    Cache the processed driving data and the 'safe' scoring configuration.

    Args:
        data_processed: Pre-processed data container. This method reads its
            object_df, ego_data, lane_info_df, obj_id_list, config and
            driver_ctrl_data members.
        custom_data: Custom metric data, stored unchanged on the instance.
        scoreModel: Scoring model, stored unchanged on the instance.

    NOTE(review): the indentation of this method was reconstructed from a
    source whose leading whitespace was lost. Everything after the
    `if len(self.obj_id_list) > 1:` line is assumed to belong to that branch
    -- confirm against the original file.
    """
    self.eval_data = pd.DataFrame()
    self.data_processed = data_processed
    self.scoreModel = scoreModel
    # Work on a copy so the caller's object dataframe is not modified.
    self.df = data_processed.object_df.copy()
    self.ego_df = data_processed.ego_data
    self.laneinfo_df = data_processed.lane_info_df
    self.obj_id_list = data_processed.obj_id_list
    # config infos for calculating score
    self.config = data_processed.config
    safe_config = self.config.config['safe']
    self.safe_config = safe_config
    # common data
    self.bulitin_metric_list = self.config.builtinMetricList
    # dimension data
    self.weight_custom = safe_config['weightCustom']
    self.metric_list = safe_config['metric']
    self.type_list = safe_config['type']
    self.type_name_dict = safe_config['typeName']
    self.name_dict = safe_config['name']
    self.unit_dict = safe_config['unit']
    # custom metric data
    self.customMetricParam = safe_config['customMetricParam']
    self.custom_metric_list = list(self.customMetricParam.keys())
    self.custom_data = custom_data
    self.custom_param_dict = {}
    # score data
    self.weight = safe_config['weightDimension']
    self.weight_type_dict = safe_config['typeWeight']
    self.weight_type_list = safe_config['typeWeightList']
    self.weight_dict = safe_config['weight']
    self.weight_list = safe_config['weightList']
    self.priority_dict = safe_config['priority']
    self.priority_list = safe_config['priorityList']
    self.kind_dict = safe_config['kind']
    # Only the first element of the 'kind' and 'optimal' entries is kept here.
    self.kind1_dict = self.kind_dict[0]
    self.optimal_dict = safe_config['optimal']
    self.optimal1_dict = self.optimal_dict[0]
    self.multiple_dict = safe_config['multiple']
    self.kind_list = safe_config['kindList']
    self.optimal_list = safe_config['optimalList']
    self.multiple_list = safe_config['multipleList']
    self.metric_dict = safe_config['typeMetricDict']
    # self.time_metric_list = self.metric_dict['safeTime']
    # self.distance_metric_list = self.metric_dict['safeDistance']
    # lists of driving control info
    self.time_list = data_processed.driver_ctrl_data['time_list']
    self.frame_list = self.ego_df['simFrame'].values.tolist()
    self.collisionRisk = 0
    # Stays True until _safe_param_cal_new finds an object inside its window.
    self.empty_flag = True
    # no car following scene
    if len(self.obj_id_list) > 1:
        self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.unsafe_acce_drac_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.unsafe_acce_xtn_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.most_dangerous = {}
        self.pass_percent = {}
        # Fills self.df / self.eval_data with the per-frame safety parameters.
        self._safe_param_cal_new()
def _safe_param_cal_new(self):
    """
    Compute the per-frame safety parameters of every non-ego object.

    For each frame in self.frame_list and each id in self.obj_id_list
    (player id 1, the ego, is skipped) the method derives the distance to the
    ego, the longitudinal/lateral offsets, the projected relative velocity and
    acceleration, and the indicators TTC, MTTC, THW, LonSD, LatSD, DRAC, BTN,
    STN, collisionSeverity, pr_death and collisionRisk, and writes them into
    the object's record. Objects with lon_d > 100, lon_d < -5 or lat_d > 4
    only receive dist, lon_d and lat_d.

    Side effects: sets self.empty_flag to False once an object falls inside
    that window, and replaces self.eval_data and self.df with the
    concatenated per-frame records.

    NOTE(review): the indentation of this method was reconstructed from a
    source whose leading whitespace was lost -- confirm the nesting, in
    particular of the final 'unsafe_flag' assignment.
    """
    Tc = 0.3
    rho = 0.3  # driver braking reaction time
    ego_accel_max = 6  # maximum ego acceleration under throttle
    obj_decel_max = 8  # maximum braking deceleration of the lead vehicle
    ego_decel_min = 1  # minimum ego braking deceleration
    ego_decel_lon_max = 8
    ego_decel_lat_max = 1
    # Build a two-level lookup: simFrame -> playerId -> record.
    obj_dict = defaultdict(dict)
    obj_data_dict = self.df.to_dict('records')
    for item in obj_data_dict:
        obj_dict[item['simFrame']][item['playerId']] = item
    df_list = []
    EGO_PLAYER_ID = 1
    # self.empty_flag = True
    for frame_num in self.frame_list:
        ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
        v1 = ego_data['v'] / 3.6  # km/h to m/s
        x1 = ego_data['posX']
        y1 = ego_data['posY']
        h1 = ego_data['posH']
        len1 = ego_data['dimX']
        width1 = ego_data['dimY']
        o_x1 = ego_data['offX']
        v_x1 = ego_data['speedX'] / 3.6  # km/h to m/s
        v_y1 = ego_data['speedY'] / 3.6  # km/h to m/s
        a_x1 = ego_data['accelX']
        a_y1 = ego_data['accelY']
        # a1 = ego_data['accel']
        for playerId in self.obj_id_list:
            if playerId == EGO_PLAYER_ID:
                continue
            try:
                # The object may be absent from this frame; skip it then.
                obj_data = obj_dict[frame_num][playerId]
                # NOTE(review): an empty lane-info match yields an empty array
                # here, and lane_width[0] below would raise IndexError.
                lane_width = self.laneinfo_df[self.laneinfo_df['simFrame'].isin([frame_num])]['width'].values/2
            except KeyError:
                continue
            x2 = obj_data['posX']
            y2 = obj_data['posY']
            dist = self.dist(x1, y1, x2, y2)
            obj_data['dist'] = dist
            v_x2 = obj_data['speedX'] / 3.6  # km/h to m/s
            v_y2 = obj_data['speedY'] / 3.6  # km/h to m/s
            v2 = obj_data['v'] / 3.6  # km/h to m/s
            # h2 = obj_data['posH']
            len2 = obj_data['dimX']
            width2 = obj_data['dimY']
            o_x2 = obj_data['offX']
            a_x2 = obj_data['accelX']
            a_y2 = obj_data['accelY']
            # a2 = obj_data['accel']
            dx, dy = x2 - x1, y2 - y1
            # Define vector A (ego -> object) and the positive x-axis unit vector x.
            A = np.array([dx, dy])
            x = np.array([1, 0])
            # Dot product and vector lengths.
            dot_product = np.dot(A, x)
            vector_length_A = np.linalg.norm(A)
            vector_length_x = np.linalg.norm(x)
            # Cosine of the angle between A and the x axis.
            cos_theta = dot_product / (vector_length_A * vector_length_x)
            # Convert the cosine to an angle in radians.
            beta = np.arccos(cos_theta)  # open question: how to recover the direction from the sign of theta
            lon_d = dist * math.cos(beta - h1)
            lat_d = abs(dist * math.sin(beta - h1))  # TODO: add a left-positive/right-negative sign; beta only covers [0, pi)
            obj_dict[frame_num][playerId]['lon_d'] = lon_d
            obj_dict[frame_num][playerId]['lat_d'] = lat_d
            # Only objects inside this window around the ego are evaluated.
            if lon_d > 100 or lon_d < -5 or lat_d > 4:
                continue
            self.empty_flag = False
            # Relative velocity is ego minus object; relative acceleration is object minus ego.
            vx, vy = v_x1 - v_x2, v_y1 - v_y2
            ax, ay = a_x2 - a_x1, a_y2 - a_y1
            v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
            v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
            vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
            arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
                                                             v_x2, v_y2)
            obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
            obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
            obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
            obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
            obj_type = obj_data['type']
            # TTC is only computed when |dx| is within half the lane width;
            # otherwise a large sentinel value is used.
            if abs(dist*cos_theta) <= lane_width[0]:
                TTC = self._cal_TTC(dist, vrel_projection_in_dist)
            else:
                TTC = 10000
            # NOTE(review): _cal_MTTC names its first parameter `dist`, but TTC
            # is passed here -- confirm whether this is intended.
            MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
            THW = self._cal_THW(dist, v_ego_p)
            # Usable in single-lane scenarios.
            LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
                                                     obj_decel_max)
            lat_dist = 0.5
            v_right = v1
            v_left = v2
            a_right_lat_brake_min = 1
            a_left_lat_brake_min = 1
            a_lat_max = 5
            LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
                                                a_left_lat_brake_min,
                                                a_lat_max)
            DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
            # Rotate accelerations/velocities by the ego heading h1 to get the
            # components along (lon_*) and across (lat_*) the ego direction.
            lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
            lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
            lon_a = abs(lon_a1 - lon_a2)
            lon_d = dist * abs(math.cos(beta - h1))
            lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
            BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
            lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
            lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
            lat_a = abs(lat_a1 - lat_a2)
            lat_d = dist * abs(math.sin(beta - h1))
            lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
            STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
            obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
            obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
            # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
            # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
            # Falsy (0/None) and negative times are stored as None.
            TTC = None if (not TTC or TTC < 0) else TTC
            MTTC = None if (not MTTC or MTTC < 0) else MTTC
            THW = None if (not THW or THW < 0) else THW
            # DRAC is capped at 10.
            DRAC = 10 if DRAC >= 10 else DRAC
            if not TTC or TTC > 4000:  # threshold = 4258.41
                collisionSeverity = 0
                pr_death = 0
                collisionRisk = 0
            else:
                # Severity is 1 minus the integral of _normal_distribution over [0, TTC - Tc].
                result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
                collisionSeverity = 1 - result
                pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
                collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
            obj_dict[frame_num][playerId]['TTC'] = TTC
            obj_dict[frame_num][playerId]['MTTC'] = MTTC
            obj_dict[frame_num][playerId]['THW'] = THW
            obj_dict[frame_num][playerId]['LonSD'] = LonSD
            obj_dict[frame_num][playerId]['LatSD'] = LatSD
            obj_dict[frame_num][playerId]['DRAC'] = DRAC
            obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
            obj_dict[frame_num][playerId]['STN'] = abs(STN)
            # The three probabilities are stored as percentages.
            obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
            obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
            obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
        # One dataframe per frame, holding every record of that frame (ego included).
        df_fnum = pd.DataFrame(obj_dict[frame_num].values())
        df_list.append(df_fnum)
    df_safe = pd.concat(df_list)
    col_list = ['simTime', 'simFrame', 'playerId', 'v', 'accel', 'lon_acc', 'lat_acc', 'dist', 'lon_d', 'lat_d',
                'lat_v_rel', 'lon_v_rel', 'v_ego_projection_in_dist',
                'v_obj_projection_in_dist', 'vrel_projection_in_dist', 'arel_projection_in_dist',
                'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'DRAC', 'BTN', 'STN', 'collisionSeverity', 'pr_death',
                'collisionRisk']
    # The indicator columns only exist when at least one object was evaluated.
    if not self.empty_flag:
        df_safe = df_safe[col_list].reset_index(drop=True)
    # df_safe.reset_index(drop=True)
    self.eval_data = df_safe.copy()
    self.df = df_safe
    self.df['flag'] = 0
    if not self.empty_flag:
        for metric in self.metric_list:
            if metric in self.bulitin_metric_list:
                # NOTE(review): every metric is flagged when it is BELOW its
                # optimal value, while the unsafe_*_statistic methods use '>'
                # for DRAC/BTN/STN/collisionRisk/collisionSeverity -- confirm.
                self.df['tmp'] = self.df[metric].apply(lambda x: 1 if x < self.optimal1_dict[metric] else 0)
                self.df['flag'] = self.df['flag'] + self.df['tmp']
        # A row is unsafe as soon as one metric flagged it.
        self.df['unsafe_flag'] = self.df['flag'].apply(lambda x: 1 if x > 0 else 0)
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 在 AB 连线上的速度 V1_on_AB
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
- return V1_on_AB
- def _cal_v_projection(self, dx, dy, vx, vy):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的速度 V_relative
- # vx = vx1 - vx2
- # vy = vy1 - vy2
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
- V_on_AB = vx * U_ABx + vy * U_ABy
- return V_on_AB
def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
    """
    Relative acceleration along the line AB, corrected for line rotation.

    Args:
        dx, dy: Components of the vector AB (B minus A).
        vx, vy: Components of the relative velocity.
        ax, ay: Components of the relative acceleration.
        x1, y1, x2, y2: Positions of A and B.
        v_x1, v_y1, v_x2, v_y2: Velocities of A and B.

    Returns:
        0 when the relative velocity or the vector AB has zero length;
        otherwise the projection of (ax, ay) on AB minus w * V * sin(theta),
        where theta is the angle between the relative velocity and AB, V the
        norm of VA - VB and w the value of _cal_relative_angular_v.
    """
    V_mod = math.sqrt(vx ** 2 + vy ** 2)
    AB_mod = math.sqrt(dx ** 2 + dy ** 2)
    if V_mod == 0 or AB_mod == 0:
        return 0

    # Angle between the relative velocity and AB. Rounding can push the ratio
    # marginally outside [-1, 1] for (anti)parallel vectors, which would make
    # math.acos raise ValueError, so clamp it (NaN passes through untouched).
    cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
    if cos_theta > 1.0:
        cos_theta = 1.0
    elif cos_theta < -1.0:
        cos_theta = -1.0
    theta = math.acos(cos_theta)

    # Projection of the relative acceleration on the unit vector of AB.
    a_on_AB = ax * (dx / AB_mod) + ay * (dy / AB_mod)

    VA = np.array([v_x1, v_y1])
    VB = np.array([v_x2, v_y2])
    D_A = np.array([x1, y1])
    D_B = np.array([x2, y2])
    V = np.linalg.norm(VA - VB)
    w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
    return self._calculate_derivative(a_on_AB, w, V, theta)
- # 计算相对加速度
- def _calculate_derivative(self, a, w, V, theta):
- # 计算(V×cos(θ))'的值
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
- derivative = a - w * V * math.sin(theta)
- return derivative
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
- dx = A[0] - B[0]
- dy = A[1] - B[1]
- dvx = VA[0] - VB[0]
- dvy = VA[1] - VB[1]
- # (dx * dvy - dy * dvx)
- angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
- return angular_velocity
- def _death_pr(self, obj_type, v_relative):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- return p_death
- def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
- return collisionRisk
- # 求两车之间当前距离
def dist(self, x1, y1, x2, y2):
    """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
    delta_x = x1 - x2
    delta_y = y1 - y2
    return np.sqrt(delta_x ** 2 + delta_y ** 2)
- # TTC (time to collision)
- def _cal_TTC(self, dist, vrel_projection_in_dist):
- if vrel_projection_in_dist == 0:
- return math.inf
- TTC = dist / vrel_projection_in_dist
- return TTC
- def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
- MTTC = math.nan
- if arel_projection_in_dist != 0:
- tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
- if tmp < 0:
- return math.nan
- t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
- t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
- if t1 > 0 and t2 > 0:
- if t1 >= t2:
- MTTC = t2
- elif t1 < t2:
- MTTC = t1
- elif t1 > 0 and t2 <= 0:
- MTTC = t1
- elif t1 <= 0 and t2 > 0:
- MTTC = t2
- if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
- MTTC = dist / vrel_projection_in_dist
- return MTTC
- # THW (time headway)
- def _cal_THW(self, dist, v_ego_projection_in_dist):
- if not v_ego_projection_in_dist:
- THW = None
- else:
- THW = dist / v_ego_projection_in_dist
- return THW
def velocity(self, v_x, v_y):
    """
    Return the magnitude of (v_x, v_y) multiplied by 3.6.

    3.6 is the m/s -> km/h factor, so the inputs are presumably in m/s --
    confirm against the callers.
    """
    magnitude = math.sqrt(v_x ** 2 + v_y ** 2)
    return magnitude * 3.6
- def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
- lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
- 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
- return lon_dist_min
- def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min,
- a_lat_max):
- v_right_rho = v_right + rho * a_lat_max
- v_left_rho = v_left + rho * a_lat_max
- dist_min = lat_dist + ((v_right + v_right_rho) * rho / 2 + v_right_rho ** 2 / a_right_lat_brake_min / 2 + (
- (v_left + v_right_rho) * rho / 2) + v_left_rho ** 2 / a_left_lat_brake_min / 2)
- return dist_min
- # DRAC (decelerate required avoid collision)
- def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
- dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
- if dist_length < 0:
- dist_width = dist - (width2 / 2 + width1 / 2)
- if dist_width < 0:
- return math.inf
- else:
- d = dist_width
- else:
- d = dist_length
- DRAC = vrel_projection_in_dist ** 2 / (2 * d)
- return DRAC
- # BTN (brake threat number)
- def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
- BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
- STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
- width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
- return STN
- # BTN (brake threat number)
def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
    """
    Brake threat number (earlier formulation).

    Returns (a_y1 + ay - vy**2 / (2 * dy)) / max_ay. According to the
    original comment, max_ay is the largest longitudinal acceleration the
    vehicle can achieve, currently the maximum observed in the run.
    """
    required_accel = a_y1 + ay - vy ** 2 / (2 * dy)
    return required_accel / max_ay
- # STN (steer threat number)
def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
    """
    Steer threat number (earlier formulation).

    Returns
        (a_x1 + ax + 2 / ttc**2 * (dx
            + sign(max_ax * vx) * (width1 + width2) / 2
            + vx * ttc)) / max_ax
    """
    gain = 2 / ttc ** 2
    lateral_travel = dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc
    return (a_x1 + ax + gain * lateral_travel) / max_ax
- # 追尾碰撞风险
- def _normal_distribution(self, x):
- mean = 1.32
- std_dev = 0.26
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
def continuous_group(self, df):
    """
    Collapse runs of consecutive frames into start/end intervals.

    Rows are walked in order; a row stays in the current run while its
    simFrame is at most 1 larger than the previous row's simFrame. Runs with
    fewer than two rows are dropped.

    Args:
        df: DataFrame with 'simTime' and 'simFrame' columns.

    Returns:
        DataFrame with the columns start_time, end_time, start_frame and
        end_frame, one row per remaining run.
    """
    times = df['simTime'].values.tolist()
    frames = df['simFrame'].values.tolist()

    time_runs = []
    frame_runs = []
    current_times = []
    current_frames = []
    previous_frame = None
    for stamp, frame in zip(times, frames):
        # Close the current run when the frame number jumps by more than 1.
        if current_times and not (frame - previous_frame <= 1):
            time_runs.append(current_times)
            frame_runs.append(current_frames)
            current_times = []
            current_frames = []
        current_times.append(stamp)
        current_frames.append(frame)
        previous_frame = frame
    time_runs.append(current_times)
    frame_runs.append(current_frames)

    # Values used for the charts: first and last element of each kept run.
    time_bounds = [[run[0], run[-1]] for run in time_runs if len(run) >= 2]
    frame_bounds = [[run[0], run[-1]] for run in frame_runs if len(run) >= 2]

    return pd.concat(
        [
            pd.DataFrame(time_bounds, columns=['start_time', 'end_time']),
            pd.DataFrame(frame_bounds, columns=['start_frame', 'end_frame']),
        ],
        axis=1,
    )
- # def continuous_group_old(self, df):
- # time_list = df['simTime'].values.tolist()
- # frame_list = df['simFrame'].values.tolist()
- #
- # group = []
- # sub_group = []
- #
- # for i in range(len(frame_list)):
- # if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
- # sub_group.append(time_list[i])
- # else:
- # group.append(sub_group)
- # sub_group = [time_list[i]]
- #
- # group.append(sub_group)
- # group = [g for g in group if len(g) >= 2]
- #
- # # 输出图表值
- # time = [[g[0], g[-1]] for g in group]
- # unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- #
- # return unsafe_df
def unsafe_time_ttc_df_statistic(self, obj_df):
    """
    Append the intervals where TTC is below its optimal value.

    Rows of obj_df with TTC < self.optimal1_dict['TTC'] are grouped by
    continuous_group, tagged with type 'TTC' and appended to
    self.unsafe_time_df.
    """
    threshold = self.optimal1_dict['TTC']
    violations = obj_df.loc[obj_df['TTC'] < threshold, ['simTime', 'simFrame', 'TTC']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'TTC'
    # intervals['type'] = 'time'
    self.unsafe_time_df = pd.concat([self.unsafe_time_df, intervals], ignore_index=True)
def unsafe_time_mttc_df_statistic(self, obj_df):
    """
    Append the intervals where MTTC is below its optimal value.

    Rows of obj_df with MTTC < self.optimal1_dict['MTTC'] are grouped by
    continuous_group, tagged with type 'MTTC' and appended to
    self.unsafe_time_df.
    """
    threshold = self.optimal1_dict['MTTC']
    violations = obj_df.loc[obj_df['MTTC'] < threshold, ['simTime', 'simFrame', 'MTTC']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'MTTC'
    # intervals['type'] = 'time'
    self.unsafe_time_df = pd.concat([self.unsafe_time_df, intervals], ignore_index=True)
def unsafe_time_thw_df_statistic(self, obj_df):
    """
    Append the intervals where THW is below its optimal value.

    Rows of obj_df with THW < self.optimal1_dict['THW'] are grouped by
    continuous_group, tagged with type 'THW' and appended to
    self.unsafe_time_df.
    """
    threshold = self.optimal1_dict['THW']
    violations = obj_df.loc[obj_df['THW'] < threshold, ['simTime', 'simFrame', 'THW']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'THW'
    # intervals['type'] = 'time'
    self.unsafe_time_df = pd.concat([self.unsafe_time_df, intervals], ignore_index=True)
def unsafe_distance_lonsd_df_statistic(self, obj_df):
    """
    Append the intervals where LonSD is below its optimal value.

    Rows of obj_df with LonSD < self.optimal1_dict['LonSD'] are grouped by
    continuous_group, tagged with type 'LonSD' and appended to
    self.unsafe_dist_df.
    """
    threshold = self.optimal1_dict['LonSD']
    violations = obj_df.loc[obj_df['LonSD'] < threshold, ['simTime', 'simFrame', 'LonSD']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'LonSD'
    # intervals['type'] = 'distance'
    self.unsafe_dist_df = pd.concat([self.unsafe_dist_df, intervals], ignore_index=True)
def unsafe_distance_latsd_df_statistic(self, obj_df):
    """
    Append the intervals where LatSD is below its optimal value.

    Rows of obj_df with LatSD < self.optimal1_dict['LatSD'] are grouped by
    continuous_group, tagged with type 'LatSD' and appended to
    self.unsafe_dist_df.
    """
    threshold = self.optimal1_dict['LatSD']
    violations = obj_df.loc[obj_df['LatSD'] < threshold, ['simTime', 'simFrame', 'LatSD']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'LatSD'
    # intervals['type'] = 'distance'
    self.unsafe_dist_df = pd.concat([self.unsafe_dist_df, intervals], ignore_index=True)
def unsafe_acceleration_drac_df_statistic(self, obj_df):
    """
    Append the intervals where DRAC exceeds its optimal value.

    Rows of obj_df with DRAC > self.optimal1_dict['DRAC'] are grouped by
    continuous_group, tagged with type 'DRAC' and appended to
    self.unsafe_acce_drac_df.
    """
    threshold = self.optimal1_dict['DRAC']
    violations = obj_df.loc[obj_df['DRAC'] > threshold, ['simTime', 'simFrame', 'DRAC']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'DRAC'
    # intervals['type'] = 'acceleration'
    self.unsafe_acce_drac_df = pd.concat([self.unsafe_acce_drac_df, intervals], ignore_index=True)
def unsafe_acceleration_btn_df_statistic(self, obj_df):
    """
    Append the intervals where BTN exceeds its optimal value.

    Rows of obj_df with BTN > self.optimal1_dict['BTN'] are grouped by
    continuous_group, tagged with type 'BTN' and appended to
    self.unsafe_acce_xtn_df.
    """
    threshold = self.optimal1_dict['BTN']
    violations = obj_df.loc[obj_df['BTN'] > threshold, ['simTime', 'simFrame', 'BTN']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'BTN'
    # intervals['type'] = 'acceleration'
    self.unsafe_acce_xtn_df = pd.concat([self.unsafe_acce_xtn_df, intervals], ignore_index=True)
def unsafe_acceleration_stn_df_statistic(self, obj_df):
    """
    Append the intervals where STN exceeds its optimal value.

    Rows of obj_df with STN > self.optimal1_dict['STN'] are grouped by
    continuous_group, tagged with type 'STN' and appended to
    self.unsafe_acce_xtn_df.
    """
    threshold = self.optimal1_dict['STN']
    violations = obj_df.loc[obj_df['STN'] > threshold, ['simTime', 'simFrame', 'STN']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'STN'
    # intervals['type'] = 'acceleration'
    self.unsafe_acce_xtn_df = pd.concat([self.unsafe_acce_xtn_df, intervals], ignore_index=True)
def unsafe_probability_cr_df_statistic(self, obj_df):
    """
    Append the intervals where collisionRisk exceeds its optimal value.

    Rows of obj_df with collisionRisk > self.optimal1_dict['collisionRisk']
    are grouped by continuous_group, tagged with type 'collisionRisk' and
    appended to self.unsafe_prob_df.
    """
    threshold = self.optimal1_dict['collisionRisk']
    violations = obj_df.loc[obj_df['collisionRisk'] > threshold, ['simTime', 'simFrame', 'collisionRisk']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'collisionRisk'
    # intervals['type'] = 'probability'
    self.unsafe_prob_df = pd.concat([self.unsafe_prob_df, intervals], ignore_index=True)
def unsafe_probability_cs_df_statistic(self, obj_df):
    """
    Append the intervals where collisionSeverity exceeds its optimal value.

    Rows of obj_df with collisionSeverity >
    self.optimal1_dict['collisionSeverity'] are grouped by continuous_group,
    tagged with type 'collisionSeverity' and appended to self.unsafe_prob_df.
    """
    threshold = self.optimal1_dict['collisionSeverity']
    violations = obj_df.loc[
        obj_df['collisionSeverity'] > threshold, ['simTime', 'simFrame', 'collisionSeverity']]
    intervals = self.continuous_group(violations)
    intervals['type'] = 'collisionSeverity'
    # intervals['type'] = 'probability'
    self.unsafe_prob_df = pd.concat([self.unsafe_prob_df, intervals], ignore_index=True)
- def _safe_statistic_most_dangerous(self):
- min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD']
- max_list = ['DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity']
- for metric in min_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- self.most_dangerous[metric] = self.df[metric].min()
- else:
- self.most_dangerous[metric] = self.optimal1_dict[metric]
- if np.isnan(self.most_dangerous[metric]):
- self.most_dangerous[metric] = self.optimal1_dict[metric]
- for metric in max_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- self.most_dangerous[metric] = self.df[metric].max()
- else:
- self.most_dangerous[metric] = self.optimal1_dict[metric]
- # self.most_dangerous[metric] = self.df[metric].max()
- if np.isnan(self.most_dangerous[metric]):
- self.most_dangerous[metric] = self.optimal1_dict[metric]
- def _safe_statistic_pass_percent(self):
- greater_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD']
- lesser_list = ['DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity']
- for metric in greater_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- self.pass_percent[metric] = self.df[self.df[metric] >= self.optimal1_dict[metric]][metric].count() / \
- self.df[metric].count()
- else:
- self.pass_percent[metric] = 0.8
- if np.isnan(self.pass_percent[metric]):
- self.pass_percent[metric] = 0.8
- for metric in lesser_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- self.pass_percent[metric] = self.df[self.df[metric] <= self.optimal1_dict[metric]][metric].count() / \
- self.df[metric].count()
- else:
- self.pass_percent[metric] = 0.8
- if np.isnan(self.pass_percent[metric]):
- self.pass_percent[metric] = 0.8
- if "collisionSeverity" in self.metric_list:
- self.collisionRisk = 1 - self.pass_percent["collisionSeverity"]
def _safe_statistic(self):
    """
    Build the one-row score input from the observed data.

    Refreshes self.most_dangerous and self.pass_percent, then returns a list
    holding a single row: the absolute most-dangerous values followed by the
    absolute pass shares, each restricted to metrics in self.metric_list.
    """
    # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
    self._safe_statistic_most_dangerous()
    self._safe_statistic_pass_percent()

    worst_values = []
    for name, worst in self.most_dangerous.items():
        if name in self.metric_list:
            worst_values.append(abs(worst))

    pass_shares = []
    for name, share in self.pass_percent.items():
        if name in self.metric_list:
            pass_shares.append(abs(share))

    return [worst_values + pass_shares]
def custom_metric_param_parser(self, param_list):
    """
    Convert the raw parameter entries of a custom metric into typed lists.

    Each entry of param_list is read as:
        {
            "kind": "-1",
            "optimal": "1",
            "multiple": ["0.5", "5"],
            "spare": [{"param": ...}, ...]
        }

    Returns:
        dict with the keys "kind" (list of int), "optimal" (list of float),
        "multiple" (list of float lists) and "spare" (list of lists holding
        the "param" value of every spare item), one element per entry and in
        the order of param_list.

    Raises:
        KeyError: if an entry lacks one of the keys above.
        ValueError: if a "kind", "optimal" or "multiple" value is not numeric.
    """
    kind_list = []
    optimal_list = []
    multiple_list = []
    spare_list = []
    for entry in param_list:
        kind_list.append(int(entry['kind']))
        optimal_list.append(float(entry['optimal']))
        multiple_list.append([float(x) for x in entry['multiple']])
        spare_list.append([item["param"] for item in entry["spare"]])
    return {
        "kind": kind_list,
        "optimal": optimal_list,
        "multiple": multiple_list,
        "spare": spare_list,
    }
def custom_metric_score(self, metric, value, param_list):
    """
    Score one custom metric value.

    The parameter entries are parsed with custom_metric_param_parser and
    cached in self.custom_param_dict[metric]. self.scoreModel is then built
    from the parsed kind/optimal/multiple lists and the value wrapped in a
    one-element array; the mean of the sub-scores returned by its cal_score()
    is returned.
    """
    parsed = self.custom_metric_param_parser(param_list)
    self.custom_param_dict[metric] = parsed
    model = self.scoreModel(parsed['kind'], parsed['optimal'], parsed['multiple'], np.array([value]))
    sub_scores = model.cal_score()
    return sum(sub_scores) / len(sub_scores)
- def _safe_no_obj_statistic(self):
- # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
- most_dangerous_list = [abs(value) for key, value in self.optimal1_dict.items() if key in self.metric_list]
- pass_percent_list = [1.0] * len(self.optimal1_dict.keys())
- arr_safe = [most_dangerous_list + pass_percent_list]
- return arr_safe
def safe_score_new(self):
    """
    Compute the overall, per-type and per-metric safety scores.

    Steps, as visible in the code:
      1. Build the score input row, from the optimal values when
         self.obj_id_list holds a single id, otherwise from the observed
         data.
      2. Score the built-in metrics with self.scoreModel; each metric's score
         is the mean of its most-dangerous score and its pass-share score.
      3. Score the custom metrics with custom_metric_score.
      4. Aggregate into type scores and the final score, either with the
         configured weights (self.weight_custom truthy) or with weights
         derived by cal_weight_from_80 and cal_score_with_priority.

    Side effects: prints the results; in the dynamic-weight branch it
    overwrites self.weight_list, self.weight_dict, self.weight_type_list and
    self.weight_type_dict.

    Returns:
        (score_safe, score_type_dict, score_metric_dict)

    NOTE(review): the indentation of this method was reconstructed from a
    source whose leading whitespace was lost -- confirm the nesting, in
    particular where the `if arr_safe:` branch ends.
    """
    score_metric_dict = {}
    score_type_dict = {}
    if len(self.obj_id_list) == 1:
        arr_safe = self._safe_no_obj_statistic()
    else:
        arr_safe = self._safe_statistic()
    print("\n[安全性表现及得分情况]")
    print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
    if arr_safe:
        arr_safe = np.array(arr_safe)
        score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
        score_sub = score_model.cal_score()
        metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
        metric_num = len(metric_list)
        if len(self.obj_id_list) > 1 and not self.empty_flag:
            # The trailing metric_num entries are the pass-share scores; they are scaled by 1.25.
            score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]]
        else:
            # No evaluated object: every sub-score is set to 80.
            score_sub = [80] * len(score_sub)
        score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub))  # special-case missing (NaN) values
        score_metric = []
        for i in range(metric_num):
            # Mean of the metric's most-dangerous score and its pass-share score.
            score_tmp = (score_sub[i] + score_sub[i + metric_num]) / 2
            score_metric.append(round(score_tmp, 2))
        score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
    for metric in self.custom_metric_list:
        value = self.custom_data[metric]['value']
        param_list = self.customMetricParam[metric]
        score = self.custom_metric_score(metric, value, param_list)
        score_metric_dict[metric] = round(score, 2)
    # Re-order the scores to follow self.metric_list.
    score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
    score_metric = list(score_metric_dict.values())
    if self.weight_custom:  # user-defined weights
        score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
                                         self.weight_dict}
        for type in self.type_list:
            type_score = sum(
                value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
            score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
        score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
                                       score_type_dict}
        score_safe = sum(score_type_with_weight_dict.values())
    else:  # dynamic objective weighting
        self.weight_list = cal_weight_from_80(score_metric)
        self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
        score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
        for type in self.type_list:
            # Normalise the metric weights so they sum to 1 within the type.
            type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
            for key, value in self.weight_dict.items():
                if key in self.metric_dict[type]:
                    # self.weight_dict[key] = round(value / type_weight, 4)
                    self.weight_dict[key] = value / type_weight
            type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
            type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
            type_priority_list = [value for key, value in self.priority_dict.items() if
                                  key in self.metric_dict[type]]
            type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
            score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
        for key in self.weight_dict:
            self.weight_dict[key] = round(self.weight_dict[key], 4)
        score_type = list(score_type_dict.values())
        self.weight_type_list = cal_weight_from_80(score_type)
        self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
    score_safe = round(score_safe, 2)
    # score_type = [round(x, 2) for key, x in score_type_dict.items()]
    # score_metric = [round(x, 2) for key, x in score_metric_dict.items()]
    print("安全性各指标基准值:", self.optimal_list)
    print(f"安全性得分为:{score_safe:.2f}分。")
    print(f"安全性各类型得分为:{score_type_dict}。")
    print(f"安全性各指标得分为:{score_metric_dict}。")
    # return score_safe, score_type, score_metric
    return score_safe, score_type_dict, score_metric_dict
- # def zip_time_pairs(self, zip_list, upper_limit=9999):
- # zip_time_pairs = zip(self.time_list, zip_list)
- # zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
- # return zip_vs_time
def zip_time_pairs(self, zip_list):
    """
    Pair every entry of self.time_list with the matching value of zip_list.

    Returns a list of [time, value] pairs in which NaN values are replaced by
    an empty string. Pairing stops at the shorter of the two sequences.
    """
    pairs = []
    for stamp, value in zip(self.time_list, zip_list):
        pairs.append([stamp, "" if math.isnan(value) else value])
    return pairs
- def _get_weight_distribution(self, dimension):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = self.config.dimension_name[dimension]
- for type in self.type_list:
- type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
- self.weight_dict.items() if
- key in self.metric_dict[type]}
- weight_distribution_type = {
- "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
- "indexes": type_weight_indexes_dict
- }
- weight_distribution[type] = weight_distribution_type
- return weight_distribution
def safe_weight_distribution(self):
    """
    Describe the weight split of the four built-in safety types.

    For each of safeTime, safeDistance, safeAcceleration and safeProbability
    that appears in self.type_list, the result gets an entry holding the
    type's labelled weight percentage (under a type-specific key) and an
    "indexes" dict with the labelled weight percentage of each of the type's
    fixed metrics found in self.weight_dict.
    """
    # (type key, key of the type weight entry, display label, metrics of the type)
    sections = (
        ("safeTime", "timeWeight", "时间指标", ['TTC', 'MTTC', 'THW']),
        ("safeDistance", "distanceWeight", "距离指标", ['LonSD', 'LatSD']),
        ("safeAcceleration", "accelerationWeight", "加速度指标", ['DRAC', 'BTN', 'STN']),
        ("safeProbability", "probabilityWeight", "概率指标", ['collisionRisk', 'collisionSeverity']),
    )

    weight_distribution = {"name": "安全性"}
    for type_key, weight_key, label, members in sections:
        if type_key not in self.type_list:
            continue
        indexes = {}
        for metric, weight in self.weight_dict.items():
            if metric in members:
                indexes[metric] = f"{metric}({weight * 100:.2f}%)"
        weight_distribution[type_key] = {
            weight_key: f"{label}({self.weight_type_dict[type_key] * 100:.2f}%)",
            "indexes": indexes,
        }
    return weight_distribution
def normalize_dict_values(self, dictionary):
    """Overwrite the values of ``dictionary`` with equal weights summing to 1.

    Every key except the last (in iteration order) receives
    ``round(1 / n, 2)``, where ``n`` is the number of keys. The last key
    receives ``round(1 - <sum of the values already assigned>, 2)`` so
    that it absorbs the rounding remainder. The existing values are
    ignored.

    Args:
        dictionary: Mapping to update in place.

    Returns:
        The same ``dictionary`` object, mutated. An empty dict is
        returned unchanged.
    """
    if not dictionary:
        # Nothing to distribute; also avoids dividing by zero below.
        return dictionary

    # Determine the last key once instead of rebuilding the key list on
    # every loop iteration.
    *leading_keys, last_key = dictionary
    equal_share = round(1 / len(dictionary), 2)

    total_sum = 0
    for key in leading_keys:
        dictionary[key] = equal_share
        total_sum += equal_share

    # The last entry takes whatever is left after rounding the others.
    dictionary[last_key] = round(1 - total_sum, 2)
    return dictionary
- def report_statistic(self):
- # time_list = self.data_processed.driver_ctrl_data['time_list']
- brakePedal_list = self.data_processed.driver_ctrl_data['brakePedal_list']
- throttlePedal_list = self.data_processed.driver_ctrl_data['throttlePedal_list']
- steeringWheel_list = self.data_processed.driver_ctrl_data['steeringWheel_list']
- # common parameter calculate
- brake_vs_time = self.zip_time_pairs(brakePedal_list)
- throttle_vs_time = self.zip_time_pairs(throttlePedal_list)
- steering_vs_time = self.zip_time_pairs(steeringWheel_list)
- # if len(self.obj_id_list) == 1:
- if self.empty_flag:
- # report_dict = {
- # "name": "安全性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": 80,
- # "level": "良好",
- #
- # 'collisionRisk': self.collisionRisk,
- # "description1": safe_description1,
- # "description2": safe_description2,
- # "noObjectCar": True,
- #
- # "safeTime": time_dict,
- # "safeDistance": distance_dict,
- # "safeAcceleration": acceleration_dict,
- # "safeProbability": probability_dict
- #
- # }
- self.weight_dict = self.normalize_dict_values(self.weight_dict)
- self.weight_type_dict = self.normalize_dict_values(self.weight_type_dict)
- score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
- # format score
- score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
- grade_safe = score_grade(score_safe)
- report_dict = {
- "name": "安全性",
- "weight": f"{self.weight * 100:.2f}%",
- "score": score_safe,
- "level": grade_safe,
- 'collisionRisk': self.collisionRisk,
- "noObjectCar": True,
- "weightDistribution": self._get_weight_distribution("safe")
- }
- # get weight distribution
- safe_description1 = "算法在无目标车情况下安全性表现良好,无碰撞风险;"
- safe_description2 = "安全性在无目标车情况下表现良好。"
- report_dict["description1"] = safe_description1
- report_dict["description2"] = safe_description2
- type_details_dict = {}
- for type in self.type_list:
- score_type = score_type_dict[type]
- grade_type = score_grade(score_type)
- type_dict = {
- "name":self.type_name_dict[type],
- "score": score_type,
- "level": grade_type,
- "description1": "无目标车数据可计算",
- "description2": "表现良好",
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- type_dict_indexes = {}
- for metric in self.metric_dict[type]:
- type_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": 80,
- "extremum": "-",
- # "range": f"[{self.optimal1_dict['TTC']}, inf)",
- "rate": "-"
- }
- if metric in self.bulitin_metric_list:
- if self.kind1_dict[metric] == -1:
- type_dict_indexes[metric]["range"] = f"[0, {self.optimal1_dict[metric]}]"
- elif self.kind1_dict[metric] == 1:
- type_dict_indexes[metric]["range"] = f"[{self.optimal1_dict[metric]}, inf)"
- elif self.kind1_dict[metric] == 0:
- type_dict_indexes[metric][
- "range"] = f"[{self.optimal1_dict[metric] * self.multiple_dict[metric][0]}, {self.optimal1_dict[metric] * self.multiple_dict[metric][1]}]"
- else:
- if self.custom_param_dict[metric]['kind'][0] == -1:
- type_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- type_dict["indexes"] = type_dict_indexes
- type_dict["builtin"] = builtin_graph_dict
- type_dict["custom"] = custom_graph_dict
- type_details_dict[type] = type_dict
- report_dict["details"] = type_details_dict
- report_dict['commonData'] = {
- "per": {
- "name": "脚刹/油门踏板开度(百分比)",
- "legend": ["刹车踏板开度", "油门踏板开度"],
- "data": [brake_vs_time, throttle_vs_time]
- },
- "ang": {
- "name": "方向盘转角(角度°)",
- "data": steering_vs_time
- }
- # "spe": {
- # "name": "速度(km/h)",
- # "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
- # "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- #
- # },
- # "acc": {
- # "name": "加速度(m/s²)",
- # "legend": ["横向加速度", "纵向加速度"],
- # "data": [lat_acc_vs_time, lon_acc_vs_time]
- #
- # },
- # "dis": {
- # "name": "前车距离(m)",
- # "data": distance_vs_time
- # }
- }
- return report_dict
- # report_dict = {
- # "name": "安全性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_safe,
- # "level": grade_safe,
- # 'collisionRisk': self.collisionRisk,
- # "description1": safe_description1,
- # "description2": safe_description2,
- # "noObjectCar": False,
- #
- # "safeTime": time_dict,
- # "safeDistance": distance_dict,
- # "safeAcceleration": acceleration_dict,
- # "safeProbability": probability_dict,
- #
- # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
- # "accData": [lat_acc_vs_time, lon_acc_vs_time],
- #
- # }
- report_dict = {
- "name": "安全性",
- "weight": f"{self.weight * 100:.2f}%",
- 'collisionRisk': self.collisionRisk,
- "noObjectCar": False,
- }
- upper_limit = 40
- times_upper = 2
- len_time = len(self.time_list)
- duration = self.time_list[-1]
- score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
- # format score
- score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
- grade_safe = score_grade(score_safe)
- report_dict["score"] = score_safe
- report_dict["level"] = grade_safe
- # get weight distribution
- report_dict['weightDistribution'] = self._get_weight_distribution("safe")
- # speData
- ego_speed_list = self.ego_df['v'].values.tolist()
- ego_speed_vs_time = self.zip_time_pairs(ego_speed_list)
- obj_id = 2
- obj_df = self.df[self.df['playerId'] == obj_id]
- obj_speed_list = obj_df['v'].values.tolist()
- obj_speed_vs_time = self.zip_time_pairs(obj_speed_list)
- rel_speed_list = obj_df['vrel_projection_in_dist'].values.tolist()
- rel_speed_vs_time = self.zip_time_pairs(rel_speed_list)
- # accData
- lon_acc_list = self.ego_df['lon_acc'].values.tolist()
- lon_acc_vs_time = self.zip_time_pairs(lon_acc_list)
- lat_acc_list = self.ego_df['lat_acc'].values.tolist()
- lat_acc_vs_time = self.zip_time_pairs(lat_acc_list)
- # disData
- distance_list = obj_df['dist'].values.tolist()
- distance_vs_time = self.zip_time_pairs(distance_list)
- # report_dict["speData"] = [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- # report_dict["accData"] = [lat_acc_vs_time, lon_acc_vs_time]
- # report_dict["disData"] = distance_vs_time
- # ttcData
- ttc_list = obj_df['TTC'].values.tolist()
- ttc_list = [9999 if math.isinf(x) else x for x in ttc_list]
- ttc_vs_time = self.zip_time_pairs(ttc_list)
- # for description
- good_type_list = []
- bad_type_list = []
- # good_metric_list = []
- # bad_metric_list = []
- # str for description
- # str_over_optimal = ""
- safe_over_optimal_dict = {}
- type_details_dict = {}
- for type in self.type_list:
- if type == "safeTime":
- time_dict = {
- "name": self.type_name_dict[type],
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # ------------------------------
- bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type)
- # time metric dict
- score_time = score_type_dict['safeTime']
- grade_time = score_grade(score_time)
- time_dict["score"] = score_time
- time_dict["level"] = grade_time
- # safeTime description
- unsafe_time_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value < 80)]
- safe_time_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value >= 80)]
- str_time_over_optimal = ''
- str_time_over_optimal_time = ''
- if not unsafe_time_type_list:
- str_safe_time_type = string_concatenate(safe_time_type_list)
- time_description1 = f'{str_safe_time_type}指标均表现良好'
- time_description2 = f'{str_safe_time_type}指标均在合理范围内,表现良好'
- else:
- for metric in unsafe_time_type_list:
- if metric in self.bulitin_metric_list:
- metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) /
- self.optimal1_dict[metric]) * 100
- str_time_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,'
- metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[
- metric].count() / len_time
- str_time_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;'
- else:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] -
- self.custom_data[metric]["value"][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_time_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;'
- str_time_over_optimal = str_time_over_optimal[:-1]
- str_time_over_optimal_time = str_time_over_optimal_time[:-1]
- str_safe_time_type = string_concatenate(safe_time_type_list)
- str_unsafe_time_type = string_concatenate(unsafe_time_type_list)
- if not safe_time_type_list:
- time_description1 = f"{str_unsafe_time_type}指标表现不佳,{str_time_over_optimal}"
- time_description2 = f"{str_time_over_optimal_time},算法应加强在该时间段对跟车距离的控制"
- else:
- time_description1 = f"{str_safe_time_type}指标表现良好,{str_unsafe_time_type}指标表现不佳,{str_time_over_optimal}"
- time_description2 = f"{str_safe_time_type}指标均在合理范围内,表现良好,{str_time_over_optimal_time},算法应加强在该时间段对跟车距离的控制"
- time_dict["description1"] = replace_key_with_value(time_description1, self.name_dict)
- time_dict["description2"] = replace_key_with_value(time_description2, self.name_dict)
- safe_over_optimal_dict['safeTime'] = str_time_over_optimal_time
- # safeTime extremum
- time_dict_indexes = {}
- if "TTC" in self.metric_list:
- self.unsafe_time_ttc_df_statistic(obj_df)
- if "MTTC" in self.metric_list:
- self.unsafe_time_mttc_df_statistic(obj_df)
- if "THW" in self.metric_list:
- self.unsafe_time_thw_df_statistic(obj_df)
- unsafe_time_df = self.unsafe_time_df.copy()
- unsafe_time_df['type'] = "origin"
- unsafe_time_slices = unsafe_time_df.to_dict('records')
- for metric in self.metric_dict[type]:
- if metric in self.bulitin_metric_list:
- metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \
- self.most_dangerous[
- metric]
- metric_list = obj_df[metric].values.tolist()
- metric_vs_time = self.zip_time_pairs(metric_list)
- unsafe_metric_df = self.unsafe_time_df.copy()
- unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin"
- unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "time"
- unsafe_metric_slices = unsafe_metric_df.to_dict('records')
- time_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": str(
- int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) ==
- self.pass_percent[metric] * 100 else round(
- self.pass_percent[metric] * 100, 2)) + '%'
- }
- if metric != 'TTC': # TTC in commonData
- metric_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": metric_vs_time,
- "range": f"[{self.optimal1_dict[metric]}, inf)",
- # "markLine": unsafe_metric_slices,
- # "markLine2": [self.optimal1_dict[metric]]
- }
- builtin_graph_dict[metric] = metric_data
- else:
- value = self.custom_data[metric]["value"][0]
- metric_extremum = upper_limit if value > upper_limit else value
- time_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- # "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": "-"
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- time_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- time_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- time_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- metric_data = self.custom_data[metric]["reportData"]
- custom_graph_dict[metric] = metric_data
- time_dict["indexes"] = time_dict_indexes
- time_dict["builtin"] = builtin_graph_dict
- time_dict["custom"] = custom_graph_dict
- # time_dict["disData"] = distance_vs_time
- # time_dict["disMarkLine"] = unsafe_time_slices
- # time_dict = {
- # "score": score_time,
- # "level": grade_time,
- # "description1": time_description1,
- # "description2": time_description2,
- # "indexes": time_dict_indexes,
- #
- # "disData": distance_vs_time,
- # "disMarkLine": unsafe_time_slices,
- #
- # "TTC": ttc_data,
- # "MTTC": mttc_data,
- # "THW": thw_data
- # }
- type_details_dict[type] = time_dict
- elif type == "safeDistance":
- distance_dict = {
- "name": self.type_name_dict[type],
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type)
- # ------------------------------
- # distance metric dict
- score_distance = score_type_dict['safeDistance']
- grade_distance = score_grade(score_distance)
- distance_dict["score"] = score_distance
- distance_dict["level"] = grade_distance
- # safeDistance description
- unsafe_dist_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value < 80)]
- safe_dist_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value >= 80)]
- str_dist_over_optimal = ''
- str_dist_over_optimal_time = ''
- if not unsafe_dist_type_list:
- str_safe_dist_type = string_concatenate(safe_dist_type_list)
- distance_description1 = f"{str_safe_dist_type}指标均表现良好"
- distance_description2 = f"{str_safe_dist_type}指标均在合理范围内,表现良好"
- else:
- for metric in unsafe_dist_type_list:
- if metric in self.bulitin_metric_list:
- metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) /
- self.optimal1_dict[metric]) * 100
- str_dist_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,'
- metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[
- metric].count() / len_time
- str_dist_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;'
- else:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] -
- self.custom_data[metric]["value"][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_dist_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;'
- str_dist_over_optimal = str_dist_over_optimal[:-1]
- str_dist_over_optimal_time = str_dist_over_optimal_time[:-1]
- str_safe_dist_type = string_concatenate(safe_dist_type_list)
- str_unsafe_dist_type = string_concatenate(unsafe_dist_type_list)
- if not safe_dist_type_list:
- distance_description1 = f"{str_unsafe_dist_type}指标表现不佳,{str_dist_over_optimal}"
- distance_description2 = f"{str_dist_over_optimal_time}"
- else:
- distance_description1 = f"{str_safe_dist_type}指标表现良好,{str_unsafe_dist_type}指标表现不佳,{str_dist_over_optimal}"
- distance_description2 = f"{str_safe_dist_type}指标均在合理范围内,表现良好,{str_dist_over_optimal_time}"
- distance_dict["description1"] = replace_key_with_value(distance_description1, self.name_dict)
- distance_dict["description2"] = replace_key_with_value(distance_description2, self.name_dict)
- safe_over_optimal_dict['safeDistance'] = str_dist_over_optimal_time
- # safeDistance extremum
- distance_dict_indexes = {}
- if "LonSD" in self.metric_list:
- self.unsafe_distance_lonsd_df_statistic(obj_df)
- if "LatSD" in self.metric_list:
- self.unsafe_distance_latsd_df_statistic(obj_df)
- unsafe_dist_df = self.unsafe_dist_df.copy()
- unsafe_dist_df['type'] = "origin"
- # unsafe_dist_slices = unsafe_dist_df.to_dict('records')
- for metric in self.metric_dict[type]:
- if metric in self.bulitin_metric_list:
- metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \
- self.most_dangerous[metric]
- metric_list = obj_df[metric].values.tolist()
- metric_vs_time = self.zip_time_pairs(metric_list)
- unsafe_metric_df = self.unsafe_dist_df.copy().dropna()
- unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin"
- unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "distance"
- unsafe_metric_slices = unsafe_metric_df.to_dict('records')
- distance_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": str(
- int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) ==
- self.pass_percent[metric] * 100 else round(
- self.pass_percent[metric] * 100, 2)) + '%'
- }
- metric_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": metric_vs_time,
- "range": f"[{self.optimal1_dict[metric]}, inf)",
- # "markLine": unsafe_metric_slices,
- # "markLine2": [self.optimal1_dict[metric]]
- }
- builtin_graph_dict[metric] = metric_data
- else:
- value = self.custom_data[metric]["value"][0]
- metric_extremum = upper_limit if value > upper_limit else value
- distance_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- # "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": "-"
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- distance_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- distance_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- distance_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- metric_data = self.custom_data[metric]["reportData"]
- custom_graph_dict[metric] = metric_data
- distance_dict["indexes"] = distance_dict_indexes
- distance_dict["builtin"] = builtin_graph_dict
- distance_dict["custom"] = custom_graph_dict
- type_details_dict[type] = distance_dict
- # distance_dict = {
- # "score": score_distance,
- # "level": grade_distance,
- # "description1": distance_description1,
- # "description2": distance_description2,
- # "indexes": distance_dict_indexes,
- #
- # "LonSD": lonsd_data,
- # "LatSD": latsd_data
- # }
- elif type == "safeAcceleration":
- acceleration_dict = {
- "name": self.type_name_dict[type],
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type)
- # ------------------------------
- # acceleration metric dict
- score_acceleration = score_type_dict['safeAcceleration']
- grade_acceleration = score_grade(score_acceleration)
- acceleration_dict["score"] = score_acceleration
- acceleration_dict["level"] = grade_acceleration
- # safeAcceleration data for graph
- lat_dist_list = obj_df['lat_d'].values.tolist()
- lat_dist_vs_time = self.zip_time_pairs(lat_dist_list)
- lon_dist_list = obj_df['lon_d'].values.tolist()
- lon_dist_vs_time = self.zip_time_pairs(lon_dist_list)
- # safeAcceleration description
- unsafe_acce_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value < 80)]
- safe_acce_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value >= 80)]
- str_acce_over_optimal = ''
- str_acce_over_optimal_time = ''
- if not unsafe_acce_type_list:
- str_safe_acce_type = string_concatenate(safe_acce_type_list)
- acceleration_description1 = f"{str_safe_acce_type}指标均表现良好"
- acceleration_description2 = f"{str_safe_acce_type}指标均在合理范围内,表现良好"
- else:
- for metric in unsafe_acce_type_list:
- if metric in self.bulitin_metric_list:
- metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) /
- self.optimal1_dict[metric]) * 100
- str_acce_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,'
- metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[
- metric].count() / len_time
- str_acce_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;'
- else:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] -
- self.custom_data[metric]["value"][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_acce_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;'
- str_acce_over_optimal = str_acce_over_optimal[:-1]
- str_acce_over_optimal_time = str_acce_over_optimal_time[:-1]
- str_safe_acce_type = string_concatenate(safe_acce_type_list)
- str_unsafe_acce_type = string_concatenate(unsafe_acce_type_list)
- if not safe_acce_type_list:
- acceleration_description1 = f"{str_unsafe_acce_type}指标表现不佳,{str_acce_over_optimal}"
- acceleration_description2 = f"{str_acce_over_optimal_time}"
- else:
- acceleration_description1 = f"{str_safe_acce_type}指标表现良好,{str_unsafe_acce_type}指标表现不佳,{str_acce_over_optimal}"
- acceleration_description2 = f"{str_safe_acce_type}指标均在合理范围内,表现良好,{str_acce_over_optimal_time}"
- acceleration_dict["description1"] = replace_key_with_value(acceleration_description1, self.name_dict)
- acceleration_dict["description2"] = replace_key_with_value(acceleration_description2, self.name_dict)
- safe_over_optimal_dict['safeAcceleration'] = str_acce_over_optimal_time
- # safeAcceleration extremum
- acceleration_dict_indexes = {}
- if "DRAC" in self.metric_list:
- self.unsafe_acceleration_drac_df_statistic(obj_df)
- if "BTN" in self.metric_list:
- self.unsafe_acceleration_btn_df_statistic(obj_df)
- if "STN" in self.metric_list:
- self.unsafe_acceleration_stn_df_statistic(obj_df)
- unsafe_acce_drac_df = self.unsafe_acce_drac_df.copy().dropna()
- unsafe_acce_drac_df['type'] = "origin"
- unsafe_acce_drac_slices = unsafe_acce_drac_df.to_dict('records')
- unsafe_acce_xtn_df = self.unsafe_acce_xtn_df.copy()
- unsafe_acce_xtn_df['type'] = "origin"
- unsafe_acce_xtn_slices = unsafe_acce_xtn_df.to_dict('records')
- for metric in self.metric_dict[type]:
- if metric in self.bulitin_metric_list:
- metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \
- self.most_dangerous[metric]
- metric_list = obj_df[metric].values.tolist()
- metric_vs_time = self.zip_time_pairs(metric_list)
- if metric == "DRAC":
- unsafe_metric_df = self.unsafe_acce_drac_df.copy().dropna()
- else:
- unsafe_metric_df = self.unsafe_acce_xtn_df.copy().dropna()
- unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin"
- unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "time"
- unsafe_metric_slices = unsafe_metric_df.to_dict('records')
- acceleration_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- "range": f"[0, {self.optimal1_dict[metric]}]",
- "rate": str(
- int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) ==
- self.pass_percent[metric] * 100 else round(
- self.pass_percent[metric] * 100, 2)) + '%'
- }
- metric_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": metric_vs_time,
- "range": f"[0, {self.optimal1_dict[metric]}]",
- # "markLine": unsafe_metric_slices,
- # "markLine2": [self.optimal1_dict[metric]]
- }
- builtin_graph_dict[metric] = metric_data
- else:
- value = self.custom_data[metric]["value"][0]
- metric_extremum = upper_limit if value > upper_limit else value
- acceleration_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- # "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": "-"
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- acceleration_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- acceleration_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- acceleration_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- metric_data = self.custom_data[metric]["reportData"]
- custom_graph_dict[metric] = metric_data
- acceleration_dict["indexes"] = acceleration_dict_indexes
- acceleration_dict["builtin"] = builtin_graph_dict
- acceleration_dict["custom"] = custom_graph_dict
- # acceleration_dict["disData"] = distance_vs_time
- # acceleration_dict["disMarkLine"] = unsafe_acce_drac_slices
- # acceleration_dict["dis2Data"] = [lat_dist_vs_time, lon_dist_vs_time]
- # acceleration_dict["dis2MarkLine"] = unsafe_acce_xtn_slices
- type_details_dict[type] = acceleration_dict
- # acceleration_dict = {
- # "score": score_acceleration,
- # "level": grade_acceleration,
- # "description1": acceleration_description1,
- # "description2": acceleration_description2,
- # "indexes": acceleration_dict_indexes,
- #
- # "disData": distance_vs_time,
- # "disMarkLine": unsafe_acce_drac_slices,
- #
- # "dis2Data": [lat_dist_vs_time, lon_dist_vs_time],
- # "dis2MarkLine": unsafe_acce_xtn_slices,
- #
- # "DRAC": drac_data,
- # "BTN": btn_data,
- # "STN": stn_data
- # }
- elif type == "safeProbability":
- probability_dict = {
- "name": self.type_name_dict[type],
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type)
- # ------------------------------
- # probability metric dict
- score_probability = score_type_dict['safeProbability']
- grade_probability = score_grade(score_probability)
- probability_dict["score"] = score_probability
- probability_dict["level"] = grade_probability
- # safeProbability description
- unsafe_prob_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value < 80)]
- safe_prob_type_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value >= 80)]
- str_prob_over_optimal = ''
- str_prob_over_optimal_time = ''
- if not unsafe_prob_type_list:
- str_safe_prob_type = string_concatenate(safe_prob_type_list)
- probability_description1 = f"{str_safe_prob_type}指标均表现良好"
- probability_description2 = f"{str_safe_prob_type}指标均在合理范围内,表现良好"
- else:
- for metric in unsafe_prob_type_list:
- if metric in self.bulitin_metric_list:
- metric_over_optimal = ((self.optimal1_dict[metric] - self.most_dangerous[metric]) /
- self.optimal1_dict[metric]) * 100
- str_prob_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%,'
- metric_over_optimal_time = (1 - self.pass_percent[metric]) * duration * self.df[
- metric].count() / len_time
- str_prob_over_optimal_time += f'{metric}指标共有{metric_over_optimal_time:.2f}秒超出合理范围;'
- else:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] -
- self.custom_data[metric]["value"][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_prob_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;'
- str_prob_over_optimal = str_prob_over_optimal[:-1]
- str_prob_over_optimal_time = str_prob_over_optimal_time[:-1]
- str_safe_prob_type = string_concatenate(safe_prob_type_list)
- str_unsafe_prob_type = string_concatenate(unsafe_prob_type_list)
- if not safe_prob_type_list:
- probability_description1 = f"{str_unsafe_prob_type}指标表现不佳,{str_prob_over_optimal}"
- probability_description2 = f"{str_prob_over_optimal_time}"
- else:
- probability_description1 = f"{str_safe_prob_type}指标表现良好,{str_unsafe_prob_type}指标表现不佳,{str_prob_over_optimal}"
- probability_description2 = f"{str_safe_prob_type}指标均在合理范围内,表现良好,{str_prob_over_optimal_time}"
- probability_dict["description1"] = replace_key_with_value(probability_description1, self.name_dict)
- probability_dict["description2"] = replace_key_with_value(probability_description2, self.name_dict)
- safe_over_optimal_dict['safeProbability'] = str_prob_over_optimal_time
- # safeProbability extremum
- probability_dict_indexes = {}
- if "collisionRisk" in self.metric_list:
- self.unsafe_probability_cr_df_statistic(obj_df)
- if "collisionSeverity" in self.metric_list:
- self.unsafe_probability_cs_df_statistic(obj_df)
- unsafe_prob_df = self.unsafe_prob_df.copy().dropna()
- unsafe_prob_df['type'] = "origin"
- unsafe_prob_slices = unsafe_prob_df.to_dict('records')
- for metric in self.metric_dict[type]:
- if metric in self.bulitin_metric_list:
- metric_extremum = upper_limit if self.most_dangerous[metric] > upper_limit else \
- self.most_dangerous[
- metric]
- metric_list = obj_df[metric].values.tolist()
- metric_vs_time = self.zip_time_pairs(metric_list)
- unsafe_metric_df = self.unsafe_prob_df.copy().dropna()
- unsafe_metric_df.loc[unsafe_metric_df['type'] != metric, 'type'] = "origin"
- unsafe_metric_df.loc[unsafe_metric_df['type'] == metric, 'type'] = "time"
- unsafe_metric_slices = unsafe_metric_df.to_dict('records')
- probability_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- "range": f"[0, {self.optimal1_dict[metric]}]",
- "rate": str(
- int(self.pass_percent[metric] * 100) if int(self.pass_percent[metric] * 100) ==
- self.pass_percent[metric] * 100 else round(
- self.pass_percent[metric] * 100, 2)) + '%'
- }
- metric_data = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "data": metric_vs_time,
- "range": f"[0, {self.optimal1_dict[metric]}]",
- # "markLine": unsafe_metric_slices,
- # "markLine2": [self.optimal1_dict[metric]]
- }
- builtin_graph_dict[metric] = metric_data
- else:
- value = self.custom_data[metric]["value"][0]
- metric_extremum = upper_limit if value > upper_limit else value
- probability_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- # "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": "-"
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- probability_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- probability_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- probability_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- metric_data = self.custom_data[metric]["reportData"]
- custom_graph_dict[metric] = metric_data
- probability_dict["indexes"] = probability_dict_indexes
- probability_dict["builtin"] = builtin_graph_dict
- probability_dict["custom"] = custom_graph_dict
- # probability_dict["ttcData"] = ttc_vs_time
- # probability_dict["ttcMarkLine"] = unsafe_prob_slices
- type_details_dict[type] = probability_dict
- # probability_dict = {
- # "score": score_probability,
- # "level": grade_probability,
- # "description1": probability_description1,
- # "description2": probability_description2,
- # "indexes": probability_dict_indexes,
- #
- # "ttcData": ttc_vs_time,
- # "ttcMarkLine": unsafe_prob_slices,
- #
- # "collisionRisk": cr_data,
- # "collisionSeverity": cs_data
- # }
- else:
- bad_type_list.append(type) if score_type_dict[type] < 80 else good_type_list.append(type)
- type_dict = {
- "name": self.type_name_dict[type],
- }
- builtin_graph_dict = {}
- custom_graph_dict = {}
- # get score and grade
- score_custom_type = score_type_dict[type]
- grade_custom_type = score_grade(score_custom_type)
- type_dict["score"] = score_custom_type
- type_dict["level"] = grade_custom_type
- # custom type description
- bad_custom_metric_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value < 80)]
- good_custom_metric_list = [key for key, value in score_metric_dict.items() if
- (key in self.metric_dict[type]) and (value >= 80)]
- str_type_over_optimal = ''
- str_type_over_optimal_time = ''
- if not bad_custom_metric_list:
- str_safe_type_type = string_concatenate(good_custom_metric_list)
- type_description1 = f'{str_safe_type_type}指标均表现良好'
- type_description2 = f'{str_safe_type_type}指标均在合理范围内,表现良好'
- else:
- for metric in bad_custom_metric_list:
- metric_over_optimal = ((self.custom_param_dict[metric]['optimal'][0] -
- self.custom_data[metric]["value"][0]) /
- self.custom_param_dict[metric]['optimal'][0]) * 100
- str_type_over_optimal += f'{self.name_dict[metric]}极值超过合理范围{metric_over_optimal:.2f}%;'
- str_type_over_optimal = str_type_over_optimal[:-1]
- str_type_over_optimal_time = str_type_over_optimal_time[:-1]
- str_safe_type_type = string_concatenate(good_custom_metric_list)
- str_unsafe_type_type = string_concatenate(bad_custom_metric_list)
- if not good_custom_metric_list:
- type_description1 = f"{str_unsafe_type_type}指标表现不佳,{str_type_over_optimal}"
- type_description2 = f"{str_type_over_optimal_time},算法应加强在该时间段对跟车距离的控制"
- else:
- type_description1 = f"{str_safe_type_type}指标表现良好,{str_unsafe_type_type}指标表现不佳,{str_type_over_optimal}"
- type_description2 = f"{str_safe_type_type}指标均在合理范围内,表现良好,{str_type_over_optimal_time},算法应加强在该时间段对跟车距离的控制"
- type_dict["description1"] = replace_key_with_value(type_description1, self.name_dict)
- type_dict["description2"] = replace_key_with_value(type_description2, self.name_dict)
- type_dict_indexes = {}
- for metric in self.metric_dict[type]:
- value = self.custom_data[metric]["value"][0]
- metric_extremum = upper_limit if value > upper_limit else value
- type_dict_indexes[metric] = {
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "meaning": f"{self.name_dict[metric]}",
- "score": score_metric_dict[metric],
- "extremum": f'{metric_extremum:.2f}',
- # "range": f"[{self.optimal1_dict[metric]}, inf)",
- "rate": "-"
- }
- if self.custom_param_dict[metric]['kind'][0] == -1:
- type_dict_indexes[metric][
- "range"] = f"[0, {self.custom_param_dict[metric]['optimal'][0]}]"
- elif self.custom_param_dict[metric]['kind'][0] == 1:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0]}, inf)"
- elif self.custom_param_dict[metric]['kind'][0] == 0:
- type_dict_indexes[metric][
- "range"] = f"[{self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][0]}, {self.custom_param_dict[metric]['optimal'][0] * self.custom_param_dict[metric]['multiple'][0][1]}]"
- metric_data = self.custom_data[metric]["reportData"]
- custom_graph_dict[metric] = metric_data
- type_dict["indexes"] = type_dict_indexes
- type_dict["builtin"] = builtin_graph_dict
- type_dict["custom"] = custom_graph_dict
- type_details_dict[type] = type_dict
- report_dict["details"] = type_details_dict
- # good_type_list = []
- # bad_type_list = []
- #
- # for type in self.type_list:
- # bad_type_list.append(type) if any(i < 80 for key, i in score_metric_dict.items() if
- # key in self.metric_dict[type]) else good_type_list.append(type)
- unsafe_time = self.df['unsafe_flag'].sum() * duration / len_time
- unsafe_time = round(unsafe_time, 2)
- safe_over_optimal = [value for key, value in safe_over_optimal_dict.items() if value] # 解决空字符串多逗号问题
- str_safe_over_optimal = ';'.join(safe_over_optimal)
- if grade_safe == '优秀':
- safe_description1 = '车辆在本轮测试中无碰撞风险;'
- elif grade_safe == '良好':
- safe_description1 = '算法在本轮测试中的表现满足设计指标要求;'
- elif grade_safe == '一般':
- str_unsafe_type = string_concatenate(bad_type_list)
- safe_description1 = f'未满足设计指标要求。算法需要在{str_unsafe_type}上进一步优化。在仿真过程中共有{unsafe_time}s的时间处于危险状态。' \
- f'在{str_unsafe_type}中,{str_safe_over_optimal};'
- elif grade_safe == '较差':
- str_unsafe_type = string_concatenate(bad_type_list)
- safe_description1 = f'未满足设计指标要求。算法在本轮测试中有碰撞风险,需要提高算法在{str_unsafe_type}上的表现。在{str_unsafe_type}中,' \
- f'{str_safe_over_optimal};'
- if not bad_type_list:
- safe_description2 = '安全性在各个指标上的表现俱佳。'
- else:
- str_unsafe_type = string_concatenate(bad_type_list)
- safe_description2 = f"安全性在{str_unsafe_type}上存在严重风险,需要重点优化。"
- report_dict["description1"] = replace_key_with_value(safe_description1, self.name_dict)
- report_dict["description1"] = replace_key_with_value(safe_description1, self.type_name_dict)
- report_dict["description2"] = replace_key_with_value(safe_description2, self.type_name_dict)
- report_dict['commonData'] = {
- "per": {
- "name": "脚刹/油门踏板开度(百分比)",
- "legend": ["刹车踏板开度", "油门踏板开度"],
- "data": [brake_vs_time, throttle_vs_time]
- },
- "ang": {
- "name": "方向盘转角(角度°)",
- "data": steering_vs_time
- },
- "spe": {
- "name": "速度(km/h)",
- "legend": ["自车速度", "目标车速度", "自车与目标车相对速度"],
- "data": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time]
- },
- "acc": {
- "name": "加速度(m/s²)",
- "legend": ["横向加速度", "纵向加速度"],
- "data": [lat_acc_vs_time, lon_acc_vs_time]
- },
- "dis": {
- "name": "前车距离(m)",
- "data": distance_vs_time
- },
- "ttc": {
- "name": 'TTC(m)',
- "data": ttc_vs_time
- }
- }
- # self.unsafe_df = pd.concat([self.unsafe_df, self.unsafe_time_df], ignore_index=True)
- # self.unsafe_df = pd.concat([self.unsafe_df, self.unsafe_dist_df], ignore_index=True)
- # self.unsafe_df = pd.concat([self.unsafe_df, self.unsafe_acce_drac_df], ignore_index=True)
- # self.unsafe_df = pd.concat([self.unsafe_df, self.unsafe_acce_xtn_df], ignore_index=True)
- # self.unsafe_df = pd.concat([self.unsafe_df, self.unsafe_prob_df], ignore_index=True)
- self.unsafe_df = pd.concat([self.unsafe_df, self.unsafe_time_df, self.unsafe_dist_df, self.unsafe_acce_drac_df,
- self.unsafe_acce_xtn_df, self.unsafe_prob_df], ignore_index=True)
- unsafe_df = self.unsafe_df.copy().dropna()
- unsafe_slices = unsafe_df.to_dict('records')
- report_dict["commonMarkLine"] = unsafe_slices
- # report_dict = {
- # "name": "安全性",
- # "weight": f"{self.weight * 100:.2f}%",
- # "weightDistribution": weight_distribution,
- # "score": score_safe,
- # "level": grade_safe,
- # 'collisionRisk': self.collisionRisk,
- # "description1": safe_description1,
- # "description2": safe_description2,
- # "noObjectCar": False,
- #
- # "safeTime": time_dict,
- # "safeDistance": distance_dict,
- # "safeAcceleration": acceleration_dict,
- # "safeProbability": probability_dict,
- #
- # "speData": [ego_speed_vs_time, obj_speed_vs_time, rel_speed_vs_time],
- # "accData": [lat_acc_vs_time, lon_acc_vs_time],
- #
- # }
- return report_dict
def get_eval_data(self):
    """Return a copy of ``self.eval_data`` trimmed to the columns to export.

    Three cases are distinguished:

    * ``self.eval_data`` is non-empty and ``self.empty_flag`` is falsy:
      the positional columns plus every safety-metric column are returned.
    * otherwise, if a ``dist`` column exists: only the positional columns
      are returned.
    * otherwise: the whole frame is copied unchanged.

    Returns:
        pandas.DataFrame: an independent copy; ``self.eval_data`` itself is
        not modified.

    Note:
        The column selection assumes the listed columns are present in
        ``self.eval_data``.
    """
    # Identification / relative-position columns shared by both selections.
    position_columns = ['simTime', 'simFrame', 'playerId', 'dist', 'lon_d', 'lat_d']
    # Additional per-frame safety metrics, kept in their original order.
    metric_columns = [
        'lat_v_rel', 'lon_v_rel',
        'vrel_projection_in_dist', 'arel_projection_in_dist',
        'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD',
        'DRAC', 'BTN', 'STN',
        'collisionSeverity', 'pr_death', 'collisionRisk',
    ]

    eval_frame = self.eval_data

    # ``empty_flag`` is only consulted when the frame actually holds rows.
    if eval_frame.empty or self.empty_flag:
        if 'dist' in eval_frame.columns:
            return eval_frame[position_columns].copy()
        return eval_frame.copy()

    return eval_frame[position_columns + metric_columns].copy()
|