function.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2025 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2025/01/5
  11. @Last Modified: 2025/01/5
  12. @Summary: Function Metrics Calculation
  13. """
  14. import sys
  15. from pathlib import Path
  16. # 添加项目根目录到系统路径
  17. root_path = Path(__file__).resolve().parent.parent
  18. sys.path.append(str(root_path))
  19. print(root_path)
  20. from modules.lib.score import Score
  21. from modules.lib.log_manager import LogManager
  22. import numpy as np
  23. from typing import Dict, Tuple, Optional, Callable, Any
  24. import pandas as pd
  25. import yaml
  26. from modules.lib.chart_generator import generate_function_chart_data
  27. from shapely.geometry import Point, Polygon
  28. from modules.lib.common import get_interpolation
  29. # ----------------------
  30. # 基础工具函数 (Pure functions)
  31. # ----------------------
  32. scenario_sign_dict = {"LeftTurnAssist": 206, "HazardousLocationW": 207, "RedLightViolationW": 208,
  33. "CoorperativeIntersectionPassing": 225, "GreenLightOptimalSpeedAdvisory": 234,
  34. "ForwardCollision": 212}
  35. def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
  36. polygon = Polygon(polygon)
  37. point = Point(test_point)
  38. return polygon.contains(point)
  39. def _is_segment_by_interval(time_list, expected_step) -> list:
  40. """
  41. 根据时间戳之间的间隔进行分段。
  42. 参数:
  43. time_list (list): 时间戳列表。
  44. expected_step (float): 预期的固定步长。
  45. 返回:
  46. list: 分段后的时间戳列表,每个元素是一个子列表。
  47. """
  48. if not time_list:
  49. return []
  50. segments = []
  51. current_segment = [time_list[0]]
  52. for i in range(1, len(time_list)):
  53. actual_step = time_list[i] - time_list[i - 1]
  54. if actual_step != expected_step:
  55. # 如果间隔不符合预期,则开始一个新的段
  56. segments.append(current_segment)
  57. current_segment = [time_list[i]]
  58. else:
  59. # 否则,将当前时间戳添加到当前段中
  60. current_segment.append(time_list[i])
  61. # 添加最后一个段
  62. if current_segment:
  63. segments.append(current_segment)
  64. return segments
  65. # 寻找二级指标的名称
  66. def find_nested_name(data):
  67. """
  68. 查找字典中嵌套的name结构。
  69. :param data: 要搜索的字典
  70. :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None
  71. """
  72. if isinstance(data, dict):
  73. for key, value in data.items():
  74. if isinstance(value, dict) and 'name' in value:
  75. return value['name']
  76. # 递归查找嵌套字典
  77. result = find_nested_name(value)
  78. if result is not None:
  79. return result
  80. elif isinstance(data, list):
  81. for item in data:
  82. result = find_nested_name(item)
  83. if result is not None:
  84. return result
  85. return None
  86. def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
  87. """向量化距离计算"""
  88. return np.linalg.norm(ego_pos - obj_pos, axis=1)
  89. def calculate_relative_speed_PGVIL(
  90. ego_speed: np.ndarray, obj_speed: np.ndarray
  91. ) -> np.ndarray:
  92. """向量化相对速度计算"""
  93. return np.linalg.norm(ego_speed - obj_speed, axis=1)
  94. def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  95. """向量化距离计算"""
  96. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  97. return dist
  98. def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  99. """向量化相对速度计算"""
  100. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  101. def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
  102. """数据提取函数"""
  103. ego = data[data["playerId"] == 1].iloc[0]
  104. obj = data[data["playerId"] != 1]
  105. return ego, obj
  106. def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
  107. """带缓存的预警数据获取"""
  108. ego_df = data_processed.ego_data
  109. obj_df = data_processed.object_df
  110. scenario_name = find_nested_name(data_processed.function_config["function"])
  111. correctwarning = scenario_sign_dict.get(scenario_name)
  112. if correctwarning is None:
  113. print("无法获取正确的预警信号标志位!")
  114. return None
  115. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  116. warning_times = warning_rows['simTime']
  117. if warning_times.empty:
  118. print("没有找到预警数据!")
  119. return None
  120. first_time = warning_times.iloc[0]
  121. return obj_df[obj_df['simTime'] == first_time]
  122. # ----------------------
  123. # 核心计算功能函数
  124. # ----------------------
  125. def latestWarningDistance_LST(data) -> dict:
  126. """预警距离计算流水线"""
  127. scenario_name = find_nested_name(data.function_config["function"])
  128. value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"]
  129. correctwarning = scenario_sign_dict[scenario_name]
  130. ego_df = data.ego_data
  131. warning_dist = calculate_distance(ego_df, correctwarning)
  132. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  133. # 将计算结果保存到data对象中,供图表生成使用
  134. data.warning_dist = warning_dist
  135. data.warning_speed = warning_speed
  136. data.correctwarning = correctwarning
  137. if warning_dist.empty:
  138. return {"latestWarningDistance_LST": 0.0}
  139. # 生成图表数据
  140. generate_function_chart_data(data, 'latestWarningDistance_LST')
  141. return {"latestWarningDistance_LST": float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else value}
  142. def earliestWarningDistance_LST(data) -> dict:
  143. """预警距离计算流水线"""
  144. scenario_name = find_nested_name(data.function_config["function"])
  145. value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"]
  146. correctwarning = scenario_sign_dict[scenario_name]
  147. ego_df = data.ego_data
  148. warning_dist = calculate_distance(ego_df, correctwarning)
  149. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  150. # 将计算结果保存到data对象中,供图表生成使用
  151. data.warning_dist = warning_dist
  152. data.warning_speed = warning_speed
  153. data.correctwarning = correctwarning
  154. if warning_dist.empty:
  155. return {"earliestWarningDistance_LST": 0.0}
  156. # 生成图表数据
  157. generate_function_chart_data(data, 'earliestWarningDistance_LST')
  158. return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else value}
  159. def latestWarningDistance_TTC_LST(data) -> dict:
  160. """TTC计算流水线"""
  161. scenario_name = find_nested_name(data.function_config["function"])
  162. value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"]
  163. correctwarning = scenario_sign_dict[scenario_name]
  164. ego_df = data.ego_data
  165. warning_dist = calculate_distance(ego_df, correctwarning)
  166. if warning_dist.empty:
  167. return {"latestWarningDistance_TTC_LST": 0.0}
  168. # 将correctwarning保存到data对象中,供图表生成使用
  169. data.correctwarning = correctwarning
  170. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  171. with np.errstate(divide='ignore', invalid='ignore'):
  172. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  173. # 处理无效的TTC值
  174. for i in range(len(ttc)):
  175. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  176. data.warning_dist = warning_dist
  177. data.warning_speed = warning_speed
  178. data.ttc = ttc
  179. # 生成图表数据
  180. # from modules.lib.chart_generator import generate_function_chart_data
  181. generate_function_chart_data(data, 'latestWarningDistance_TTC_LST')
  182. return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
  183. def earliestWarningDistance_TTC_LST(data) -> dict:
  184. """TTC计算流水线"""
  185. scenario_name = find_nested_name(data.function_config["function"])
  186. value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"]
  187. correctwarning = scenario_sign_dict[scenario_name]
  188. ego_df = data.ego_data
  189. warning_dist = calculate_distance(ego_df, correctwarning)
  190. if warning_dist.empty:
  191. return {"earliestWarningDistance_TTC_LST": 0.0}
  192. # 将correctwarning保存到data对象中,供图表生成使用
  193. data.correctwarning = correctwarning
  194. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  195. with np.errstate(divide='ignore', invalid='ignore'):
  196. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  197. # 处理无效的TTC值
  198. for i in range(len(ttc)):
  199. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  200. # 将计算结果保存到data对象中,供图表生成使用
  201. data.warning_dist = warning_dist
  202. data.warning_speed = warning_speed
  203. data.ttc = ttc
  204. data.correctwarning = correctwarning
  205. # 生成图表数据
  206. generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST')
  207. return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
  208. def warningDelayTime_LST(data):
  209. scenario_name = find_nested_name(data.function_config["function"])
  210. correctwarning = scenario_sign_dict[scenario_name]
  211. # 将correctwarning保存到data对象中,供图表生成使用
  212. data.correctwarning = correctwarning
  213. ego_df = data.ego_data
  214. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  215. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  216. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  217. 'simTime'].tolist()
  218. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  219. if (simTime_HMI is None) or (simTime_rosbag is None):
  220. print("预警出错!")
  221. delay_time = 100.0
  222. else:
  223. delay_time = abs(simTime_HMI - simTime_rosbag)
  224. return {"warningDelayTime_LST": delay_time}
  225. def warningDelayTimeofReachDecel_LST(data):
  226. scenario_name = find_nested_name(data.function_config["function"])
  227. correctwarning = scenario_sign_dict[scenario_name]
  228. # 将correctwarning保存到data对象中,供图表生成使用
  229. data.correctwarning = correctwarning
  230. ego_df = data.ego_data
  231. obj_df = data.obj_data
  232. obj_speed_simtime = obj_df[obj_df['accel'] <= -4]['simTime'].tolist() # 单位m/s^2
  233. warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
  234. if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0):
  235. return {"warningDelayTimeofReachDecel_LST": 0}
  236. elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0):
  237. return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]}
  238. elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0):
  239. return {"warningDelayTimeofReachDecel_LST": None}
  240. else:
  241. return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]}
  242. def rightWarningSignal_LST(data):
  243. scenario_name = find_nested_name(data.function_config["function"])
  244. correctwarning = scenario_sign_dict[scenario_name]
  245. # 将correctwarning保存到data对象中,供图表生成使用
  246. data.correctwarning = correctwarning
  247. ego_df = data.ego_data
  248. if ego_df['ifwarning'].empty:
  249. print("无法获取正确预警信号标志位!")
  250. return
  251. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  252. if warning_rows.empty:
  253. return {"rightWarningSignal_LST": -1}
  254. else:
  255. return {"rightWarningSignal_LST": 1}
  256. def ifCrossingRedLight_LST(data):
  257. scenario_name = find_nested_name(data.function_config["function"])
  258. correctwarning = scenario_sign_dict[scenario_name]
  259. # 将correctwarning保存到data对象中,供图表生成使用
  260. data.correctwarning = correctwarning
  261. ego_df = data.ego_data
  262. redlight_simtime = ego_df[
  263. (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (
  264. ego_df['v'] != 0)]['simTime']
  265. if redlight_simtime.empty:
  266. return {"ifCrossingRedLight_LST": -1}
  267. else:
  268. return {"ifCrossingRedLight_LST": 1}
  269. def ifStopgreenWaveSpeedGuidance_LST(data):
  270. scenario_name = find_nested_name(data.function_config["function"])
  271. correctwarning = scenario_sign_dict[scenario_name]
  272. # 将correctwarning保存到data对象中,供图表生成使用
  273. data.correctwarning = correctwarning
  274. ego_df = data.ego_data
  275. greenlight_simtime = \
  276. ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
  277. if greenlight_simtime.empty:
  278. return {"ifStopgreenWaveSpeedGuidance_LST": -1}
  279. else:
  280. return {"ifStopgreenWaveSpeedGuidance_LST": 1}
  281. # ------ 单车智能指标 ------
  282. def limitSpeed_LST(data):
  283. ego_df = data.ego_data
  284. scenario_name = find_nested_name(data.function_config["function"])
  285. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  286. speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.1]['v'].tolist()
  287. if len(speed_limit) == 0:
  288. return {"speedLimit_LST": -1}
  289. max_speed = max(speed_limit)
  290. data.speedLimit = limit_speed
  291. generate_function_chart_data(data, 'limitspeed_LST')
  292. return {"speedLimit_LST": max_speed}
  293. def limitSpeedPastLimitSign_LST(data):
  294. ego_df = data.ego_data
  295. scenario_name = find_nested_name(data.function_config["function"])
  296. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  297. car_length = data.function_config["vehicle"]['CAR_LENGTH']
  298. ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist()
  299. ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist()
  300. data.speedLimit = limit_speed
  301. data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None
  302. generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST')
  303. if len(ego_speed) == 0:
  304. return {"speedPastLimitSign_LST": -1}
  305. return {"speedPastLimitSign_LST": ego_speed[0]}
  306. def leastDistance_LST(data):
  307. ego_df = data.ego_data
  308. dist_row = ego_df[ego_df['v'] == 0]['relative_dist'].tolist()
  309. if len(dist_row) == 0:
  310. return {"leastDistance_LST": -1}
  311. else:
  312. min_dist = min(dist_row)
  313. return {"leastDistance_LST": min_dist}
  314. def launchTimeinStopLine_LST(data):
  315. ego_df = data.ego_data
  316. simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
  317. if len(simtime_row) == 0:
  318. return {"launchTimeinStopLine_LST": -1}
  319. else:
  320. delta_t = simtime_row[-1] - simtime_row[0]
  321. return {"launchTimeinStopLine_LST": delta_t}
  322. def launchTimewhenFollowingCar_LST(data):
  323. ego_df = data.ego_data
  324. t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0]
  325. simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
  326. if len(simtime_row) == 0:
  327. return {"launchTimewhenFollowingCar_LST": 0}
  328. else:
  329. time_interval = _is_segment_by_interval(simtime_row, t_interval)
  330. delta_t = []
  331. for t in time_interval:
  332. delta_t.append(t[-1] - t[0])
  333. return {"launchTimewhenFollowingCar_LST": max(delta_t)}
  334. def noStop_LST(data):
  335. ego_df_ini = data.ego_data
  336. min_time = ego_df_ini['simTime'].min() + 5
  337. max_time = ego_df_ini['simTime'].max() - 5
  338. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  339. speed = ego_df['v'].tolist()
  340. if (any(speed) == 0):
  341. return {"noStop_LST": -1}
  342. else:
  343. return {"noStop_LST": 1}
  344. def launchTimeinTrafficLight_LST(data):
  345. '''
  346. 待修改:
  347. 红灯的状态值:1
  348. 绿灯的状态值:0
  349. '''
  350. ego_df = data.ego_data
  351. simtime_of_redlight = ego_df[ego_df['stateMask'] == 0]['simTime']
  352. simtime_of_stop = ego_df[ego_df['v'] == 0]['simTime']
  353. if len(simtime_of_stop) or len(simtime_of_redlight):
  354. return {"timeInterval_LST": -1}
  355. simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist()
  356. if len(simtime_of_launch) == 0:
  357. return {"timeInterval_LST": -1}
  358. return {"timeInterval_LST": simtime_of_launch[-1] - simtime_of_launch[0]}
  359. def crossJunctionToTargetLane_LST(data):
  360. ego_df = data.ego_data
  361. lane_in_leftturn = set(ego_df['lane_id'].tolist())
  362. scenario_name = find_nested_name(data.function_config["function"])
  363. target_lane_id = data.function_config["function"][scenario_name]["crossJunctionToTargetLane_LST"]['max']
  364. if target_lane_id not in lane_in_leftturn:
  365. return {"crossJunctionToTargetLane_LST": -1}
  366. else:
  367. return {"crossJunctionToTargetLane_LST": target_lane_id}
  368. def keepInLane_LST(data):
  369. ego_df = data.ego_data
  370. notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width']/2].tolist()
  371. if len(notkeepinlane):
  372. return {"keepInLane_LST": -1}
  373. return {"keepInLane_LST": 1}
  374. def leastLateralDistance_LST(data):
  375. ego_df = data.ego_data
  376. lane_width = ego_df[ego_df['x_relative_dist'] == 0]['lane_width']
  377. if lane_width.empty():
  378. return {"leastLateralDistance_LST": -1}
  379. else:
  380. y_relative_dist = ego_df[ego_df['x_relative_dist'] == 0]['y_relative_dist']
  381. if (y_relative_dist <= lane_width / 2).all():
  382. return {"leastLateralDistance_LST": 1}
  383. else:
  384. return {"leastLateralDistance_LST": -1}
def waitTimeAtCrosswalkwithPedestrian_LST(data):
    """Intended to report how long the ego waits while a pedestrian is in a crosswalk.

    NOTE(review): as written this function cannot run to completion. ``data``
    is used both as an object with ``ego_data``/``object_data`` attributes
    and as a DataFrame-like container, and several statements below are
    flagged individually. The block layout is reconstructed from a source
    whose indentation was lost; confirm against the repository.

    Returns:
        dict: ``{'waitTimeAtCrosswalkwithPedestrian_LST': value}``.
    """
    ego_df = data.ego_data
    # NOTE(review): other functions in this file read ``object_df``; confirm
    # that ``object_data`` exists on the data object.
    object_df = data.object_data
    # NOTE(review): item assignment on ``data`` assumes it behaves like a
    # DataFrame; an empty list would only fit a frame with zero rows.
    data['in_crosswalk'] = []
    # NOTE(review): if ``data`` is a pandas DataFrame, ``drop_duplicates``
    # with ``inplace=True`` returns None, so ``position_data`` would be None.
    position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True)
    for cross_id in position_data['cross_id'].tolist():
        # NOTE(review): ``object_df['posX', 'posY']`` looks up a single tuple
        # key; selecting two columns needs a list, and iterating a frame
        # yields column labels rather than row values.
        for posX, posY in object_df['posX', 'posY']:
            pedestrian_pos = (posX, posY)
            plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0]
            # NOTE(review): the assignments below target a filtered copy
            # (chained indexing), so ``data`` itself is presumably unchanged.
            if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos):
                data[data['playerId'] == 2]['in_crosswalk'] = 1
            else:
                data[data['playerId'] == 2]['in_crosswalk'] = 0
    car_stop_time = ego_df[ego_df['v'] == 0]['simTime']
    pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime']
    # NOTE(review): ``isin(...).tolist()`` is a list of booleans, so the
    # subtraction in the return statement works on flags, not on timestamps.
    car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist()
    return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len(
        car_wait_pedestrian) > 0 else 0}
  403. def launchTimewhenPedestrianLeave_LST(data):
  404. ego_df = data.ego_data
  405. car_stop_time = ego_df[ego_df['v'] == 0]["simTime"]
  406. if car_stop_time.empty():
  407. return {"launchTimewhenPedestrianLeave_LST": -1}
  408. else:
  409. lane_width = ego_df[ego_df['v'] == 0]['lane_width'].tolist()[0]
  410. car_to_pedestrain = ego_df[ego_df['y_relative_dist'] <= lane_width / 2]["simTime"]
  411. legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist()
  412. return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]}
  413. def noCollision_LST(data):
  414. ego_df = data.ego_data
  415. if ego_df['relative_dist'].any() == 0:
  416. return {"noCollision_LST": -1}
  417. else:
  418. return {"noCollision_LST": 1}
  419. def noReverse_LST(data):
  420. ego_df = data.ego_data
  421. if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0:
  422. return {"noReverse_LST": -1}
  423. else:
  424. return {"noReverse_LST": 1}
  425. def turnAround_LST(data):
  426. ego_df = data.ego_data
  427. if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and (
  428. ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0):
  429. return {"turnAround_LST": 1}
  430. else:
  431. return {"turnAround_LST": -1}
  432. def laneOffset_LST(data):
  433. car_width = data.function_config['vehicle']['CAR_WIDTH']
  434. ego_df_ini = data.ego_data
  435. min_time = ego_df_ini['simTime'].min() + 5
  436. max_time = ego_df_ini['simTime'].max() - 5
  437. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  438. laneoffset = ego_df['y_relative_dist'] - car_width / 2
  439. return {"laneOffset_LST": max(laneoffset)}
  440. def maxLongitudeDist_LST(data):
  441. ego_df = data.ego_data
  442. longitude_dist = abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist())
  443. data.longitude_dist = min(abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist()))
  444. stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist()
  445. data.stop_time = min(stop_time)
  446. if len(longitude_dist) == 0:
  447. return {"maxLongitudeDist_LST": -1}
  448. generate_function_chart_data(data, 'maxLongitudeDist_LST')
  449. return {"maxLongDist_LST": min(longitude_dist)}
  450. def noEmergencyBraking_LST(data):
  451. ego_df = data.ego_data
  452. ego_df['ip_dec'] = ego_df['v'].apply(
  453. get_interpolation, point1=[18, -5], point2=[72, -3.5])
  454. ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply(
  455. lambda x: 1 if x < 0 else 0)
  456. if sum(ego_df['slam_brake']) == 0:
  457. return {"noEmergencyBraking_LST": 1}
  458. else:
  459. return {"noEmergencyBraking_LST": -1}
  460. def rightWarningSignal_PGVIL(data_processed) -> dict:
  461. """判断是否发出正确预警信号"""
  462. ego_df = data_processed.ego_data
  463. scenario_name = find_nested_name(data_processed.function_config["function"])
  464. correctwarning = scenario_sign_dict[scenario_name]
  465. if correctwarning is None:
  466. print("无法获取正确的预警信号标志位!")
  467. return None
  468. # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
  469. warning_rows = ego_df[
  470. (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
  471. ]
  472. if warning_rows.empty:
  473. return {"rightWarningSignal_PGVIL": -1}
  474. else:
  475. return {"rightWarningSignal_PGVIL": 1}
  476. def latestWarningDistance_PGVIL(data_processed) -> dict:
  477. """预警距离计算流水线"""
  478. ego_df = data_processed.ego_data
  479. obj_df = data_processed.object_df
  480. warning_data = get_first_warning(data_processed)
  481. if warning_data is None:
  482. return {"latestWarningDistance_PGVIL": 0.0}
  483. ego, obj = extract_ego_obj(warning_data)
  484. distances = calculate_distance_PGVIL(
  485. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  486. )
  487. # 将计算结果保存到data对象中,供图表生成使用
  488. data_processed.warning_dist = distances
  489. if distances.size == 0:
  490. print("没有找到数据!")
  491. return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  492. return {"latestWarningDistance_PGVIL": float(np.min(distances))}
  493. def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
  494. """TTC计算流水线"""
  495. ego_df = data_processed.ego_data
  496. obj_df = data_processed.object_df
  497. warning_data = get_first_warning(data_processed)
  498. if warning_data is None:
  499. return {"latestWarningDistance_TTC_PGVIL": 0.0}
  500. ego, obj = extract_ego_obj(warning_data)
  501. # 向量化计算
  502. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  503. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  504. obj_pos = obj[["posX", "posY"]].values
  505. obj_speed = obj[["speedX", "speedY"]].values
  506. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  507. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  508. data_processed.warning_dist = distances
  509. data_processed.warning_speed = rel_speeds
  510. with np.errstate(divide="ignore", invalid="ignore"):
  511. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  512. if ttc.size == 0:
  513. print("没有找到数据!")
  514. return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  515. data_processed.ttc = ttc
  516. return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  517. def earliestWarningDistance_PGVIL(data_processed) -> dict:
  518. """预警距离计算流水线"""
  519. ego_df = data_processed.ego_data
  520. obj_df = data_processed.object_df
  521. warning_data = get_first_warning(data_processed)
  522. if warning_data is None:
  523. return {"earliestWarningDistance_PGVIL": 0}
  524. ego, obj = extract_ego_obj(warning_data)
  525. distances = calculate_distance_PGVIL(
  526. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  527. )
  528. # 将计算结果保存到data对象中,供图表生成使用
  529. data_processed.warning_dist = distances
  530. if distances.size == 0:
  531. print("没有找到数据!")
  532. return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  533. return {"earliestWarningDistance": float(np.min(distances))}
  534. def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
  535. """TTC计算流水线"""
  536. ego_df = data_processed.ego_data
  537. obj_df = data_processed.object_df
  538. warning_data = get_first_warning(data_processed)
  539. if warning_data is None:
  540. return {"earliestWarningDistance_TTC_PGVIL": 0.0}
  541. ego, obj = extract_ego_obj(warning_data)
  542. # 向量化计算
  543. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  544. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  545. obj_pos = obj[["posX", "posY"]].values
  546. obj_speed = obj[["speedX", "speedY"]].values
  547. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  548. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  549. data_processed.warning_dist = distances
  550. data_processed.warning_speed = rel_speeds
  551. with np.errstate(divide="ignore", invalid="ignore"):
  552. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  553. if ttc.size == 0:
  554. print("没有找到数据!")
  555. return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  556. data_processed.ttc = ttc
  557. return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  558. # def delayOfEmergencyBrakeWarning(data_processed) -> dict:
  559. # #预警时机相对背景车辆减速度达到-4m/s2后的时延
  560. # ego_df = data_processed.ego_data
  561. # obj_df = data_processed.object_df
  562. # warning_data = get_first_warning(data_processed)
  563. # if warning_data is None:
  564. # return {"delayOfEmergencyBrakeWarning": -1}
  565. # try:
  566. # ego, obj = extract_ego_obj(warning_data)
  567. # # 向量化计算
  568. # obj_speed = np.array([[obj_df["speedX"], obj_df["speedY"]]])
  569. # # 计算背景车辆减速度
  570. # simtime_gap = obj["simTime"].iloc[1] - obj["simTime"].iloc[0]
  571. # simtime_freq = 1 / simtime_gap#每秒采样频率
  572. # # simtime_freq为一个时间窗,找出时间窗内的最大减速度
  573. # obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)#速度向量的模长
  574. # obj_speed_change = np.diff(speed_magnitude)#速度模长的变化量
  575. # obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
  576. # #找到最大减速度,若最大减速度小于-4m/s2,则计算最大减速度对应的时间,和warning_data的差值进行对比
  577. # max_deceleration = np.max(obj_deceleration)
  578. # if max_deceleration < -4:
  579. # max_deceleration_times = obj["simTime"].iloc[np.argmax(obj_deceleration)]
  580. # max_deceleration_time = max_deceleration_times.iloc[0]
  581. # delay_time = ego["simTime"] - max_deceleration_time
  582. # return {"delayOfEmergencyBrakeWarning": float(delay_time)}
  583. # else:
  584. # print("没有达到预警减速度阈值:-4m/s^2")
  585. # return {"delayOfEmergencyBrakeWarning": -1}
  586. def warningDelayTime_PGVIL(data_processed) -> dict:
  587. """车端接收到预警到HMI发出预警的时延"""
  588. ego_df = data_processed.ego_data
  589. # #打印ego_df的列名
  590. # print(ego_df.columns.tolist())
  591. warning_data = get_first_warning(data_processed)
  592. if warning_data is None:
  593. return {"warningDelayTime_PGVIL": -1}
  594. try:
  595. ego, obj = extract_ego_obj(warning_data)
  596. # 找到event_Type不为空,且playerID为1的行
  597. rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
  598. first_time = rosbag_warning_rows["simTime"].iloc[0]
  599. warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
  600. delay_time = warning_time - first_time
  601. return {"warningDelayTime_PGVIL": float(delay_time)}
  602. except Exception as e:
  603. print(f"计算预警时延时发生错误: {e}")
  604. return {"warningDelayTime_PGVIL": -1}
  605. def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
  606. """
  607. 计算主车后轴中心点到停止线的距离
  608. :return 距离
  609. """
  610. distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
  611. # 计算停止线的方向向量
  612. line_vector = np.array(
  613. [
  614. stop_line_points[1][0] - stop_line_points[0][0],
  615. stop_line_points[1][1] - stop_line_points[0][1],
  616. ]
  617. )
  618. direction_vector_norm = np.linalg.norm(line_vector)
  619. direction_vector_unit = (
  620. line_vector / direction_vector_norm
  621. if direction_vector_norm != 0
  622. else np.array([0, 0])
  623. )
  624. # 计算主车后轴中心点到停止线投影的坐标(垂足)
  625. projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
  626. perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
  627. # 计算主车后轴中心点到垂足的距离
  628. distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
  629. carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
  630. return carhead_distance_to_foot
  631. def ifCrossingRedLight_PGVIL(data_processed) -> dict:
  632. # 判断车辆是否闯红灯
  633. stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)])
  634. X_OFFSET = 258109.4239876
  635. Y_OFFSET = 4149969.964821
  636. stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
  637. ego_df = data_processed.ego_data
  638. prev_distance = float("inf") # 初始化为正无穷大
  639. """
  640. traffic_light_status
  641. 0x100000为绿灯,1048576
  642. 0x1000000为黄灯,16777216
  643. 0x10000000为红灯,268435456
  644. """
  645. red_light_violation = False
  646. for index, ego in ego_df.iterrows():
  647. car_point = (ego["posX"], ego["posY"])
  648. stateMask = ego["stateMask"]
  649. simTime = ego["simTime"]
  650. distance_to_stopline = get_car_to_stop_line_distance(
  651. ego, car_point, stop_line_points
  652. )
  653. # 主车车头跨越停止线时非绿灯,返回-1,闯红灯
  654. if prev_distance > 0 and distance_to_stopline < 0:
  655. if stateMask is not None and stateMask != 1048576:
  656. red_light_violation = True
  657. break
  658. prev_distance = distance_to_stopline
  659. if red_light_violation:
  660. return {"ifCrossingRedLight_PGVIL": -1} # 闯红灯
  661. else:
  662. return {"ifCrossingRedLight_PGVIL": 1} # 没有闯红灯
  663. # def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
  664. # #在绿波车速引导期间是否发生停车
  665. # def mindisStopline(data_processed) -> dict:
  666. # """
  667. # 当有停车让行标志/标线时车辆最前端与停车让行线的最小距离应在0-4m之间
  668. # """
  669. # ego_df = data_processed.ego_data
  670. # obj_df = data_processed.object_df
  671. # stop_giveway_simtime = ego_df[
  672. # ego_df["sign_type1"] == 32 |
  673. # ego_df["stopline_type"] == 3
  674. # ]["simTime"]
  675. # stop_giveway_data = ego_df[
  676. # ego_df["sign_type1"] == 32 |
  677. # ego_df["stopline_type"] == 3
  678. # ]["simTime"]
  679. # if stop_giveway_simtime.empty:
  680. # print("没有停车让行标志/标线")
  681. # ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
  682. # distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
  683. # distance_to_stoplines = []
  684. # for _,row in ego_data.iterrows():
  685. # ego_pos = np.array([row["posX"], row["posY"], row["posH"]])
  686. # stop_line_points = [
  687. # [row["stopline_x1"], row["stopline_y1"]],
  688. # [row["stopline_x2"], row["stopline_y2"]],
  689. # ]
  690. # distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
  691. # distance_to_stoplines.append(distance_to_stopline)
  692. # mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
  693. # return {"mindisStopline": mindisStopline}
  694. class FunctionRegistry:
  695. """动态函数注册器(支持参数验证)"""
  696. def __init__(self, data_processed):
  697. self.logger = LogManager().get_logger() # 获取全局日志实例
  698. self.data = data_processed
  699. self.fun_config = data_processed.function_config["function"]
  700. self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
  701. self._registry: Dict[str, Callable] = {}
  702. self._registry = self._build_registry()
  703. def _extract_level_3_metrics(self, config_node: dict) -> list:
  704. """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
  705. metrics = []
  706. def _recurse(node):
  707. if isinstance(node, dict):
  708. if "name" in node and not any(
  709. isinstance(v, dict) for v in node.values()
  710. ):
  711. metrics.append(node["name"])
  712. for v in node.values():
  713. _recurse(v)
  714. _recurse(config_node)
  715. self.logger.info(f"评比的功能指标列表:{metrics}")
  716. return metrics
  717. def _build_registry(self) -> dict:
  718. """自动注册指标函数(防御性编程)"""
  719. registry = {}
  720. for func_name in self.level_3_merics:
  721. try:
  722. registry[func_name] = globals()[func_name]
  723. except KeyError:
  724. print(f"未实现指标函数: {func_name}")
  725. self.logger.error(f"未实现指标函数: {func_name}")
  726. return registry
  727. def batch_execute(self) -> dict:
  728. """批量执行指标计算(带熔断机制)"""
  729. results = {}
  730. for name, func in self._registry.items():
  731. try:
  732. result = func(self.data) # 统一传递数据上下文
  733. results.update(result)
  734. except Exception as e:
  735. print(f"{name} 执行失败: {str(e)}")
  736. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  737. results[name] = None
  738. self.logger.info(f"功能指标计算结果:{results}")
  739. return results
  740. class FunctionManager:
  741. """管理功能指标计算的类"""
  742. def __init__(self, data_processed):
  743. self.data = data_processed
  744. self.function = FunctionRegistry(self.data)
  745. def report_statistic(self):
  746. """
  747. 计算并报告功能指标结果。
  748. :return: 评估结果
  749. """
  750. function_result = self.function.batch_execute()
  751. print("\n[功能性表现及评价结果]")
  752. return function_result
  753. # self.logger.info(f'Function Result: {function_result}')
# Usage example placeholder: running this module directly does nothing.
if __name__ == "__main__":
    pass
    # Disabled output: print("\n[功能类表现及得分情况]")