|
@@ -71,6 +71,45 @@ def calculate_thw(data_processed) -> dict:
|
|
|
LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
|
|
|
return {"THW": None}
|
|
|
|
|
|
+def calculate_tlc(data_processed) -> dict:
|
|
|
+ """计算TLC (Time to Line Crossing)"""
|
|
|
+ if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
+ return {"TLC": None}
|
|
|
+ try:
|
|
|
+ safety = SafetyCalculator(data_processed)
|
|
|
+ tlc_value = safety.get_tlc_value()
|
|
|
+ LogManager().get_logger().info(f"安全指标[TLC]计算结果: {tlc_value}")
|
|
|
+ return {"TLC": tlc_value}
|
|
|
+ except Exception as e:
|
|
|
+ LogManager().get_logger().error(f"TLC计算异常: {str(e)}", exc_info=True)
|
|
|
+ return {"TLC": None}
|
|
|
+
|
|
|
+def calculate_ttb(data_processed) -> dict:
|
|
|
+ """计算TTB (Time to Brake)"""
|
|
|
+ if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
+ return {"TTB": None}
|
|
|
+ try:
|
|
|
+ safety = SafetyCalculator(data_processed)
|
|
|
+ ttb_value = safety.get_ttb_value()
|
|
|
+ LogManager().get_logger().info(f"安全指标[TTB]计算结果: {ttb_value}")
|
|
|
+ return {"TTB": ttb_value}
|
|
|
+ except Exception as e:
|
|
|
+ LogManager().get_logger().error(f"TTB计算异常: {str(e)}", exc_info=True)
|
|
|
+ return {"TTB": None}
|
|
|
+
|
|
|
+def calculate_tm(data_processed) -> dict:
|
|
|
+ """计算TM (Time Margin)"""
|
|
|
+ if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
+ return {"TM": None}
|
|
|
+ try:
|
|
|
+ safety = SafetyCalculator(data_processed)
|
|
|
+ tm_value = safety.get_tm_value()
|
|
|
+ LogManager().get_logger().info(f"安全指标[TM]计算结果: {tm_value}")
|
|
|
+ return {"TM": tm_value}
|
|
|
+ except Exception as e:
|
|
|
+ LogManager().get_logger().error(f"TM计算异常: {str(e)}", exc_info=True)
|
|
|
+ return {"TM": None}
|
|
|
+
|
|
|
def calculate_collisionrisk(data_processed) -> dict:
|
|
|
"""计算碰撞风险"""
|
|
|
safety = SafetyCalculator(data_processed)
|
|
@@ -188,6 +227,9 @@ class SafetyCalculator:
|
|
|
"TTC": 10.0,
|
|
|
"MTTC": 10.0,
|
|
|
"THW": 10.0,
|
|
|
+ "TLC": 10.0,
|
|
|
+ "TTB": 10.0,
|
|
|
+ "TM": 10.0,
|
|
|
"LatSD": 3.0,
|
|
|
"BTN": 1.0,
|
|
|
"collisionRisk": 0.0,
|
|
@@ -227,6 +269,8 @@ class SafetyCalculator:
|
|
|
ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"]
|
|
|
ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
|
|
|
ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
|
|
|
+ ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
|
|
|
+ x_relative_start_dist = self.ego_df["x_relative_start_dist"]
|
|
|
|
|
|
obj_dict = defaultdict(dict)
|
|
|
obj_data_dict = self.df.to_dict('records')
|
|
@@ -242,10 +286,13 @@ class SafetyCalculator:
|
|
|
x1 = ego_data['posX']
|
|
|
y1 = ego_data['posY']
|
|
|
h1 = ego_data['posH']
|
|
|
+ laneOffset = ego_data["laneOffset"]
|
|
|
+
|
|
|
v_x1 = ego_data['speedX']
|
|
|
v_y1 = ego_data['speedY']
|
|
|
a_x1 = ego_data['accelX']
|
|
|
a_y1 = ego_data['accelY']
|
|
|
+ a1 = np.sqrt(a_x1 ** 2 + a_y1 ** 2)
|
|
|
|
|
|
for playerId in self.obj_id_list:
|
|
|
if playerId == EGO_PLAYER_ID:
|
|
@@ -265,6 +312,7 @@ class SafetyCalculator:
|
|
|
v2 = obj_data['v']
|
|
|
a_x2 = obj_data['accelX']
|
|
|
a_y2 = obj_data['accelY']
|
|
|
+ a2 = np.sqrt(a_x2 ** 2 + a_y2 ** 2)
|
|
|
|
|
|
dx, dy = x2 - x1, y2 - y1
|
|
|
|
|
@@ -289,6 +337,7 @@ class SafetyCalculator:
|
|
|
|
|
|
vx, vy = v_x1 - v_x2, v_y1 - v_y2
|
|
|
ax, ay = a_x2 - a_x1, a_y2 - a_y1
|
|
|
+ relative_v = np.sqrt(vx ** 2 + vy ** 2)
|
|
|
|
|
|
v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
|
|
|
v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
|
|
@@ -306,6 +355,9 @@ class SafetyCalculator:
|
|
|
TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None
|
|
|
MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist)
|
|
|
THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None
|
|
|
+ TLC = self._cal_TLC(v1, h1, laneOffset)
|
|
|
+ TTB = self._cal_TTB(x_relative_start_dist, relative_v, ego_decel_max)
|
|
|
+ TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
|
|
|
|
|
|
LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
|
|
|
|
|
@@ -339,10 +391,16 @@ class SafetyCalculator:
|
|
|
TTC = None if (TTC is None or TTC < 0) else TTC
|
|
|
MTTC = None if (MTTC is None or MTTC < 0) else MTTC
|
|
|
THW = None if (THW is None or THW < 0) else THW
|
|
|
+ TLC = None if (TLC is None or TLC < 0) else TLC
|
|
|
+ TTB = None if (TTB is None or TTB < 0) else TTB
|
|
|
+ TM = None if (TM is None or TM < 0) else TM
|
|
|
|
|
|
obj_dict[frame_num][playerId]['TTC'] = TTC
|
|
|
obj_dict[frame_num][playerId]['MTTC'] = MTTC
|
|
|
obj_dict[frame_num][playerId]['THW'] = THW
|
|
|
+ obj_dict[frame_num][playerId]['TLC'] = TLC
|
|
|
+ obj_dict[frame_num][playerId]['TTB'] = TTB
|
|
|
+ obj_dict[frame_num][playerId]['TM'] = TM
|
|
|
obj_dict[frame_num][playerId]['LonSD'] = LonSD
|
|
|
obj_dict[frame_num][playerId]['LatSD'] = LatSD
|
|
|
obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
|
|
@@ -360,7 +418,7 @@ class SafetyCalculator:
|
|
|
|
|
|
df_safe = pd.concat(df_list)
|
|
|
col_list = ['simTime', 'simFrame', 'playerId',
|
|
|
- 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'BTN',
|
|
|
+ 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'LonSD', 'LatSD', 'BTN',
|
|
|
'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
|
self.df_safe = df_safe[col_list].reset_index(drop=True)
|
|
|
|
|
@@ -511,6 +569,33 @@ class SafetyCalculator:
|
|
|
else:
|
|
|
THW = dist / v_ego_projection_in_dist
|
|
|
return THW
|
|
|
+ # TLC (time to line crossing)
|
|
|
+ def _cal_TLC(self, ego_v, ego_yaw, laneOffset):
|
|
|
+ TLC = laneOffset/ego_v/np.sin(ego_yaw) if ((ego_v != 0) and (np.sin(ego_yaw) != 0)) else 10.0
|
|
|
+ if TLC < 0:
|
|
|
+ TLC = None
|
|
|
+ return TLC
|
|
|
+
|
|
|
+ def _cal_TTB(self, x_relative_start_dist, relative_v, ego_decel_max):
|
|
|
+ if len(x_relative_start_dist):
|
|
|
+ return None
|
|
|
+ if (ego_decel_max == 0) or (relative_v == 0):
|
|
|
+ return self.calculated_value["TTB"]
|
|
|
+ else:
|
|
|
+ x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
+ TTB = (x_relative_start_dist0 + relative_v * relative_v/2/ego_decel_max)/relative_v
|
|
|
+ return TTB
|
|
|
+
|
|
|
+ def _cal_TM(self, x_relative_start_dist, v2, a2, v1, a1):
|
|
|
+ if len(x_relative_start_dist):
|
|
|
+ return None
|
|
|
+ if (a2 == 0) or (v1 == 0):
|
|
|
+ return self.calculated_value["TM"]
|
|
|
+ if a1 == 0:
|
|
|
+ return None
|
|
|
+ x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
+ TM = (x_relative_start_dist0 + v2**2/(2*a2) - v1**2/(2*a1)) / v1
|
|
|
+ return TM
|
|
|
|
|
|
def velocity(self, v_x, v_y):
|
|
|
v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
|
|
@@ -614,7 +699,7 @@ class SafetyCalculator:
|
|
|
# 统计最危险的指标
|
|
|
|
|
|
def _safe_statistic_most_dangerous(self):
|
|
|
- min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD']
|
|
|
+ min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM']
|
|
|
max_list = ['BTN', 'collisionRisk', 'collisionSeverity']
|
|
|
result = {}
|
|
|
for metric in min_list:
|
|
@@ -644,6 +729,9 @@ class SafetyCalculator:
|
|
|
'TTC': 10.0,
|
|
|
'MTTC': 4.2,
|
|
|
'THW': 2.1,
|
|
|
+ 'TLC': 10.0,
|
|
|
+ 'TTB': 10.0,
|
|
|
+ 'TM': 10.0,
|
|
|
'LonSD': 10.0,
|
|
|
'LatSD': 2.0,
|
|
|
'BTN': 1.0,
|
|
@@ -680,6 +768,24 @@ class SafetyCalculator:
|
|
|
thw_values = self.df_safe['THW'].dropna()
|
|
|
return float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
|
|
|
|
|
|
+ def get_tlc_value(self) -> float:
|
|
|
+ if self.empty_flag or self.df_safe is None:
|
|
|
+ return self._default_value('TLC')
|
|
|
+ tlc_values = self.df_safe['TLC'].dropna()
|
|
|
+ return float(tlc_values.min()) if not tlc_values.empty else self._default_value('TLC')
|
|
|
+
|
|
|
+ def get_ttb_value(self) -> float:
|
|
|
+ if self.empty_flag or self.df_safe is None:
|
|
|
+ return self._default_value('TTB')
|
|
|
+ ttb_values = self.df_safe['TTB'].dropna()
|
|
|
+ return float(ttb_values.min()) if not ttb_values.empty else self._default_value('TTB')
|
|
|
+
|
|
|
+ def get_tm_value(self) -> float:
|
|
|
+ if self.empty_flag or self.df_safe is None:
|
|
|
+ return self._default_value('TM')
|
|
|
+ tm_values = self.df_safe['TM'].dropna()
|
|
|
+ return float(tm_values.min()) if not tm_values.empty else self._default_value('TM')
|
|
|
+
|
|
|
def get_lonsd_value(self) -> float:
|
|
|
if self.empty_flag or self.df_safe is None:
|
|
|
return self._default_value('LonSD')
|