#!/usr/bin/env python # -*- coding: utf-8 -*- """ 安全指标计算模块 """ import numpy as np import pandas as pd import math from collections import defaultdict from typing import Dict, Any, List, Optional from modules.lib.score import Score from modules.lib.log_manager import LogManager # 安全指标相关常量 SAFETY_INFO = [ "simTime", "simFrame", "playerId", "posX", "posY", "posH", "speedX", "speedY", "accelX", "accelY", "v", "type" ] # ---------------------- # 独立指标计算函数 # ---------------------- def calculate_ttc(data_processed) -> dict: """计算TTC (Time To Collision)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"TTC": None} try: safety = SafetyCalculator(data_processed) ttc_value = safety.get_ttc_value() LogManager().get_logger().info(f"安全指标[TTC]计算结果: {ttc_value}") return {"TTC": ttc_value} except Exception as e: LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True) return {"TTC": None} def calculate_mttc(data_processed) -> dict: """计算MTTC (Modified Time To Collision)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"MTTC": None} try: safety = SafetyCalculator(data_processed) mttc_value = safety.get_mttc_value() LogManager().get_logger().info(f"安全指标[MTTC]计算结果: {mttc_value}") return {"MTTC": mttc_value} except Exception as e: LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True) return {"MTTC": None} def calculate_thw(data_processed) -> dict: """计算THW (Time Headway)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"THW": None} try: safety = SafetyCalculator(data_processed) thw_value = safety.get_thw_value() LogManager().get_logger().info(f"安全指标[THW]计算结果: {thw_value}") return {"THW": thw_value} except Exception as e: LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True) return {"THW": None} def calculate_tlc(data_processed) -> dict: """计算TLC (Time to Line Crossing)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"TLC": None} try: safety = SafetyCalculator(data_processed) tlc_value = safety.get_tlc_value() LogManager().get_logger().info(f"安全指标[TLC]计算结果: {tlc_value}") return {"TLC": tlc_value} except Exception as e: LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True) return {"TLC": None} def calculate_ttb(data_processed) -> dict: """计算TTB (Time to Brake)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"TTB": None} try: safety = SafetyCalculator(data_processed) ttb_value = safety.get_ttb_value() LogManager().get_logger().info(f"安全指标[TTB]计算结果: {ttb_value}") return {"TTB": ttb_value} except Exception as e: LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True) return {"TTB": None} def calculate_tm(data_processed) -> dict: """计算TM (Time Margin)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"TM": None} try: safety = SafetyCalculator(data_processed) tm_value = safety.get_tm_value() LogManager().get_logger().info(f"安全指标[TM]计算结果: {tm_value}") return {"TM": tm_value} except Exception as e: LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True) return {"TM": None} # def calculate_MPrTTC(data_processed) -> dict: # """计算MPrTTC (Model Predictive Time-to-Collision)""" # if data_processed is None or not hasattr(data_processed, 'object_df'): # return {"MPrTTC": None} # try: # safety = SafetyCalculator(data_processed) # mprttc_value = safety.get_mprttc_value() # LogManager().get_logger().info(f"安全指标[MPrTTC]计算结果: {mprttc_value}") # return {"MPrTTC": mprttc_value} # except Exception as e: # LogManager().get_logger().error(f"MPrTTC计算异常: {str(e)}", exc_info=True) # return {"MPrTTC": None} # def calculate_pet(data_processed) -> dict: # # PET (Post Encroachment Time) # if data_processed is None or not hasattr(data_processed, 'object_df'): # return {"PET": None} # try: # safety = SafetyCalculator(data_processed) # pet_value = safety.get_pet_value() # LogManager().get_logger().info(f"安全指标[PET]计算结果: {pet_value}") # return {"PET": pet_value} # except Exception as e: # LogManager().get_logger().error(f"PET计算异常: {str(e)}", exc_info=True) # return {"PET": None} def calculate_dtc(data_processed) -> dict: """计算DTC (Distance to Collision)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"DTC": None} try: safety = SafetyCalculator(data_processed) dtc_value = safety.get_dtc_value() LogManager().get_logger().info(f"安全指标[DTC]计算结果: {dtc_value}") return {"DTC": dtc_value} except Exception as e: LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True) return {"DTC": None} def calculate_dtc(data_processed) -> dict: """计算DTC (Distance to Collision)""" if data_processed is None or not hasattr(data_processed, 'object_df'): return {"DTC": None} try: safety = SafetyCalculator(data_processed) dtc_value = safety.get_dtc_value() LogManager().get_logger().info(f"安全指标[DTC]计算结果: {dtc_value}") return {"DTC": dtc_value} except Exception as e: LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True) return {"DTC": None} def calculate_collisionrisk(data_processed) -> dict: """计算碰撞风险""" safety = SafetyCalculator(data_processed) collision_risk_value = safety.get_collision_risk_value() LogManager().get_logger().info(f"安全指标[collisionRisk]计算结果: {collision_risk_value}") return {"collisionRisk": collision_risk_value} def calculate_lonsd(data_processed) -> dict: """计算纵向安全距离""" safety = SafetyCalculator(data_processed) lonsd_value = safety.get_lonsd_value() LogManager().get_logger().info(f"安全指标[LonSD]计算结果: {lonsd_value}") return {"LonSD": lonsd_value} def calculate_latsd(data_processed) -> dict: """计算横向安全距离""" safety = SafetyCalculator(data_processed) latsd_value = safety.get_latsd_value() LogManager().get_logger().info(f"安全指标[LatSD]计算结果: {latsd_value}") return {"LatSD": latsd_value} def calculate_btn(data_processed) -> dict: """计算制动威胁数""" safety = SafetyCalculator(data_processed) btn_value = safety.get_btn_value() LogManager().get_logger().info(f"安全指标[BTN]计算结果: {btn_value}") return {"BTN": btn_value} def calculate_collisionseverity(data_processed) -> dict: """计算碰撞严重性""" safety = SafetyCalculator(data_processed) collision_severity_value = safety.get_collision_severity_value() LogManager().get_logger().info(f"安全指标[collisionSeverity]计算结果: {collision_severity_value}") return {"collisionSeverity": collision_severity_value} class SafetyRegistry: """安全指标注册器""" def __init__(self, data_processed): self.logger = LogManager().get_logger() self.data = data_processed self.safety_config = data_processed.safety_config["safety"] self.metrics = self._extract_metrics(self.safety_config) self._registry = self._build_registry() def _extract_metrics(self, config_node: dict) -> list: """从配置中提取指标名称""" metrics = [] def _recurse(node): if isinstance(node, dict): if 'name' in node and not any(isinstance(v, dict) for v in node.values()): metrics.append(node['name']) for v in node.values(): _recurse(v) _recurse(config_node) self.logger.info(f'评比的安全指标列表:{metrics}') return metrics def _build_registry(self) -> dict: """构建指标函数注册表""" registry = {} for metric_name in self.metrics: func_name = f"calculate_{metric_name.lower()}" if func_name in globals(): registry[metric_name] = globals()[func_name] else: self.logger.warning(f"未实现安全指标函数: {func_name}") return registry def batch_execute(self) -> dict: """批量执行指标计算""" results = {} for name, func in self._registry.items(): try: result = func(self.data) results.update(result) except Exception as e: self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True) results[name] = None self.logger.info(f'安全指标计算结果:{results}') return results class SafeManager: """安全指标管理类""" def __init__(self, data_processed): self.data = data_processed self.registry = SafetyRegistry(self.data) def report_statistic(self): """计算并报告安全指标结果""" safety_result = self.registry.batch_execute() return safety_result class SafetyCalculator: """安全指标计算类 - 兼容Safe类风格""" def __init__(self, data_processed): self.logger = LogManager().get_logger() self.data_processed = data_processed self.df = data_processed.object_df.copy() self.ego_df = data_processed.ego_data.copy() # 使用copy()避免修改原始数据 self.obj_id_list = data_processed.obj_id_list self.metric_list = [ 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity' ] # 初始化默认值 self.calculated_value = { "TTC": 10.0, "MTTC": 10.0, "THW": 10.0, "TLC": 10.0, "TTB": 10.0, "TM": 10.0, # "MPrTTC": 10.0, # "PET": 10.0, "DTC": 10.0, "LatSD": 3.0, "BTN": 1.0, "collisionRisk": 0.0, "collisionSeverity": 0.0, } self.time_list = self.ego_df['simTime'].values.tolist() self.frame_list = self.ego_df['simFrame'].values.tolist() self.collisionRisk = 0 self.empty_flag = True self.logger.info("SafetyCalculator初始化完成,场景中包含自车的目标物一共为: %d", len(self.obj_id_list)) if len(self.obj_id_list) > 1: self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_acce_drac_df = pd.DataFrame( columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_acce_xtn_df = pd.DataFrame( columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.most_dangerous = {} self.pass_percent = {} self.logger.info("开始执行安全参数计算 _safe_param_cal_new") self._safe_param_cal_new() self.logger.info("安全参数计算完成") def _safe_param_cal_new(self): self.logger.debug("进入 _safe_param_cal_new 方法") # 直接复用Safe类的实现 Tc = 0.3 # 安全距离 rho = self.data_processed.vehicle_config["RHO"] ego_accel_max = self.data_processed.vehicle_config["EGO_ACCEL_MAX"] obj_decel_max = self.data_processed.vehicle_config["OBJ_DECEL_MAX"] ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"] ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"] ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"] driver_reaction_time = self.data_processed.vehicle_config["RHO"] ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2) x_relative_start_dist = self.ego_df["x_relative_start_dist"] obj_dict = defaultdict(dict) obj_data_dict = self.df.to_dict('records') for item in obj_data_dict: obj_dict[item['simFrame']][item['playerId']] = item df_list = [] EGO_PLAYER_ID = 1 for frame_num in self.frame_list: ego_data = obj_dict[frame_num][EGO_PLAYER_ID] v1 = ego_data['v'] x1 = ego_data['posX'] y1 = ego_data['posY'] h1 = ego_data['posH'] laneOffset = ego_data["laneOffset"] v_x1 = ego_data['speedX'] v_y1 = ego_data['speedY'] a_x1 = ego_data['accelX'] a_y1 = ego_data['accelY'] a1 = np.sqrt(a_x1 ** 2 + a_y1 ** 2) for playerId in self.obj_id_list: if playerId == EGO_PLAYER_ID: continue try: obj_data = obj_dict[frame_num][playerId] except KeyError: continue x2 = obj_data['posX'] y2 = obj_data['posY'] dist = self.dist(x1, y1, x2, y2) obj_data['dist'] = dist v_x2 = obj_data['speedX'] v_y2 = obj_data['speedY'] v2 = obj_data['v'] a_x2 = obj_data['accelX'] a_y2 = obj_data['accelY'] a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2) dx, dy = x2 - x1, y2 - y1 # 计算目标车相对于自车的方位角 beta = math.atan2(dy, dx) # 将全局坐标系下的相对位置向量转换到自车坐标系 # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向 h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角 # 坐标变换 lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负) lat_d = abs(-dx * math.sin(h1_rad) + dy * math.cos(h1_rad)) # 横向距离(取绝对值) obj_dict[frame_num][playerId]['lon_d'] = lon_d obj_dict[frame_num][playerId]['lat_d'] = lat_d if lon_d > 100 or lon_d < -5 or lat_d > 4: continue self.empty_flag = False vx, vy = v_x1 - v_x2, v_y1 - v_y2 ax, ay = a_x2 - a_x1, a_y2 - a_y1 relative_v = np.sqrt(vx ** 2 + vy ** 2) v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1) v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2) vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy) arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2) obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p obj_type = obj_data['type'] TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist) THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None TLC = self._cal_TLC(v1, h1, laneOffset) TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max) TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1) DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, driver_reaction_time) # MPrTTC = self._cal_MPrTTC(x_relative_start_dist) # PET = self._cal_PET(x_relative_start_dist, v2, a2, v1, a1) LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max) lat_dist = 0.5 v_right = v1 v_left = v2 a_right_lat_brake_min = 1 a_left_lat_brake_min = 1 a_lat_max = 5 LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max) # 使用自车坐标系下的纵向加速度 lon_a1 = a_x1 * math.cos(h1_rad) + a_y1 * math.sin(h1_rad) lon_a2 = a_x2 * math.cos(h1_rad) + a_y2 * math.sin(h1_rad) lon_a = abs(lon_a1 - lon_a2) lon_d = max(0.1, lon_d) # 确保纵向距离为正 lon_v = v_x1 * math.cos(h1_rad) + v_y1 * math.sin(h1_rad) BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max) # 使用自车坐标系下的横向加速度 lat_a1 = -a_x1 * math.sin(h1_rad) + a_y1 * math.cos(h1_rad) lat_a2 = -a_x2 * math.sin(h1_rad) + a_y2 * math.cos(h1_rad) lat_a = abs(lat_a1 - lat_a2) lat_v = -v_x1 * math.sin(h1_rad) + v_y1 * math.cos(h1_rad) obj_dict[frame_num][playerId]['lat_v_rel'] = lat_v - (-v_x2 * math.sin(h1_rad) + v_y2 * math.cos(h1_rad)) obj_dict[frame_num][playerId]['lon_v_rel'] = lon_v - (v_x2 * math.cos(h1_rad) + v_y2 * math.sin(h1_rad)) TTC = None if (TTC is None or TTC < 0) else TTC MTTC = None if (MTTC is None or MTTC < 0) else MTTC THW = None if (THW is None or THW < 0) else THW TLC = None if (TLC is None or TLC < 0) else TLC TTB = None if (TTB is None or TTB < 0) else TTB TM = None if (TM is None or TM < 0) else TM DTC = None if (DTC is None or DTC < 0) else DTC obj_dict[frame_num][playerId]['TTC'] = TTC obj_dict[frame_num][playerId]['MTTC'] = MTTC obj_dict[frame_num][playerId]['THW'] = THW obj_dict[frame_num][playerId]['TLC'] = TLC obj_dict[frame_num][playerId]['TTB'] = TTB obj_dict[frame_num][playerId]['TM'] = TM obj_dict[frame_num][playerId]['DTC'] = DTC obj_dict[frame_num][playerId]['LonSD'] = LonSD obj_dict[frame_num][playerId]['LatSD'] = LatSD obj_dict[frame_num][playerId]['BTN'] = abs(BTN) collisionSeverity = 0 pr_death = 0 collisionRisk = 0 obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100 obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100 obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100 df_fnum = pd.DataFrame(obj_dict[frame_num].values()) df_list.append(df_fnum) df_safe = pd.concat(df_list) col_list = ['simTime', 'simFrame', 'playerId', 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'LonSD', 'LatSD', 'BTN', 'collisionSeverity', 'pr_death', 'collisionRisk'] self.df_safe = df_safe[col_list].reset_index(drop=True) def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx ** 2 + dy ** 2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 在 AB 连线上的速度 V1_on_AB V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy return V1_on_AB def _cal_v_projection(self, dx, dy, vx, vy): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx ** 2 + dy ** 2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的速度 V_relative # vx = vx1 - vx2 # vy = vy1 - vy2 # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB V_on_AB = vx * U_ABx + vy * U_ABy return V_on_AB def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 θ V_mod = math.sqrt(vx ** 2 + vy ** 2) AB_mod = math.sqrt(dx ** 2 + dy ** 2) if V_mod == 0 or AB_mod == 0: return 0 cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod) theta = math.acos(cos_theta) # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx ** 2 + dy ** 2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的加速度 a_relative # ax = ax1 - ax2 # ay = ay1 - ay2 # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB a_on_AB = ax * U_ABx + ay * U_ABy VA = np.array([v_x1, v_y1]) VB = np.array([v_x2, v_y2]) D_A = np.array([x1, y1]) D_B = np.array([x2, y2]) V_r = VA - VB V = np.linalg.norm(V_r) w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB) a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta) return a_on_AB_back # 计算相对加速度 def _calculate_derivative(self, a, w, V, theta): # 计算(V×cos(θ))'的值 # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta derivative = a - w * V * math.sin(theta) return derivative def _cal_relative_angular_v(self, theta, A, B, VA, VB): dx = A[0] - B[0] dy = A[1] - B[1] dvx = VA[0] - VB[0] dvy = VA[1] - VB[1] # (dx * dvy - dy * dvx) angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2) return angular_velocity def _death_pr(self, obj_type, v_relative): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) return p_death def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity return collisionRisk # 求两车之间当前距离 def dist(self, x1, y1, x2, y2): dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) return dist # TTC (time to collision) def _cal_TTC(self, dist, vrel_projection_in_dist): if vrel_projection_in_dist == 0: return math.inf TTC = dist / vrel_projection_in_dist return TTC def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist): MTTC = math.nan if arel_projection_in_dist != 0: tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist if tmp < 0: return math.nan t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist if t1 > 0 and t2 > 0: if t1 >= t2: MTTC = t2 elif t1 < t2: MTTC = t1 elif t1 > 0 and t2 <= 0: MTTC = t1 elif t1 <= 0 and t2 > 0: MTTC = t2 if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0: MTTC = dist / vrel_projection_in_dist return MTTC # THW (time headway) def _cal_THW(self, dist, v_ego_projection_in_dist): if not v_ego_projection_in_dist: THW = None else: THW = dist / v_ego_projection_in_dist return THW # TLC (time to line crossing) def _cal_TLC(self, ego_v, ego_yaw, laneOffset): TLC = laneOffset/ego_v/np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0 if TLC < 0: TLC = None return TLC def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max): if len(x_relative_start_dist): return None if (ego_decel_max == 0) or (relative_v == 0): return self.calculated_value["TTB"] else: x_relative_start_dist0 = x_relative_start_dist.tolist()[0] TTB = (x_relative_start_dist0 + relative_v * relative_v/2/ego_decel_max)/relative_v return TTB def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1): if len(x_relative_start_dist): return None if (a2 == 0) or (v1 == 0): return self.calculated_value["TM"] if a1 == 0: return None x_relative_start_dist0 = x_relative_start_dist.tolist()[0] TM = (x_relative_start_dist0 + v2**2/(2*a2) - v1**2/(2*a1)) / v1 return TM # def _cal_MPrTTC(self, T=5, c = False, collision_dist = 5.99): # time_interval = self.ego_df['simTime'].tolist()[1] - self.ego_df['simTime'].tolist()[0] # # for i in range(len(self.obj_id_list)): # for j in range(T): # MPrTTC = j * time_interval def _cal_DTC(self, v_on_dist, a_on_dist, t): if a_on_dist == 0: return None DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist return DTC def velocity(self, v_x, v_y): v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6 return v def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max): lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / ( 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max) return lon_dist_min def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max): # 检查除数是否为零 if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0: return self._default_value('LatSD') # 返回默认值 v_right_rho = v_right + rho * a_lat_max v_left_rho = v_left + rho * a_lat_max dist_min = lat_dist + ( (v_right + v_right_rho) * rho / 2 + v_right_rho**2 / a_right_lat_brake_min / 2 + ((v_left + v_right_rho) * rho / 2) + v_left_rho**2 / a_left_lat_brake_min / 2 ) return dist_min # DRAC (decelerate required avoid collision) def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2): dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671 if dist_length < 0: dist_width = dist - (width2 / 2 + width1 / 2) if dist_width < 0: return math.inf else: d = dist_width else: d = dist_length DRAC = vrel_projection_in_dist ** 2 / (2 * d) return DRAC # BTN (brake threat number) def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max): BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2): STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * ( width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max return STN # BTN (brake threat number) def cal_BTN(self, a_y1, ay, dy, vy, max_ay): BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2): STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax return STN # 追尾碰撞风险 def _normal_distribution(self, x): mean = 1.32 std_dev = 0.26 return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev) def continuous_group(self, df): time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time']) unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame']) unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1) return unfunc_df # 统计最危险的指标 def _safe_statistic_most_dangerous(self): min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM'] max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity'] result = {} for metric in min_list: if metric in self.metric_list: if metric in self.df.columns: val = self.df[metric].min() result[metric] = float(val) if pd.notnull(val) else self._default_value(metric) else: result[metric] = self._default_value(metric) for metric in max_list: if metric in self.metric_list: if metric in self.df.columns: val = self.df[metric].max() result[metric] = float(val) if pd.notnull(val) else self._default_value(metric) else: result[metric] = self._default_value(metric) return result def _safe_no_obj_statistic(self): # 仅有自车时的默认值 result = {metric: self._default_value(metric) for metric in self.metric_list} return result def _default_value(self, metric): # 统一默认值 defaults = { 'TTC': 10.0, 'MTTC': 4.2, 'THW': 2.1, 'TLC': 10.0, 'TTB': 10.0, 'TM': 10.0, 'DTC': 10.0, 'LonSD': 10.0, 'LatSD': 2.0, 'BTN': 1.0, 'collisionRisk': 0.0, 'collisionSeverity': 0.0 } return defaults.get(metric, None) def report_statistic(self): if len(self.obj_id_list) == 1: safety_result = self._safe_no_obj_statistic() else: safety_result = self._safe_statistic_most_dangerous() evaluator = Score(self.data_processed.safety_config) result = evaluator.evaluate(safety_result) print("\n[安全性表现及得分情况]") return result def get_ttc_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('TTC') ttc_values = self.df_safe['TTC'].dropna() return float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC') def get_mttc_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('MTTC') mttc_values = self.df_safe['MTTC'].dropna() return float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC') def get_thw_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('THW') thw_values = self.df_safe['THW'].dropna() return float(thw_values.min()) if not thw_values.empty else self._default_value('THW') def get_tlc_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('TLC') tlc_values = self.df_safe['TLC'].dropna() return float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC') def get_ttb_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('TTB') ttb_values = self.df_safe['TTB'].dropna() return float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB') def get_tm_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('TM') tm_values = self.df_safe['TM'].dropna() return float(tm_values.min()) if not tm_values.empty else self._default_value('TM') def get_dtc_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('DTC') dtc_values = self.df_safe['DTC'].dropna() return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC') def get_lonsd_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('LonSD') lonsd_values = self.df_safe['LonSD'].dropna() return float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD') def get_latsd_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('LatSD') latsd_values = self.df_safe['LatSD'].dropna() # 使用最小值而非平均值,与safety1.py保持一致 return float(latsd_values.min()) if not latsd_values.empty else self._default_value('LatSD') def get_btn_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('BTN') btn_values = self.df_safe['BTN'].dropna() return float(btn_values.max()) if not btn_values.empty else self._default_value('BTN') def get_collision_risk_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('collisionRisk') risk_values = self.df_safe['collisionRisk'].dropna() return float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk') def get_collision_severity_value(self) -> float: if self.empty_flag or self.df_safe is None: return self._default_value('collisionSeverity') severity_values = self.df_safe['collisionSeverity'].dropna() return float(severity_values.max()) if not severity_values.empty else self._default_value('collisionSeverity')