#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2025 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2025/01/5 @Last Modified: 2025/01/5 @Summary: Function Metrics Calculation """ import sys from pathlib import Path # 添加项目根目录到系统路径 root_path = Path(__file__).resolve().parent.parent sys.path.append(str(root_path)) print(root_path) from modules.lib.score import Score from modules.lib.log_manager import LogManager import numpy as np from typing import Dict, Tuple, Optional, Callable, Any import pandas as pd import yaml from modules.lib.chart_generator import generate_function_chart_data from shapely.geometry import Point, Polygon from modules.lib.common import get_interpolation import pandas as pd import yaml import math # ---------------------- # 基础工具函数 (Pure functions) # ---------------------- scenario_sign_dict = { "LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208, "AbnormalVehicleW":209, "NsightVulnerableRoadUserCollisionW":210, "LitterW":211, "ForwardCollisionW":212, "VisibilityW":213, "EmergencyBrakeW":214, "IntersectionCollisionW":215, "BlindSpotW":216, "DoNotPassW":217, "ControlLossW":218, "FrontTrafficJamW":219, "EmergencyVehicleW":220, "CooperativeVehicleMerge":221, "CooperativeLaneChange":223, "VulnerableRoadUserCollisionW":224, "CooperativeIntersectionPassing":225, "RampMerge":226, "DrivingLaneRecommendation":227, "TrafficJamW": 228, "DynamicSpeedLimitingInformation": 229, "EmergencyVehiclesPriority": 230, "RemoteSupervision": 231, "SignalLightReminder": 232, "OtherVehicleRedLightViolationW": 233, "GreenLightOptimalSpeedAdvisory": 234, } def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool: polygon = Polygon(polygon) point = Point(test_point) return polygon.contains(point) def _is_segment_by_interval(time_list, expected_step) -> list: """ 根据时间戳之间的间隔进行分段。 参数: time_list (list): 时间戳列表。 expected_step (float): 预期的固定步长。 返回: list: 分段后的时间戳列表,每个元素是一个子列表。 """ if not time_list: return [] segments = [] current_segment = [time_list[0]] for i in range(1, len(time_list)): actual_step = time_list[i] - time_list[i - 1] if actual_step != expected_step: # 如果间隔不符合预期,则开始一个新的段 segments.append(current_segment) current_segment = [time_list[i]] else: # 否则,将当前时间戳添加到当前段中 current_segment.append(time_list[i]) # 添加最后一个段 if current_segment: segments.append(current_segment) return segments # 寻找二级指标的名称 def find_nested_name(data): """ 查找字典中嵌套的name结构。 :param data: 要搜索的字典 :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None """ if isinstance(data, dict): for key, value in data.items(): if isinstance(value, dict) and 'name' in value: return value['name'] # 递归查找嵌套字典 result = find_nested_name(value) if result is not None: return result elif isinstance(data, list): for item in data: result = find_nested_name(item) if result is not None: return result return None def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray: """向量化距离计算""" return np.linalg.norm(ego_pos - obj_pos, axis=1) def calculate_relative_speed_PGVIL( ego_speed: np.ndarray, obj_speed: np.ndarray ) -> np.ndarray: """向量化相对速度计算""" return np.linalg.norm(ego_speed - obj_speed, axis=1) def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray: """向量化距离计算""" dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist'] return dist def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray: """向量化相对速度计算""" return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v'] def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]: """数据提取函数""" ego = data[data["playerId"] == 1].iloc[0] obj = data[data["playerId"] != 1] return ego, obj def get_first_warning(data_processed) -> Optional[pd.DataFrame]: """带缓存的预警数据获取""" ego_df = data_processed.ego_data obj_df = data_processed.object_df scenario_name = find_nested_name(data_processed.function_config["function"]) correctwarning = scenario_sign_dict.get(scenario_name) if correctwarning is None: print("无法获取正确的预警信号标志位!") return None warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())] warning_times = warning_rows['simTime'] if warning_times.empty: print("没有找到预警数据!") return None first_time = warning_times.iloc[0] return obj_df[obj_df['simTime'] == first_time] def getAxis(heading): AxisL = [0, 0] AxisW = [0, 0] AxisL[0] = math.cos(heading) AxisL[1] = math.sin(heading) AxisW[0] = -math.sin(heading) AxisW[1] = math.cos(heading) return AxisL, AxisW def getDotMultiply(firstAxis, secondAxis): dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1]) return dotMultiply def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth): projAxisL = getDotMultiply(AxisL, baseAxis) projAxisW = getDotMultiply(AxisW, baseAxis) projectionRadius = halfLength * projAxisL + halfWidth * projAxisW return projectionRadius def isCollision( firstAxisL, firstAxisW, firstHalfLength, firstHalfWidth, secondAxisL, secondAxisW, secondHalfLength, secondHalfWidth, disVector, ): isCollision = True axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW] addProjectionRadius = 0 projectionDisVecotr = 0 for axis in axes: addProjectionRadius = getProjectionRadius( firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth ) + getProjectionRadius( secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth ) projectionDisVecotr = getDotMultiply(disVector, axis) if projectionDisVecotr > addProjectionRadius: isCollision = False break return isCollision def funcIsCollision( firstDimX, firstDimY, firstOffX, firstOffY, firstX, firstY, firstHeading, secondDimX, secondDimY, secondOffX, secondOffY, secondX, secondY, secondHeading, ): firstAxisL = getAxis(firstHeading)[0] firstAxisW = getAxis(firstHeading)[1] secondAxisL = getAxis(secondHeading)[0] secondAxisW = getAxis(secondHeading)[1] firstHalfLength = 0.5 * firstDimX firstHalfWidth = 0.5 * firstDimY secondHalfLength = 0.5 * secondDimX secondHalfWidth = 0.5 * secondDimY firstCenter = [0, 0] secondCenter = [0, 0] disVector = [0, 0] firstCenter[0] = firstX + firstOffX * math.cos(firstHeading) firstCenter[1] = firstY + firstOffX * math.sin(firstHeading) secondCenter[0] = secondX + secondOffX * math.cos(secondHeading) secondCenter[1] = secondY + secondOffX * math.sin(secondHeading) disVector[0] = secondCenter[0] - firstCenter[0] disVector[1] = secondCenter[1] - firstCenter[1] varIsCollision = isCollision( firstAxisL, firstAxisW, firstHalfLength, firstHalfWidth, secondAxisL, secondAxisW, secondHalfLength, secondHalfWidth, disVector, ) return varIsCollision # ---------------------- # 核心计算功能函数 # ---------------------- def latestWarningDistance_LST(data) -> dict: """预警距离计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) warning_speed = calculate_relative_speed(ego_df, correctwarning) # 将计算结果保存到data对象中,供图表生成使用 data.warning_dist = warning_dist data.warning_speed = warning_speed data.correctwarning = correctwarning if warning_dist.empty: return {"latestWarningDistance_LST": 0.0} # 生成图表数据 generate_function_chart_data(data, 'latestWarningDistance_LST') return {"latestWarningDistance_LST": float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else value} def earliestWarningDistance_LST(data) -> dict: """预警距离计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) warning_speed = calculate_relative_speed(ego_df, correctwarning) # 将计算结果保存到data对象中,供图表生成使用 data.warning_dist = warning_dist data.warning_speed = warning_speed data.correctwarning = correctwarning if len(warning_dist) == 0: return {"earliestWarningDistance_LST": 0.0} # 生成图表数据 generate_function_chart_data(data, 'earliestWarningDistance_LST') return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else value} def latestWarningDistance_TTC_LST(data) -> dict: """TTC计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) if warning_dist.empty: return {"latestWarningDistance_TTC_LST": 0.0} # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning warning_speed = calculate_relative_speed(ego_df, correctwarning) with np.errstate(divide='ignore', invalid='ignore'): ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf) # 处理无效的TTC值 for i in range(len(ttc)): ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i] data.warning_dist = warning_dist data.warning_speed = warning_speed data.ttc = ttc # 生成图表数据 # from modules.lib.chart_generator import generate_function_chart_data generate_function_chart_data(data, 'latestWarningDistance_TTC_LST') return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value} def earliestWarningDistance_TTC_LST(data) -> dict: """TTC计算流水线""" scenario_name = find_nested_name(data.function_config["function"]) value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"] correctwarning = scenario_sign_dict[scenario_name] ego_df = data.ego_data warning_dist = calculate_distance(ego_df, correctwarning) if warning_dist.empty: return {"earliestWarningDistance_TTC_LST": 0.0} # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning warning_speed = calculate_relative_speed(ego_df, correctwarning) with np.errstate(divide='ignore', invalid='ignore'): ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf) # 处理无效的TTC值 for i in range(len(ttc)): ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i] # 将计算结果保存到data对象中,供图表生成使用 data.warning_dist = warning_dist data.warning_speed = warning_speed data.ttc = ttc data.correctwarning = correctwarning # 生成图表数据 generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST') return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value} def warningDelayTime_LST(data): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist() simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][ 'simTime'].tolist() simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None if (simTime_HMI is None) or (simTime_rosbag is None): print("预警出错!") delay_time = 100.0 else: delay_time = abs(simTime_HMI - simTime_rosbag) return {"warningDelayTime_LST": delay_time} def warningDelayTimeofReachDecel_LST(data): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data obj_df = data.obj_data # 加速度在速度方向上的投影 speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6 obj_df["ux"] = obj_df["speedX"] / speed_mag obj_df["uy"] = obj_df["speedY"] / speed_mag obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"] obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist() # 单位m/s^2 warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist() if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0): return {"warningDelayTimeofReachDecel_LST": 0} elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0): return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]} elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0): return {"warningDelayTimeofReachDecel_LST": None} else: return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]} def rightWarningSignal_LST(data): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data if ego_df['ifwarning'].empty: print("无法获取正确预警信号标志位!") return warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())] if warning_rows.empty: return {"rightWarningSignal_LST": -1} else: return {"rightWarningSignal_LST": 1} def ifCrossingRedLight_LST(data): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data redlight_simtime = ego_df[ (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & ( ego_df['v'] != 0)]['simTime'] if redlight_simtime.empty: return {"ifCrossingRedLight_LST": -1} else: return {"ifCrossingRedLight_LST": 1} def ifStopgreenWaveSpeedGuidance_LST(data): scenario_name = find_nested_name(data.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] # 将correctwarning保存到data对象中,供图表生成使用 data.correctwarning = correctwarning ego_df = data.ego_data greenlight_simtime = \ ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime'] if greenlight_simtime.empty: return {"ifStopgreenWaveSpeedGuidance_LST": -1} else: return {"ifStopgreenWaveSpeedGuidance_LST": 1} # ------ 单车智能指标 ------ def limitSpeed_LST(data): ego_df = data.ego_data scenario_name = find_nested_name(data.function_config["function"]) limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"] speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.1]['v'].tolist() if len(speed_limit) == 0: return {"speedLimit_LST": -1} max_speed = max(speed_limit) data.speedLimit = limit_speed generate_function_chart_data(data, 'limitspeed_LST') return {"speedLimit_LST": max_speed} def limitSpeedPastLimitSign_LST(data): ego_df = data.ego_data scenario_name = find_nested_name(data.function_config["function"]) limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"] car_length = data.function_config["vehicle"]['CAR_LENGTH'] ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist() ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist() data.speedLimit = limit_speed data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST') if len(ego_speed) == 0: return {"speedPastLimitSign_LST": -1} return {"speedPastLimitSign_LST": ego_speed[0]} def leastDistance_LST(data): ego_df = data.ego_data dist_row = ego_df[ego_df['v'] == 0]['relative_dist'].tolist() if len(dist_row) == 0: return {"leastDistance_LST": -1} else: min_dist = min(dist_row) return {"leastDistance_LST": min_dist} def launchTimeinStopLine_LST(data): ego_df = data.ego_data simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist() if len(simtime_row) == 0: return {"launchTimeinStopLine_LST": -1} else: delta_t = simtime_row[-1] - simtime_row[0] return {"launchTimeinStopLine_LST": delta_t} def launchTimewhenFollowingCar_LST(data): ego_df = data.ego_data t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0] simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist() if len(simtime_row) == 0: return {"launchTimewhenFollowingCar_LST": 0} else: time_interval = _is_segment_by_interval(simtime_row, t_interval) delta_t = [] for t in time_interval: delta_t.append(t[-1] - t[0]) return {"launchTimewhenFollowingCar_LST": max(delta_t)} def noStop_LST(data): ego_df_ini = data.ego_data min_time = ego_df_ini['simTime'].min() + 5 max_time = ego_df_ini['simTime'].max() - 5 ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)] speed = ego_df['v'].tolist() if (any(speed) == 0): return {"noStop_LST": -1} else: return {"noStop_LST": 1} def launchTimeinTrafficLight_LST(data): ''' 待修改: 红灯的状态值:1 绿灯的状态值:0 ''' ego_df = data.ego_data simtime_of_redlight = ego_df[ego_df['stateMask'] == 0]['simTime'] simtime_of_stop = ego_df[ego_df['v'] == 0]['simTime'] if len(simtime_of_stop) or len(simtime_of_redlight): return {"timeInterval_LST": -1} simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist() if len(simtime_of_launch) == 0: return {"timeInterval_LST": -1} return {"timeInterval_LST": simtime_of_launch[-1] - simtime_of_launch[0]} def crossJunctionToTargetLane_LST(data): ego_df = data.ego_data lane_in_leftturn = set(ego_df['lane_id'].tolist()) scenario_name = find_nested_name(data.function_config["function"]) target_lane_id = data.function_config["function"][scenario_name]["crossJunctionToTargetLane_LST"]['max'] if target_lane_id not in lane_in_leftturn: return {"crossJunctionToTargetLane_LST": -1} else: return {"crossJunctionToTargetLane_LST": target_lane_id} def keepInLane_LST(data): ego_df = data.ego_data notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width']/2].tolist() if len(notkeepinlane): return {"keepInLane_LST": -1} return {"keepInLane_LST": 1} def leastLateralDistance_LST(data): ego_df = data.ego_data lane_width = ego_df[ego_df['x_relative_dist'] == 0]['lane_width'] if lane_width.empty(): return {"leastLateralDistance_LST": -1} else: y_relative_dist = ego_df[ego_df['x_relative_dist'] == 0]['y_relative_dist'] if (y_relative_dist <= lane_width / 2).all(): return {"leastLateralDistance_LST": 1} else: return {"leastLateralDistance_LST": -1} def waitTimeAtCrosswalkwithPedestrian_LST(data): ego_df = data.ego_data object_df = data.object_data data['in_crosswalk'] = [] position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True) for cross_id in position_data['cross_id'].tolist(): for posX, posY in object_df['posX', 'posY']: pedestrian_pos = (posX, posY) plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0] if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos): data[data['playerId'] == 2]['in_crosswalk'] = 1 else: data[data['playerId'] == 2]['in_crosswalk'] = 0 car_stop_time = ego_df[ego_df['v'] == 0]['simTime'] pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime'] car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist() return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len( car_wait_pedestrian) > 0 else 0} def launchTimewhenPedestrianLeave_LST(data): ego_df = data.ego_data car_stop_time = ego_df[ego_df['v'] == 0]["simTime"] if car_stop_time.empty(): return {"launchTimewhenPedestrianLeave_LST": -1} else: lane_width = ego_df[ego_df['v'] == 0]['lane_width'].tolist()[0] car_to_pedestrain = ego_df[ego_df['y_relative_dist'] <= lane_width / 2]["simTime"] legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist() return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]} def noCollision_LST(data): ego_df = data.ego_data if ego_df['relative_dist'].any() == 0: return {"noCollision_LST": -1} else: return {"noCollision_LST": 1} def noReverse_LST(data): ego_df = data.ego_data if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0: return {"noReverse_LST": -1} else: return {"noReverse_LST": 1} def turnAround_LST(data): ego_df = data.ego_data if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and ( ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0): return {"turnAround_LST": 1} else: return {"turnAround_LST": -1} def laneOffset_LST(data): car_width = data.function_config['vehicle']['CAR_WIDTH'] ego_df_ini = data.ego_data min_time = ego_df_ini['simTime'].min() + 5 max_time = ego_df_ini['simTime'].max() - 5 ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)] laneoffset = ego_df['y_relative_dist'] - car_width / 2 return {"laneOffset_LST": max(laneoffset)} def maxLongitudeDist_LST(data): ego_df = data.ego_data longitude_dist = abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist()) data.longitude_dist = min(abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist())) stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist() data.stop_time = min(stop_time) if len(longitude_dist) == 0: return {"maxLongitudeDist_LST": -1} generate_function_chart_data(data, 'maxLongitudeDist_LST') return {"maxLongDist_LST": min(longitude_dist)} def noEmergencyBraking_LST(data): ego_df = data.ego_data ego_df['ip_dec'] = ego_df['v'].apply( get_interpolation, point1=[18, -5], point2=[72, -3.5]) ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply( lambda x: 1 if x < 0 else 0) if sum(ego_df['slam_brake']) == 0: return {"noEmergencyBraking_LST": 1} else: return {"noEmergencyBraking_LST": -1} def rightWarningSignal_PGVIL(data_processed) -> dict: """判断是否发出正确预警信号""" ego_df = data_processed.ego_data scenario_name = find_nested_name(data_processed.function_config["function"]) correctwarning = scenario_sign_dict[scenario_name] if correctwarning is None: print("无法获取正确的预警信号标志位!") return None # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行 warning_rows = ego_df[ (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna()) ] if warning_rows.empty: return {"rightWarningSignal_PGVIL": -1} else: return {"rightWarningSignal_PGVIL": 1} def latestWarningDistance_PGVIL(data_processed) -> dict: """预警距离计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"latestWarningDistance_PGVIL": 0.0} ego, obj = extract_ego_obj(warning_data) distances = calculate_distance_PGVIL( np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values ) # 将计算结果保存到data对象中,供图表生成使用 data_processed.warning_dist = distances data_processed.warning_time = ego['simTime'].tolist() if distances.size == 0: print("没有找到数据!") return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0 generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL') return {"latestWarningDistance_PGVIL": float(np.min(distances))} def latestWarningDistance_TTC_PGVIL(data_processed) -> dict: """TTC计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"latestWarningDistance_TTC_PGVIL": 0.0} ego, obj = extract_ego_obj(warning_data) # 向量化计算 ego_pos = np.array([[ego["posX"], ego["posY"]]]) ego_speed = np.array([[ego["speedX"], ego["speedY"]]]) obj_pos = obj[["posX", "posY"]].values obj_speed = obj[["speedX", "speedY"]].values distances = calculate_distance_PGVIL(ego_pos, obj_pos) rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed) data_processed.warning_dist = distances data_processed.warning_speed = rel_speeds data_processed.warning_time = ego['simTime'].tolist() with np.errstate(divide="ignore", invalid="ignore"): ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf) if ttc.size == 0: print("没有找到数据!") return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0 data_processed.ttc = ttc generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL') return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))} def earliestWarningDistance_PGVIL(data_processed) -> dict: """预警距离计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"earliestWarningDistance_PGVIL": 0} ego, obj = extract_ego_obj(warning_data) distances = calculate_distance_PGVIL( np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values ) # 将计算结果保存到data对象中,供图表生成使用 data_processed.warning_dist = distances data_processed.warning_time = ego['simTime'].tolist() if distances.size == 0: print("没有找到数据!") return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0 generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL') return {"earliestWarningDistance": float(np.min(distances))} def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict: """TTC计算流水线""" ego_df = data_processed.ego_data obj_df = data_processed.object_df warning_data = get_first_warning(data_processed) if warning_data is None: return {"earliestWarningDistance_TTC_PGVIL": 0.0} ego, obj = extract_ego_obj(warning_data) # 向量化计算 ego_pos = np.array([[ego["posX"], ego["posY"]]]) ego_speed = np.array([[ego["speedX"], ego["speedY"]]]) obj_pos = obj[["posX", "posY"]].values obj_speed = obj[["speedX", "speedY"]].values distances = calculate_distance_PGVIL(ego_pos, obj_pos) rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed) data_processed.warning_dist = distances data_processed.warning_speed = rel_speeds data_processed.warning_time = ego['simTime'].tolist() with np.errstate(divide="ignore", invalid="ignore"): ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf) if ttc.size == 0: print("没有找到数据!") return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0 data_processed.ttc = ttc generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL') return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))} # def delayOfEmergencyBrakeWarning(data_processed) -> dict: # #预警时机相对背景车辆减速度达到-4m/s2后的时延 # ego_df = data_processed.ego_data # obj_df = data_processed.object_df # warning_data = get_first_warning(data_processed) # if warning_data is None: # return {"delayOfEmergencyBrakeWarning": -1} # try: # ego, obj = extract_ego_obj(warning_data) # # 向量化计算 # obj_speed = np.array([[obj_df["speedX"], obj_df["speedY"]]]) # # 计算背景车辆减速度 # simtime_gap = obj["simTime"].iloc[1] - obj["simTime"].iloc[0] # simtime_freq = 1 / simtime_gap#每秒采样频率 # # simtime_freq为一个时间窗,找出时间窗内的最大减速度 # obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)#速度向量的模长 # obj_speed_change = np.diff(speed_magnitude)#速度模长的变化量 # obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap # #找到最大减速度,若最大减速度小于-4m/s2,则计算最大减速度对应的时间,和warning_data的差值进行对比 # max_deceleration = np.max(obj_deceleration) # if max_deceleration < -4: # max_deceleration_times = obj["simTime"].iloc[np.argmax(obj_deceleration)] # max_deceleration_time = max_deceleration_times.iloc[0] # delay_time = ego["simTime"] - max_deceleration_time # return {"delayOfEmergencyBrakeWarning": float(delay_time)} # else: # print("没有达到预警减速度阈值:-4m/s^2") # return {"delayOfEmergencyBrakeWarning": -1} def warningDelayTime_PGVIL(data_processed) -> dict: """车端接收到预警到HMI发出预警的时延""" ego_df = data_processed.ego_data # #打印ego_df的列名 # print(ego_df.columns.tolist()) warning_data = get_first_warning(data_processed) if warning_data is None: return {"warningDelayTime_PGVIL": -1} try: ego, obj = extract_ego_obj(warning_data) # 找到event_Type不为空,且playerID为1的行 rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())] first_time = rosbag_warning_rows["simTime"].iloc[0] warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0] delay_time = warning_time - first_time return {"warningDelayTime_PGVIL": float(delay_time)} except Exception as e: print(f"计算预警时延时发生错误: {e}") return {"warningDelayTime_PGVIL": -1} def get_car_to_stop_line_distance(ego, car_point, stop_line_points): """ 计算主车后轴中心点到停止线的距离 :return 距离 """ distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"] # 计算停止线的方向向量 line_vector = np.array( [ stop_line_points[1][0] - stop_line_points[0][0], stop_line_points[1][1] - stop_line_points[0][1], ] ) direction_vector_norm = np.linalg.norm(line_vector) direction_vector_unit = ( line_vector / direction_vector_norm if direction_vector_norm != 0 else np.array([0, 0]) ) # 计算主车后轴中心点到停止线投影的坐标(垂足) projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit) perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit # 计算主车后轴中心点到垂足的距离 distance_to_foot = np.linalg.norm(car_point - perpendicular_foot) carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead return carhead_distance_to_foot def ifCrossingRedLight_PGVIL(data_processed) -> dict: # 判断车辆是否闯红灯 stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)]) X_OFFSET = 258109.4239876 Y_OFFSET = 4149969.964821 stop_line_points += np.array([[X_OFFSET, Y_OFFSET]]) ego_df = data_processed.ego_data prev_distance = float("inf") # 初始化为正无穷大 """ traffic_light_status 0x100000为绿灯,1048576 0x1000000为黄灯,16777216 0x10000000为红灯,268435456 """ red_light_violation = False for index, ego in ego_df.iterrows(): car_point = (ego["posX"], ego["posY"]) stateMask = ego["stateMask"] simTime = ego["simTime"] distance_to_stopline = get_car_to_stop_line_distance( ego, car_point, stop_line_points ) # 主车车头跨越停止线时非绿灯,返回-1,闯红灯 if prev_distance > 0 and distance_to_stopline < 0: if stateMask is not None and stateMask != 1048576: red_light_violation = True break prev_distance = distance_to_stopline if red_light_violation: return {"ifCrossingRedLight_PGVIL": -1} # 闯红灯 else: return {"ifCrossingRedLight_PGVIL": 1} # 没有闯红灯 # def ifStopgreenWaveSpeedGuidance(data_processed) -> dict: # #在绿波车速引导期间是否发生停车 # def mindisStopline(data_processed) -> dict: # """ # 当有停车让行标志/标线时车辆最前端与停车让行线的最小距离应在0-4m之间 # """ # ego_df = data_processed.ego_data # obj_df = data_processed.object_df # stop_giveway_simtime = ego_df[ # ego_df["sign_type1"] == 32 | # ego_df["stopline_type"] == 3 # ]["simTime"] # stop_giveway_data = ego_df[ # ego_df["sign_type1"] == 32 | # ego_df["stopline_type"] == 3 # ]["simTime"] # if stop_giveway_simtime.empty: # print("没有停车让行标志/标线") # ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1] # distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0] # distance_to_stoplines = [] # for _,row in ego_data.iterrows(): # ego_pos = np.array([row["posX"], row["posY"], row["posH"]]) # stop_line_points = [ # [row["stopline_x1"], row["stopline_y1"]], # [row["stopline_x2"], row["stopline_y2"]], # ] # distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points) # distance_to_stoplines.append(distance_to_stopline) # mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead # return {"mindisStopline": mindisStopline} def limitSpeed_PGVIL(data): ego_df = data.ego_data max_speed = max(ego_df["v"]) if len(ego_df["v"]) == 0: return {"speedLimit_PGVIL": -1} else: return {"speedLimit_PGVIL": max_speed} def leastDistance_PGVIL(data): exclude_seconds = 2.0 ego_df = data.ego_data max_sim_time = ego_df["simTime"].max() threshold_time = max_sim_time - exclude_seconds for index, ego in ego_df.iterrows(): if ego["simTime"] >= threshold_time: continue if "v" in ego and ego["v"] == 0: car_point = (ego["posX"], ego["posY"]) p1 = (ego["stopline_x1"], ego["stopline_y1"]) p2 = (ego["stopline_x2"], ego["stopline_y2"]) stop_line_points = np.array([p1, p2], dtype=float) distance_to_stopline = get_car_to_stop_line_distance( ego, car_point, stop_line_points ) return {"leastDistance_PGVIL": distance_to_stopline} return {"leastDistance_PGVIL": -1} def launchTimeinStopLine_PGVIL(data): ego_df = data.ego_data in_stop = False start_time = None last_low_time = None max_duration = 0.0 v_thresh = 0.01 for _, row in ego_df.iterrows(): simt = row["simTime"] v = row.get("v", None) if v is None or np.isnan(v) or abs(v) > v_thresh: if in_stop: duration = last_low_time - start_time if duration > max_duration: max_duration = duration in_stop = False else: if not in_stop: start_time = simt in_stop = True # 更新最近一次“停车时刻”的 simTime last_low_time = simt if in_stop: duration = last_low_time - start_time if duration > max_duration: max_duration = duration if max_duration <= 0: return {"launchTimeinStopLine_PGVIL": -1} else: return {"launchTimeinStopLine_PGVIL": float(max_duration)} def launchTimeinTrafficLight_PGVIL(data): GREEN = 0x100000 RED = 0x10000000 ego_df = data.ego_data # 找到第一次红灯 to 绿灯的切换时刻 is_transition = (ego_df["stateMask"] == GREEN) & ( ego_df["stateMask"].shift(1) == RED ) transition_times = ego_df.loc[is_transition, "simTime"] if transition_times.empty: return {"timeInterval_PGVIL": -1} time_red2green = transition_times.iloc[0] car_move_times = ego_df.loc[ (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime" ] if car_move_times.empty: return {"timeInterval_PGVIL": -1} time_move = move_times.iloc[0] return {"timeInterval_PGVIL": time_move - time_red2green} def crossJunctionToTargetLane_PGVIL(data): ego_df = data.ego_data lane_ids = set(ego_df["lane_id"].dropna()) try: scenario_name = find_nested_name(data.function_config["function"]) target_lane_id = data.function_config["function"][scenario_name][ "crossJunctionToTargetLane_PGVIL" ]["max"] except KeyError: raise ValueError("请在配置文件中指定目标车道ID!") result = target_lane_id if target_lane_id in lane_ids else -1 return {"crossJunctionToTargetLane_PGVIL": result} def noStop_PGVIL(data): exclude_end_seconds = 5.0 exclude_start_seconds = 5.0 ego_df = data.ego_data max_sim_time = ego_df["simTime"].max() start_threshold = min_sim_time + exclude_start_seconds end_threshold = max_sim_time - exclude_end_seconds filtered_df = ego_df[ (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold) ] if (filtered_df["v"] == 0).any(): return {"noStop_PGVIL": -1} else: return {"noStop_PGVIL": 1} def noEmergencyBraking_PGVIL(data): ego_df = data.ego_data ego_df["ip_dec"] = ego_df["v"].apply( get_interpolation, point1=[18, -5], point2=[72, -3.5] ) ego_df["slam_brake"] = (ego_df["accleX"] - ego_df["ip_dec"]).apply( lambda x: 1 if x < 0 else 0 ) if sum(ego_df["slam_brake"]) == 0: return {"noEmergencyBraking_PGVIL": 1} else: return {"noEmergencyBraking_PGVIL": -1} def noReverse_PGVIL(data): ego_df = data.ego_data.copy() heading_x = np.cos(ego_df["posH"]) reverse_flag = (ego_df["speedX"] * heading_x) < 0 if reverse_flag.any(): return {"noReverse_PGVIL": -1} else: return {"noReverse_PGVIL": 1} def laneOffset_PGVIL(data): car_width = data.function_config["vehicle"]["CAR_WIDTH"] ego_df = data.ego_data is_zero = ego_df["v"] == 0 if not is_zero.any(): # 全程未停车 return {"laneOffset_PGVIL": None} groups = (is_zero != is_zero.shift(fill_value=False)).cumsum() last_zero_group = groups[is_zero].iloc[-1] last_stop = ego_df[groups == last_zero_group] # 距离右侧车道线 edge_dist = (last_stop["lane_width"] / 2 + last_stop["laneOffset"]) - ( car_width / 2 ) return {"laneOffset_PGVIL": edge_dist.max()} def maxLongitudelDistance_PGVIL(data): scenario_name = find_nested_name(data.function_config["function"]) stopX_pos = data.function_config["function"][scenario_name][ "maxLongitudelDistance_PGVIL" ]["max"] stopY_pos = data.function_config["function"][scenario_name][ "maxLongitudelDistance_PGVIL" ]["min"] def keepInLane_PGVIL(data): ego_df = data.ego_data.copy() ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"]) if ego_df.empty: return {"keepInLane_PGVIL": -1} threshold = ego_df["lane_width"] / 2 if (ego_df["laneOffset"].abs() > threshold).any(): return {"keepInLane_PGVIL": -1} else: return {"keepInLane_PGVIL": 1} def launchTimewhenPedestrianLeave_PGVIL(data): ego_df = data.ego_data ped_df = data.object_data.loc[ data.object_data["playerId"] == 2, ["simTime", "posX", "posY"] ] merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped")) if merged.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -1} yaw_rad = np.deg2rad(merged["posH"]) # 度→弧度 dx = merged["posX_ped"] - merged["posX"] dy = merged["posY_ped"] - merged["posY"] merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy) merged["threshold"] = merged["lane_width"] / 2 entry = merged[merged["lateral_dist"] <= merged["threshold"]] if entry.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -1} t_entry = entry["simTime"].iloc[0] after_e = merged[merged["simTime"] > t_entry] leave = after_e[after_e["lateral_dist"] > after_e["threshold"]] if leave.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -1} t_leave = leave["simTime"].iloc[0] ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0] stops = ego_df[(ego_df["simTime"] > t_leave) & (ego_df["v"] == 0)].copy() if stops.empty: return {"launchTimewhenPedestrianLeave_PGVIL": -1} front_overPos = ego_df["dimX"] / 2 + ego_df["offX"] yaw_stop = np.deg2rad(stops["posH"]) stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos dx_s = stops["front_x"] - ped_x dy_s = stops["front_y"] - ped_y proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop) valid = stops[proj <= 0] t_stop = valid["simTime"].iloc[0] launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"] t_launch = launches.iloc[0] return {"launchTimewhenPedestrianLeave_PGVIL": t_launch - t_stop} def waitTimeAtCrosswalkwithPedestrian_PGVIL(data): ego_df = data.ego_data ped_df = data.object_data.loc[ data.object_data["playerId"] == 2, ["simTime", "posX", "posY"] ] first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"] if first_launch.empty: t0_launch = 0.0 else: t0_launch = first_launch.iloc[0] merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped")) merged["front_offset"] = merged["dimX"] / 2 + merged["offX"] yaw = np.deg2rad(merged["posH"]) merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"] merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"] dx = merged["posX_ped"] - merged["front_x"] dy = merged["posY_ped"] - merged["front_y"] valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0 stops_df = merged.loc[ (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times ].sort_values("simTime") if stops_df.empty: return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1} wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0] return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": wait_time} def noCollision_PGVIL(data): ego = data.ego_data[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]] tar = data.object_data.loc[data.object_data["playerId"] == 2, ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]] df = ego.merge(tar, on="simTime", suffixes=("", "_tar")) df["posH_tar_rad"] = np.deg2rad(df["posH_tar"]) df["posH_rad"] = np.deg2rad(df["posH"]) df["collision"] = df.apply(lambda row: funcIsCollision( row["dimX"],row["dimY"],row["offX"],row["offY"],row["posX"],row["posY"], row["posH_rad"],row["dimX_tar"],row["dimY_tar"],row["offX_tar"],row["offY_tar"], row["posX_tar"],row["posY_tar"],row["posH_tar_rad"],),axis=1,) if df["collision"].any(): return {"noCollision_PGVIL": -1} else: return {"noCollision_PGVIL": 1} def leastLateralDistance_PGVIL(data): ego_df = data.ego_data cones = data.object_data.loc[data.object_data['playerId'] != 1, ['simTime','playerId','posX','posY']].copy() df = ego.merge(cones, on='simTime', how='inner', suffixes=('','_cone')) yaw = np.deg2rad(df['posH']) x_c = df['posX'] + df['offX'] * np.cos(yaw) y_c = df['posY'] + df['offX'] * np.sin(yaw) dx = df['posX'] - x_c dy = df['posY'] - y_c dx = df['posX_cone'] - x_c dy = df['posY_cone'] - y_c local_x = np.cos(yaw) * dx + np.sin(yaw) * dy local_y = -np.sin(yaw) * dx + np.cos(yaw) * dy half_length = df['dimX'] / 2 half_width = df['dimY'] / 2 inside = (np.abs(local_x) <= half_length) & (np.abs(local_y) <= half_width) collisions = df.loc[inside, ['simTime','playerId']] if collisions.empty: return {"noConeRectCollision_PGVIL": 1} else: collision_times = collisions['simTime'].unique().tolist() collision_ids = collisions['playerId'].unique().tolist() return {"noConeRectCollision_PGVIL": -1} class FunctionRegistry: """动态函数注册器(支持参数验证)""" def __init__(self, data_processed): self.logger = LogManager().get_logger() # 获取全局日志实例 self.data = data_processed # 检查function_config是否为空 if not hasattr(data_processed, 'function_config') or not data_processed.function_config: self.logger.warning("功能配置为空,跳过功能指标计算") self.fun_config = {} self.level_3_merics = [] self._registry = {} return self.fun_config = data_processed.function_config.get("function", {}) self.level_3_merics = self._extract_level_3_metrics(self.fun_config) self._registry: Dict[str, Callable] = {} self._registry = self._build_registry() def _extract_level_3_metrics(self, config_node: dict) -> list: """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)""" metrics = [] def _recurse(node): if isinstance(node, dict): if "name" in node and not any( isinstance(v, dict) for v in node.values() ): metrics.append(node["name"]) for v in node.values(): _recurse(v) _recurse(config_node) self.logger.info(f"评比的功能指标列表:{metrics}") return metrics def _build_registry(self) -> dict: """自动注册指标函数(防御性编程)""" registry = {} for func_name in self.level_3_merics: try: registry[func_name] = globals()[func_name] except KeyError: print(f"未实现指标函数: {func_name}") self.logger.error(f"未实现指标函数: {func_name}") return registry def batch_execute(self) -> dict: """批量执行指标计算(带熔断机制)""" results = {} # 如果配置为空或没有注册的指标,直接返回空结果 if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry: self.logger.info("功能配置为空或无注册指标,返回空结果") return results for name, func in self._registry.items(): try: result = func(self.data) # 统一传递数据上下文 results.update(result) except Exception as e: print(f"{name} 执行失败: {str(e)}") self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True) results[name] = None self.logger.info(f"功能指标计算结果:{results}") return results class FunctionManager: """管理功能指标计算的类""" def __init__(self, data_processed): self.data = data_processed self.logger = LogManager().get_logger() # 检查function_config是否为空 if not hasattr(data_processed, 'function_config') or not data_processed.function_config: self.logger.warning("功能配置为空,跳过功能指标计算初始化") self.function = None else: self.function = FunctionRegistry(self.data) def report_statistic(self): """ 计算并报告功能指标结果。 :return: 评估结果 """ # 如果function为None,直接返回空字典 if self.function is None: self.logger.info("功能指标管理器未初始化,返回空结果") return {} function_result = self.function.batch_execute() print("\n[功能性表现及评价结果]") return function_result # self.logger.info(f'Function Result: {function_result}') # 使用示例 if __name__ == "__main__": pass # print("\n[功能类表现及得分情况]")