|
@@ -40,24 +40,24 @@ scenario_sign_dict = {
|
|
|
"LeftTurnAssist": 206,
|
|
|
"HazardousLocationW": 207,
|
|
|
"RedLightViolationW": 208,
|
|
|
- "AbnormalVehicleW":209,
|
|
|
- "NsightVulnerableRoadUserCollisionW":210,
|
|
|
- "LitterW":211,
|
|
|
- "ForwardCollisionW":212,
|
|
|
- "VisibilityW":213,
|
|
|
- "EmergencyBrakeW":214,
|
|
|
- "IntersectionCollisionW":215,
|
|
|
- "BlindSpotW":216,
|
|
|
- "DoNotPassW":217,
|
|
|
- "ControlLossW":218,
|
|
|
- "FrontTrafficJamW":219,
|
|
|
- "EmergencyVehicleW":220,
|
|
|
- "CooperativeVehicleMerge":221,
|
|
|
- "CooperativeLaneChange":223,
|
|
|
- "VulnerableRoadUserCollisionW":224,
|
|
|
- "CooperativeIntersectionPassing":225,
|
|
|
- "RampMerge":226,
|
|
|
- "DrivingLaneRecommendation":227,
|
|
|
+ "AbnormalVehicleW": 209,
|
|
|
+ "NsightVulnerableRoadUserCollisionW": 210,
|
|
|
+ "LitterW": 211,
|
|
|
+ "ForwardCollisionW": 212,
|
|
|
+ "VisibilityW": 213,
|
|
|
+ "EmergencyBrakeW": 214,
|
|
|
+ "IntersectionCollisionW": 215,
|
|
|
+ "BlindSpotW": 216,
|
|
|
+ "DoNotPassW": 217,
|
|
|
+ "ControlLossW": 218,
|
|
|
+ "FrontTrafficJamW": 219,
|
|
|
+ "EmergencyVehicleW": 220,
|
|
|
+ "CooperativeVehicleMerge": 221,
|
|
|
+ "CooperativeLaneChange": 223,
|
|
|
+ "VulnerableRoadUserCollisionW": 224,
|
|
|
+ "CooperativeIntersectionPassing": 225,
|
|
|
+ "RampMerge": 226,
|
|
|
+ "DrivingLaneRecommendation": 227,
|
|
|
"TrafficJamW": 228,
|
|
|
"DynamicSpeedLimitingInformation": 229,
|
|
|
"EmergencyVehiclesPriority": 230,
|
|
@@ -67,6 +67,7 @@ scenario_sign_dict = {
|
|
|
"GreenLightOptimalSpeedAdvisory": 234,
|
|
|
}
|
|
|
|
|
|
+
|
|
|
def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
|
|
|
polygon = Polygon(polygon)
|
|
|
point = Point(test_point)
|
|
@@ -182,6 +183,7 @@ def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
|
|
|
first_time = warning_times.iloc[0]
|
|
|
return obj_df[obj_df['simTime'] == first_time]
|
|
|
|
|
|
+
|
|
|
def getAxis(heading):
|
|
|
AxisL = [0, 0]
|
|
|
AxisW = [0, 0]
|
|
@@ -211,15 +213,15 @@ def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth):
|
|
|
|
|
|
|
|
|
def isCollision(
|
|
|
- firstAxisL,
|
|
|
- firstAxisW,
|
|
|
- firstHalfLength,
|
|
|
- firstHalfWidth,
|
|
|
- secondAxisL,
|
|
|
- secondAxisW,
|
|
|
- secondHalfLength,
|
|
|
- secondHalfWidth,
|
|
|
- disVector,
|
|
|
+ firstAxisL,
|
|
|
+ firstAxisW,
|
|
|
+ firstHalfLength,
|
|
|
+ firstHalfWidth,
|
|
|
+ secondAxisL,
|
|
|
+ secondAxisW,
|
|
|
+ secondHalfLength,
|
|
|
+ secondHalfWidth,
|
|
|
+ disVector,
|
|
|
):
|
|
|
isCollision = True
|
|
|
axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW]
|
|
@@ -241,20 +243,20 @@ def isCollision(
|
|
|
|
|
|
|
|
|
def funcIsCollision(
|
|
|
- firstDimX,
|
|
|
- firstDimY,
|
|
|
- firstOffX,
|
|
|
- firstOffY,
|
|
|
- firstX,
|
|
|
- firstY,
|
|
|
- firstHeading,
|
|
|
- secondDimX,
|
|
|
- secondDimY,
|
|
|
- secondOffX,
|
|
|
- secondOffY,
|
|
|
- secondX,
|
|
|
- secondY,
|
|
|
- secondHeading,
|
|
|
+ firstDimX,
|
|
|
+ firstDimY,
|
|
|
+ firstOffX,
|
|
|
+ firstOffY,
|
|
|
+ firstX,
|
|
|
+ firstY,
|
|
|
+ firstHeading,
|
|
|
+ secondDimX,
|
|
|
+ secondDimY,
|
|
|
+ secondOffX,
|
|
|
+ secondOffY,
|
|
|
+ secondX,
|
|
|
+ secondY,
|
|
|
+ secondHeading,
|
|
|
):
|
|
|
firstAxisL = getAxis(firstHeading)[0]
|
|
|
firstAxisW = getAxis(firstHeading)[1]
|
|
@@ -292,10 +294,11 @@ def funcIsCollision(
|
|
|
|
|
|
return varIsCollision
|
|
|
|
|
|
+
|
|
|
# ----------------------
|
|
|
# 核心计算功能函数
|
|
|
# ----------------------
|
|
|
-def latestWarningDistance_LST(data) -> dict:
|
|
|
+def latestWarningDistance_LST(data, plot_path) -> dict:
|
|
|
"""预警距离计算流水线"""
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"]
|
|
@@ -313,12 +316,12 @@ def latestWarningDistance_LST(data) -> dict:
|
|
|
return {"latestWarningDistance_LST": 0.0}
|
|
|
|
|
|
# 生成图表数据
|
|
|
- generate_function_chart_data(data, 'latestWarningDistance_LST')
|
|
|
+ generate_function_chart_data(data, 'latestWarningDistance_LST', plot_path)
|
|
|
|
|
|
return {"latestWarningDistance_LST": float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else value}
|
|
|
|
|
|
|
|
|
-def earliestWarningDistance_LST(data) -> dict:
|
|
|
+def earliestWarningDistance_LST(data, plot_path) -> dict:
|
|
|
"""预警距离计算流水线"""
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"]
|
|
@@ -337,12 +340,12 @@ def earliestWarningDistance_LST(data) -> dict:
|
|
|
|
|
|
# 生成图表数据
|
|
|
|
|
|
- generate_function_chart_data(data, 'earliestWarningDistance_LST')
|
|
|
+ generate_function_chart_data(data, 'earliestWarningDistance_LST', plot_path)
|
|
|
|
|
|
return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else value}
|
|
|
|
|
|
|
|
|
-def latestWarningDistance_TTC_LST(data) -> dict:
|
|
|
+def latestWarningDistance_TTC_LST(data, plot_path) -> dict:
|
|
|
"""TTC计算流水线"""
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"]
|
|
@@ -369,12 +372,12 @@ def latestWarningDistance_TTC_LST(data) -> dict:
|
|
|
data.ttc = ttc
|
|
|
# 生成图表数据
|
|
|
# from modules.lib.chart_generator import generate_function_chart_data
|
|
|
- generate_function_chart_data(data, 'latestWarningDistance_TTC_LST')
|
|
|
+ generate_function_chart_data(data, 'latestWarningDistance_TTC_LST', plot_path)
|
|
|
|
|
|
return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
|
|
|
|
|
|
|
|
|
-def earliestWarningDistance_TTC_LST(data) -> dict:
|
|
|
+def earliestWarningDistance_TTC_LST(data, plot_path) -> dict:
|
|
|
"""TTC计算流水线"""
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"]
|
|
@@ -403,12 +406,12 @@ def earliestWarningDistance_TTC_LST(data) -> dict:
|
|
|
data.correctwarning = correctwarning
|
|
|
|
|
|
# 生成图表数据
|
|
|
- generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST')
|
|
|
+ generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST', plot_path)
|
|
|
|
|
|
return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
|
|
|
|
|
|
|
|
|
-def warningDelayTime_LST(data):
|
|
|
+def warningDelayTime_LST(data, plot_path):
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
|
# 将correctwarning保存到data对象中,供图表生成使用
|
|
@@ -427,7 +430,7 @@ def warningDelayTime_LST(data):
|
|
|
return {"warningDelayTime_LST": delay_time}
|
|
|
|
|
|
|
|
|
-def warningDelayTimeofReachDecel_LST(data):
|
|
|
+def warningDelayTimeofReachDecel_LST(data, plot_path):
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
|
# 将correctwarning保存到data对象中,供图表生成使用
|
|
@@ -451,7 +454,7 @@ def warningDelayTimeofReachDecel_LST(data):
|
|
|
return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]}
|
|
|
|
|
|
|
|
|
-def rightWarningSignal_LST(data):
|
|
|
+def rightWarningSignal_LST(data, plot_path):
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
|
# 将correctwarning保存到data对象中,供图表生成使用
|
|
@@ -467,7 +470,7 @@ def rightWarningSignal_LST(data):
|
|
|
return {"rightWarningSignal_LST": 1}
|
|
|
|
|
|
|
|
|
-def ifCrossingRedLight_LST(data):
|
|
|
+def ifCrossingRedLight_LST(data, plot_path):
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
|
# 将correctwarning保存到data对象中,供图表生成使用
|
|
@@ -482,7 +485,7 @@ def ifCrossingRedLight_LST(data):
|
|
|
return {"ifCrossingRedLight_LST": 1}
|
|
|
|
|
|
|
|
|
-def ifStopgreenWaveSpeedGuidance_LST(data):
|
|
|
+def ifStopgreenWaveSpeedGuidance_LST(data, plot_path):
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
correctwarning = scenario_sign_dict[scenario_name]
|
|
|
# 将correctwarning保存到data对象中,供图表生成使用
|
|
@@ -497,7 +500,7 @@ def ifStopgreenWaveSpeedGuidance_LST(data):
|
|
|
|
|
|
|
|
|
# ------ 单车智能指标 ------
|
|
|
-def limitSpeed_LST(data):
|
|
|
+def limitSpeed_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
|
|
@@ -506,11 +509,11 @@ def limitSpeed_LST(data):
|
|
|
return {"speedLimit_LST": -1}
|
|
|
max_speed = max(speed_limit)
|
|
|
data.speedLimit = limit_speed
|
|
|
- generate_function_chart_data(data, 'limitspeed_LST')
|
|
|
+ generate_function_chart_data(data, 'limitspeed_LST', plot_path)
|
|
|
return {"speedLimit_LST": max_speed}
|
|
|
|
|
|
|
|
|
-def limitSpeedPastLimitSign_LST(data):
|
|
|
+def limitSpeedPastLimitSign_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
|
|
@@ -519,13 +522,13 @@ def limitSpeedPastLimitSign_LST(data):
|
|
|
ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist()
|
|
|
data.speedLimit = limit_speed
|
|
|
data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None
|
|
|
- generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST')
|
|
|
+ generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST', plot_path)
|
|
|
if len(ego_speed) == 0:
|
|
|
return {"speedPastLimitSign_LST": -1}
|
|
|
return {"speedPastLimitSign_LST": ego_speed[0]}
|
|
|
|
|
|
|
|
|
-def leastDistance_LST(data):
|
|
|
+def leastDistance_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
dist_row = ego_df[ego_df['v'] == 0]['relative_dist'].tolist()
|
|
|
if len(dist_row) == 0:
|
|
@@ -535,7 +538,7 @@ def leastDistance_LST(data):
|
|
|
return {"leastDistance_LST": min_dist}
|
|
|
|
|
|
|
|
|
-def launchTimeinStopLine_LST(data):
|
|
|
+def launchTimeinStopLine_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
|
|
|
if len(simtime_row) == 0:
|
|
@@ -545,7 +548,7 @@ def launchTimeinStopLine_LST(data):
|
|
|
return {"launchTimeinStopLine_LST": delta_t}
|
|
|
|
|
|
|
|
|
-def launchTimewhenFollowingCar_LST(data):
|
|
|
+def launchTimewhenFollowingCar_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0]
|
|
|
simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
|
|
@@ -559,7 +562,7 @@ def launchTimewhenFollowingCar_LST(data):
|
|
|
return {"launchTimewhenFollowingCar_LST": max(delta_t)}
|
|
|
|
|
|
|
|
|
-def noStop_LST(data):
|
|
|
+def noStop_LST(data, plot_path):
|
|
|
ego_df_ini = data.ego_data
|
|
|
min_time = ego_df_ini['simTime'].min() + 5
|
|
|
max_time = ego_df_ini['simTime'].max() - 5
|
|
@@ -571,7 +574,7 @@ def noStop_LST(data):
|
|
|
return {"noStop_LST": 1}
|
|
|
|
|
|
|
|
|
-def launchTimeinTrafficLight_LST(data):
|
|
|
+def launchTimeinTrafficLight_LST(data, plot_path):
|
|
|
'''
|
|
|
待修改:
|
|
|
红灯的状态值:1
|
|
@@ -588,7 +591,7 @@ def launchTimeinTrafficLight_LST(data):
|
|
|
return {"timeInterval_LST": simtime_of_launch[-1] - simtime_of_launch[0]}
|
|
|
|
|
|
|
|
|
-def crossJunctionToTargetLane_LST(data):
|
|
|
+def crossJunctionToTargetLane_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
lane_in_leftturn = set(ego_df['lane_id'].tolist())
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
@@ -599,15 +602,15 @@ def crossJunctionToTargetLane_LST(data):
|
|
|
return {"crossJunctionToTargetLane_LST": target_lane_id}
|
|
|
|
|
|
|
|
|
-def keepInLane_LST(data):
|
|
|
+def keepInLane_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
- notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width']/2].tolist()
|
|
|
+ notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width'] / 2].tolist()
|
|
|
if len(notkeepinlane):
|
|
|
return {"keepInLane_LST": -1}
|
|
|
return {"keepInLane_LST": 1}
|
|
|
|
|
|
|
|
|
-def leastLateralDistance_LST(data):
|
|
|
+def leastLateralDistance_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
lane_width = ego_df[ego_df['x_relative_dist'] == 0]['lane_width']
|
|
|
if lane_width.empty():
|
|
@@ -620,7 +623,7 @@ def leastLateralDistance_LST(data):
|
|
|
return {"leastLateralDistance_LST": -1}
|
|
|
|
|
|
|
|
|
-def waitTimeAtCrosswalkwithPedestrian_LST(data):
|
|
|
+def waitTimeAtCrosswalkwithPedestrian_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
object_df = data.object_data
|
|
|
data['in_crosswalk'] = []
|
|
@@ -640,7 +643,7 @@ def waitTimeAtCrosswalkwithPedestrian_LST(data):
|
|
|
car_wait_pedestrian) > 0 else 0}
|
|
|
|
|
|
|
|
|
-def launchTimewhenPedestrianLeave_LST(data):
|
|
|
+def launchTimewhenPedestrianLeave_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
car_stop_time = ego_df[ego_df['v'] == 0]["simTime"]
|
|
|
if car_stop_time.empty():
|
|
@@ -652,7 +655,7 @@ def launchTimewhenPedestrianLeave_LST(data):
|
|
|
return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]}
|
|
|
|
|
|
|
|
|
-def noCollision_LST(data):
|
|
|
+def noCollision_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
if ego_df['relative_dist'].any() == 0:
|
|
|
return {"noCollision_LST": -1}
|
|
@@ -660,7 +663,7 @@ def noCollision_LST(data):
|
|
|
return {"noCollision_LST": 1}
|
|
|
|
|
|
|
|
|
-def noReverse_LST(data):
|
|
|
+def noReverse_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0:
|
|
|
return {"noReverse_LST": -1}
|
|
@@ -668,7 +671,7 @@ def noReverse_LST(data):
|
|
|
return {"noReverse_LST": 1}
|
|
|
|
|
|
|
|
|
-def turnAround_LST(data):
|
|
|
+def turnAround_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and (
|
|
|
ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0):
|
|
@@ -677,7 +680,7 @@ def turnAround_LST(data):
|
|
|
return {"turnAround_LST": -1}
|
|
|
|
|
|
|
|
|
-def laneOffset_LST(data):
|
|
|
+def laneOffset_LST(data, plot_path):
|
|
|
car_width = data.function_config['vehicle']['CAR_WIDTH']
|
|
|
ego_df_ini = data.ego_data
|
|
|
min_time = ego_df_ini['simTime'].min() + 5
|
|
@@ -687,7 +690,7 @@ def laneOffset_LST(data):
|
|
|
return {"laneOffset_LST": max(laneoffset)}
|
|
|
|
|
|
|
|
|
-def maxLongitudeDist_LST(data):
|
|
|
+def maxLongitudeDist_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
longitude_dist = abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist())
|
|
|
data.longitude_dist = min(abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist()))
|
|
@@ -695,11 +698,11 @@ def maxLongitudeDist_LST(data):
|
|
|
data.stop_time = min(stop_time)
|
|
|
if len(longitude_dist) == 0:
|
|
|
return {"maxLongitudeDist_LST": -1}
|
|
|
- generate_function_chart_data(data, 'maxLongitudeDist_LST')
|
|
|
+ generate_function_chart_data(data, 'maxLongitudeDist_LST', plot_path)
|
|
|
return {"maxLongDist_LST": min(longitude_dist)}
|
|
|
|
|
|
|
|
|
-def noEmergencyBraking_LST(data):
|
|
|
+def noEmergencyBraking_LST(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
ego_df['ip_dec'] = ego_df['v'].apply(
|
|
|
get_interpolation, point1=[18, -5], point2=[72, -3.5])
|
|
@@ -711,7 +714,7 @@ def noEmergencyBraking_LST(data):
|
|
|
return {"noEmergencyBraking_LST": -1}
|
|
|
|
|
|
|
|
|
-def rightWarningSignal_PGVIL(data_processed) -> dict:
|
|
|
+def rightWarningSignal_PGVIL(data_processed, plot_path) -> dict:
|
|
|
"""判断是否发出正确预警信号"""
|
|
|
|
|
|
ego_df = data_processed.ego_data
|
|
@@ -732,7 +735,7 @@ def rightWarningSignal_PGVIL(data_processed) -> dict:
|
|
|
return {"rightWarningSignal_PGVIL": 1}
|
|
|
|
|
|
|
|
|
-def latestWarningDistance_PGVIL(data_processed) -> dict:
|
|
|
+def latestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
|
|
|
"""预警距离计算流水线"""
|
|
|
ego_df = data_processed.ego_data
|
|
|
obj_df = data_processed.object_df
|
|
@@ -751,11 +754,11 @@ def latestWarningDistance_PGVIL(data_processed) -> dict:
|
|
|
if distances.size == 0:
|
|
|
print("没有找到数据!")
|
|
|
return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
|
|
|
- generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL')
|
|
|
+ generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL', plot_path)
|
|
|
return {"latestWarningDistance_PGVIL": float(np.min(distances))}
|
|
|
|
|
|
|
|
|
-def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
+def latestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
|
|
|
"""TTC计算流水线"""
|
|
|
ego_df = data_processed.ego_data
|
|
|
obj_df = data_processed.object_df
|
|
@@ -784,11 +787,11 @@ def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
print("没有找到数据!")
|
|
|
return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
|
|
|
data_processed.ttc = ttc
|
|
|
- generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL')
|
|
|
+ generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL', plot_path)
|
|
|
return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
|
|
|
|
|
|
|
|
|
-def earliestWarningDistance_PGVIL(data_processed) -> dict:
|
|
|
+def earliestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
|
|
|
"""预警距离计算流水线"""
|
|
|
ego_df = data_processed.ego_data
|
|
|
obj_df = data_processed.object_df
|
|
@@ -807,11 +810,11 @@ def earliestWarningDistance_PGVIL(data_processed) -> dict:
|
|
|
if distances.size == 0:
|
|
|
print("没有找到数据!")
|
|
|
return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
|
|
|
- generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL')
|
|
|
+ generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL', plot_path)
|
|
|
return {"earliestWarningDistance": float(np.min(distances))}
|
|
|
|
|
|
|
|
|
-def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
+def earliestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
|
|
|
"""TTC计算流水线"""
|
|
|
ego_df = data_processed.ego_data
|
|
|
obj_df = data_processed.object_df
|
|
@@ -841,7 +844,7 @@ def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
print("没有找到数据!")
|
|
|
return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
|
|
|
data_processed.ttc = ttc
|
|
|
- generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL')
|
|
|
+ generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL', plot_path)
|
|
|
return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
|
|
|
|
|
|
|
|
@@ -876,7 +879,7 @@ def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
|
|
|
# return {"delayOfEmergencyBrakeWarning": -1}
|
|
|
|
|
|
|
|
|
-def warningDelayTime_PGVIL(data_processed) -> dict:
|
|
|
+def warningDelayTime_PGVIL(data_processed, plot_path) -> dict:
|
|
|
"""车端接收到预警到HMI发出预警的时延"""
|
|
|
ego_df = data_processed.ego_data
|
|
|
# #打印ego_df的列名
|
|
@@ -933,7 +936,7 @@ def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
|
|
|
return carhead_distance_to_foot
|
|
|
|
|
|
|
|
|
-def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
|
+def ifCrossingRedLight_PGVIL(data_processed, plot_path) -> dict:
|
|
|
# 判断车辆是否闯红灯
|
|
|
|
|
|
stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)])
|
|
@@ -1007,7 +1010,7 @@ def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
|
# mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
|
|
|
# return {"mindisStopline": mindisStopline}
|
|
|
|
|
|
-def limitSpeed_PGVIL(data):
|
|
|
+def limitSpeed_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
max_speed = max(ego_df["v"])
|
|
|
if len(ego_df["v"]) == 0:
|
|
@@ -1016,7 +1019,7 @@ def limitSpeed_PGVIL(data):
|
|
|
return {"speedLimit_PGVIL": max_speed}
|
|
|
|
|
|
|
|
|
-def leastDistance_PGVIL(data):
|
|
|
+def leastDistance_PGVIL(data, plot_path):
|
|
|
exclude_seconds = 2.0
|
|
|
ego_df = data.ego_data
|
|
|
max_sim_time = ego_df["simTime"].max()
|
|
@@ -1036,12 +1039,12 @@ def leastDistance_PGVIL(data):
|
|
|
ego, car_point, stop_line_points
|
|
|
)
|
|
|
|
|
|
- return {"leastDistance_PGVIL": distance_to_stopline}
|
|
|
+ return {"minimumDistance_PGVIL": distance_to_stopline}
|
|
|
|
|
|
- return {"leastDistance_PGVIL": -1}
|
|
|
+ return {"minimumDistance_PGVIL": -1}
|
|
|
|
|
|
|
|
|
-def launchTimeinStopLine_PGVIL(data):
|
|
|
+def launchTimeinStopLine_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
in_stop = False
|
|
|
start_time = None
|
|
@@ -1074,13 +1077,13 @@ def launchTimeinStopLine_PGVIL(data):
|
|
|
return {"launchTimeinStopLine_PGVIL": float(max_duration)}
|
|
|
|
|
|
|
|
|
-def launchTimeinTrafficLight_PGVIL(data):
|
|
|
+def launchTimeinTrafficLight_PGVIL(data, plot_path):
|
|
|
GREEN = 0x100000
|
|
|
RED = 0x10000000
|
|
|
ego_df = data.ego_data
|
|
|
# 找到第一次红灯 to 绿灯的切换时刻
|
|
|
is_transition = (ego_df["stateMask"] == GREEN) & (
|
|
|
- ego_df["stateMask"].shift(1) == RED
|
|
|
+ ego_df["stateMask"].shift(1) == RED
|
|
|
)
|
|
|
transition_times = ego_df.loc[is_transition, "simTime"]
|
|
|
if transition_times.empty:
|
|
@@ -1097,7 +1100,7 @@ def launchTimeinTrafficLight_PGVIL(data):
|
|
|
return {"timeInterval_PGVIL": time_move - time_red2green}
|
|
|
|
|
|
|
|
|
-def crossJunctionToTargetLane_PGVIL(data):
|
|
|
+def crossJunctionToTargetLane_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
lane_ids = set(ego_df["lane_id"].dropna())
|
|
|
|
|
@@ -1113,7 +1116,7 @@ def crossJunctionToTargetLane_PGVIL(data):
|
|
|
return {"crossJunctionToTargetLane_PGVIL": result}
|
|
|
|
|
|
|
|
|
-def noStop_PGVIL(data):
|
|
|
+def noStop_PGVIL(data, plot_path):
|
|
|
exclude_end_seconds = 5.0
|
|
|
exclude_start_seconds = 5.0
|
|
|
ego_df = data.ego_data
|
|
@@ -1123,7 +1126,7 @@ def noStop_PGVIL(data):
|
|
|
end_threshold = max_sim_time - exclude_end_seconds
|
|
|
filtered_df = ego_df[
|
|
|
(ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold)
|
|
|
- ]
|
|
|
+ ]
|
|
|
|
|
|
if (filtered_df["v"] == 0).any():
|
|
|
return {"noStop_PGVIL": -1}
|
|
@@ -1131,7 +1134,7 @@ def noStop_PGVIL(data):
|
|
|
return {"noStop_PGVIL": 1}
|
|
|
|
|
|
|
|
|
-def noEmergencyBraking_PGVIL(data):
|
|
|
+def noEmergencyBraking_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
ego_df["ip_dec"] = ego_df["v"].apply(
|
|
|
get_interpolation, point1=[18, -5], point2=[72, -3.5]
|
|
@@ -1145,7 +1148,7 @@ def noEmergencyBraking_PGVIL(data):
|
|
|
return {"noEmergencyBraking_PGVIL": -1}
|
|
|
|
|
|
|
|
|
-def noReverse_PGVIL(data):
|
|
|
+def noReverse_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data.copy()
|
|
|
heading_x = np.cos(ego_df["posH"])
|
|
|
reverse_flag = (ego_df["speedX"] * heading_x) < 0
|
|
@@ -1156,7 +1159,7 @@ def noReverse_PGVIL(data):
|
|
|
return {"noReverse_PGVIL": 1}
|
|
|
|
|
|
|
|
|
-def laneOffset_PGVIL(data):
|
|
|
+def laneOffset_PGVIL(data, plot_path):
|
|
|
car_width = data.function_config["vehicle"]["CAR_WIDTH"]
|
|
|
ego_df = data.ego_data
|
|
|
is_zero = ego_df["v"] == 0
|
|
@@ -1169,12 +1172,12 @@ def laneOffset_PGVIL(data):
|
|
|
|
|
|
# 距离右侧车道线
|
|
|
edge_dist = (last_stop["lane_width"] / 2 + last_stop["laneOffset"]) - (
|
|
|
- car_width / 2
|
|
|
+ car_width / 2
|
|
|
)
|
|
|
return {"laneOffset_PGVIL": edge_dist.max()}
|
|
|
|
|
|
|
|
|
-def maxLongitudelDistance_PGVIL(data):
|
|
|
+def maxLongitudelDistance_PGVIL(data, plot_path):
|
|
|
scenario_name = find_nested_name(data.function_config["function"])
|
|
|
stopX_pos = data.function_config["function"][scenario_name][
|
|
|
"maxLongitudelDistance_PGVIL"
|
|
@@ -1184,7 +1187,7 @@ def maxLongitudelDistance_PGVIL(data):
|
|
|
]["min"]
|
|
|
|
|
|
|
|
|
-def keepInLane_PGVIL(data):
|
|
|
+def keepInLane_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data.copy()
|
|
|
ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"])
|
|
|
if ego_df.empty:
|
|
@@ -1196,7 +1199,7 @@ def keepInLane_PGVIL(data):
|
|
|
return {"keepInLane_PGVIL": 1}
|
|
|
|
|
|
|
|
|
-def launchTimewhenPedestrianLeave_PGVIL(data):
|
|
|
+def launchTimewhenPedestrianLeave_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
ped_df = data.object_data.loc[
|
|
|
data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
|
|
@@ -1242,7 +1245,7 @@ def launchTimewhenPedestrianLeave_PGVIL(data):
|
|
|
return {"launchTimewhenPedestrianLeave_PGVIL": t_launch - t_stop}
|
|
|
|
|
|
|
|
|
-def waitTimeAtCrosswalkwithPedestrian_PGVIL(data):
|
|
|
+def waitTimeAtCrosswalkwithPedestrian_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
ped_df = data.object_data.loc[
|
|
|
data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
|
|
@@ -1266,25 +1269,25 @@ def waitTimeAtCrosswalkwithPedestrian_PGVIL(data):
|
|
|
|
|
|
stops_df = merged.loc[
|
|
|
(merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times
|
|
|
- ].sort_values("simTime")
|
|
|
+ ].sort_values("simTime")
|
|
|
if stops_df.empty:
|
|
|
return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1}
|
|
|
wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0]
|
|
|
return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": wait_time}
|
|
|
|
|
|
|
|
|
-def noCollision_PGVIL(data):
|
|
|
+def noCollision_PGVIL(data, plot_path):
|
|
|
ego = data.ego_data[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
|
|
|
tar = data.object_data.loc[data.object_data["playerId"] == 2,
|
|
|
- ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
|
|
|
+ ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
|
|
|
df = ego.merge(tar, on="simTime", suffixes=("", "_tar"))
|
|
|
|
|
|
df["posH_tar_rad"] = np.deg2rad(df["posH_tar"])
|
|
|
df["posH_rad"] = np.deg2rad(df["posH"])
|
|
|
df["collision"] = df.apply(lambda row: funcIsCollision(
|
|
|
- row["dimX"],row["dimY"],row["offX"],row["offY"],row["posX"],row["posY"],
|
|
|
- row["posH_rad"],row["dimX_tar"],row["dimY_tar"],row["offX_tar"],row["offY_tar"],
|
|
|
- row["posX_tar"],row["posY_tar"],row["posH_tar_rad"],),axis=1,)
|
|
|
+ row["dimX"], row["dimY"], row["offX"], row["offY"], row["posX"], row["posY"],
|
|
|
+ row["posH_rad"], row["dimX_tar"], row["dimY_tar"], row["offX_tar"], row["offY_tar"],
|
|
|
+ row["posX_tar"], row["posY_tar"], row["posH_tar_rad"], ), axis=1, )
|
|
|
|
|
|
if df["collision"].any():
|
|
|
return {"noCollision_PGVIL": -1}
|
|
@@ -1292,40 +1295,41 @@ def noCollision_PGVIL(data):
|
|
|
return {"noCollision_PGVIL": 1}
|
|
|
|
|
|
|
|
|
-def leastLateralDistance_PGVIL(data):
|
|
|
+def leastLateralDistance_PGVIL(data, plot_path):
|
|
|
ego_df = data.ego_data
|
|
|
cones = data.object_data.loc[data.object_data['playerId'] != 1,
|
|
|
- ['simTime','playerId','posX','posY']].copy()
|
|
|
- df = ego.merge(cones, on='simTime', how='inner', suffixes=('','_cone'))
|
|
|
+ ['simTime', 'playerId', 'posX', 'posY']].copy()
|
|
|
+ df = ego_df.merge(cones, on='simTime', how='inner', suffixes=('', '_cone'))
|
|
|
yaw = np.deg2rad(df['posH'])
|
|
|
x_c = df['posX'] + df['offX'] * np.cos(yaw)
|
|
|
y_c = df['posY'] + df['offX'] * np.sin(yaw)
|
|
|
- dx = df['posX'] - x_c
|
|
|
- dy = df['posY'] - y_c
|
|
|
+ dx = df['posX'] - x_c
|
|
|
+ dy = df['posY'] - y_c
|
|
|
dx = df['posX_cone'] - x_c
|
|
|
dy = df['posY_cone'] - y_c
|
|
|
- local_x = np.cos(yaw) * dx + np.sin(yaw) * dy
|
|
|
+ local_x = np.cos(yaw) * dx + np.sin(yaw) * dy
|
|
|
local_y = -np.sin(yaw) * dx + np.cos(yaw) * dy
|
|
|
half_length = df['dimX'] / 2
|
|
|
- half_width = df['dimY'] / 2
|
|
|
+ half_width = df['dimY'] / 2
|
|
|
|
|
|
inside = (np.abs(local_x) <= half_length) & (np.abs(local_y) <= half_width)
|
|
|
- collisions = df.loc[inside, ['simTime','playerId']]
|
|
|
+ collisions = df.loc[inside, ['simTime', 'playerId']]
|
|
|
|
|
|
if collisions.empty:
|
|
|
return {"noConeRectCollision_PGVIL": 1}
|
|
|
else:
|
|
|
collision_times = collisions['simTime'].unique().tolist()
|
|
|
- collision_ids = collisions['playerId'].unique().tolist()
|
|
|
+ collision_ids = collisions['playerId'].unique().tolist()
|
|
|
return {"noConeRectCollision_PGVIL": -1}
|
|
|
|
|
|
|
|
|
class FunctionRegistry:
|
|
|
"""动态函数注册器(支持参数验证)"""
|
|
|
|
|
|
- def __init__(self, data_processed):
|
|
|
+ def __init__(self, data_processed, plot_path):
|
|
|
self.logger = LogManager().get_logger() # 获取全局日志实例
|
|
|
self.data = data_processed
|
|
|
+ self.plot_path = plot_path
|
|
|
# 检查function_config是否为空
|
|
|
if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
|
|
|
self.logger.warning("功能配置为空,跳过功能指标计算")
|
|
@@ -1373,10 +1377,10 @@ class FunctionRegistry:
|
|
|
if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry:
|
|
|
self.logger.info("功能配置为空或无注册指标,返回空结果")
|
|
|
return results
|
|
|
-
|
|
|
+
|
|
|
for name, func in self._registry.items():
|
|
|
try:
|
|
|
- result = func(self.data) # 统一传递数据上下文
|
|
|
+ result = func(self.data, self.plot_path) # 统一传递数据上下文
|
|
|
results.update(result)
|
|
|
except Exception as e:
|
|
|
print(f"{name} 执行失败: {str(e)}")
|
|
@@ -1389,15 +1393,16 @@ class FunctionRegistry:
|
|
|
class FunctionManager:
|
|
|
"""管理功能指标计算的类"""
|
|
|
|
|
|
- def __init__(self, data_processed):
|
|
|
+ def __init__(self, data_processed, plot_path):
|
|
|
self.data = data_processed
|
|
|
self.logger = LogManager().get_logger()
|
|
|
+ self.plot_path = plot_path
|
|
|
# 检查function_config是否为空
|
|
|
if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
|
|
|
self.logger.warning("功能配置为空,跳过功能指标计算初始化")
|
|
|
self.function = None
|
|
|
else:
|
|
|
- self.function = FunctionRegistry(self.data)
|
|
|
+ self.function = FunctionRegistry(self.data, self.plot_path)
|
|
|
|
|
|
def report_statistic(self):
|
|
|
"""
|
|
@@ -1408,7 +1413,7 @@ class FunctionManager:
|
|
|
if self.function is None:
|
|
|
self.logger.info("功能指标管理器未初始化,返回空结果")
|
|
|
return {}
|
|
|
-
|
|
|
+
|
|
|
function_result = self.function.batch_execute()
|
|
|
|
|
|
print("\n[功能性表现及评价结果]")
|