|
@@ -29,14 +29,43 @@ import yaml
|
|
|
from modules.lib.chart_generator import generate_function_chart_data
|
|
|
from shapely.geometry import Point, Polygon
|
|
|
from modules.lib.common import get_interpolation
|
|
|
+import pandas as pd
|
|
|
+import yaml
|
|
|
+import math
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-scenario_sign_dict = {"LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208,
|
|
|
- "CoorperativeIntersectionPassing": 225, "GreenLightOptimalSpeedAdvisory": 234,
|
|
|
- "ForwardCollision": 212}
|
|
|
-
|
|
|
+scenario_sign_dict = {
|
|
|
+ "LeftTurnAssist": 206,
|
|
|
+ "HazardousLocationW": 207,
|
|
|
+ "RedLightViolationW": 208,
|
|
|
+ "AbnormalVehicleW":209,
|
|
|
+ "NsightVulnerableRoadUserCollisionW":210,
|
|
|
+ "LitterW":211,
|
|
|
+ "ForwardCollisionW":212,
|
|
|
+ "VisibilityW":213,
|
|
|
+ "EmergencyBrakeW":214,
|
|
|
+ "IntersectionCollisionW":215,
|
|
|
+ "BlindSpotW":216,
|
|
|
+ "DoNotPassW":217,
|
|
|
+ "ControlLossW":218,
|
|
|
+ "FrontTrafficJamW":219,
|
|
|
+ "EmergencyVehicleW":220,
|
|
|
+ "CooperativeVehicleMerge":221,
|
|
|
+ "CooperativeLaneChange":223,
|
|
|
+ "VulnerableRoadUserCollisionW":224,
|
|
|
+ "CooperativeIntersectionPassing":225,
|
|
|
+ "RampMerge":226,
|
|
|
+ "DrivingLaneRecommendation":227,
|
|
|
+ "TrafficJamW": 228,
|
|
|
+ "DynamicSpeedLimitingInformation": 229,
|
|
|
+ "EmergencyVehiclesPriority": 230,
|
|
|
+ "RemoteSupervision": 231,
|
|
|
+ "SignalLightReminder": 232,
|
|
|
+ "OtherVehicleRedLightViolationW": 233,
|
|
|
+ "GreenLightOptimalSpeedAdvisory": 234,
|
|
|
+}
|
|
|
|
|
|
def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
|
|
|
polygon = Polygon(polygon)
|
|
@@ -153,6 +182,115 @@ def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
|
|
|
first_time = warning_times.iloc[0]
|
|
|
return obj_df[obj_df['simTime'] == first_time]
|
|
|
|
|
|
+def getAxis(heading):
|
|
|
+ AxisL = [0, 0]
|
|
|
+ AxisW = [0, 0]
|
|
|
+
|
|
|
+ AxisL[0] = math.cos(heading)
|
|
|
+ AxisL[1] = math.sin(heading)
|
|
|
+
|
|
|
+ AxisW[0] = -math.sin(heading)
|
|
|
+ AxisW[1] = math.cos(heading)
|
|
|
+
|
|
|
+ return AxisL, AxisW
|
|
|
+
|
|
|
+
|
|
|
+def getDotMultiply(firstAxis, secondAxis):
|
|
|
+ dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1])
|
|
|
+
|
|
|
+ return dotMultiply
|
|
|
+
|
|
|
+
|
|
|
+def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth):
|
|
|
+ projAxisL = getDotMultiply(AxisL, baseAxis)
|
|
|
+ projAxisW = getDotMultiply(AxisW, baseAxis)
|
|
|
+
|
|
|
+ projectionRadius = halfLength * projAxisL + halfWidth * projAxisW
|
|
|
+
|
|
|
+ return projectionRadius
|
|
|
+
|
|
|
+
|
|
|
+def isCollision(
|
|
|
+ firstAxisL,
|
|
|
+ firstAxisW,
|
|
|
+ firstHalfLength,
|
|
|
+ firstHalfWidth,
|
|
|
+ secondAxisL,
|
|
|
+ secondAxisW,
|
|
|
+ secondHalfLength,
|
|
|
+ secondHalfWidth,
|
|
|
+ disVector,
|
|
|
+):
|
|
|
+ isCollision = True
|
|
|
+ axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW]
|
|
|
+ addProjectionRadius = 0
|
|
|
+ projectionDisVecotr = 0
|
|
|
+
|
|
|
+ for axis in axes:
|
|
|
+ addProjectionRadius = getProjectionRadius(
|
|
|
+ firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth
|
|
|
+ ) + getProjectionRadius(
|
|
|
+ secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth
|
|
|
+ )
|
|
|
+ projectionDisVecotr = getDotMultiply(disVector, axis)
|
|
|
+
|
|
|
+ if projectionDisVecotr > addProjectionRadius:
|
|
|
+ isCollision = False
|
|
|
+ break
|
|
|
+ return isCollision
|
|
|
+
|
|
|
+
|
|
|
+def funcIsCollision(
|
|
|
+ firstDimX,
|
|
|
+ firstDimY,
|
|
|
+ firstOffX,
|
|
|
+ firstOffY,
|
|
|
+ firstX,
|
|
|
+ firstY,
|
|
|
+ firstHeading,
|
|
|
+ secondDimX,
|
|
|
+ secondDimY,
|
|
|
+ secondOffX,
|
|
|
+ secondOffY,
|
|
|
+ secondX,
|
|
|
+ secondY,
|
|
|
+ secondHeading,
|
|
|
+):
|
|
|
+ firstAxisL = getAxis(firstHeading)[0]
|
|
|
+ firstAxisW = getAxis(firstHeading)[1]
|
|
|
+ secondAxisL = getAxis(secondHeading)[0]
|
|
|
+ secondAxisW = getAxis(secondHeading)[1]
|
|
|
+
|
|
|
+ firstHalfLength = 0.5 * firstDimX
|
|
|
+ firstHalfWidth = 0.5 * firstDimY
|
|
|
+ secondHalfLength = 0.5 * secondDimX
|
|
|
+ secondHalfWidth = 0.5 * secondDimY
|
|
|
+
|
|
|
+ firstCenter = [0, 0]
|
|
|
+ secondCenter = [0, 0]
|
|
|
+ disVector = [0, 0]
|
|
|
+
|
|
|
+ firstCenter[0] = firstX + firstOffX * math.cos(firstHeading)
|
|
|
+ firstCenter[1] = firstY + firstOffX * math.sin(firstHeading)
|
|
|
+ secondCenter[0] = secondX + secondOffX * math.cos(secondHeading)
|
|
|
+ secondCenter[1] = secondY + secondOffX * math.sin(secondHeading)
|
|
|
+
|
|
|
+ disVector[0] = secondCenter[0] - firstCenter[0]
|
|
|
+ disVector[1] = secondCenter[1] - firstCenter[1]
|
|
|
+
|
|
|
+ varIsCollision = isCollision(
|
|
|
+ firstAxisL,
|
|
|
+ firstAxisW,
|
|
|
+ firstHalfLength,
|
|
|
+ firstHalfWidth,
|
|
|
+ secondAxisL,
|
|
|
+ secondAxisW,
|
|
|
+ secondHalfLength,
|
|
|
+ secondHalfWidth,
|
|
|
+ disVector,
|
|
|
+ )
|
|
|
+
|
|
|
+ return varIsCollision
|
|
|
|
|
|
|
|
|
|
|
@@ -194,7 +332,7 @@ def earliestWarningDistance_LST(data) -> dict:
|
|
|
data.warning_speed = warning_speed
|
|
|
data.correctwarning = correctwarning
|
|
|
|
|
|
- if warning_dist.empty:
|
|
|
+ if len(warning_dist) == 0:
|
|
|
return {"earliestWarningDistance_LST": 0.0}
|
|
|
|
|
|
|
|
@@ -296,7 +434,12 @@ def warningDelayTimeofReachDecel_LST(data):
|
|
|
data.correctwarning = correctwarning
|
|
|
ego_df = data.ego_data
|
|
|
obj_df = data.obj_data
|
|
|
- obj_speed_simtime = obj_df[obj_df['accel'] <= -4]['simTime'].tolist()
|
|
|
+
|
|
|
+ speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6
|
|
|
+ obj_df["ux"] = obj_df["speedX"] / speed_mag
|
|
|
+ obj_df["uy"] = obj_df["speedY"] / speed_mag
|
|
|
+ obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"]
|
|
|
+ obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist()
|
|
|
warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
|
|
|
if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0):
|
|
|
return {"warningDelayTimeofReachDecel_LST": 0}
|
|
@@ -864,6 +1007,318 @@ def ifCrossingRedLight_PGVIL(data_processed) -> dict:
|
|
|
|
|
|
|
|
|
|
|
|
+def limitSpeed_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ max_speed = max(ego_df["v"])
|
|
|
+ if len(ego_df["v"]) == 0:
|
|
|
+ return {"speedLimit_PGVIL": -1}
|
|
|
+ else:
|
|
|
+ return {"speedLimit_PGVIL": max_speed}
|
|
|
+
|
|
|
+
|
|
|
+def leastDistance_PGVIL(data):
|
|
|
+ exclude_seconds = 2.0
|
|
|
+ ego_df = data.ego_data
|
|
|
+ max_sim_time = ego_df["simTime"].max()
|
|
|
+ threshold_time = max_sim_time - exclude_seconds
|
|
|
+ for index, ego in ego_df.iterrows():
|
|
|
+ if ego["simTime"] >= threshold_time:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if "v" in ego and ego["v"] == 0:
|
|
|
+ car_point = (ego["posX"], ego["posY"])
|
|
|
+
|
|
|
+ p1 = (ego["stopline_x1"], ego["stopline_y1"])
|
|
|
+ p2 = (ego["stopline_x2"], ego["stopline_y2"])
|
|
|
+ stop_line_points = np.array([p1, p2], dtype=float)
|
|
|
+
|
|
|
+ distance_to_stopline = get_car_to_stop_line_distance(
|
|
|
+ ego, car_point, stop_line_points
|
|
|
+ )
|
|
|
+
|
|
|
+ return {"minimumDistance_PGVIL": distance_to_stopline}
|
|
|
+
|
|
|
+ return {"minimumDistance_PGVIL": -1}
|
|
|
+
|
|
|
+
|
|
|
+def launchTimeinStopLine_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ in_stop = False
|
|
|
+ start_time = None
|
|
|
+ last_low_time = None
|
|
|
+ max_duration = 0.0
|
|
|
+ v_thresh = 0.01
|
|
|
+ for _, row in ego_df.iterrows():
|
|
|
+ simt = row["simTime"]
|
|
|
+ v = row.get("v", None)
|
|
|
+ if v is None or np.isnan(v) or abs(v) > v_thresh:
|
|
|
+ if in_stop:
|
|
|
+ duration = last_low_time - start_time
|
|
|
+ if duration > max_duration:
|
|
|
+ max_duration = duration
|
|
|
+ in_stop = False
|
|
|
+ else:
|
|
|
+ if not in_stop:
|
|
|
+ start_time = simt
|
|
|
+ in_stop = True
|
|
|
+
|
|
|
+ last_low_time = simt
|
|
|
+ if in_stop:
|
|
|
+ duration = last_low_time - start_time
|
|
|
+ if duration > max_duration:
|
|
|
+ max_duration = duration
|
|
|
+
|
|
|
+ if max_duration <= 0:
|
|
|
+ return {"launchTimeinStopLine_PGVIL": -1}
|
|
|
+ else:
|
|
|
+ return {"launchTimeinStopLine_PGVIL": float(max_duration)}
|
|
|
+
|
|
|
+
|
|
|
+def launchTimeinTrafficLight_PGVIL(data):
|
|
|
+ GREEN = 0x100000
|
|
|
+ RED = 0x10000000
|
|
|
+ ego_df = data.ego_data
|
|
|
+
|
|
|
+ is_transition = (ego_df["stateMask"] == GREEN) & (
|
|
|
+ ego_df["stateMask"].shift(1) == RED
|
|
|
+ )
|
|
|
+ transition_times = ego_df.loc[is_transition, "simTime"]
|
|
|
+ if transition_times.empty:
|
|
|
+ return {"timeInterval_PGVIL": -1}
|
|
|
+ time_red2green = transition_times.iloc[0]
|
|
|
+
|
|
|
+ car_move_times = ego_df.loc[
|
|
|
+ (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime"
|
|
|
+ ]
|
|
|
+ if car_move_times.empty:
|
|
|
+ return {"timeInterval_PGVIL": -1}
|
|
|
+
|
|
|
+ time_move = move_times.iloc[0]
|
|
|
+ return {"timeInterval_PGVIL": time_move - time_red2green}
|
|
|
+
|
|
|
+
|
|
|
+def crossJunctionToTargetLane_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ lane_ids = set(ego_df["lane_id"].dropna())
|
|
|
+
|
|
|
+ try:
|
|
|
+ scenario_name = find_nested_name(data.function_config["function"])
|
|
|
+ target_lane_id = data.function_config["function"][scenario_name][
|
|
|
+ "crossJunctionToTargetLane_PGVIL"
|
|
|
+ ]["max"]
|
|
|
+ except KeyError:
|
|
|
+ raise ValueError("请在配置文件中指定目标车道ID!")
|
|
|
+
|
|
|
+ result = target_lane_id if target_lane_id in lane_ids else -1
|
|
|
+ return {"crossJunctionToTargetLane_PGVIL": result}
|
|
|
+
|
|
|
+
|
|
|
+def noStop_PGVIL(data):
|
|
|
+ exclude_end_seconds = 5.0
|
|
|
+ exclude_start_seconds = 5.0
|
|
|
+ ego_df = data.ego_data
|
|
|
+ max_sim_time = ego_df["simTime"].max()
|
|
|
+
|
|
|
+ start_threshold = min_sim_time + exclude_start_seconds
|
|
|
+ end_threshold = max_sim_time - exclude_end_seconds
|
|
|
+ filtered_df = ego_df[
|
|
|
+ (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold)
|
|
|
+ ]
|
|
|
+
|
|
|
+ if (filtered_df["v"] == 0).any():
|
|
|
+ return {"noStop_PGVIL": -1}
|
|
|
+ else:
|
|
|
+ return {"noStop_PGVIL": 1}
|
|
|
+
|
|
|
+
|
|
|
+def noEmergencyBraking_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ ego_df["ip_dec"] = ego_df["v"].apply(
|
|
|
+ get_interpolation, point1=[18, -5], point2=[72, -3.5]
|
|
|
+ )
|
|
|
+ ego_df["slam_brake"] = (ego_df["accleX"] - ego_df["ip_dec"]).apply(
|
|
|
+ lambda x: 1 if x < 0 else 0
|
|
|
+ )
|
|
|
+ if sum(ego_df["slam_brake"]) == 0:
|
|
|
+ return {"noEmergencyBraking_PGVIL": 1}
|
|
|
+ else:
|
|
|
+ return {"noEmergencyBraking_PGVIL": -1}
|
|
|
+
|
|
|
+
|
|
|
+def noReverse_PGVIL(data):
|
|
|
+ ego_df = data.ego_data.copy()
|
|
|
+ heading_x = np.cos(ego_df["posH"])
|
|
|
+ reverse_flag = (ego_df["speedX"] * heading_x) < 0
|
|
|
+
|
|
|
+ if reverse_flag.any():
|
|
|
+ return {"noReverse_PGVIL": -1}
|
|
|
+ else:
|
|
|
+ return {"noReverse_PGVIL": 1}
|
|
|
+
|
|
|
+
|
|
|
+def laneOffset_PGVIL(data):
|
|
|
+ car_width = data.function_config["vehicle"]["CAR_WIDTH"]
|
|
|
+ ego_df = data.ego_data
|
|
|
+ is_zero = ego_df["v"] == 0
|
|
|
+ if not is_zero.any():
|
|
|
+
|
|
|
+ return {"laneOffset_PGVIL": None}
|
|
|
+ groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
|
|
|
+ last_zero_group = groups[is_zero].iloc[-1]
|
|
|
+ last_stop = ego_df[groups == last_zero_group]
|
|
|
+
|
|
|
+
|
|
|
+ edge_dist = (last_stop["lane_width"] / 2 + last_stop["laneOffset"]) - (
|
|
|
+ car_width / 2
|
|
|
+ )
|
|
|
+ return {"laneOffset_PGVIL": edge_dist.max()}
|
|
|
+
|
|
|
+
|
|
|
+def maxLongitudelDistance_PGVIL(data):
|
|
|
+ scenario_name = find_nested_name(data.function_config["function"])
|
|
|
+ stopX_pos = data.function_config["function"][scenario_name][
|
|
|
+ "maxLongitudelDistance_PGVIL"
|
|
|
+ ]["max"]
|
|
|
+ stopY_pos = data.function_config["function"][scenario_name][
|
|
|
+ "maxLongitudelDistance_PGVIL"
|
|
|
+ ]["min"]
|
|
|
+
|
|
|
+
|
|
|
+def keepInLane_PGVIL(data):
|
|
|
+ ego_df = data.ego_data.copy()
|
|
|
+ ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"])
|
|
|
+ if ego_df.empty:
|
|
|
+ return {"keepInLane_PGVIL": -1}
|
|
|
+ threshold = ego_df["lane_width"] / 2
|
|
|
+ if (ego_df["laneOffset"].abs() > threshold).any():
|
|
|
+ return {"keepInLane_PGVIL": -1}
|
|
|
+ else:
|
|
|
+ return {"keepInLane_PGVIL": 1}
|
|
|
+
|
|
|
+
|
|
|
+def launchTimewhenPedestrianLeave_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ ped_df = data.object_data.loc[
|
|
|
+ data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
|
|
|
+ ]
|
|
|
+ merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
|
|
|
+ if merged.empty:
|
|
|
+ return {"launchTimewhenPedestrianLeave_PGVIL": -1}
|
|
|
+
|
|
|
+ yaw_rad = np.deg2rad(merged["posH"])
|
|
|
+ dx = merged["posX_ped"] - merged["posX"]
|
|
|
+ dy = merged["posY_ped"] - merged["posY"]
|
|
|
+ merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
|
|
|
+ merged["threshold"] = merged["lane_width"] / 2
|
|
|
+
|
|
|
+ entry = merged[merged["lateral_dist"] <= merged["threshold"]]
|
|
|
+ if entry.empty:
|
|
|
+ return {"launchTimewhenPedestrianLeave_PGVIL": -1}
|
|
|
+ t_entry = entry["simTime"].iloc[0]
|
|
|
+
|
|
|
+ after_e = merged[merged["simTime"] > t_entry]
|
|
|
+ leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
|
|
|
+ if leave.empty:
|
|
|
+ return {"launchTimewhenPedestrianLeave_PGVIL": -1}
|
|
|
+ t_leave = leave["simTime"].iloc[0]
|
|
|
+ ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
|
|
|
+
|
|
|
+ stops = ego_df[(ego_df["simTime"] > t_leave) & (ego_df["v"] == 0)].copy()
|
|
|
+ if stops.empty:
|
|
|
+ return {"launchTimewhenPedestrianLeave_PGVIL": -1}
|
|
|
+
|
|
|
+ front_overPos = ego_df["dimX"] / 2 + ego_df["offX"]
|
|
|
+ yaw_stop = np.deg2rad(stops["posH"])
|
|
|
+
|
|
|
+ stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
|
|
|
+ stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
|
|
|
+ dx_s = stops["front_x"] - ped_x
|
|
|
+ dy_s = stops["front_y"] - ped_y
|
|
|
+ proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
|
|
|
+ valid = stops[proj <= 0]
|
|
|
+ t_stop = valid["simTime"].iloc[0]
|
|
|
+ launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"]
|
|
|
+ t_launch = launches.iloc[0]
|
|
|
+ return {"launchTimewhenPedestrianLeave_PGVIL": t_launch - t_stop}
|
|
|
+
|
|
|
+
|
|
|
+def waitTimeAtCrosswalkwithPedestrian_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ ped_df = data.object_data.loc[
|
|
|
+ data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
|
|
|
+ ]
|
|
|
+
|
|
|
+ first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"]
|
|
|
+ if first_launch.empty:
|
|
|
+ t0_launch = 0.0
|
|
|
+ else:
|
|
|
+ t0_launch = first_launch.iloc[0]
|
|
|
+
|
|
|
+ merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
|
|
|
+ merged["front_offset"] = merged["dimX"] / 2 + merged["offX"]
|
|
|
+ yaw = np.deg2rad(merged["posH"])
|
|
|
+
|
|
|
+ merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"]
|
|
|
+ merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"]
|
|
|
+ dx = merged["posX_ped"] - merged["front_x"]
|
|
|
+ dy = merged["posY_ped"] - merged["front_y"]
|
|
|
+ valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0
|
|
|
+
|
|
|
+ stops_df = merged.loc[
|
|
|
+ (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times
|
|
|
+ ].sort_values("simTime")
|
|
|
+ if stops_df.empty:
|
|
|
+ return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1}
|
|
|
+ wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0]
|
|
|
+ return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": wait_time}
|
|
|
+
|
|
|
+
|
|
|
+def noCollision_PGVIL(data):
|
|
|
+ ego = data.ego_data[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
|
|
|
+ tar = data.object_data.loc[data.object_data["playerId"] == 2,
|
|
|
+ ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
|
|
|
+ df = ego.merge(tar, on="simTime", suffixes=("", "_tar"))
|
|
|
+
|
|
|
+ df["posH_tar_rad"] = np.deg2rad(df["posH_tar"])
|
|
|
+ df["posH_rad"] = np.deg2rad(df["posH"])
|
|
|
+ df["collision"] = df.apply(lambda row: funcIsCollision(
|
|
|
+ row["dimX"],row["dimY"],row["offX"],row["offY"],row["posX"],row["posY"],
|
|
|
+ row["posH_rad"],row["dimX_tar"],row["dimY_tar"],row["offX_tar"],row["offY_tar"],
|
|
|
+ row["posX_tar"],row["posY_tar"],row["posH_tar_rad"],),axis=1,)
|
|
|
+
|
|
|
+ if df["collision"].any():
|
|
|
+ return {"noCollision_PGVIL": -1}
|
|
|
+ else:
|
|
|
+ return {"noCollision_PGVIL": 1}
|
|
|
+
|
|
|
+
|
|
|
+def leastLateralDistance_PGVIL(data):
|
|
|
+ ego_df = data.ego_data
|
|
|
+ cones = data.object_data.loc[data.object_data['playerId'] != 1,
|
|
|
+ ['simTime','playerId','posX','posY']].copy()
|
|
|
+ df = ego.merge(cones, on='simTime', how='inner', suffixes=('','_cone'))
|
|
|
+ yaw = np.deg2rad(df['posH'])
|
|
|
+ x_c = df['posX'] + df['offX'] * np.cos(yaw)
|
|
|
+ y_c = df['posY'] + df['offX'] * np.sin(yaw)
|
|
|
+ dx = df['posX'] - x_c
|
|
|
+ dy = df['posY'] - y_c
|
|
|
+ dx = df['posX_cone'] - x_c
|
|
|
+ dy = df['posY_cone'] - y_c
|
|
|
+ local_x = np.cos(yaw) * dx + np.sin(yaw) * dy
|
|
|
+ local_y = -np.sin(yaw) * dx + np.cos(yaw) * dy
|
|
|
+ half_length = df['dimX'] / 2
|
|
|
+ half_width = df['dimY'] / 2
|
|
|
+
|
|
|
+ inside = (np.abs(local_x) <= half_length) & (np.abs(local_y) <= half_width)
|
|
|
+ collisions = df.loc[inside, ['simTime','playerId']]
|
|
|
+
|
|
|
+ if collisions.empty:
|
|
|
+ return {"noConeRectCollision_PGVIL": 1}
|
|
|
+ else:
|
|
|
+ collision_times = collisions['simTime'].unique().tolist()
|
|
|
+ collision_ids = collisions['playerId'].unique().tolist()
|
|
|
+ return {"noConeRectCollision_PGVIL": -1}
|
|
|
+
|
|
|
|
|
|
class FunctionRegistry:
|
|
|
"""动态函数注册器(支持参数验证)"""
|