|
@@ -8,7 +8,6 @@ import os
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
import math
|
|
import math
|
|
|
-import matplotlib.pyplot as plt
|
|
|
|
|
import scipy.integrate as spi
|
|
import scipy.integrate as spi
|
|
|
from collections import defaultdict
|
|
from collections import defaultdict
|
|
|
from typing import Dict, Any, List, Optional
|
|
from typing import Dict, Any, List, Optional
|
|
@@ -35,6 +34,7 @@ SAFETY_INFO = [
|
|
|
"type"
|
|
"type"
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
+
|
|
|
# ----------------------
|
|
# ----------------------
|
|
|
# 独立指标计算函数
|
|
# 独立指标计算函数
|
|
|
# ----------------------
|
|
# ----------------------
|
|
@@ -54,6 +54,7 @@ def calculate_ttc(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
|
|
|
return {"TTC": None}
|
|
return {"TTC": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_mttc(data_processed) -> dict:
|
|
def calculate_mttc(data_processed) -> dict:
|
|
|
"""计算MTTC (Modified Time To Collision)"""
|
|
"""计算MTTC (Modified Time To Collision)"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -69,6 +70,7 @@ def calculate_mttc(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
|
|
|
return {"MTTC": None}
|
|
return {"MTTC": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_thw(data_processed) -> dict:
|
|
def calculate_thw(data_processed) -> dict:
|
|
|
"""计算THW (Time Headway)"""
|
|
"""计算THW (Time Headway)"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -84,6 +86,7 @@ def calculate_thw(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
|
|
|
return {"THW": None}
|
|
return {"THW": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_tlc(data_processed) -> dict:
|
|
def calculate_tlc(data_processed) -> dict:
|
|
|
"""计算TLC (Time to Line Crossing)"""
|
|
"""计算TLC (Time to Line Crossing)"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -99,6 +102,7 @@ def calculate_tlc(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True)
|
|
|
return {"TLC": None}
|
|
return {"TLC": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_ttb(data_processed) -> dict:
|
|
def calculate_ttb(data_processed) -> dict:
|
|
|
"""计算TTB (Time to Brake)"""
|
|
"""计算TTB (Time to Brake)"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -114,6 +118,7 @@ def calculate_ttb(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True)
|
|
|
return {"TTB": None}
|
|
return {"TTB": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_tm(data_processed) -> dict:
|
|
def calculate_tm(data_processed) -> dict:
|
|
|
"""计算TM (Time Margin)"""
|
|
"""计算TM (Time Margin)"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -129,6 +134,7 @@ def calculate_tm(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True)
|
|
|
return {"TM": None}
|
|
return {"TM": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_dtc(data_processed) -> dict:
|
|
def calculate_dtc(data_processed) -> dict:
|
|
|
"""计算DTC (Distance to Collision)"""
|
|
"""计算DTC (Distance to Collision)"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -142,18 +148,6 @@ def calculate_dtc(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
|
|
|
return {"DTC": None}
|
|
return {"DTC": None}
|
|
|
|
|
|
|
|
-def calculate_pet(data_processed) -> dict:
|
|
|
|
|
- """计算PET (Post Encroachment Time)"""
|
|
|
|
|
- if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
|
|
- return {"PET": None}
|
|
|
|
|
- try:
|
|
|
|
|
- safety = SafetyCalculator(data_processed)
|
|
|
|
|
- pet_value = safety.get_dtc_value()
|
|
|
|
|
- LogManager().get_logger().info(f"安全指标[PET]计算结果: {pet_value}")
|
|
|
|
|
- return {"PET": pet_value}
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- LogManager().get_logger().error(f"PET计算异常: {str(e)}", exc_info=True)
|
|
|
|
|
- return {"PET": None}
|
|
|
|
|
|
|
|
|
|
def calculate_psd(data_processed) -> dict:
|
|
def calculate_psd(data_processed) -> dict:
|
|
|
"""计算PSD (Potential Safety Distance)"""
|
|
"""计算PSD (Potential Safety Distance)"""
|
|
@@ -169,7 +163,6 @@ def calculate_psd(data_processed) -> dict:
|
|
|
return {"PSD": None}
|
|
return {"PSD": None}
|
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
|
def calculate_collisionrisk(data_processed) -> dict:
|
|
def calculate_collisionrisk(data_processed) -> dict:
|
|
|
"""计算碰撞风险"""
|
|
"""计算碰撞风险"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -185,6 +178,7 @@ def calculate_collisionrisk(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"collisionRisk计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"collisionRisk计算异常: {str(e)}", exc_info=True)
|
|
|
return {"collisionRisk": None}
|
|
return {"collisionRisk": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_lonsd(data_processed) -> dict:
|
|
def calculate_lonsd(data_processed) -> dict:
|
|
|
"""计算纵向安全距离"""
|
|
"""计算纵向安全距离"""
|
|
|
safety = SafetyCalculator(data_processed)
|
|
safety = SafetyCalculator(data_processed)
|
|
@@ -194,6 +188,7 @@ def calculate_lonsd(data_processed) -> dict:
|
|
|
LogManager().get_logger().info(f"安全指标[LonSD]计算结果: {lonsd_value}")
|
|
LogManager().get_logger().info(f"安全指标[LonSD]计算结果: {lonsd_value}")
|
|
|
return {"LonSD": lonsd_value}
|
|
return {"LonSD": lonsd_value}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_latsd(data_processed) -> dict:
|
|
def calculate_latsd(data_processed) -> dict:
|
|
|
"""计算横向安全距离"""
|
|
"""计算横向安全距离"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -210,6 +205,7 @@ def calculate_latsd(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"LatSD计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"LatSD计算异常: {str(e)}", exc_info=True)
|
|
|
return {"LatSD": None}
|
|
return {"LatSD": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_btn(data_processed) -> dict:
|
|
def calculate_btn(data_processed) -> dict:
|
|
|
"""计算制动威胁数"""
|
|
"""计算制动威胁数"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -226,6 +222,7 @@ def calculate_btn(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"BTN计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"BTN计算异常: {str(e)}", exc_info=True)
|
|
|
return {"BTN": None}
|
|
return {"BTN": None}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
def calculate_collisionseverity(data_processed) -> dict:
|
|
def calculate_collisionseverity(data_processed) -> dict:
|
|
|
"""计算碰撞严重性"""
|
|
"""计算碰撞严重性"""
|
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -245,38 +242,40 @@ def calculate_collisionseverity(data_processed) -> dict:
|
|
|
|
|
|
|
|
class SafetyRegistry:
|
|
class SafetyRegistry:
|
|
|
"""安全指标注册器"""
|
|
"""安全指标注册器"""
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def __init__(self, data_processed):
|
|
def __init__(self, data_processed):
|
|
|
self.logger = LogManager().get_logger()
|
|
self.logger = LogManager().get_logger()
|
|
|
self.data = data_processed
|
|
self.data = data_processed
|
|
|
self.safety_config = data_processed.safety_config["safety"]
|
|
self.safety_config = data_processed.safety_config["safety"]
|
|
|
self.metrics = self._extract_metrics(self.safety_config)
|
|
self.metrics = self._extract_metrics(self.safety_config)
|
|
|
self._registry = self._build_registry()
|
|
self._registry = self._build_registry()
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def _extract_metrics(self, config_node: dict) -> list:
|
|
def _extract_metrics(self, config_node: dict) -> list:
|
|
|
"""从配置中提取指标名称"""
|
|
"""从配置中提取指标名称"""
|
|
|
metrics = []
|
|
metrics = []
|
|
|
|
|
+
|
|
|
def _recurse(node):
|
|
def _recurse(node):
|
|
|
if isinstance(node, dict):
|
|
if isinstance(node, dict):
|
|
|
if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
|
|
if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
|
|
|
metrics.append(node['name'])
|
|
metrics.append(node['name'])
|
|
|
for v in node.values():
|
|
for v in node.values():
|
|
|
_recurse(v)
|
|
_recurse(v)
|
|
|
|
|
+
|
|
|
_recurse(config_node)
|
|
_recurse(config_node)
|
|
|
self.logger.info(f'评比的安全指标列表:{metrics}')
|
|
self.logger.info(f'评比的安全指标列表:{metrics}')
|
|
|
return metrics
|
|
return metrics
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def _build_registry(self) -> dict:
|
|
def _build_registry(self) -> dict:
|
|
|
"""构建指标函数注册表"""
|
|
"""构建指标函数注册表"""
|
|
|
registry = {}
|
|
registry = {}
|
|
|
for metric_name in self.metrics:
|
|
for metric_name in self.metrics:
|
|
|
func_name = f"calculate_{metric_name.lower()}"
|
|
func_name = f"calculate_{metric_name.lower()}"
|
|
|
- if func_name in globals(): # global()会获取当前模块下所有全局变量、函数、类和其他对象的名称及其对应的值
|
|
|
|
|
|
|
+ if func_name in globals():
|
|
|
registry[metric_name] = globals()[func_name]
|
|
registry[metric_name] = globals()[func_name]
|
|
|
else:
|
|
else:
|
|
|
self.logger.warning(f"未实现安全指标函数: {func_name}")
|
|
self.logger.warning(f"未实现安全指标函数: {func_name}")
|
|
|
return registry
|
|
return registry
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def batch_execute(self) -> dict:
|
|
def batch_execute(self) -> dict:
|
|
|
"""批量执行指标计算"""
|
|
"""批量执行指标计算"""
|
|
|
results = {}
|
|
results = {}
|
|
@@ -293,17 +292,17 @@ class SafetyRegistry:
|
|
|
|
|
|
|
|
class SafeManager:
|
|
class SafeManager:
|
|
|
"""安全指标管理类"""
|
|
"""安全指标管理类"""
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def __init__(self, data_processed):
|
|
def __init__(self, data_processed):
|
|
|
self.data = data_processed
|
|
self.data = data_processed
|
|
|
self.registry = SafetyRegistry(self.data)
|
|
self.registry = SafetyRegistry(self.data)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def report_statistic(self):
|
|
def report_statistic(self):
|
|
|
"""计算并报告安全指标结果"""
|
|
"""计算并报告安全指标结果"""
|
|
|
safety_result = self.registry.batch_execute()
|
|
safety_result = self.registry.batch_execute()
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return safety_result
|
|
return safety_result
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
|
|
|
class SafetyCalculator:
|
|
class SafetyCalculator:
|
|
|
"""安全指标计算类 - 兼容Safe类风格"""
|
|
"""安全指标计算类 - 兼容Safe类风格"""
|
|
@@ -316,7 +315,8 @@ class SafetyCalculator:
|
|
|
self.ego_df = data_processed.ego_data.copy() # 使用copy()避免修改原始数据
|
|
self.ego_df = data_processed.ego_data.copy() # 使用copy()避免修改原始数据
|
|
|
self.obj_id_list = data_processed.obj_id_list
|
|
self.obj_id_list = data_processed.obj_id_list
|
|
|
self.metric_list = [
|
|
self.metric_list = [
|
|
|
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
|
|
|
|
|
|
|
+ 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PSD', 'LonSD', 'LatSD', 'BTN', 'collisionRisk',
|
|
|
|
|
+ 'collisionSeverity'
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
# 初始化默认值
|
|
# 初始化默认值
|
|
@@ -328,7 +328,6 @@ class SafetyCalculator:
|
|
|
"TTB": 10.0,
|
|
"TTB": 10.0,
|
|
|
"TM": 10.0,
|
|
"TM": 10.0,
|
|
|
# "MPrTTC": 10.0,
|
|
# "MPrTTC": 10.0,
|
|
|
- "PET": 10.0,
|
|
|
|
|
"DTC": 10.0,
|
|
"DTC": 10.0,
|
|
|
"PSD": 10.0,
|
|
"PSD": 10.0,
|
|
|
"LatSD": 3.0,
|
|
"LatSD": 3.0,
|
|
@@ -341,7 +340,7 @@ class SafetyCalculator:
|
|
|
self.frame_list = self.ego_df['simFrame'].values.tolist()
|
|
self.frame_list = self.ego_df['simFrame'].values.tolist()
|
|
|
self.collisionRisk = 0
|
|
self.collisionRisk = 0
|
|
|
self.empty_flag = True
|
|
self.empty_flag = True
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 初始化数据存储列表
|
|
# 初始化数据存储列表
|
|
|
self.ttc_data = []
|
|
self.ttc_data = []
|
|
|
self.mttc_data = []
|
|
self.mttc_data = []
|
|
@@ -354,10 +353,10 @@ class SafetyCalculator:
|
|
|
self.btn_data = []
|
|
self.btn_data = []
|
|
|
self.collision_risk_data = []
|
|
self.collision_risk_data = []
|
|
|
self.collision_severity_data = []
|
|
self.collision_severity_data = []
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 初始化安全事件记录表
|
|
# 初始化安全事件记录表
|
|
|
self.unsafe_events_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
self.unsafe_events_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 设置输出目录
|
|
# 设置输出目录
|
|
|
self.output_dir = os.path.join(os.getcwd(), 'data')
|
|
self.output_dir = os.path.join(os.getcwd(), 'data')
|
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
@@ -391,9 +390,9 @@ class SafetyCalculator:
|
|
|
ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
|
|
ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
|
|
|
ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
|
|
ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
|
|
|
ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
|
|
ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
|
|
|
- #TEMP_COMMENT: x_relative_start_dist 注释开始
|
|
|
|
|
|
|
+ # TEMP_COMMENT: x_relative_start_dist 注释开始
|
|
|
x_relative_start_dist = self.ego_df["x_relative_dist"]
|
|
x_relative_start_dist = self.ego_df["x_relative_dist"]
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 设置安全指标阈值
|
|
# 设置安全指标阈值
|
|
|
self.safety_thresholds = {
|
|
self.safety_thresholds = {
|
|
|
'TTC': {'min': 1.5, 'max': None}, # TTC小于1.5秒视为危险
|
|
'TTC': {'min': 1.5, 'max': None}, # TTC小于1.5秒视为危险
|
|
@@ -451,18 +450,27 @@ class SafetyCalculator:
|
|
|
a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2)
|
|
a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2)
|
|
|
|
|
|
|
|
dx, dy = x2 - x1, y2 - y1
|
|
dx, dy = x2 - x1, y2 - y1
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 计算目标车相对于自车的方位角
|
|
# 计算目标车相对于自车的方位角
|
|
|
- beta = math.atan2(dy, dx)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ # beta = math.atan2(dy, dx)
|
|
|
|
|
+
|
|
|
# 将全局坐标系下的相对位置向量转换到自车坐标系
|
|
# 将全局坐标系下的相对位置向量转换到自车坐标系
|
|
|
# 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
|
|
# 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
|
|
|
|
|
+ '''
|
|
|
|
|
+ 你现在的 posH 是 正北为 0°,顺时针为正角度。
|
|
|
|
|
+ 但是我们在做二维旋转时,Python/数学里默认是:
|
|
|
|
|
+
|
|
|
|
|
+ 0° 是 X 轴(正东)
|
|
|
|
|
+
|
|
|
|
|
+ 顺时针是负角,逆时针是正角
|
|
|
|
|
+
|
|
|
|
|
+ 所以,要将你的 posH(正北为 0)转换成数学中以正东为 0° 的角度'''
|
|
|
h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
|
|
h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 坐标变换
|
|
# 坐标变换
|
|
|
lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
|
|
lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
|
|
|
lat_d = abs(-dx * math.sin(h1_rad) + dy * math.cos(h1_rad)) # 横向距离(取绝对值)
|
|
lat_d = abs(-dx * math.sin(h1_rad) + dy * math.cos(h1_rad)) # 横向距离(取绝对值)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
obj_dict[frame_num][playerId]['lon_d'] = lon_d
|
|
obj_dict[frame_num][playerId]['lon_d'] = lon_d
|
|
|
obj_dict[frame_num][playerId]['lat_d'] = lat_d
|
|
obj_dict[frame_num][playerId]['lat_d'] = lat_d
|
|
|
|
|
|
|
@@ -495,31 +503,10 @@ class SafetyCalculator:
|
|
|
TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max)
|
|
TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max)
|
|
|
TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
|
|
TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
|
|
|
DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, rho)
|
|
DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, rho)
|
|
|
- # MPrTTC = self._cal_MPrTTC(x_relative_start_dist)
|
|
|
|
|
- # PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
|
|
|
|
|
- PET = None
|
|
|
|
|
-
|
|
|
|
|
- for lane_pos in lane_poss:
|
|
|
|
|
- lane_posx1 = ast.literal_eval(lane_pos)[0][0]
|
|
|
|
|
- lane_posy1 = ast.literal_eval(lane_pos)[0][1]
|
|
|
|
|
- lane_posx2 = ast.literal_eval(lane_pos)[-1][0]
|
|
|
|
|
- lane_posy2 = ast.literal_eval(lane_pos)[-1][1]
|
|
|
|
|
- for ramp_pos in ramp_poss:
|
|
|
|
|
- ramp_posx1 = ast.literal_eval(ramp_pos)[0][0]
|
|
|
|
|
- ramp_posy1 = ast.literal_eval(ramp_pos)[0][1]
|
|
|
|
|
- ramp_posx2 = ast.literal_eval(ramp_pos)[-1][0]
|
|
|
|
|
- ramp_posy2 = ast.literal_eval(ramp_pos)[-1][1]
|
|
|
|
|
- ego_posx = x1
|
|
|
|
|
- ego_posy = y1
|
|
|
|
|
- obj_posx = x2
|
|
|
|
|
- obj_posy = y2
|
|
|
|
|
- delta_t = self._cal_reaction_time_to_avgspeed(self.ego_df)
|
|
|
|
|
- lane_width = self.ego_df["lane_width"].iloc[0]
|
|
|
|
|
- PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2,
|
|
|
|
|
- ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
|
|
|
|
|
PSD = self._cal_PSD(x_relative_start_dist, v1, ego_decel_lon_max)
|
|
PSD = self._cal_PSD(x_relative_start_dist, v1, ego_decel_lon_max)
|
|
|
|
|
|
|
|
- LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
|
|
|
|
|
|
|
+ LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
|
|
|
|
|
+ obj_decel_max)
|
|
|
|
|
|
|
|
lat_dist = 0.5
|
|
lat_dist = 0.5
|
|
|
v_right = v1
|
|
v_right = v1
|
|
@@ -545,7 +532,8 @@ class SafetyCalculator:
|
|
|
lat_a = abs(lat_a1 - lat_a2)
|
|
lat_a = abs(lat_a1 - lat_a2)
|
|
|
lat_v = -v_x1 * math.sin(h1_rad) + v_y1 * math.cos(h1_rad)
|
|
lat_v = -v_x1 * math.sin(h1_rad) + v_y1 * math.cos(h1_rad)
|
|
|
|
|
|
|
|
- obj_dict[frame_num][playerId]['lat_v_rel'] = lat_v - (-v_x2 * math.sin(h1_rad) + v_y2 * math.cos(h1_rad))
|
|
|
|
|
|
|
+ obj_dict[frame_num][playerId]['lat_v_rel'] = lat_v - (
|
|
|
|
|
+ -v_x2 * math.sin(h1_rad) + v_y2 * math.cos(h1_rad))
|
|
|
obj_dict[frame_num][playerId]['lon_v_rel'] = lon_v - (v_x2 * math.cos(h1_rad) + v_y2 * math.sin(h1_rad))
|
|
obj_dict[frame_num][playerId]['lon_v_rel'] = lon_v - (v_x2 * math.cos(h1_rad) + v_y2 * math.sin(h1_rad))
|
|
|
|
|
|
|
|
TTC = None if (TTC is None or TTC < 0) else TTC
|
|
TTC = None if (TTC is None or TTC < 0) else TTC
|
|
@@ -555,7 +543,6 @@ class SafetyCalculator:
|
|
|
TTB = None if (TTB is None or TTB < 0) else TTB
|
|
TTB = None if (TTB is None or TTB < 0) else TTB
|
|
|
TM = None if (TM is None or TM < 0) else TM
|
|
TM = None if (TM is None or TM < 0) else TM
|
|
|
DTC = None if (DTC is None or DTC < 0) else DTC
|
|
DTC = None if (DTC is None or DTC < 0) else DTC
|
|
|
- PET = None if (PET is None or PET < 0) else PET
|
|
|
|
|
PSD = None if (PSD is None or PSD < 0) else PSD
|
|
PSD = None if (PSD is None or PSD < 0) else PSD
|
|
|
|
|
|
|
|
obj_dict[frame_num][playerId]['TTC'] = TTC
|
|
obj_dict[frame_num][playerId]['TTC'] = TTC
|
|
@@ -565,13 +552,12 @@ class SafetyCalculator:
|
|
|
obj_dict[frame_num][playerId]['TTB'] = TTB
|
|
obj_dict[frame_num][playerId]['TTB'] = TTB
|
|
|
obj_dict[frame_num][playerId]['TM'] = TM
|
|
obj_dict[frame_num][playerId]['TM'] = TM
|
|
|
obj_dict[frame_num][playerId]['DTC'] = DTC
|
|
obj_dict[frame_num][playerId]['DTC'] = DTC
|
|
|
- obj_dict[frame_num][playerId]['PET'] = PET
|
|
|
|
|
obj_dict[frame_num][playerId]['PSD'] = PSD
|
|
obj_dict[frame_num][playerId]['PSD'] = PSD
|
|
|
obj_dict[frame_num][playerId]['LonSD'] = LonSD
|
|
obj_dict[frame_num][playerId]['LonSD'] = LonSD
|
|
|
obj_dict[frame_num][playerId]['LatSD'] = LatSD
|
|
obj_dict[frame_num][playerId]['LatSD'] = LatSD
|
|
|
obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
|
|
obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
|
|
|
|
|
|
|
|
- # TTC要进行筛选,否则会出现nan或者TTC过大的情况
|
|
|
|
|
|
|
+ # TTC要进行筛选,否则会出现nan或者TTC过大的情况
|
|
|
if not TTC or TTC > 4000: # threshold = 4258.41
|
|
if not TTC or TTC > 4000: # threshold = 4258.41
|
|
|
collisionSeverity = 0
|
|
collisionSeverity = 0
|
|
|
pr_death = 0
|
|
pr_death = 0
|
|
@@ -591,7 +577,7 @@ class SafetyCalculator:
|
|
|
|
|
|
|
|
df_safe = pd.concat(df_list)
|
|
df_safe = pd.concat(df_list)
|
|
|
col_list = ['simTime', 'simFrame', 'playerId',
|
|
col_list = ['simTime', 'simFrame', 'playerId',
|
|
|
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN',
|
|
|
|
|
|
|
+ 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PSD', 'LonSD', 'LatSD', 'BTN',
|
|
|
'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
|
self.df_safe = df_safe[col_list].reset_index(drop=True)
|
|
self.df_safe = df_safe[col_list].reset_index(drop=True)
|
|
|
|
|
|
|
@@ -706,11 +692,10 @@ class SafetyCalculator:
|
|
|
def dist(self, x1, y1, x2, y2):
|
|
def dist(self, x1, y1, x2, y2):
|
|
|
dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
|
|
dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
|
|
|
return dist
|
|
return dist
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def generate_metric_chart(self, metric_name: str) -> None:
|
|
def generate_metric_chart(self, metric_name: str) -> None:
|
|
|
"""生成指标图表
|
|
"""生成指标图表
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
Args:
|
|
Args:
|
|
|
metric_name: 指标名称
|
|
metric_name: 指标名称
|
|
|
"""
|
|
"""
|
|
@@ -719,14 +704,14 @@ class SafetyCalculator:
|
|
|
if self.output_dir is None:
|
|
if self.output_dir is None:
|
|
|
self.output_dir = os.path.join(os.getcwd(), 'data')
|
|
self.output_dir = os.path.join(os.getcwd(), 'data')
|
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 调用图表生成函数
|
|
# 调用图表生成函数
|
|
|
chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
|
|
chart_path = generate_safety_chart_data(self, metric_name, self.output_dir)
|
|
|
if chart_path:
|
|
if chart_path:
|
|
|
self.logger.info(f"{metric_name}图表已生成: {chart_path}")
|
|
self.logger.info(f"{metric_name}图表已生成: {chart_path}")
|
|
|
else:
|
|
else:
|
|
|
self.logger.warning(f"{metric_name}图表生成失败")
|
|
self.logger.warning(f"{metric_name}图表生成失败")
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)
|
|
self.logger.error(f"生成{metric_name}图表失败: {str(e)}", exc_info=True)
|
|
|
|
|
|
|
@@ -765,9 +750,10 @@ class SafetyCalculator:
|
|
|
else:
|
|
else:
|
|
|
THW = dist / v_ego_projection_in_dist
|
|
THW = dist / v_ego_projection_in_dist
|
|
|
return THW
|
|
return THW
|
|
|
|
|
+
|
|
|
# TLC (time to line crossing)
|
|
# TLC (time to line crossing)
|
|
|
def _cal_TLC(self, ego_v, ego_yaw, laneOffset):
|
|
def _cal_TLC(self, ego_v, ego_yaw, laneOffset):
|
|
|
- TLC = laneOffset/ego_v/np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0
|
|
|
|
|
|
|
+ TLC = laneOffset / ego_v / np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0
|
|
|
if TLC < 0:
|
|
if TLC < 0:
|
|
|
TLC = None
|
|
TLC = None
|
|
|
return TLC
|
|
return TLC
|
|
@@ -775,11 +761,11 @@ class SafetyCalculator:
|
|
|
def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max):
|
|
def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max):
|
|
|
if len(x_relative_start_dist):
|
|
if len(x_relative_start_dist):
|
|
|
return None
|
|
return None
|
|
|
- if (ego_decel_max == 0) or (relative_v == 0):
|
|
|
|
|
|
|
+ if (ego_decel_max == 0) or (relative_v == 0):
|
|
|
return self.calculated_value["TTB"]
|
|
return self.calculated_value["TTB"]
|
|
|
else:
|
|
else:
|
|
|
x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
- TTB = (x_relative_start_dist0 + relative_v * relative_v/2/ego_decel_max)/relative_v
|
|
|
|
|
|
|
+ TTB = (x_relative_start_dist0 + relative_v * relative_v / 2 / ego_decel_max) / relative_v
|
|
|
return TTB
|
|
return TTB
|
|
|
|
|
|
|
|
def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1):
|
|
def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1):
|
|
@@ -790,7 +776,7 @@ class SafetyCalculator:
|
|
|
if a1 == 0:
|
|
if a1 == 0:
|
|
|
return None
|
|
return None
|
|
|
x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
- TM = (x_relative_start_dist0 + v2**2/(2*a2) - v1**2/(2*a1)) / v1
|
|
|
|
|
|
|
+ TM = (x_relative_start_dist0 + v2 ** 2 / (2 * a2) - v1 ** 2 / (2 * a1)) / v1
|
|
|
return TM
|
|
return TM
|
|
|
|
|
|
|
|
def velocity(self, v_x, v_y):
|
|
def velocity(self, v_x, v_y):
|
|
@@ -807,35 +793,23 @@ class SafetyCalculator:
|
|
|
# 检查除数是否为零
|
|
# 检查除数是否为零
|
|
|
if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
|
|
if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
|
|
|
return self._default_value('LatSD') # 返回默认值
|
|
return self._default_value('LatSD') # 返回默认值
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
v_right_rho = v_right + rho * a_lat_max
|
|
v_right_rho = v_right + rho * a_lat_max
|
|
|
v_left_rho = v_left + rho * a_lat_max
|
|
v_left_rho = v_left + rho * a_lat_max
|
|
|
dist_min = lat_dist + (
|
|
dist_min = lat_dist + (
|
|
|
- (v_right + v_right_rho) * rho / 2
|
|
|
|
|
- + v_right_rho**2 / a_right_lat_brake_min / 2
|
|
|
|
|
- + ((v_left + v_right_rho) * rho / 2)
|
|
|
|
|
- + v_left_rho**2 / a_left_lat_brake_min / 2
|
|
|
|
|
|
|
+ (v_right + v_right_rho) * rho / 2
|
|
|
|
|
+ + v_right_rho ** 2 / a_right_lat_brake_min / 2
|
|
|
|
|
+ + ((v_left + v_right_rho) * rho / 2)
|
|
|
|
|
+ + v_left_rho ** 2 / a_left_lat_brake_min / 2
|
|
|
)
|
|
)
|
|
|
return dist_min
|
|
return dist_min
|
|
|
|
|
+
|
|
|
def _cal_DTC(self, v_on_dist, a_on_dist, t):
|
|
def _cal_DTC(self, v_on_dist, a_on_dist, t):
|
|
|
if a_on_dist == 0:
|
|
if a_on_dist == 0:
|
|
|
return None
|
|
return None
|
|
|
DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
|
|
DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
|
|
|
return DTC
|
|
return DTC
|
|
|
|
|
|
|
|
- def _cal_PET(self, lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2):
|
|
|
|
|
- dist1 = self.horizontal_distance(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy)
|
|
|
|
|
- dist2 = self.horizontal_distance(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy)
|
|
|
|
|
- if ((dist1 <= lane_width/2) and (self._is_alone_the_road(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy))
|
|
|
|
|
- and (self._is_in_the_road(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy))
|
|
|
|
|
- and (dist2 <= lane_width/2) and (a1 != 0) and (a2 != 0)):
|
|
|
|
|
- dist_ego = np.sqrt((ego_posx - lane_posx1)**2 + (ego_posy-lane_posy1)**2)
|
|
|
|
|
- dist_obj = np.sqrt((obj_posx - ramp_posx2)**2 + (obj_posy-ramp_posy2)**2)
|
|
|
|
|
- PET = (-2*v2 + np.sqrt((4* v2**2)-8*a2*(v2*delta_t - dist_obj)))/ 2/ a2 - (2*v1 + np.sqrt((4* v1**2)-8*a1*dist_ego))/ 2/ a1 + delta_t
|
|
|
|
|
- return PET
|
|
|
|
|
- else:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
def _cal_PSD(self, x_relative_start_dist, v1, ego_decel_lon_max):
|
|
def _cal_PSD(self, x_relative_start_dist, v1, ego_decel_lon_max):
|
|
|
if v1 == 0:
|
|
if v1 == 0:
|
|
|
return None
|
|
return None
|
|
@@ -846,6 +820,7 @@ class SafetyCalculator:
|
|
|
return PSD
|
|
return PSD
|
|
|
else:
|
|
else:
|
|
|
return None
|
|
return None
|
|
|
|
|
+
|
|
|
# DRAC (decelerate required avoid collision)
|
|
# DRAC (decelerate required avoid collision)
|
|
|
def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
|
|
def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
|
|
|
dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
|
|
dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
|
|
@@ -921,9 +896,9 @@ class SafetyCalculator:
|
|
|
unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
|
|
unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
|
|
|
return unfunc_df
|
|
return unfunc_df
|
|
|
# 统计最危险的指标
|
|
# 统计最危险的指标
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def _safe_statistic_most_dangerous(self):
|
|
def _safe_statistic_most_dangerous(self):
|
|
|
- min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM', 'PET', 'PSD']
|
|
|
|
|
|
|
+ min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM', 'PSD']
|
|
|
max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
|
|
max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
|
|
|
result = {}
|
|
result = {}
|
|
|
for metric in min_list:
|
|
for metric in min_list:
|
|
@@ -957,7 +932,6 @@ class SafetyCalculator:
|
|
|
'TTB': 10.0,
|
|
'TTB': 10.0,
|
|
|
'TM': 10.0,
|
|
'TM': 10.0,
|
|
|
'DTC': 10.0,
|
|
'DTC': 10.0,
|
|
|
- 'PET': 10.0,
|
|
|
|
|
'PSD': 10.0,
|
|
'PSD': 10.0,
|
|
|
'LonSD': 10.0,
|
|
'LonSD': 10.0,
|
|
|
'LatSD': 2.0,
|
|
'LatSD': 2.0,
|
|
@@ -982,15 +956,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('TTC')
|
|
return self._default_value('TTC')
|
|
|
ttc_values = self.df_safe['TTC'].dropna()
|
|
ttc_values = self.df_safe['TTC'].dropna()
|
|
|
ttc_value = float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
|
|
ttc_value = float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集TTC数据
|
|
# 收集TTC数据
|
|
|
if not ttc_values.empty:
|
|
if not ttc_values.empty:
|
|
|
self.ttc_data = []
|
|
self.ttc_data = []
|
|
|
for time, frame, ttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTC']):
|
|
for time, frame, ttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTC']):
|
|
|
if pd.notnull(ttc):
|
|
if pd.notnull(ttc):
|
|
|
self.ttc_data.append({'simTime': time, 'simFrame': frame, 'TTC': ttc})
|
|
self.ttc_data.append({'simTime': time, 'simFrame': frame, 'TTC': ttc})
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return ttc_value
|
|
return ttc_value
|
|
|
|
|
|
|
|
def get_mttc_value(self) -> float:
|
|
def get_mttc_value(self) -> float:
|
|
@@ -998,14 +971,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('MTTC')
|
|
return self._default_value('MTTC')
|
|
|
mttc_values = self.df_safe['MTTC'].dropna()
|
|
mttc_values = self.df_safe['MTTC'].dropna()
|
|
|
mttc_value = float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
|
|
mttc_value = float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集MTTC数据
|
|
# 收集MTTC数据
|
|
|
if not mttc_values.empty:
|
|
if not mttc_values.empty:
|
|
|
self.mttc_data = []
|
|
self.mttc_data = []
|
|
|
for time, frame, mttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['MTTC']):
|
|
for time, frame, mttc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['MTTC']):
|
|
|
if pd.notnull(mttc):
|
|
if pd.notnull(mttc):
|
|
|
self.mttc_data.append({'simTime': time, 'simFrame': frame, 'MTTC': mttc})
|
|
self.mttc_data.append({'simTime': time, 'simFrame': frame, 'MTTC': mttc})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return mttc_value
|
|
return mttc_value
|
|
|
|
|
|
|
|
def get_thw_value(self) -> float:
|
|
def get_thw_value(self) -> float:
|
|
@@ -1013,14 +986,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('THW')
|
|
return self._default_value('THW')
|
|
|
thw_values = self.df_safe['THW'].dropna()
|
|
thw_values = self.df_safe['THW'].dropna()
|
|
|
thw_value = float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
|
|
thw_value = float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集THW数据
|
|
# 收集THW数据
|
|
|
if not thw_values.empty:
|
|
if not thw_values.empty:
|
|
|
self.thw_data = []
|
|
self.thw_data = []
|
|
|
for time, frame, thw in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['THW']):
|
|
for time, frame, thw in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['THW']):
|
|
|
if pd.notnull(thw):
|
|
if pd.notnull(thw):
|
|
|
self.thw_data.append({'simTime': time, 'simFrame': frame, 'THW': thw})
|
|
self.thw_data.append({'simTime': time, 'simFrame': frame, 'THW': thw})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return thw_value
|
|
return thw_value
|
|
|
|
|
|
|
|
def get_tlc_value(self) -> float:
|
|
def get_tlc_value(self) -> float:
|
|
@@ -1028,14 +1001,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('TLC')
|
|
return self._default_value('TLC')
|
|
|
tlc_values = self.df_safe['TLC'].dropna()
|
|
tlc_values = self.df_safe['TLC'].dropna()
|
|
|
tlc_value = float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC')
|
|
tlc_value = float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集TLC数据
|
|
# 收集TLC数据
|
|
|
if not tlc_values.empty:
|
|
if not tlc_values.empty:
|
|
|
self.tlc_data = []
|
|
self.tlc_data = []
|
|
|
for time, frame, tlc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TLC']):
|
|
for time, frame, tlc in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TLC']):
|
|
|
if pd.notnull(tlc):
|
|
if pd.notnull(tlc):
|
|
|
self.tlc_data.append({'simTime': time, 'simFrame': frame, 'TLC': tlc})
|
|
self.tlc_data.append({'simTime': time, 'simFrame': frame, 'TLC': tlc})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return tlc_value
|
|
return tlc_value
|
|
|
|
|
|
|
|
def get_ttb_value(self) -> float:
|
|
def get_ttb_value(self) -> float:
|
|
@@ -1043,14 +1016,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('TTB')
|
|
return self._default_value('TTB')
|
|
|
ttb_values = self.df_safe['TTB'].dropna()
|
|
ttb_values = self.df_safe['TTB'].dropna()
|
|
|
ttb_value = float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB')
|
|
ttb_value = float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集TTB数据
|
|
# 收集TTB数据
|
|
|
if not ttb_values.empty:
|
|
if not ttb_values.empty:
|
|
|
self.ttb_data = []
|
|
self.ttb_data = []
|
|
|
for time, frame, ttb in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTB']):
|
|
for time, frame, ttb in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TTB']):
|
|
|
if pd.notnull(ttb):
|
|
if pd.notnull(ttb):
|
|
|
self.ttb_data.append({'simTime': time, 'simFrame': frame, 'TTB': ttb})
|
|
self.ttb_data.append({'simTime': time, 'simFrame': frame, 'TTB': ttb})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return ttb_value
|
|
return ttb_value
|
|
|
|
|
|
|
|
def get_tm_value(self) -> float:
|
|
def get_tm_value(self) -> float:
|
|
@@ -1058,14 +1031,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('TM')
|
|
return self._default_value('TM')
|
|
|
tm_values = self.df_safe['TM'].dropna()
|
|
tm_values = self.df_safe['TM'].dropna()
|
|
|
tm_value = float(tm_values.min()) if not tm_values.empty else self._default_value('TM')
|
|
tm_value = float(tm_values.min()) if not tm_values.empty else self._default_value('TM')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集TM数据
|
|
# 收集TM数据
|
|
|
if not tm_values.empty:
|
|
if not tm_values.empty:
|
|
|
self.tm_data = []
|
|
self.tm_data = []
|
|
|
for time, frame, tm in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TM']):
|
|
for time, frame, tm in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['TM']):
|
|
|
if pd.notnull(tm):
|
|
if pd.notnull(tm):
|
|
|
self.tm_data.append({'simTime': time, 'simFrame': frame, 'TM': tm})
|
|
self.tm_data.append({'simTime': time, 'simFrame': frame, 'TM': tm})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return tm_value
|
|
return tm_value
|
|
|
|
|
|
|
|
def get_dtc_value(self) -> float:
|
|
def get_dtc_value(self) -> float:
|
|
@@ -1074,12 +1047,6 @@ class SafetyCalculator:
|
|
|
dtc_values = self.df_safe['DTC'].dropna()
|
|
dtc_values = self.df_safe['DTC'].dropna()
|
|
|
return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
|
|
return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
|
|
|
|
|
|
|
|
- def get_pet_value(self) -> float:
|
|
|
|
|
- if self.empty_flag or self.df_safe is None:
|
|
|
|
|
- return self._default_value('PET')
|
|
|
|
|
- pet_values = self.df_safe['PET'].dropna()
|
|
|
|
|
- return float(pet_values.min()) if not pet_values.empty else self._default_value('PET')
|
|
|
|
|
-
|
|
|
|
|
def get_psd_value(self) -> float:
|
|
def get_psd_value(self) -> float:
|
|
|
if self.empty_flag or self.df_safe is None:
|
|
if self.empty_flag or self.df_safe is None:
|
|
|
return self._default_value('PSD')
|
|
return self._default_value('PSD')
|
|
@@ -1091,14 +1058,14 @@ class SafetyCalculator:
|
|
|
return self._default_value('LonSD')
|
|
return self._default_value('LonSD')
|
|
|
lonsd_values = self.df_safe['LonSD'].dropna()
|
|
lonsd_values = self.df_safe['LonSD'].dropna()
|
|
|
lonsd_value = float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
|
|
lonsd_value = float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集LonSD数据
|
|
# 收集LonSD数据
|
|
|
if not lonsd_values.empty:
|
|
if not lonsd_values.empty:
|
|
|
self.lonsd_data = []
|
|
self.lonsd_data = []
|
|
|
for time, frame, lonsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LonSD']):
|
|
for time, frame, lonsd in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['LonSD']):
|
|
|
if pd.notnull(lonsd):
|
|
if pd.notnull(lonsd):
|
|
|
self.lonsd_data.append({'simTime': time, 'simFrame': frame, 'LonSD': lonsd})
|
|
self.lonsd_data.append({'simTime': time, 'simFrame': frame, 'LonSD': lonsd})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return lonsd_value
|
|
return lonsd_value
|
|
|
|
|
|
|
|
def get_latsd_value(self) -> float:
|
|
def get_latsd_value(self) -> float:
|
|
@@ -1107,7 +1074,7 @@ class SafetyCalculator:
|
|
|
latsd_values = self.df_safe['LatSD'].dropna()
|
|
latsd_values = self.df_safe['LatSD'].dropna()
|
|
|
# 使用最小值而非平均值,与safety1.py保持一致
|
|
# 使用最小值而非平均值,与safety1.py保持一致
|
|
|
latsd_value = float(latsd_values.min()) if not latsd_values.empty else self._default_value('LatSD')
|
|
latsd_value = float(latsd_values.min()) if not latsd_values.empty else self._default_value('LatSD')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集LatSD数据
|
|
# 收集LatSD数据
|
|
|
if not latsd_values.empty:
|
|
if not latsd_values.empty:
|
|
|
self.latsd_data = []
|
|
self.latsd_data = []
|
|
@@ -1122,7 +1089,7 @@ class SafetyCalculator:
|
|
|
return self._default_value('BTN')
|
|
return self._default_value('BTN')
|
|
|
btn_values = self.df_safe['BTN'].dropna()
|
|
btn_values = self.df_safe['BTN'].dropna()
|
|
|
btn_value = float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
|
|
btn_value = float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集BTN数据
|
|
# 收集BTN数据
|
|
|
if not btn_values.empty:
|
|
if not btn_values.empty:
|
|
|
self.btn_data = []
|
|
self.btn_data = []
|
|
@@ -1137,11 +1104,12 @@ class SafetyCalculator:
|
|
|
return self._default_value('collisionRisk')
|
|
return self._default_value('collisionRisk')
|
|
|
risk_values = self.df_safe['collisionRisk'].dropna()
|
|
risk_values = self.df_safe['collisionRisk'].dropna()
|
|
|
risk_value = float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
|
|
risk_value = float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# 收集碰撞风险数据
|
|
# 收集碰撞风险数据
|
|
|
if not risk_values.empty:
|
|
if not risk_values.empty:
|
|
|
self.collision_risk_data = []
|
|
self.collision_risk_data = []
|
|
|
- for time, frame, risk in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['collisionRisk']):
|
|
|
|
|
|
|
+ for time, frame, risk in zip(self.df_safe['simTime'], self.df_safe['simFrame'],
|
|
|
|
|
+ self.df_safe['collisionRisk']):
|
|
|
if pd.notnull(risk):
|
|
if pd.notnull(risk):
|
|
|
self.collision_risk_data.append({'simTime': time, 'simFrame': frame, 'collisionRisk': risk})
|
|
self.collision_risk_data.append({'simTime': time, 'simFrame': frame, 'collisionRisk': risk})
|
|
|
|
|
|
|
@@ -1151,15 +1119,17 @@ class SafetyCalculator:
|
|
|
if self.empty_flag or self.df_safe is None:
|
|
if self.empty_flag or self.df_safe is None:
|
|
|
return self._default_value('collisionSeverity')
|
|
return self._default_value('collisionSeverity')
|
|
|
severity_values = self.df_safe['collisionSeverity'].dropna()
|
|
severity_values = self.df_safe['collisionSeverity'].dropna()
|
|
|
- severity_value = float(severity_values.max()) if not severity_values.empty else self._default_value('collisionSeverity')
|
|
|
|
|
-
|
|
|
|
|
|
|
+ severity_value = float(severity_values.max()) if not severity_values.empty else self._default_value(
|
|
|
|
|
+ 'collisionSeverity')
|
|
|
|
|
+
|
|
|
# 收集碰撞严重性数据
|
|
# 收集碰撞严重性数据
|
|
|
if not severity_values.empty:
|
|
if not severity_values.empty:
|
|
|
self.collision_severity_data = []
|
|
self.collision_severity_data = []
|
|
|
- for time, frame, severity in zip(self.df_safe['simTime'], self.df_safe['simFrame'], self.df_safe['collisionSeverity']):
|
|
|
|
|
|
|
+ for time, frame, severity in zip(self.df_safe['simTime'], self.df_safe['simFrame'],
|
|
|
|
|
+ self.df_safe['collisionSeverity']):
|
|
|
if pd.notnull(severity):
|
|
if pd.notnull(severity):
|
|
|
- self.collision_severity_data.append({'simTime': time, 'simFrame': frame, 'collisionSeverity': severity})
|
|
|
|
|
|
|
+ self.collision_severity_data.append(
|
|
|
|
|
+ {'simTime': time, 'simFrame': frame, 'collisionSeverity': severity})
|
|
|
|
|
|
|
|
-
|
|
|
|
|
return severity_value
|
|
return severity_value
|
|
|
|
|
|