function.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2025 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2025/01/5
  11. @Last Modified: 2025/01/5
  12. @Summary: Function Metrics Calculation
  13. """
  14. import sys
  15. from pathlib import Path
  16. # 添加项目根目录到系统路径
  17. root_path = Path(__file__).resolve().parent.parent
  18. sys.path.append(str(root_path))
  19. from modules.lib.score import Score
  20. from modules.lib.log_manager import LogManager
  21. import numpy as np
  22. from typing import Dict, Tuple, Optional, Callable, Any
  23. import pandas as pd
  24. import yaml
  25. # ----------------------
  26. # 基础工具函数 (Pure functions)
  27. # ----------------------
  28. scenario_sign_dict = {"LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208, "CoorperativeIntersectionPassing": 225, "GreenLightOptimalSpeedAdvisory": 234,
  29. "ForwardCollision": 212}
  30. def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
  31. """向量化距离计算"""
  32. return np.linalg.norm(ego_pos - obj_pos, axis=1)
  33. def calculate_relative_speed_PGVIL(
  34. ego_speed: np.ndarray, obj_speed: np.ndarray
  35. ) -> np.ndarray:
  36. """向量化相对速度计算"""
  37. return np.linalg.norm(ego_speed - obj_speed, axis=1)
  38. def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  39. """向量化距离计算"""
  40. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  41. return dist
  42. def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  43. """向量化相对速度计算"""
  44. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  45. def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
  46. """数据提取函数"""
  47. ego = data[data["playerId"] == 1].iloc[0]
  48. obj = data[data["playerId"] != 1]
  49. return ego, obj
  50. def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
  51. """带缓存的预警数据获取"""
  52. ego_df = data_processed.ego_data
  53. obj_df = data_processed.object_df
  54. scenario_name = data_processed.function_config["function"]["scenario"]["name"]
  55. correctwarning = scenario_sign_dict.get(scenario_name)
  56. if correctwarning is None:
  57. print("无法获取正确的预警信号标志位!")
  58. return None
  59. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  60. warning_times = warning_rows['simTime']
  61. if warning_times.empty:
  62. print("没有找到预警数据!")
  63. return None
  64. first_time = warning_times.iloc[0]
  65. return obj_df[obj_df['simTime'] == first_time]
  66. # ----------------------
  67. # 核心计算功能函数
  68. # ----------------------
  69. def latestWarningDistance_LST(data) -> dict:
  70. """预警距离计算流水线"""
  71. scenario_name = data.function_config["function"]["scenario"]["name"]
  72. correctwarning = scenario_sign_dict[scenario_name]
  73. ego_df = data.ego_data
  74. warning_dist = calculate_distance(ego_df, correctwarning)
  75. if warning_dist.empty:
  76. return {"latestWarningDistance_LST": 0.0}
  77. return {"latestWarningDistance_LST": float(warning_dist.iloc[-1])}
  78. def earliestWarningDistance_LST(data) -> dict:
  79. """预警距离计算流水线"""
  80. scenario_name = data.function_config["function"]["scenario"]["name"]
  81. correctwarning = scenario_sign_dict[scenario_name]
  82. ego_df = data.ego_data
  83. warning_dist = calculate_distance(ego_df, correctwarning)
  84. if warning_dist.empty:
  85. return {"earliestWarningDistance_LST": 0.0}
  86. return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else np.inf}
  87. def latestWarningDistance_TTC_LST(data) -> dict:
  88. """TTC计算流水线"""
  89. scenario_name = data.function_config["function"]["scenario"]["name"]
  90. correctwarning = scenario_sign_dict[scenario_name]
  91. ego_df = data.ego_data
  92. warning_dist = calculate_distance(ego_df, correctwarning)
  93. if warning_dist.empty:
  94. return {"latestWarningDistance_TTC_LST": 0.0}
  95. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  96. with np.errstate(divide='ignore', invalid='ignore'):
  97. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  98. return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else np.inf}
  99. def earliestWarningDistance_TTC_LST(data) -> dict:
  100. """TTC计算流水线"""
  101. scenario_name = data.function_config["function"]["scenario"]["name"]
  102. correctwarning = scenario_sign_dict[scenario_name]
  103. ego_df = data.ego_data
  104. warning_dist = calculate_distance(ego_df, correctwarning)
  105. if warning_dist.empty:
  106. return {"earliestWarningDistance_TTC_LST": 0.0}
  107. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  108. with np.errstate(divide='ignore', invalid='ignore'):
  109. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  110. return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else np.inf}
  111. def warningDelayTime_LST(data):
  112. scenario_name = data.function_config["function"]["scenario"]["name"]
  113. correctwarning = scenario_sign_dict[scenario_name]
  114. ego_df = data.ego_data
  115. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  116. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  117. rosbag_warning_rows = ego_df[(ego_df['event_info.eventSource'].notna())]['simTime'].tolist()
  118. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  119. if (simTime_HMI is None) and (simTime_rosbag is None):
  120. print("没有发出预警!")
  121. delay_time = 100.0
  122. elif (simTime_HMI is not None) and (simTime_rosbag is not None):
  123. delay_time = abs(simTime_HMI - simTime_rosbag)
  124. elif (simTime_HMI is not None) and (simTime_rosbag is None):
  125. print("没有发出预警!")
  126. delay_time = None
  127. else:
  128. delay_time = simTime_rosbag
  129. return {"warningDelayTime_LST": delay_time}
  130. def warningDelayTimeOf4_LST(data):
  131. scenario_name = data.function_config["function"]["scenario"]["name"]
  132. correctwarning = scenario_sign_dict[scenario_name]
  133. ego_df = data.ego_data
  134. ego_speed_simtime = ego_df[ego_df['accel'] <= -4]['simTime'].tolist() # 单位m/s^2
  135. warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
  136. if (len(warning_simTime) == 0) and (len(ego_speed_simtime) == 0):
  137. return {"warningDelayTimeOf4_LST": 0}
  138. elif (len(warning_simTime) == 0) and (len(ego_speed_simtime) > 0):
  139. return {"warningDelayTimeOf4_LST": ego_speed_simtime[0]}
  140. elif (len(warning_simTime) > 0) and (len(ego_speed_simtime) == 0):
  141. return {"warningDelayTimeOf4_LST": None}
  142. else:
  143. return {"warningDelayTimeOf4_LST": warning_simTime[0] - ego_speed_simtime[0]}
  144. def rightWarningSignal_LST(data):
  145. scenario_name = data.function_config["function"]["scenario"]["name"]
  146. correctwarning = scenario_sign_dict[scenario_name]
  147. ego_df = data.ego_data
  148. if correctwarning.empty:
  149. print("无法获取正确预警信号标志位!")
  150. return
  151. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  152. if warning_rows.empty:
  153. return {"rightWarningSignal_LST": -1}
  154. else:
  155. return {"rightWarningSignal_LST": 1}
  156. def ifCrossingRedLight_LST(data):
  157. scenario_name = data.function_config["function"]["scenario"]["name"]
  158. correctwarning = scenario_sign_dict[scenario_name]
  159. ego_df = data.ego_data
  160. redlight_simtime = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (ego_df['v'] != 0)]['simTime']
  161. if redlight_simtime.empty:
  162. return {"ifCrossingRedLight_LST": 0}
  163. else:
  164. return {"ifCrossingRedLight_LST": 1}
  165. def ifStopgreenWaveSpeedGuidance_LST(data):
  166. scenario_name = data.function_config["function"]["scenario"]["name"]
  167. correctwarning = scenario_sign_dict[scenario_name]
  168. ego_df = data.ego_data
  169. greenlight_simtime = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
  170. if greenlight_simtime.empty:
  171. return {"ifStopgreenWaveSpeedGuidance_LST": 0}
  172. else:
  173. return {"ifStopgreenWaveSpeedGuidance_LST": 1}
  174. def rightWarningSignal_PGVIL(data_processed) -> dict:
  175. """判断是否发出正确预警信号"""
  176. ego_df = data_processed.ego_data
  177. scenario_name = data_processed.function_config["function"]["scenario"]["name"]
  178. correctwarning = scenario_sign_dict[scenario_name]
  179. if correctwarning is None:
  180. print("无法获取正确的预警信号标志位!")
  181. return None
  182. # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
  183. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  184. if warning_rows.empty:
  185. return {"rightWarningSignal_PGVIL": -1}
  186. else:
  187. return {"rightWarningSignal_PGVIL": 1}
  188. def latestWarningDistance_PGVIL(data_processed) -> dict:
  189. """预警距离计算流水线"""
  190. ego_df = data_processed.ego_data
  191. obj_df = data_processed.object_df
  192. warning_data = get_first_warning(data_processed)
  193. if warning_data is None:
  194. return {"latestWarningDistance_PGVIL": 0.0}
  195. ego, obj = extract_ego_obj(warning_data)
  196. distances = calculate_distance_PGVIL(
  197. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  198. )
  199. if distances.size == 0:
  200. print("没有找到数据!")
  201. return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  202. return {"latestWarningDistance_PGVIL": float(np.min(distances))}
  203. def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
  204. """TTC计算流水线"""
  205. ego_df = data_processed.ego_data
  206. obj_df = data_processed.object_df
  207. warning_data = get_first_warning(data_processed)
  208. if warning_data is None:
  209. return {"latestWarningDistance_TTC_PGVIL": 0.0}
  210. ego, obj = extract_ego_obj(warning_data)
  211. # 向量化计算
  212. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  213. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  214. obj_pos = obj[["posX", "posY"]].values
  215. obj_speed = obj[["speedX", "speedY"]].values
  216. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  217. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  218. with np.errstate(divide="ignore", invalid="ignore"):
  219. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  220. if ttc.size == 0:
  221. print("没有找到数据!")
  222. return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  223. return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  224. def earliestWarningDistance_PGVIL(data_processed) -> dict:
  225. """预警距离计算流水线"""
  226. ego_df = data_processed.ego_data
  227. obj_df = data_processed.object_df
  228. warning_data = get_first_warning(data_processed)
  229. if warning_data is None:
  230. return {"earliestWarningDistance_PGVIL": 0}
  231. ego, obj = extract_ego_obj(warning_data)
  232. distances = calculate_distance_PGVIL(
  233. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  234. )
  235. if distances.size == 0:
  236. print("没有找到数据!")
  237. return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  238. return {"earliestWarningDistance": float(np.min(distances))}
  239. def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
  240. """TTC计算流水线"""
  241. ego_df = data_processed.ego_data
  242. obj_df = data_processed.object_df
  243. warning_data = get_first_warning(data_processed)
  244. if warning_data is None:
  245. return {"earliestWarningDistance_TTC_PGVIL": 0.0}
  246. ego, obj = extract_ego_obj(warning_data)
  247. # 向量化计算
  248. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  249. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  250. obj_pos = obj[["posX", "posY"]].values
  251. obj_speed = obj[["speedX", "speedY"]].values
  252. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  253. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  254. with np.errstate(divide="ignore", invalid="ignore"):
  255. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  256. if ttc.size == 0:
  257. print("没有找到数据!")
  258. return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  259. return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  260. # def delayOfEmergencyBrakeWarning(data_processed) -> dict:
  261. # #预警时机相对背景车辆减速度达到-4m/s2后的时延
  262. # ego_df = data_processed.ego_data
  263. # obj_df = data_processed.object_df
  264. # warning_data = get_first_warning(data_processed)
  265. # if warning_data is None:
  266. # return {"delayOfEmergencyBrakeWarning": -1}
  267. # try:
  268. # ego, obj = extract_ego_obj(warning_data)
  269. # # 向量化计算
  270. # obj_speed = np.array([[obj_df["speedX"], obj_df["speedY"]]])
  271. # # 计算背景车辆减速度
  272. # simtime_gap = obj["simTime"].iloc[1] - obj["simTime"].iloc[0]
  273. # simtime_freq = 1 / simtime_gap#每秒采样频率
  274. # # simtime_freq为一个时间窗,找出时间窗内的最大减速度
  275. # obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)#速度向量的模长
  276. # obj_speed_change = np.diff(speed_magnitude)#速度模长的变化量
  277. # obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
  278. # #找到最大减速度,若最大减速度小于-4m/s2,则计算最大减速度对应的时间,和warning_data的差值进行对比
  279. # max_deceleration = np.max(obj_deceleration)
  280. # if max_deceleration < -4:
  281. # max_deceleration_times = obj["simTime"].iloc[np.argmax(obj_deceleration)]
  282. # max_deceleration_time = max_deceleration_times.iloc[0]
  283. # delay_time = ego["simTime"] - max_deceleration_time
  284. # return {"delayOfEmergencyBrakeWarning": float(delay_time)}
  285. # else:
  286. # print("没有达到预警减速度阈值:-4m/s^2")
  287. # return {"delayOfEmergencyBrakeWarning": -1}
  288. def warningDelayTime_PGVIL(data_processed) -> dict:
  289. """车端接收到预警到HMI发出预警的时延"""
  290. ego_df = data_processed.ego_data
  291. warning_data = get_first_warning(data_processed)
  292. if warning_data is None:
  293. return {"warningDelayTime_PGVIL": -1}
  294. try:
  295. ego, obj = extract_ego_obj(warning_data)
  296. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna())]
  297. first_time = rosbag_warning_rows["simTime"].iloc[0]
  298. warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
  299. delay_time = warning_time - first_time
  300. return {"warningDelayTime_PGVIL": float(delay_time)}
  301. except Exception as e:
  302. print(f"计算预警时延时发生错误: {e}")
  303. return {"warningDelayTime_PGVIL": -1}
  304. def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
  305. """
  306. 计算主车后轴中心点到停止线的距离
  307. :return 距离
  308. """
  309. distance_carpoint_carhead = ego['dimX']/2 + ego['offX']
  310. # 计算停止线的方向向量
  311. line_vector = np.array(
  312. [
  313. stop_line_points[1][0] - stop_line_points[0][0],
  314. stop_line_points[1][1] - stop_line_points[0][1],
  315. ]
  316. )
  317. direction_vector_norm = np.linalg.norm(line_vector)
  318. direction_vector_unit = line_vector / direction_vector_norm if direction_vector_norm != 0 else np.array([0, 0])
  319. # 计算主车后轴中心点到停止线投影的坐标(垂足)
  320. projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
  321. perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
  322. # 计算主车后轴中心点到垂足的距离
  323. distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
  324. carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
  325. return carhead_distance_to_foot
  326. def ifCrossingRedLight_PGVIL(data_processed) -> dict:
  327. #判断车辆是否闯红灯
  328. stop_line_points = np.array([(276.555,-35.575),(279.751,-33.683)])
  329. X_OFFSET = 258109.4239876
  330. Y_OFFSET = 4149969.964821
  331. stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
  332. ego_df = data_processed.ego_data
  333. prev_distance = float('inf') # 初始化为正无穷大
  334. """
  335. traffic_light_status
  336. 0x100000为绿灯,1048576
  337. 0x1000000为黄灯,16777216
  338. 0x10000000为红灯,268435456
  339. """
  340. red_light_violation = False
  341. for index ,ego in ego_df.iterrows():
  342. car_point = (ego["posX"], ego["posY"])
  343. stateMask = ego["stateMask"]
  344. simTime = ego["simTime"]
  345. distance_to_stopline = get_car_to_stop_line_distance(ego, car_point, stop_line_points)
  346. #主车车头跨越停止线时非绿灯,返回-1,闯红灯
  347. if prev_distance > 0 and distance_to_stopline < 0:
  348. if stateMask != 1048576:
  349. red_light_violation = True
  350. break
  351. prev_distance = distance_to_stopline
  352. if red_light_violation:
  353. return {"ifCrossingRedLight_PGVIL": -1}#闯红灯
  354. else:
  355. return {"ifCrossingRedLight_PGVIL": 1}#没有闯红灯
  356. # def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
  357. # #在绿波车速引导期间是否发生停车
  358. # def mindisStopline(data_processed) -> dict:
  359. # """
  360. # 当有停车让行标志/标线时车辆最前端与停车让行线的最小距离应在0-4m之间
  361. # """
  362. # ego_df = data_processed.ego_data
  363. # obj_df = data_processed.object_df
  364. # stop_giveway_simtime = ego_df[
  365. # ego_df["sign_type1"] == 32 |
  366. # ego_df["stopline_type"] == 3
  367. # ]["simTime"]
  368. # stop_giveway_data = ego_df[
  369. # ego_df["sign_type1"] == 32 |
  370. # ego_df["stopline_type"] == 3
  371. # ]["simTime"]
  372. # if stop_giveway_simtime.empty:
  373. # print("没有停车让行标志/标线")
  374. # ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
  375. # distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
  376. # distance_to_stoplines = []
  377. # for _,row in ego_data.iterrows():
  378. # ego_pos = np.array([row["posX"], row["posY"], row["posH"]])
  379. # stop_line_points = [
  380. # [row["stopline_x1"], row["stopline_y1"]],
  381. # [row["stopline_x2"], row["stopline_y2"]],
  382. # ]
  383. # distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
  384. # distance_to_stoplines.append(distance_to_stopline)
  385. # mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
  386. # return {"mindisStopline": mindisStopline}
  387. class FunctionRegistry:
  388. """动态函数注册器(支持参数验证)"""
  389. def __init__(self, data_processed):
  390. self.logger = LogManager().get_logger() # 获取全局日志实例
  391. self.data = data_processed
  392. self.fun_config = data_processed.function_config["function"]
  393. self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
  394. self._registry: Dict[str, Callable] = {}
  395. self._registry = self._build_registry()
  396. def _extract_level_3_metrics(self, config_node: dict) -> list:
  397. """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
  398. metrics = []
  399. def _recurse(node):
  400. if isinstance(node, dict):
  401. if "name" in node and not any(
  402. isinstance(v, dict) for v in node.values()
  403. ):
  404. metrics.append(node["name"])
  405. for v in node.values():
  406. _recurse(v)
  407. _recurse(config_node)
  408. self.logger.info(f"评比的功能指标列表:{metrics}")
  409. return metrics
  410. def _build_registry(self) -> dict:
  411. """自动注册指标函数(防御性编程)"""
  412. registry = {}
  413. for func_name in self.level_3_merics:
  414. try:
  415. registry[func_name] = globals()[func_name]
  416. except KeyError:
  417. print(f"未实现指标函数: {func_name}")
  418. self.logger.error(f"未实现指标函数: {func_name}")
  419. return registry
  420. def batch_execute(self) -> dict:
  421. """批量执行指标计算(带熔断机制)"""
  422. results = {}
  423. for name, func in self._registry.items():
  424. try:
  425. result = func(self.data) # 统一传递数据上下文
  426. results.update(result)
  427. except Exception as e:
  428. print(f"{name} 执行失败: {str(e)}")
  429. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  430. results[name] = None
  431. self.logger.info(f"功能指标计算结果:{results}")
  432. return results
  433. class FunctionManager:
  434. """管理功能指标计算的类"""
  435. def __init__(self, data_processed):
  436. self.data = data_processed
  437. self.function = FunctionRegistry(self.data)
  438. def report_statistic(self):
  439. """
  440. 计算并报告功能指标结果。
  441. :return: 评估结果
  442. """
  443. function_result = self.function.batch_execute()
  444. evaluator = Score(self.data.function_config)
  445. result = evaluator.evaluate(function_result)
  446. print("\n[功能性表现及评价结果]")
  447. return result
  448. # self.logger.info(f'Function Result: {function_result}')
  449. # 使用示例
  450. if __name__ == "__main__":
  451. pass
  452. # print("\n[功能类表现及得分情况]")