function.py 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2025 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2025/01/5
  11. @Last Modified: 2025/01/5
  12. @Summary: Function Metrics Calculation
  13. """
  14. import sys
  15. from pathlib import Path
  16. # 添加项目根目录到系统路径
  17. root_path = Path(__file__).resolve().parent.parent
  18. sys.path.append(str(root_path))
  19. print(root_path)
  20. from modules.lib.score import Score
  21. from modules.lib.log_manager import LogManager
  22. import numpy as np
  23. from typing import Dict, Tuple, Optional, Callable, Any
  24. import pandas as pd
  25. import yaml
  26. from modules.lib.chart_generator import generate_function_chart_data
  27. from shapely.geometry import Point, Polygon
  28. from modules.lib.common import get_interpolation
  29. import pandas as pd
  30. import yaml
  31. import math
  32. # ----------------------
  33. # 基础工具函数 (Pure functions)
  34. # ----------------------
  35. scenario_sign_dict = {
  36. "LeftTurnAssist": 206,
  37. "HazardousLocationW": 207,
  38. "RedLightViolationW": 208,
  39. "AbnormalVehicleW":209,
  40. "NsightVulnerableRoadUserCollisionW":210,
  41. "LitterW":211,
  42. "ForwardCollisionW":212,
  43. "VisibilityW":213,
  44. "EmergencyBrakeW":214,
  45. "IntersectionCollisionW":215,
  46. "BlindSpotW":216,
  47. "DoNotPassW":217,
  48. "ControlLossW":218,
  49. "FrontTrafficJamW":219,
  50. "EmergencyVehicleW":220,
  51. "CooperativeVehicleMerge":221,
  52. "CooperativeLaneChange":223,
  53. "VulnerableRoadUserCollisionW":224,
  54. "CooperativeIntersectionPassing":225,
  55. "RampMerge":226,
  56. "DrivingLaneRecommendation":227,
  57. "TrafficJamW": 228,
  58. "DynamicSpeedLimitingInformation": 229,
  59. "EmergencyVehiclesPriority": 230,
  60. "RemoteSupervision": 231,
  61. "SignalLightReminder": 232,
  62. "OtherVehicleRedLightViolationW": 233,
  63. "GreenLightOptimalSpeedAdvisory": 234,
  64. }
  65. def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
  66. polygon = Polygon(polygon)
  67. point = Point(test_point)
  68. return polygon.contains(point)
  69. def _is_segment_by_interval(time_list, expected_step) -> list:
  70. """
  71. 根据时间戳之间的间隔进行分段。
  72. 参数:
  73. time_list (list): 时间戳列表。
  74. expected_step (float): 预期的固定步长。
  75. 返回:
  76. list: 分段后的时间戳列表,每个元素是一个子列表。
  77. """
  78. if not time_list:
  79. return []
  80. segments = []
  81. current_segment = [time_list[0]]
  82. for i in range(1, len(time_list)):
  83. actual_step = time_list[i] - time_list[i - 1]
  84. if actual_step != expected_step:
  85. # 如果间隔不符合预期,则开始一个新的段
  86. segments.append(current_segment)
  87. current_segment = [time_list[i]]
  88. else:
  89. # 否则,将当前时间戳添加到当前段中
  90. current_segment.append(time_list[i])
  91. # 添加最后一个段
  92. if current_segment:
  93. segments.append(current_segment)
  94. return segments
  95. # 寻找二级指标的名称
  96. def find_nested_name(data):
  97. """
  98. 查找字典中嵌套的name结构。
  99. :param data: 要搜索的字典
  100. :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None
  101. """
  102. if isinstance(data, dict):
  103. for key, value in data.items():
  104. if isinstance(value, dict) and 'name' in value:
  105. return value['name']
  106. # 递归查找嵌套字典
  107. result = find_nested_name(value)
  108. if result is not None:
  109. return result
  110. elif isinstance(data, list):
  111. for item in data:
  112. result = find_nested_name(item)
  113. if result is not None:
  114. return result
  115. return None
  116. def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
  117. """向量化距离计算"""
  118. return np.linalg.norm(ego_pos - obj_pos, axis=1)
  119. def calculate_relative_speed_PGVIL(
  120. ego_speed: np.ndarray, obj_speed: np.ndarray
  121. ) -> np.ndarray:
  122. """向量化相对速度计算"""
  123. return np.linalg.norm(ego_speed - obj_speed, axis=1)
  124. def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  125. """向量化距离计算"""
  126. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  127. return dist
  128. def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  129. """向量化相对速度计算"""
  130. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  131. def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
  132. """数据提取函数"""
  133. ego = data[data["playerId"] == 1].iloc[0]
  134. obj = data[data["playerId"] != 1]
  135. return ego, obj
  136. def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
  137. """带缓存的预警数据获取"""
  138. ego_df = data_processed.ego_data
  139. obj_df = data_processed.object_df
  140. scenario_name = find_nested_name(data_processed.function_config["function"])
  141. correctwarning = scenario_sign_dict.get(scenario_name)
  142. if correctwarning is None:
  143. print("无法获取正确的预警信号标志位!")
  144. return None
  145. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  146. warning_times = warning_rows['simTime']
  147. if warning_times.empty:
  148. print("没有找到预警数据!")
  149. return None
  150. first_time = warning_times.iloc[0]
  151. return obj_df[obj_df['simTime'] == first_time]
  152. def getAxis(heading):
  153. AxisL = [0, 0]
  154. AxisW = [0, 0]
  155. AxisL[0] = math.cos(heading)
  156. AxisL[1] = math.sin(heading)
  157. AxisW[0] = -math.sin(heading)
  158. AxisW[1] = math.cos(heading)
  159. return AxisL, AxisW
  160. def getDotMultiply(firstAxis, secondAxis):
  161. dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1])
  162. return dotMultiply
  163. def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth):
  164. projAxisL = getDotMultiply(AxisL, baseAxis)
  165. projAxisW = getDotMultiply(AxisW, baseAxis)
  166. projectionRadius = halfLength * projAxisL + halfWidth * projAxisW
  167. return projectionRadius
  168. def isCollision(
  169. firstAxisL,
  170. firstAxisW,
  171. firstHalfLength,
  172. firstHalfWidth,
  173. secondAxisL,
  174. secondAxisW,
  175. secondHalfLength,
  176. secondHalfWidth,
  177. disVector,
  178. ):
  179. isCollision = True
  180. axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW]
  181. addProjectionRadius = 0
  182. projectionDisVecotr = 0
  183. for axis in axes:
  184. addProjectionRadius = getProjectionRadius(
  185. firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth
  186. ) + getProjectionRadius(
  187. secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth
  188. )
  189. projectionDisVecotr = getDotMultiply(disVector, axis)
  190. if projectionDisVecotr > addProjectionRadius:
  191. isCollision = False
  192. break
  193. return isCollision
  194. def funcIsCollision(
  195. firstDimX,
  196. firstDimY,
  197. firstOffX,
  198. firstOffY,
  199. firstX,
  200. firstY,
  201. firstHeading,
  202. secondDimX,
  203. secondDimY,
  204. secondOffX,
  205. secondOffY,
  206. secondX,
  207. secondY,
  208. secondHeading,
  209. ):
  210. firstAxisL = getAxis(firstHeading)[0]
  211. firstAxisW = getAxis(firstHeading)[1]
  212. secondAxisL = getAxis(secondHeading)[0]
  213. secondAxisW = getAxis(secondHeading)[1]
  214. firstHalfLength = 0.5 * firstDimX
  215. firstHalfWidth = 0.5 * firstDimY
  216. secondHalfLength = 0.5 * secondDimX
  217. secondHalfWidth = 0.5 * secondDimY
  218. firstCenter = [0, 0]
  219. secondCenter = [0, 0]
  220. disVector = [0, 0]
  221. firstCenter[0] = firstX + firstOffX * math.cos(firstHeading)
  222. firstCenter[1] = firstY + firstOffX * math.sin(firstHeading)
  223. secondCenter[0] = secondX + secondOffX * math.cos(secondHeading)
  224. secondCenter[1] = secondY + secondOffX * math.sin(secondHeading)
  225. disVector[0] = secondCenter[0] - firstCenter[0]
  226. disVector[1] = secondCenter[1] - firstCenter[1]
  227. varIsCollision = isCollision(
  228. firstAxisL,
  229. firstAxisW,
  230. firstHalfLength,
  231. firstHalfWidth,
  232. secondAxisL,
  233. secondAxisW,
  234. secondHalfLength,
  235. secondHalfWidth,
  236. disVector,
  237. )
  238. return varIsCollision
  239. # ----------------------
  240. # 核心计算功能函数
  241. # ----------------------
  242. def latestWarningDistance_LST(data) -> dict:
  243. """预警距离计算流水线"""
  244. scenario_name = find_nested_name(data.function_config["function"])
  245. value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"]
  246. correctwarning = scenario_sign_dict[scenario_name]
  247. ego_df = data.ego_data
  248. warning_dist = calculate_distance(ego_df, correctwarning)
  249. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  250. # 将计算结果保存到data对象中,供图表生成使用
  251. data.warning_dist = warning_dist
  252. data.warning_speed = warning_speed
  253. data.correctwarning = correctwarning
  254. if warning_dist.empty:
  255. return {"latestWarningDistance_LST": 0.0}
  256. # 生成图表数据
  257. generate_function_chart_data(data, 'latestWarningDistance_LST')
  258. return {"latestWarningDistance_LST": float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else value}
  259. def earliestWarningDistance_LST(data) -> dict:
  260. """预警距离计算流水线"""
  261. scenario_name = find_nested_name(data.function_config["function"])
  262. value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"]
  263. correctwarning = scenario_sign_dict[scenario_name]
  264. ego_df = data.ego_data
  265. warning_dist = calculate_distance(ego_df, correctwarning)
  266. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  267. # 将计算结果保存到data对象中,供图表生成使用
  268. data.warning_dist = warning_dist
  269. data.warning_speed = warning_speed
  270. data.correctwarning = correctwarning
  271. if len(warning_dist) == 0:
  272. return {"earliestWarningDistance_LST": 0.0}
  273. # 生成图表数据
  274. generate_function_chart_data(data, 'earliestWarningDistance_LST')
  275. return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else value}
  276. def latestWarningDistance_TTC_LST(data) -> dict:
  277. """TTC计算流水线"""
  278. scenario_name = find_nested_name(data.function_config["function"])
  279. value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"]
  280. correctwarning = scenario_sign_dict[scenario_name]
  281. ego_df = data.ego_data
  282. warning_dist = calculate_distance(ego_df, correctwarning)
  283. if warning_dist.empty:
  284. return {"latestWarningDistance_TTC_LST": 0.0}
  285. # 将correctwarning保存到data对象中,供图表生成使用
  286. data.correctwarning = correctwarning
  287. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  288. with np.errstate(divide='ignore', invalid='ignore'):
  289. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  290. # 处理无效的TTC值
  291. for i in range(len(ttc)):
  292. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  293. data.warning_dist = warning_dist
  294. data.warning_speed = warning_speed
  295. data.ttc = ttc
  296. # 生成图表数据
  297. # from modules.lib.chart_generator import generate_function_chart_data
  298. generate_function_chart_data(data, 'latestWarningDistance_TTC_LST')
  299. return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
  300. def earliestWarningDistance_TTC_LST(data) -> dict:
  301. """TTC计算流水线"""
  302. scenario_name = find_nested_name(data.function_config["function"])
  303. value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"]
  304. correctwarning = scenario_sign_dict[scenario_name]
  305. ego_df = data.ego_data
  306. warning_dist = calculate_distance(ego_df, correctwarning)
  307. if warning_dist.empty:
  308. return {"earliestWarningDistance_TTC_LST": 0.0}
  309. # 将correctwarning保存到data对象中,供图表生成使用
  310. data.correctwarning = correctwarning
  311. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  312. with np.errstate(divide='ignore', invalid='ignore'):
  313. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  314. # 处理无效的TTC值
  315. for i in range(len(ttc)):
  316. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  317. # 将计算结果保存到data对象中,供图表生成使用
  318. data.warning_dist = warning_dist
  319. data.warning_speed = warning_speed
  320. data.ttc = ttc
  321. data.correctwarning = correctwarning
  322. # 生成图表数据
  323. generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST')
  324. return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
  325. def warningDelayTime_LST(data):
  326. scenario_name = find_nested_name(data.function_config["function"])
  327. correctwarning = scenario_sign_dict[scenario_name]
  328. # 将correctwarning保存到data对象中,供图表生成使用
  329. data.correctwarning = correctwarning
  330. ego_df = data.ego_data
  331. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  332. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  333. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  334. 'simTime'].tolist()
  335. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  336. if (simTime_HMI is None) or (simTime_rosbag is None):
  337. print("预警出错!")
  338. delay_time = 100.0
  339. else:
  340. delay_time = abs(simTime_HMI - simTime_rosbag)
  341. return {"warningDelayTime_LST": delay_time}
  342. def warningDelayTimeofReachDecel_LST(data):
  343. scenario_name = find_nested_name(data.function_config["function"])
  344. correctwarning = scenario_sign_dict[scenario_name]
  345. # 将correctwarning保存到data对象中,供图表生成使用
  346. data.correctwarning = correctwarning
  347. ego_df = data.ego_data
  348. obj_df = data.obj_data
  349. # 加速度在速度方向上的投影
  350. speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6
  351. obj_df["ux"] = obj_df["speedX"] / speed_mag
  352. obj_df["uy"] = obj_df["speedY"] / speed_mag
  353. obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"]
  354. obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist() # 单位m/s^2
  355. warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
  356. if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0):
  357. return {"warningDelayTimeofReachDecel_LST": 0}
  358. elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0):
  359. return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]}
  360. elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0):
  361. return {"warningDelayTimeofReachDecel_LST": None}
  362. else:
  363. return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]}
  364. def rightWarningSignal_LST(data):
  365. scenario_name = find_nested_name(data.function_config["function"])
  366. correctwarning = scenario_sign_dict[scenario_name]
  367. # 将correctwarning保存到data对象中,供图表生成使用
  368. data.correctwarning = correctwarning
  369. ego_df = data.ego_data
  370. if ego_df['ifwarning'].empty:
  371. print("无法获取正确预警信号标志位!")
  372. return
  373. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  374. if warning_rows.empty:
  375. return {"rightWarningSignal_LST": -1}
  376. else:
  377. return {"rightWarningSignal_LST": 1}
  378. def ifCrossingRedLight_LST(data):
  379. scenario_name = find_nested_name(data.function_config["function"])
  380. correctwarning = scenario_sign_dict[scenario_name]
  381. # 将correctwarning保存到data对象中,供图表生成使用
  382. data.correctwarning = correctwarning
  383. ego_df = data.ego_data
  384. redlight_simtime = ego_df[
  385. (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (
  386. ego_df['v'] != 0)]['simTime']
  387. if redlight_simtime.empty:
  388. return {"ifCrossingRedLight_LST": -1}
  389. else:
  390. return {"ifCrossingRedLight_LST": 1}
  391. def ifStopgreenWaveSpeedGuidance_LST(data):
  392. scenario_name = find_nested_name(data.function_config["function"])
  393. correctwarning = scenario_sign_dict[scenario_name]
  394. # 将correctwarning保存到data对象中,供图表生成使用
  395. data.correctwarning = correctwarning
  396. ego_df = data.ego_data
  397. greenlight_simtime = \
  398. ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
  399. if greenlight_simtime.empty:
  400. return {"ifStopgreenWaveSpeedGuidance_LST": -1}
  401. else:
  402. return {"ifStopgreenWaveSpeedGuidance_LST": 1}
  403. # ------ 单车智能指标 ------
  404. def limitSpeed_LST(data):
  405. ego_df = data.ego_data
  406. scenario_name = find_nested_name(data.function_config["function"])
  407. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  408. speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.1]['v'].tolist()
  409. if len(speed_limit) == 0:
  410. return {"speedLimit_LST": -1}
  411. max_speed = max(speed_limit)
  412. data.speedLimit = limit_speed
  413. generate_function_chart_data(data, 'limitspeed_LST')
  414. return {"speedLimit_LST": max_speed}
  415. def limitSpeedPastLimitSign_LST(data):
  416. ego_df = data.ego_data
  417. scenario_name = find_nested_name(data.function_config["function"])
  418. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  419. car_length = data.function_config["vehicle"]['CAR_LENGTH']
  420. ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist()
  421. ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist()
  422. data.speedLimit = limit_speed
  423. data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None
  424. generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST')
  425. if len(ego_speed) == 0:
  426. return {"speedPastLimitSign_LST": -1}
  427. return {"speedPastLimitSign_LST": ego_speed[0]}
  428. def leastDistance_LST(data):
  429. ego_df = data.ego_data
  430. dist_row = ego_df[ego_df['v'] == 0]['relative_dist'].tolist()
  431. if len(dist_row) == 0:
  432. return {"leastDistance_LST": -1}
  433. else:
  434. min_dist = min(dist_row)
  435. return {"leastDistance_LST": min_dist}
  436. def launchTimeinStopLine_LST(data):
  437. ego_df = data.ego_data
  438. simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
  439. if len(simtime_row) == 0:
  440. return {"launchTimeinStopLine_LST": -1}
  441. else:
  442. delta_t = simtime_row[-1] - simtime_row[0]
  443. return {"launchTimeinStopLine_LST": delta_t}
  444. def launchTimewhenFollowingCar_LST(data):
  445. ego_df = data.ego_data
  446. t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0]
  447. simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
  448. if len(simtime_row) == 0:
  449. return {"launchTimewhenFollowingCar_LST": 0}
  450. else:
  451. time_interval = _is_segment_by_interval(simtime_row, t_interval)
  452. delta_t = []
  453. for t in time_interval:
  454. delta_t.append(t[-1] - t[0])
  455. return {"launchTimewhenFollowingCar_LST": max(delta_t)}
  456. def noStop_LST(data):
  457. ego_df_ini = data.ego_data
  458. min_time = ego_df_ini['simTime'].min() + 5
  459. max_time = ego_df_ini['simTime'].max() - 5
  460. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  461. speed = ego_df['v'].tolist()
  462. if (any(speed) == 0):
  463. return {"noStop_LST": -1}
  464. else:
  465. return {"noStop_LST": 1}
  466. def launchTimeinTrafficLight_LST(data):
  467. '''
  468. 待修改:
  469. 红灯的状态值:1
  470. 绿灯的状态值:0
  471. '''
  472. ego_df = data.ego_data
  473. simtime_of_redlight = ego_df[ego_df['stateMask'] == 0]['simTime']
  474. simtime_of_stop = ego_df[ego_df['v'] == 0]['simTime']
  475. if len(simtime_of_stop) or len(simtime_of_redlight):
  476. return {"timeInterval_LST": -1}
  477. simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist()
  478. if len(simtime_of_launch) == 0:
  479. return {"timeInterval_LST": -1}
  480. return {"timeInterval_LST": simtime_of_launch[-1] - simtime_of_launch[0]}
  481. def crossJunctionToTargetLane_LST(data):
  482. ego_df = data.ego_data
  483. lane_in_leftturn = set(ego_df['lane_id'].tolist())
  484. scenario_name = find_nested_name(data.function_config["function"])
  485. target_lane_id = data.function_config["function"][scenario_name]["crossJunctionToTargetLane_LST"]['max']
  486. if target_lane_id not in lane_in_leftturn:
  487. return {"crossJunctionToTargetLane_LST": -1}
  488. else:
  489. return {"crossJunctionToTargetLane_LST": target_lane_id}
  490. def keepInLane_LST(data):
  491. ego_df = data.ego_data
  492. notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width']/2].tolist()
  493. if len(notkeepinlane):
  494. return {"keepInLane_LST": -1}
  495. return {"keepInLane_LST": 1}
  496. def leastLateralDistance_LST(data):
  497. ego_df = data.ego_data
  498. lane_width = ego_df[ego_df['x_relative_dist'] == 0]['lane_width']
  499. if lane_width.empty():
  500. return {"leastLateralDistance_LST": -1}
  501. else:
  502. y_relative_dist = ego_df[ego_df['x_relative_dist'] == 0]['y_relative_dist']
  503. if (y_relative_dist <= lane_width / 2).all():
  504. return {"leastLateralDistance_LST": 1}
  505. else:
  506. return {"leastLateralDistance_LST": -1}
  507. def waitTimeAtCrosswalkwithPedestrian_LST(data):
  508. ego_df = data.ego_data
  509. object_df = data.object_data
  510. data['in_crosswalk'] = []
  511. position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True)
  512. for cross_id in position_data['cross_id'].tolist():
  513. for posX, posY in object_df['posX', 'posY']:
  514. pedestrian_pos = (posX, posY)
  515. plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0]
  516. if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos):
  517. data[data['playerId'] == 2]['in_crosswalk'] = 1
  518. else:
  519. data[data['playerId'] == 2]['in_crosswalk'] = 0
  520. car_stop_time = ego_df[ego_df['v'] == 0]['simTime']
  521. pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime']
  522. car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist()
  523. return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len(
  524. car_wait_pedestrian) > 0 else 0}
  525. def launchTimewhenPedestrianLeave_LST(data):
  526. ego_df = data.ego_data
  527. car_stop_time = ego_df[ego_df['v'] == 0]["simTime"]
  528. if car_stop_time.empty():
  529. return {"launchTimewhenPedestrianLeave_LST": -1}
  530. else:
  531. lane_width = ego_df[ego_df['v'] == 0]['lane_width'].tolist()[0]
  532. car_to_pedestrain = ego_df[ego_df['y_relative_dist'] <= lane_width / 2]["simTime"]
  533. legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist()
  534. return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]}
  535. def noCollision_LST(data):
  536. ego_df = data.ego_data
  537. if ego_df['relative_dist'].any() == 0:
  538. return {"noCollision_LST": -1}
  539. else:
  540. return {"noCollision_LST": 1}
  541. def noReverse_LST(data):
  542. ego_df = data.ego_data
  543. if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0:
  544. return {"noReverse_LST": -1}
  545. else:
  546. return {"noReverse_LST": 1}
  547. def turnAround_LST(data):
  548. ego_df = data.ego_data
  549. if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and (
  550. ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0):
  551. return {"turnAround_LST": 1}
  552. else:
  553. return {"turnAround_LST": -1}
  554. def laneOffset_LST(data):
  555. car_width = data.function_config['vehicle']['CAR_WIDTH']
  556. ego_df_ini = data.ego_data
  557. min_time = ego_df_ini['simTime'].min() + 5
  558. max_time = ego_df_ini['simTime'].max() - 5
  559. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  560. laneoffset = ego_df['y_relative_dist'] - car_width / 2
  561. return {"laneOffset_LST": max(laneoffset)}
  562. def maxLongitudeDist_LST(data):
  563. ego_df = data.ego_data
  564. longitude_dist = abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist())
  565. data.longitude_dist = min(abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist()))
  566. stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist()
  567. data.stop_time = min(stop_time)
  568. if len(longitude_dist) == 0:
  569. return {"maxLongitudeDist_LST": -1}
  570. generate_function_chart_data(data, 'maxLongitudeDist_LST')
  571. return {"maxLongDist_LST": min(longitude_dist)}
  572. def noEmergencyBraking_LST(data):
  573. ego_df = data.ego_data
  574. ego_df['ip_dec'] = ego_df['v'].apply(
  575. get_interpolation, point1=[18, -5], point2=[72, -3.5])
  576. ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply(
  577. lambda x: 1 if x < 0 else 0)
  578. if sum(ego_df['slam_brake']) == 0:
  579. return {"noEmergencyBraking_LST": 1}
  580. else:
  581. return {"noEmergencyBraking_LST": -1}
  582. def rightWarningSignal_PGVIL(data_processed) -> dict:
  583. """判断是否发出正确预警信号"""
  584. ego_df = data_processed.ego_data
  585. scenario_name = find_nested_name(data_processed.function_config["function"])
  586. correctwarning = scenario_sign_dict[scenario_name]
  587. if correctwarning is None:
  588. print("无法获取正确的预警信号标志位!")
  589. return None
  590. # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
  591. warning_rows = ego_df[
  592. (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
  593. ]
  594. if warning_rows.empty:
  595. return {"rightWarningSignal_PGVIL": -1}
  596. else:
  597. return {"rightWarningSignal_PGVIL": 1}
  598. def latestWarningDistance_PGVIL(data_processed) -> dict:
  599. """预警距离计算流水线"""
  600. ego_df = data_processed.ego_data
  601. obj_df = data_processed.object_df
  602. warning_data = get_first_warning(data_processed)
  603. if warning_data is None:
  604. return {"latestWarningDistance_PGVIL": 0.0}
  605. ego, obj = extract_ego_obj(warning_data)
  606. distances = calculate_distance_PGVIL(
  607. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  608. )
  609. # 将计算结果保存到data对象中,供图表生成使用
  610. data_processed.warning_dist = distances
  611. data_processed.warning_time = ego['simTime'].tolist()
  612. if distances.size == 0:
  613. print("没有找到数据!")
  614. return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  615. generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL')
  616. return {"latestWarningDistance_PGVIL": float(np.min(distances))}
  617. def latestWarningDistance_TTC_PGVIL(data_processed) -> dict:
  618. """TTC计算流水线"""
  619. ego_df = data_processed.ego_data
  620. obj_df = data_processed.object_df
  621. warning_data = get_first_warning(data_processed)
  622. if warning_data is None:
  623. return {"latestWarningDistance_TTC_PGVIL": 0.0}
  624. ego, obj = extract_ego_obj(warning_data)
  625. # 向量化计算
  626. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  627. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  628. obj_pos = obj[["posX", "posY"]].values
  629. obj_speed = obj[["speedX", "speedY"]].values
  630. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  631. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  632. data_processed.warning_dist = distances
  633. data_processed.warning_speed = rel_speeds
  634. data_processed.warning_time = ego['simTime'].tolist()
  635. with np.errstate(divide="ignore", invalid="ignore"):
  636. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  637. if ttc.size == 0:
  638. print("没有找到数据!")
  639. return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  640. data_processed.ttc = ttc
  641. generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL')
  642. return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  643. def earliestWarningDistance_PGVIL(data_processed) -> dict:
  644. """预警距离计算流水线"""
  645. ego_df = data_processed.ego_data
  646. obj_df = data_processed.object_df
  647. warning_data = get_first_warning(data_processed)
  648. if warning_data is None:
  649. return {"earliestWarningDistance_PGVIL": 0}
  650. ego, obj = extract_ego_obj(warning_data)
  651. distances = calculate_distance_PGVIL(
  652. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  653. )
  654. # 将计算结果保存到data对象中,供图表生成使用
  655. data_processed.warning_dist = distances
  656. data_processed.warning_time = ego['simTime'].tolist()
  657. if distances.size == 0:
  658. print("没有找到数据!")
  659. return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  660. generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL')
  661. return {"earliestWarningDistance": float(np.min(distances))}
  662. def earliestWarningDistance_TTC_PGVIL(data_processed) -> dict:
  663. """TTC计算流水线"""
  664. ego_df = data_processed.ego_data
  665. obj_df = data_processed.object_df
  666. warning_data = get_first_warning(data_processed)
  667. if warning_data is None:
  668. return {"earliestWarningDistance_TTC_PGVIL": 0.0}
  669. ego, obj = extract_ego_obj(warning_data)
  670. # 向量化计算
  671. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  672. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  673. obj_pos = obj[["posX", "posY"]].values
  674. obj_speed = obj[["speedX", "speedY"]].values
  675. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  676. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  677. data_processed.warning_dist = distances
  678. data_processed.warning_speed = rel_speeds
  679. data_processed.warning_time = ego['simTime'].tolist()
  680. with np.errstate(divide="ignore", invalid="ignore"):
  681. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  682. if ttc.size == 0:
  683. print("没有找到数据!")
  684. return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  685. data_processed.ttc = ttc
  686. generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL')
  687. return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  688. # def delayOfEmergencyBrakeWarning(data_processed) -> dict:
  689. # #预警时机相对背景车辆减速度达到-4m/s2后的时延
  690. # ego_df = data_processed.ego_data
  691. # obj_df = data_processed.object_df
  692. # warning_data = get_first_warning(data_processed)
  693. # if warning_data is None:
  694. # return {"delayOfEmergencyBrakeWarning": -1}
  695. # try:
  696. # ego, obj = extract_ego_obj(warning_data)
  697. # # 向量化计算
  698. # obj_speed = np.array([[obj_df["speedX"], obj_df["speedY"]]])
  699. # # 计算背景车辆减速度
  700. # simtime_gap = obj["simTime"].iloc[1] - obj["simTime"].iloc[0]
  701. # simtime_freq = 1 / simtime_gap#每秒采样频率
  702. # # simtime_freq为一个时间窗,找出时间窗内的最大减速度
  703. # obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)#速度向量的模长
  704. # obj_speed_change = np.diff(speed_magnitude)#速度模长的变化量
  705. # obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
  706. # #找到最大减速度,若最大减速度小于-4m/s2,则计算最大减速度对应的时间,和warning_data的差值进行对比
  707. # max_deceleration = np.max(obj_deceleration)
  708. # if max_deceleration < -4:
  709. # max_deceleration_times = obj["simTime"].iloc[np.argmax(obj_deceleration)]
  710. # max_deceleration_time = max_deceleration_times.iloc[0]
  711. # delay_time = ego["simTime"] - max_deceleration_time
  712. # return {"delayOfEmergencyBrakeWarning": float(delay_time)}
  713. # else:
  714. # print("没有达到预警减速度阈值:-4m/s^2")
  715. # return {"delayOfEmergencyBrakeWarning": -1}
  716. def warningDelayTime_PGVIL(data_processed) -> dict:
  717. """车端接收到预警到HMI发出预警的时延"""
  718. ego_df = data_processed.ego_data
  719. # #打印ego_df的列名
  720. # print(ego_df.columns.tolist())
  721. warning_data = get_first_warning(data_processed)
  722. if warning_data is None:
  723. return {"warningDelayTime_PGVIL": -1}
  724. try:
  725. ego, obj = extract_ego_obj(warning_data)
  726. # 找到event_Type不为空,且playerID为1的行
  727. rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
  728. first_time = rosbag_warning_rows["simTime"].iloc[0]
  729. warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
  730. delay_time = warning_time - first_time
  731. return {"warningDelayTime_PGVIL": float(delay_time)}
  732. except Exception as e:
  733. print(f"计算预警时延时发生错误: {e}")
  734. return {"warningDelayTime_PGVIL": -1}
  735. def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
  736. """
  737. 计算主车后轴中心点到停止线的距离
  738. :return 距离
  739. """
  740. distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
  741. # 计算停止线的方向向量
  742. line_vector = np.array(
  743. [
  744. stop_line_points[1][0] - stop_line_points[0][0],
  745. stop_line_points[1][1] - stop_line_points[0][1],
  746. ]
  747. )
  748. direction_vector_norm = np.linalg.norm(line_vector)
  749. direction_vector_unit = (
  750. line_vector / direction_vector_norm
  751. if direction_vector_norm != 0
  752. else np.array([0, 0])
  753. )
  754. # 计算主车后轴中心点到停止线投影的坐标(垂足)
  755. projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
  756. perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
  757. # 计算主车后轴中心点到垂足的距离
  758. distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
  759. carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
  760. return carhead_distance_to_foot
  761. def ifCrossingRedLight_PGVIL(data_processed) -> dict:
  762. # 判断车辆是否闯红灯
  763. stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)])
  764. X_OFFSET = 258109.4239876
  765. Y_OFFSET = 4149969.964821
  766. stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
  767. ego_df = data_processed.ego_data
  768. prev_distance = float("inf") # 初始化为正无穷大
  769. """
  770. traffic_light_status
  771. 0x100000为绿灯,1048576
  772. 0x1000000为黄灯,16777216
  773. 0x10000000为红灯,268435456
  774. """
  775. red_light_violation = False
  776. for index, ego in ego_df.iterrows():
  777. car_point = (ego["posX"], ego["posY"])
  778. stateMask = ego["stateMask"]
  779. simTime = ego["simTime"]
  780. distance_to_stopline = get_car_to_stop_line_distance(
  781. ego, car_point, stop_line_points
  782. )
  783. # 主车车头跨越停止线时非绿灯,返回-1,闯红灯
  784. if prev_distance > 0 and distance_to_stopline < 0:
  785. if stateMask is not None and stateMask != 1048576:
  786. red_light_violation = True
  787. break
  788. prev_distance = distance_to_stopline
  789. if red_light_violation:
  790. return {"ifCrossingRedLight_PGVIL": -1} # 闯红灯
  791. else:
  792. return {"ifCrossingRedLight_PGVIL": 1} # 没有闯红灯
  793. # def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
  794. # #在绿波车速引导期间是否发生停车
  795. # def mindisStopline(data_processed) -> dict:
  796. # """
  797. # 当有停车让行标志/标线时车辆最前端与停车让行线的最小距离应在0-4m之间
  798. # """
  799. # ego_df = data_processed.ego_data
  800. # obj_df = data_processed.object_df
  801. # stop_giveway_simtime = ego_df[
  802. # ego_df["sign_type1"] == 32 |
  803. # ego_df["stopline_type"] == 3
  804. # ]["simTime"]
  805. # stop_giveway_data = ego_df[
  806. # ego_df["sign_type1"] == 32 |
  807. # ego_df["stopline_type"] == 3
  808. # ]["simTime"]
  809. # if stop_giveway_simtime.empty:
  810. # print("没有停车让行标志/标线")
  811. # ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
  812. # distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
  813. # distance_to_stoplines = []
  814. # for _,row in ego_data.iterrows():
  815. # ego_pos = np.array([row["posX"], row["posY"], row["posH"]])
  816. # stop_line_points = [
  817. # [row["stopline_x1"], row["stopline_y1"]],
  818. # [row["stopline_x2"], row["stopline_y2"]],
  819. # ]
  820. # distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
  821. # distance_to_stoplines.append(distance_to_stopline)
  822. # mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
  823. # return {"mindisStopline": mindisStopline}
  824. def limitSpeed_PGVIL(data):
  825. ego_df = data.ego_data
  826. max_speed = max(ego_df["v"])
  827. if len(ego_df["v"]) == 0:
  828. return {"speedLimit_PGVIL": -1}
  829. else:
  830. return {"speedLimit_PGVIL": max_speed}
  831. def leastDistance_PGVIL(data):
  832. exclude_seconds = 2.0
  833. ego_df = data.ego_data
  834. max_sim_time = ego_df["simTime"].max()
  835. threshold_time = max_sim_time - exclude_seconds
  836. for index, ego in ego_df.iterrows():
  837. if ego["simTime"] >= threshold_time:
  838. continue
  839. if "v" in ego and ego["v"] == 0:
  840. car_point = (ego["posX"], ego["posY"])
  841. p1 = (ego["stopline_x1"], ego["stopline_y1"])
  842. p2 = (ego["stopline_x2"], ego["stopline_y2"])
  843. stop_line_points = np.array([p1, p2], dtype=float)
  844. distance_to_stopline = get_car_to_stop_line_distance(
  845. ego, car_point, stop_line_points
  846. )
  847. return {"leastDistance_PGVIL": distance_to_stopline}
  848. return {"leastDistance_PGVIL": -1}
  849. def launchTimeinStopLine_PGVIL(data):
  850. ego_df = data.ego_data
  851. in_stop = False
  852. start_time = None
  853. last_low_time = None
  854. max_duration = 0.0
  855. v_thresh = 0.01
  856. for _, row in ego_df.iterrows():
  857. simt = row["simTime"]
  858. v = row.get("v", None)
  859. if v is None or np.isnan(v) or abs(v) > v_thresh:
  860. if in_stop:
  861. duration = last_low_time - start_time
  862. if duration > max_duration:
  863. max_duration = duration
  864. in_stop = False
  865. else:
  866. if not in_stop:
  867. start_time = simt
  868. in_stop = True
  869. # 更新最近一次“停车时刻”的 simTime
  870. last_low_time = simt
  871. if in_stop:
  872. duration = last_low_time - start_time
  873. if duration > max_duration:
  874. max_duration = duration
  875. if max_duration <= 0:
  876. return {"launchTimeinStopLine_PGVIL": -1}
  877. else:
  878. return {"launchTimeinStopLine_PGVIL": float(max_duration)}
  879. def launchTimeinTrafficLight_PGVIL(data):
  880. GREEN = 0x100000
  881. RED = 0x10000000
  882. ego_df = data.ego_data
  883. # 找到第一次红灯 to 绿灯的切换时刻
  884. is_transition = (ego_df["stateMask"] == GREEN) & (
  885. ego_df["stateMask"].shift(1) == RED
  886. )
  887. transition_times = ego_df.loc[is_transition, "simTime"]
  888. if transition_times.empty:
  889. return {"timeInterval_PGVIL": -1}
  890. time_red2green = transition_times.iloc[0]
  891. car_move_times = ego_df.loc[
  892. (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime"
  893. ]
  894. if car_move_times.empty:
  895. return {"timeInterval_PGVIL": -1}
  896. time_move = move_times.iloc[0]
  897. return {"timeInterval_PGVIL": time_move - time_red2green}
  898. def crossJunctionToTargetLane_PGVIL(data):
  899. ego_df = data.ego_data
  900. lane_ids = set(ego_df["lane_id"].dropna())
  901. try:
  902. scenario_name = find_nested_name(data.function_config["function"])
  903. target_lane_id = data.function_config["function"][scenario_name][
  904. "crossJunctionToTargetLane_PGVIL"
  905. ]["max"]
  906. except KeyError:
  907. raise ValueError("请在配置文件中指定目标车道ID!")
  908. result = target_lane_id if target_lane_id in lane_ids else -1
  909. return {"crossJunctionToTargetLane_PGVIL": result}
  910. def noStop_PGVIL(data):
  911. exclude_end_seconds = 5.0
  912. exclude_start_seconds = 5.0
  913. ego_df = data.ego_data
  914. max_sim_time = ego_df["simTime"].max()
  915. start_threshold = min_sim_time + exclude_start_seconds
  916. end_threshold = max_sim_time - exclude_end_seconds
  917. filtered_df = ego_df[
  918. (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold)
  919. ]
  920. if (filtered_df["v"] == 0).any():
  921. return {"noStop_PGVIL": -1}
  922. else:
  923. return {"noStop_PGVIL": 1}
  924. def noEmergencyBraking_PGVIL(data):
  925. ego_df = data.ego_data
  926. ego_df["ip_dec"] = ego_df["v"].apply(
  927. get_interpolation, point1=[18, -5], point2=[72, -3.5]
  928. )
  929. ego_df["slam_brake"] = (ego_df["accleX"] - ego_df["ip_dec"]).apply(
  930. lambda x: 1 if x < 0 else 0
  931. )
  932. if sum(ego_df["slam_brake"]) == 0:
  933. return {"noEmergencyBraking_PGVIL": 1}
  934. else:
  935. return {"noEmergencyBraking_PGVIL": -1}
  936. def noReverse_PGVIL(data):
  937. ego_df = data.ego_data.copy()
  938. heading_x = np.cos(ego_df["posH"])
  939. reverse_flag = (ego_df["speedX"] * heading_x) < 0
  940. if reverse_flag.any():
  941. return {"noReverse_PGVIL": -1}
  942. else:
  943. return {"noReverse_PGVIL": 1}
  944. def laneOffset_PGVIL(data):
  945. car_width = data.function_config["vehicle"]["CAR_WIDTH"]
  946. ego_df = data.ego_data
  947. is_zero = ego_df["v"] == 0
  948. if not is_zero.any():
  949. # 全程未停车
  950. return {"laneOffset_PGVIL": None}
  951. groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
  952. last_zero_group = groups[is_zero].iloc[-1]
  953. last_stop = ego_df[groups == last_zero_group]
  954. # 距离右侧车道线
  955. edge_dist = (last_stop["lane_width"] / 2 + last_stop["laneOffset"]) - (
  956. car_width / 2
  957. )
  958. return {"laneOffset_PGVIL": edge_dist.max()}
  959. def maxLongitudelDistance_PGVIL(data):
  960. scenario_name = find_nested_name(data.function_config["function"])
  961. stopX_pos = data.function_config["function"][scenario_name][
  962. "maxLongitudelDistance_PGVIL"
  963. ]["max"]
  964. stopY_pos = data.function_config["function"][scenario_name][
  965. "maxLongitudelDistance_PGVIL"
  966. ]["min"]
  967. def keepInLane_PGVIL(data):
  968. ego_df = data.ego_data.copy()
  969. ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"])
  970. if ego_df.empty:
  971. return {"keepInLane_PGVIL": -1}
  972. threshold = ego_df["lane_width"] / 2
  973. if (ego_df["laneOffset"].abs() > threshold).any():
  974. return {"keepInLane_PGVIL": -1}
  975. else:
  976. return {"keepInLane_PGVIL": 1}
  977. def launchTimewhenPedestrianLeave_PGVIL(data):
  978. ego_df = data.ego_data
  979. ped_df = data.object_data.loc[
  980. data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
  981. ]
  982. merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  983. if merged.empty:
  984. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  985. yaw_rad = np.deg2rad(merged["posH"]) # 度→弧度
  986. dx = merged["posX_ped"] - merged["posX"]
  987. dy = merged["posY_ped"] - merged["posY"]
  988. merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
  989. merged["threshold"] = merged["lane_width"] / 2
  990. entry = merged[merged["lateral_dist"] <= merged["threshold"]]
  991. if entry.empty:
  992. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  993. t_entry = entry["simTime"].iloc[0]
  994. after_e = merged[merged["simTime"] > t_entry]
  995. leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
  996. if leave.empty:
  997. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  998. t_leave = leave["simTime"].iloc[0]
  999. ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
  1000. stops = ego_df[(ego_df["simTime"] > t_leave) & (ego_df["v"] == 0)].copy()
  1001. if stops.empty:
  1002. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1003. front_overPos = ego_df["dimX"] / 2 + ego_df["offX"]
  1004. yaw_stop = np.deg2rad(stops["posH"])
  1005. stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
  1006. stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
  1007. dx_s = stops["front_x"] - ped_x
  1008. dy_s = stops["front_y"] - ped_y
  1009. proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
  1010. valid = stops[proj <= 0]
  1011. t_stop = valid["simTime"].iloc[0]
  1012. launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"]
  1013. t_launch = launches.iloc[0]
  1014. return {"launchTimewhenPedestrianLeave_PGVIL": t_launch - t_stop}
  1015. def waitTimeAtCrosswalkwithPedestrian_PGVIL(data):
  1016. ego_df = data.ego_data
  1017. ped_df = data.object_data.loc[
  1018. data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
  1019. ]
  1020. first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"]
  1021. if first_launch.empty:
  1022. t0_launch = 0.0
  1023. else:
  1024. t0_launch = first_launch.iloc[0]
  1025. merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  1026. merged["front_offset"] = merged["dimX"] / 2 + merged["offX"]
  1027. yaw = np.deg2rad(merged["posH"])
  1028. merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"]
  1029. merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"]
  1030. dx = merged["posX_ped"] - merged["front_x"]
  1031. dy = merged["posY_ped"] - merged["front_y"]
  1032. valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0
  1033. stops_df = merged.loc[
  1034. (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times
  1035. ].sort_values("simTime")
  1036. if stops_df.empty:
  1037. return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1}
  1038. wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0]
  1039. return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": wait_time}
  1040. def noCollision_PGVIL(data):
  1041. ego = data.ego_data[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
  1042. tar = data.object_data.loc[data.object_data["playerId"] == 2,
  1043. ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
  1044. df = ego.merge(tar, on="simTime", suffixes=("", "_tar"))
  1045. df["posH_tar_rad"] = np.deg2rad(df["posH_tar"])
  1046. df["posH_rad"] = np.deg2rad(df["posH"])
  1047. df["collision"] = df.apply(lambda row: funcIsCollision(
  1048. row["dimX"],row["dimY"],row["offX"],row["offY"],row["posX"],row["posY"],
  1049. row["posH_rad"],row["dimX_tar"],row["dimY_tar"],row["offX_tar"],row["offY_tar"],
  1050. row["posX_tar"],row["posY_tar"],row["posH_tar_rad"],),axis=1,)
  1051. if df["collision"].any():
  1052. return {"noCollision_PGVIL": -1}
  1053. else:
  1054. return {"noCollision_PGVIL": 1}
  1055. def leastLateralDistance_PGVIL(data):
  1056. ego_df = data.ego_data
  1057. cones = data.object_data.loc[data.object_data['playerId'] != 1,
  1058. ['simTime','playerId','posX','posY']].copy()
  1059. df = ego.merge(cones, on='simTime', how='inner', suffixes=('','_cone'))
  1060. yaw = np.deg2rad(df['posH'])
  1061. x_c = df['posX'] + df['offX'] * np.cos(yaw)
  1062. y_c = df['posY'] + df['offX'] * np.sin(yaw)
  1063. dx = df['posX'] - x_c
  1064. dy = df['posY'] - y_c
  1065. dx = df['posX_cone'] - x_c
  1066. dy = df['posY_cone'] - y_c
  1067. local_x = np.cos(yaw) * dx + np.sin(yaw) * dy
  1068. local_y = -np.sin(yaw) * dx + np.cos(yaw) * dy
  1069. half_length = df['dimX'] / 2
  1070. half_width = df['dimY'] / 2
  1071. inside = (np.abs(local_x) <= half_length) & (np.abs(local_y) <= half_width)
  1072. collisions = df.loc[inside, ['simTime','playerId']]
  1073. if collisions.empty:
  1074. return {"noConeRectCollision_PGVIL": 1}
  1075. else:
  1076. collision_times = collisions['simTime'].unique().tolist()
  1077. collision_ids = collisions['playerId'].unique().tolist()
  1078. return {"noConeRectCollision_PGVIL": -1}
  1079. class FunctionRegistry:
  1080. """动态函数注册器(支持参数验证)"""
  1081. def __init__(self, data_processed):
  1082. self.logger = LogManager().get_logger() # 获取全局日志实例
  1083. self.data = data_processed
  1084. # 检查function_config是否为空
  1085. if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
  1086. self.logger.warning("功能配置为空,跳过功能指标计算")
  1087. self.fun_config = {}
  1088. self.level_3_merics = []
  1089. self._registry = {}
  1090. return
  1091. self.fun_config = data_processed.function_config.get("function", {})
  1092. self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
  1093. self._registry: Dict[str, Callable] = {}
  1094. self._registry = self._build_registry()
  1095. def _extract_level_3_metrics(self, config_node: dict) -> list:
  1096. """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
  1097. metrics = []
  1098. def _recurse(node):
  1099. if isinstance(node, dict):
  1100. if "name" in node and not any(
  1101. isinstance(v, dict) for v in node.values()
  1102. ):
  1103. metrics.append(node["name"])
  1104. for v in node.values():
  1105. _recurse(v)
  1106. _recurse(config_node)
  1107. self.logger.info(f"评比的功能指标列表:{metrics}")
  1108. return metrics
  1109. def _build_registry(self) -> dict:
  1110. """自动注册指标函数(防御性编程)"""
  1111. registry = {}
  1112. for func_name in self.level_3_merics:
  1113. try:
  1114. registry[func_name] = globals()[func_name]
  1115. except KeyError:
  1116. print(f"未实现指标函数: {func_name}")
  1117. self.logger.error(f"未实现指标函数: {func_name}")
  1118. return registry
  1119. def batch_execute(self) -> dict:
  1120. """批量执行指标计算(带熔断机制)"""
  1121. results = {}
  1122. # 如果配置为空或没有注册的指标,直接返回空结果
  1123. if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry:
  1124. self.logger.info("功能配置为空或无注册指标,返回空结果")
  1125. return results
  1126. for name, func in self._registry.items():
  1127. try:
  1128. result = func(self.data) # 统一传递数据上下文
  1129. results.update(result)
  1130. except Exception as e:
  1131. print(f"{name} 执行失败: {str(e)}")
  1132. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  1133. results[name] = None
  1134. self.logger.info(f"功能指标计算结果:{results}")
  1135. return results
  1136. class FunctionManager:
  1137. """管理功能指标计算的类"""
  1138. def __init__(self, data_processed):
  1139. self.data = data_processed
  1140. self.logger = LogManager().get_logger()
  1141. # 检查function_config是否为空
  1142. if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
  1143. self.logger.warning("功能配置为空,跳过功能指标计算初始化")
  1144. self.function = None
  1145. else:
  1146. self.function = FunctionRegistry(self.data)
  1147. def report_statistic(self):
  1148. """
  1149. 计算并报告功能指标结果。
  1150. :return: 评估结果
  1151. """
  1152. # 如果function为None,直接返回空字典
  1153. if self.function is None:
  1154. self.logger.info("功能指标管理器未初始化,返回空结果")
  1155. return {}
  1156. function_result = self.function.batch_execute()
  1157. print("\n[功能性表现及评价结果]")
  1158. return function_result
  1159. # self.logger.info(f'Function Result: {function_result}')
  1160. # 使用示例
  1161. if __name__ == "__main__":
  1162. pass
  1163. # print("\n[功能类表现及得分情况]")