|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 安全指标计算模块
- """
- import os
- import numpy as np
- import pandas as pd
- import math
- import matplotlib.pyplot as plt
- import scipy.integrate as spi
- from collections import defaultdict
- from typing import Dict, Any, List, Optional
- from pathlib import Path
- from modules.lib.score import Score
- from modules.lib.log_manager import LogManager
- from modules.lib.chart_generator import generate_safety_chart_data
- # 安全指标相关常量
- SAFETY_INFO = [
- "simTime",
- "simFrame",
- "playerId",
- "posX",
- "posY",
- "posH",
- "speedX",
- "speedY",
- "accelX",
- "accelY",
- "v",
- "type"
- ]
- # ----------------------
- # 独立指标计算函数
- # ----------------------
- def calculate_ttc(data_processed) -> dict:
- """计算TTC (Time To Collision)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"TTC": None}
- try:
- safety = SafetyCalculator(data_processed)
- ttc_value = safety.get_ttc_value()
- # 只生成图表,数据导出由chart_generator处理
- if safety.ttc_data:
- safety.generate_metric_chart('TTC')
- LogManager().get_logger().info(f"安全指标[TTC]计算结果: {ttc_value}")
- return {"TTC": ttc_value}
- except Exception as e:
- LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
- return {"TTC": None}
- def calculate_mttc(data_processed) -> dict:
- """计算MTTC (Modified Time To Collision)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"MTTC": None}
- try:
- safety = SafetyCalculator(data_processed)
- mttc_value = safety.get_mttc_value()
- if safety.mttc_data:
- safety.generate_metric_chart('MTTC')
- LogManager().get_logger().info(f"安全指标[MTTC]计算结果: {mttc_value}")
- return {"MTTC": mttc_value}
- except Exception as e:
- LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
- return {"MTTC": None}
- def calculate_thw(data_processed) -> dict:
- """计算THW (Time Headway)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"THW": None}
- try:
- safety = SafetyCalculator(data_processed)
- thw_value = safety.get_thw_value()
- if safety.thw_data:
- safety.generate_metric_chart('THW')
- LogManager().get_logger().info(f"安全指标[THW]计算结果: {thw_value}")
- return {"THW": thw_value}
- except Exception as e:
- LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
- return {"THW": None}
- def calculate_tlc(data_processed) -> dict:
- """计算TLC (Time to Line Crossing)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"TLC": None}
- try:
- safety = SafetyCalculator(data_processed)
- tlc_value = safety.get_tlc_value()
- if safety.tlc_data:
- safety.generate_metric_chart('TLC')
- LogManager().get_logger().info(f"安全指标[TLC]计算结果: {tlc_value}")
- return {"TLC": tlc_value}
- except Exception as e:
- LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True)
- return {"TLC": None}
- def calculate_ttb(data_processed) -> dict:
- """计算TTB (Time to Brake)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"TTB": None}
- try:
- safety = SafetyCalculator(data_processed)
- ttb_value = safety.get_ttb_value()
- if safety.ttb_data:
- safety.generate_metric_chart('TTB')
- LogManager().get_logger().info(f"安全指标[TTB]计算结果: {ttb_value}")
- return {"TTB": ttb_value}
- except Exception as e:
- LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True)
- return {"TTB": None}
- def calculate_tm(data_processed) -> dict:
- """计算TM (Time Margin)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"TM": None}
- try:
- safety = SafetyCalculator(data_processed)
- tm_value = safety.get_tm_value()
- if safety.tm_data:
- safety.generate_metric_chart('TM')
- LogManager().get_logger().info(f"安全指标[TM]计算结果: {tm_value}")
- return {"TM": tm_value}
- except Exception as e:
- LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True)
- return {"TM": None}
- def calculate_dtc(data_processed) -> dict:
- """计算DTC (Distance to Collision)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"DTC": None}
- try:
- safety = SafetyCalculator(data_processed)
- dtc_value = safety.get_dtc_value()
- LogManager().get_logger().info(f"安全指标[DTC]计算结果: {dtc_value}")
- return {"DTC": dtc_value}
- except Exception as e:
- LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
- return {"DTC": None}
- def calculate_pet(data_processed) -> dict:
- """计算PET (Post Encroachment Time)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"PET": None}
- try:
- safety = SafetyCalculator(data_processed)
- pet_value = safety.get_dtc_value()
- LogManager().get_logger().info(f"安全指标[PET]计算结果: {pet_value}")
- return {"PET": pet_value}
- except Exception as e:
- LogManager().get_logger().error(f"PET计算异常: {str(e)}", exc_info=True)
- return {"PET": None}
- def calculate_psd(data_processed) -> dict:
- """计算PSD (Potential Safety Distance)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"PSD": None}
- try:
- safety = SafetyCalculator(data_processed)
- psd_value = safety.get_psd_value()
- LogManager().get_logger().info(f"安全指标[PSD]计算结果: {psd_value}")
- return {"PSD": psd_value}
- except Exception as e:
- LogManager().get_logger().error(f"PSD计算异常: {str(e)}", exc_info=True)
- return {"PSD": None}
- def calculate_collisionrisk(data_processed) -> dict:
- """计算碰撞风险"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"collisionRisk": None}
- try:
- safety = SafetyCalculator(data_processed)
- collision_risk_value = safety.get_collision_risk_value()
- if safety.collision_risk_data:
- safety.generate_metric_chart('collisionRisk')
- LogManager().get_logger().info(f"安全指标[collisionRisk]计算结果: {collision_risk_value}")
- return {"collisionRisk": collision_risk_value}
- except Exception as e:
- LogManager().get_logger().error(f"collisionRisk计算异常: {str(e)}", exc_info=True)
- return {"collisionRisk": None}
- def calculate_lonsd(data_processed) -> dict:
- """计算纵向安全距离"""
- safety = SafetyCalculator(data_processed)
- lonsd_value = safety.get_lonsd_value()
- if safety.lonsd_data:
- safety.generate_metric_chart('LonSD')
- LogManager().get_logger().info(f"安全指标[LonSD]计算结果: {lonsd_value}")
- return {"LonSD": lonsd_value}
- def calculate_latsd(data_processed) -> dict:
- """计算横向安全距离"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"LatSD": None}
- try:
- safety = SafetyCalculator(data_processed)
- latsd_value = safety.get_latsd_value()
- if safety.latsd_data:
- # 只生成图表,数据导出由chart_generator处理
- safety.generate_metric_chart('LatSD')
- LogManager().get_logger().info(f"安全指标[LatSD]计算结果: {latsd_value}")
- return {"LatSD": latsd_value}
- except Exception as e:
- LogManager().get_logger().error(f"LatSD计算异常: {str(e)}", exc_info=True)
- return {"LatSD": None}
- def calculate_btn(data_processed) -> dict:
- """计算制动威胁数"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"BTN": None}
- try:
- safety = SafetyCalculator(data_processed)
- btn_value = safety.get_btn_value()
- if safety.btn_data:
- # 只生成图表,数据导出由chart_generator处理
- safety.generate_metric_chart('BTN')
- LogManager().get_logger().info(f"安全指标[BTN]计算结果: {btn_value}")
- return {"BTN": btn_value}
- except Exception as e:
- LogManager().get_logger().error(f"BTN计算异常: {str(e)}", exc_info=True)
- return {"BTN": None}
- def calculate_collisionseverity(data_processed) -> dict:
- """计算碰撞严重性"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"collisionSeverity": None}
- try:
- safety = SafetyCalculator(data_processed)
- collision_severity_value = safety.get_collision_severity_value()
- if safety.collision_severity_data:
- # 只生成图表,数据导出由chart_generator处理
- safety.generate_metric_chart('collisionSeverity')
- LogManager().get_logger().info(f"安全指标[collisionSeverity]计算结果: {collision_severity_value}")
- return {"collisionSeverity": collision_severity_value}
- except Exception as e:
- LogManager().get_logger().error(f"collisionSeverity计算异常: {str(e)}", exc_info=True)
- return {"collisionSeverity": None}
- class SafetyRegistry:
- """安全指标注册器"""
-
- def __init__(self, data_processed):
- self.logger = LogManager().get_logger()
- self.data = data_processed
- self.safety_config = data_processed.safety_config["safety"]
- self.metrics = self._extract_metrics(self.safety_config)
- self._registry = self._build_registry()
-
- def _extract_metrics(self, config_node: dict) -> list:
- """从配置中提取指标名称"""
- metrics = []
- def _recurse(node):
- if isinstance(node, dict):
- if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
- metrics.append(node['name'])
- for v in node.values():
- _recurse(v)
- _recurse(config_node)
- self.logger.info(f'评比的安全指标列表:{metrics}')
- return metrics
-
- def _build_registry(self) -> dict:
- """构建指标函数注册表"""
- registry = {}
- for metric_name in self.metrics:
- func_name = f"calculate_{metric_name.lower()}"
- if func_name in globals():
- registry[metric_name] = globals()[func_name]
- else:
- self.logger.warning(f"未实现安全指标函数: {func_name}")
- return registry
-
- def batch_execute(self) -> dict:
- """批量执行指标计算"""
- results = {}
- for name, func in self._registry.items():
- try:
- result = func(self.data)
- results.update(result)
- except Exception as e:
- self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
- results[name] = None
- self.logger.info(f'安全指标计算结果:{results}')
- return results
- class SafeManager:
- """安全指标管理类"""
-
- def __init__(self, data_processed):
- self.data = data_processed
- self.registry = SafetyRegistry(self.data)
-
- def report_statistic(self):
- """计算并报告安全指标结果"""
- safety_result = self.registry.batch_execute()
-
- return safety_result
-
- class SafetyCalculator:
- """安全指标计算类 - 兼容Safe类风格"""
- def __init__(self, data_processed):
- self.logger = LogManager().get_logger()
- self.data_processed = data_processed
- self.df = data_processed.object_df.copy()
- self.ego_df = data_processed.ego_data.copy() # 使用copy()避免修改原始数据
- self.obj_id_list = data_processed.obj_id_list
- self.metric_list = [
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
- ]
- # 初始化默认值
- self.calculated_value = {
- "TTC": 10.0,
- "MTTC": 10.0,
- "THW": 10.0,
- "TLC": 10.0,
- "TTB": 10.0,
- "TM": 10.0,
- # "MPrTTC": 10.0,
- "PET": 10.0,
- "DTC": 10.0,
- "PSD": 10.0,
- "LatSD": 3.0,
- "BTN": 1.0,
- "collisionRisk": 0.0,
- "collisionSeverity": 0.0,
- }
- self.time_list = self.ego_df['simTime'].values.tolist()
- self.frame_list = self.ego_df['simFrame'].values.tolist()
- self.collisionRisk = 0
- self.empty_flag = True
-
- # 初始化数据存储列表
- self.ttc_data = []
- self.mttc_data = []
- self.thw_data = []
- self.tlc_data = []
- self.ttb_data = []
- self.tm_data = []
- self.lonsd_data = []
- self.latsd_data = []
- self.btn_data = []
- self.collision_risk_data = []
- self.collision_severity_data = []
-
- # 初始化安全事件记录表
- self.unsafe_events_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
-
- # 设置输出目录
- self.output_dir = os.path.join(os.getcwd(), 'data')
- os.makedirs(self.output_dir, exist_ok=True)
- self.logger.info("SafetyCalculator初始化完成,场景中包含自车的目标物一共为: %d", len(self.obj_id_list))
- if len(self.obj_id_list) > 1:
- self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_drac_df = pd.DataFrame(
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_xtn_df = pd.DataFrame(
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.most_dangerous = {}
- self.pass_percent = {}
- self.logger.info("开始执行安全参数计算 _safe_param_cal_new")
- self._safe_param_cal_new()
- self.logger.info("安全参数计算完成")
- def _safe_param_cal_new(self):
- self.logger.debug("进入 _safe_param_cal_new 方法")
- # 直接复用Safe类的实现
- Tc = 0.3 # 安全距离
- rho = self.data_processed.vehicle_config["RHO"]
- ego_accel_max = self.data_processed.vehicle_config["EGO_ACCEL_MAX"]
- obj_decel_max = self.data_processed.vehicle_config["OBJ_DECEL_MAX"]
- ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"]
- ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
- ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
- ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
- #TEMP_COMMENT: x_relative_start_dist 注释开始
- #x_relative_start_dist = self.ego_df["x_relative_start_dist"]
-
- # 设置安全指标阈值
- self.safety_thresholds = {
- 'TTC': {'min': 1.5, 'max': None}, # TTC小于1.5秒视为危险
- 'MTTC': {'min': 1.5, 'max': None}, # MTTC小于1.5秒视为危险
- 'THW': {'min': 1.0, 'max': None}, # THW小于1.0秒视为危险
- 'LonSD': {'min': None, 'max': None}, # 根据实际情况设置
- 'LatSD': {'min': 0.5, 'max': None}, # LatSD小于0.5米视为危险
- 'BTN': {'min': None, 'max': 0.8}, # BTN大于0.8视为危险
- 'collisionRisk': {'min': None, 'max': 30}, # 碰撞风险大于30%视为危险
- 'collisionSeverity': {'min': None, 'max': 30} # 碰撞严重性大于30%视为危险
- }
- obj_dict = defaultdict(dict)
- obj_data_dict = self.df.to_dict('records')
- for item in obj_data_dict:
- obj_dict[item['simFrame']][item['playerId']] = item
- df_list = []
- EGO_PLAYER_ID = 1
- for frame_num in self.frame_list:
- ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
- v1 = ego_data['v']
- x1 = ego_data['posX']
- y1 = ego_data['posY']
- h1 = ego_data['posH']
- laneOffset = ego_data["laneOffset"]
- v_x1 = ego_data['speedX']
- v_y1 = ego_data['speedY']
- a_x1 = ego_data['accelX']
- a_y1 = ego_data['accelY']
- a1 = np.sqrt(a_x1 ** 2 + a_y1 ** 2)
- for playerId in self.obj_id_list:
- if playerId == EGO_PLAYER_ID:
- continue
- try:
- obj_data = obj_dict[frame_num][playerId]
- except KeyError:
- continue
- x2 = obj_data['posX']
- y2 = obj_data['posY']
- dist = self.dist(x1, y1, x2, y2)
- obj_data['dist'] = dist
- v_x2 = obj_data['speedX']
- v_y2 = obj_data['speedY']
- v2 = obj_data['v']
- a_x2 = obj_data['accelX']
- a_y2 = obj_data['accelY']
- a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2)
- dx, dy = x2 - x1, y2 - y1
-
- # 计算目标车相对于自车的方位角
- beta = math.atan2(dy, dx)
-
- # 将全局坐标系下的相对位置向量转换到自车坐标系
- # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
- h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
-
- # 坐标变换
- lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
- lat_d = abs(-dx * math.sin(h1_rad) + dy * math.cos(h1_rad)) # 横向距离(取绝对值)
-
- obj_dict[frame_num][playerId]['lon_d'] = lon_d
- obj_dict[frame_num][playerId]['lat_d'] = lat_d
- if lon_d > 100 or lon_d < -5 or lat_d > 4:
- continue
- self.empty_flag = False
- vx, vy = v_x1 - v_x2, v_y1 - v_y2
- ax, ay = a_x2 - a_x1, a_y2 - a_y1
- relative_v = np.sqrt(vx ** 2 + vy ** 2)
- v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
- v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
- vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
- arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
- v_x2, v_y2)
- obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
- obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
- obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
- obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
- obj_type = obj_data['type']
- TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None
- MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist)
- THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None
- TLC = self._cal_TLC(v1, h1, laneOffset)
- TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max)
- TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
- DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, driver_reaction_time)
- # MPrTTC = self._cal_MPrTTC(x_relative_start_dist)
- # PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
- PET = None
- for lane_pos in lane_poss:
- lane_posx1 = ast.literal_eval(lane_pos)[0][0]
- lane_posy1 = ast.literal_eval(lane_pos)[0][1]
- lane_posx2 = ast.literal_eval(lane_pos)[-1][0]
- lane_posy2 = ast.literal_eval(lane_pos)[-1][1]
- for ramp_pos in ramp_poss:
- ramp_posx1 = ast.literal_eval(ramp_pos)[0][0]
- ramp_posy1 = ast.literal_eval(ramp_pos)[0][1]
- ramp_posx2 = ast.literal_eval(ramp_pos)[-1][0]
- ramp_posy2 = ast.literal_eval(ramp_pos)[-1][1]
- ego_posx = x1
- ego_posy = y1
- obj_posx = x2
- obj_posy = y2
- delta_t = self._cal_reaction_time_to_avgspeed(self.ego_df)
- lane_width = self.ego_df["lane_width"].iloc[0]
- PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2,
- ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
- PSD = self._cal_PSD(x_relative_start_dist, v1, ego_decel_lon_max)
- LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
- lat_dist = 0.5
- v_right = v1
- v_left = v2
- a_right_lat_brake_min = 1
- a_left_lat_brake_min = 1
- a_lat_max = 5
- LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
- a_left_lat_brake_min, a_lat_max)
- # 使用自车坐标系下的纵向加速度
- lon_a1 = a_x1 * math.cos(h1_rad) + a_y1 * math.sin(h1_rad)
- lon_a2 = a_x2 * math.cos(h1_rad) + a_y2 * math.sin(h1_rad)
- lon_a = abs(lon_a1 - lon_a2)
- lon_d = max(0.1, lon_d) # 确保纵向距离为正
- lon_v = v_x1 * math.cos(h1_rad) + v_y1 * math.sin(h1_rad)
- BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
- # 使用自车坐标系下的横向加速度
- lat_a1 = -a_x1 * math.sin(h1_rad) + a_y1 * math.cos(h1_rad)
- lat_a2 = -a_x2 * math.sin(h1_rad) + a_y2 * math.cos(h1_rad)
- lat_a = abs(lat_a1 - lat_a2)
- lat_v = -v_x1 * math.sin(h1_rad) + v_y1 * math.cos(h1_rad)
- obj_dict[frame_num][playerId]['lat_v_rel'] = lat_v - (-v_x2 * math.sin(h1_rad) + v_y2 * math.cos(h1_rad))
- obj_dict[frame_num][playerId]['lon_v_rel'] = lon_v - (v_x2 * math.cos(h1_rad) + v_y2 * math.sin(h1_rad))
- TTC = None if (TTC is None or TTC < 0) else TTC
- MTTC = None if (MTTC is None or MTTC < 0) else MTTC
- THW = None if (THW is None or THW < 0) else THW
- TLC = None if (TLC is None or TLC < 0) else TLC
- TTB = None if (TTB is None or TTB < 0) else TTB
- TM = None if (TM is None or TM < 0) else TM
- DTC = None if (DTC is None or DTC < 0) else DTC
- PET = None if (PET is None or PET < 0) else PET
- PSD = None if (PSD is None or PSD < 0) else PSD
- obj_dict[frame_num][playerId]['TTC'] = TTC
- obj_dict[frame_num][playerId]['MTTC'] = MTTC
- obj_dict[frame_num][playerId]['THW'] = THW
- obj_dict[frame_num][playerId]['TLC'] = TLC
- obj_dict[frame_num][playerId]['TTB'] = TTB
- obj_dict[frame_num][playerId]['TM'] = TM
- obj_dict[frame_num][playerId]['DTC'] = DTC
- obj_dict[frame_num][playerId]['PET'] = PET
- obj_dict[frame_num][playerId]['PSD'] = PSD
- obj_dict[frame_num][playerId]['LonSD'] = LonSD
- obj_dict[frame_num][playerId]['LatSD'] = LatSD
- obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
- # TTC要进行筛选,否则会出现nan或者TTC过大的情况
- if not TTC or TTC > 4000: # threshold = 4258.41
- collisionSeverity = 0
- pr_death = 0
- collisionRisk = 0
- else:
- result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
- collisionSeverity = 1 - result
- pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
- collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
- obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
- obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
- obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
- df_fnum = pd.DataFrame(obj_dict[frame_num].values())
- df_list.append(df_fnum)
- df_safe = pd.concat(df_list)
- col_list = ['simTime', 'simFrame', 'playerId',
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN',
- 'collisionSeverity', 'pr_death', 'collisionRisk']
- self.df_safe = df_safe[col_list].reset_index(drop=True)
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 在 AB 连线上的速度 V1_on_AB
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
- return V1_on_AB
- def _cal_v_projection(self, dx, dy, vx, vy):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的速度 V_relative
- # vx = vx1 - vx2
- # vy = vy1 - vy2
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
- V_on_AB = vx * U_ABx + vy * U_ABy
- return V_on_AB
- def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 θ
- V_mod = math.sqrt(vx ** 2 + vy ** 2)
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- if V_mod == 0 or AB_mod == 0:
- return 0
- cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
- theta = math.acos(cos_theta)
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的加速度 a_relative
- # ax = ax1 - ax2
- # ay = ay1 - ay2
- # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
- a_on_AB = ax * U_ABx + ay * U_ABy
- VA = np.array([v_x1, v_y1])
- VB = np.array([v_x2, v_y2])
- D_A = np.array([x1, y1])
- D_B = np.array([x2, y2])
- V_r = VA - VB
- V = np.linalg.norm(V_r)
- w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
- a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
- return a_on_AB_back
- # 计算相对加速度
- def _calculate_derivative(self, a, w, V, theta):
- # 计算(V×cos(θ))'的值
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
- derivative = a - w * V * math.sin(theta)
- return derivative
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
- dx = A[0] - B[0]
- dy = A[1] - B[1]
- dvx = VA[0] - VB[0]
- dvy = VA[1] - VB[1]
- # (dx * dvy - dy * dvx)
- angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
- return angular_velocity
- def _death_pr(self, obj_type, v_relative):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- return p_death
- def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
- return collisionRisk
- # 求两车之间当前距离
- def dist(self, x1, y1, x2, y2):
- dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
- return dist
-
-
- def generate_metric_chart(self, metric_name: str) -> None:
- """生成指标图表
-
- Args:
- metric_name: 指标名称
- """
- try:
- # 确定输出目录
- if self.output_dir is None:
- self.output_dir = os.path.join(os.getcwd(), 'data')
- os.makedirs(self.output_dir, exist_ok=True)
-
- # 调用图表生成函数
- chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
- if chart_path:
- self.logger.info(f"{metric_name}图表已生成: {chart_path}")
- else:
- self.logger.warning(f"{metric_name}图表生成失败")
-
- except Exception as e:
- self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)
- # TTC (time to collision)
- def _cal_TTC(self, dist, vrel_projection_in_dist):
- if vrel_projection_in_dist == 0:
- return math.inf
- TTC = dist / vrel_projection_in_dist
- return TTC
- def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
- MTTC = math.nan
- if arel_projection_in_dist != 0:
- tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
- if tmp < 0:
- return math.nan
- t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
- t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
- if t1 > 0 and t2 > 0:
- if t1 >= t2:
- MTTC = t2
- elif t1 < t2:
- MTTC = t1
- elif t1 > 0 and t2 <= 0:
- MTTC = t1
- elif t1 <= 0 and t2 > 0:
- MTTC = t2
- if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
- MTTC = dist / vrel_projection_in_dist
- return MTTC
- # THW (time headway)
- def _cal_THW(self, dist, v_ego_projection_in_dist):
- if not v_ego_projection_in_dist:
- THW = None
- else:
- THW = dist / v_ego_projection_in_dist
- return THW
- # TLC (time to line crossing)
- def _cal_TLC(self, ego_v, ego_yaw, laneOffset):
- TLC = laneOffset/ego_v/np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0
- if TLC < 0:
- TLC = None
- return TLC
- def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max):
- if len(x_relative_start_dist):
- return None
- if (ego_decel_max == 0) or (relative_v == 0):
- return self.calculated_value["TTB"]
- else:
- x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
- TTB = (x_relative_start_dist0 + relative_v * relative_v/2/ego_decel_max)/relative_v
- return TTB
- def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1):
- if len(x_relative_start_dist):
- return None
- if (a2 == 0) or (v1 == 0):
- return self.calculated_value["TM"]
- if a1 == 0:
- return None
- x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
- TM = (x_relative_start_dist0 + v2**2/(2*a2) - v1**2/(2*a1)) / v1
- return TM
- def velocity(self, v_x, v_y):
- v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
- return v
- def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
- lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
- 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
- return lon_dist_min
- def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
- a_left_lat_brake_min, a_lat_max):
- # 检查除数是否为零
- if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
- return self._default_value('LatSD') # 返回默认值
-
- v_right_rho = v_right + rho * a_lat_max
- v_left_rho = v_left + rho * a_lat_max
- dist_min = lat_dist + (
- (v_right + v_right_rho) * rho / 2
- + v_right_rho**2 / a_right_lat_brake_min / 2
- + ((v_left + v_right_rho) * rho / 2)
- + v_left_rho**2 / a_left_lat_brake_min / 2
- )
- return dist_min
- def _cal_DTC(self, v_on_dist, a_on_dist, t):
- if a_on_dist == 0:
- return None
- DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
- return DTC
- def _cal_PET(self, lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2):
- dist1 = self.horizontal_distance(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy)
- dist2 = self.horizontal_distance(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy)
- if ((dist1 <= lane_width/2) and (self._is_alone_the_road(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy))
- and (self._is_in_the_road(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy))
- and (dist2 <= lane_width/2) and (a1 != 0) and (a2 != 0)):
- dist_ego = np.sqrt((ego_posx - lane_posx1)**2 + (ego_posy-lane_posy1)**2)
- dist_obj = np.sqrt((obj_posx - ramp_posx2)**2 + (obj_posy-ramp_posy2)**2)
- PET = (-2*v2 + np.sqrt((4* v2**2)-8*a2*(v2*delta_t - dist_obj)))/ 2/ a2 - (2*v1 + np.sqrt((4* v1**2)-8*a1*dist_ego))/ 2/ a1 + delta_t
- return PET
- else:
- return None
- def _cal_PSD(self, x_relative_start_dist, v1, ego_decel_lon_max):
- if v1 == 0:
- return None
- else:
- if len(x_relative_start_dist) > 0:
- x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
- PSD = x_relative_start_dist0 * 2 * ego_decel_lon_max / v1
- return PSD
- else:
- return None
- # DRAC (decelerate required avoid collision)
- def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
- dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
- if dist_length < 0:
- dist_width = dist - (width2 / 2 + width1 / 2)
- if dist_width < 0:
- return math.inf
- else:
- d = dist_width
- else:
- d = dist_length
- DRAC = vrel_projection_in_dist ** 2 / (2 * d)
- return DRAC
- # BTN (brake threat number)
- def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
- BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
- STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
- width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
- return STN
- # BTN (brake threat number)
- def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
- BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
- STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax
- return STN
- # 追尾碰撞风险
- def _normal_distribution(self, x):
- mean = 1.32
- std_dev = 0.26
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
- def continuous_group(self, df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
- unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
- return unfunc_df
- # 统计最危险的指标
-
- def _safe_statistic_most_dangerous(self):
- min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM', 'PET', 'PSD']
- max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
- result = {}
- for metric in min_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- val = self.df[metric].min()
- result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
- else:
- result[metric] = self._default_value(metric)
- for metric in max_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- val = self.df[metric].max()
- result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
- else:
- result[metric] = self._default_value(metric)
- return result
- def _safe_no_obj_statistic(self):
- # 仅有自车时的默认值
- result = {metric: self._default_value(metric) for metric in self.metric_list}
- return result
- def _default_value(self, metric):
- # 统一默认值
- defaults = {
- 'TTC': 10.0,
- 'MTTC': 4.2,
- 'THW': 2.1,
- 'TLC': 10.0,
- 'TTB': 10.0,
- 'TM': 10.0,
- 'DTC': 10.0,
- 'PET': 10.0,
- 'PSD': 10.0,
- 'LonSD': 10.0,
- 'LatSD': 2.0,
- 'BTN': 1.0,
- 'collisionRisk': 0.0,
- 'collisionSeverity': 0.0
- }
- return defaults.get(metric, None)
- def report_statistic(self):
- if len(self.obj_id_list) == 1:
- safety_result = self._safe_no_obj_statistic()
- else:
- safety_result = self._safe_statistic_most_dangerous()
- evaluator = Score(self.data_processed.safety_config)
- result = evaluator.evaluate(safety_result)
- print("\n[安全性表现及得分情况]")
- return result
- def get_ttc_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('TTC')
- ttc_values = self.df_safe['TTC'].dropna()
- ttc_value = float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
-
- # 收集TTC数据
- if not ttc_values.empty:
- self.ttc_data = []
- for time, frame, ttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTC']):
- if pd.notnull(ttc):
- self.ttc_data.append({'simTime': time, 'simFrame': frame, 'TTC': ttc})
-
-
- return ttc_value
- def get_mttc_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('MTTC')
- mttc_values = self.df_safe['MTTC'].dropna()
- mttc_value = float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
-
- # 收集MTTC数据
- if not mttc_values.empty:
- self.mttc_data = []
- for time, frame, mttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['MTTC']):
- if pd.notnull(mttc):
- self.mttc_data.append({'simTime': time, 'simFrame': frame, 'MTTC': mttc})
-
- return mttc_value
- def get_thw_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('THW')
- thw_values = self.df_safe['THW'].dropna()
- thw_value = float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
-
- # 收集THW数据
- if not thw_values.empty:
- self.thw_data = []
- for time, frame, thw in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['THW']):
- if pd.notnull(thw):
- self.thw_data.append({'simTime': time, 'simFrame': frame, 'THW': thw})
-
- return thw_value
- def get_tlc_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('TLC')
- tlc_values = self.df_safe['TLC'].dropna()
- tlc_value = float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC')
-
- # 收集TLC数据
- if not tlc_values.empty:
- self.tlc_data = []
- for time, frame, tlc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TLC']):
- if pd.notnull(tlc):
- self.tlc_data.append({'simTime': time, 'simFrame': frame, 'TLC': tlc})
-
- return tlc_value
- def get_ttb_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('TTB')
- ttb_values = self.df_safe['TTB'].dropna()
- ttb_value = float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB')
-
- # 收集TTB数据
- if not ttb_values.empty:
- self.ttb_data = []
- for time, frame, ttb in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTB']):
- if pd.notnull(ttb):
- self.ttb_data.append({'simTime': time, 'simFrame': frame, 'TTB': ttb})
-
- return ttb_value
- def get_tm_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('TM')
- tm_values = self.df_safe['TM'].dropna()
- tm_value = float(tm_values.min()) if not tm_values.empty else self._default_value('TM')
-
- # 收集TM数据
- if not tm_values.empty:
- self.tm_data = []
- for time, frame, tm in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TM']):
- if pd.notnull(tm):
- self.tm_data.append({'simTime': time, 'simFrame': frame, 'TM': tm})
-
- return tm_value
- def get_dtc_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('DTC')
- dtc_values = self.df_safe['DTC'].dropna()
- return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
- def get_pet_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('PET')
- pet_values = self.df_safe['PET'].dropna()
- return float(pet_values.min()) if not pet_values.empty else self._default_value('PET')
- def get_psd_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('PSD')
- psd_values = self.df_safe['PSD'].dropna()
- return float(psd_values.min()) if not psd_values.empty else self._default_value('PSD')
- def get_lonsd_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('LonSD')
- lonsd_values = self.df_safe['LonSD'].dropna()
- lonsd_value = float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
-
- # 收集LonSD数据
- if not lonsd_values.empty:
- self.lonsd_data = []
- for time, frame, lonsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LonSD']):
- if pd.notnull(lonsd):
- self.lonsd_data.append({'simTime': time, 'simFrame': frame, 'LonSD': lonsd})
-
- return lonsd_value
- def get_latsd_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('LatSD')
- latsd_values = self.df_safe['LatSD'].dropna()
- # 使用最小值而非平均值,与safety1.py保持一致
- latsd_value = float(latsd_values.min()) if not latsd_values.empty else self._default_value('LatSD')
-
- # 收集LatSD数据
- if not latsd_values.empty:
- self.latsd_data = []
- for time, frame, latsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LatSD']):
- if pd.notnull(latsd):
- self.latsd_data.append({'simTime': time, 'simFrame': frame, 'LatSD': latsd})
- return latsd_value
- def get_btn_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('BTN')
- btn_values = self.df_safe['BTN'].dropna()
- btn_value = float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
-
- # 收集BTN数据
- if not btn_values.empty:
- self.btn_data = []
- for time, frame, btn in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['BTN']):
- if pd.notnull(btn):
- self.btn_data.append({'simTime': time, 'simFrame': frame, 'BTN': btn})
- return btn_value
- def get_collision_risk_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('collisionRisk')
- risk_values = self.df_safe['collisionRisk'].dropna()
- risk_value = float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
-
- # 收集碰撞风险数据
- if not risk_values.empty:
- self.collision_risk_data = []
- for time, frame, risk in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['collisionRisk']):
- if pd.notnull(risk):
- self.collision_risk_data.append({'simTime': time, 'simFrame': frame, 'collisionRisk': risk})
- return risk_value
- def get_collision_severity_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('collisionSeverity')
- severity_values = self.df_safe['collisionSeverity'].dropna()
- severity_value = float(severity_values.max()) if not severity_values.empty else self._default_value('collisionSeverity')
-
- # 收集碰撞严重性数据
- if not severity_values.empty:
- self.collision_severity_data = []
- for time, frame, severity in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['collisionSeverity']):
- if pd.notnull(severity):
- self.collision_severity_data.append({'simTime': time, 'simFrame': frame, 'collisionSeverity': severity})
-
- return severity_value
|