#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Date:              2023/08/03
@Last Modified:     2023/08/03
@Summary:           Safety metrics
"""

import os
import math
import numpy as np
import pandas as pd
from collections import defaultdict
import scipy.integrate as spi

from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
    OVERSPEED_THRESHOLD
from common import score_grade, string_concatenate, replace_key_with_value, score_over_100, _cal_max_min_avg
from score_weight import cal_score_with_priority, cal_weight_from_80


class Safe(object):
    """
    Safety metrics (collision count / risk / severity and overspeed) for a driving run.

    Attributes:
        df: Copy of ``data_processed.object_df`` (per-frame, per-player object states).
        ego_df: ``data_processed.ego_df`` (per-frame ego states).
        obj_id_list: Player ids taken from ``data_processed.obj_id_list``.
        frame_list: Frames of whichever table (objects or ego) ends first.
        collisionCount, collisionRisk, collisionSeverity, overSpeed: Final metric values.
        collisionRisk_list, collisionSeverity_list, collision_dist_list, overSpeed_list:
            Raw samples the metric values are derived from.
        empty_flag: True while no object has entered the region of interest.

    Methods:
        _overspeed_cal: Compute the overspeed ratio of the ego vehicle.
        _safe_metric_cal: Compute collision count, risk and severity.
        _cal_v_ego_projection: Project one velocity onto the line between two points.
        _cal_v_projection: Project a relative velocity onto the line between two points.
        _cal_a_projection: Relative acceleration along the line between two points.
        _calculate_derivative: Derivative helper used by _cal_a_projection.
        _cal_relative_angular_v: Relative angular velocity.
        _death_pr: Fatality probability from the relative speed.
        _cal_collisionRisk_level: Combine fatality probability and severity.
        dist: Euclidean distance between two points.
        _cal_TTC: Time to collision.
        _normal_distribution: Density integrated to obtain the collision severity.
        _safe_statistic: Aggregate the raw samples into max/min/avg dictionaries.
        safe_score_new: Score every metric, every metric type and the dimension.
        report_statistic: Build the report dictionary for the safety dimension.
    """

    def __init__(self, data_processed, scoreModel, resultPath):
        """
        Read the configuration and immediately compute all safety metrics.

        Args:
            data_processed: Object exposing ``object_df``, ``ego_df``, ``obj_id_list`` and
                ``config`` (the latter with ``config['safe']`` and ``builtinMetricList``).
            scoreModel: Callable ``scoreModel(kind_list, optimal_list, multiple_list, arr)``
                returning an object with a ``cal_score()`` method.
            resultPath: Stored on the instance; not used by this class itself.
        """
        self.scoreModel = scoreModel
        self.resultPath = resultPath

        self.df = data_processed.object_df.copy()
        self.ego_df = data_processed.ego_df
        self.obj_id_list = data_processed.obj_id_list

        # config infos for calculating score
        self.config = data_processed.config
        safe_config = self.config.config['safe']
        self.safe_config = safe_config

        # common data (attribute name kept as-is, including its spelling, for callers)
        self.bulitin_metric_list = self.config.builtinMetricList

        # dimension data
        self.weight_custom = safe_config['weightCustom']
        self.metric_list = safe_config['metric']
        self.type_list = safe_config['type']
        self.type_name_dict = safe_config['typeName']
        self.name_dict = safe_config['name']
        self.unit_dict = safe_config['unit']

        # score data
        self.weight = safe_config['weightDimension']
        self.weight_type_dict = safe_config['typeWeight']
        self.weight_type_list = safe_config['typeWeightList']
        self.weight_dict = safe_config['weight']
        self.weight_list = safe_config['weightList']
        self.priority_dict = safe_config['priority']
        self.priority_list = safe_config['priorityList']
        self.kind1_dict = safe_config['kind']
        self.optimal1_dict = safe_config['optimal']
        self.multiple_dict = safe_config['multiple']
        self.kind_list = safe_config['kindList']
        self.optimal_list = safe_config['optimalList']
        self.multiple_list = safe_config['multipleList']
        self.metric_dict = safe_config['typeMetricDict']

        # Use the frames of whichever table ends first so that both tables cover them.
        # An empty table yields an empty frame list instead of an IndexError on [-1].
        obj_frames = self.df['simFrame'].values.tolist()
        ego_frames = self.ego_df['simFrame'].values.tolist()
        if not obj_frames or not ego_frames:
            self.frame_list = []
        elif obj_frames[-1] <= ego_frames[-1]:
            self.frame_list = obj_frames
        else:
            self.frame_list = ego_frames

        self.collision_dist_list = []
        self.collisionRisk_list = []
        self.collisionSeverity_list = []
        self.overSpeed_list = []

        self.collision_risk_dict = dict()
        self.collision_severity_dict = dict()
        self.over_speed_dict = dict()

        self.collisionCount = 0
        self.collisionRisk = 0
        self.collisionSeverity = 0
        self.overSpeed = 0
        self.empty_flag = True

        # Collision metrics need at least one player besides the ego vehicle.
        if len(self.obj_id_list) > 1:
            self._safe_metric_cal()
        # Overspeed only depends on the ego data, so it is always evaluated.
        self._overspeed_cal()

    def _overspeed_cal(self):
        """
        Compute the overspeed metric from the ego speed column ``v``.

        ``overSpeed_list`` receives every ego speed above ``OVERSPEED_THRESHOLD``;
        ``overSpeed`` is the relative excess of the largest one, in percent, rounded
        to two decimals (0 when the threshold is never exceeded).
        """
        self.overSpeed_list = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]['v'].values.tolist()
        if self.overSpeed_list:
            max_speed = max(self.overSpeed_list)
            overspeed = (max_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD * 100
            self.overSpeed = round(overspeed, 2)
        else:
            self.overSpeed = 0

    def _safe_metric_cal(self):
        """
        Compute collision count, collision risk and collision severity.

        For every frame and every non-ego player inside the region of interest
        (longitudinal offset in [-2, 5], lateral offset <= 1, measured in the ego
        heading frame) one sample is appended to ``collisionRisk_list``,
        ``collisionSeverity_list`` and ``collision_dist_list`` (0 when the distance is
        at most ``DANGEROUS_DIST``, otherwise 1).  ``collisionCount`` is the number of
        runs of consecutive 0 samples; ``collisionRisk`` / ``collisionSeverity`` are the
        maxima of their lists (0 when no sample was produced).

        Acceleration based indicators (MTTC, BTN, STN, ...) are currently not
        evaluated; the helper methods for them are kept on the class.
        """
        self.collisionRisk_list = []
        self.collision_dist_list = []
        self.collisionSeverity_list = []
        self.empty_flag = True

        ego_player_id = 1
        ttc_offset = 0.3  # "Tc" in the original code; subtracted from TTC before integrating
        ttc_upper_bound = 4000  # TTC above this value is treated as "no risk"
        # NOTE(review): config imports LONGITUDINAL_DISTANCE_FRONT/REAR and
        # LATERAL_DISTANCE_ABS, but the region has always been hard-coded here.
        lon_front, lon_rear, lat_abs = 5, -2, 1

        # Two-level lookup: frame -> player id -> row (as a plain dict).
        obj_dict = defaultdict(dict)
        for item in self.df.to_dict('records'):
            obj_dict[item['simFrame']][item['playerId']] = item

        # frame -> first ego row of that frame; built once instead of filtering the
        # DataFrame on every iteration.
        ego_by_frame = {}
        for row in self.ego_df.to_dict('records'):
            ego_by_frame.setdefault(row['simFrame'], row)

        # frame_list may repeat a frame once per object row; visit each frame once.
        for frame_num in dict.fromkeys(self.frame_list):
            ego_data = ego_by_frame.get(frame_num)
            frame_objs = obj_dict.get(frame_num)
            if ego_data is None or not frame_objs:
                continue

            x1 = ego_data['posX']
            y1 = ego_data['posY']
            h1 = ego_data['yaw']  # assumed to be in radians -- TODO confirm
            v_x1 = ego_data['lon_v']
            v_y1 = 0  # placeholder value: the lateral ego speed is not read from the data

            for playerId in self.obj_id_list:
                if playerId == ego_player_id:
                    continue
                obj_data = frame_objs.get(playerId)
                if obj_data is None:
                    continue

                x2 = obj_data['posX']
                y2 = obj_data['posY']
                dist = self.dist(x1, y1, x2, y2)
                # 0 marks "closer than the dangerous distance", 1 marks "safe".
                dist_flag = 0 if dist <= DANGEROUS_DIST else 1

                dx, dy = x2 - x1, y2 - y1
                # Signed bearing of the ego->object vector.  atan2 keeps the sign of dy,
                # which arccos(dx / |A|) loses (it only returns values in [0, pi]).
                beta = math.atan2(dy, dx)
                lon_d = dist * math.cos(beta - h1)
                lat_d = abs(dist * math.sin(beta - h1))
                if lon_d > lon_front or lon_d < lon_rear or lat_d > lat_abs:
                    continue
                self.empty_flag = False

                v_x2 = obj_data['lon_v']
                v_y2 = obj_data['lat_v']
                vx, vy = v_x1 - v_x2, v_y1 - v_y2
                vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)

                TTC = self._cal_TTC(dist, vrel_projection_in_dist)
                if not TTC or TTC < 0:
                    TTC = None

                if TTC is None or math.isnan(TTC) or TTC > ttc_upper_bound:
                    collisionSeverity = 0
                    collisionRisk = 0
                else:
                    result, _error = spi.quad(self._normal_distribution, 0, TTC - ttc_offset)
                    collisionSeverity = 1 - result
                    collisionRisk = self._cal_collisionRisk_level(vrel_projection_in_dist, collisionSeverity)

                self.collisionRisk_list.append(collisionRisk)
                self.collisionSeverity_list.append(collisionSeverity)
                self.collision_dist_list.append(dist_flag)

        # Count runs of consecutive 0 samples.  Starting with previous = 1 makes a
        # leading run of zeros count as a collision as well.
        collision_count = 0
        previous_flag = 1
        for current_flag in self.collision_dist_list:
            if current_flag == 0 and previous_flag != 0:
                collision_count += 1
            previous_flag = current_flag
        self.collisionCount = collision_count

        # default=0 covers the case where no object ever entered the region.
        self.collisionRisk = max(self.collisionRisk_list, default=0)
        self.collisionSeverity = max(self.collisionSeverity_list, default=0)
        print("collision count is", self.collisionCount)
        print("collisionRisk is", self.collisionRisk)
        print("collisionSeverity is", self.collisionSeverity)

    def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
        """
        Project the velocity (v_x1, v_y1) onto the direction (dx, dy).

        Returns 0 when (dx, dy) has zero length, because no direction is defined.
        """
        AB_mod = math.sqrt(dx ** 2 + dy ** 2)
        if AB_mod == 0:
            return 0
        # Unit vector along AB.
        U_ABx = dx / AB_mod
        U_ABy = dy / AB_mod
        return v_x1 * U_ABx + v_y1 * U_ABy

    def _cal_v_projection(self, dx, dy, vx, vy):
        """
        Project the relative velocity (vx, vy) onto the direction (dx, dy).

        Returns 0 when (dx, dy) has zero length, because no direction is defined.
        """
        AB_mod = math.sqrt(dx ** 2 + dy ** 2)
        if AB_mod == 0:
            return 0
        # Unit vector along AB.
        U_ABx = dx / AB_mod
        U_ABy = dy / AB_mod
        return vx * U_ABx + vy * U_ABy

    def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
        """
        Relative acceleration along the direction (dx, dy).

        The projected acceleration is corrected by ``w * V * sin(theta)`` where theta is
        the angle between the relative velocity and (dx, dy), V the norm of
        ``(v_x1 - v_x2, v_y1 - v_y2)`` and w the relative angular velocity.

        Returns 0 when the relative velocity (vx, vy) or (dx, dy) has zero length.
        """
        V_mod = math.sqrt(vx ** 2 + vy ** 2)
        AB_mod = math.sqrt(dx ** 2 + dy ** 2)
        if V_mod == 0 or AB_mod == 0:
            return 0

        cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
        # Rounding can push the ratio marginally outside [-1, 1], which math.acos rejects.
        cos_theta = max(-1.0, min(1.0, cos_theta))
        theta = math.acos(cos_theta)

        # Unit vector along AB and the acceleration projected onto it.
        U_ABx = dx / AB_mod
        U_ABy = dy / AB_mod
        a_on_AB = ax * U_ABx + ay * U_ABy

        VA = np.array([v_x1, v_y1])
        VB = np.array([v_x2, v_y2])
        D_A = np.array([x1, y1])
        D_B = np.array([x2, y2])
        V = np.linalg.norm(VA - VB)
        w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
        return self._calculate_derivative(a_on_AB, w, V, theta)

    def _calculate_derivative(self, a, w, V, theta):
        """Return ``a - w * V * sin(theta)``."""
        derivative = a - w * V * math.sin(theta)
        return derivative

    def _cal_relative_angular_v(self, theta, A, B, VA, VB):
        """
        Return ``|VA - VB| * sin(theta) / |A - B|``.

        Returns 0.0 when A and B coincide, because the ratio is undefined there.
        """
        dx = A[0] - B[0]
        dy = A[1] - B[1]
        dvx = VA[0] - VB[0]
        dvy = VA[1] - VB[1]
        distance = math.sqrt(dx ** 2 + dy ** 2)
        if distance == 0:
            return 0.0
        angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / distance
        return angular_velocity

    def _death_pr(self, v_relative):
        """Return the logistic fatality probability ``1 / (1 + exp(8.192 - 0.12 * v_relative))``."""
        p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
        return p_death

    def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
        """Return ``0.4 * fatality probability + 0.6 * collisionSeverity``."""
        p_death = self._death_pr(v_relative)
        collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
        return collisionRisk

    def dist(self, x1, y1, x2, y2):
        """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
        dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
        return dist

    def _cal_count_(self, dist):
        """Placeholder kept for interface compatibility; performs no work and returns None."""
        count = 0

    def _cal_TTC(self, dist, vrel_projection_in_dist):
        """
        Time to collision: ``dist / vrel_projection_in_dist``.

        Returns ``math.inf`` when the projected relative speed is exactly 0.
        """
        if vrel_projection_in_dist == 0:
            return math.inf
        TTC = dist / vrel_projection_in_dist
        return TTC

    def _normal_distribution(self, x):
        """
        Density evaluated at x, integrated over TTC to obtain the collision severity.

        NOTE(review): the value named ``std_dev`` is used where a variance belongs in
        both the normalisation and the exponent, i.e. this is a normal density with
        mean 1.32 and *variance* 0.26.  Left untouched because changing it would alter
        every severity score -- confirm the intended parameterisation.
        """
        mean = 1.32
        std_dev = 0.26
        return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)

    def _safe_statistic(self):
        """
        Aggregate the raw samples and return the metric row used for scoring.

        Returns:
            ``[[collisionCount, collisionRisk, collisionSeverity, overSpeed]]``.
        """
        self.collision_risk_dict = _cal_max_min_avg(self.collisionRisk_list)
        self.collision_severity_dict = _cal_max_min_avg(self.collisionSeverity_list)
        self.over_speed_dict = _cal_max_min_avg(self.overSpeed_list)

        arr_safe = [[self.collisionCount, self.collisionRisk, self.collisionSeverity, self.overSpeed]]
        return arr_safe

    def safe_score_new(self):
        """
        Score the safety dimension.

        Returns:
            Tuple ``(score_safe, score_type_dict, score_metric_dict)``: the dimension
            score rounded to two decimals, the score per metric type and the score
            per metric.
        """
        score_type_dict = {}

        arr_safe = self._safe_statistic()
        print("\n[安全性表现及得分情况]")
        print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])

        arr_safe = np.array(arr_safe)
        score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
        score_sub = score_model.cal_score()

        metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]

        # Without any object in the region of interest every metric gets the default 80.
        if not (len(self.obj_id_list) > 1 and not self.empty_flag):
            score_sub = [80] * len(score_sub)

        # A NaN score is treated as a full score.
        score_sub = [100 if np.isnan(x) else x for x in score_sub]
        score_metric_dict = dict(zip(metric_list, score_sub))
        score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
        score_metric = list(score_metric_dict.values())

        if self.weight_custom:
            # user-defined weights
            score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key]
                                             for key in self.weight_dict}

            for metric_type in self.type_list:
                type_score = sum(value for key, value in score_metric_with_weight_dict.items()
                                 if key in self.metric_dict[metric_type])
                score_type_dict[metric_type] = round(type_score, 2) if type_score < 100 else 100

            score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key]
                                           for key in score_type_dict}
            score_safe = sum(score_type_with_weight_dict.values())
        else:
            # weights derived from the scores themselves
            self.weight_list = cal_weight_from_80(score_metric)
            self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
            score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)

            for metric_type in self.type_list:
                type_metrics = self.metric_dict[metric_type]
                type_weight = sum(value for key, value in self.weight_dict.items() if key in type_metrics)
                # Normalise the weights inside one type so that they sum to 1.
                for key, value in self.weight_dict.items():
                    if key in type_metrics:
                        self.weight_dict[key] = value / type_weight

                type_score_metric = [value for key, value in score_metric_dict.items() if key in type_metrics]
                type_weight_list = [value for key, value in self.weight_dict.items() if key in type_metrics]
                type_priority_list = [value for key, value in self.priority_dict.items() if key in type_metrics]

                type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
                score_type_dict[metric_type] = round(type_score, 2) if type_score < 100 else 100

            for key in self.weight_dict:
                self.weight_dict[key] = round(self.weight_dict[key], 4)

            score_type = list(score_type_dict.values())
            self.weight_type_list = cal_weight_from_80(score_type)
            self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}

        score_safe = round(score_safe, 2)

        print("安全性各指标基准值:", self.optimal_list)
        print(f"安全性得分为:{score_safe:.2f}分。")
        print(f"安全性各类型得分为:{score_type_dict}。")
        print(f"安全性各指标得分为:{score_metric_dict}。")
        return score_safe, score_type_dict, score_metric_dict

    def report_statistic(self):
        """
        Build the report dictionary of the safety dimension.

        Returns:
            Dict with the keys ``name``, ``weight``, ``score``, ``level``,
            ``description`` and ``indexes`` (one entry per metric holding its weight,
            score and description).
        """
        report_dict = {
            "name": "安全性",
            "weight": f"{self.weight * 100:.2f}%",
        }

        # No players at all.  The previous check compared the list itself with 0 and
        # could never be true.
        # NOTE(review): runs containing only the ego vehicle (length 1) still go
        # through safe_score_new(), which assigns the default score of 80 itself.
        if len(self.obj_id_list) == 0:
            # Fill over_speed_dict; it is only populated by _safe_statistic().
            self._safe_statistic()

            score_safe = 80
            grade_safe = "良好"
            score_type_dict = {
                "safeCollision": 80,
                "safeSpeed": 80
            }
            score_metric_dict = {
                "collisionCount": 80,
                "collisionRisk": 80,
                "collisionSeverity": 80,
                "overSpeed": 80
            }

            description = "安全性在无目标车情况下表现良好。"

            description1 = "次数:0次"
            description2 = "最大值:0%;" \
                           "最小值:0%;" \
                           "平均值:0%"
            description3 = "最大值:0%;" \
                           "最小值:0%;" \
                           "平均值:0%"
            description4 = f"最大值:{100 * self.over_speed_dict['max']}%;" \
                           f"最小值:{100 * self.over_speed_dict['min']}%;" \
                           f"平均值:{100 * self.over_speed_dict['avg']}%"
        else:
            score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
            # Show whole-number scores without a decimal part.
            score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
            grade_safe = score_grade(score_safe)

            description = f"· 在安全性方面,得分{score_safe}分,表现{grade_safe},"
            is_good = True

            if self.collisionCount > 0:
                is_good = False
                description += f"出现{self.collisionCount}次碰撞,需重点优化。"

            if self.over_speed_dict['max'] > 0:
                is_good = False
                description += f"超速比例最大值{100 * self.over_speed_dict['max']}%,需重点优化。"

            if is_good:
                description += "速度控制良好。"

            description1 = f"次数:{self.collisionCount}次"
            description2 = f"最大值:{self.collision_risk_dict['max']}%;" \
                           f"最小值:{self.collision_risk_dict['min']}%;" \
                           f"平均值:{self.collision_risk_dict['avg']}%"
            description3 = f"最大值:{self.collision_severity_dict['max']}%;" \
                           f"最小值:{self.collision_severity_dict['min']}%;" \
                           f"平均值:{self.collision_severity_dict['avg']}%"
            description4 = f"最大值:{self.over_speed_dict['max']}%;" \
                           f"最小值:{self.over_speed_dict['min']}%;" \
                           f"平均值:{self.over_speed_dict['avg']}%"

        report_dict["score"] = score_safe
        report_dict["level"] = grade_safe
        report_dict["description"] = replace_key_with_value(description, self.name_dict)

        collisionCount_index = {
            "weight": self.weight_dict['collisionCount'],
            "score": score_metric_dict['collisionCount'],
            "description": description1
        }
        collisionRisk_index = {
            "weight": self.weight_dict['collisionRisk'],
            "score": score_metric_dict['collisionRisk'],
            "description": description2
        }
        collisionSeverity_index = {
            "weight": self.weight_dict['collisionSeverity'],
            "score": score_metric_dict['collisionSeverity'],
            "description": description3
        }
        overSpeed_index = {
            "weight": self.weight_dict['overSpeed'],
            "score": score_metric_dict['overSpeed'],
            "description": description4
        }

        indexes_dict = {
            "collisionCount": collisionCount_index,
            "collisionRisk": collisionRisk_index,
            "collisionSeverity": collisionSeverity_index,
            "overSpeed": overSpeed_index
        }
        report_dict["indexes"] = indexes_dict

        print(report_dict)
        return report_dict