function.py 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2025 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2025/01/5
  11. @Last Modified: 2025/01/5
  12. @Summary: Function Metrics Calculation
  13. """
  14. import sys
  15. from pathlib import Path
  16. # 添加项目根目录到系统路径
  17. root_path = Path(__file__).resolve().parent.parent
  18. sys.path.append(str(root_path))
  19. print(root_path)
  20. from modules.lib.score import Score
  21. from modules.lib.log_manager import LogManager
  22. import numpy as np
  23. from typing import Dict, Tuple, Optional, Callable, Any
  24. import pandas as pd
  25. import yaml
  26. from modules.lib.chart_generator import generate_function_chart_data
  27. from shapely.geometry import Point, Polygon
  28. from modules.lib.common import get_interpolation
  29. import pandas as pd
  30. import yaml
  31. import math
  32. # ----------------------
  33. # 基础工具函数 (Pure functions)
  34. # ----------------------
  35. scenario_sign_dict = {
  36. "LeftTurnAssist": 206,
  37. "HazardousLocationW": 207,
  38. "RedLightViolationW": 208,
  39. "AbnormalVehicleW": 209,
  40. "NsightVulnerableRoadUserCollisionW": 210,
  41. "LitterW": 211,
  42. "ForwardCollisionW": 212,
  43. "VisibilityW": 213,
  44. "EmergencyBrakeW": 214,
  45. "IntersectionCollisionW": 215,
  46. "BlindSpotW": 216,
  47. "DoNotPassW": 217,
  48. "ControlLossW": 218,
  49. "FrontTrafficJamW": 219,
  50. "EmergencyVehicleW": 220,
  51. "CooperativeVehicleMerge": 221,
  52. "CooperativeLaneChange": 223,
  53. "VulnerableRoadUserCollisionW": 224,
  54. "CooperativeIntersectionPassing": 225,
  55. "RampMerge": 226,
  56. "DrivingLaneRecommendation": 227,
  57. "TrafficJamW": 228,
  58. "DynamicSpeedLimitingInformation": 229,
  59. "EmergencyVehiclesPriority": 230,
  60. "RemoteSupervision": 231,
  61. "SignalLightReminder": 232,
  62. "OtherVehicleRedLightViolationW": 233,
  63. "GreenLightOptimalSpeedAdvisory": 234,
  64. }
  65. def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
  66. polygon = Polygon(polygon)
  67. point = Point(test_point)
  68. return polygon.contains(point)
  69. def _is_segment_by_interval(time_list, expected_step) -> list:
  70. """
  71. 根据时间戳之间的间隔进行分段。
  72. 参数:
  73. time_list (list): 时间戳列表。
  74. expected_step (float): 预期的固定步长。
  75. 返回:
  76. list: 分段后的时间戳列表,每个元素是一个子列表。
  77. """
  78. if not time_list:
  79. return []
  80. segments = []
  81. current_segment = [time_list[0]]
  82. for i in range(1, len(time_list)):
  83. actual_step = time_list[i] - time_list[i - 1]
  84. if actual_step != expected_step:
  85. # 如果间隔不符合预期,则开始一个新的段
  86. segments.append(current_segment)
  87. current_segment = [time_list[i]]
  88. else:
  89. # 否则,将当前时间戳添加到当前段中
  90. current_segment.append(time_list[i])
  91. # 添加最后一个段
  92. if current_segment:
  93. segments.append(current_segment)
  94. return segments
  95. # 寻找二级指标的名称
  96. def find_nested_name(data):
  97. """
  98. 查找字典中嵌套的name结构。
  99. :param data: 要搜索的字典
  100. :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None
  101. """
  102. if isinstance(data, dict):
  103. for key, value in data.items():
  104. if isinstance(value, dict) and 'name' in value:
  105. return value['name']
  106. # 递归查找嵌套字典
  107. result = find_nested_name(value)
  108. if result is not None:
  109. return result
  110. elif isinstance(data, list):
  111. for item in data:
  112. result = find_nested_name(item)
  113. if result is not None:
  114. return result
  115. return None
  116. def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
  117. """向量化距离计算"""
  118. return np.linalg.norm(ego_pos - obj_pos, axis=1)
  119. def calculate_relative_speed_PGVIL(
  120. ego_speed: np.ndarray, obj_speed: np.ndarray
  121. ) -> np.ndarray:
  122. """向量化相对速度计算"""
  123. return np.linalg.norm(ego_speed - obj_speed, axis=1)
  124. def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  125. """向量化距离计算"""
  126. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  127. return dist
  128. def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  129. """向量化相对速度计算"""
  130. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  131. def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
  132. """数据提取函数"""
  133. ego = data[data["playerId"] == 1].iloc[0]
  134. obj = data[data["playerId"] != 1]
  135. return ego, obj
  136. def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
  137. """带缓存的预警数据获取"""
  138. ego_df = data_processed.ego_data
  139. obj_df = data_processed.object_df
  140. scenario_name = find_nested_name(data_processed.function_config["function"])
  141. correctwarning = scenario_sign_dict.get(scenario_name)
  142. if correctwarning is None:
  143. print("无法获取正确的预警信号标志位!")
  144. return None
  145. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  146. warning_times = warning_rows['simTime']
  147. if warning_times.empty:
  148. print("没有找到预警数据!")
  149. return None
  150. first_time = warning_times.iloc[0]
  151. return obj_df[obj_df['simTime'] == first_time]
  152. def getAxis(heading):
  153. AxisL = [0, 0]
  154. AxisW = [0, 0]
  155. AxisL[0] = math.cos(heading)
  156. AxisL[1] = math.sin(heading)
  157. AxisW[0] = -math.sin(heading)
  158. AxisW[1] = math.cos(heading)
  159. return AxisL, AxisW
  160. def getDotMultiply(firstAxis, secondAxis):
  161. dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1])
  162. return dotMultiply
  163. def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth):
  164. projAxisL = getDotMultiply(AxisL, baseAxis)
  165. projAxisW = getDotMultiply(AxisW, baseAxis)
  166. projectionRadius = halfLength * projAxisL + halfWidth * projAxisW
  167. return projectionRadius
  168. def isCollision(
  169. firstAxisL,
  170. firstAxisW,
  171. firstHalfLength,
  172. firstHalfWidth,
  173. secondAxisL,
  174. secondAxisW,
  175. secondHalfLength,
  176. secondHalfWidth,
  177. disVector,
  178. ):
  179. isCollision = True
  180. axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW]
  181. addProjectionRadius = 0
  182. projectionDisVecotr = 0
  183. for axis in axes:
  184. addProjectionRadius = getProjectionRadius(
  185. firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth
  186. ) + getProjectionRadius(
  187. secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth
  188. )
  189. projectionDisVecotr = getDotMultiply(disVector, axis)
  190. if projectionDisVecotr > addProjectionRadius:
  191. isCollision = False
  192. break
  193. return isCollision
  194. def funcIsCollision(
  195. firstDimX,
  196. firstDimY,
  197. firstOffX,
  198. firstOffY,
  199. firstX,
  200. firstY,
  201. firstHeading,
  202. secondDimX,
  203. secondDimY,
  204. secondOffX,
  205. secondOffY,
  206. secondX,
  207. secondY,
  208. secondHeading,
  209. ):
  210. firstAxisL = getAxis(firstHeading)[0]
  211. firstAxisW = getAxis(firstHeading)[1]
  212. secondAxisL = getAxis(secondHeading)[0]
  213. secondAxisW = getAxis(secondHeading)[1]
  214. firstHalfLength = 0.5 * firstDimX
  215. firstHalfWidth = 0.5 * firstDimY
  216. secondHalfLength = 0.5 * secondDimX
  217. secondHalfWidth = 0.5 * secondDimY
  218. firstCenter = [0, 0]
  219. secondCenter = [0, 0]
  220. disVector = [0, 0]
  221. firstCenter[0] = firstX + firstOffX * math.cos(firstHeading)
  222. firstCenter[1] = firstY + firstOffX * math.sin(firstHeading)
  223. secondCenter[0] = secondX + secondOffX * math.cos(secondHeading)
  224. secondCenter[1] = secondY + secondOffX * math.sin(secondHeading)
  225. disVector[0] = secondCenter[0] - firstCenter[0]
  226. disVector[1] = secondCenter[1] - firstCenter[1]
  227. varIsCollision = isCollision(
  228. firstAxisL,
  229. firstAxisW,
  230. firstHalfLength,
  231. firstHalfWidth,
  232. secondAxisL,
  233. secondAxisW,
  234. secondHalfLength,
  235. secondHalfWidth,
  236. disVector,
  237. )
  238. return varIsCollision
  239. # ----------------------
  240. # 核心计算功能函数
  241. # ----------------------
  242. def latestWarningDistance_LST(data, plot_path) -> dict:
  243. """预警距离计算流水线"""
  244. scenario_name = find_nested_name(data.function_config["function"])
  245. value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"]
  246. correctwarning = scenario_sign_dict[scenario_name]
  247. ego_df = data.ego_data
  248. warning_dist = calculate_distance(ego_df, correctwarning)
  249. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  250. # 将计算结果保存到data对象中,供图表生成使用
  251. data.warning_dist = warning_dist
  252. data.warning_speed = warning_speed
  253. data.correctwarning = correctwarning
  254. if warning_dist.empty:
  255. return {"latestWarningDistance_LST": 0.0}
  256. # 生成图表数据
  257. generate_function_chart_data(data, 'latestWarningDistance_LST', plot_path)
  258. return {"latestWarningDistance_LST": float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else value}
  259. def earliestWarningDistance_LST(data, plot_path) -> dict:
  260. """预警距离计算流水线"""
  261. scenario_name = find_nested_name(data.function_config["function"])
  262. value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"]
  263. correctwarning = scenario_sign_dict[scenario_name]
  264. ego_df = data.ego_data
  265. warning_dist = calculate_distance(ego_df, correctwarning)
  266. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  267. # 将计算结果保存到data对象中,供图表生成使用
  268. data.warning_dist = warning_dist
  269. data.warning_speed = warning_speed
  270. data.correctwarning = correctwarning
  271. if len(warning_dist) == 0:
  272. return {"earliestWarningDistance_LST": 0.0}
  273. # 生成图表数据
  274. generate_function_chart_data(data, 'earliestWarningDistance_LST', plot_path)
  275. return {"earliestWarningDistance_LST": float(warning_dist.iloc[0]) if len(warning_dist) > 0 else value}
  276. def latestWarningDistance_TTC_LST(data, plot_path) -> dict:
  277. """TTC计算流水线"""
  278. scenario_name = find_nested_name(data.function_config["function"])
  279. value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"]
  280. correctwarning = scenario_sign_dict[scenario_name]
  281. ego_df = data.ego_data
  282. warning_dist = calculate_distance(ego_df, correctwarning)
  283. if warning_dist.empty:
  284. return {"latestWarningDistance_TTC_LST": 0.0}
  285. # 将correctwarning保存到data对象中,供图表生成使用
  286. data.correctwarning = correctwarning
  287. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  288. with np.errstate(divide='ignore', invalid='ignore'):
  289. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  290. # 处理无效的TTC值
  291. for i in range(len(ttc)):
  292. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  293. data.warning_dist = warning_dist
  294. data.warning_speed = warning_speed
  295. data.ttc = ttc
  296. # 生成图表数据
  297. # from modules.lib.chart_generator import generate_function_chart_data
  298. generate_function_chart_data(data, 'latestWarningDistance_TTC_LST', plot_path)
  299. return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
  300. def earliestWarningDistance_TTC_LST(data, plot_path) -> dict:
  301. """TTC计算流水线"""
  302. scenario_name = find_nested_name(data.function_config["function"])
  303. value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"]
  304. correctwarning = scenario_sign_dict[scenario_name]
  305. ego_df = data.ego_data
  306. warning_dist = calculate_distance(ego_df, correctwarning)
  307. if warning_dist.empty:
  308. return {"earliestWarningDistance_TTC_LST": 0.0}
  309. # 将correctwarning保存到data对象中,供图表生成使用
  310. data.correctwarning = correctwarning
  311. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  312. with np.errstate(divide='ignore', invalid='ignore'):
  313. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  314. # 处理无效的TTC值
  315. for i in range(len(ttc)):
  316. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  317. # 将计算结果保存到data对象中,供图表生成使用
  318. data.warning_dist = warning_dist
  319. data.warning_speed = warning_speed
  320. data.ttc = ttc
  321. data.correctwarning = correctwarning
  322. # 生成图表数据
  323. generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST', plot_path)
  324. return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
  325. def warningDelayTime_LST(data, plot_path):
  326. scenario_name = find_nested_name(data.function_config["function"])
  327. correctwarning = scenario_sign_dict[scenario_name]
  328. # 将correctwarning保存到data对象中,供图表生成使用
  329. data.correctwarning = correctwarning
  330. ego_df = data.ego_data
  331. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  332. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  333. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  334. 'simTime'].tolist()
  335. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  336. if (simTime_HMI is None) or (simTime_rosbag is None):
  337. print("预警出错!")
  338. delay_time = 100.0
  339. else:
  340. delay_time = abs(simTime_HMI - simTime_rosbag)
  341. return {"warningDelayTime_LST": delay_time}
  342. def warningDelayTimeofReachDecel_LST(data, plot_path):
  343. scenario_name = find_nested_name(data.function_config["function"])
  344. correctwarning = scenario_sign_dict[scenario_name]
  345. # 将correctwarning保存到data对象中,供图表生成使用
  346. data.correctwarning = correctwarning
  347. ego_df = data.ego_data
  348. obj_df = data.obj_data
  349. # 加速度在速度方向上的投影
  350. speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6
  351. obj_df["ux"] = obj_df["speedX"] / speed_mag
  352. obj_df["uy"] = obj_df["speedY"] / speed_mag
  353. obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"]
  354. obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist() # 单位m/s^2
  355. warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
  356. if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0):
  357. return {"warningDelayTimeofReachDecel_LST": 0}
  358. elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0):
  359. return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]}
  360. elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0):
  361. return {"warningDelayTimeofReachDecel_LST": None}
  362. else:
  363. return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]}
  364. def rightWarningSignal_LST(data, plot_path):
  365. scenario_name = find_nested_name(data.function_config["function"])
  366. correctwarning = scenario_sign_dict[scenario_name]
  367. # 将correctwarning保存到data对象中,供图表生成使用
  368. data.correctwarning = correctwarning
  369. ego_df = data.ego_data
  370. if ego_df['ifwarning'].empty:
  371. print("无法获取正确预警信号标志位!")
  372. return
  373. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  374. if warning_rows.empty:
  375. return {"rightWarningSignal_LST": -1}
  376. else:
  377. return {"rightWarningSignal_LST": 1}
  378. def noWarning_LST(data):
  379. scenario_name = find_nested_name(data.function_config["function"])
  380. correctwarning = scenario_sign_dict[scenario_name]
  381. ego_df = data.ego_data
  382. if ego_df['ifwarning'].empty:
  383. print("无法获取正确预警信号标志位!")
  384. return
  385. warning_rows = (ego_df['ifwarning'].dropna() == -1).all()
  386. if warning_rows:
  387. return {"noWarning_LST": 1}
  388. else:
  389. return {"noWarning_LST": -1}
  390. def ifCrossingRedLight_LST(data, plot_path):
  391. scenario_name = find_nested_name(data.function_config["function"])
  392. correctwarning = scenario_sign_dict[scenario_name]
  393. # 将correctwarning保存到data对象中,供图表生成使用
  394. data.correctwarning = correctwarning
  395. ego_df = data.ego_data
  396. redlight_simtime = ego_df[
  397. (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] == 0) & (
  398. ego_df['v'] != 0)]['simTime']
  399. if redlight_simtime.empty:
  400. return {"ifCrossingRedLight_LST": -1}
  401. else:
  402. return {"ifCrossingRedLight_LST": 1}
  403. def ifStopgreenWaveSpeedGuidance_LST(data, plot_path):
  404. scenario_name = find_nested_name(data.function_config["function"])
  405. correctwarning = scenario_sign_dict[scenario_name]
  406. # 将correctwarning保存到data对象中,供图表生成使用
  407. data.correctwarning = correctwarning
  408. ego_df = data.ego_data
  409. greenlight_simtime = \
  410. ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] == 0)]['simTime']
  411. if greenlight_simtime.empty:
  412. return {"ifStopgreenWaveSpeedGuidance_LST": -1}
  413. else:
  414. return {"ifStopgreenWaveSpeedGuidance_LST": 1}
  415. # ------ 单车智能指标 ------
  416. def limitSpeed_LST(data, plot_path):
  417. ego_df = data.ego_data
  418. scenario_name = find_nested_name(data.function_config["function"])
  419. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  420. speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.1]['v'].tolist()
  421. if len(speed_limit) == 0:
  422. return {"speedLimit_LST": -1}
  423. max_speed = max(speed_limit)
  424. data.speedLimit = limit_speed
  425. generate_function_chart_data(data, 'limitspeed_LST', plot_path)
  426. return {"speedLimit_LST": max_speed}
  427. def limitSpeedPastLimitSign_LST(data, plot_path):
  428. ego_df = data.ego_data
  429. scenario_name = find_nested_name(data.function_config["function"])
  430. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  431. car_length = data.function_config["vehicle"]['CAR_LENGTH']
  432. ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist()
  433. ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist()
  434. data.speedLimit = limit_speed
  435. data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None
  436. generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST', plot_path)
  437. if len(ego_speed) == 0:
  438. return {"speedPastLimitSign_LST": -1}
  439. return {"speedPastLimitSign_LST": ego_speed[0]}
  440. def leastDistance_LST(data, plot_path):
  441. ego_df = data.ego_data
  442. dist_row = ego_df[ego_df['v'] == 0]['relative_dist'].tolist()
  443. if len(dist_row) == 0:
  444. return {"leastDistance_LST": -1}
  445. else:
  446. min_dist = min(dist_row)
  447. return {"leastDistance_LST": min_dist}
  448. def launchTimeinStopLine_LST(data, plot_path):
  449. ego_df = data.ego_data
  450. simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
  451. if len(simtime_row) == 0:
  452. return {"launchTimeinStopLine_LST": -1}
  453. else:
  454. delta_t = simtime_row[-1] - simtime_row[0]
  455. return {"launchTimeinStopLine_LST": delta_t}
  456. def launchTimewhenFollowingCar_LST(data, plot_path):
  457. ego_df = data.ego_data
  458. t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0]
  459. simtime_row = ego_df[ego_df['v'] == 0]['simTime'].tolist()
  460. if len(simtime_row) == 0:
  461. return {"launchTimewhenFollowingCar_LST": 0}
  462. else:
  463. time_interval = _is_segment_by_interval(simtime_row, t_interval)
  464. delta_t = []
  465. for t in time_interval:
  466. delta_t.append(t[-1] - t[0])
  467. return {"launchTimewhenFollowingCar_LST": max(delta_t)}
  468. def noStop_LST(data, plot_path):
  469. ego_df_ini = data.ego_data
  470. min_time = ego_df_ini['simTime'].min() + 5
  471. max_time = ego_df_ini['simTime'].max() - 5
  472. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  473. speed = ego_df['v'].tolist()
  474. if (any(speed) == 0):
  475. return {"noStop_LST": -1}
  476. else:
  477. return {"noStop_LST": 1}
  478. def launchTimeinTrafficLight_LST(data, plot_path):
  479. '''
  480. 待修改:
  481. 红灯的状态值:1
  482. 绿灯的状态值:0
  483. '''
  484. ego_df = data.ego_data
  485. simtime_of_redlight = ego_df[ego_df['stateMask'] == 0]['simTime']
  486. simtime_of_stop = ego_df[ego_df['v'] == 0]['simTime']
  487. if len(simtime_of_stop) or len(simtime_of_redlight):
  488. return {"timeInterval_LST": -1}
  489. simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist()
  490. if len(simtime_of_launch) == 0:
  491. return {"timeInterval_LST": -1}
  492. return {"timeInterval_LST": simtime_of_launch[-1] - simtime_of_launch[0]}
  493. def crossJunctionToTargetLane_LST(data, plot_path):
  494. ego_df = data.ego_data
  495. lane_in_leftturn = set(ego_df['lane_id'].tolist())
  496. scenario_name = find_nested_name(data.function_config["function"])
  497. target_lane_id = data.function_config["function"][scenario_name]["crossJunctionToTargetLane_LST"]['max']
  498. if target_lane_id not in lane_in_leftturn:
  499. return {"crossJunctionToTargetLane_LST": -1}
  500. else:
  501. return {"crossJunctionToTargetLane_LST": target_lane_id}
  502. def keepInLane_LST(data, plot_path):
  503. ego_df = data.ego_data
  504. notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width'] / 2].tolist()
  505. if len(notkeepinlane):
  506. return {"keepInLane_LST": -1}
  507. return {"keepInLane_LST": 1}
  508. def leastLateralDistance_LST(data, plot_path):
  509. ego_df = data.ego_data
  510. lane_width = ego_df[ego_df['x_relative_dist'] == 0]['lane_width']
  511. if lane_width.empty():
  512. return {"leastLateralDistance_LST": -1}
  513. else:
  514. y_relative_dist = ego_df[ego_df['x_relative_dist'] == 0]['y_relative_dist']
  515. if (y_relative_dist <= lane_width / 2).all():
  516. return {"leastLateralDistance_LST": 1}
  517. else:
  518. return {"leastLateralDistance_LST": -1}
  519. def waitTimeAtCrosswalkwithPedestrian_LST(data, plot_path):
  520. ego_df = data.ego_data
  521. object_df = data.object_data
  522. data['in_crosswalk'] = []
  523. position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True)
  524. for cross_id in position_data['cross_id'].tolist():
  525. for posX, posY in object_df['posX', 'posY']:
  526. pedestrian_pos = (posX, posY)
  527. plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0]
  528. if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos):
  529. data[data['playerId'] == 2]['in_crosswalk'] = 1
  530. else:
  531. data[data['playerId'] == 2]['in_crosswalk'] = 0
  532. car_stop_time = ego_df[ego_df['v'] == 0]['simTime']
  533. pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime']
  534. car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist()
  535. return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len(
  536. car_wait_pedestrian) > 0 else 0}
  537. def launchTimewhenPedestrianLeave_LST(data, plot_path):
  538. ego_df = data.ego_data
  539. car_stop_time = ego_df[ego_df['v'] == 0]["simTime"]
  540. if car_stop_time.empty():
  541. return {"launchTimewhenPedestrianLeave_LST": -1}
  542. else:
  543. lane_width = ego_df[ego_df['v'] == 0]['lane_width'].tolist()[0]
  544. car_to_pedestrain = ego_df[ego_df['y_relative_dist'] <= lane_width / 2]["simTime"]
  545. legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist()
  546. return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]}
  547. def noCollision_LST(data, plot_path):
  548. ego_df = data.ego_data
  549. if ego_df['relative_dist'].any() == 0:
  550. return {"noCollision_LST": -1}
  551. else:
  552. return {"noCollision_LST": 1}
  553. def noReverse_LST(data, plot_path):
  554. ego_df = data.ego_data
  555. if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0:
  556. return {"noReverse_LST": -1}
  557. else:
  558. return {"noReverse_LST": 1}
  559. def turnAround_LST(data, plot_path):
  560. ego_df = data.ego_data
  561. if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and (
  562. ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0):
  563. return {"turnAround_LST": 1}
  564. else:
  565. return {"turnAround_LST": -1}
  566. def laneOffset_LST(data, plot_path):
  567. car_width = data.function_config['vehicle']['CAR_WIDTH']
  568. ego_df_ini = data.ego_data
  569. min_time = ego_df_ini['simTime'].min() + 5
  570. max_time = ego_df_ini['simTime'].max() - 5
  571. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  572. laneoffset = ego_df['y_relative_dist'] - car_width / 2
  573. return {"laneOffset_LST": max(laneoffset)}
  574. def maxLongitudeDist_LST(data, plot_path):
  575. ego_df = data.ego_data
  576. longitude_dist = abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist())
  577. data.longitude_dist = min(abs(ego_df[ego_df['v'] == 0]['x_relative_dist'].tolist()))
  578. stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist()
  579. data.stop_time = min(stop_time)
  580. if len(longitude_dist) == 0:
  581. return {"maxLongitudeDist_LST": -1}
  582. generate_function_chart_data(data, 'maxLongitudeDist_LST', plot_path)
  583. return {"maxLongDist_LST": min(longitude_dist)}
  584. def noEmergencyBraking_LST(data, plot_path):
  585. ego_df = data.ego_data
  586. ego_df['ip_dec'] = ego_df['v'].apply(
  587. get_interpolation, point1=[18, -5], point2=[72, -3.5])
  588. ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply(
  589. lambda x: 1 if x < 0 else 0)
  590. if sum(ego_df['slam_brake']) == 0:
  591. return {"noEmergencyBraking_LST": 1}
  592. else:
  593. return {"noEmergencyBraking_LST": -1}
  594. def rightWarningSignal_PGVIL(data_processed, plot_path) -> dict:
  595. """判断是否发出正确预警信号"""
  596. ego_df = data_processed.ego_data
  597. scenario_name = find_nested_name(data_processed.function_config["function"])
  598. correctwarning = scenario_sign_dict[scenario_name]
  599. if correctwarning is None:
  600. print("无法获取正确的预警信号标志位!")
  601. return None
  602. # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
  603. warning_rows = ego_df[
  604. (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
  605. ]
  606. if warning_rows.empty:
  607. return {"rightWarningSignal_PGVIL": -1}
  608. else:
  609. return {"rightWarningSignal_PGVIL": 1}
  610. def latestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
  611. """预警距离计算流水线"""
  612. ego_df = data_processed.ego_data
  613. obj_df = data_processed.object_df
  614. warning_data = get_first_warning(data_processed)
  615. if warning_data is None:
  616. return {"latestWarningDistance_PGVIL": 0.0}
  617. ego, obj = extract_ego_obj(warning_data)
  618. distances = calculate_distance_PGVIL(
  619. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  620. )
  621. # 将计算结果保存到data对象中,供图表生成使用
  622. data_processed.warning_dist = distances
  623. data_processed.warning_time = ego['simTime'].tolist()
  624. if distances.size == 0:
  625. print("没有找到数据!")
  626. return {"latestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  627. generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL', plot_path)
  628. return {"latestWarningDistance_PGVIL": float(np.min(distances))}
  629. def latestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
  630. """TTC计算流水线"""
  631. ego_df = data_processed.ego_data
  632. obj_df = data_processed.object_df
  633. warning_data = get_first_warning(data_processed)
  634. if warning_data is None:
  635. return {"latestWarningDistance_TTC_PGVIL": 0.0}
  636. ego, obj = extract_ego_obj(warning_data)
  637. # 向量化计算
  638. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  639. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  640. obj_pos = obj[["posX", "posY"]].values
  641. obj_speed = obj[["speedX", "speedY"]].values
  642. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  643. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  644. data_processed.warning_dist = distances
  645. data_processed.warning_speed = rel_speeds
  646. data_processed.warning_time = ego['simTime'].tolist()
  647. with np.errstate(divide="ignore", invalid="ignore"):
  648. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  649. if ttc.size == 0:
  650. print("没有找到数据!")
  651. return {"latestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  652. data_processed.ttc = ttc
  653. generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL', plot_path)
  654. return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  655. def earliestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
  656. """预警距离计算流水线"""
  657. ego_df = data_processed.ego_data
  658. obj_df = data_processed.object_df
  659. warning_data = get_first_warning(data_processed)
  660. if warning_data is None:
  661. return {"earliestWarningDistance_PGVIL": 0}
  662. ego, obj = extract_ego_obj(warning_data)
  663. distances = calculate_distance_PGVIL(
  664. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  665. )
  666. # 将计算结果保存到data对象中,供图表生成使用
  667. data_processed.warning_dist = distances
  668. data_processed.warning_time = ego['simTime'].tolist()
  669. if distances.size == 0:
  670. print("没有找到数据!")
  671. return {"earliestWarningDistance_PGVIL": 15} # 或返回其他默认值,如0.0
  672. generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL', plot_path)
  673. return {"earliestWarningDistance": float(np.min(distances))}
  674. def earliestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
  675. """TTC计算流水线"""
  676. ego_df = data_processed.ego_data
  677. obj_df = data_processed.object_df
  678. warning_data = get_first_warning(data_processed)
  679. if warning_data is None:
  680. return {"earliestWarningDistance_TTC_PGVIL": 0.0}
  681. ego, obj = extract_ego_obj(warning_data)
  682. # 向量化计算
  683. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  684. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  685. obj_pos = obj[["posX", "posY"]].values
  686. obj_speed = obj[["speedX", "speedY"]].values
  687. distances = calculate_distance_PGVIL(ego_pos, obj_pos)
  688. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  689. data_processed.warning_dist = distances
  690. data_processed.warning_speed = rel_speeds
  691. data_processed.warning_time = ego['simTime'].tolist()
  692. with np.errstate(divide="ignore", invalid="ignore"):
  693. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  694. if ttc.size == 0:
  695. print("没有找到数据!")
  696. return {"earliestWarningDistance_TTC_PGVIL": 2} # 或返回其他默认值,如0.0
  697. data_processed.ttc = ttc
  698. generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL', plot_path)
  699. return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  700. # def delayOfEmergencyBrakeWarning(data_processed) -> dict:
  701. # #预警时机相对背景车辆减速度达到-4m/s2后的时延
  702. # ego_df = data_processed.ego_data
  703. # obj_df = data_processed.object_df
  704. # warning_data = get_first_warning(data_processed)
  705. # if warning_data is None:
  706. # return {"delayOfEmergencyBrakeWarning": -1}
  707. # try:
  708. # ego, obj = extract_ego_obj(warning_data)
  709. # # 向量化计算
  710. # obj_speed = np.array([[obj_df["speedX"], obj_df["speedY"]]])
  711. # # 计算背景车辆减速度
  712. # simtime_gap = obj["simTime"].iloc[1] - obj["simTime"].iloc[0]
  713. # simtime_freq = 1 / simtime_gap#每秒采样频率
  714. # # simtime_freq为一个时间窗,找出时间窗内的最大减速度
  715. # obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)#速度向量的模长
  716. # obj_speed_change = np.diff(speed_magnitude)#速度模长的变化量
  717. # obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
  718. # #找到最大减速度,若最大减速度小于-4m/s2,则计算最大减速度对应的时间,和warning_data的差值进行对比
  719. # max_deceleration = np.max(obj_deceleration)
  720. # if max_deceleration < -4:
  721. # max_deceleration_times = obj["simTime"].iloc[np.argmax(obj_deceleration)]
  722. # max_deceleration_time = max_deceleration_times.iloc[0]
  723. # delay_time = ego["simTime"] - max_deceleration_time
  724. # return {"delayOfEmergencyBrakeWarning": float(delay_time)}
  725. # else:
  726. # print("没有达到预警减速度阈值:-4m/s^2")
  727. # return {"delayOfEmergencyBrakeWarning": -1}
  728. def warningDelayTime_PGVIL(data_processed, plot_path) -> dict:
  729. """车端接收到预警到HMI发出预警的时延"""
  730. ego_df = data_processed.ego_data
  731. # #打印ego_df的列名
  732. # print(ego_df.columns.tolist())
  733. warning_data = get_first_warning(data_processed)
  734. if warning_data is None:
  735. return {"warningDelayTime_PGVIL": -1}
  736. try:
  737. ego, obj = extract_ego_obj(warning_data)
  738. # 找到event_Type不为空,且playerID为1的行
  739. rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
  740. first_time = rosbag_warning_rows["simTime"].iloc[0]
  741. warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
  742. delay_time = warning_time - first_time
  743. return {"warningDelayTime_PGVIL": float(delay_time)}
  744. except Exception as e:
  745. print(f"计算预警时延时发生错误: {e}")
  746. return {"warningDelayTime_PGVIL": -1}
  747. def noWarning_PGVIL(data_processed):
  748. scenario_name = find_nested_name(data_processed.function_config["function"])
  749. correctwarning = scenario_sign_dict[scenario_name]
  750. ego_df = data_processed.ego_data
  751. if ego_df['ifwarning'].empty:
  752. print("无法获取正确预警信号标志位!")
  753. return
  754. warning_rows = (ego_df['ifwarning'].dropna() == -1).all()
  755. if warning_rows:
  756. return {"noWarning_PGVIL": 1}
  757. else:
  758. return {"noWarning_PGVIL": -1}
  759. def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
  760. """
  761. 计算主车后轴中心点到停止线的距离
  762. :return 距离
  763. """
  764. distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
  765. # 计算停止线的方向向量
  766. line_vector = np.array(
  767. [
  768. stop_line_points[1][0] - stop_line_points[0][0],
  769. stop_line_points[1][1] - stop_line_points[0][1],
  770. ]
  771. )
  772. direction_vector_norm = np.linalg.norm(line_vector)
  773. direction_vector_unit = (
  774. line_vector / direction_vector_norm
  775. if direction_vector_norm != 0
  776. else np.array([0, 0])
  777. )
  778. # 计算主车后轴中心点到停止线投影的坐标(垂足)
  779. projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
  780. perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
  781. # 计算主车后轴中心点到垂足的距离
  782. distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
  783. carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
  784. return carhead_distance_to_foot
  785. def ifCrossingRedLight_PGVIL(data_processed, plot_path) -> dict:
  786. # 判断车辆是否闯红灯
  787. stop_line_points = np.array([(276.555, -35.575), (279.751, -33.683)])
  788. X_OFFSET = 258109.4239876
  789. Y_OFFSET = 4149969.964821
  790. stop_line_points += np.array([[X_OFFSET, Y_OFFSET]])
  791. ego_df = data_processed.ego_data
  792. prev_distance = float("inf") # 初始化为正无穷大
  793. """
  794. traffic_light_status
  795. 0x100000为绿灯,1048576
  796. 0x1000000为黄灯,16777216
  797. 0x10000000为红灯,268435456
  798. """
  799. red_light_violation = False
  800. for index, ego in ego_df.iterrows():
  801. car_point = (ego["posX"], ego["posY"])
  802. stateMask = ego["stateMask"]
  803. simTime = ego["simTime"]
  804. distance_to_stopline = get_car_to_stop_line_distance(
  805. ego, car_point, stop_line_points
  806. )
  807. # 主车车头跨越停止线时非绿灯,返回-1,闯红灯
  808. if prev_distance > 0 and distance_to_stopline < 0:
  809. if stateMask is not None and stateMask != 1048576:
  810. red_light_violation = True
  811. break
  812. prev_distance = distance_to_stopline
  813. if red_light_violation:
  814. return {"ifCrossingRedLight_PGVIL": -1} # 闯红灯
  815. else:
  816. return {"ifCrossingRedLight_PGVIL": 1} # 没有闯红灯
  817. # def ifStopgreenWaveSpeedGuidance(data_processed) -> dict:
  818. # #在绿波车速引导期间是否发生停车
  819. # def mindisStopline(data_processed) -> dict:
  820. # """
  821. # 当有停车让行标志/标线时车辆最前端与停车让行线的最小距离应在0-4m之间
  822. # """
  823. # ego_df = data_processed.ego_data
  824. # obj_df = data_processed.object_df
  825. # stop_giveway_simtime = ego_df[
  826. # ego_df["sign_type1"] == 32 |
  827. # ego_df["stopline_type"] == 3
  828. # ]["simTime"]
  829. # stop_giveway_data = ego_df[
  830. # ego_df["sign_type1"] == 32 |
  831. # ego_df["stopline_type"] == 3
  832. # ]["simTime"]
  833. # if stop_giveway_simtime.empty:
  834. # print("没有停车让行标志/标线")
  835. # ego_data = stop_giveway_data[stop_giveway_data['playerId'] == 1]
  836. # distance_carpoint_carhead = ego_data['dimX'].iloc[0]/2 + ego_data['offX'].iloc[0]
  837. # distance_to_stoplines = []
  838. # for _,row in ego_data.iterrows():
  839. # ego_pos = np.array([row["posX"], row["posY"], row["posH"]])
  840. # stop_line_points = [
  841. # [row["stopline_x1"], row["stopline_y1"]],
  842. # [row["stopline_x2"], row["stopline_y2"]],
  843. # ]
  844. # distance_to_stopline = get_car_to_stop_line_distance(ego_pos, stop_line_points)
  845. # distance_to_stoplines.append(distance_to_stopline)
  846. # mindisStopline = np.min(distance_to_stoplines) - distance_carpoint_carhead
  847. # return {"mindisStopline": mindisStopline}
  848. def limitSpeed_PGVIL(data, plot_path):
  849. ego_df = data.ego_data
  850. max_speed = max(ego_df["v"])
  851. if len(ego_df["v"]) == 0:
  852. return {"speedLimit_PGVIL": -1}
  853. else:
  854. return {"speedLimit_PGVIL": max_speed}
  855. def leastDistance_PGVIL(data, plot_path):
  856. exclude_seconds = 2.0
  857. ego_df = data.ego_data
  858. max_sim_time = ego_df["simTime"].max()
  859. threshold_time = max_sim_time - exclude_seconds
  860. for index, ego in ego_df.iterrows():
  861. if ego["simTime"] >= threshold_time:
  862. continue
  863. if "v" in ego and ego["v"] == 0:
  864. car_point = (ego["posX"], ego["posY"])
  865. p1 = (ego["stopline_x1"], ego["stopline_y1"])
  866. p2 = (ego["stopline_x2"], ego["stopline_y2"])
  867. stop_line_points = np.array([p1, p2], dtype=float)
  868. distance_to_stopline = get_car_to_stop_line_distance(
  869. ego, car_point, stop_line_points
  870. )
  871. return {"minimumDistance_PGVIL": distance_to_stopline}
  872. return {"minimumDistance_PGVIL": -1}
  873. def launchTimeinStopLine_PGVIL(data, plot_path):
  874. ego_df = data.ego_data
  875. in_stop = False
  876. start_time = None
  877. last_low_time = None
  878. max_duration = 0.0
  879. v_thresh = 0.01
  880. for _, row in ego_df.iterrows():
  881. simt = row["simTime"]
  882. v = row.get("v", None)
  883. if v is None or np.isnan(v) or abs(v) > v_thresh:
  884. if in_stop:
  885. duration = last_low_time - start_time
  886. if duration > max_duration:
  887. max_duration = duration
  888. in_stop = False
  889. else:
  890. if not in_stop:
  891. start_time = simt
  892. in_stop = True
  893. # 更新最近一次“停车时刻”的 simTime
  894. last_low_time = simt
  895. if in_stop:
  896. duration = last_low_time - start_time
  897. if duration > max_duration:
  898. max_duration = duration
  899. if max_duration <= 0:
  900. return {"launchTimeinStopLine_PGVIL": -1}
  901. else:
  902. return {"launchTimeinStopLine_PGVIL": float(max_duration)}
  903. def launchTimeinTrafficLight_PGVIL(data, plot_path):
  904. GREEN = 0x100000
  905. RED = 0x10000000
  906. ego_df = data.ego_data
  907. # 找到第一次红灯 to 绿灯的切换时刻
  908. is_transition = (ego_df["stateMask"] == GREEN) & (
  909. ego_df["stateMask"].shift(1) == RED
  910. )
  911. transition_times = ego_df.loc[is_transition, "simTime"]
  912. if transition_times.empty:
  913. return {"timeInterval_PGVIL": -1}
  914. time_red2green = transition_times.iloc[0]
  915. car_move_times = ego_df.loc[
  916. (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime"
  917. ]
  918. if car_move_times.empty:
  919. return {"timeInterval_PGVIL": -1}
  920. time_move = move_times.iloc[0]
  921. return {"timeInterval_PGVIL": time_move - time_red2green}
  922. def crossJunctionToTargetLane_PGVIL(data, plot_path):
  923. ego_df = data.ego_data
  924. lane_ids = set(ego_df["lane_id"].dropna())
  925. try:
  926. scenario_name = find_nested_name(data.function_config["function"])
  927. target_lane_id = data.function_config["function"][scenario_name][
  928. "crossJunctionToTargetLane_PGVIL"
  929. ]["max"]
  930. except KeyError:
  931. raise ValueError("请在配置文件中指定目标车道ID!")
  932. result = target_lane_id if target_lane_id in lane_ids else -1
  933. return {"crossJunctionToTargetLane_PGVIL": result}
  934. def noStop_PGVIL(data, plot_path):
  935. exclude_end_seconds = 5.0
  936. exclude_start_seconds = 5.0
  937. ego_df = data.ego_data
  938. max_sim_time = ego_df["simTime"].max()
  939. start_threshold = min_sim_time + exclude_start_seconds
  940. end_threshold = max_sim_time - exclude_end_seconds
  941. filtered_df = ego_df[
  942. (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold)
  943. ]
  944. if (filtered_df["v"] == 0).any():
  945. return {"noStop_PGVIL": -1}
  946. else:
  947. return {"noStop_PGVIL": 1}
  948. def noEmergencyBraking_PGVIL(data, plot_path):
  949. ego_df = data.ego_data
  950. ego_df["ip_dec"] = ego_df["v"].apply(
  951. get_interpolation, point1=[18, -5], point2=[72, -3.5]
  952. )
  953. ego_df["slam_brake"] = (ego_df["accleX"] - ego_df["ip_dec"]).apply(
  954. lambda x: 1 if x < 0 else 0
  955. )
  956. if sum(ego_df["slam_brake"]) == 0:
  957. return {"noEmergencyBraking_PGVIL": 1}
  958. else:
  959. return {"noEmergencyBraking_PGVIL": -1}
  960. def noReverse_PGVIL(data, plot_path):
  961. ego_df = data.ego_data.copy()
  962. heading_x = np.cos(ego_df["posH"])
  963. reverse_flag = (ego_df["speedX"] * heading_x) < 0
  964. if reverse_flag.any():
  965. return {"noReverse_PGVIL": -1}
  966. else:
  967. return {"noReverse_PGVIL": 1}
  968. def laneOffset_PGVIL(data, plot_path):
  969. car_width = data.function_config["vehicle"]["CAR_WIDTH"]
  970. ego_df = data.ego_data
  971. is_zero = ego_df["v"] == 0
  972. if not is_zero.any():
  973. # 全程未停车
  974. return {"laneOffset_PGVIL": None}
  975. groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
  976. last_zero_group = groups[is_zero].iloc[-1]
  977. last_stop = ego_df[groups == last_zero_group]
  978. # 距离右侧车道线
  979. edge_dist = (last_stop["lane_width"] / 2 + last_stop["laneOffset"]) - (
  980. car_width / 2
  981. )
  982. return {"laneOffset_PGVIL": edge_dist.max()}
  983. def maxLongitudelDistance_PGVIL(data, plot_path):
  984. scenario_name = find_nested_name(data.function_config["function"])
  985. stopX_pos = data.function_config["function"][scenario_name][
  986. "maxLongitudelDistance_PGVIL"
  987. ]["max"]
  988. stopY_pos = data.function_config["function"][scenario_name][
  989. "maxLongitudelDistance_PGVIL"
  990. ]["min"]
  991. def keepInLane_PGVIL(data, plot_path):
  992. ego_df = data.ego_data.copy()
  993. ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"])
  994. if ego_df.empty:
  995. return {"keepInLane_PGVIL": -1}
  996. threshold = ego_df["lane_width"] / 2
  997. if (ego_df["laneOffset"].abs() > threshold).any():
  998. return {"keepInLane_PGVIL": -1}
  999. else:
  1000. return {"keepInLane_PGVIL": 1}
  1001. def launchTimewhenPedestrianLeave_PGVIL(data, plot_path):
  1002. ego_df = data.ego_data
  1003. ped_df = data.object_data.loc[
  1004. data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
  1005. ]
  1006. merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  1007. if merged.empty:
  1008. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1009. yaw_rad = np.deg2rad(merged["posH"]) # 度→弧度
  1010. dx = merged["posX_ped"] - merged["posX"]
  1011. dy = merged["posY_ped"] - merged["posY"]
  1012. merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
  1013. merged["threshold"] = merged["lane_width"] / 2
  1014. entry = merged[merged["lateral_dist"] <= merged["threshold"]]
  1015. if entry.empty:
  1016. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1017. t_entry = entry["simTime"].iloc[0]
  1018. after_e = merged[merged["simTime"] > t_entry]
  1019. leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
  1020. if leave.empty:
  1021. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1022. t_leave = leave["simTime"].iloc[0]
  1023. ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
  1024. stops = ego_df[(ego_df["simTime"] > t_leave) & (ego_df["v"] == 0)].copy()
  1025. if stops.empty:
  1026. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1027. front_overPos = ego_df["dimX"] / 2 + ego_df["offX"]
  1028. yaw_stop = np.deg2rad(stops["posH"])
  1029. stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
  1030. stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
  1031. dx_s = stops["front_x"] - ped_x
  1032. dy_s = stops["front_y"] - ped_y
  1033. proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
  1034. valid = stops[proj <= 0]
  1035. t_stop = valid["simTime"].iloc[0]
  1036. launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"]
  1037. t_launch = launches.iloc[0]
  1038. return {"launchTimewhenPedestrianLeave_PGVIL": t_launch - t_stop}
  1039. def waitTimeAtCrosswalkwithPedestrian_PGVIL(data, plot_path):
  1040. ego_df = data.ego_data
  1041. ped_df = data.object_data.loc[
  1042. data.object_data["playerId"] == 2, ["simTime", "posX", "posY"]
  1043. ]
  1044. first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"]
  1045. if first_launch.empty:
  1046. t0_launch = 0.0
  1047. else:
  1048. t0_launch = first_launch.iloc[0]
  1049. merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  1050. merged["front_offset"] = merged["dimX"] / 2 + merged["offX"]
  1051. yaw = np.deg2rad(merged["posH"])
  1052. merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"]
  1053. merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"]
  1054. dx = merged["posX_ped"] - merged["front_x"]
  1055. dy = merged["posY_ped"] - merged["front_y"]
  1056. valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0
  1057. stops_df = merged.loc[
  1058. (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times
  1059. ].sort_values("simTime")
  1060. if stops_df.empty:
  1061. return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1}
  1062. wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0]
  1063. return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": wait_time}
  1064. def noCollision_PGVIL(data, plot_path):
  1065. ego = data.ego_data[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
  1066. tar = data.object_data.loc[data.object_data["playerId"] == 2,
  1067. ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX", "offY"]]
  1068. df = ego.merge(tar, on="simTime", suffixes=("", "_tar"))
  1069. df["posH_tar_rad"] = np.deg2rad(df["posH_tar"])
  1070. df["posH_rad"] = np.deg2rad(df["posH"])
  1071. df["collision"] = df.apply(lambda row: funcIsCollision(
  1072. row["dimX"], row["dimY"], row["offX"], row["offY"], row["posX"], row["posY"],
  1073. row["posH_rad"], row["dimX_tar"], row["dimY_tar"], row["offX_tar"], row["offY_tar"],
  1074. row["posX_tar"], row["posY_tar"], row["posH_tar_rad"], ), axis=1, )
  1075. if df["collision"].any():
  1076. return {"noCollision_PGVIL": -1}
  1077. else:
  1078. return {"noCollision_PGVIL": 1}
  1079. def leastLateralDistance_PGVIL(data, plot_path):
  1080. ego_df = data.ego_data
  1081. cones = data.object_data.loc[data.object_data['playerId'] != 1,
  1082. ['simTime', 'playerId', 'posX', 'posY']].copy()
  1083. df = ego_df.merge(cones, on='simTime', how='inner', suffixes=('', '_cone'))
  1084. yaw = np.deg2rad(df['posH'])
  1085. x_c = df['posX'] + df['offX'] * np.cos(yaw)
  1086. y_c = df['posY'] + df['offX'] * np.sin(yaw)
  1087. dx = df['posX'] - x_c
  1088. dy = df['posY'] - y_c
  1089. dx = df['posX_cone'] - x_c
  1090. dy = df['posY_cone'] - y_c
  1091. local_x = np.cos(yaw) * dx + np.sin(yaw) * dy
  1092. local_y = -np.sin(yaw) * dx + np.cos(yaw) * dy
  1093. half_length = df['dimX'] / 2
  1094. half_width = df['dimY'] / 2
  1095. inside = (np.abs(local_x) <= half_length) & (np.abs(local_y) <= half_width)
  1096. collisions = df.loc[inside, ['simTime', 'playerId']]
  1097. if collisions.empty:
  1098. return {"noConeRectCollision_PGVIL": 1}
  1099. else:
  1100. collision_times = collisions['simTime'].unique().tolist()
  1101. collision_ids = collisions['playerId'].unique().tolist()
  1102. return {"noConeRectCollision_PGVIL": -1}
  1103. class FunctionRegistry:
  1104. """动态函数注册器(支持参数验证)"""
  1105. def __init__(self, data_processed, plot_path):
  1106. self.logger = LogManager().get_logger() # 获取全局日志实例
  1107. self.data = data_processed
  1108. self.plot_path = plot_path
  1109. # 检查function_config是否为空
  1110. if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
  1111. self.logger.warning("功能配置为空,跳过功能指标计算")
  1112. self.fun_config = {}
  1113. self.level_3_merics = []
  1114. self._registry = {}
  1115. return
  1116. self.fun_config = data_processed.function_config.get("function", {})
  1117. self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
  1118. self._registry: Dict[str, Callable] = {}
  1119. self._registry = self._build_registry()
  1120. def _extract_level_3_metrics(self, config_node: dict) -> list:
  1121. """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
  1122. metrics = []
  1123. def _recurse(node):
  1124. if isinstance(node, dict):
  1125. if "name" in node and not any(
  1126. isinstance(v, dict) for v in node.values()
  1127. ):
  1128. metrics.append(node["name"])
  1129. for v in node.values():
  1130. _recurse(v)
  1131. _recurse(config_node)
  1132. self.logger.info(f"评比的功能指标列表:{metrics}")
  1133. return metrics
  1134. def _build_registry(self) -> dict:
  1135. """自动注册指标函数(防御性编程)"""
  1136. registry = {}
  1137. for func_name in self.level_3_merics:
  1138. try:
  1139. registry[func_name] = globals()[func_name]
  1140. except KeyError:
  1141. print(f"未实现指标函数: {func_name}")
  1142. self.logger.error(f"未实现指标函数: {func_name}")
  1143. return registry
  1144. def batch_execute(self) -> dict:
  1145. """批量执行指标计算(带熔断机制)"""
  1146. results = {}
  1147. # 如果配置为空或没有注册的指标,直接返回空结果
  1148. if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry:
  1149. self.logger.info("功能配置为空或无注册指标,返回空结果")
  1150. return results
  1151. for name, func in self._registry.items():
  1152. try:
  1153. result = func(self.data, self.plot_path) # 统一传递数据上下文
  1154. results.update(result)
  1155. except Exception as e:
  1156. print(f"{name} 执行失败: {str(e)}")
  1157. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  1158. results[name] = None
  1159. self.logger.info(f"功能指标计算结果:{results}")
  1160. return results
  1161. class FunctionManager:
  1162. """管理功能指标计算的类"""
  1163. def __init__(self, data_processed, plot_path):
  1164. self.data = data_processed
  1165. self.logger = LogManager().get_logger()
  1166. self.plot_path = plot_path
  1167. # 检查function_config是否为空
  1168. if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
  1169. self.logger.warning("功能配置为空,跳过功能指标计算初始化")
  1170. self.function = None
  1171. else:
  1172. self.function = FunctionRegistry(self.data, self.plot_path)
  1173. def report_statistic(self):
  1174. """
  1175. 计算并报告功能指标结果。
  1176. :return: 评估结果
  1177. """
  1178. # 如果function为None,直接返回空字典
  1179. if self.function is None:
  1180. self.logger.info("功能指标管理器未初始化,返回空结果")
  1181. return {}
  1182. function_result = self.function.batch_execute()
  1183. print("\n[功能性表现及评价结果]")
  1184. return function_result
  1185. # self.logger.info(f'Function Result: {function_result}')
  1186. # 使用示例
  1187. if __name__ == "__main__":
  1188. pass
  1189. # print("\n[功能类表现及得分情况]")