|
@@ -9,6 +9,7 @@ import pandas as pd
|
|
import math
|
|
import math
|
|
from collections import defaultdict
|
|
from collections import defaultdict
|
|
from typing import Dict, Any, List, Optional
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
+import ast
|
|
|
|
|
|
from modules.lib.score import Score
|
|
from modules.lib.score import Score
|
|
from modules.lib.log_manager import LogManager
|
|
from modules.lib.log_manager import LogManager
|
|
@@ -26,7 +27,12 @@ SAFETY_INFO = [
|
|
"accelX",
|
|
"accelX",
|
|
"accelY",
|
|
"accelY",
|
|
"v",
|
|
"v",
|
|
- "type"
|
|
|
|
|
|
+ "type",
|
|
|
|
+ "lane_width",
|
|
|
|
+ "lane_type",
|
|
|
|
+ "road_type",
|
|
|
|
+ "lane_coords",
|
|
|
|
+ "link_coords"
|
|
]
|
|
]
|
|
|
|
|
|
# ----------------------
|
|
# ----------------------
|
|
@@ -123,19 +129,6 @@ def calculate_tm(data_processed) -> dict:
|
|
# LogManager().get_logger().error(f"MPrTTC计算异常: {str(e)}", exc_info=True)
|
|
# LogManager().get_logger().error(f"MPrTTC计算异常: {str(e)}", exc_info=True)
|
|
# return {"MPrTTC": None}
|
|
# return {"MPrTTC": None}
|
|
|
|
|
|
-# def calculate_pet(data_processed) -> dict:
|
|
|
|
-# # PET (Post Encroachment Time)
|
|
|
|
-# if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
|
-# return {"PET": None}
|
|
|
|
-# try:
|
|
|
|
-# safety = SafetyCalculator(data_processed)
|
|
|
|
-# pet_value = safety.get_pet_value()
|
|
|
|
-# LogManager().get_logger().info(f"安全指标[PET]计算结果: {pet_value}")
|
|
|
|
-# return {"PET": pet_value}
|
|
|
|
-# except Exception as e:
|
|
|
|
-# LogManager().get_logger().error(f"PET计算异常: {str(e)}", exc_info=True)
|
|
|
|
-# return {"PET": None}
|
|
|
|
-
|
|
|
|
def calculate_dtc(data_processed) -> dict:
|
|
def calculate_dtc(data_processed) -> dict:
|
|
"""计算DTC (Distance to Collision)"""
|
|
"""计算DTC (Distance to Collision)"""
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
@@ -149,18 +142,31 @@ def calculate_dtc(data_processed) -> dict:
|
|
LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
|
|
LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
|
|
return {"DTC": None}
|
|
return {"DTC": None}
|
|
|
|
|
|
-def calculate_dtc(data_processed) -> dict:
|
|
|
|
- """计算DTC (Distance to Collision)"""
|
|
|
|
|
|
+def calculate_pet(data_processed) -> dict:
|
|
|
|
+ """计算PET (Post Encroachment Time)"""
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
- return {"DTC": None}
|
|
|
|
|
|
+ return {"PET": None}
|
|
try:
|
|
try:
|
|
safety = SafetyCalculator(data_processed)
|
|
safety = SafetyCalculator(data_processed)
|
|
- dtc_value = safety.get_dtc_value()
|
|
|
|
- LogManager().get_logger().info(f"安全指标[DTC]计算结果: {dtc_value}")
|
|
|
|
- return {"DTC": dtc_value}
|
|
|
|
|
|
+ pet_value = safety.get_dtc_value()
|
|
|
|
+ LogManager().get_logger().info(f"安全指标[PET]计算结果: {pet_value}")
|
|
|
|
+ return {"PET": pet_value}
|
|
except Exception as e:
|
|
except Exception as e:
|
|
- LogManager().get_logger().error(f"DTC计算异常: {str(e)}", exc_info=True)
|
|
|
|
- return {"DTC": None}
|
|
|
|
|
|
+ LogManager().get_logger().error(f"PET计算异常: {str(e)}", exc_info=True)
|
|
|
|
+ return {"PET": None}
|
|
|
|
+
|
|
|
|
+def calculate_psd(data_processed) -> dict:
|
|
|
|
+ """计算PSD (Potential Safety Distance)"""
|
|
|
|
+ if data_processed is None or not hasattr(data_processed, 'object_df'):
|
|
|
|
+ return {"PSD": None}
|
|
|
|
+ try:
|
|
|
|
+ safety = SafetyCalculator(data_processed)
|
|
|
|
+ psd_value = safety.get_psd_value()
|
|
|
|
+ LogManager().get_logger().info(f"安全指标[PSD]计算结果: {psd_value}")
|
|
|
|
+ return {"PSD": psd_value}
|
|
|
|
+ except Exception as e:
|
|
|
|
+ LogManager().get_logger().error(f"PSD计算异常: {str(e)}", exc_info=True)
|
|
|
|
+ return {"PSD": None}
|
|
|
|
|
|
def calculate_collisionrisk(data_processed) -> dict:
|
|
def calculate_collisionrisk(data_processed) -> dict:
|
|
"""计算碰撞风险"""
|
|
"""计算碰撞风险"""
|
|
@@ -272,7 +278,7 @@ class SafetyCalculator:
|
|
self.obj_id_list = data_processed.obj_id_list
|
|
self.obj_id_list = data_processed.obj_id_list
|
|
|
|
|
|
self.metric_list = [
|
|
self.metric_list = [
|
|
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
|
|
|
|
|
|
+ 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
|
|
]
|
|
]
|
|
|
|
|
|
# 初始化默认值
|
|
# 初始化默认值
|
|
@@ -284,8 +290,9 @@ class SafetyCalculator:
|
|
"TTB": 10.0,
|
|
"TTB": 10.0,
|
|
"TM": 10.0,
|
|
"TM": 10.0,
|
|
# "MPrTTC": 10.0,
|
|
# "MPrTTC": 10.0,
|
|
- # "PET": 10.0,
|
|
|
|
|
|
+ "PET": 10.0,
|
|
"DTC": 10.0,
|
|
"DTC": 10.0,
|
|
|
|
+ "PSD": 10.0,
|
|
"LatSD": 3.0,
|
|
"LatSD": 3.0,
|
|
"BTN": 1.0,
|
|
"BTN": 1.0,
|
|
"collisionRisk": 0.0,
|
|
"collisionRisk": 0.0,
|
|
@@ -327,7 +334,7 @@ class SafetyCalculator:
|
|
ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
|
|
ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
|
|
driver_reaction_time = self.data_processed.vehicle_config["RHO"]
|
|
driver_reaction_time = self.data_processed.vehicle_config["RHO"]
|
|
ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
|
|
ego_decel_max = np.sqrt(ego_decel_lon_max ** 2 + ego_decel_lat_max ** 2)
|
|
- x_relative_start_dist = self.ego_df["x_relative_start_dist"]
|
|
|
|
|
|
+ x_relative_start_dist = self.ego_df["x_relative_dist"]
|
|
|
|
|
|
obj_dict = defaultdict(dict)
|
|
obj_dict = defaultdict(dict)
|
|
obj_data_dict = self.df.to_dict('records')
|
|
obj_data_dict = self.df.to_dict('records')
|
|
@@ -336,7 +343,8 @@ class SafetyCalculator:
|
|
|
|
|
|
df_list = []
|
|
df_list = []
|
|
EGO_PLAYER_ID = 1
|
|
EGO_PLAYER_ID = 1
|
|
-
|
|
|
|
|
|
+ ramp_poss = self.ego_df[self.ego_df["link_type"] == 19]["link_coords"].drop_duplicates().tolist() # 寻找匝道的位置坐标
|
|
|
|
+ lane_poss = self.ego_df[self.ego_df["lane_type"] == 2]["lane_coords"].drop_duplicates().tolist() # 寻找匝道的位置坐标
|
|
for frame_num in self.frame_list:
|
|
for frame_num in self.frame_list:
|
|
ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
|
|
ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
|
|
v1 = ego_data['v']
|
|
v1 = ego_data['v']
|
|
@@ -417,7 +425,28 @@ class SafetyCalculator:
|
|
TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
|
|
TM = self._cal_TM(x_relative_start_dist, v2, a2, v1, a1)
|
|
DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, driver_reaction_time)
|
|
DTC = self._cal_DTC(vrel_projection_in_dist, arel_projection_in_dist, driver_reaction_time)
|
|
# MPrTTC = self._cal_MPrTTC(x_relative_start_dist)
|
|
# MPrTTC = self._cal_MPrTTC(x_relative_start_dist)
|
|
- # PET = self._cal_PET(x_relative_start_dist, v2, a2, v1, a1)
|
|
|
|
|
|
+ # PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
|
|
|
|
+ PET = None
|
|
|
|
+ for lane_pos in lane_poss:
|
|
|
|
+ lane_posx1 = ast.literal_eval(lane_pos)[0][0]
|
|
|
|
+ lane_posy1 = ast.literal_eval(lane_pos)[0][1]
|
|
|
|
+ lane_posx2 = ast.literal_eval(lane_pos)[-1][0]
|
|
|
|
+ lane_posy2 = ast.literal_eval(lane_pos)[-1][1]
|
|
|
|
+ for ramp_pos in ramp_poss:
|
|
|
|
+ ramp_posx1 = ast.literal_eval(ramp_pos)[0][0]
|
|
|
|
+ ramp_posy1 = ast.literal_eval(ramp_pos)[0][1]
|
|
|
|
+ ramp_posx2 = ast.literal_eval(ramp_pos)[-1][0]
|
|
|
|
+ ramp_posy2 = ast.literal_eval(ramp_pos)[-1][1]
|
|
|
|
+ ego_posx = x1
|
|
|
|
+ ego_posy = y1
|
|
|
|
+ obj_posx = x2
|
|
|
|
+ obj_posy = y2
|
|
|
|
+ delta_t = self._cal_reaction_time_to_avgspeed(self.ego_df)
|
|
|
|
+ lane_width = self.ego_df["lane_width"].iloc[0]
|
|
|
|
+ PET = self._cal_PET(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2,
|
|
|
|
+ ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2)
|
|
|
|
+ PSD = self._cal_PSD(x_relative_start_dist, v1, ego_decel_lon_max)
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
|
|
LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
|
|
@@ -456,6 +485,8 @@ class SafetyCalculator:
|
|
TTB = None if (TTB is None or TTB < 0) else TTB
|
|
TTB = None if (TTB is None or TTB < 0) else TTB
|
|
TM = None if (TM is None or TM < 0) else TM
|
|
TM = None if (TM is None or TM < 0) else TM
|
|
DTC = None if (DTC is None or DTC < 0) else DTC
|
|
DTC = None if (DTC is None or DTC < 0) else DTC
|
|
|
|
+ PET = None if (PET is None or PET < 0) else PET
|
|
|
|
+ PSD = None if (PSD is None or PSD < 0) else PSD
|
|
|
|
|
|
obj_dict[frame_num][playerId]['TTC'] = TTC
|
|
obj_dict[frame_num][playerId]['TTC'] = TTC
|
|
obj_dict[frame_num][playerId]['MTTC'] = MTTC
|
|
obj_dict[frame_num][playerId]['MTTC'] = MTTC
|
|
@@ -464,6 +495,8 @@ class SafetyCalculator:
|
|
obj_dict[frame_num][playerId]['TTB'] = TTB
|
|
obj_dict[frame_num][playerId]['TTB'] = TTB
|
|
obj_dict[frame_num][playerId]['TM'] = TM
|
|
obj_dict[frame_num][playerId]['TM'] = TM
|
|
obj_dict[frame_num][playerId]['DTC'] = DTC
|
|
obj_dict[frame_num][playerId]['DTC'] = DTC
|
|
|
|
+ obj_dict[frame_num][playerId]['PET'] = PET
|
|
|
|
+ obj_dict[frame_num][playerId]['PSD'] = PSD
|
|
obj_dict[frame_num][playerId]['LonSD'] = LonSD
|
|
obj_dict[frame_num][playerId]['LonSD'] = LonSD
|
|
obj_dict[frame_num][playerId]['LatSD'] = LatSD
|
|
obj_dict[frame_num][playerId]['LatSD'] = LatSD
|
|
obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
|
|
obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
|
|
@@ -481,10 +514,32 @@ class SafetyCalculator:
|
|
|
|
|
|
df_safe = pd.concat(df_list)
|
|
df_safe = pd.concat(df_list)
|
|
col_list = ['simTime', 'simFrame', 'playerId',
|
|
col_list = ['simTime', 'simFrame', 'playerId',
|
|
- 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'LonSD', 'LatSD', 'BTN',
|
|
|
|
|
|
+ 'TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'TM', 'DTC', 'PET', 'PSD', 'LonSD', 'LatSD', 'BTN',
|
|
'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
'collisionSeverity', 'pr_death', 'collisionRisk']
|
|
self.df_safe = df_safe[col_list].reset_index(drop=True)
|
|
self.df_safe = df_safe[col_list].reset_index(drop=True)
|
|
|
|
|
|
|
|
+ # 计算车辆从非匀速达到匀速的反应时间
|
|
|
|
+ def _cal_reaction_time_to_avgspeed(self, ego_df, threshold = 0.01):
|
|
|
|
+ ego_df = ego_df.reset_index(drop=True)
|
|
|
|
+ ego_df['v_change'] = ego_df['v'].diff()
|
|
|
|
+ # 初始化结果列表
|
|
|
|
+ uniform_speed_segments = []
|
|
|
|
+ start_index = 0
|
|
|
|
+
|
|
|
|
+ # 遍历数据,找出匀速部分
|
|
|
|
+ for i in range(1, len(ego_df)):
|
|
|
|
+ if ego_df['v_change'].iloc[i] > threshold:
|
|
|
|
+ if i - start_index > 1: # 至少有两个数据点才能形成一个匀速段
|
|
|
|
+ uniform_speed_segments.append(
|
|
|
|
+ (ego_df.iloc[start_index]['simTime'], ego_df.iloc[i - 1]['simTime'], ego_df.iloc[start_index]['v']))
|
|
|
|
+ start_index = i
|
|
|
|
+
|
|
|
|
+ # 检查最后一个段
|
|
|
|
+ if len(ego_df) - start_index > 1:
|
|
|
|
+ uniform_speed_segments.append(
|
|
|
|
+ (ego_df.iloc[start_index]['simTime'], ego_df.iloc[-1]['simTime'], ego_df.iloc[start_index]['v']))
|
|
|
|
+ return uniform_speed_segments[-1][0] - ego_df['simTime'].iloc[0]
|
|
|
|
+
|
|
def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
|
|
def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
|
|
# 计算 AB 连线的向量 AB
|
|
# 计算 AB 连线的向量 AB
|
|
# dx = x2 - x1
|
|
# dx = x2 - x1
|
|
@@ -597,6 +652,27 @@ class SafetyCalculator:
|
|
dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
|
|
dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
|
|
return dist
|
|
return dist
|
|
|
|
|
|
|
|
+ # 求车辆与道路之间的横向距离
|
|
|
|
+ def horizontal_distance(self, posx1, posy1, posx2, posy2, posx, posy):
|
|
|
|
+ dist = np.sqrt((posx2 - posx1)(posy - posy1) - (posy2 - posy1)(posx - posy1))/np.sqrt((posx2 - posx1)**2 + (posy2 - posy1)**2)
|
|
|
|
+ return dist
|
|
|
|
+
|
|
|
|
+ # 求车辆与道路之间的纵向距离
|
|
|
|
+ def _is_alone_the_road(self, posx1, posy1, posx2, posy2, posx, posy):
|
|
|
|
+ pro_car = (posx2 - posx1)(posx - posx1) + (posy2 - posy1)(posy - posy1)
|
|
|
|
+ if pro_car > 0:
|
|
|
|
+ return True
|
|
|
|
+ else:
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def _is_in_the_road(self, posx1, posy1, posx2, posy2, posx, posy):
|
|
|
|
+ pro_obj1 = (posx2 - posx1)(posx - posx1) + (posy2 - posy1)(posy - posy1)
|
|
|
|
+ pro_obj2 = (posx1 - posx2)(posx - posx2) + (posy1 - posy2)(posy - posy2)
|
|
|
|
+ if pro_obj1 > 0 and pro_obj2 > 0:
|
|
|
|
+ return True
|
|
|
|
+ else:
|
|
|
|
+ return False
|
|
|
|
+
|
|
# TTC (time to collision)
|
|
# TTC (time to collision)
|
|
def _cal_TTC(self, dist, vrel_projection_in_dist):
|
|
def _cal_TTC(self, dist, vrel_projection_in_dist):
|
|
if vrel_projection_in_dist == 0:
|
|
if vrel_projection_in_dist == 0:
|
|
@@ -673,6 +749,30 @@ class SafetyCalculator:
|
|
DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
|
|
DTC = v_on_dist * t + v_on_dist ** 2 / a_on_dist
|
|
return DTC
|
|
return DTC
|
|
|
|
|
|
|
|
+ def _cal_PET(self, lane_posx1, lane_posy1, lane_posx2, lane_posy2, ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, ego_posx, ego_posy, obj_posx, obj_posy, lane_width, delta_t, v1, v2, a1, a2):
|
|
|
|
+ dist1 = self.horizontal_distance(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy)
|
|
|
|
+ dist2 = self.horizontal_distance(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy)
|
|
|
|
+ if ((dist1 <= lane_width/2) and (self._is_alone_the_road(lane_posx1, lane_posy1, lane_posx2, lane_posy2, ego_posx, ego_posy))
|
|
|
|
+ and (self._is_in_the_road(ramp_posx1, ramp_posy1, ramp_posx2, ramp_posy2, obj_posx, obj_posy))
|
|
|
|
+ and (dist2 <= lane_width/2) and (a1 != 0) and (a2 != 0)):
|
|
|
|
+ dist_ego = np.sqrt((ego_posx - lane_posx1)**2 + (ego_posy-lane_posy1)**2)
|
|
|
|
+ dist_obj = np.sqrt((obj_posx - ramp_posx2)**2 + (obj_posy-ramp_posy2)**2)
|
|
|
|
+ PET = (-2*v2 + np.sqrt((4* v2**2)-8*a2*(v2*delta_t - dist_obj)))/ 2/ a2 - (2*v1 + np.sqrt((4* v1**2)-8*a1*dist_ego))/ 2/ a1 + delta_t
|
|
|
|
+ return PET
|
|
|
|
+ else:
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ def _cal_PSD(self, x_relative_start_dist, v1, ego_decel_lon_max):
|
|
|
|
+ if v1 == 0:
|
|
|
|
+ return None
|
|
|
|
+ else:
|
|
|
|
+ if len(x_relative_start_dist) > 0:
|
|
|
|
+ x_relative_start_dist0 = x_relative_start_dist.tolist()[0]
|
|
|
|
+ PSD = x_relative_start_dist0 * 2 * ego_decel_lon_max / v1
|
|
|
|
+ return PSD
|
|
|
|
+ else:
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
|
|
def velocity(self, v_x, v_y):
|
|
def velocity(self, v_x, v_y):
|
|
v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
|
|
v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
|
|
@@ -776,7 +876,7 @@ class SafetyCalculator:
|
|
# 统计最危险的指标
|
|
# 统计最危险的指标
|
|
|
|
|
|
def _safe_statistic_most_dangerous(self):
|
|
def _safe_statistic_most_dangerous(self):
|
|
- min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM']
|
|
|
|
|
|
+ min_list = ['TTC', 'MTTC', 'THW', 'TLC', 'TTB', 'LonSD', 'LatSD', 'TM', 'PET', 'PSD']
|
|
max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
|
|
max_list = ['DTC', 'BTN', 'collisionRisk', 'collisionSeverity']
|
|
result = {}
|
|
result = {}
|
|
for metric in min_list:
|
|
for metric in min_list:
|
|
@@ -810,6 +910,8 @@ class SafetyCalculator:
|
|
'TTB': 10.0,
|
|
'TTB': 10.0,
|
|
'TM': 10.0,
|
|
'TM': 10.0,
|
|
'DTC': 10.0,
|
|
'DTC': 10.0,
|
|
|
|
+ 'PET': 10.0,
|
|
|
|
+ 'PSD': 10.0,
|
|
'LonSD': 10.0,
|
|
'LonSD': 10.0,
|
|
'LatSD': 2.0,
|
|
'LatSD': 2.0,
|
|
'BTN': 1.0,
|
|
'BTN': 1.0,
|
|
@@ -870,6 +972,18 @@ class SafetyCalculator:
|
|
dtc_values = self.df_safe['DTC'].dropna()
|
|
dtc_values = self.df_safe['DTC'].dropna()
|
|
return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
|
|
return float(dtc_values.min()) if not dtc_values.empty else self._default_value('DTC')
|
|
|
|
|
|
|
|
+ def get_pet_value(self) -> float:
|
|
|
|
+ if self.empty_flag or self.df_safe is None:
|
|
|
|
+ return self._default_value('PET')
|
|
|
|
+ pet_values = self.df_safe['PET'].dropna()
|
|
|
|
+ return float(pet_values.min()) if not pet_values.empty else self._default_value('PET')
|
|
|
|
+
|
|
|
|
+ def get_psd_value(self) -> float:
|
|
|
|
+ if self.empty_flag or self.df_safe is None:
|
|
|
|
+ return self._default_value('PSD')
|
|
|
|
+ psd_values = self.df_safe['PSD'].dropna()
|
|
|
|
+ return float(psd_values.min()) if not psd_values.empty else self._default_value('PSD')
|
|
|
|
+
|
|
def get_lonsd_value(self) -> float:
|
|
def get_lonsd_value(self) -> float:
|
|
if self.empty_flag or self.df_safe is None:
|
|
if self.empty_flag or self.df_safe is None:
|
|
return self._default_value('LonSD')
|
|
return self._default_value('LonSD')
|