#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2025 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2025/01/5 @Last Modified: 2025/01/5 @Summary: Function Metrics Calculation """ import sys from pathlib import Path # 添加项目根目录到系统路径 root_path = Path(__file__).resolve().parent.parent sys.path.append(str(root_path)) print(root_path) from modules.lib.score import Score from modules.lib.log_manager import LogManager import numpy as np from typing import Dict, Tuple, Optional, Callable, Any import pandas as pd import yaml from modules.lib.chart_generator import generate_function_chart_data from shapely.geometry import Point, Polygon from modules.lib.common import get_interpolation import pandas as pd import yaml import math # ---------------------- # 基础工具函数 (Pure functions) # ---------------------- scenario_sign_dict = { "LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208, "AbnormalVehicleW": 209, "NsightVulnerableRoadUserCollisionW": 210, "LitterW": 211, "ForwardCollision": 212, "VisibilityW": 213, "EmergencyBrakeW": 214, "IntersectionCollisionW": 215, "BlindSpotW": 216, "DoNotPassW": 217, "ControlLossW": 218, "FrontTrafficJamW": 219, "EmergencyVehicleW": 220, "CooperativeVehicleMerge": 221, "CooperativeLaneChange": 223, "VulnerableRoadUserCollisionW": 224, "CooperativeIntersectionPassing": 225, "RampMerge": 226, "DrivingLaneRecommendation": 227, "TrafficJamW": 228, "DynamicSpeedLimitingInformation": 229, "EmergencyVehiclesPriority": 230, "RemoteSupervision": 231, "SignalLightReminder": 232, "OtherVehicleRedLightViolationW": 233, "GreenLightOptimalSpeedAdvisory": 234, } def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool: polygon = Polygon(polygon) point = Point(test_point) return polygon.contains(point) def _is_segment_by_interval(time_list, expected_step) -> list: """ 根据时间戳之间的间隔进行分段。 参数: time_list (list): 时间戳列表。 expected_step (float): 预期的固定步长。 返回: list: 分段后的时间戳列表,每个元素是一个子列表。 """ if not time_list: return [] segments = [] current_segment = [time_list[0]] for i in range(1, len(time_list)): actual_step = time_list[i] - time_list[i - 1] if actual_step != expected_step: # 如果间隔不符合预期,则开始一个新的段 segments.append(current_segment) current_segment = [time_list[i]] else: # 否则,将当前时间戳添加到当前段中 current_segment.append(time_list[i]) # 添加最后一个段 if current_segment: segments.append(current_segment) return segments # 寻找二级指标的名称 def find_nested_name(data): """ 查找字典中嵌套的name结构。 :param data: 要搜索的字典 :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None """ if isinstance(data, dict): for key, value in data.items(): if isinstance(value, dict) and 'name' in value: return value['name'] # 递归查找嵌套字典 result = find_nested_name(value) if result is not None: return result elif isinstance(data, list): for item in data: result = find_nested_name(item) if result is not None: return result return None def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray: """向量化距离计算""" return np.linalg.norm(ego_pos - obj_pos, axis=1) def calculate_relative_speed_PGVIL( ego_speed: np.ndarray, obj_speed: np.ndarray ) -> np.ndarray: """向量化相对速度计算""" return np.linalg.norm(ego_speed - obj_speed, axis=1) def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray: """向量化距离计算""" dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist'] return dist def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray: """向量化相对速度计算""" return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v'] def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]: """数据提取函数""" ego = data[data["playerId"] == 1].iloc[0] obj = data[data["playerId"] != 1] return ego, obj def get_first_warning(data_processed) -> Optional[pd.DataFrame]: """带缓存的预警数据获取""" ego_df = data_processed.ego_data obj_df = data_processed.object_df scenario_name = find_nested_name(data_processed.function_config["function"]) correctwarning = scenario_sign_dict.get(scenario_name) if correctwarning is None: print("无法获取正确的预警信号标志位!") return None warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())] warning_times = warning_rows['simTime'] if warning_times.empty: print("没有找到预警数据!") return None first_time = warning_times.iloc[0] return obj_df[obj_df['simTime'] == first_time] def getAxis(heading): AxisL = [0, 0] AxisW = [0, 0] AxisL[0] = math.cos(heading) AxisL[1] = math.sin(heading) AxisW[0] = -math.sin(heading) AxisW[1] = math.cos(heading) return AxisL, AxisW def getDotMultiply(firstAxis, secondAxis): dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1]) return dotMultiply def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth): projAxisL = getDotMultiply(AxisL, baseAxis) projAxisW = getDotMultiply(AxisW, baseAxis) projectionRadius = halfLength * projAxisL + halfWidth * projAxisW return projectionRadius def isCollision( firstAxisL, firstAxisW, firstHalfLength, firstHalfWidth, secondAxisL, secondAxisW, secondHalfLength, secondHalfWidth, disVector, ): isCollision = True axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW] addProjectionRadius = 0 projectionDisVecotr = 0 for axis in axes: addProjectionRadius = getProjectionRadius( firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth ) + getProjectionRadius( secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth ) projectionDisVecotr = getDotMultiply(disVector, axis) if projectionDisVecotr > addProjectionRadius: isCollision = False break return isCollision def funcIsCollision( firstDimX, firstDimY, firstOffX, firstOffY, firstX, firstY, firstHeading, secondDimX, secondDimY, secondOffX, secondOffY, secondX, secondY, secondHeading, ): firstAxisL = getAxis(firstHeading)[0] firstAxisW = getAxis(firstHeading)[1] secondAxisL = getAxis(secondHeading)[0] secondAxisW = getAxis(secondHeading)[1] firstHalfLength = 0.5 * firstDimX firstHalfWidth = 0.5 * firstDimY secondHalfLength = 0.5 * secondDimX secondHalfWidth = 0.5 * secondDimY firstCenter = [0, 0] secondCenter = [0, 0] disVector = [0, 0] firstCenter[0] = firstX + firstOffX * math.cos(firstHeading) firstCenter[1] = firstY + firstOffX * math.sin(firstHeading) secondCenter[0] = secondX + secondOffX * math.cos(secondHeading) secondCenter[1] = secondY + secondOffX * math.sin(secondHeading) disVector[0] = secondCenter[0] - firstCenter[0] disVector[1] = secondCenter[1] - firstCenter[1] varIsCollision = isCollision( firstAxisL, firstAxisW, firstHalfLength, firstHalfWidth, secondAxisL, secondAxisW, secondHalfLength, secondHalfWidth, disVector, ) return varIsCollision # ---------------------- # 核心计算功能函数 # ---------------------- def latestWarningDistance_LST(data, plot_path) -> dict: """预警距离计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) warning_speed = calculate_relative_speed(ego_df, correctwarning) # 将计算结果保存到data对象中,供图表生成使用 data.warning_dist = warning_dist data.warning_speed = warning_speed data.correctwarning = correctwarning if len(warning_dist.tolist()) == 0: return {"latestWarningDistance_LST": 0.0} # 生成图表数据 generate_function_chart_data(data, 'latestWarningDistance_LST', plot_path) return {"latestWarningDistance_LST": warning_dist.tolist()[-1] if len(warning_dist) > 0 else value} def earliestWarningDistance_LST(data, plot_path) -> dict: """预警距离计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) warning_speed = calculate_relative_speed(ego_df, correctwarning) # 将计算结果保存到data对象中,供图表生成使用 data.warning_dist = warning_dist data.warning_speed = warning_speed data.correctwarning = correctwarning if len(warning_dist.tolist()) == 0: return {"earliestWarningDistance_LST": 0.0} # 生成图表数据 generate_function_chart_data(data, 'earliestWarningDistance_LST', plot_path) return {"earliestWarningDistance_LST": float(warning_dist.tolist()[0]) if len(warning_dist) > 0 else value} def latestWarningDistance_TTC_LST(data, plot_path) -> dict: """TTC计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) if len(warning_dist.tolist()) == 0: return {"latestWarningDistance_TTC_LST": 0.0} # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning warning_speed = calculate_relative_speed(ego_df, correctwarning) with np.errstate(divide='ignore', invalid='ignore'): ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf) # 处理无效的TTC值 for i in range(len(ttc)): ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i] data.warning_dist = warning_dist data.warning_speed = warning_speed data.ttc = ttc # 生成图表数据 # from modules.lib.chart_generator import generate_function_chart_data generate_function_chart_data(data, 'latestWarningDistance_TTC_LST', plot_path) return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value} def earliestWarningDistance_TTC_LST(data, plot_path) -> dict: """TTC计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) if len(warning_dist.tolist()) == 0: return {"earliestWarningDistance_TTC_LST": 0.0} # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning warning_speed = calculate_relative_speed(ego_df, correctwarning) with np.errstate(divide='ignore', invalid='ignore'): ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf) # 处理无效的TTC值 for i in range(len(ttc)): ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i] # 将计算结果保存到data对象中,供图表生成使用 data.warning_dist = warning_dist data.warning_speed = warning_speed data.ttc = ttc data.correctwarning = correctwarning # 生成图表数据 generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST', plot_path) return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value} def warningDelayTime_LST(data, plot_path): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist() simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][ 'simTime'].tolist() simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None if (simTime_HMI is None) or (simTime_rosbag is None): print("预警出错!") delay_time = 100.0 else: delay_time = abs(simTime_HMI - simTime_rosbag) return {"warningDelayTime_LST": delay_time} def warningDelayTimeofReachDecel_LST(data, plot_path): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data obj_df = data.obj_data[2] # 加速度在速度方向上的投影 speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6 obj_df["ux"] = obj_df["speedX"] / speed_mag obj_df["uy"] = obj_df["speedY"] / speed_mag obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"] obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist() # 单位m/s^2 warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist() if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0): return {"warningDelayTimeofReachDecel_LST": 0} elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0): return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]} elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0): return {"warningDelayTimeofReachDecel_LST": -1} else: return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]} def rightWarningSignal_LST(data, plot_path): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data if ego_df['ifwarning'].empty: print("无法获取正确预警信号标志位!") return warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())] if warning_rows.empty: return {"rightWarningSignal_LST": -1} else: return {"rightWarningSignal_LST": 1} def noWarning_LST(data, plot_path): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data if ego_df['ifwarning'].empty: print("无法获取正确预警信号标志位!") return warning_rows = (ego_df['ifwarning'].dropna() == -1).all() if warning_rows: return {"noWarning_LST": 1} else: return {"noWarning_LST": -1} # def ifCrossingRedLight_LST(data, plot_path): # scenario_name = find_nested_name(data.function_config["function"]) # correctwarning = scenario_sign_dict[scenario_name] # # 将correctwarning保存到data对象中,供图表生成使用 # data.correctwarning = correctwarning # ego_df = data.ego_data # redlight_simtime = ego_df[ # (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] <= 0.5) & ( # ego_df['v'] != 0)]['simTime'] # if redlight_simtime.empty: # return {"ifCrossingRedLight_LST": -1} # else: # return {"ifCrossingRedLight_LST": 1} def ifStopgreenWaveSpeedGuidance_LST(data, plot_path): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data greenlight_simtime = \ ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] <= 0.1)]['simTime'] if greenlight_simtime.empty: return {"ifStopgreenWaveSpeedGuidance_LST": -1} else: return {"ifStopgreenWaveSpeedGuidance_LST": 1} # ------ 单车智能指标 ------ def limitSpeed_LST(data, plot_path): ego_df = data.ego_data scenario_name = find_nested_name(data.function_config["function"]) limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"] speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.2]['v'].tolist() if len(speed_limit) == 0: return {"limitSpeed_LST": -1} max_speed = max(speed_limit)*3.6 data.speedLimit = limit_speed generate_function_chart_data(data, 'limitspeed_LST', plot_path) return {"limitSpeed_LST": max_speed} def limitSpeedPastLimitSign_LST(data, plot_path): ego_df = data.ego_data scenario_name = find_nested_name(data.function_config["function"]) limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"] car_length = data.function_config["vehicle"]['CAR_LENGTH'] ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist() ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist() data.speedLimit = limit_speed data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST', plot_path) if len(ego_speed) == 0: return {"speedPastLimitSign_LST": -1} return {"speedPastLimitSign_LST": ego_speed[0]} def leastDistance_LST(data, plot_path): ego_df = data.ego_data dist_row = ego_df[ego_df['v'] <= 0.1]['relative_dist'].tolist() if len(dist_row) == 0: return {"leastDistance_LST": -1} else: min_dist = min(dist_row) return {"leastDistance_LST": min_dist} def noCrossRedLight_LST(data, plot_path): ego_df = data.ego_data cross_redlight_dist = ego_df[(ego_df['stateMask'] == 3) & (ego_df['filtered_trafficlight_id'] is not None) & abs(ego_df['v'] > 0.1)]['x_relative_dist'] has_positive = any(x > 0 for x in cross_redlight_dist) has_negative = any(x < 0 for x in cross_redlight_dist) has_both = has_positive and has_negative if has_both: return {'noCrossRedLight_LST': -1} else: return {'noCrossRedLight_LST': 1} def launchTimeinStopLine_LST(data, plot_path): ego_df = data.ego_data simtime_row = ego_df[ego_df['v'] <= 0.1]['simTime'].tolist() if len(simtime_row) == 0: return {"launchTimeinStopLine_LST": -1} else: delta_t = simtime_row[-1] - simtime_row[0] return {"launchTimeinStopLine_LST": delta_t} def launchTimewhenFollowingCar_LST(data, plot_path): ego_df = data.ego_data t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0] simtime_row = ego_df[ego_df['v'] <= 0.1]['simTime'].tolist() if len(simtime_row) == 0: return {"launchTimewhenFollowingCar_LST": 0} else: time_interval = _is_segment_by_interval(simtime_row, t_interval) delta_t = [] for t in time_interval: delta_t.append(t[-1] - t[0]) return {"launchTimewhenFollowingCar_LST": max(delta_t)} def noStop_LST(data, plot_path): ego_df_ini = data.ego_data min_time = ego_df_ini['simTime'].min() + 0 max_time = ego_df_ini['simTime'].max() - 0 ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)] speed = ego_df['v'].tolist() if (any(speed) <= 0.5): return {"noStop_LST": -1} else: return {"noStop_LST": 1} def launchTimeinTrafficLight_LST(data, plot_path): ''' 待修改: 红灯的状态值:1 绿灯的状态值:0 ''' ego_df = data.ego_data simtime_of_redlight = ego_df[(ego_df['stateMask'] == 3) & (ego_df['filtered_trafficlight_id'] is not None)]['simTime'] simtime_of_stop = ego_df[ego_df['v'] <= 0.1]['simTime'] if len(simtime_of_stop) or len(simtime_of_redlight): return {"launchTimeinTrafficLight_LST": -1} simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist() if len(simtime_of_launch) == 0: return {"launchTimeinTrafficLight_LST": -1} return {"launchTimeinTrafficLight_LST": simtime_of_launch[-1] - simtime_of_launch[0]} def crossJunctionToTargetLane_LST(data, plot_path): ego_df = data.ego_data lane_in_leftturn = set(ego_df['lane_id'].tolist()) scenario_name = find_nested_name(data.function_config["function"]) if (30225902 in lane_in_leftturn) or (30226005 in lane_in_leftturn): return {"crossJunctionToTargetLane_LST": 1} else: return {"crossJunctionToTargetLane_LST": -1} def keepInLane_LST(data, plot_path): # ego_df = data.ego_data[data.ego_data['road_type'] == 15] ego_df = data.ego_data[data.ego_data['lane_id'].isin([30225904, 30225978])].copy() notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width'] / 2] if len(notkeepinlane) == 0: return {"keepInLane_LST": -1} return {"keepInLane_LST": 1} def leastLateralDistance_LST(data, plot_path): ego_df = data.ego_data lane_width = ego_df[abs(ego_df['x_relative_dist']) <= 0.5]['lane_width'].tolist()[0] if len(lane_width) == 0: return {"leastLateralDistance_LST": -1} else: y_relative_dist = ego_df[abs(ego_df['x_relative_dist']) <= 0.05]['y_relative_dist'] if (abs(y_relative_dist) <= lane_width / 2).all(): return {"leastLateralDistance_LST": 1} else: return {"leastLateralDistance_LST": -1} def waitTimeAtCrosswalkwithPedestrian_LST(data, plot_path): ego_df = data.ego_data object_df = data.obj_data data['in_crosswalk'] = [] position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True) for cross_id in position_data['cross_id'].tolist(): for posX, posY in object_df['posX', 'posY']: pedestrian_pos = (posX, posY) plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0] if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos): data[data['playerId'] == 2]['in_crosswalk'] = 1 else: data[data['playerId'] == 2]['in_crosswalk'] = 0 car_stop_time = ego_df[ego_df['v'] <= 0.1]['simTime'] pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime'] car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist() return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len( car_wait_pedestrian) > 0 else 0} def launchTimewhenPedestrianLeave_LST(data, plot_path): ego_df = data.ego_data car_stop_time = ego_df[ego_df['v'] <= 0.1]["simTime"] if len(car_stop_time) == 0: return {"launchTimewhenPedestrianLeave_LST": 0} else: lane_width = ego_df[ego_df['v'] <= 0.1]['lane_width'].tolist()[0] car_to_pedestrain = ego_df[abs(ego_df['y_relative_dist']) <= lane_width / 2]["simTime"] legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist() return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]} def noCollision_LST(data, plot_path): ego_df = data.ego_data is_dist_min = False is_sign_change = False for i in range(1, len(ego_df) - 1): relative_dist_decreasing = ego_df['relative_dist'][i-1] > ego_df['relative_dist'][i] relative_dist_increasing = ego_df['relative_dist'][i+1] > ego_df['relative_dist'][i] is_dist_min = relative_dist_decreasing and relative_dist_increasing and (ego_df['relative_dist'][i] < 0.5) x_relative = (ego_df['x_relative_dist'][i-1]*ego_df['x_relative_dist'][i]) < 0 y_relative = (ego_df['y_relative_dist'][i-1]*ego_df['y_relative_dist'][i]) < 0 is_sign_change = x_relative or y_relative if is_dist_min and is_sign_change: return {"noCollision_LST": -1} return {"noCollision_LST": 1} def noReverse_LST(data, plot_path): ego_df = data.ego_data if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0: return {"noReverse_LST": -1} else: return {"noReverse_LST": 1} def turnAround_LST(data, plot_path): ego_df = data.ego_data if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and ( ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0): return {"turnAround_LST": 1} else: return {"turnAround_LST": -1} def laneOffset_LST(data, plot_path): car_width = data.vehicle_config['CAR_WIDTH'] ego_df_ini = data.ego_data min_time = ego_df_ini['simTime'].min() + 0 max_time = ego_df_ini['simTime'].max() - 0 ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)] laneoffset = abs(ego_df['y_relative_dist']) - car_width / 2 return {"laneOffset_LST": max(laneoffset)} def maxLongitudeDist_LST(data, plot_path): ego_df = data.ego_data longitude_dist = abs(ego_df[ego_df['v'] <= 0.2]['x_relative_dist']) data.longitude_dist = min(abs(ego_df[ego_df['v'] <= 0.2]['x_relative_dist']), default=0) stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist() data.stop_time = min(stop_time) if len(longitude_dist) == 0: return {"maxLongitudeDist_LST": -1} generate_function_chart_data(data, 'maxLongitudeDist_LST', plot_path) return {"maxLongDist_LST": min(longitude_dist)} def noEmergencyBraking_LST(data, plot_path): ego_df = data.ego_data ego_df['ip_dec'] = ego_df['v'].apply( get_interpolation, point1=[18, -5], point2=[72, -3.5]) ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply( lambda x: 1 if x < 0 else 0) if sum(ego_df['slam_brake']) == 0: return {"noEmergencyBraking_LST": 1} else: return {"noEmergencyBraking_LST": -1} def rightWarningSignal_PGVIL(data_processed, plot_path) -> dict: """判断是否发出正确预警信号""" ego_df = data_processed.ego_data scenario_name = find_nested_name(data_processed.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] if correctwarning is None: print("无法获取正确的预警信号标志位!") return None # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行 warning_rows = ego_df[ (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna()) ] if warning_rows.empty: return {"rightWarningSignal_PGVIL": -1} else: return {"rightWarningSignal_PGVIL": 1} def latestWarningDistance_PGVIL(data_processed, plot_path) -> dict: """预警距离计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"latestWarningDistance_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] # dis_pos2head = 5.929 / 2 + 1.982 ego, obj = extract_ego_obj(warning_data) distances = calculate_distance_PGVIL( np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values ) - dis_pos2head # 将计算结果保存到data对象中,供图表生成使用 data_processed.warning_dist = distances data_processed.warning_time = ego['simTime'].tolist() if distances.size == 0: print("没有找到数据!") return {"latestWarningDistance_PGVIL": -1} # 或返回其他默认值,如0.0 generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL', plot_path) return {"latestWarningDistance_PGVIL": float(np.min(distances))} def latestWarningDistancetoConflictPoint_PGVIL(data_processed, plot_path) -> dict: """预警距离计算流水线""" ego_df = data_processed.ego_data warning_data = get_first_warning(data_processed) if warning_data is None: return {"latestWarningDistancetoConflictPoint_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] ego, obj = extract_ego_obj(warning_data) ConflictPoint = obj[obj["type"] == 25].copy() distances = calculate_distance_PGVIL( np.array([[ego["posX"], ego["posY"]]]), ConflictPoint[["posX", "posY"]].values ) - dis_pos2head # 将计算结果保存到data对象中,供图表生成使用 data_processed.warning_dist = distances data_processed.warning_time = ego['simTime'].tolist() if distances.size == 0: print("没有找到数据!") return {"latestWarningDistancetoConflictPoint_PGVIL": -1} # 或返回其他默认值,如0.0 generate_function_chart_data(data_processed, 'latestWarningDistancetoConflictPoint_PGVIL', plot_path) return {"latestWarningDistancetoConflictPoint_PGVIL": float(np.min(distances))} def latestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict: """TTC计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] # dis_pos2head = 5.929 / 2 + 1.982 warning_data = get_first_warning(data_processed) if warning_data is None: return {"latestWarningDistance_TTC_PGVIL": 0.0} ego, obj = extract_ego_obj(warning_data) # 向量化计算 ego_pos = np.array([[ego["posX"], ego["posY"]]]) ego_speed = np.array([[ego["speedX"], ego["speedY"]]]) obj_pos = obj[["posX", "posY"]].values obj_speed = obj[["speedX", "speedY"]].values distances = calculate_distance_PGVIL(ego_pos, obj_pos) - dis_pos2head rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed) data_processed.warning_dist = distances data_processed.warning_speed = rel_speeds data_processed.warning_time = ego['simTime'].tolist() with np.errstate(divide="ignore", invalid="ignore"): ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf) if ttc.size == 0: print("没有找到数据!") return {"latestWarningDistance_TTC_PGVIL": -2} # 或返回其他默认值,如0.0 data_processed.ttc = ttc generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL', plot_path) return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))} def latestWarningDistancetoConflictPoint_TTC_PGVIL(data_processed, plot_path) -> dict: """TTC计算流水线""" ego_df = data_processed.ego_data warning_data = get_first_warning(data_processed) if warning_data is None: return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] ego, obj = extract_ego_obj(warning_data) # 向量化计算 ego_pos = np.array([[ego["posX"], ego["posY"]]]) ego_speed = np.array([[ego["speedX"], ego["speedY"]]]) ConflictPoint = obj[obj["type"] == 25].copy() if ConflictPoint.empty: print("未找到 type == 25 的目标物") return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -22} ConflictPoint_pos = ConflictPoint[["posX", "posY"]].values ConflictPoint_speed = obj[["speedX", "speedY"]].values distances = calculate_distance_PGVIL(ego_pos, ConflictPoint_pos) - dis_pos2head rel_speeds = calculate_relative_speed_PGVIL(ego_speed, ConflictPoint_speed) data_processed.warning_dist = distances data_processed.warning_speed = rel_speeds data_processed.warning_time = ego['simTime'].tolist() with np.errstate(divide="ignore", invalid="ignore"): ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf) if ttc.size == 0: print("没有找到数据!") return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -33} # 或返回其他默认值,如0.0 data_processed.ttc = ttc generate_function_chart_data(data_processed, 'latestWarningDistancetoConflictPoint_TTC_PGVIL', plot_path) return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": float(np.nanmin(ttc))} def earliestWarningDistance_PGVIL(data_processed, plot_path) -> dict: """预警距离计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"earliestWarningDistance_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] # dis_pos2head = 5.929 / 2 + 1.982 ego, obj = extract_ego_obj(warning_data) distances = calculate_distance_PGVIL( np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values ) - dis_pos2head # 将计算结果保存到data对象中,供图表生成使用 data_processed.warning_dist = distances data_processed.warning_time = ego['simTime'].tolist() if distances.size == 0: print("没有找到数据!") return {"earliestWarningDistance_PGVIL": -22} # 或返回其他默认值,如0.0 generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL', plot_path) return {"earliestWarningDistance_PGVIL": float(np.min(distances))} def earliestWarningDistancetoConflictPoint_PGVIL(data_processed, plot_path) -> dict: """预警距离计算流水线""" ego_df = data_processed.ego_data warning_data = get_first_warning(data_processed) if warning_data is None: return {"earliestWarningDistancetoConflictPoint_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] ego, obj = extract_ego_obj(warning_data) distances = calculate_distance_PGVIL( np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values ) - dis_pos2head # 将计算结果保存到data对象中,供图表生成使用 data_processed.warning_dist = distances data_processed.warning_time = ego['simTime'].tolist() if distances.size == 0: print("没有找到数据!") return {"earliestWarningDistancetoConflictPoint_PGVIL": -22} # 或返回其他默认值,如0.0 generate_function_chart_data(data_processed, 'earliestWarningDistancetoConflictPoint_PGVIL', plot_path) return {"earliestWarningDistancetoConflictPoint_PGVIL": float(np.min(distances))} def earliestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict: """TTC计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"earliestWarningDistance_TTC_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] # dis_pos2head = 5.929 / 2 + 1.982 ego, obj = extract_ego_obj(warning_data) # 向量化计算 ego_pos = np.array([[ego["posX"], ego["posY"]]]) ego_speed = np.array([[ego["speedX"], ego["speedY"]]]) obj_pos = obj[["posX", "posY"]].values obj_speed = obj[["speedX", "speedY"]].values distances = calculate_distance_PGVIL(ego_pos, obj_pos) - dis_pos2head rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed) data_processed.warning_dist = distances data_processed.warning_speed = rel_speeds data_processed.warning_time = ego['simTime'].tolist() with np.errstate(divide="ignore", invalid="ignore"): ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf) if ttc.size == 0: print("没有找到数据!") return {"earliestWarningDistance_TTC_PGVIL": -22} # 或返回其他默认值,如0.0 data_processed.ttc = ttc generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL', plot_path) return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))} def earliestWarningDistancetoConflictPoint_TTC_PGVIL(data_processed, plot_path) -> dict: """TTC计算流水线""" ego_df = data_processed.ego_data warning_data = get_first_warning(data_processed) if warning_data is None: return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": -11} dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] ego, obj = extract_ego_obj(warning_data) ConflictPoint = obj[obj["type"] == 25].copy() if ConflictPoint.empty: print("未找到 type == 25 的目标物") return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -22} # 向量化计算 ego_pos = np.array([[ego["posX"], ego["posY"]]]) ego_speed = np.array([[ego["speedX"], ego["speedY"]]]) ConflictPoint_pos = ConflictPoint[["posX", "posY"]].values ConflictPoint_speed = ConflictPoint[["speedX", "speedY"]].values distances = calculate_distance_PGVIL(ego_pos, ConflictPoint_pos) - dis_pos2head rel_speeds = calculate_relative_speed_PGVIL(ego_speed, ConflictPoint_speed) data_processed.warning_dist = distances data_processed.warning_speed = rel_speeds data_processed.warning_time = ego['simTime'].tolist() with np.errstate(divide="ignore", invalid="ignore"): ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf) if ttc.size == 0: print("没有找到数据!") return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": -22} # 或返回其他默认值,如0.0 data_processed.ttc = ttc generate_function_chart_data(data_processed, 'earliestWarningDistancetoConflictPoint_TTC_PGVIL', plot_path) return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": float(np.nanmin(ttc))} def warningDelayTimeofReachDecel_PGVIL(data_processed, plot_path) -> dict: # 预警时机相对背景车辆减速度达到-4m/s2后的时延 ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"warningDelayTimeofReachDecel_PGVIL": -1} try: ego, _ = extract_ego_obj(warning_data) warning_time = ego["simTime"] if isinstance(ego, pd.Series) else ego["simTime"].iloc[0] # 回溯:只取在预警前的背景车轨迹 obj_pre = obj_df[(obj_df["playerId"] == 2) & (obj_df["simTime"] < warning_time)].copy() if len(obj_pre) < 2: print("预警前目标车辆轨迹不足2帧,无法计算减速度") return {"warningDelayTimeofReachDecel_PGVIL": -2} obj_pre = obj_pre.sort_values(by="simTime").reset_index(drop=True) obj_speed = obj_pre[["speedX", "speedY"]].values simtime_gap = obj_pre["simTime"].iloc[1] - obj_pre["simTime"].iloc[0] obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1) obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap if np.min(obj_deceleration) < -4: max_index = np.argmin(obj_deceleration) decel_time = obj_pre["simTime"].iloc[max_index + 1] # 注意 +1 是因为 diff 少1行 delay_time = warning_time - decel_time return {"warningDelayTimeofReachDecel_PGVIL": round(float(delay_time), 3)} else: print("未发现背景车减速度低于 -4 m/s² 的时刻") return {"warningDelayTimeofReachDecel_PGVIL": -3} except Exception as e: print("异常:", e) return {"warningDelayTimeofReachDecel_PGVIL": -99} def warningDelayTime_PGVIL(data_processed, plot_path) -> dict: """车端接收到预警到HMI发出预警的时延""" ego_df = data_processed.ego_data # #打印ego_df的列名 # print(ego_df.columns.tolist()) warning_data = get_first_warning(data_processed) if warning_data is None: return {"warningDelayTime_PGVIL": -1} try: ego, obj = extract_ego_obj(warning_data) # 找到event_Type不为空,且playerID为1的行 rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())] first_time = rosbag_warning_rows["simTime"].iloc[0] warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0] delay_time = warning_time - first_time return {"warningDelayTime_PGVIL": float(delay_time)} except Exception as e: print(f"计算预警时延时发生错误: {e}") return {"warningDelayTime_PGVIL": -1} def noWarning_PGVIL(data_processed, plot_path) -> dict: scenario_name = find_nested_name(data_processed.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] ego_df = data_processed.ego_data if ego_df['ifwarning'].empty: print("无法获取正确预警信号标志位!") return warning_rows = (ego_df['ifwarning'].dropna() == -1).all() if warning_rows: return {"noWarning_PGVIL": 1} else: return {"noWarning_PGVIL": -1} def get_car_to_stop_line_distance(ego, car_point, stop_line_points): """ 计算主车后轴中心点到停止线的距离 :return 距离 """ distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"] # 计算停止线的方向向量 line_vector = np.array( [ stop_line_points[1][0] - stop_line_points[0][0], stop_line_points[1][1] - stop_line_points[0][1], ] ) direction_vector_norm = np.linalg.norm(line_vector) direction_vector_unit = ( line_vector / direction_vector_norm if direction_vector_norm != 0 else np.array([0, 0]) ) # 计算主车后轴中心点到停止线投影的坐标(垂足) projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit) perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit # 计算主车后轴中心点到垂足的距离 distance_to_foot = np.linalg.norm(car_point - perpendicular_foot) carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead return carhead_distance_to_foot def ifCrossingRedLight_PGVIL(data_processed, plot_path) -> dict: GREEN = 0x100000 # 1048576 YELLOW = 0x1000000 # 16777216 RED = 0x10000000 # 268435456 ego_df = data_processed.ego_data.copy() stopline_df = data_processed.object_df.copy() # 过滤出停止线对象(type==25),只保留两端点 stopline_df = stopline_df.loc[ stopline_df["type"] == 25, ["simTime", "playerId", "posX", "posY"] ] # 去除无效stateMask帧 ego_df = ego_df[ego_df["stateMask"].notna()].copy() ego_df.sort_values(by="simTime", inplace=True) dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0] red_light_violation = False prev_distance = float("inf") for index, ego in ego_df.iterrows(): simTime = ego["simTime"] car_point = (ego["posX"], ego["posY"]) stateMask = int(ego["stateMask"]) # 获取该时刻的两个停止线点(playerId 2 和 3) p1_row = stopline_df[(stopline_df["playerId"] == 2) & (stopline_df["simTime"] == simTime)] p2_row = stopline_df[(stopline_df["playerId"] == 3) & (stopline_df["simTime"] == simTime)] # 如果缺失,跳过该帧 if p1_row.empty or p2_row.empty: continue p1 = (p1_row["posX"].iloc[0], p1_row["posY"].iloc[0]) p2 = (p2_row["posX"].iloc[0], p2_row["posY"].iloc[0]) stop_line_points = np.array([p1, p2], dtype=float) distance_to_stopline = get_car_to_stop_line_distance( ego, car_point, stop_line_points ) - dis_pos2head # 如果车头刚刚越线 if prev_distance > 0 and distance_to_stopline <= 0: # 如果不是绿灯,就视为闯红灯 if stateMask != GREEN: red_light_violation = True print(f"[Red Light Violation] simTime={simTime}, stateMask={stateMask}") break prev_distance = distance_to_stopline return {"ifCrossingRedLight_PGVIL": -1 if red_light_violation else 1} def limitSpeed_PGVIL(data, plot_path): ego_df = data.ego_data max_speed = max(ego_df["v"]) if len(ego_df["v"]) == 0: return {"speedLimit_PGVIL": -1} else: return {"speedLimit_PGVIL": round(max_speed * 3.6, 3)} def leastDistance_PGVIL(data, plot_path): exclude_seconds = 2.0 ego_df = data.ego_data stopline_df = data.object_df stopline_df = stopline_df.loc[ stopline_df["type"] == 25, ["simTime", "playerId", "posX", "posY"] ] max_sim_time = ego_df["simTime"].max() threshold_time = max_sim_time - exclude_seconds for index, ego in ego_df.iterrows(): if ego["simTime"] >= threshold_time: continue if "v" in ego and ego["v"] == 0: current_time = ego["simTime"] car_point = (ego["posX"], ego["posY"]) # p1 = (ego["stopline_x1"], ego["stopline_y1"]) # p2 = (ego["stopline_x2"], ego["stopline_y2"]) p1_row = stopline_df[(stopline_df["playerId"] == 2) & (stopline_df["simTime"] == current_time)] p2_row = stopline_df[(stopline_df["playerId"] == 3) & (stopline_df["simTime"] == current_time)] if p1_row.empty or p2_row.empty: continue # 如果找不到,就跳过,继续找下一个停车帧 p1 = (p1_row["posX"].iloc[0], p1_row["posY"].iloc[0]) p2 = (p2_row["posX"].iloc[0], p2_row["posY"].iloc[0]) stop_line_points = np.array([p1, p2], dtype=float) distance_to_stopline = get_car_to_stop_line_distance( ego, car_point, stop_line_points ) # print(f"distance_to_stopline:{distance_to_stopline}") return {"leastDistance_PGVIL": round(distance_to_stopline, 3)} return {"leastDistance_PGVIL": -1} def launchTimeinStopLine_PGVIL(data, plot_path): ego_df = data.ego_data # === 先计算有效时间段 === max_sim_time = ego_df["simTime"].max() min_sim_time = ego_df["simTime"].min() threshold_start = min_sim_time + 3.0 threshold_end = max_sim_time - 3.0 # === 只保留有效时间段 === ego_df = ego_df[(ego_df["simTime"] >= threshold_start) & (ego_df["simTime"] <= threshold_end)] in_stop = False start_time = None last_low_time = None max_duration = 0.0 v_thresh = 0.01 for _, row in ego_df.iterrows(): simt = row["simTime"] v = row.get("v", None) if v is None or np.isnan(v) or abs(v) > v_thresh: if in_stop: duration = last_low_time - start_time if duration > max_duration: max_duration = duration in_stop = False else: if not in_stop: start_time = simt in_stop = True last_low_time = simt if in_stop: duration = last_low_time - start_time if duration > max_duration: max_duration = duration if max_duration <= 0: return {"launchTimeinStopLine_PGVIL": -1} else: return {"launchTimeinStopLine_PGVIL": round(float(max_duration), 3)} def launchTimeinTrafficLight_PGVIL(data, plot_path): GREEN = 1048576 # 0x100000 RED = 268435456 # 0x10000000 ego_df = data.ego_data mask = (ego_df["stateMask"]).notna() ego_df_light = ego_df[mask].copy() ego_df_light = ego_df_light.drop_duplicates(subset=["simTime"]).sort_values(by="simTime") # 找到第一次红灯 to 绿灯的切换时刻 is_transition = ( (ego_df_light["stateMask"] == GREEN) & (ego_df_light["stateMask"].shift(1) == RED) ) transition_times = ego_df_light.loc[is_transition, "simTime"] if transition_times.empty: # print("没有红绿灯切换") return {"launchTimeinTrafficLight_PGVIL": -88} time_red2green = transition_times.iloc[0] car_speed_at_transition = ego_df.loc[ego_df["simTime"] == time_red2green, "v"] if not car_speed_at_transition.empty and car_speed_at_transition.iloc[0] > 0.5: return {"launchTimeinTrafficLight_PGVIL": -66} car_move_times = ego_df.loc[ (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime" ] if car_move_times.empty: # print("没有车移动") return {"launchTimeinTrafficLight_PGVIL": -99} time_move = car_move_times.iloc[0] return {"launchTimeinTrafficLight_PGVIL": round(time_move - time_red2green, 3)} def crossJunctionToTargetLane_PGVIL(data, plot_path): ego_df = data.ego_data road_ids = set(ego_df["road_id"].dropna()) try: scenario_name = find_nested_name(data.function_config["function"]) target_lane_id = data.function_config["function"][scenario_name][ "crossJunctionToTargetLane_PGVIL" ]["max"] except KeyError: raise ValueError("请在配置文件中指定目标车道ID!") result = target_lane_id if target_lane_id in road_ids else -1 return {"crossJunctionToTargetLane_PGVIL": result} def crossJunctionToTargetLane_PGVIL(data, plot_path): ego_df = data.ego_data road_ids = set(ego_df["road_id"].dropna()) try: scenario_name = find_nested_name(data.function_config["function"]) target_lane_id = data.function_config["function"][scenario_name][ "crossJunctionToTargetLane_PGVIL" ]["max"] except KeyError: raise ValueError("请在配置文件中指定目标车道ID!") result = target_lane_id if target_lane_id in road_ids else -1 return {"crossJunctionToTargetLane_PGVIL": result} def noStop_PGVIL(data, plot_path): exclude_end_seconds = 5.0 exclude_start_seconds = 5.0 ego_df = data.ego_data min_sim_time = ego_df["simTime"].min() max_sim_time = ego_df["simTime"].max() start_threshold = min_sim_time + exclude_start_seconds end_threshold = max_sim_time - exclude_end_seconds filtered_df = ego_df[ (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold) ] if (filtered_df["v"] == 0).any(): return {"noStop_PGVIL": -1} else: return {"noStop_PGVIL": 1} def noEmergencyBraking_PGVIL(data, plot_path): ego_df = data.ego_data ego_df["ip_dec"] = ego_df["v"].apply( get_interpolation, point1=[18, -5], point2=[72, -3.5] ) ego_df["slam_brake"] = (ego_df["accelX"] - ego_df["ip_dec"]).apply( lambda x: 1 if x < 0 else 0 ) if sum(ego_df["slam_brake"]) == 0: return {"noEmergencyBraking_PGVIL": 1} else: return {"noEmergencyBraking_PGVIL": -1} def noReverse_PGVIL(data, plot_path): ego_df = data.ego_data.copy() ego_df["posH"] = (90 - ego_df["posH"]) % 360 yaw_rad = np.deg2rad(ego_df["posH"]) # 车头朝向向量 heading_x = np.cos(yaw_rad) heading_y = np.sin(yaw_rad) # 速度向量 speed_x = ego_df["speedX"] speed_y = ego_df["speedY"] # 点积:判断速度是否与车头方向一致 dot_product = speed_x * heading_x + speed_y * heading_y moving = np.hypot(speed_x, speed_y) > 0.5 # 排除近乎静止的帧(避免误判) reverse_flag = (dot_product < 0) & moving # 倒车帧:点积为负 且 在移动 if reverse_flag.any(): return {"noReverse_PGVIL": -1} # 存在倒车 else: return {"noReverse_PGVIL": 1} # 全程未倒车 def laneOffset_PGVIL(data, plot_path): ego_df = data.ego_data car_width = ego_df["dimY"].loc[0] print(f"car_width:{car_width}") is_zero = ego_df["v"] == 0 if not is_zero.any(): # 全程未停车 return {"laneOffset_PGVIL": -1} groups = (is_zero != is_zero.shift(fill_value=False)).cumsum() last_zero_group = groups[is_zero].iloc[-1] last_stop = ego_df[groups == last_zero_group] # 距离右侧车道线 edge_dist = (last_stop["width"] / 2 + last_stop["laneOffset"]) - ( car_width / 2 ) return {"laneOffset_PGVIL": round(edge_dist.max(), 3)} def maxLongitudelDistance_PGVIL(data, plot_path): ego_df = data.ego_data object_df = data.object_df is_zero = ego_df["v"] == 0 if not is_zero.any(): # 全程未停车 return {"maxLongitudelDistance_PGVIL": -1} groups = (is_zero != is_zero.shift(fill_value=False)).cumsum() last_zero_group = groups[is_zero].iloc[-1] last_stop = ego_df[groups == last_zero_group] stop_point = object_df[object_df["type"] == 25] stop_x = stop_point["posX"].iloc[0] stop_y = stop_point["posY"].iloc[0] dx = last_stop["posX"] - stop_x dy = last_stop["posY"] - stop_y last_stop["posH"] = (90 - last_stop["posH"]) % 360 heading_rad = last_stop["posH"] # 弧度制 longitudinal_offset = np.cos(heading_rad) * dx + np.sin(heading_rad) * dy max_longitudinal_offset = np.abs(longitudinal_offset).max() return {"maxLongitudelDistance_PGVIL": round(max_longitudinal_offset, 3)} def keepInLane_PGVIL(data, plot_path): ego_df = data.ego_data.copy() ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"]) if ego_df.empty: return {"keepInLane_PGVIL": -1} threshold = ego_df["lane_width"] / 2 if (ego_df["laneOffset"].abs() > threshold).any(): return {"keepInLane_PGVIL": -1} else: return {"keepInLane_PGVIL": 1} # def launchTimewhenPedestrianLeave_PGVIL(data, plot_path): # ego_df = data.ego_data # object_df = data.object_df # # == 1. 提取行人轨迹 == # ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]] # merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped")) # if merged.empty: # return {"launchTimewhenPedestrianLeave_PGVIL": -1} # # == 2. 判断主车是否在行人完全离开前抢行 == # ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "offX"]].copy() # ped = ped_df.rename(columns={"posX": "posX_ped", "posY": "posY_ped"}) # ego_ped = pd.merge(ego, ped, on="simTime") # ego_ped = ego_ped.sort_values("simTime").reset_index(drop=True) # front_offset = ego_ped["dimX"] / 2 + ego_ped["offX"] # yaw = np.deg2rad(ego_ped["posH"]) # ego_ped["front_x"] = ego_ped["posX"] + np.cos(yaw) * front_offset # ego_ped["front_y"] = ego_ped["posY"] + np.sin(yaw) * front_offset # dx = ego_ped["front_x"] - ego_ped["posX_ped"] # dy = ego_ped["front_y"] - ego_ped["posY_ped"] # ego_ped["proj"] = dx * np.cos(yaw) + dy * np.sin(yaw) # ego_ped["lateral_dist"] = np.abs(-np.sin(yaw) * dx + np.cos(yaw) * dy) # cross_idx = ego_ped[ego_ped["proj"] > 0].index # if not cross_idx.empty and cross_idx[-1] <= len(ego_ped) - 3: # i = cross_idx[0] # d0 = ego_ped.loc[i, "lateral_dist"] # d1 = ego_ped.loc[i + 1, "lateral_dist"] # d2 = ego_ped.loc[i + 2, "lateral_dist"] # if d1 < d0 and d2 < d1: # return {"launchTimewhenPedestrianLeave_PGVIL": -66} # 主车未避让行人 # # == 3. 判断行人进入与离开车道 == # yaw_rad = np.deg2rad(merged["posH"]) # dx = merged["posX_ped"] - merged["posX"] # dy = merged["posY_ped"] - merged["posY"] # merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy) # merged["threshold"] = merged["lane_width"] / 2 # entry = merged[merged["lateral_dist"] <= merged["threshold"]] # if entry.empty: # return {"launchTimewhenPedestrianLeave_PGVIL": -11} # t_entry = entry["simTime"].iloc[0] # after_e = merged[merged["simTime"] > t_entry] # leave = after_e[after_e["lateral_dist"] > after_e["threshold"]] # if leave.empty: # return {"launchTimewhenPedestrianLeave_PGVIL": -22} # t_leave = leave["simTime"].iloc[0] # ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0] # # == 4. 查找 t_leave 之后主车是否在前方合理范围内停车 == # stops = ego_df[(ego_df["simTime"] >= t_leave) & (ego_df["v"] == 0)].copy() # if stops.empty: # return {"launchTimewhenPedestrianLeave_PGVIL": -33} # front_overPos = stops["dimX"] / 2 + stops["offX"] # yaw_stop = np.deg2rad(stops["posH"]) # stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos # stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos # dx_s = stops["front_x"] - ped_x # dy_s = stops["front_y"] - ped_y # proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop) # valid = stops[ # (proj >= 0) & # (proj <= 20) & # ((stops["simTime"] - t_leave) <= 5.0) # ] # if valid.empty: # return {"launchTimewhenPedestrianLeave_PGVIL": -88} # # == 5. 查找 t_stop 后首次启动 == # t_stop = valid["simTime"].iloc[0] # launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"] # if launches.empty: # return {"launchTimewhenPedestrianLeave_PGVIL": -99} # t_launch = launches.iloc[0] # return {"launchTimewhenPedestrianLeave_PGVIL": round(t_launch - t_stop, 3)} def launchTimewhenPedestrianLeave_PGVIL(data, plot_path): ego_df = data.ego_data object_df = data.object_df ego_df["posH"] = (90 - ego_df["posH"]) % 360 object_df["posH"] = (90 - object_df["posH"]) % 360 ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]] merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped")) if merged.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -1} yaw_rad = np.deg2rad(merged["posH"]) # 度→弧度 dx = merged["posX_ped"] - merged["posX"] dy = merged["posY_ped"] - merged["posY"] merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy) merged["threshold"] = merged["lane_width"] / 2 entry = merged[merged["lateral_dist"] <= merged["threshold"]] if entry.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -11} t_entry = entry["simTime"].iloc[0] after_e = merged[merged["simTime"] > t_entry] leave = after_e[after_e["lateral_dist"] > after_e["threshold"]] if leave.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -22} t_leave = leave["simTime"].iloc[0] ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0] ego_after_leave = ego_df[ego_df["simTime"] >= t_leave].copy() min_V = ego_after_leave["v"].min() if ego_after_leave["v"].min() == 0: stops = ego_after_leave[ego_after_leave["v"] == 0].copy() elif min_V <= 1: stops = ego_after_leave[ego_after_leave["v"] == min_V].copy() else: stops = pd.DataFrame() # stops = ego_df[(ego_df["simTime"] >= t_leave) & (ego_df["v"] <= 0.01)].copy() if stops.empty: # 没有停车 return {"launchTimewhenPedestrianLeave_PGVIL": -33} front_overPos = stops["dimX"] / 2 + stops["offX"] yaw_stop = np.deg2rad(stops["posH"]) stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos dx_s = stops["front_x"] - ped_x dy_s = stops["front_y"] - ped_y proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop) valid = stops[ (proj >= 0) & (proj <= 20) & ((stops["simTime"] - t_leave) <= 5.0) ] if valid.empty: # 车辆未在行人离开后停止 return {"launchTimewhenPedestrianLeave_PGVIL": -88} t_stop = valid["simTime"].iloc[0] if ego_after_leave["v"].min() == 0: launches_data = ego_after_leave[ego_after_leave["v"] > 0].copy() else: launches_data = ego_after_leave[ego_after_leave["v"] > 1].copy() launches = launches_data[launches_data["simTime"] > t_stop] # launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0.5)]["simTime"] if launches.empty: # 车辆停止后未启动 return {"launchTimewhenPedestrianLeave_PGVIL": -99} # t_launch = launches.iloc[0] t_launch = launches["simTime"].iloc[0] return {"launchTimewhenPedestrianLeave_PGVIL": round(t_launch - t_stop, 3)} def waitTimeAtCrosswalkwithPedestrian_PGVIL(data, plot_path): ego_df = data.ego_data object_df = data.object_df # ego_df["posH"] = (90 - ego_df["posH"]) % 360 # object_df["posH"] = (90 - object_df["posH"]) % 360 ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]] first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"] if first_launch.empty: t0_launch = 0.0 else: t0_launch = first_launch.iloc[0] merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped")) merged["front_offset"] = merged["dimX"] / 2 + merged["offX"] yaw = np.deg2rad(merged["posH"]) merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"] merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"] dx = merged["posX_ped"] - merged["front_x"] dy = merged["posY_ped"] - merged["front_y"] valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0.1 stops_df = merged.loc[ (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times ].sort_values("simTime") if stops_df.empty: print("no stopping time found!") return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1} wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0] return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": round(wait_time, 3)} def noCollision_PGVIL(data, plot_path): ego_df = data.ego_data object_df = data.object_df ego_df['posH'] = (90 - ego_df['posH']) % 360 object_df['posH'] = (90 - object_df['posH']) % 360 ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]] tar = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]] df = ego.merge(tar, on="simTime", suffixes=("", "_tar")) df["posH_tar_rad"] = np.deg2rad(df["posH_tar"]) df["posH_rad"] = np.deg2rad(df["posH"]) df["collision"] = df.apply(lambda row: funcIsCollision( row["dimX"], row["dimY"], row["offX"], 0, row["posX"], row["posY"], row["posH_rad"], row["dimX_tar"], row["dimY_tar"], row["offX_tar"], 0, row["posX_tar"], row["posY_tar"], row["posH_tar_rad"], ), axis=1, ) if df["collision"].any(): return {"noCollision_PGVIL": -1} else: return {"noCollision_PGVIL": 1} def leastLateralDistance_PGVIL(data, plot_path): ego_df = data.ego_data object_df = data.object_df ego_df['posH'] = (90 - ego_df['posH']) % 360 object_df['posH'] = (90 - object_df['posH']) % 360 cones = object_df[object_df["type"] == 25].copy() ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]].copy() ego["posH_rad"] = np.deg2rad(ego["posH"]) has_collision = False for idx, cone in cones.iterrows(): # 找到与ego同一时刻的数据 ego_at_time = ego[ego["simTime"] == cone["simTime"]] if ego_at_time.empty: continue posX_tar = cone["posX"] posY_tar = cone["posY"] dimX_tar = cone["dimX"] dimY_tar = cone["dimY"] offX_tar = cone.get("offX", 0) posH_tar_rad = np.deg2rad(cone["posH"]) if not np.isnan(cone["posH"]) else 0 for _, row in ego_at_time.iterrows(): result = funcIsCollision( row["dimX"], row["dimY"], row["offX"], 0, row["posX"], row["posY"], row["posH_rad"], dimX_tar, dimY_tar, offX_tar, 0, posX_tar, posY_tar, posH_tar_rad, ) if result: has_collision = True break if has_collision: break return {"leastLateralDistance_PGVIL": -1 if has_collision else 1} class FunctionRegistry: """动态函数注册器(支持参数验证)""" def __init__(self, data_processed, plot_path): self.logger = LogManager().get_logger() # 获取全局日志实例 self.data = data_processed self.plot_path = plot_path # 检查function_config是否为空 if not hasattr(data_processed, 'function_config') or not data_processed.function_config: self.logger.warning("功能配置为空,跳过功能指标计算") self.fun_config = {} self.level_3_merics = [] self._registry = {} return self.fun_config = data_processed.function_config.get("function", {}) self.level_3_merics = self._extract_level_3_metrics(self.fun_config) self._registry: Dict[str, Callable] = {} self._registry = self._build_registry() def _extract_level_3_metrics(self, config_node: dict) -> list: """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)""" metrics = [] def _recurse(node): if isinstance(node, dict): if "name" in node and not any( isinstance(v, dict) for v in node.values() ): metrics.append(node["name"]) for v in node.values(): _recurse(v) _recurse(config_node) self.logger.info(f"评比的功能指标列表:{metrics}") return metrics def _build_registry(self) -> dict: """自动注册指标函数(防御性编程)""" registry = {} for func_name in self.level_3_merics: try: registry[func_name] = globals()[func_name] except KeyError: print(f"未实现指标函数: {func_name}") self.logger.error(f"未实现指标函数: {func_name}") return registry def batch_execute(self) -> dict: """批量执行指标计算(带熔断机制)""" results = {} # 如果配置为空或没有注册的指标,直接返回空结果 if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry: self.logger.info("功能配置为空或无注册指标,返回空结果") return results for name, func in self._registry.items(): try: result = func(self.data, self.plot_path) # 统一传递数据上下文 results.update(result) except Exception as e: print(f"{name} 执行失败: {str(e)}") self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True) results[name] = None self.logger.info(f"功能指标计算结果:{results}") return results class FunctionManager: """管理功能指标计算的类""" def __init__(self, data_processed, plot_path): self.data = data_processed self.logger = LogManager().get_logger() self.plot_path = plot_path # 检查function_config是否为空 if not hasattr(data_processed, 'function_config') or not data_processed.function_config: self.logger.warning("功能配置为空,跳过功能指标计算初始化") self.function = None else: self.function = FunctionRegistry(self.data, self.plot_path) def report_statistic(self): """ 计算并报告功能指标结果。 :return: 评估结果 """ # 如果function为None,直接返回空字典 if self.function is None: self.logger.info("功能指标管理器未初始化,返回空结果") return {} function_result = self.function.batch_execute() print("\n[功能性表现及评价结果]") return function_result # self.logger.info(f'Function Result: {function_result}') # 使用示例 if __name__ == "__main__": pass # print("\n[功能类表现及得分情况]")