|
@@ -1,7 +1,13 @@
|
|
|
#!/usr/bin/env python
|
|
|
# -*- coding: utf-8 -*-
|
|
|
"""
|
|
|
-安全指标计算模块
|
|
|
+安全指标计算模块(支持多目标物) - 场景感知版本
|
|
|
+优化要点:
|
|
|
+1. 重构代码结构,提高可读性
|
|
|
+2. 去除冗余代码,优化性能
|
|
|
+3. 保持原始计算逻辑不变
|
|
|
+4. 增加全面注释
|
|
|
+5. 优化数据处理流程
|
|
|
"""
|
|
|
|
|
|
import os
|
|
@@ -10,256 +16,137 @@ import pandas as pd
|
|
|
import math
|
|
|
import scipy.integrate as spi
|
|
|
from collections import defaultdict
|
|
|
-from typing import Dict, Any, List, Optional
|
|
|
+from typing import Dict, Any, List, Optional, Tuple
|
|
|
from pathlib import Path
|
|
|
-import ast
|
|
|
|
|
|
from modules.lib.score import Score
|
|
|
from modules.lib.log_manager import LogManager
|
|
|
from modules.lib.chart_generator import generate_safety_chart_data
|
|
|
|
|
|
-# 安全指标相关常量
|
|
|
-SAFETY_INFO = [
|
|
|
- "simTime",
|
|
|
- "simFrame",
|
|
|
- "playerId",
|
|
|
- "posX",
|
|
|
- "posY",
|
|
|
- "posH",
|
|
|
- "speedX",
|
|
|
- "speedY",
|
|
|
- "accelX",
|
|
|
- "accelY",
|
|
|
- "v",
|
|
|
- "type"
|
|
|
-]
|
|
|
-
|
|
|
-
|
|
|
-# ----------------------
|
|
|
-# 独立指标计算函数
|
|
|
-# ----------------------
|
|
|
+
|
|
|
+# ==================== 安全指标计算接口函数 ====================
|
|
|
+# 每个函数对应一个安全指标的计算入口
|
|
|
+# ------------------------------------------------------------
|
|
|
+
|
|
|
def calculate_ttc(data_processed, plot_path) -> dict:
|
|
|
- """计算TTC (Time To Collision)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"TTC": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- ttc_value = safety.get_ttc_value()
|
|
|
- # 只生成图表,数据导出由chart_generator处理
|
|
|
- if safety.ttc_data:
|
|
|
- safety.generate_metric_chart('TTC', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[TTC]计算结果: {ttc_value}")
|
|
|
- return {"TTC": ttc_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"TTC": None}
|
|
|
+ """计算碰撞时间(TTC)"""
|
|
|
+ return _calculate_metric('TTC', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_mttc(data_processed, plot_path) -> dict:
|
|
|
- """计算MTTC (Modified Time To Collision)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"MTTC": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- mttc_value = safety.get_mttc_value()
|
|
|
- if safety.mttc_data:
|
|
|
- safety.generate_metric_chart('MTTC', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[MTTC]计算结果: {mttc_value}")
|
|
|
- return {"MTTC": mttc_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"MTTC": None}
|
|
|
+ """计算修正碰撞时间(MTTC)"""
|
|
|
+ return _calculate_metric('MTTC', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_thw(data_processed, plot_path) -> dict:
|
|
|
- """计算THW (Time Headway)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"THW": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- thw_value = safety.get_thw_value()
|
|
|
- if safety.thw_data:
|
|
|
- safety.generate_metric_chart('THW', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[THW]计算结果: {thw_value}")
|
|
|
- return {"THW": thw_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"THW": None}
|
|
|
-
|
|
|
-
|
|
|
-def calculate_tlc(data_processed, plot_path) -> dict:
|
|
|
- """计算TLC (Time to Line Crossing)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"TLC": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- tlc_value = safety.get_tlc_value()
|
|
|
- if safety.tlc_data:
|
|
|
- safety.generate_metric_chart('TLC', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[TLC]计算结果: {tlc_value}")
|
|
|
- return {"TLC": tlc_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"TLC": None}
|
|
|
+ """计算车头时距(THW)"""
|
|
|
+ return _calculate_metric('THW', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_ttb(data_processed, plot_path) -> dict:
|
|
|
- """计算TTB (Time to Brake)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"TTB": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- ttb_value = safety.get_ttb_value()
|
|
|
- if safety.ttb_data:
|
|
|
- safety.generate_metric_chart('TTB', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[TTB]计算结果: {ttb_value}")
|
|
|
- return {"TTB": ttb_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"TTB": None}
|
|
|
+ """计算制动时间(TTB)"""
|
|
|
+ return _calculate_metric('TTB', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_tm(data_processed, plot_path) -> dict:
|
|
|
- """计算TM (Time Margin)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"TM": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- tm_value = safety.get_tm_value()
|
|
|
- if safety.tm_data:
|
|
|
- safety.generate_metric_chart('TM', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[TM]计算结果: {tm_value}")
|
|
|
- return {"TM": tm_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"TM": None}
|
|
|
+ """计算时间裕度(TM)"""
|
|
|
+ return _calculate_metric('TM', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_dtc(data_processed, plot_path) -> dict:
|
|
|
- """计算DTC (Distance to Collision)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"DTC": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- dtc_value = safety.get_dtc_value()
|
|
|
- LogManager().get_logger().info(f"安全指标[DTC]计算结果: {dtc_value}")
|
|
|
- return {"DTC": dtc_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"DTC": None}
|
|
|
+ """计算碰撞距离(DTC)"""
|
|
|
+ return _calculate_metric('DTC', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_psd(data_processed, plot_path) -> dict:
|
|
|
- """计算PSD (Potential Safety Distance)"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"PSD": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- psd_value = safety.get_psd_value()
|
|
|
- LogManager().get_logger().info(f"安全指标[PSD]计算结果: {psd_value}")
|
|
|
- return {"PSD": psd_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"PSD计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"PSD": None}
|
|
|
+ """计算预测安全距离比(PSD)"""
|
|
|
+ return _calculate_metric('PSD', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_collisionrisk(data_processed, plot_path) -> dict:
|
|
|
- """计算碰撞风险"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"collisionRisk": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- collision_risk_value = safety.get_collision_risk_value()
|
|
|
- if safety.collision_risk_data:
|
|
|
- safety.generate_metric_chart('collisionRisk', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[collisionRisk]计算结果: {collision_risk_value}")
|
|
|
- return {"collisionRisk": collision_risk_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"collisionRisk计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"collisionRisk": None}
|
|
|
+ """计算碰撞风险(collisionRisk)"""
|
|
|
+ return _calculate_metric('collisionRisk', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_lonsd(data_processed, plot_path) -> dict:
|
|
|
- """计算纵向安全距离"""
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- lonsd_value = safety.get_lonsd_value()
|
|
|
- if safety.lonsd_data:
|
|
|
- safety.generate_metric_chart('LonSD', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[LonSD]计算结果: {lonsd_value}")
|
|
|
- return {"LonSD": lonsd_value}
|
|
|
+ """计算纵向安全距离(LonSD)"""
|
|
|
+ return _calculate_metric('LonSD', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_latsd(data_processed, plot_path) -> dict:
|
|
|
- """计算横向安全距离"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"LatSD": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- latsd_value = safety.get_latsd_value()
|
|
|
- if safety.latsd_data:
|
|
|
- # 只生成图表,数据导出由chart_generator处理
|
|
|
- safety.generate_metric_chart('LatSD', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[LatSD]计算结果: {latsd_value}")
|
|
|
- return {"LatSD": latsd_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"LatSD计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"LatSD": None}
|
|
|
+ """计算横向安全距离(LatSD)"""
|
|
|
+ return _calculate_metric('LatSD', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_btn(data_processed, plot_path) -> dict:
|
|
|
- """计算制动威胁数"""
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"BTN": None}
|
|
|
- try:
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
- btn_value = safety.get_btn_value()
|
|
|
- if safety.btn_data:
|
|
|
- # 只生成图表,数据导出由chart_generator处理
|
|
|
- safety.generate_metric_chart('BTN', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[BTN]计算结果: {btn_value}")
|
|
|
- return {"BTN": btn_value}
|
|
|
- except Exception as e:
|
|
|
- LogManager().get_logger().error(f"BTN计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"BTN": None}
|
|
|
+ """计算制动威胁数(BTN)"""
|
|
|
+ return _calculate_metric('BTN', data_processed, plot_path)
|
|
|
|
|
|
|
|
|
def calculate_collisionseverity(data_processed, plot_path) -> dict:
|
|
|
- """计算碰撞严重性"""
|
|
|
+ """计算碰撞严重性(collisionSeverity)"""
|
|
|
+ return _calculate_metric('collisionSeverity', data_processed, plot_path)
|
|
|
+
|
|
|
+
|
|
|
+def _calculate_metric(metric_name, data_processed, plot_path) -> dict:
|
|
|
+ """安全指标计算的通用处理函数"""
|
|
|
+ key_name = metric_name
|
|
|
+ result_key = {metric_name: None}
|
|
|
+
|
|
|
+ # 检查输入数据有效性
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
- return {"collisionSeverity": None}
|
|
|
+ return result_key
|
|
|
+
|
|
|
try:
|
|
|
safety = SafetyCalculator(data_processed)
|
|
|
- collision_severity_value = safety.get_collision_severity_value()
|
|
|
- if safety.collision_severity_data:
|
|
|
- # 只生成图表,数据导出由chart_generator处理
|
|
|
- safety.generate_metric_chart('collisionSeverity', plot_path)
|
|
|
- LogManager().get_logger().info(f"安全指标[collisionSeverity]计算结果: {collision_severity_value}")
|
|
|
- return {"collisionSeverity": collision_severity_value}
|
|
|
+
|
|
|
+ # 特殊处理 collisionRisk 和 collisionSeverity 指标
|
|
|
+ if metric_name.lower() == 'collisionrisk':
|
|
|
+ metric_value = safety.get_collision_risk_value()
|
|
|
+ elif metric_name.lower() == 'collisionseverity':
|
|
|
+ metric_value = safety.get_collision_severity_value()
|
|
|
+ else:
|
|
|
+ # 其他指标使用原有逻辑
|
|
|
+ metric_value = getattr(safety, f'get_{metric_name.lower()}_value')()
|
|
|
+
|
|
|
+ # 生成图表数据
|
|
|
+ metric_data = getattr(safety, f'{metric_name.lower()}_data', None)
|
|
|
+ if metric_data:
|
|
|
+ safety.generate_metric_chart(metric_name, plot_path)
|
|
|
+
|
|
|
+ LogManager().get_logger().info(f"安全指标[{metric_name}]计算结果: {metric_value}")
|
|
|
+ return {metric_name: metric_value}
|
|
|
except Exception as e:
|
|
|
- LogManager().get_logger().error(f"collisionSeverity计算异常: {str(e)}", exc_info=True)
|
|
|
- return {"collisionSeverity": None}
|
|
|
+ LogManager().get_logger().error(f"{metric_name}计算异常: {str(e)}", exc_info=True)
|
|
|
+ return result_key
|
|
|
+
|
|
|
|
|
|
+# ==================== 安全指标注册与批处理类 ====================
|
|
|
+# 用于根据配置批量执行安全指标计算
|
|
|
+# ------------------------------------------------------------
|
|
|
|
|
|
class SafetyRegistry:
|
|
|
- """安全指标注册器"""
|
|
|
+ """安全指标注册器,根据配置动态注册和执行指标计算"""
|
|
|
|
|
|
def __init__(self, data_processed, plot_path):
|
|
|
self.logger = LogManager().get_logger()
|
|
|
self.data = data_processed
|
|
|
self.plot_path = plot_path
|
|
|
- # 检查safety_config是否为空
|
|
|
+
|
|
|
+ # 检查安全配置
|
|
|
if not hasattr(data_processed, 'safety_config') or not data_processed.safety_config:
|
|
|
self.logger.warning("安全配置为空,跳过安全指标计算")
|
|
|
self.safety_config = {}
|
|
|
self.metrics = []
|
|
|
self._registry = {}
|
|
|
return
|
|
|
+
|
|
|
self.safety_config = data_processed.safety_config.get("safety", {})
|
|
|
self.metrics = self._extract_metrics(self.safety_config)
|
|
|
self._registry = self._build_registry()
|
|
|
|
|
|
def _extract_metrics(self, config_node: dict) -> list:
|
|
|
- """从配置中提取指标名称"""
|
|
|
+ """从配置中递归提取所有指标名称"""
|
|
|
metrics = []
|
|
|
|
|
|
def _recurse(node):
|
|
@@ -270,11 +157,11 @@ class SafetyRegistry:
|
|
|
_recurse(v)
|
|
|
|
|
|
_recurse(config_node)
|
|
|
- self.logger.info(f'评比的安全指标列表:{metrics}')
|
|
|
+ self.logger.info(f'评比的安全指标列表: {metrics}')
|
|
|
return metrics
|
|
|
|
|
|
def _build_registry(self) -> dict:
|
|
|
- """构建指标函数注册表"""
|
|
|
+ """构建指标名称到计算函数的映射"""
|
|
|
registry = {}
|
|
|
for metric_name in self.metrics:
|
|
|
func_name = f"calculate_{metric_name.lower()}"
|
|
@@ -285,9 +172,8 @@ class SafetyRegistry:
|
|
|
return registry
|
|
|
|
|
|
def batch_execute(self) -> dict:
|
|
|
- """批量执行指标计算"""
|
|
|
+ """批量执行所有注册的安全指标计算"""
|
|
|
results = {}
|
|
|
- # 如果配置为空或没有注册的指标,直接返回空结果
|
|
|
if not hasattr(self, 'safety_config') or not self.safety_config or not self._registry:
|
|
|
self.logger.info("安全配置为空或无注册指标,返回空结果")
|
|
|
return results
|
|
@@ -299,18 +185,19 @@ class SafetyRegistry:
|
|
|
except Exception as e:
|
|
|
self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
|
|
|
results[name] = None
|
|
|
- self.logger.info(f'安全指标计算结果:{results}')
|
|
|
+ self.logger.info(f'安全指标计算结果: {results}')
|
|
|
return results
|
|
|
|
|
|
|
|
|
class SafeManager:
|
|
|
- """安全指标管理类"""
|
|
|
+ """安全管理器,封装安全指标计算流程"""
|
|
|
|
|
|
def __init__(self, data_processed, plot_path):
|
|
|
self.data = data_processed
|
|
|
self.logger = LogManager().get_logger()
|
|
|
self.plot_path = plot_path
|
|
|
- # 检查safety_config是否为空
|
|
|
+
|
|
|
+ # 初始化安全指标注册器
|
|
|
if not hasattr(data_processed, 'safety_config') or not data_processed.safety_config:
|
|
|
self.logger.warning("安全配置为空,跳过安全指标计算初始化")
|
|
|
self.registry = None
|
|
@@ -318,845 +205,636 @@ class SafeManager:
|
|
|
self.registry = SafetyRegistry(self.data, self.plot_path)
|
|
|
|
|
|
def report_statistic(self):
|
|
|
- """计算并报告安全指标结果"""
|
|
|
- # 如果registry为None,直接返回空字典
|
|
|
+ """生成安全指标统计报告"""
|
|
|
if self.registry is None:
|
|
|
self.logger.info("安全指标管理器未初始化,返回空结果")
|
|
|
return {}
|
|
|
+ return self.registry.batch_execute()
|
|
|
|
|
|
- safety_result = self.registry.batch_execute()
|
|
|
- return safety_result
|
|
|
|
|
|
+# ==================== 安全指标计算核心类 ====================
|
|
|
+# 包含所有安全指标的具体计算实现
|
|
|
+# ------------------------------------------------------------
|
|
|
|
|
|
class SafetyCalculator:
|
|
|
- """安全指标计算类 - 兼容Safe类风格"""
|
|
|
+ """安全指标计算器,实现各种安全指标的具体计算"""
|
|
|
+
|
|
|
+ DEFAULT_VALUES = {
|
|
|
+ 'TTC': 10.0, 'MTTC': 3.3, 'THW': 2.5,
|
|
|
+ 'TTB': 10.0, 'TM': 10.0, 'DTC': 10.0, 'PSD': 10.0,
|
|
|
+ 'LonSD': 100.0, 'LatSD': 2.0, 'BTN': 0.0,
|
|
|
+ 'collisionRisk': 0.0, 'collisionSeverity': 0.0
|
|
|
+ }
|
|
|
|
|
|
def __init__(self, data_processed):
|
|
|
self.logger = LogManager().get_logger()
|
|
|
self.data_processed = data_processed
|
|
|
-
|
|
|
self.df = data_processed.object_df.copy()
|
|
|
- self.ego_df = data_processed.ego_data.copy() # 使用copy()避免修改原始数据
|
|
|
+ self.ego_df = data_processed.ego_data.copy()
|
|
|
self.obj_id_list = data_processed.obj_id_list
|
|
|
- self.obj_df = self.df[self.df['playerId'] == 2].copy().reset_index(drop=True) if len(
|
|
|
- self.obj_id_list) > 1 else pd.DataFrame(columns=self.ego_df.columns) # 使用copy()避免修改原始数据
|
|
|
- self.metric_list = [
|
|
|
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PSD', 'LonSD', 'LatSD', 'BTN', 'collisionRisk',
|
|
|
- 'collisionSeverity'
|
|
|
- ]
|
|
|
|
|
|
- # 初始化默认值
|
|
|
- self.calculated_value = {
|
|
|
- "TTC": 10.0,
|
|
|
- "MTTC": 10.0,
|
|
|
- "THW": 10.0,
|
|
|
- "TLC": 10.0,
|
|
|
- "TTB": 10.0,
|
|
|
- "TM": 10.0,
|
|
|
- # "MPrTTC": 10.0,
|
|
|
- "DTC": 10.0,
|
|
|
- "PSD": 10.0,
|
|
|
- "LatSD": 3.0,
|
|
|
- "BTN": 1.0,
|
|
|
- "collisionRisk": 0.0,
|
|
|
- "collisionSeverity": 0.0,
|
|
|
- }
|
|
|
+ # 初始化配置参数
|
|
|
+ self._init_config_params()
|
|
|
|
|
|
+ # 初始化数据结构和指标存储
|
|
|
+ self._init_data_structures()
|
|
|
+
|
|
|
+ # 执行安全参数计算
|
|
|
+ if self.obj_id_list:
|
|
|
+ self.logger.info("开始执行安全参数计算,目标物数量: %d", len(self.obj_id_list) - 1)
|
|
|
+ self._calculate_safety_parameters()
|
|
|
+ self.logger.info("安全参数计算完成")
|
|
|
+ else:
|
|
|
+ self.logger.info("没有目标物,跳过安全参数计算")
|
|
|
+ self.empty_flag = True
|
|
|
+
|
|
|
+ def _init_config_params(self):
|
|
|
+ """从配置中初始化车辆参数"""
|
|
|
+ config = self.data_processed.vehicle_config
|
|
|
+ self.rho = config.get("RHO", 1.5) # 默认反应时间1.5秒
|
|
|
+ self.ego_accel_max = config.get("EGO_ACCEL_MAX", 3.0) # 默认最大加速度3.0 m/s²
|
|
|
+ self.obj_decel_max = config.get("OBJ_DECEL_MAX", 3.0) # 默认目标最大减速度3.0 m/s²
|
|
|
+ self.ego_decel_min = config.get("EGO_DECEL_MIN", 4.0) # 默认自车最小减速度4.0 m/s²
|
|
|
+ self.ego_decel_lon_max = config.get("EGO_DECEL_LON_MAX", 4.0) # 默认纵向最大减速度4.0 m/s²
|
|
|
+ self.ego_decel_lat_max = config.get("EGO_DECEL_LAT_MAX", 3.0) # 默认横向最大减速度3.0 m/s²
|
|
|
+ self.ego_width = config.get("CAR_WIDTH", 2.0) # 默认车辆宽度2.0米
|
|
|
+ self.ego_length = config.get("CAR_LENGTH", 4.5) # 默认车辆长度4.5米
|
|
|
+ self.vehicle_length = config.get("VEHICLE_LENGTH", 4.5) # 默认车辆长度4.5米
|
|
|
+ self.vehicle_width = config.get("VEHICLE_WIDTH", 2.0) # 默认车辆宽度2.0米
|
|
|
+ self.pedestrian_length = config.get("PEDESTRIAN_LENGTH", 0.5) # 默认行人长度0.5米
|
|
|
+ self.pedestrian_width = config.get("PEDESTRIAN_WIDTH", 0.5) # 默认行人宽度0.5米
|
|
|
+
|
|
|
+ # 计算派生参数
|
|
|
+ self.max_deceleration = self.ego_decel_lon_max
|
|
|
+ self.ego_decel_max = np.sqrt(self.ego_decel_lon_max ** 2 + self.ego_decel_lat_max ** 2)
|
|
|
+ self.ped_width = self.pedestrian_width
|
|
|
+ self.ped_length = self.pedestrian_length
|
|
|
+
|
|
|
+ def _init_data_structures(self):
|
|
|
+ """初始化数据存储结构"""
|
|
|
self.time_list = self.ego_df['simTime'].values.tolist()
|
|
|
self.frame_list = self.ego_df['simFrame'].values.tolist()
|
|
|
- self.collisionRisk = 0
|
|
|
- self.empty_flag = True
|
|
|
-
|
|
|
- # 初始化数据存储列表
|
|
|
- self.ttc_data = []
|
|
|
- self.mttc_data = []
|
|
|
- self.thw_data = []
|
|
|
- self.tlc_data = []
|
|
|
- self.ttb_data = []
|
|
|
- self.tm_data = []
|
|
|
- self.lonsd_data = []
|
|
|
- self.latsd_data = []
|
|
|
- self.btn_data = []
|
|
|
- self.collision_risk_data = []
|
|
|
- self.collision_severity_data = []
|
|
|
-
|
|
|
- # 初始化安全事件记录表
|
|
|
- self.unsafe_events_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
-
|
|
|
- # 设置输出目录
|
|
|
+ self.empty_flag = True # 标记是否有有效目标
|
|
|
+
|
|
|
+ # 初始化指标数据存储
|
|
|
+ self._init_metric_storage()
|
|
|
+
|
|
|
+ # 创建输出目录
|
|
|
self.output_dir = os.path.join(os.getcwd(), 'data')
|
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
|
|
|
|
- self.logger.info("SafetyCalculator初始化完成,场景中包含自车的目标物一共为: %d", len(self.obj_id_list))
|
|
|
-
|
|
|
- if len(self.obj_id_list) > 1:
|
|
|
- self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
- self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
- self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
- self.unsafe_acce_drac_df = pd.DataFrame(
|
|
|
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
- self.unsafe_acce_xtn_df = pd.DataFrame(
|
|
|
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
- self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
- self.most_dangerous = {}
|
|
|
- self.pass_percent = {}
|
|
|
- self.logger.info("开始执行安全参数计算 _safe_param_cal_new")
|
|
|
- self._safe_param_cal_new()
|
|
|
- self.logger.info("安全参数计算完成")
|
|
|
+ def _init_metric_storage(self):
|
|
|
+ """初始化所有指标的存储列表"""
|
|
|
+ metrics = [
|
|
|
+ 'ttc', 'mttc', 'thw', 'ttb', 'tm', 'lonsd', 'latsd', 'btn',
|
|
|
+ 'collision_risk', 'collision_severity'
|
|
|
+ ]
|
|
|
+ for metric in metrics:
|
|
|
+ setattr(self, f'{metric}_data', [])
|
|
|
|
|
|
- def _safe_param_cal_new(self):
|
|
|
- self.logger.debug("进入 _safe_param_cal_new 方法")
|
|
|
- # 直接复用Safe类的实现
|
|
|
- Tc = 0.3 # 安全距离
|
|
|
-
|
|
|
- rho = self.data_processed.vehicle_config["RHO"]
|
|
|
- ego_accel_max = self.data_processed.vehicle_config["EGO_ACCEL_MAX"]
|
|
|
- obj_decel_max = self.data_processed.vehicle_config["OBJ_DECEL_MAX"]
|
|
|
- ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"]
|
|
|
- ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
|
|
|
- ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
|
|
|
- ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
|
|
|
- # TEMP_COMMENT: x_relative_start_dist 注释开始
|
|
|
- x_relative_start_dist = abs(self.ego_df["posY"] - self.df['posY'])
|
|
|
- # x_relative_start_dist = self.ego_df['x_relative_dist']
|
|
|
-
|
|
|
- # 设置安全指标阈值
|
|
|
- self.safety_thresholds = {
|
|
|
- 'TTC': {'min': 1.5, 'max': None}, # TTC小于1.5秒视为危险
|
|
|
- 'MTTC': {'min': 1.5, 'max': None}, # MTTC小于1.5秒视为危险
|
|
|
- 'THW': {'min': 1.0, 'max': None}, # THW小于1.0秒视为危险
|
|
|
- 'LonSD': {'min': None, 'max': None}, # 根据实际情况设置
|
|
|
- 'LatSD': {'min': 0.5, 'max': None}, # LatSD小于0.5米视为危险
|
|
|
- 'BTN': {'min': None, 'max': 0.8}, # BTN大于0.8视为危险
|
|
|
- 'collisionRisk': {'min': None, 'max': 30}, # 碰撞风险大于30%视为危险
|
|
|
- 'collisionSeverity': {'min': None, 'max': 30} # 碰撞严重性大于30%视为危险
|
|
|
- }
|
|
|
+ def _calculate_safety_parameters(self):
|
|
|
+ """核心安全参数计算方法"""
|
|
|
+ # 预处理:构建按帧组织的目标字典
|
|
|
+ obj_dict = self._preprocess_object_data()
|
|
|
+ df_list = []
|
|
|
+ EGO_PLAYER_ID = 1
|
|
|
+ found_valid_target = False
|
|
|
+
|
|
|
+ # 逐帧处理数据
|
|
|
+ for frame_num in self.frame_list:
|
|
|
+ try:
|
|
|
+ ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
|
|
|
+ except KeyError:
|
|
|
+ continue
|
|
|
|
|
|
+ frame_targets = self._process_frame_targets(frame_num, ego_data, obj_dict, EGO_PLAYER_ID)
|
|
|
+
|
|
|
+ if frame_targets:
|
|
|
+ found_valid_target = True
|
|
|
+ df_fnum = pd.DataFrame(frame_targets)
|
|
|
+ df_list.append(df_fnum)
|
|
|
+
|
|
|
+ # 合并处理结果
|
|
|
+ self._postprocess_results(df_list, found_valid_target)
|
|
|
+
|
|
|
+ def _preprocess_object_data(self):
|
|
|
+ """预处理目标数据为按帧组织的字典"""
|
|
|
obj_dict = defaultdict(dict)
|
|
|
obj_data_dict = self.df.to_dict('records')
|
|
|
for item in obj_data_dict:
|
|
|
obj_dict[item['simFrame']][item['playerId']] = item
|
|
|
+ return obj_dict
|
|
|
|
|
|
- df_list = []
|
|
|
- EGO_PLAYER_ID = 1
|
|
|
+ def _process_frame_targets(self, frame_num, ego_data, obj_dict, ego_id):
|
|
|
+ """处理单帧内的所有目标物"""
|
|
|
+ frame_targets = []
|
|
|
+ vx_ego = ego_data.get('lon_v_vehicle', 0)
|
|
|
+ vy_ego = ego_data.get('lat_v_vehicle', 0)
|
|
|
+ self.lane_width = ego_data.get('lane_width', 3.75)
|
|
|
|
|
|
- for frame_num in self.frame_list:
|
|
|
- ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
|
|
|
- v1 = ego_data['v']
|
|
|
- x1 = ego_data['posX']
|
|
|
- y1 = ego_data['posY']
|
|
|
- h1 = ego_data['posH']
|
|
|
- laneOffset = ego_data["laneOffset"]
|
|
|
-
|
|
|
- v_x1 = ego_data['speedX']
|
|
|
- v_y1 = ego_data['speedY']
|
|
|
- a_x1 = ego_data['accelX']
|
|
|
- a_y1 = ego_data['accelY']
|
|
|
- a1 = np.sqrt(a_x1 ** 2 + a_y1 ** 2)
|
|
|
-
|
|
|
- for playerId in self.obj_id_list:
|
|
|
- if playerId == EGO_PLAYER_ID:
|
|
|
- continue
|
|
|
- try:
|
|
|
- obj_data = obj_dict[frame_num][playerId]
|
|
|
- except KeyError:
|
|
|
- continue
|
|
|
-
|
|
|
- x2 = obj_data['posX']
|
|
|
- y2 = obj_data['posY']
|
|
|
- dist = self.dist(x1, y1, x2, y2)
|
|
|
- obj_data['dist'] = dist
|
|
|
-
|
|
|
- v_x2 = obj_data['speedX']
|
|
|
- v_y2 = obj_data['speedY']
|
|
|
- v2 = obj_data['v']
|
|
|
- a_x2 = obj_data['accelX']
|
|
|
- a_y2 = obj_data['accelY']
|
|
|
- a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2)
|
|
|
-
|
|
|
- dx, dy = x2 - x1, y2 - y1
|
|
|
-
|
|
|
- # 计算目标车相对于自车的方位角
|
|
|
- # beta = math.atan2(dy, dx)
|
|
|
-
|
|
|
- # 将全局坐标系下的相对位置向量转换到自车坐标系
|
|
|
- # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
|
|
|
- '''
|
|
|
- 你现在的 posH 是 正北为 0°,顺时针为正角度。
|
|
|
- 但是我们在做二维旋转时,Python/数学里默认是:
|
|
|
-
|
|
|
- 0° 是 X 轴(正东)
|
|
|
-
|
|
|
- 顺时针是负角,逆时针是正角
|
|
|
-
|
|
|
- 所以,要将你的 posH(正北为 0)转换成数学中以正东为 0° 的角度'''
|
|
|
- h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
|
|
|
-
|
|
|
- # 坐标变换
|
|
|
- lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
|
|
|
- lat_d = abs(-dx * math.sin(h1_rad) + dy * math.cos(h1_rad)) # 横向距离(取绝对值)
|
|
|
-
|
|
|
- obj_dict[frame_num][playerId]['lon_d'] = lon_d
|
|
|
- obj_dict[frame_num][playerId]['lat_d'] = lat_d
|
|
|
-
|
|
|
- # if lon_d > 100 or lon_d < -5 or lat_d > 4:
|
|
|
- # continue
|
|
|
-
|
|
|
- self.empty_flag = False
|
|
|
-
|
|
|
- vx, vy = v_x1 - v_x2, v_y1 - v_y2
|
|
|
- ax, ay = a_x2 - a_x1, a_y2 - a_y1
|
|
|
- relative_v = np.sqrt(vx ** 2 + vy ** 2)
|
|
|
-
|
|
|
- v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
|
|
|
- v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
|
|
|
- vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
|
|
|
- arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
|
|
|
- v_x2, v_y2)
|
|
|
-
|
|
|
- obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
|
|
|
- obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
|
|
|
- obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
|
|
|
- obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
|
|
|
-
|
|
|
- obj_type = obj_data['type']
|
|
|
-
|
|
|
- TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None
|
|
|
- MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist)
|
|
|
- THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None
|
|
|
- TLC = self._cal_TLC(v1, h1, laneOffset)
|
|
|
- TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max)
|
|
|
- TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
|
|
|
- DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, rho)
|
|
|
- PSD = self._cal_PSD(x_relative_start_dist, v1, ego_decel_lon_max)
|
|
|
-
|
|
|
- LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
|
|
|
- obj_decel_max)
|
|
|
-
|
|
|
- lat_dist = 0.5
|
|
|
- v_right = v1
|
|
|
- v_left = v2
|
|
|
- a_right_lat_brake_min = 1
|
|
|
- a_left_lat_brake_min = 1
|
|
|
- a_lat_max = 5
|
|
|
-
|
|
|
- LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
|
|
|
- a_left_lat_brake_min, a_lat_max)
|
|
|
-
|
|
|
- # 使用自车坐标系下的纵向加速度
|
|
|
- lon_a1 = a_x1 * math.cos(h1_rad) + a_y1 * math.sin(h1_rad)
|
|
|
- lon_a2 = a_x2 * math.cos(h1_rad) + a_y2 * math.sin(h1_rad)
|
|
|
- lon_a = abs(lon_a1 - lon_a2)
|
|
|
- lon_d = max(0.1, lon_d) # 确保纵向距离为正
|
|
|
- lon_v = v_x1 * math.cos(h1_rad) + v_y1 * math.sin(h1_rad)
|
|
|
- BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
|
|
|
-
|
|
|
- # 使用自车坐标系下的横向加速度
|
|
|
- lat_a1 = -a_x1 * math.sin(h1_rad) + a_y1 * math.cos(h1_rad)
|
|
|
- lat_a2 = -a_x2 * math.sin(h1_rad) + a_y2 * math.cos(h1_rad)
|
|
|
- lat_a = abs(lat_a1 - lat_a2)
|
|
|
- lat_v = -v_x1 * math.sin(h1_rad) + v_y1 * math.cos(h1_rad)
|
|
|
-
|
|
|
- obj_dict[frame_num][playerId]['lat_v_rel'] = lat_v - (
|
|
|
- -v_x2 * math.sin(h1_rad) + v_y2 * math.cos(h1_rad))
|
|
|
- obj_dict[frame_num][playerId]['lon_v_rel'] = lon_v - (v_x2 * math.cos(h1_rad) + v_y2 * math.sin(h1_rad))
|
|
|
-
|
|
|
- TTC = None if (TTC is None or TTC < 0) else TTC
|
|
|
- MTTC = None if (MTTC is None or MTTC < 0) else MTTC
|
|
|
- THW = None if (THW is None or THW < 0) else THW
|
|
|
- TLC = None if (TLC is None or TLC < 0) else TLC
|
|
|
- TTB = None if (TTB is None or TTB < 0) else TTB
|
|
|
- TM = None if (TM is None or TM < 0) else TM
|
|
|
- DTC = None if (DTC is None or DTC < 0) else DTC
|
|
|
- PSD = None if (PSD is None or PSD < 0) else PSD
|
|
|
-
|
|
|
- obj_dict[frame_num][playerId]['TTC'] = TTC
|
|
|
- obj_dict[frame_num][playerId]['MTTC'] = MTTC
|
|
|
- obj_dict[frame_num][playerId]['THW'] = THW
|
|
|
- obj_dict[frame_num][playerId]['TLC'] = TLC
|
|
|
- obj_dict[frame_num][playerId]['TTB'] = TTB
|
|
|
- obj_dict[frame_num][playerId]['TM'] = TM
|
|
|
- obj_dict[frame_num][playerId]['DTC'] = DTC
|
|
|
- obj_dict[frame_num][playerId]['PSD'] = PSD
|
|
|
- obj_dict[frame_num][playerId]['LonSD'] = LonSD
|
|
|
- obj_dict[frame_num][playerId]['LatSD'] = LatSD
|
|
|
- obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
|
|
|
-
|
|
|
- # TTC要进行筛选,否则会出现nan或者TTC过大的情况
|
|
|
- if not TTC or TTC > 4000: # threshold = 4258.41
|
|
|
- collisionSeverity = 0
|
|
|
- pr_death = 0
|
|
|
- collisionRisk = 0
|
|
|
- else:
|
|
|
- result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
|
|
|
- collisionSeverity = 1 - result
|
|
|
- pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
|
|
|
- collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
|
|
|
-
|
|
|
- obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
|
|
|
- obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
|
|
|
- obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
|
|
|
-
|
|
|
- df_fnum = pd.DataFrame(obj_dict[frame_num].values())
|
|
|
- df_list.append(df_fnum)
|
|
|
-
|
|
|
- df_safe = pd.concat(df_list)
|
|
|
- col_list = ['simTime', 'simFrame', 'playerId',
|
|
|
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PSD', 'LonSD', 'LatSD', 'BTN',
|
|
|
- 'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
|
- self.df_safe = df_safe[col_list].reset_index(drop=True)
|
|
|
-
|
|
|
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
|
|
|
- # 计算 AB 连线的向量 AB
|
|
|
- # dx = x2 - x1
|
|
|
- # dy = y2 - y1
|
|
|
-
|
|
|
- # 计算 AB 连线的模长 |AB|
|
|
|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
|
|
|
-
|
|
|
- # 计算 AB 连线的单位向量 U_AB
|
|
|
- U_ABx = dx / AB_mod
|
|
|
- U_ABy = dy / AB_mod
|
|
|
-
|
|
|
- # 计算 A 在 AB 连线上的速度 V1_on_AB
|
|
|
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
|
|
|
-
|
|
|
- return V1_on_AB
|
|
|
-
|
|
|
- def _cal_v_projection(self, dx, dy, vx, vy):
|
|
|
- # 计算 AB 连线的向量 AB
|
|
|
- # dx = x2 - x1
|
|
|
- # dy = y2 - y1
|
|
|
-
|
|
|
- # 计算 AB 连线的模长 |AB|
|
|
|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
|
|
|
-
|
|
|
- # 计算 AB 连线的单位向量 U_AB
|
|
|
- U_ABx = dx / AB_mod
|
|
|
- U_ABy = dy / AB_mod
|
|
|
-
|
|
|
- # 计算 A 相对于 B 的速度 V_relative
|
|
|
- # vx = vx1 - vx2
|
|
|
- # vy = vy1 - vy2
|
|
|
-
|
|
|
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
|
|
|
- V_on_AB = vx * U_ABx + vy * U_ABy
|
|
|
-
|
|
|
- return V_on_AB
|
|
|
-
|
|
|
- def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
|
|
|
- # 计算 AB 连线的向量 AB
|
|
|
- # dx = x2 - x1
|
|
|
- # dy = y2 - y1
|
|
|
-
|
|
|
- # 计算 θ
|
|
|
- V_mod = math.sqrt(vx ** 2 + vy ** 2)
|
|
|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
|
|
|
- if V_mod == 0 or AB_mod == 0:
|
|
|
- return 0
|
|
|
-
|
|
|
- cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
|
|
|
- theta = math.acos(cos_theta)
|
|
|
-
|
|
|
- # 计算 AB 连线的模长 |AB|
|
|
|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
|
|
|
-
|
|
|
- # 计算 AB 连线的单位向量 U_AB
|
|
|
- U_ABx = dx / AB_mod
|
|
|
- U_ABy = dy / AB_mod
|
|
|
-
|
|
|
- # 计算 A 相对于 B 的加速度 a_relative
|
|
|
- # ax = ax1 - ax2
|
|
|
- # ay = ay1 - ay2
|
|
|
-
|
|
|
- # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
|
|
|
- a_on_AB = ax * U_ABx + ay * U_ABy
|
|
|
-
|
|
|
- VA = np.array([v_x1, v_y1])
|
|
|
- VB = np.array([v_x2, v_y2])
|
|
|
- D_A = np.array([x1, y1])
|
|
|
- D_B = np.array([x2, y2])
|
|
|
- V_r = VA - VB
|
|
|
- V = np.linalg.norm(V_r)
|
|
|
- w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
|
|
|
- a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
|
|
|
- return a_on_AB_back
|
|
|
-
|
|
|
- # 计算相对加速度
|
|
|
- def _calculate_derivative(self, a, w, V, theta):
|
|
|
- # 计算(V×cos(θ))'的值
|
|
|
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
|
|
|
- derivative = a - w * V * math.sin(theta)
|
|
|
- return derivative
|
|
|
-
|
|
|
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
|
|
|
- dx = A[0] - B[0]
|
|
|
- dy = A[1] - B[1]
|
|
|
- dvx = VA[0] - VB[0]
|
|
|
- dvy = VA[1] - VB[1]
|
|
|
- # (dx * dvy - dy * dvx)
|
|
|
- angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
|
|
|
- return angular_velocity
|
|
|
+ for player_id in self.obj_id_list:
|
|
|
+ if player_id == ego_id:
|
|
|
+ continue
|
|
|
|
|
|
- def _death_pr(self, obj_type, v_relative):
|
|
|
- if obj_type == 5:
|
|
|
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
|
|
|
+ try:
|
|
|
+ obj_data = obj_dict[frame_num][player_id]
|
|
|
+ except KeyError:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 计算目标物安全指标
|
|
|
+ result = self._calculate_target_metrics(ego_data, obj_data, vx_ego, vy_ego)
|
|
|
+ obj_data.update(result)
|
|
|
+
|
|
|
+ frame_targets.append(obj_data)
|
|
|
+
|
|
|
+ return frame_targets
|
|
|
+
|
|
|
+ def _calculate_target_metrics(self, ego_data, obj_data, vx_ego, vy_ego):
|
|
|
+ """计算单个目标物的安全指标"""
|
|
|
+ # 提取基本参数
|
|
|
+ obj_type = obj_data.get('type', 0)
|
|
|
+ lon_d = obj_data.get('x_relative_start_dist', 0)
|
|
|
+ lat_d = obj_data.get('y_relative_start_dist', 0)
|
|
|
+ vx_obj = obj_data.get('lon_v_vehicle', 0)
|
|
|
+ vy_obj = obj_data.get('lat_v_vehicle', 0)
|
|
|
+
|
|
|
+ # 计算相对运动参数
|
|
|
+ vx_rel = vx_obj - vx_ego
|
|
|
+ vy_rel = vy_obj - vy_ego
|
|
|
+ dist = math.sqrt(lon_d ** 2 + lat_d ** 2)
|
|
|
+
|
|
|
+ # 计算航向角差异
|
|
|
+ h1 = ego_data['posH']
|
|
|
+ h2 = obj_data.get('posH', h1)
|
|
|
+ heading_diff = abs(h1 - h2)
|
|
|
+ if heading_diff > 180:
|
|
|
+ heading_diff = 360 - heading_diff
|
|
|
+
|
|
|
+ # 计算相对加速度
|
|
|
+ ax_ego = ego_data.get('lon_acc_vehicle', 0)
|
|
|
+ ay_ego = ego_data.get('lat_acc_vehicle', 0)
|
|
|
+ ax_obj = obj_data.get('lon_acc_vehicle', 0)
|
|
|
+ ay_obj = obj_data.get('lat_acc_vehicle', 0)
|
|
|
+ ax_rel_ego = ax_obj - ax_ego
|
|
|
+ ay_rel_ego = ay_obj - ay_ego
|
|
|
+
|
|
|
+ # 计算投影
|
|
|
+ vrel_projection = self._cal_v_projection_using_relative(lon_d, lat_d, vx_rel, vy_rel)
|
|
|
+ accl_projection = self._cal_a_projection_using_relative(lon_d, lat_d, ax_rel_ego, ay_rel_ego)
|
|
|
+
|
|
|
+ # 检查目标相关性
|
|
|
+ is_relevant = self._is_relevant_target(lat_d, lon_d, vy_rel, heading_diff, obj_type)
|
|
|
+
|
|
|
+ # 检查过滤条件
|
|
|
+ passed_conflict = self._has_passed_conflict_point(lon_d, lat_d, obj_type, heading_diff)
|
|
|
+ moving_away = self._is_moving_away(
|
|
|
+ (vx_ego, vy_ego), (vx_obj, vy_obj), lon_d, lat_d, obj_type, heading_diff
|
|
|
+ )
|
|
|
+ skip_risk_calculation = passed_conflict and (moving_away or obj_data.get('v', 0) < 0.05)
|
|
|
+
|
|
|
+ # 计算指标
|
|
|
+ if not is_relevant or skip_risk_calculation:
|
|
|
+ return self._get_default_metrics(obj_type)
|
|
|
else:
|
|
|
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
|
|
|
- return p_death
|
|
|
+ return self._calculate_relevant_metrics(
|
|
|
+ dist, vrel_projection, accl_projection,
|
|
|
+ lon_d, lat_d, vx_ego, vy_ego, vx_rel, ax_rel_ego,
|
|
|
+ heading_diff, obj_type, ego_data, obj_data
|
|
|
+ )
|
|
|
+
|
|
|
+ def _get_default_metrics(self, obj_type):
|
|
|
+ """获取默认指标值"""
|
|
|
+ return {
|
|
|
+ 'TTC': self.DEFAULT_VALUES['TTC'],
|
|
|
+ 'MTTC': self.DEFAULT_VALUES['MTTC'],
|
|
|
+ 'THW': self.DEFAULT_VALUES['THW'] if obj_type != 5 else None,
|
|
|
+ 'TTB': self.DEFAULT_VALUES['TTB'] if obj_type != 5 else None,
|
|
|
+ 'TM': self.DEFAULT_VALUES['TM'] if obj_type != 5 else None,
|
|
|
+ 'PSD': self.DEFAULT_VALUES['PSD'],
|
|
|
+ 'DTC': self.DEFAULT_VALUES['DTC'],
|
|
|
+ 'LonSD': self.DEFAULT_VALUES['LonSD'] if obj_type != 5 else None,
|
|
|
+ 'LatSD': self.DEFAULT_VALUES['LatSD'] if obj_type != 5 else None,
|
|
|
+ 'BTN': self.DEFAULT_VALUES['BTN'] if obj_type != 5 else None,
|
|
|
+ 'collisionSeverity': self.DEFAULT_VALUES['collisionSeverity'],
|
|
|
+ 'pr_death': 0,
|
|
|
+ 'collisionRisk': self.DEFAULT_VALUES['collisionRisk']
|
|
|
+ }
|
|
|
|
|
|
- def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
|
|
|
- if obj_type == 5:
|
|
|
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
|
|
|
+ def _calculate_relevant_metrics(self, dist, vrel_projection, accl_projection,
|
|
|
+ lon_d, lat_d, vx_ego, vy_ego, vx_rel, ax_rel_ego,
|
|
|
+ heading_diff, obj_type, ego_data, obj_data):
|
|
|
+ """计算相关目标的指标"""
|
|
|
+ Tc = 0.3 # 反应时间
|
|
|
+
|
|
|
+ # 核心指标计算
|
|
|
+ metrics = {
|
|
|
+ 'TTC': self._cal_TTC(dist, vrel_projection),
|
|
|
+ 'MTTC': self._cal_MTTC(dist, vrel_projection, ego_data.get('lon_acc_vehicle', 0)),
|
|
|
+ 'THW': self._cal_THW(lon_d, vx_ego),
|
|
|
+ 'TTB': self._cal_TTB(dist, vrel_projection, self.ego_decel_max) if obj_type != 5 else None,
|
|
|
+ 'TM': self._cal_TM(vrel_projection, obj_data.get('v', 0),
|
|
|
+ obj_data.get('lon_acc_vehicle', 0),
|
|
|
+ ego_data.get('v', 0),
|
|
|
+ ego_data.get('lon_acc_vehicle', 0)) if obj_type != 5 else None,
|
|
|
+ 'PSD': self._cal_PSD(dist, vrel_projection, self.max_deceleration,
|
|
|
+ heading_diff, vx_ego, vy_ego, obj_type, lon_d, lat_d),
|
|
|
+ 'DTC': self._cal_DTC(vx_rel, ax_rel_ego, self.rho) if abs(heading_diff) < 30 else self.DEFAULT_VALUES[
|
|
|
+ 'DTC'],
|
|
|
+ 'LonSD': self._cal_longitudinal_safe_dist(vx_ego, vx_rel, self.rho,
|
|
|
+ self.ego_decel_min,
|
|
|
+ self.obj_decel_max) if obj_type != 5 else None,
|
|
|
+ 'LatSD': self._cal_lateral_safe_dist(abs(lat_d), vy_ego, self.ego_width,
|
|
|
+ self.lane_width, self.ego_decel_lat_max) if obj_type != 5 else None,
|
|
|
+ 'BTN': self._cal_BTN_new(ego_data.get('lon_acc_vehicle', 0),
|
|
|
+ ego_data.get('lon_acc_vehicle', 0),
|
|
|
+ abs(lon_d), vx_ego, self.ego_decel_lon_max) if obj_type != 5 else None
|
|
|
+ }
|
|
|
+
|
|
|
+ # 处理碰撞风险相关指标
|
|
|
+ if metrics['TTC'] is None or metrics['TTC'] > 4000:
|
|
|
+ metrics.update({
|
|
|
+ 'collisionSeverity': 0,
|
|
|
+ 'pr_death': 0,
|
|
|
+ 'collisionRisk': 0
|
|
|
+ })
|
|
|
else:
|
|
|
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
|
|
|
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
|
|
|
- return collisionRisk
|
|
|
+ try:
|
|
|
+ result, _ = spi.quad(self._normal_distribution, 0, metrics['TTC'] - Tc)
|
|
|
+ collisionSeverity = 1 - result
|
|
|
+ pr_death = self._death_pr(obj_type, vrel_projection)
|
|
|
+ collisionRisk = self._cal_collisionRisk_level(obj_type, vrel_projection, collisionSeverity)
|
|
|
+ metrics.update({
|
|
|
+ 'collisionSeverity': collisionSeverity * 100,
|
|
|
+ 'pr_death': pr_death * 100,
|
|
|
+ 'collisionRisk': collisionRisk * 100
|
|
|
+ })
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"碰撞风险计算错误: {e}")
|
|
|
+ metrics.update({
|
|
|
+ 'collisionSeverity': 0,
|
|
|
+ 'pr_death': 0,
|
|
|
+ 'collisionRisk': 0
|
|
|
+ })
|
|
|
|
|
|
- # 求两车之间当前距离
|
|
|
- def dist(self, x1, y1, x2, y2):
|
|
|
- dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
|
|
|
- return dist
|
|
|
+ return metrics
|
|
|
|
|
|
- def generate_metric_chart(self, metric_name: str, plot_path: Path) -> None:
|
|
|
- """生成指标图表
|
|
|
+ def _postprocess_results(self, df_list, found_valid_target):
|
|
|
+ """后处理计算结果"""
|
|
|
+ self.empty_flag = not found_valid_target
|
|
|
+
|
|
|
+ # 合并所有帧数据
|
|
|
+ if df_list:
|
|
|
+ self.df_safe = pd.concat(df_list)
|
|
|
+ col_list = ['simTime', 'simFrame', 'playerId',
|
|
|
+ 'TTC', 'MTTC', 'THW', 'TTB', 'TM', 'DTC', 'PSD', 'LonSD', 'LatSD', 'BTN',
|
|
|
+ 'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
|
+ self.df_safe = self.df_safe[col_list].reset_index(drop=True)
|
|
|
+ else:
|
|
|
+ self.df_safe = pd.DataFrame()
|
|
|
+ self.empty_flag = True
|
|
|
+
|
|
|
+ self.logger.info(f"处理完成,找到有效目标: {not self.empty_flag}")
|
|
|
+
|
|
|
+ # ==================== 核心计算方法 ====================
|
|
|
+
|
|
|
+ def _cal_v_projection_using_relative(self, lon_d, lat_d, vx_rel, vy_rel):
|
|
|
+ """计算速度投影(自车坐标系)"""
|
|
|
+ dist = math.sqrt(lon_d ** 2 + lat_d ** 2)
|
|
|
+ if dist < 1e-6:
|
|
|
+ return 0.0
|
|
|
+ U_ABx = lon_d / dist
|
|
|
+ U_ABy = lat_d / dist
|
|
|
+ return vx_rel * U_ABx + vy_rel * U_ABy
|
|
|
+
|
|
|
+ def _cal_a_projection_using_relative(self, lon_d, lat_d, ax_rel, ay_rel):
|
|
|
+ """计算加速度投影(自车坐标系)"""
|
|
|
+ dist = math.sqrt(lon_d ** 2 + lat_d ** 2)
|
|
|
+ if dist < 1e-6:
|
|
|
+ return 0.0
|
|
|
+ U_ABx = lon_d / dist
|
|
|
+ U_ABy = lat_d / dist
|
|
|
+ return ax_rel * U_ABx + ay_rel * U_ABy
|
|
|
+
|
|
|
+ def _cal_TTC(self, dist, vrel_projection):
|
|
|
+ """计算碰撞时间(TTC)"""
|
|
|
+ if vrel_projection >= 0: # 不接近
|
|
|
+ return None
|
|
|
+ return dist / abs(vrel_projection)
|
|
|
+
|
|
|
+ def _cal_MTTC(self, dist, vrel_projection, a_ego):
|
|
|
+ """计算修正的碰撞时间(MTTC)"""
|
|
|
+ if vrel_projection >= 0 or dist <= 0 or a_ego >= 0:
|
|
|
+ return None
|
|
|
|
|
|
- Args:
|
|
|
- metric_name: 指标名称
|
|
|
- """
|
|
|
try:
|
|
|
- # 确定输出目录
|
|
|
- if plot_path is None:
|
|
|
- self.output_dir = os.path.join(os.getcwd(), 'data')
|
|
|
- os.makedirs(self.output_dir, exist_ok=True)
|
|
|
- else:
|
|
|
- self.output_dir = plot_path
|
|
|
+ if abs(a_ego) < 1e-6:
|
|
|
+ return dist / abs(vrel_projection) if abs(vrel_projection) > 1e-6 else None
|
|
|
|
|
|
- # 调用图表生成函数
|
|
|
- chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
|
|
|
- if chart_path:
|
|
|
- self.logger.info(f"{metric_name}图表已生成: {chart_path}")
|
|
|
- else:
|
|
|
- self.logger.warning(f"{metric_name}图表生成失败")
|
|
|
+ discriminant = vrel_projection ** 2 + 2 * a_ego * dist
|
|
|
+ if discriminant < 0:
|
|
|
+ return None
|
|
|
+
|
|
|
+ t1 = (-vrel_projection + math.sqrt(discriminant)) / a_ego
|
|
|
+ t2 = (-vrel_projection - math.sqrt(discriminant)) / a_ego
|
|
|
|
|
|
+ valid_times = [t for t in (t1, t2) if t > 0]
|
|
|
+ return min(valid_times) if valid_times else None
|
|
|
except Exception as e:
|
|
|
- self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)
|
|
|
-
|
|
|
- # TTC (time to collision)
|
|
|
- def _cal_TTC(self, dist, vrel_projection_in_dist):
|
|
|
- if vrel_projection_in_dist == 0:
|
|
|
- return math.inf
|
|
|
- TTC = dist / vrel_projection_in_dist
|
|
|
- return TTC
|
|
|
-
|
|
|
- def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
|
|
|
- MTTC = math.nan
|
|
|
- if arel_projection_in_dist != 0:
|
|
|
- tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
|
|
|
- if tmp < 0:
|
|
|
- return math.nan
|
|
|
- t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
|
|
|
- t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
|
|
|
- if t1 > 0 and t2 > 0:
|
|
|
- if t1 >= t2:
|
|
|
- MTTC = t2
|
|
|
- elif t1 < t2:
|
|
|
- MTTC = t1
|
|
|
- elif t1 > 0 and t2 <= 0:
|
|
|
- MTTC = t1
|
|
|
- elif t1 <= 0 and t2 > 0:
|
|
|
- MTTC = t2
|
|
|
- if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
|
|
|
- MTTC = dist / vrel_projection_in_dist
|
|
|
- return MTTC
|
|
|
-
|
|
|
- # THW (time headway)
|
|
|
- def _cal_THW(self, dist, v_ego_projection_in_dist):
|
|
|
- if not v_ego_projection_in_dist:
|
|
|
- THW = None
|
|
|
- else:
|
|
|
- THW = dist / v_ego_projection_in_dist
|
|
|
- return THW
|
|
|
-
|
|
|
- # TLC (time to line crossing)
|
|
|
- def _cal_TLC(self, ego_v, ego_yaw, laneOffset):
|
|
|
- TLC = laneOffset / ego_v / np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0
|
|
|
- if TLC < 0:
|
|
|
- TLC = None
|
|
|
- return TLC
|
|
|
-
|
|
|
- def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max):
|
|
|
- if len(x_relative_start_dist):
|
|
|
+ self.logger.warning(f"MTTC计算错误: {e}")
|
|
|
return None
|
|
|
- if (ego_decel_max == 0) or (relative_v == 0):
|
|
|
- return self.calculated_value["TTB"]
|
|
|
- else:
|
|
|
- x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
- TTB = (x_relative_start_dist0 + relative_v * relative_v / 2 / ego_decel_max) / relative_v
|
|
|
- return TTB
|
|
|
|
|
|
- def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1):
|
|
|
- if len(x_relative_start_dist):
|
|
|
+ def _cal_THW(self, lon_d, vx_ego):
|
|
|
+ """计算车头时距(THW)"""
|
|
|
+ if vx_ego is None or vx_ego <= 0 or lon_d is None or lon_d <= 0:
|
|
|
return None
|
|
|
- if (a2 == 0) or (v1 == 0):
|
|
|
- return self.calculated_value["TM"]
|
|
|
- if a1 == 0:
|
|
|
+ return lon_d / vx_ego
|
|
|
+
|
|
|
+ def _cal_TTB(self, dist, vrel_projection, ego_decel_max):
|
|
|
+ """计算制动时间(TTB)"""
|
|
|
+ if vrel_projection is None or ego_decel_max is None or vrel_projection >= 0:
|
|
|
return None
|
|
|
- x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
- TM = (x_relative_start_dist0 + v2 ** 2 / (2 * a2) - v1 ** 2 / (2 * a1)) / v1
|
|
|
- return TM
|
|
|
-
|
|
|
- def velocity(self, v_x, v_y):
|
|
|
- v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
|
|
|
- return v
|
|
|
-
|
|
|
- def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
|
|
|
- lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
|
|
|
- 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
|
|
|
- return lon_dist_min
|
|
|
-
|
|
|
- def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
|
|
|
- a_left_lat_brake_min, a_lat_max):
|
|
|
- # 检查除数是否为零
|
|
|
- if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
|
|
|
- return self._default_value('LatSD') # 返回默认值
|
|
|
-
|
|
|
- v_right_rho = v_right + rho * a_lat_max
|
|
|
- v_left_rho = v_left + rho * a_lat_max
|
|
|
- dist_min = lat_dist + (
|
|
|
- (v_right + v_right_rho) * rho / 2
|
|
|
- + v_right_rho ** 2 / a_right_lat_brake_min / 2
|
|
|
- + ((v_left + v_right_rho) * rho / 2)
|
|
|
- + v_left_rho ** 2 / a_left_lat_brake_min / 2
|
|
|
- )
|
|
|
- return dist_min
|
|
|
|
|
|
- def _cal_DTC(self, v_on_dist, a_on_dist, t):
|
|
|
- if a_on_dist == 0:
|
|
|
+ try:
|
|
|
+ TTB = (dist + vrel_projection ** 2 / (2 * ego_decel_max) / vrel_projection)
|
|
|
+ return TTB if TTB > 0 and not (np.isinf(TTB) or np.isnan(TTB)) else None
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.warning(f"计算TTB时出错: {e}")
|
|
|
return None
|
|
|
- DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
|
|
|
- return DTC
|
|
|
|
|
|
- def _cal_PSD(self, x_relative_start_dist, v1, ego_decel_lon_max):
|
|
|
- if v1 == 0:
|
|
|
+ def _cal_TM(self, vrel_projection, v2, a2, v1, a1):
|
|
|
+ """计算时间裕度(TM)"""
|
|
|
+ if (vrel_projection is None or v2 is None or a2 is None or a2 <= 0 or
|
|
|
+ v1 is None or a1 is None or a1 <= 0 or vrel_projection >= 0):
|
|
|
return None
|
|
|
- else:
|
|
|
- if len(x_relative_start_dist) > 0:
|
|
|
- x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
- PSD = x_relative_start_dist0 * 2 * ego_decel_lon_max / v1
|
|
|
- return PSD
|
|
|
- else:
|
|
|
- return None
|
|
|
|
|
|
- # DRAC (decelerate required avoid collision)
|
|
|
- def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
|
|
|
- dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
|
|
|
- if dist_length < 0:
|
|
|
- dist_width = dist - (width2 / 2 + width1 / 2)
|
|
|
- if dist_width < 0:
|
|
|
- return math.inf
|
|
|
- else:
|
|
|
- d = dist_width
|
|
|
- else:
|
|
|
- d = dist_length
|
|
|
- DRAC = vrel_projection_in_dist ** 2 / (2 * d)
|
|
|
- return DRAC
|
|
|
+ try:
|
|
|
+ TM = (v2 ** 2 / (2 * a2) - v1 ** 2 / (2 * a1)) / v1
|
|
|
+ return TM if TM > 0 and not (np.isinf(TM) or np.isnan(TM)) else None
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.warning(f"计算TM时出错: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _cal_PSD(self, dist, vrel_projection, max_decel, heading_diff, vx_ego, vy_ego, obj_type, relative_x,
|
|
|
+ relative_y):
|
|
|
+ """计算预测安全距离比(PSD)"""
|
|
|
+ abs_heading_diff = abs(heading_diff)
|
|
|
+
|
|
|
+ # 根据航向差确定场景类型
|
|
|
+ if abs_heading_diff < 45:
|
|
|
+ # 纵向场景
|
|
|
+ D = abs(relative_x)
|
|
|
+ v_dir = abs(vx_ego)
|
|
|
+ min_stopping_dist = (v_dir ** 2) / (2 * self.ego_decel_lon_max)
|
|
|
+ elif 45 <= abs_heading_diff <= 135:
|
|
|
+ # 横向场景
|
|
|
+ D = abs(relative_y)
|
|
|
+ v_dir = abs(vy_ego)
|
|
|
+ if obj_type == 5: # 行人
|
|
|
+ reaction_dist = abs(relative_y) * self.rho
|
|
|
+ braking_dist = (v_dir ** 2) / (2 * self.ego_decel_lat_max)
|
|
|
+ min_stopping_dist = reaction_dist + braking_dist
|
|
|
+ else: # 车辆
|
|
|
+ min_stopping_dist = (v_dir ** 2) / (2 * self.ego_decel_lat_max)
|
|
|
+ else: # 对向场景
|
|
|
+ return self.DEFAULT_VALUES['PSD']
|
|
|
+
|
|
|
+ # 处理特殊情况
|
|
|
+ if min_stopping_dist < 1e-3:
|
|
|
+ return float('inf') if D > 0.1 else 0.0
|
|
|
+
|
|
|
+ return D / min_stopping_dist
|
|
|
+
|
|
|
+ def _cal_DTC(self, vx_rel, ax_rel_ego, t):
|
|
|
+ """计算碰撞距离(DTC)"""
|
|
|
+ if vx_rel >= 0: # 没有接近风险
|
|
|
+ return self.DEFAULT_VALUES['DTC']
|
|
|
+ try:
|
|
|
+ speed = abs(vx_rel)
|
|
|
+ reaction_distance = speed * t
|
|
|
+ braking_distance = (speed ** 2) / (2 * self.ego_decel_lon_max)
|
|
|
+ dtc = reaction_distance + braking_distance
|
|
|
+ return max(0, dtc)
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.warning(f"DTC计算错误: {e}")
|
|
|
+ return self.DEFAULT_VALUES['DTC']
|
|
|
+
|
|
|
+ def _cal_longitudinal_safe_dist(self, v_ego, v_lead_rel, rho, decel_ego, decel_lead):
|
|
|
+ """计算纵向安全距离(LonSD)"""
|
|
|
+ try:
|
|
|
+ v_lead = v_ego + v_lead_rel
|
|
|
+ reaction_distance = v_ego * rho
|
|
|
+ braking_distance_ego = v_ego ** 2 / (2 * decel_ego) if v_ego > 0 else 0
|
|
|
+ braking_distance_lead = v_lead ** 2 / (2 * decel_lead) if v_lead > 0 else 0
|
|
|
+ safe_dist = reaction_distance + braking_distance_ego - braking_distance_lead
|
|
|
+ return max(2.0, safe_dist)
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.warning(f"计算纵向安全距离错误: {e}")
|
|
|
+ return 25.0
|
|
|
+
|
|
|
+ def _cal_lateral_safe_dist(self, lat_dist, v_ego_lat, vehicle_width, lane_width, decel_lat_max):
|
|
|
+ """计算横向安全距离(LatSD)"""
|
|
|
+ try:
|
|
|
+ available_space = lane_width - vehicle_width
|
|
|
+ base_margin = 0.5
|
|
|
+ speed_margin = 0.1 * abs(v_ego_lat)
|
|
|
+ safe_dist = min(base_margin + speed_margin, available_space / 2)
|
|
|
+ return max(base_margin, safe_dist)
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.warning(f"计算横向安全距离错误: {e}")
|
|
|
+ return 1.0
|
|
|
|
|
|
- # BTN (brake threat number)
|
|
|
def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
|
|
|
- BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
|
|
|
- return BTN
|
|
|
-
|
|
|
- # STN (steer threat number)
|
|
|
- def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
|
|
|
- STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
|
|
|
- width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
|
|
|
- return STN
|
|
|
-
|
|
|
- # BTN (brake threat number)
|
|
|
- def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
|
|
|
- BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
|
|
|
- return BTN
|
|
|
-
|
|
|
- # STN (steer threat number)
|
|
|
- def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
|
|
|
- STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax
|
|
|
- return STN
|
|
|
-
|
|
|
- # 追尾碰撞风险
|
|
|
- def _normal_distribution(self, x):
|
|
|
- mean = 1.32
|
|
|
- std_dev = 0.26
|
|
|
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
|
|
|
+ """计算制动威胁数(BTN)"""
|
|
|
+ if lon_d is None or lon_d <= 0 or ego_decel_lon_max is None or ego_decel_lon_max <= 0:
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ return (lon_a1 + lon_a - (lon_v ** 2) / (2 * lon_d)) / ego_decel_lon_max
|
|
|
+ except:
|
|
|
+ return None
|
|
|
|
|
|
- def continuous_group(self, df):
|
|
|
- time_list = df['simTime'].values.tolist()
|
|
|
- frame_list = df['simFrame'].values.tolist()
|
|
|
+ def _death_pr(self, obj_type, v_relative):
|
|
|
+ """计算死亡概率"""
|
|
|
+ if obj_type == 5: # 行人
|
|
|
+ return 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
|
|
|
+ else: # 车辆
|
|
|
+ return 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
|
|
|
|
|
|
- group_time = []
|
|
|
- group_frame = []
|
|
|
- sub_group_time = []
|
|
|
- sub_group_frame = []
|
|
|
+ def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
|
|
|
+ """计算碰撞风险等级"""
|
|
|
+ p_death = self._death_pr(obj_type, v_relative)
|
|
|
+ return 0.4 * p_death + 0.6 * collisionSeverity
|
|
|
|
|
|
- for i in range(len(frame_list)):
|
|
|
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
|
|
|
- sub_group_time.append(time_list[i])
|
|
|
- sub_group_frame.append(frame_list[i])
|
|
|
- else:
|
|
|
- group_time.append(sub_group_time)
|
|
|
- group_frame.append(sub_group_frame)
|
|
|
- sub_group_time = [time_list[i]]
|
|
|
- sub_group_frame = [frame_list[i]]
|
|
|
-
|
|
|
- group_time.append(sub_group_time)
|
|
|
- group_frame.append(sub_group_frame)
|
|
|
- group_time = [g for g in group_time if len(g) >= 2]
|
|
|
- group_frame = [g for g in group_frame if len(g) >= 2]
|
|
|
-
|
|
|
- # 输出图表值
|
|
|
- time = [[g[0], g[-1]] for g in group_time]
|
|
|
- frame = [[g[0], g[-1]] for g in group_frame]
|
|
|
-
|
|
|
- unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
|
|
|
- unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
|
|
|
-
|
|
|
- unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
|
|
|
- return unfunc_df
|
|
|
- # 统计最危险的指标
|
|
|
-
|
|
|
- def _safe_statistic_most_dangerous(self):
|
|
|
- min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM', 'PSD']
|
|
|
- max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
|
|
|
- result = {}
|
|
|
- for metric in min_list:
|
|
|
- if metric in self.metric_list:
|
|
|
- if metric in self.df.columns:
|
|
|
- val = self.df[metric].min()
|
|
|
- result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
|
|
|
- else:
|
|
|
- result[metric] = self._default_value(metric)
|
|
|
- for metric in max_list:
|
|
|
- if metric in self.metric_list:
|
|
|
- if metric in self.df.columns:
|
|
|
- val = self.df[metric].max()
|
|
|
- result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
|
|
|
- else:
|
|
|
- result[metric] = self._default_value(metric)
|
|
|
- return result
|
|
|
-
|
|
|
- def _safe_no_obj_statistic(self):
|
|
|
- # 仅有自车时的默认值
|
|
|
- result = {metric: self._default_value(metric) for metric in self.metric_list}
|
|
|
- return result
|
|
|
-
|
|
|
- def _default_value(self, metric):
|
|
|
- # 统一默认值
|
|
|
- defaults = {
|
|
|
- 'TTC': 10.0,
|
|
|
- 'MTTC': 4.2,
|
|
|
- 'THW': 2.1,
|
|
|
- 'TLC': 10.0,
|
|
|
- 'TTB': 10.0,
|
|
|
- 'TM': 10.0,
|
|
|
- 'DTC': 10.0,
|
|
|
- 'PSD': 10.0,
|
|
|
- 'LonSD': 10.0,
|
|
|
- 'LatSD': 2.0,
|
|
|
- 'BTN': 1.0,
|
|
|
- 'collisionRisk': 0.0,
|
|
|
- 'collisionSeverity': 0.0
|
|
|
- }
|
|
|
- return defaults.get(metric, None)
|
|
|
+ def _normal_distribution(self, x, mean=1.32, std_dev=0.26):
|
|
|
+ """正态分布函数用于碰撞严重性计算"""
|
|
|
+ return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
|
|
|
|
|
|
- def report_statistic(self):
|
|
|
- if len(self.obj_id_list) == 1:
|
|
|
- safety_result = self._safe_no_obj_statistic()
|
|
|
- else:
|
|
|
- safety_result = self._safe_statistic_most_dangerous()
|
|
|
- evaluator = Score(self.data_processed.safety_config)
|
|
|
- result = evaluator.evaluate(safety_result)
|
|
|
- print("\n[安全性表现及得分情况]")
|
|
|
- return result
|
|
|
+ # ==================== 目标物过滤方法 ====================
|
|
|
|
|
|
- def get_ttc_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('TTC')
|
|
|
- ttc_values = self.df_safe['TTC'].dropna()
|
|
|
- ttc_value = float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
|
|
|
+ def _is_relevant_target(self, lat_d, lon_d, v_lat, heading_diff, obj_type):
|
|
|
+ """判断目标物是否相关"""
|
|
|
+ abs_lat_d = abs(lat_d)
|
|
|
+ abs_heading_diff = abs(heading_diff)
|
|
|
+ lane_half_width = self.lane_width / 2
|
|
|
+ ego_half_width = self.ego_width / 2
|
|
|
+ ped_half_width = self.ped_width / 2
|
|
|
+ safe_width = ego_half_width + ped_half_width + 0.2
|
|
|
|
|
|
- # 收集TTC数据
|
|
|
- if not ttc_values.empty:
|
|
|
- self.ttc_data = []
|
|
|
- for time, frame, ttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTC']):
|
|
|
- if pd.notnull(ttc):
|
|
|
- self.ttc_data.append({'simTime': time, 'simFrame': frame, 'TTC': ttc})
|
|
|
+ # 行人目标处理
|
|
|
+ if obj_type == 5:
|
|
|
+ if abs_heading_diff >= 60: # 横穿行人
|
|
|
+ return (0 < lon_d < 50) and (abs_lat_d < safe_width or abs_lat_d < lane_half_width)
|
|
|
+ else: # 沿路行人
|
|
|
+ return (0 < lon_d < 30) and (abs_lat_d < safe_width) and abs(v_lat) < 1.0
|
|
|
|
|
|
- return ttc_value
|
|
|
+ # 车辆类目标处理
|
|
|
+ if abs_heading_diff < 45: # 纵向目标
|
|
|
+ return abs_lat_d < lane_half_width + 0.1 and abs(lon_d) < 80
|
|
|
|
|
|
- def get_mttc_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('MTTC')
|
|
|
- mttc_values = self.df_safe['MTTC'].dropna()
|
|
|
- mttc_value = float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
|
|
|
+ if 45 <= abs_heading_diff <= 135: # 横向目标
|
|
|
+ return 0 < lon_d < 30 and abs_lat_d < ego_half_width + 0.1
|
|
|
|
|
|
- # 收集MTTC数据
|
|
|
- if not mttc_values.empty:
|
|
|
- self.mttc_data = []
|
|
|
- for time, frame, mttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['MTTC']):
|
|
|
- if pd.notnull(mttc):
|
|
|
- self.mttc_data.append({'simTime': time, 'simFrame': frame, 'MTTC': mttc})
|
|
|
+ return False # 反向目标
|
|
|
|
|
|
- return mttc_value
|
|
|
+ def _has_passed_conflict_point(self, relative_x, relative_y, obj_type, heading_diff):
|
|
|
+ """判断目标是否已越过冲突点"""
|
|
|
+ if obj_type == 5: # 行人
|
|
|
+ obj_length = self.pedestrian_length
|
|
|
+ obj_width = self.pedestrian_width
|
|
|
+ else: # 车辆
|
|
|
+ obj_length = self.vehicle_length
|
|
|
+ obj_width = self.vehicle_width
|
|
|
|
|
|
- def get_thw_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('THW')
|
|
|
- thw_values = self.df_safe['THW'].dropna()
|
|
|
- thw_value = float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
|
|
|
-
|
|
|
- # 收集THW数据
|
|
|
- if not thw_values.empty:
|
|
|
- self.thw_data = []
|
|
|
- for time, frame, thw in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['THW']):
|
|
|
- if pd.notnull(thw):
|
|
|
- self.thw_data.append({'simTime': time, 'simFrame': frame, 'THW': thw})
|
|
|
-
|
|
|
- return thw_value
|
|
|
-
|
|
|
- def get_tlc_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('TLC')
|
|
|
- tlc_values = self.df_safe['TLC'].dropna()
|
|
|
- tlc_value = float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC')
|
|
|
-
|
|
|
- # 收集TLC数据
|
|
|
- if not tlc_values.empty:
|
|
|
- self.tlc_data = []
|
|
|
- for time, frame, tlc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TLC']):
|
|
|
- if pd.notnull(tlc):
|
|
|
- self.tlc_data.append({'simTime': time, 'simFrame': frame, 'TLC': tlc})
|
|
|
-
|
|
|
- return tlc_value
|
|
|
+ length_safe_dist = (self.ego_length + obj_length) / 2
|
|
|
+ width_safe_dist = (self.ego_width + obj_width) / 2
|
|
|
|
|
|
- def get_ttb_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('TTB')
|
|
|
- ttb_values = self.df_safe['TTB'].dropna()
|
|
|
- ttb_value = float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB')
|
|
|
+ if obj_type == 5:
|
|
|
+ return relative_x < -length_safe_dist or abs(relative_y) > width_safe_dist * 2
|
|
|
+ else:
|
|
|
+ if abs(heading_diff) < 45: # 同向
|
|
|
+ return relative_x < -length_safe_dist or abs(relative_y) > width_safe_dist * 1.5
|
|
|
+ elif 45 <= abs(heading_diff) <= 135: # 横向
|
|
|
+ return relative_x < -length_safe_dist or relative_x > length_safe_dist
|
|
|
+ else: # 对向
|
|
|
+ return ((relative_x < -length_safe_dist and relative_y < -width_safe_dist) or
|
|
|
+ (relative_x < -length_safe_dist and relative_y > width_safe_dist) or
|
|
|
+ (relative_x > length_safe_dist and relative_y < -width_safe_dist) or
|
|
|
+ (relative_x > length_safe_dist and relative_y > width_safe_dist))
|
|
|
+
|
|
|
+ def _is_moving_away(self, ego_vel, target_vel, relative_x, relative_y, obj_type, heading_diff):
|
|
|
+ """判断目标是否正在远离"""
|
|
|
+ vx_rel = target_vel[0] - ego_vel[0]
|
|
|
+ vy_rel = target_vel[1] - ego_vel[1]
|
|
|
+
|
|
|
+ dist = math.sqrt(relative_x ** 2 + relative_y ** 2)
|
|
|
+ if dist < 1e-6:
|
|
|
+ return False
|
|
|
+
|
|
|
+ v_projection = (vx_rel * relative_x + vy_rel * relative_y) / dist
|
|
|
+
|
|
|
+ if obj_type == 5: # 行人
|
|
|
+ if relative_x < 0: # 后方
|
|
|
+ return v_projection > 0.5
|
|
|
+ else: # 前方
|
|
|
+ return v_projection < -0.5
|
|
|
+ else:
|
|
|
+ abs_heading_diff = abs(heading_diff)
|
|
|
+
|
|
|
+ if abs_heading_diff < 45: # 同向
|
|
|
+ if relative_x < 0: # 后方
|
|
|
+ return v_projection > 0
|
|
|
+ else: # 前方
|
|
|
+ return v_projection < 0
|
|
|
+ elif 45 <= abs_heading_diff <= 135: # 横向
|
|
|
+ if relative_x > 0: # 前方
|
|
|
+ return v_projection < 0
|
|
|
+ else: # 后方
|
|
|
+ return v_projection > 0
|
|
|
+ else: # 对向
|
|
|
+ return False
|
|
|
+
|
|
|
+ # ==================== 指标获取方法 ====================
|
|
|
+
|
|
|
+ def _get_metric_value(self, metric_name, aggregation='min'):
|
|
|
+ """通用指标值获取方法"""
|
|
|
+ if self.empty_flag or self.df_safe is None or self.df_safe.empty:
|
|
|
+ return self.DEFAULT_VALUES.get(metric_name, None)
|
|
|
+
|
|
|
+ values = self.df_safe[metric_name].dropna()
|
|
|
+ if values.empty:
|
|
|
+ return self.DEFAULT_VALUES.get(metric_name, None)
|
|
|
+
|
|
|
+ # 根据聚合类型获取值
|
|
|
+ if aggregation == 'min':
|
|
|
+ metric_value = float(values.min())
|
|
|
+ elif aggregation == 'max':
|
|
|
+ metric_value = float(values.max())
|
|
|
+ elif aggregation == 'mean':
|
|
|
+ metric_value = float(values.mean())
|
|
|
+ else:
|
|
|
+ metric_value = float(values.min())
|
|
|
+
|
|
|
+ # 存储指标数据
|
|
|
+ metric_data = []
|
|
|
+ for _, row in self.df_safe.iterrows():
|
|
|
+ if pd.notnull(row[metric_name]):
|
|
|
+ metric_data.append({
|
|
|
+ 'simTime': row['simTime'],
|
|
|
+ 'simFrame': row['simFrame'],
|
|
|
+ 'playerId': row['playerId'],
|
|
|
+ metric_name: row[metric_name]
|
|
|
+ })
|
|
|
+ setattr(self, f'{metric_name.lower()}_data', metric_data)
|
|
|
+
|
|
|
+ return metric_value
|
|
|
|
|
|
- # 收集TTB数据
|
|
|
- if not ttb_values.empty:
|
|
|
- self.ttb_data = []
|
|
|
- for time, frame, ttb in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTB']):
|
|
|
- if pd.notnull(ttb):
|
|
|
- self.ttb_data.append({'simTime': time, 'simFrame': frame, 'TTB': ttb})
|
|
|
+ def get_ttc_value(self) -> float:
|
|
|
+ return self._get_metric_value('TTC')
|
|
|
|
|
|
- return ttb_value
|
|
|
+ def get_mttc_value(self) -> float:
|
|
|
+ return self._get_metric_value('MTTC')
|
|
|
|
|
|
- def get_tm_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('TM')
|
|
|
- tm_values = self.df_safe['TM'].dropna()
|
|
|
- tm_value = float(tm_values.min()) if not tm_values.empty else self._default_value('TM')
|
|
|
+ def get_thw_value(self) -> float:
|
|
|
+ return self._get_metric_value('THW')
|
|
|
|
|
|
- # 收集TM数据
|
|
|
- if not tm_values.empty:
|
|
|
- self.tm_data = []
|
|
|
- for time, frame, tm in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TM']):
|
|
|
- if pd.notnull(tm):
|
|
|
- self.tm_data.append({'simTime': time, 'simFrame': frame, 'TM': tm})
|
|
|
+ def get_ttb_value(self) -> float:
|
|
|
+ return self._get_metric_value('TTB')
|
|
|
|
|
|
- return tm_value
|
|
|
+ def get_tm_value(self) -> float:
|
|
|
+ return self._get_metric_value('TM')
|
|
|
|
|
|
def get_dtc_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('DTC')
|
|
|
- dtc_values = self.df_safe['DTC'].dropna()
|
|
|
- return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
|
|
|
+ return self._get_metric_value('DTC', 'min')
|
|
|
|
|
|
def get_psd_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('PSD')
|
|
|
- psd_values = self.df_safe['PSD'].dropna()
|
|
|
- return float(psd_values.min()) if not psd_values.empty else self._default_value('PSD')
|
|
|
+ return self._get_metric_value('PSD', 'min')
|
|
|
|
|
|
def get_lonsd_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('LonSD')
|
|
|
- lonsd_values = self.df_safe['LonSD'].dropna()
|
|
|
- lonsd_value = float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
|
|
|
-
|
|
|
- # 收集LonSD数据
|
|
|
- if not lonsd_values.empty:
|
|
|
- self.lonsd_data = []
|
|
|
- for time, frame, lonsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LonSD']):
|
|
|
- if pd.notnull(lonsd):
|
|
|
- self.lonsd_data.append({'simTime': time, 'simFrame': frame, 'LonSD': lonsd})
|
|
|
-
|
|
|
- return lonsd_value
|
|
|
+ return self._get_metric_value('LonSD', 'mean')
|
|
|
|
|
|
def get_latsd_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('LatSD')
|
|
|
- latsd_values = self.df_safe['LatSD'].dropna()
|
|
|
- # 使用最小值而非平均值,与safety1.py保持一致
|
|
|
- latsd_value = float(latsd_values.min()) if not latsd_values.empty else self._default_value('LatSD')
|
|
|
-
|
|
|
- # 收集LatSD数据
|
|
|
- if not latsd_values.empty:
|
|
|
- self.latsd_data = []
|
|
|
- for time, frame, latsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LatSD']):
|
|
|
- if pd.notnull(latsd):
|
|
|
- self.latsd_data.append({'simTime': time, 'simFrame': frame, 'LatSD': latsd})
|
|
|
-
|
|
|
- return latsd_value
|
|
|
+ return self._get_metric_value('LatSD', 'min')
|
|
|
|
|
|
def get_btn_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('BTN')
|
|
|
- btn_values = self.df_safe['BTN'].dropna()
|
|
|
- btn_value = float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
|
|
|
-
|
|
|
- # 收集BTN数据
|
|
|
- if not btn_values.empty:
|
|
|
- self.btn_data = []
|
|
|
- for time, frame, btn in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['BTN']):
|
|
|
- if pd.notnull(btn):
|
|
|
- self.btn_data.append({'simTime': time, 'simFrame': frame, 'BTN': btn})
|
|
|
-
|
|
|
- return btn_value
|
|
|
+ return self._get_metric_value('BTN', 'max')
|
|
|
|
|
|
def get_collision_risk_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('collisionRisk')
|
|
|
- risk_values = self.df_safe['collisionRisk'].dropna()
|
|
|
- risk_value = float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
|
|
|
+ return self._get_metric_value('collisionRisk', 'max')
|
|
|
|
|
|
- # 收集碰撞风险数据
|
|
|
- if not risk_values.empty:
|
|
|
- self.collision_risk_data = []
|
|
|
- for time, frame, risk in zip(self.df_safe['simTime'], self.df_safe['simFrame'],
|
|
|
- self.df_safe['collisionRisk']):
|
|
|
- if pd.notnull(risk):
|
|
|
- self.collision_risk_data.append({'simTime': time, 'simFrame': frame, 'collisionRisk': risk})
|
|
|
+ def get_collision_severity_value(self) -> float:
|
|
|
+ return self._get_metric_value('collisionSeverity', 'max')
|
|
|
|
|
|
- return risk_value
|
|
|
+ # ==================== 辅助方法 ====================
|
|
|
|
|
|
- def get_collision_severity_value(self) -> float:
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
- return self._default_value('collisionSeverity')
|
|
|
- severity_values = self.df_safe['collisionSeverity'].dropna()
|
|
|
- severity_value = float(severity_values.max()) if not severity_values.empty else self._default_value(
|
|
|
- 'collisionSeverity')
|
|
|
-
|
|
|
- # 收集碰撞严重性数据
|
|
|
- if not severity_values.empty:
|
|
|
- self.collision_severity_data = []
|
|
|
- for time, frame, severity in zip(self.df_safe['simTime'], self.df_safe['simFrame'],
|
|
|
- self.df_safe['collisionSeverity']):
|
|
|
- if pd.notnull(severity):
|
|
|
- self.collision_severity_data.append(
|
|
|
- {'simTime': time, 'simFrame': frame, 'collisionSeverity': severity})
|
|
|
-
|
|
|
- return severity_value
|
|
|
+ def generate_metric_chart(self, metric_name: str, plot_path: Path) -> None:
|
|
|
+ """生成指标图表"""
|
|
|
+ try:
|
|
|
+ self.output_dir = plot_path if plot_path else os.path.join(os.getcwd(), 'data')
|
|
|
+ os.makedirs(self.output_dir, exist_ok=True)
|
|
|
|
|
|
+ chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
|
|
|
+ if chart_path:
|
|
|
+ self.logger.info(f"{metric_name}图表已生成: {chart_path}")
|
|
|
+ else:
|
|
|
+ self.logger.warning(f"{metric_name}图表生成失败")
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)
|