123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2025 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
- @Data: 2025/01/5
- @Last Modified: 2025/01/5
- @Summary: Function Metrics Calculation
- """
- import sys
- from pathlib import Path
- # 添加项目根目录到系统路径
- root_path = Path(__file__).resolve().parent.parent
- sys.path.append(str(root_path))
- from modules.lib.score import Score
- from modules.lib.log_manager import LogManager
- import numpy as np
- from typing import Dict, Tuple, Optional, Callable, Any
- import pandas as pd
- import yaml
- # ----------------------
- # 基础工具函数 (Pure functions)
- # ----------------------
- scenario_sign_dict = {"LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208,
- "CoorperativeIntersectionPassing": 225, "GreenLightOptimalSpeedAdvisory": 234,
- "ForwardCollision": 212}
- def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
- """向量化距离计算"""
- return np.linalg.norm(ego_pos - obj_pos, axis=1)
- def calculate_relative_speed_PGVIL(
- ego_speed: np.ndarray, obj_speed: np.ndarray
- ) -> np.ndarray:
- """向量化相对速度计算"""
- return np.linalg.norm(ego_speed - obj_speed, axis=1)
- def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
- """向量化距离计算"""
- dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
- return dist
- def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
- """向量化相对速度计算"""
- return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
- def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
- """数据提取函数"""
- ego = data[data["playerId"] == 1].iloc[0]
- obj = data[data["playerId"] != 1]
- return ego, obj
- def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
- """带缓存的预警数据获取"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- scenario_name = data_processed.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict.get(scenario_name)
- if correctwarning is None:
- print("无法获取正确的预警信号标志位!")
- return None
- warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
- warning_times = warning_rows['simTime']
- if warning_times.empty:
- print("没有找到预警数据!")
- return None
- first_time = warning_times.iloc[0]
- return obj_df[obj_df['simTime'] == first_time]
- # ----------------------
- # 核心计算功能函数
- # ----------------------
- def latestWarningDistance_LST(data) -> dict:
- """预警距离计算流水线"""
- scenario_name = data.function_config["function"]["scenario"]["name"]
- value = data.function_config["function"]["scenario"]["latestWarningDistance_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- if warning_dist.empty:
- return {"latestWarningDistance_LST": 0.0}
- return {"latestWarningDistance_LST": float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else value}
- def earliestWarningDistance_LST(data) -> dict:
- """预警距离计算流水线"""
- scenario_name = data.function_config["function"]["scenario"]["name"]
- value = data.function_config["function"]["scenario"]["earliestWarningDistance_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- if warning_dist.empty:
- return {"earliestWarningDistance_LST": 0.0}
- return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else value}
- def latestWarningDistance_TTC_LST(data) -> dict:
- """TTC计算流水线"""
- scenario_name = data.function_config["function"]["scenario"]["name"]
- value = data.function_config["function"]["scenario"]["latestWarningDistance_TTC_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- if warning_dist.empty:
- return {"latestWarningDistance_TTC_LST": 0.0}
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- with np.errstate(divide='ignore', invalid='ignore'):
- ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
-
- # 处理无效的TTC值
- for i in range(len(ttc)):
- ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
-
- # 生成图表数据
- from modules.lib.chart_generator import generate_function_chart_data
- generate_function_chart_data(data, 'latestWarningDistance_TTC_LST')
-
- return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
- def earliestWarningDistance_TTC_LST(data) -> dict:
- """TTC计算流水线"""
- scenario_name = data.function_config["function"]["scenario"]["name"]
- value = data.function_config["function"]["scenario"]["earliestWarningDistance_TTC_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- if warning_dist.empty:
- return {"earliestWarningDistance_TTC_LST": 0.0}
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- with np.errstate(divide='ignore', invalid='ignore'):
- ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
-
- # 处理无效的TTC值
- for i in range(len(ttc)):
- ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
-
- return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
- def warningDelayTime_LST(data):
- scenario_name = data.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
- simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
- rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
- 'simTime'].tolist()
- simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
- if (simTime_HMI is None) or (simTime_rosbag is None):
- print("预警出错!")
- delay_time = 100.0
- else:
- delay_time = abs(simTime_HMI - simTime_rosbag)
- return {"warningDelayTime_LST": delay_time}
- def warningDelayTimeofReachDecel_LST(data):
- scenario_name = data.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- ego_speed_simtime = ego_df[ego_df['accel'] <= -4]['simTime'].tolist() # 单位m/s^2
- warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
- if (len(warning_simTime) == 0) and (len(ego_speed_simtime) == 0):
- return {"warningDelayTimeofReachDecel_LST": 0}
- elif (len(warning_simTime) == 0) and (len(ego_speed_simtime) > 0):
- return {"warningDelayTimeofReachDecel_LST": ego_speed_simtime[0]}
- elif (len(warning_simTime) > 0) and (len(ego_speed_simtime) == 0):
- return {"warningDelayTimeofReachDecel_LST": None}
- else:
- return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - ego_speed_simtime[0]}
- def rightWarningSignal_LST(data):
- scenario_name = data.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- if ego_df['ifwarning'].empty:
- print("无法获取正确预警信号标志位!")
- return
- warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
- if warning_rows.empty:
- return {"rightWarningSignal_LST": -1}
- else:
- return {"rightWarningSignal_LST": 1}
- def ifCrossingRedLight_LST(data):
- scenario_name = data.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- redlight_simtime = ego_df[
- (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (
- ego_df['v'] != 0)]['simTime']
- if redlight_simtime.empty:
- return {"ifCrossingRedLight_LST": -1}
- else:
- return {"ifCrossingRedLight_LST": 1}
- def ifStopgreenWaveSpeedGuidance_LST(data):
- scenario_name = data.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- greenlight_simtime = \
- ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
- if greenlight_simtime.empty:
- return {"ifStopgreenWaveSpeedGuidance_LST": -1}
- else:
- return {"ifStopgreenWaveSpeedGuidance_LST": 1}
- def rightWarningSignal_PGVIL(data_processed) -> dict:
- """判断是否发出正确预警信号"""
- ego_df = data_processed.ego_data
- scenario_name = data_processed.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- if correctwarning is None:
- print("无法获取正确的预警信号标志位!")
- return None
- # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
- warning_rows = ego_df[
- (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
- ]
- if warning_rows.empty:
- return {"rightWarningSignal_PGVIL": -1}
- else:
- return {"rightWarningSignal_PGVIL": 1}
- def latestWarningDistance_PGVIL(data_processed) -> dict:
- """预警距离计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"latestWarningDistance_PGVIL": 0.0}
- ego, obj = extract_ego_obj(warning_data)
- distances = calculate_distance_PGVIL(
- np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
- )
- if distances.size == 0:
- print("没有找到数据!")
- return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
- return {"latestWarningDistance_PGVIL": float(np.min(distances))}
- def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
- """TTC计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"latestWarningDistance_TTC_PGVIL": 0.0}
- ego, obj = extract_ego_obj(warning_data)
- # 向量化计算
- ego_pos = np.array([[ego["posX"], ego["posY"]]])
- ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
- obj_pos = obj[["posX", "posY"]].values
- obj_speed = obj[["speedX", "speedY"]].values
- distances = calculate_distance_PGVIL(ego_pos, obj_pos)
- rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
- with np.errstate(divide="ignore", invalid="ignore"):
- ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
- if ttc.size == 0:
- print("没有找到数据!")
- return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
- return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
- def earliestWarningDistance_PGVIL(data_processed) -> dict:
- """预警距离计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"earliestWarningDistance_PGVIL": 0}
- ego, obj = extract_ego_obj(warning_data)
- distances = calculate_distance_PGVIL(
- np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
- )
- if distances.size == 0:
- print("没有找到数据!")
- return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
- return {"earliestWarningDistance": float(np.min(distances))}
- def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
- """TTC计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"earliestWarningDistance_TTC_PGVIL": 0.0}
- ego, obj = extract_ego_obj(warning_data)
- # 向量化计算
- ego_pos = np.array([[ego["posX"], ego["posY"]]])
- ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
- obj_pos = obj[["posX", "posY"]].values
- obj_speed = obj[["speedX", "speedY"]].values
- distances = calculate_distance_PGVIL(ego_pos, obj_pos)
- rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
- with np.errstate(divide="ignore", invalid="ignore"):
- ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
- if ttc.size == 0:
- print("没有找到数据!")
- return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
- return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
- # def delayOfEmergencyBrakeWarning(data_processed) -> dict:
- # #预警时机相对背景车辆减速度达到-4m/s2后的时延
- # ego_df = data_processed.ego_data
- # obj_df = data_processed.object_df
- # warning_data = get_first_warning(data_processed)
- # if warning_data is None:
- # return {"delayOfEmergencyBrakeWarning": -1}
- # try:
- # ego, obj = extract_ego_obj(warning_data)
- # # 向量化计算
- # obj_speed = np.array([[obj_df["speedX"], obj_df["speedY"]]])
- # # 计算背景车辆减速度
- # simtime_gap = obj["simTime"].iloc[1] - obj["simTime"].iloc[0]
- # simtime_freq = 1 / simtime_gap#每秒采样频率
- # # simtime_freq为一个时间窗,找出时间窗内的最大减速度
- # obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)#速度向量的模长
- # obj_speed_change = np.diff(speed_magnitude)#速度模长的变化量
- # obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
- # #找到最大减速度,若最大减速度小于-4m/s2,则计算最大减速度对应的时间,和warning_data的差值进行对比
- # max_deceleration = np.max(obj_deceleration)
- # if max_deceleration < -4:
- # max_deceleration_times = obj["simTime"].iloc[np.argmax(obj_deceleration)]
- # max_deceleration_time = max_deceleration_times.iloc[0]
- # delay_time = ego["simTime"] - max_deceleration_time
- # return {"delayOfEmergencyBrakeWarning": float(delay_time)}
- # else:
- # print("没有达到预警减速度阈值:-4m/s^2")
- # return {"delayOfEmergencyBrakeWarning": -1}
- def warningDelayTime_PGVIL(data_processed) -> dict:
- """车端接收到预警到HMI发出预警的时延"""
- ego_df = data_processed.ego_data
- # #打印ego_df的列名
- # print(ego_df.columns.tolist())
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"warningDelayTime_PGVIL": -1}
- try:
- ego, obj = extract_ego_obj(warning_data)
- # 找到event_Type不为空,且playerID为1的行
- rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
- first_time = rosbag_warning_rows["simTime"].iloc[0]
- warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
- delay_time = warning_time - first_time
- return {"warningDelayTime_PGVIL": float(delay_time)}
- except Exception as e:
- print(f"计算预警时延时发生错误: {e}")
- return {"warningDelayTime_PGVIL": -1}
- def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
- """
- 计算主车后轴中心点到停止线的距离
- :return 距离
- """
- distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
- # 计算停止线的方向向量
- line_vector = np.array(
- [
- stop_line_points[1][0] - stop_line_points[0][0],
- stop_line_points[1][1] - stop_line_points[0][1],
- ]
- )
- direction_vector_norm = np.linalg.norm(line_vector)
- direction_vector_unit = (
- line_vector / direction_vector_norm
- if direction_vector_norm != 0
- else np.array([0, 0])
- )
- # 计算主车后轴中心点到停止线投影的坐标(垂足)
- projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
- perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
- # 计算主车后轴中心点到垂足的距离
- distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
- carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
- return carhead_distance_to_foot
- def ifCrossingRedLight_PGVIL(data_processed) -> dict:
- # 判断车辆是否闯红灯
- stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)])
- X_OFFSET = 258109.4239876
- Y_OFFSET = 4149969.964821
- stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
- ego_df = data_processed.ego_data
- prev_distance = float("inf") # 初始化为正无穷大
- """
- traffic_light_status
- 0x100000为绿灯,1048576
- 0x1000000为黄灯,16777216
- 0x10000000为红灯,268435456
- """
- red_light_violation = False
- for index, ego in ego_df.iterrows():
- car_point = (ego["posX"], ego["posY"])
- stateMask = ego["stateMask"]
- simTime = ego["simTime"]
- distance_to_stopline = get_car_to_stop_line_distance(
- ego, car_point, stop_line_points
- )
- # 主车车头跨越停止线时非绿灯,返回-1,闯红灯
- if prev_distance > 0 and distance_to_stopline < 0:
- if stateMask is not None and stateMask != 1048576:
- red_light_violation = True
- break
- prev_distance = distance_to_stopline
- if red_light_violation:
- return {"ifCrossingRedLight_PGVIL": -1} # 闯红灯
- else:
- return {"ifCrossingRedLight_PGVIL": 1} # 没有闯红灯
- # def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
- # #在绿波车速引导期间是否发生停车
- # def mindisStopline(data_processed) -> dict:
- # """
- # 当有停车让行标志/标线时车辆最前端与停车让行线的最小距离应在0-4m之间
- # """
- # ego_df = data_processed.ego_data
- # obj_df = data_processed.object_df
- # stop_giveway_simtime = ego_df[
- # ego_df["sign_type1"] == 32 |
- # ego_df["stopline_type"] == 3
- # ]["simTime"]
- # stop_giveway_data = ego_df[
- # ego_df["sign_type1"] == 32 |
- # ego_df["stopline_type"] == 3
- # ]["simTime"]
- # if stop_giveway_simtime.empty:
- # print("没有停车让行标志/标线")
- # ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
- # distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
- # distance_to_stoplines = []
- # for _,row in ego_data.iterrows():
- # ego_pos = np.array([row["posX"], row["posY"], row["posH"]])
- # stop_line_points = [
- # [row["stopline_x1"], row["stopline_y1"]],
- # [row["stopline_x2"], row["stopline_y2"]],
- # ]
- # distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
- # distance_to_stoplines.append(distance_to_stopline)
- # mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
- # return {"mindisStopline": mindisStopline}
- class FunctionRegistry:
- """动态函数注册器(支持参数验证)"""
- def __init__(self, data_processed):
- self.logger = LogManager().get_logger() # 获取全局日志实例
- self.data = data_processed
- self.fun_config = data_processed.function_config["function"]
- self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
- self._registry: Dict[str, Callable] = {}
- self._registry = self._build_registry()
- def _extract_level_3_metrics(self, config_node: dict) -> list:
- """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
- metrics = []
- def _recurse(node):
- if isinstance(node, dict):
- if "name" in node and not any(
- isinstance(v, dict) for v in node.values()
- ):
- metrics.append(node["name"])
- for v in node.values():
- _recurse(v)
- _recurse(config_node)
- self.logger.info(f"评比的功能指标列表:{metrics}")
- return metrics
- def _build_registry(self) -> dict:
- """自动注册指标函数(防御性编程)"""
- registry = {}
- for func_name in self.level_3_merics:
- try:
- registry[func_name] = globals()[func_name]
- except KeyError:
- print(f"未实现指标函数: {func_name}")
- self.logger.error(f"未实现指标函数: {func_name}")
- return registry
- def batch_execute(self) -> dict:
- """批量执行指标计算(带熔断机制)"""
- results = {}
- for name, func in self._registry.items():
- try:
- result = func(self.data) # 统一传递数据上下文
- results.update(result)
- except Exception as e:
- print(f"{name} 执行失败: {str(e)}")
- self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
- results[name] = None
- self.logger.info(f"功能指标计算结果:{results}")
- return results
- class FunctionManager:
- """管理功能指标计算的类"""
- def __init__(self, data_processed):
- self.data = data_processed
- self.function = FunctionRegistry(self.data)
- def report_statistic(self):
- """
- 计算并报告功能指标结果。
- :return: 评估结果
- """
- function_result = self.function.batch_execute()
- print("\n[功能性表现及评价结果]")
- return function_result
- # self.logger.info(f'Function Result: {function_result}')
- # 使用示例
- if __name__ == "__main__":
- pass
- # print("\n[功能类表现及得分情况]")
|