|
@@ -14,7 +14,6 @@
|
|
|
|
|
|
import sys
|
|
import sys
|
|
from pathlib import Path
|
|
from pathlib import Path
|
|
-
|
|
|
|
# 添加项目根目录到系统路径
|
|
# 添加项目根目录到系统路径
|
|
root_path = Path(__file__).resolve().parent.parent
|
|
root_path = Path(__file__).resolve().parent.parent
|
|
sys.path.append(str(root_path))
|
|
sys.path.append(str(root_path))
|
|
@@ -26,26 +25,24 @@ from typing import Dict, Tuple, Optional, Callable, Any
|
|
import pandas as pd
|
|
import pandas as pd
|
|
import yaml
|
|
import yaml
|
|
|
|
|
|
|
|
+
|
|
# ----------------------
|
|
# ----------------------
|
|
# 基础工具函数 (Pure functions)
|
|
# 基础工具函数 (Pure functions)
|
|
# ----------------------
|
|
# ----------------------
|
|
-scenario_sign_dict = {"LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208,
|
|
|
|
- "CoorperativeIntersectionPassing": 225, "GreenLightOptimalSpeedAdvisory": 234,
|
|
|
|
|
|
+scenario_sign_dict = {"LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208, "CoorperativeIntersectionPassing": 225, "GreenLightOptimalSpeedAdvisory": 234,
|
|
"ForwardCollision": 212}
|
|
"ForwardCollision": 212}
|
|
|
|
|
|
-
|
|
|
|
def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
|
|
def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
|
|
"""向量化距离计算"""
|
|
"""向量化距离计算"""
|
|
return np.linalg.norm(ego_pos - obj_pos, axis=1)
|
|
return np.linalg.norm(ego_pos - obj_pos, axis=1)
|
|
|
|
|
|
|
|
|
|
def calculate_relative_speed_PGVIL(
|
|
def calculate_relative_speed_PGVIL(
|
|
- ego_speed: np.ndarray, obj_speed: np.ndarray
|
|
|
|
|
|
+ ego_speed: np.ndarray, obj_speed: np.ndarray
|
|
) -> np.ndarray:
|
|
) -> np.ndarray:
|
|
"""向量化相对速度计算"""
|
|
"""向量化相对速度计算"""
|
|
return np.linalg.norm(ego_speed - obj_speed, axis=1)
|
|
return np.linalg.norm(ego_speed - obj_speed, axis=1)
|
|
|
|
|
|
-
|
|
|
|
def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
|
|
def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
|
|
"""向量化距离计算"""
|
|
"""向量化距离计算"""
|
|
dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
|
|
dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
|
|
@@ -57,13 +54,14 @@ def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.nd
|
|
return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
|
|
return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
|
|
def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
|
|
"""数据提取函数"""
|
|
"""数据提取函数"""
|
|
ego = data[data["playerId"] == 1].iloc[0]
|
|
ego = data[data["playerId"] == 1].iloc[0]
|
|
obj = data[data["playerId"] != 1]
|
|
obj = data[data["playerId"] != 1]
|
|
return ego, obj
|
|
return ego, obj
|
|
|
|
|
|
-
|
|
|
|
|
|
+
|
|
def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
|
|
def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
|
|
"""带缓存的预警数据获取"""
|
|
"""带缓存的预警数据获取"""
|
|
ego_df = data_processed.ego_data
|
|
ego_df = data_processed.ego_data
|
|
@@ -76,7 +74,7 @@ def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
|
|
print("无法获取正确的预警信号标志位!")
|
|
print("无法获取正确的预警信号标志位!")
|
|
return None
|
|
return None
|
|
warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
|
|
warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
|
|
-
|
|
|
|
|
|
+
|
|
warning_times = warning_rows['simTime']
|
|
warning_times = warning_rows['simTime']
|
|
if warning_times.empty:
|
|
if warning_times.empty:
|
|
print("没有找到预警数据!")
|
|
print("没有找到预警数据!")
|
|
@@ -110,6 +108,7 @@ def earliestWarningDistance_LST(data) -> dict:
|
|
if warning_dist.empty:
|
|
if warning_dist.empty:
|
|
return {"earliestWarningDistance_LST": 0.0}
|
|
return {"earliestWarningDistance_LST": 0.0}
|
|
|
|
|
|
|
|
+
|
|
return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else np.inf}
|
|
return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else np.inf}
|
|
|
|
|
|
|
|
|
|
@@ -146,29 +145,31 @@ def earliestWarningDistance_TTC_LST(data) -> dict:
|
|
|
|
|
|
return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else np.inf}
|
|
return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else np.inf}
|
|
|
|
|
|
-
|
|
|
|
def warningDelayTime_LST(data):
|
|
def warningDelayTime_LST(data):
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
ego_df = data.ego_data
|
|
ego_df = data.ego_data
|
|
HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
|
|
HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
|
|
simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
|
|
simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
|
|
- rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
|
|
|
|
- 'simTime'].tolist()
|
|
|
|
|
|
+ rosbag_warning_rows = ego_df[(ego_df['event_info.eventSource'].notna())]['simTime'].tolist()
|
|
simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
|
|
simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
|
|
- if (simTime_HMI is None) or (simTime_rosbag is None):
|
|
|
|
- print("预警出错!")
|
|
|
|
|
|
+ if (simTime_HMI is None) and (simTime_rosbag is None):
|
|
|
|
+ print("没有发出预警!")
|
|
delay_time = 100.0
|
|
delay_time = 100.0
|
|
- else:
|
|
|
|
|
|
+ elif (simTime_HMI is not None) and (simTime_rosbag is not None):
|
|
delay_time = abs(simTime_HMI - simTime_rosbag)
|
|
delay_time = abs(simTime_HMI - simTime_rosbag)
|
|
|
|
+ elif (simTime_HMI is not None) and (simTime_rosbag is None):
|
|
|
|
+ print("没有发出预警!")
|
|
|
|
+ delay_time = None
|
|
|
|
+ else:
|
|
|
|
+ delay_time = simTime_rosbag
|
|
return {"warningDelayTime_LST": delay_time}
|
|
return {"warningDelayTime_LST": delay_time}
|
|
|
|
|
|
-
|
|
|
|
def warningDelayTimeOf4_LST(data):
|
|
def warningDelayTimeOf4_LST(data):
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
ego_df = data.ego_data
|
|
ego_df = data.ego_data
|
|
- ego_speed_simtime = ego_df[ego_df['accel'] <= -4]['simTime'].tolist() # 单位m/s^2
|
|
|
|
|
|
+ ego_speed_simtime = ego_df[ego_df['accel'] <= -4]['simTime'].tolist() # 单位m/s^2
|
|
warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
|
|
warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
|
|
if (len(warning_simTime) == 0) and (len(ego_speed_simtime) == 0):
|
|
if (len(warning_simTime) == 0) and (len(ego_speed_simtime) == 0):
|
|
return {"warningDelayTimeOf4_LST": 0}
|
|
return {"warningDelayTimeOf4_LST": 0}
|
|
@@ -179,12 +180,11 @@ def warningDelayTimeOf4_LST(data):
|
|
else:
|
|
else:
|
|
return {"warningDelayTimeOf4_LST": warning_simTime[0] - ego_speed_simtime[0]}
|
|
return {"warningDelayTimeOf4_LST": warning_simTime[0] - ego_speed_simtime[0]}
|
|
|
|
|
|
-
|
|
|
|
def rightWarningSignal_LST(data):
|
|
def rightWarningSignal_LST(data):
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
ego_df = data.ego_data
|
|
ego_df = data.ego_data
|
|
- if ego_df['ifwarning'].empty:
|
|
|
|
|
|
+ if correctwarning.empty:
|
|
print("无法获取正确预警信号标志位!")
|
|
print("无法获取正确预警信号标志位!")
|
|
return
|
|
return
|
|
warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
|
|
warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
|
|
@@ -193,32 +193,26 @@ def rightWarningSignal_LST(data):
|
|
else:
|
|
else:
|
|
return {"rightWarningSignal_LST": 1}
|
|
return {"rightWarningSignal_LST": 1}
|
|
|
|
|
|
-
|
|
|
|
def ifCrossingRedLight_LST(data):
|
|
def ifCrossingRedLight_LST(data):
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
ego_df = data.ego_data
|
|
ego_df = data.ego_data
|
|
- redlight_simtime = ego_df[
|
|
|
|
- (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (
|
|
|
|
- ego_df['v'] != 0)]['simTime']
|
|
|
|
|
|
+ redlight_simtime = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (ego_df['v'] != 0)]['simTime']
|
|
if redlight_simtime.empty:
|
|
if redlight_simtime.empty:
|
|
- return {"ifCrossingRedLight_LST": -1}
|
|
|
|
|
|
+ return {"ifCrossingRedLight_LST": 0}
|
|
else:
|
|
else:
|
|
return {"ifCrossingRedLight_LST": 1}
|
|
return {"ifCrossingRedLight_LST": 1}
|
|
|
|
|
|
-
|
|
|
|
def ifStopgreenWaveSpeedGuidance_LST(data):
|
|
def ifStopgreenWaveSpeedGuidance_LST(data):
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
scenario_name = data.function_config["function"]["scenario"]["name"]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
ego_df = data.ego_data
|
|
ego_df = data.ego_data
|
|
- greenlight_simtime = \
|
|
|
|
- ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
|
|
|
|
|
|
+ greenlight_simtime = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
|
|
if greenlight_simtime.empty:
|
|
if greenlight_simtime.empty:
|
|
- return {"ifStopgreenWaveSpeedGuidance_LST": -1}
|
|
|
|
|
|
+ return {"ifStopgreenWaveSpeedGuidance_LST": 0}
|
|
else:
|
|
else:
|
|
return {"ifStopgreenWaveSpeedGuidance_LST": 1}
|
|
return {"ifStopgreenWaveSpeedGuidance_LST": 1}
|
|
|
|
|
|
-
|
|
|
|
def rightWarningSignal_PGVIL(data_processed) -> dict:
|
|
def rightWarningSignal_PGVIL(data_processed) -> dict:
|
|
"""判断是否发出正确预警信号"""
|
|
"""判断是否发出正确预警信号"""
|
|
|
|
|
|
@@ -230,9 +224,7 @@ def rightWarningSignal_PGVIL(data_processed) -> dict:
|
|
print("无法获取正确的预警信号标志位!")
|
|
print("无法获取正确的预警信号标志位!")
|
|
return None
|
|
return None
|
|
# 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
|
|
# 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
|
|
- warning_rows = ego_df[
|
|
|
|
- (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
|
|
|
|
- ]
|
|
|
|
|
|
+ warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
|
|
|
|
|
|
if warning_rows.empty:
|
|
if warning_rows.empty:
|
|
return {"rightWarningSignal_PGVIL": -1}
|
|
return {"rightWarningSignal_PGVIL": -1}
|
|
@@ -286,7 +278,6 @@ def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
|
|
|
return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
|
|
return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
|
|
|
|
|
|
-
|
|
|
|
def earliestWarningDistance_PGVIL(data_processed) -> dict:
|
|
def earliestWarningDistance_PGVIL(data_processed) -> dict:
|
|
"""预警距离计算流水线"""
|
|
"""预警距离计算流水线"""
|
|
ego_df = data_processed.ego_data
|
|
ego_df = data_processed.ego_data
|
|
@@ -306,7 +297,6 @@ def earliestWarningDistance_PGVIL(data_processed) -> dict:
|
|
|
|
|
|
return {"earliestWarningDistance": float(np.min(distances))}
|
|
return {"earliestWarningDistance": float(np.min(distances))}
|
|
|
|
|
|
-
|
|
|
|
def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
"""TTC计算流水线"""
|
|
"""TTC计算流水线"""
|
|
ego_df = data_processed.ego_data
|
|
ego_df = data_processed.ego_data
|
|
@@ -335,7 +325,7 @@ def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
|
|
|
return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
|
|
return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
|
|
|
|
|
|
-
|
|
|
|
|
|
+
|
|
# def delayOfEmergencyBrakeWarning(data_processed) -> dict:
|
|
# def delayOfEmergencyBrakeWarning(data_processed) -> dict:
|
|
# #预警时机相对背景车辆减速度达到-4m/s2后的时延
|
|
# #预警时机相对背景车辆减速度达到-4m/s2后的时延
|
|
# ego_df = data_processed.ego_data
|
|
# ego_df = data_processed.ego_data
|
|
@@ -363,30 +353,24 @@ def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
# return {"delayOfEmergencyBrakeWarning": float(delay_time)}
|
|
# return {"delayOfEmergencyBrakeWarning": float(delay_time)}
|
|
|
|
|
|
# else:
|
|
# else:
|
|
-# print("没有达到预警减速度阈值:-4m/s^2")
|
|
|
|
|
|
+# print("没有达到预警减速度阈值:-4m/s^2")
|
|
# return {"delayOfEmergencyBrakeWarning": -1}
|
|
# return {"delayOfEmergencyBrakeWarning": -1}
|
|
|
|
|
|
-
|
|
|
|
def warningDelayTime_PGVIL(data_processed) -> dict:
|
|
def warningDelayTime_PGVIL(data_processed) -> dict:
|
|
- """车端接收到预警到HMI发出预警的时延"""
|
|
|
|
|
|
+ """车端接收到预警到HMI发出预警的时延"""
|
|
ego_df = data_processed.ego_data
|
|
ego_df = data_processed.ego_data
|
|
- # #打印ego_df的列名
|
|
|
|
- # print(ego_df.columns.tolist())
|
|
|
|
-
|
|
|
|
warning_data = get_first_warning(data_processed)
|
|
warning_data = get_first_warning(data_processed)
|
|
|
|
|
|
if warning_data is None:
|
|
if warning_data is None:
|
|
return {"warningDelayTime_PGVIL": -1}
|
|
return {"warningDelayTime_PGVIL": -1}
|
|
try:
|
|
try:
|
|
ego, obj = extract_ego_obj(warning_data)
|
|
ego, obj = extract_ego_obj(warning_data)
|
|
-
|
|
|
|
- # 找到event_Type不为空,且playerID为1的行
|
|
|
|
- rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
|
|
|
|
-
|
|
|
|
|
|
+ rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna())]
|
|
|
|
+
|
|
first_time = rosbag_warning_rows["simTime"].iloc[0]
|
|
first_time = rosbag_warning_rows["simTime"].iloc[0]
|
|
warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
|
|
warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
|
|
delay_time = warning_time - first_time
|
|
delay_time = warning_time - first_time
|
|
-
|
|
|
|
|
|
+
|
|
return {"warningDelayTime_PGVIL": float(delay_time)}
|
|
return {"warningDelayTime_PGVIL": float(delay_time)}
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
@@ -399,7 +383,7 @@ def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
|
|
计算主车后轴中心点到停止线的距离
|
|
计算主车后轴中心点到停止线的距离
|
|
:return 距离
|
|
:return 距离
|
|
"""
|
|
"""
|
|
- distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
|
|
|
|
|
|
+ distance_carpoint_carhead = ego['dimX']/2 + ego['offX']
|
|
# 计算停止线的方向向量
|
|
# 计算停止线的方向向量
|
|
line_vector = np.array(
|
|
line_vector = np.array(
|
|
[
|
|
[
|
|
@@ -408,32 +392,27 @@ def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
|
|
]
|
|
]
|
|
)
|
|
)
|
|
direction_vector_norm = np.linalg.norm(line_vector)
|
|
direction_vector_norm = np.linalg.norm(line_vector)
|
|
- direction_vector_unit = (
|
|
|
|
- line_vector / direction_vector_norm
|
|
|
|
- if direction_vector_norm != 0
|
|
|
|
- else np.array([0, 0])
|
|
|
|
- )
|
|
|
|
|
|
+ direction_vector_unit = line_vector / direction_vector_norm if direction_vector_norm != 0 else np.array([0, 0])
|
|
# 计算主车后轴中心点到停止线投影的坐标(垂足)
|
|
# 计算主车后轴中心点到停止线投影的坐标(垂足)
|
|
projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
|
|
projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
|
|
perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
|
|
perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
|
|
-
|
|
|
|
|
|
+
|
|
# 计算主车后轴中心点到垂足的距离
|
|
# 计算主车后轴中心点到垂足的距离
|
|
distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
|
|
distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
|
|
carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
|
|
carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
|
|
-
|
|
|
|
|
|
+
|
|
return carhead_distance_to_foot
|
|
return carhead_distance_to_foot
|
|
|
|
|
|
-
|
|
|
|
def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
- # 判断车辆是否闯红灯
|
|
|
|
|
|
+ #判断车辆是否闯红灯
|
|
|
|
|
|
- stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)])
|
|
|
|
|
|
+ stop_line_points = np.array([(276.555,-35.575),(279.751,-33.683)])
|
|
X_OFFSET = 258109.4239876
|
|
X_OFFSET = 258109.4239876
|
|
Y_OFFSET = 4149969.964821
|
|
Y_OFFSET = 4149969.964821
|
|
stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
|
|
stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
|
|
ego_df = data_processed.ego_data
|
|
ego_df = data_processed.ego_data
|
|
-
|
|
|
|
- prev_distance = float("inf") # 初始化为正无穷大
|
|
|
|
|
|
+
|
|
|
|
+ prev_distance = float('inf') # 初始化为正无穷大
|
|
"""
|
|
"""
|
|
traffic_light_status
|
|
traffic_light_status
|
|
0x100000为绿灯,1048576
|
|
0x100000为绿灯,1048576
|
|
@@ -441,25 +420,23 @@ def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
0x10000000为红灯,268435456
|
|
0x10000000为红灯,268435456
|
|
"""
|
|
"""
|
|
red_light_violation = False
|
|
red_light_violation = False
|
|
- for index, ego in ego_df.iterrows():
|
|
|
|
|
|
+ for index ,ego in ego_df.iterrows():
|
|
car_point = (ego["posX"], ego["posY"])
|
|
car_point = (ego["posX"], ego["posY"])
|
|
stateMask = ego["stateMask"]
|
|
stateMask = ego["stateMask"]
|
|
simTime = ego["simTime"]
|
|
simTime = ego["simTime"]
|
|
- distance_to_stopline = get_car_to_stop_line_distance(
|
|
|
|
- ego, car_point, stop_line_points
|
|
|
|
- )
|
|
|
|
|
|
+ distance_to_stopline = get_car_to_stop_line_distance(ego, car_point, stop_line_points)
|
|
|
|
|
|
- # 主车车头跨越停止线时非绿灯,返回-1,闯红灯
|
|
|
|
|
|
+ #主车车头跨越停止线时非绿灯,返回-1,闯红灯
|
|
if prev_distance > 0 and distance_to_stopline < 0:
|
|
if prev_distance > 0 and distance_to_stopline < 0:
|
|
- if stateMask is not None and stateMask != 1048576:
|
|
|
|
|
|
+ if stateMask != 1048576:
|
|
red_light_violation = True
|
|
red_light_violation = True
|
|
break
|
|
break
|
|
prev_distance = distance_to_stopline
|
|
prev_distance = distance_to_stopline
|
|
|
|
|
|
if red_light_violation:
|
|
if red_light_violation:
|
|
- return {"ifCrossingRedLight_PGVIL": -1} # 闯红灯
|
|
|
|
|
|
+ return {"ifCrossingRedLight_PGVIL": -1}#闯红灯
|
|
else:
|
|
else:
|
|
- return {"ifCrossingRedLight_PGVIL": 1} # 没有闯红灯
|
|
|
|
|
|
+ return {"ifCrossingRedLight_PGVIL": 1}#没有闯红灯
|
|
|
|
|
|
|
|
|
|
# def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
|
|
# def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
|
|
@@ -482,7 +459,7 @@ def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
# ]["simTime"]
|
|
# ]["simTime"]
|
|
# if stop_giveway_simtime.empty:
|
|
# if stop_giveway_simtime.empty:
|
|
# print("没有停车让行标志/标线")
|
|
# print("没有停车让行标志/标线")
|
|
-
|
|
|
|
|
|
+
|
|
# ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
|
|
# ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
|
|
# distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
|
|
# distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
|
|
# distance_to_stoplines = []
|
|
# distance_to_stoplines = []
|
|
@@ -494,10 +471,11 @@ def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
# ]
|
|
# ]
|
|
# distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
|
|
# distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
|
|
# distance_to_stoplines.append(distance_to_stopline)
|
|
# distance_to_stoplines.append(distance_to_stopline)
|
|
-
|
|
|
|
|
|
+
|
|
# mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
|
|
# mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
|
|
# return {"mindisStopline": mindisStopline}
|
|
# return {"mindisStopline": mindisStopline}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
|
|
class FunctionRegistry:
|
|
class FunctionRegistry:
|
|
"""动态函数注册器(支持参数验证)"""
|
|
"""动态函数注册器(支持参数验证)"""
|
|
@@ -517,7 +495,7 @@ class FunctionRegistry:
|
|
def _recurse(node):
|
|
def _recurse(node):
|
|
if isinstance(node, dict):
|
|
if isinstance(node, dict):
|
|
if "name" in node and not any(
|
|
if "name" in node and not any(
|
|
- isinstance(v, dict) for v in node.values()
|
|
|
|
|
|
+ isinstance(v, dict) for v in node.values()
|
|
):
|
|
):
|
|
metrics.append(node["name"])
|
|
metrics.append(node["name"])
|
|
for v in node.values():
|
|
for v in node.values():
|
|
@@ -577,4 +555,4 @@ class FunctionManager:
|
|
# 使用示例
|
|
# 使用示例
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
pass
|
|
pass
|
|
- # print("\n[功能类表现及得分情况]")
|
|
|
|
|
|
+ # print("\n[功能类表现及得分情况]")
|