#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2023/07/25 @Last Modified: 2023/07/26 @Summary: safe metrics """ import math import numpy as np import pandas as pd from modules.lib.score import Score from modules.config import config from collections import defaultdict import scipy.integrate as spi class Safe(object): def __init__(self, data_processed): # self.logger = log.get_logger() # 使用时再初始化 self.data_processed = data_processed self.df = data_processed.object_df.copy() self.ego_df = data_processed.ego_data.copy() self.obj_id_list = data_processed.obj_id_list self.metric_list = [ "TTC", "MTTC", "THW", "LatSD", "BTN", "collisionRisk", "collisionSeverity", ] # score data self.calculated_value = { "TTC": 10.0, "MTTC": 10.0, "THW": 10.0, # 'LonSD': 0.0, "LatSD": 3.0, # 'DRAC': 0.0, "BTN": 1.0, # 'STN': 1.0, "collisionRisk": 0.0, "collisionSeverity": 0.0, } # lists of drving control info self.time_list = self.ego_df["simTime"].values.tolist() self.frame_list = self.ego_df["simFrame"].values.tolist() self.collisionRisk = 0 self.empty_flag = True # no car following scene if len(self.obj_id_list) > 1: self.unsafe_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_time_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_dist_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_acce_drac_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_acce_xtn_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_prob_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.most_dangerous = {} self.pass_percent = {} self._safe_param_cal_new() def _safe_param_cal_new(self): Tc = 0.3 # 安全距离 rho = Tc # 驾驶员制动反应时间(单位:秒) ego_accel_max = self.data_processed.vehicle_config[ "EGO_ACCEL_MAX" ] # 自车油门最大加速度 obj_decel_max = self.data_processed.vehicle_config[ "OBJ_DECEL_MAX" ] # 前车刹车最大减速度 ego_decel_min = self.data_processed.vehicle_config[ "EGO_DECEL_MIN" ] # 自车刹车最小减速度 ug ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"] ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"] # 构建双层字典数据结构 obj_dict = defaultdict(dict) # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对 obj_data_dict = self.df.to_dict("records") for item in obj_data_dict: obj_dict[item["simFrame"]][item["playerId"]] = item df_list = [] EGO_PLAYER_ID = 1 # self.empty_flag = True for frame_num in self.frame_list: ego_data = obj_dict[frame_num][EGO_PLAYER_ID] v1 = ego_data["v"] x1 = ego_data["posX"] y1 = ego_data["posY"] h1 = ego_data["posH"] # 将角度转换为弧度 # len1 = ego_data['dimX'] # width1 = ego_data['dimY'] # o_x1 = ego_data['offX'] v_x1 = ego_data["speedX"] v_y1 = ego_data["speedY"] a_x1 = ego_data["accelX"] a_y1 = ego_data["accelY"] # a1 = ego_data['accel'] for playerId in self.obj_id_list: if playerId == EGO_PLAYER_ID: continue try: obj_data = obj_dict[frame_num][playerId] except KeyError: continue x2 = obj_data["posX"] y2 = obj_data["posY"] dist = self.dist(x1, y1, x2, y2) obj_data["dist"] = dist v_x2 = obj_data["speedX"] # m/s v_y2 = obj_data["speedY"] # m/s v2 = obj_data["v"] # m/s # h2 = obj_data['posH'] # len2 = obj_data['dimX'] # width2 = obj_data['dimY'] # o_x2 = obj_data['offX'] a_x2 = obj_data["accelX"] a_y2 = obj_data["accelY"] # a2 = obj_data['accel'] dx, dy = x2 - x1, y2 - y1 # 计算目标车相对于自车的方位角 beta = math.atan2(dy, dx) # 添加beta的计算 # 将全局坐标系下的相对位置向量转换到自车坐标系 # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向 h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角 # 坐标变换 lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负) lat_d = -dx * math.sin(h1_rad) + dy * math.cos(h1_rad) # 横向距离(左为正,右为负) # 添加调试信息 # if frame_num == self.frame_list[-1]: # 判断是否是最后一帧 # print(f"最后一帧数据 frame_num={frame_num}:") # print(f"目标车ID: {playerId}") # print(f"自车位置: ({x1:.2f}, {y1:.2f}), 朝向角: {h1:.2f}度") # print(f"目标车位置: ({x2:.2f}, {y2:.2f})") # print(f"相对距离: dx={dx:.2f}, dy={dy:.2f}") # print(f"纵向距离: lon_d={lon_d:.2f}") # print(f"横向距离: lat_d={lat_d:.2f}") # print("------------------------") obj_dict[frame_num][playerId]["lon_d"] = lon_d obj_dict[frame_num][playerId]["lat_d"] = lat_d # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离 # print(f"lon_d: {lon_d}, lat_d: {lat_d}") if lon_d > 100 or lon_d < -5 or abs(lat_d) > 4: continue self.empty_flag = False vx, vy = v_x1 - v_x2, v_y1 - v_y2 ax, ay = a_x2 - a_x1, a_y2 - a_y1 v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1) v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2) vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy) arel_projection_in_dist = self._cal_a_projection( dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2 ) obj_dict[frame_num][playerId][ "vrel_projection_in_dist" ] = vrel_projection_in_dist obj_dict[frame_num][playerId][ "arel_projection_in_dist" ] = arel_projection_in_dist obj_dict[frame_num][playerId]["v_ego_projection_in_dist"] = v_ego_p obj_dict[frame_num][playerId]["v_obj_projection_in_dist"] = v_obj_p obj_type = obj_data["type"] TTC = self._cal_TTC(dist, vrel_projection_in_dist) MTTC = self._cal_MTTC( TTC, vrel_projection_in_dist, arel_projection_in_dist ) THW = self._cal_THW(dist, v_ego_p) # 单车道时可用 # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, # obj_decel_max) lat_dist = 0.5 v_right = v1 v_left = v2 a_right_lat_brake_min = 1 a_left_lat_brake_min = 1 a_lat_max = 5 LatSD = self._cal_lateral_safe_dist( lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max, ) # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2) # 将加速度和速度投影到车辆坐标系 h1_rad_for_projection = math.radians(90 - h1) # 与L175保持一致 # 计算纵向加速度 lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection) lon_a2 = a_x2 * math.cos(h1_rad_for_projection) + a_y2 * math.sin(h1_rad_for_projection) lon_a = abs(lon_a1 - lon_a2) # 使用已计算的纵向距离或重新计算 # 选项1: 使用已计算的lon_d (推荐,保持一致性) # lon_d = obj_dict[frame_num][playerId]["lon_d"] # 选项2: 重新计算,确保与L178一致 # lon_d = dx * math.cos(h1_rad_for_projection) + dy * math.sin(h1_rad_for_projection) # 选项3: 使用简化公式,但确保角度单位一致 lon_d = max(dist * abs(math.cos(beta - h1_rad_for_projection)), 0.1) # 设置最小值避免除零 # 计算纵向速度 lon_v = v_x1 * math.cos(h1_rad_for_projection) + v_y1 * math.sin(h1_rad_for_projection) # 计算BTN BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max) # 这部分代码可以简化,直接使用已计算的lon_a1和lat_a1 # 计算纵向加速度 # lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection) lon_a1 = ego_data["lon_acc"] # 直接使用已计算的纵向加速度 # 计算横向加速度 # lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1) lat_a1 = ego_data["lat_acc"] # 直接使用已计算的横向加速度 lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1) # 修改:考虑相对加速度的方向,不再取绝对值 lat_a = lat_a2 - lat_a1 # 目标车减去自车的加速度,保留方向信息 lat_d = dist * abs(math.sin(beta - h1_rad)) lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1) # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2) obj_dict[frame_num][playerId]["lat_v_rel"] = v_x1 - v_x2 obj_dict[frame_num][playerId]["lon_v_rel"] = v_y1 - v_y2 TTC = None if (not TTC or TTC < 0) else TTC MTTC = None if (not MTTC or MTTC < 0) else MTTC THW = None if (not THW or THW < 0) else THW # print(f'TTC: {TTC if TTC is None else f"{TTC:.2f}"}, ' # f'MTTC: {MTTC if MTTC is None else f"{MTTC:.2f}"}, ' # f'THW: {THW if THW is None else f"{THW:.2f}"}') # 原DRAC计算已被注释,需移除相关代码引用 # DRAC = 10 if DRAC >= 10 else DRAC # 删除这行 # 修改后的指标赋值 obj_dict[frame_num][playerId]["TTC"] = TTC obj_dict[frame_num][playerId]["MTTC"] = MTTC obj_dict[frame_num][playerId]["THW"] = THW # obj_dict[frame_num][playerId]['LonSD'] = LonSD obj_dict[frame_num][playerId]["LatSD"] = LatSD # obj_dict[frame_num][playerId]['DRAC'] = DRAC # 删除这行 obj_dict[frame_num][playerId]["BTN"] = abs(BTN) # obj_dict[frame_num][playerId]['STN'] = abs(STN) # 删除这行 # 确保以下变量已定义 collisionSeverity = ( collisionSeverity if "collisionSeverity" in locals() else 0 ) pr_death = pr_death if "pr_death" in locals() else 0 collisionRisk = collisionRisk if "collisionRisk" in locals() else 0 obj_dict[frame_num][playerId]["collisionSeverity"] = ( collisionSeverity * 100 ) obj_dict[frame_num][playerId]["pr_death"] = pr_death * 100 obj_dict[frame_num][playerId]["collisionRisk"] = collisionRisk * 100 # print( # f"{playerId} {frame_num} {TTC} {MTTC} {THW} {LatSD} {BTN} {collisionSeverity} {pr_death} {collisionRisk}" # ) df_fnum = pd.DataFrame(obj_dict[frame_num].values()) df_list.append(df_fnum) df_safe = pd.concat(df_list) col_list = [ "simTime", "simFrame", "playerId", "TTC", "MTTC", "THW", "LatSD", "BTN", # 移除STN "collisionSeverity", "pr_death", "collisionRisk", ] if not self.empty_flag: self.df = df_safe[col_list].reset_index(drop=True) # self.df_safe是有问题的,因为后边统计的函数里是没有self.df_safe这个变量的,只有self.df, # 需要增加如果是空的情况下,self.df_safe的值为默认值。 def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx**2 + dy**2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 在 AB 连线上的速度 V1_on_AB V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy return V1_on_AB def _cal_v_projection(self, dx, dy, vx, vy): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx**2 + dy**2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的速度 V_relative # vx = vx1 - vx2 # vy = vy1 - vy2 # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB V_on_AB = vx * U_ABx + vy * U_ABy # print(f'_cal_v_projection:{V_on_AB}') return V_on_AB def _cal_a_projection( self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2 ): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 θ V_mod = math.sqrt(vx**2 + vy**2) AB_mod = math.sqrt(dx**2 + dy**2) if V_mod == 0 or AB_mod == 0: return 0 cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod) theta = math.acos(cos_theta) # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx**2 + dy**2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的加速度 a_relative # ax = ax1 - ax2 # ay = ay1 - ay2 # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB a_on_AB = ax * U_ABx + ay * U_ABy VA = np.array([v_x1, v_y1]) VB = np.array([v_x2, v_y2]) D_A = np.array([x1, y1]) D_B = np.array([x2, y2]) V_r = VA - VB V = np.linalg.norm(V_r) w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB) a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta) return a_on_AB_back # 计算相对加速度 def _calculate_derivative(self, a, w, V, theta): # 计算(V×cos(θ))'的值 # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta derivative = a - w * V * math.sin(theta) return derivative def _cal_relative_angular_v(self, theta, A, B, VA, VB): dx = A[0] - B[0] dy = A[1] - B[1] dvx = VA[0] - VB[0] dvy = VA[1] - VB[1] # (dx * dvy - dy * dvx) angular_velocity = ( math.sqrt(dvx**2 + dvy**2) * math.sin(theta) / math.sqrt(dx**2 + dy**2) ) return angular_velocity def _death_pr(self, obj_type, v_relative): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) return p_death def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity return collisionRisk # 求两车之间当前距离 def dist(self, x1, y1, x2, y2): dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) return dist # TTC (time to collision) def _cal_TTC(self, dist, vrel_projection_in_dist): # 如果相对速度接近0,视为相对静止或无法计算TTC if abs(vrel_projection_in_dist) < 0.02: # 使用之前讨论过的阈值判断 return self.calculated_value["TTC"] # 返回默认值10.0 TTC = dist / vrel_projection_in_dist # 如果计算结果为负,表示正在远离,也视为安全 if TTC < 0: return self.calculated_value["TTC"] # 返回默认值10.0 return TTC def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist): if arel_projection_in_dist != 0: tmp = vrel_projection_in_dist**2 + 2 * arel_projection_in_dist * dist if tmp < 0: return self.calculated_value["MTTC"] # 返回默认值10.0 t1 = ( -1 * vrel_projection_in_dist - math.sqrt(tmp) ) / arel_projection_in_dist t2 = ( -1 * vrel_projection_in_dist + math.sqrt(tmp) ) / arel_projection_in_dist if t1 > 0 and t2 > 0: if t1 >= t2: MTTC = t2 elif t1 < t2: MTTC = t1 elif t1 > 0 and t2 <= 0: MTTC = t1 elif t1 <= 0 and t2 > 0: MTTC = t2 else: return self.calculated_value["MTTC"] # 返回默认值10.0 elif arel_projection_in_dist == 0 and vrel_projection_in_dist > 0: MTTC = dist / vrel_projection_in_dist else: return self.calculated_value["MTTC"] # 返回默认值10.0 return MTTC # THW (time headway) def _cal_THW(self, dist, v_ego_projection_in_dist): if not v_ego_projection_in_dist or abs(v_ego_projection_in_dist) < 0.02: return self.calculated_value["THW"] # 返回默认值10.0 THW = dist / v_ego_projection_in_dist if THW < 0: return self.calculated_value["THW"] # 返回默认值10.0 return THW def velocity(self, v_x, v_y): v = math.sqrt(v_x**2 + v_y**2) * 3.6 return v def _cal_longitudinal_safe_dist( self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max ): lon_dist_min = ( v_ego_p * rho + ego_accel_max * (rho**2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (2 * ego_decel_min) - v_obj_p**2 / (2 * ego_decel_max) ) return lon_dist_min def _cal_lateral_safe_dist( self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max, ): # 检查除数是否为零 if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0: return self.calculated_value["LatSD"] # 返回默认值3.0 v_right_rho = v_right + rho * a_lat_max v_left_rho = v_left + rho * a_lat_max dist_min = lat_dist + ( (v_right + v_right_rho) * rho / 2 + v_right_rho**2 / a_right_lat_brake_min / 2 + ((v_left + v_right_rho) * rho / 2) + v_left_rho**2 / a_left_lat_brake_min / 2 ) return dist_min # DRAC (decelerate required avoid collision) def _cal_DRAC( self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2 ): dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671 if dist_length < 0: dist_width = dist - (width2 / 2 + width1 / 2) if dist_width < 0: return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值 else: d = dist_width else: d = dist_length # 避免除零 if d <= 0: return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值 DRAC = vrel_projection_in_dist**2 / (2 * d) return DRAC # BTN (brake threat number) def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max): # 安全检查,避免除零错误 if lon_d <= 0 or ego_decel_lon_max == 0: return self.calculated_value["BTN"] # 返回默认值1.0 # 注意:lon_a现在是带方向的相对加速度(lon_a2 - lon_a1) # 根据BTN公式:BTN = (lon_a1 + lon_a - lon_v**2 / (2 * lon_d)) / ego_decel_lon_max BTN = ( lon_a1 + lon_a - lon_v**2 / (2 * lon_d) ) / ego_decel_lon_max return BTN # STN (steer threat number) def _cal_STN_new( self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2 ): STN = ( lat_a1 + lat_a + 2 / ttc**2 * ( lat_d + abs(ego_decel_lat_max * lat_v) * (width1 + width2) / 2 + abs(lat_v * ttc) ) ) / ego_decel_lat_max return STN # BTN (brake threat number) def cal_BTN(self, a_y1, ay, dy, vy, max_ay): BTN = ( a_y1 + ay - vy**2 / (2 * dy) ) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2): STN = ( a_x1 + ax + 2 / ttc**2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc) ) / max_ax return STN # 追尾碰撞风险 def _normal_distribution(self, x): mean = 1.32 std_dev = 0.26 return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp( -0.5 * (x - mean) ** 2 / std_dev ) def continuous_group(self, df): time_list = df["simTime"].values.tolist() frame_list = df["simFrame"].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] unfunc_time_df = pd.DataFrame(time, columns=["start_time", "end_time"]) unfunc_frame_df = pd.DataFrame(frame, columns=["start_frame", "end_frame"]) unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1) return unfunc_df # 统计最危险的指标 def _safe_statistic_most_dangerous(self): min_list = ["TTC", "MTTC", "THW", "LatSD"] max_list = ["BTN", "collisionRisk", "collisionSeverity"] # 移除STN for metric in min_list: if metric in self.metric_list: if metric in self.df.columns: # 筛选出非NaN的值 valid_values = self.df[metric].dropna() if not valid_values.empty: # 如果有非NaN值,计算最小值 self.calculated_value[metric] = valid_values.min() print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}') else: # 如果全是NaN或列为空,赋默认值 self.calculated_value[metric] = self.calculated_value[metric] else: # 列不存在,赋默认值 self.calculated_value[metric] = self.calculated_value[metric] for metric in max_list: if metric in self.metric_list: if metric in self.df.columns: # 筛选出非NaN的值 valid_values = self.df[metric].dropna() if not valid_values.empty: # 如果有非NaN值,计算最大值 self.calculated_value[metric] = valid_values.max() print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}') else: # 如果全是NaN或列为空,赋默认值 self.calculated_value[metric] = self.calculated_value[metric] else: # 列不存在,赋默认值 self.calculated_value[metric] = self.calculated_value[metric] return self.calculated_value def _safe_no_obj_statistic(self): # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"] return self.calculated_value def report_statistic( self, ): safety_result = {} if len(self.obj_id_list) == 1: safety_result = self._safe_no_obj_statistic() else: safety_result = self._safe_statistic_most_dangerous() evaluator = Score(self.data_processed.safety_config) result = evaluator.evaluate(safety_result) # self.logger.info(f"安全性表现及得分情况:[{result}]") print("\n[安全性表现及得分情况]") return result