function.py 70 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2025 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Data: 2025/01/5
  11. @Last Modified: 2025/01/5
  12. @Summary: Function Metrics Calculation
  13. """
  14. import sys
  15. from pathlib import Path
  16. # 添加项目根目录到系统路径
  17. root_path = Path(__file__).resolve().parent.parent
  18. sys.path.append(str(root_path))
  19. print(root_path)
  20. from modules.lib.score import Score
  21. from modules.lib.log_manager import LogManager
  22. import numpy as np
  23. from typing import Dict, Tuple, Optional, Callable, Any
  24. import pandas as pd
  25. import yaml
  26. from modules.lib.chart_generator import generate_function_chart_data
  27. from shapely.geometry import Point, Polygon
  28. from modules.lib.common import get_interpolation
  29. import pandas as pd
  30. import yaml
  31. import math
  32. # ----------------------
  33. # 基础工具函数 (Pure functions)
  34. # ----------------------
  35. scenario_sign_dict = {
  36. "LeftTurnAssist": 206,
  37. "HazardousLocationW": 207,
  38. "RedLightViolationW": 208,
  39. "AbnormalVehicleW": 209,
  40. "NsightVulnerableRoadUserCollisionW": 210,
  41. "LitterW": 211,
  42. "ForwardCollision": 212,
  43. "VisibilityW": 213,
  44. "EmergencyBrakeW": 214,
  45. "IntersectionCollisionW": 215,
  46. "BlindSpotW": 216,
  47. "DoNotPassW": 217,
  48. "ControlLossW": 218,
  49. "FrontTrafficJamW": 219,
  50. "EmergencyVehicleW": 220,
  51. "CooperativeVehicleMerge": 221,
  52. "CooperativeLaneChange": 223,
  53. "VulnerableRoadUserCollisionW": 224,
  54. "CooperativeIntersectionPassing": 225,
  55. "RampMerge": 226,
  56. "DrivingLaneRecommendation": 227,
  57. "TrafficJamW": 228,
  58. "DynamicSpeedLimitingInformation": 229,
  59. "EmergencyVehiclesPriority": 230,
  60. "RemoteSupervision": 231,
  61. "SignalLightReminder": 232,
  62. "OtherVehicleRedLightViolationW": 233,
  63. "GreenLightOptimalSpeedAdvisory": 234,
  64. }
  65. def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
  66. polygon = Polygon(polygon)
  67. point = Point(test_point)
  68. return polygon.contains(point)
  69. def _is_segment_by_interval(time_list, expected_step) -> list:
  70. """
  71. 根据时间戳之间的间隔进行分段。
  72. 参数:
  73. time_list (list): 时间戳列表。
  74. expected_step (float): 预期的固定步长。
  75. 返回:
  76. list: 分段后的时间戳列表,每个元素是一个子列表。
  77. """
  78. if not time_list:
  79. return []
  80. segments = []
  81. current_segment = [time_list[0]]
  82. for i in range(1, len(time_list)):
  83. actual_step = time_list[i] - time_list[i - 1]
  84. if actual_step != expected_step:
  85. # 如果间隔不符合预期,则开始一个新的段
  86. segments.append(current_segment)
  87. current_segment = [time_list[i]]
  88. else:
  89. # 否则,将当前时间戳添加到当前段中
  90. current_segment.append(time_list[i])
  91. # 添加最后一个段
  92. if current_segment:
  93. segments.append(current_segment)
  94. return segments
  95. # 寻找二级指标的名称
  96. def find_nested_name(data):
  97. """
  98. 查找字典中嵌套的name结构。
  99. :param data: 要搜索的字典
  100. :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None
  101. """
  102. if isinstance(data, dict):
  103. for key, value in data.items():
  104. if isinstance(value, dict) and 'name' in value:
  105. return value['name']
  106. # 递归查找嵌套字典
  107. result = find_nested_name(value)
  108. if result is not None:
  109. return result
  110. elif isinstance(data, list):
  111. for item in data:
  112. result = find_nested_name(item)
  113. if result is not None:
  114. return result
  115. return None
  116. def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
  117. """向量化距离计算"""
  118. return np.linalg.norm(ego_pos - obj_pos, axis=1)
  119. def calculate_relative_speed_PGVIL(
  120. ego_speed: np.ndarray, obj_speed: np.ndarray
  121. ) -> np.ndarray:
  122. """向量化相对速度计算"""
  123. return np.linalg.norm(ego_speed - obj_speed, axis=1)
  124. def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  125. """向量化距离计算"""
  126. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  127. return dist
  128. def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
  129. """向量化相对速度计算"""
  130. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  131. def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
  132. """数据提取函数"""
  133. ego = data[data["playerId"] == 1].iloc[0]
  134. obj = data[data["playerId"] != 1]
  135. return ego, obj
  136. def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
  137. """带缓存的预警数据获取"""
  138. ego_df = data_processed.ego_data
  139. obj_df = data_processed.object_df
  140. scenario_name = find_nested_name(data_processed.function_config["function"])
  141. correctwarning = scenario_sign_dict.get(scenario_name)
  142. if correctwarning is None:
  143. print("无法获取正确的预警信号标志位!")
  144. return None
  145. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  146. warning_times = warning_rows['simTime']
  147. if warning_times.empty:
  148. print("没有找到预警数据!")
  149. return None
  150. first_time = warning_times.iloc[0]
  151. return obj_df[obj_df['simTime'] == first_time]
  152. def getAxis(heading):
  153. AxisL = [0, 0]
  154. AxisW = [0, 0]
  155. AxisL[0] = math.cos(heading)
  156. AxisL[1] = math.sin(heading)
  157. AxisW[0] = -math.sin(heading)
  158. AxisW[1] = math.cos(heading)
  159. return AxisL, AxisW
  160. def getDotMultiply(firstAxis, secondAxis):
  161. dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1])
  162. return dotMultiply
  163. def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth):
  164. projAxisL = getDotMultiply(AxisL, baseAxis)
  165. projAxisW = getDotMultiply(AxisW, baseAxis)
  166. projectionRadius = halfLength * projAxisL + halfWidth * projAxisW
  167. return projectionRadius
  168. def isCollision(
  169. firstAxisL,
  170. firstAxisW,
  171. firstHalfLength,
  172. firstHalfWidth,
  173. secondAxisL,
  174. secondAxisW,
  175. secondHalfLength,
  176. secondHalfWidth,
  177. disVector,
  178. ):
  179. isCollision = True
  180. axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW]
  181. addProjectionRadius = 0
  182. projectionDisVecotr = 0
  183. for axis in axes:
  184. addProjectionRadius = getProjectionRadius(
  185. firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth
  186. ) + getProjectionRadius(
  187. secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth
  188. )
  189. projectionDisVecotr = getDotMultiply(disVector, axis)
  190. if projectionDisVecotr > addProjectionRadius:
  191. isCollision = False
  192. break
  193. return isCollision
  194. def funcIsCollision(
  195. firstDimX,
  196. firstDimY,
  197. firstOffX,
  198. firstOffY,
  199. firstX,
  200. firstY,
  201. firstHeading,
  202. secondDimX,
  203. secondDimY,
  204. secondOffX,
  205. secondOffY,
  206. secondX,
  207. secondY,
  208. secondHeading,
  209. ):
  210. firstAxisL = getAxis(firstHeading)[0]
  211. firstAxisW = getAxis(firstHeading)[1]
  212. secondAxisL = getAxis(secondHeading)[0]
  213. secondAxisW = getAxis(secondHeading)[1]
  214. firstHalfLength = 0.5 * firstDimX
  215. firstHalfWidth = 0.5 * firstDimY
  216. secondHalfLength = 0.5 * secondDimX
  217. secondHalfWidth = 0.5 * secondDimY
  218. firstCenter = [0, 0]
  219. secondCenter = [0, 0]
  220. disVector = [0, 0]
  221. firstCenter[0] = firstX + firstOffX * math.cos(firstHeading)
  222. firstCenter[1] = firstY + firstOffX * math.sin(firstHeading)
  223. secondCenter[0] = secondX + secondOffX * math.cos(secondHeading)
  224. secondCenter[1] = secondY + secondOffX * math.sin(secondHeading)
  225. disVector[0] = secondCenter[0] - firstCenter[0]
  226. disVector[1] = secondCenter[1] - firstCenter[1]
  227. varIsCollision = isCollision(
  228. firstAxisL,
  229. firstAxisW,
  230. firstHalfLength,
  231. firstHalfWidth,
  232. secondAxisL,
  233. secondAxisW,
  234. secondHalfLength,
  235. secondHalfWidth,
  236. disVector,
  237. )
  238. return varIsCollision
  239. # ----------------------
  240. # 核心计算功能函数
  241. # ----------------------
  242. def latestWarningDistance_LST(data, plot_path) -> dict:
  243. """预警距离计算流水线"""
  244. scenario_name = find_nested_name(data.function_config["function"])
  245. value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"]
  246. correctwarning = scenario_sign_dict[scenario_name]
  247. ego_df = data.ego_data
  248. warning_dist = calculate_distance(ego_df, correctwarning)
  249. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  250. # 将计算结果保存到data对象中,供图表生成使用
  251. data.warning_dist = warning_dist
  252. data.warning_speed = warning_speed
  253. data.correctwarning = correctwarning
  254. if len(warning_dist.tolist()) == 0:
  255. return {"latestWarningDistance_LST": 0.0}
  256. # 生成图表数据
  257. generate_function_chart_data(data, 'latestWarningDistance_LST', plot_path)
  258. return {"latestWarningDistance_LST": warning_dist.tolist()[-1] if len(warning_dist) > 0 else value}
  259. def earliestWarningDistance_LST(data, plot_path) -> dict:
  260. """预警距离计算流水线"""
  261. scenario_name = find_nested_name(data.function_config["function"])
  262. value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"]
  263. correctwarning = scenario_sign_dict[scenario_name]
  264. ego_df = data.ego_data
  265. warning_dist = calculate_distance(ego_df, correctwarning)
  266. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  267. # 将计算结果保存到data对象中,供图表生成使用
  268. data.warning_dist = warning_dist
  269. data.warning_speed = warning_speed
  270. data.correctwarning = correctwarning
  271. if len(warning_dist.tolist()) == 0:
  272. return {"earliestWarningDistance_LST": 0.0}
  273. # 生成图表数据
  274. generate_function_chart_data(data, 'earliestWarningDistance_LST', plot_path)
  275. return {"earliestWarningDistance_LST": float(warning_dist.tolist()[0]) if len(warning_dist) > 0 else value}
  276. def latestWarningDistance_TTC_LST(data, plot_path) -> dict:
  277. """TTC计算流水线"""
  278. scenario_name = find_nested_name(data.function_config["function"])
  279. value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"]
  280. correctwarning = scenario_sign_dict[scenario_name]
  281. ego_df = data.ego_data
  282. warning_dist = calculate_distance(ego_df, correctwarning)
  283. if len(warning_dist.tolist()) == 0:
  284. return {"latestWarningDistance_TTC_LST": 0.0}
  285. # 将correctwarning保存到data对象中,供图表生成使用
  286. data.correctwarning = correctwarning
  287. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  288. with np.errstate(divide='ignore', invalid='ignore'):
  289. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  290. # 处理无效的TTC值
  291. for i in range(len(ttc)):
  292. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  293. data.warning_dist = warning_dist
  294. data.warning_speed = warning_speed
  295. data.ttc = ttc
  296. # 生成图表数据
  297. # from modules.lib.chart_generator import generate_function_chart_data
  298. generate_function_chart_data(data, 'latestWarningDistance_TTC_LST', plot_path)
  299. return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
  300. def earliestWarningDistance_TTC_LST(data, plot_path) -> dict:
  301. """TTC计算流水线"""
  302. scenario_name = find_nested_name(data.function_config["function"])
  303. value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"]
  304. correctwarning = scenario_sign_dict[scenario_name]
  305. ego_df = data.ego_data
  306. warning_dist = calculate_distance(ego_df, correctwarning)
  307. if len(warning_dist.tolist()) == 0:
  308. return {"earliestWarningDistance_TTC_LST": 0.0}
  309. # 将correctwarning保存到data对象中,供图表生成使用
  310. data.correctwarning = correctwarning
  311. warning_speed = calculate_relative_speed(ego_df, correctwarning)
  312. with np.errstate(divide='ignore', invalid='ignore'):
  313. ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
  314. # 处理无效的TTC值
  315. for i in range(len(ttc)):
  316. ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
  317. # 将计算结果保存到data对象中,供图表生成使用
  318. data.warning_dist = warning_dist
  319. data.warning_speed = warning_speed
  320. data.ttc = ttc
  321. data.correctwarning = correctwarning
  322. # 生成图表数据
  323. generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST', plot_path)
  324. return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
  325. def warningDelayTime_LST(data, plot_path):
  326. scenario_name = find_nested_name(data.function_config["function"])
  327. correctwarning = scenario_sign_dict[scenario_name]
  328. # 将correctwarning保存到data对象中,供图表生成使用
  329. data.correctwarning = correctwarning
  330. ego_df = data.ego_data
  331. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  332. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  333. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  334. 'simTime'].tolist()
  335. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  336. if (simTime_HMI is None) or (simTime_rosbag is None):
  337. print("预警出错!")
  338. delay_time = 100.0
  339. else:
  340. delay_time = abs(simTime_HMI - simTime_rosbag)
  341. return {"warningDelayTime_LST": delay_time}
  342. def warningDelayTimeofReachDecel_LST(data, plot_path):
  343. scenario_name = find_nested_name(data.function_config["function"])
  344. correctwarning = scenario_sign_dict[scenario_name]
  345. # 将correctwarning保存到data对象中,供图表生成使用
  346. data.correctwarning = correctwarning
  347. ego_df = data.ego_data
  348. obj_df = data.obj_data[2]
  349. # 加速度在速度方向上的投影
  350. speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6
  351. obj_df["ux"] = obj_df["speedX"] / speed_mag
  352. obj_df["uy"] = obj_df["speedY"] / speed_mag
  353. obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"]
  354. obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist() # 单位m/s^2
  355. warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
  356. if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0):
  357. return {"warningDelayTimeofReachDecel_LST": 0}
  358. elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0):
  359. return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]}
  360. elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0):
  361. return {"warningDelayTimeofReachDecel_LST": -1}
  362. else:
  363. return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]}
  364. def rightWarningSignal_LST(data, plot_path):
  365. scenario_name = find_nested_name(data.function_config["function"])
  366. correctwarning = scenario_sign_dict[scenario_name]
  367. # 将correctwarning保存到data对象中,供图表生成使用
  368. data.correctwarning = correctwarning
  369. ego_df = data.ego_data
  370. if ego_df['ifwarning'].empty:
  371. print("无法获取正确预警信号标志位!")
  372. return
  373. warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
  374. if warning_rows.empty:
  375. return {"rightWarningSignal_LST": -1}
  376. else:
  377. return {"rightWarningSignal_LST": 1}
  378. def noWarning_LST(data, plot_path):
  379. scenario_name = find_nested_name(data.function_config["function"])
  380. correctwarning = scenario_sign_dict[scenario_name]
  381. ego_df = data.ego_data
  382. if ego_df['ifwarning'].empty:
  383. print("无法获取正确预警信号标志位!")
  384. return
  385. warning_rows = (ego_df['ifwarning'].dropna() == -1).all()
  386. if warning_rows:
  387. return {"noWarning_LST": 1}
  388. else:
  389. return {"noWarning_LST": -1}
  390. # def ifCrossingRedLight_LST(data, plot_path):
  391. # scenario_name = find_nested_name(data.function_config["function"])
  392. # correctwarning = scenario_sign_dict[scenario_name]
  393. # # 将correctwarning保存到data对象中,供图表生成使用
  394. # data.correctwarning = correctwarning
  395. # ego_df = data.ego_data
  396. # redlight_simtime = ego_df[
  397. # (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] <= 0.5) & (
  398. # ego_df['v'] != 0)]['simTime']
  399. # if redlight_simtime.empty:
  400. # return {"ifCrossingRedLight_LST": -1}
  401. # else:
  402. # return {"ifCrossingRedLight_LST": 1}
  403. def ifStopgreenWaveSpeedGuidance_LST(data, plot_path):
  404. scenario_name = find_nested_name(data.function_config["function"])
  405. correctwarning = scenario_sign_dict[scenario_name]
  406. # 将correctwarning保存到data对象中,供图表生成使用
  407. data.correctwarning = correctwarning
  408. ego_df = data.ego_data
  409. greenlight_simtime = \
  410. ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] <= 0.1)]['simTime']
  411. if greenlight_simtime.empty:
  412. return {"ifStopgreenWaveSpeedGuidance_LST": -1}
  413. else:
  414. return {"ifStopgreenWaveSpeedGuidance_LST": 1}
  415. # ------ 单车智能指标 ------
  416. def limitSpeed_LST(data, plot_path):
  417. ego_df = data.ego_data
  418. scenario_name = find_nested_name(data.function_config["function"])
  419. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  420. speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.2]['v'].tolist()
  421. if len(speed_limit) == 0:
  422. return {"limitSpeed_LST": -1}
  423. max_speed = max(speed_limit)*3.6
  424. data.speedLimit = limit_speed
  425. generate_function_chart_data(data, 'limitspeed_LST', plot_path)
  426. return {"limitSpeed_LST": max_speed}
  427. def limitSpeedPastLimitSign_LST(data, plot_path):
  428. ego_df = data.ego_data
  429. scenario_name = find_nested_name(data.function_config["function"])
  430. limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
  431. car_length = data.function_config["vehicle"]['CAR_LENGTH']
  432. ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist()
  433. ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist()
  434. data.speedLimit = limit_speed
  435. data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None
  436. generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST', plot_path)
  437. if len(ego_speed) == 0:
  438. return {"speedPastLimitSign_LST": -1}
  439. return {"speedPastLimitSign_LST": ego_speed[0]}
  440. def leastDistance_LST(data, plot_path):
  441. ego_df = data.ego_data
  442. dist_row = ego_df[ego_df['v'] <= 0.1]['relative_dist'].tolist()
  443. if len(dist_row) == 0:
  444. return {"leastDistance_LST": -1}
  445. else:
  446. min_dist = min(dist_row)
  447. return {"leastDistance_LST": min_dist}
  448. def noCrossRedLight_LST(data, plot_path):
  449. ego_df = data.ego_data
  450. cross_redlight_dist = ego_df[(ego_df['stateMask'] == 3) & (ego_df['filtered_trafficlight_id'] is not None) & abs(ego_df['v'] > 0.1)]['x_relative_dist']
  451. has_positive = any(x > 0 for x in cross_redlight_dist)
  452. has_negative = any(x < 0 for x in cross_redlight_dist)
  453. has_both = has_positive and has_negative
  454. if has_both:
  455. return {'noCrossRedLight_LST': -1}
  456. else:
  457. return {'noCrossRedLight_LST': 1}
  458. def launchTimeinStopLine_LST(data, plot_path):
  459. ego_df = data.ego_data
  460. simtime_row = ego_df[ego_df['v'] <= 0.1]['simTime'].tolist()
  461. if len(simtime_row) == 0:
  462. return {"launchTimeinStopLine_LST": -1}
  463. else:
  464. delta_t = simtime_row[-1] - simtime_row[0]
  465. return {"launchTimeinStopLine_LST": delta_t}
  466. def launchTimewhenFollowingCar_LST(data, plot_path):
  467. ego_df = data.ego_data
  468. t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0]
  469. simtime_row = ego_df[ego_df['v'] <= 0.1]['simTime'].tolist()
  470. if len(simtime_row) == 0:
  471. return {"launchTimewhenFollowingCar_LST": 0}
  472. else:
  473. time_interval = _is_segment_by_interval(simtime_row, t_interval)
  474. delta_t = []
  475. for t in time_interval:
  476. delta_t.append(t[-1] - t[0])
  477. return {"launchTimewhenFollowingCar_LST": max(delta_t)}
  478. def noStop_LST(data, plot_path):
  479. ego_df_ini = data.ego_data
  480. min_time = ego_df_ini['simTime'].min() + 0
  481. max_time = ego_df_ini['simTime'].max() - 0
  482. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  483. speed = ego_df['v'].tolist()
  484. if (any(speed) <= 0.5):
  485. return {"noStop_LST": -1}
  486. else:
  487. return {"noStop_LST": 1}
  488. def launchTimeinTrafficLight_LST(data, plot_path):
  489. '''
  490. 待修改:
  491. 红灯的状态值:1
  492. 绿灯的状态值:0
  493. '''
  494. ego_df = data.ego_data
  495. simtime_of_redlight = ego_df[(ego_df['stateMask'] == 3) & (ego_df['filtered_trafficlight_id'] is not None)]['simTime']
  496. simtime_of_stop = ego_df[ego_df['v'] <= 0.1]['simTime']
  497. if len(simtime_of_stop) or len(simtime_of_redlight):
  498. return {"launchTimeinTrafficLight_LST": -1}
  499. simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist()
  500. if len(simtime_of_launch) == 0:
  501. return {"launchTimeinTrafficLight_LST": -1}
  502. return {"launchTimeinTrafficLight_LST": simtime_of_launch[-1] - simtime_of_launch[0]}
  503. def crossJunctionToTargetLane_LST(data, plot_path):
  504. ego_df = data.ego_data
  505. lane_in_leftturn = set(ego_df['lane_id'].tolist())
  506. scenario_name = find_nested_name(data.function_config["function"])
  507. if (30225902 in lane_in_leftturn) or (30226005 in lane_in_leftturn):
  508. return {"crossJunctionToTargetLane_LST": 1}
  509. else:
  510. return {"crossJunctionToTargetLane_LST": -1}
  511. def keepInLane_LST(data, plot_path):
  512. # ego_df = data.ego_data[data.ego_data['road_type'] == 15]
  513. ego_df = data.ego_data[data.ego_data['lane_id'].isin([30225904, 30225978])].copy()
  514. notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width'] / 2]
  515. if len(notkeepinlane) == 0:
  516. return {"keepInLane_LST": -1}
  517. return {"keepInLane_LST": 1}
  518. def leastLateralDistance_LST(data, plot_path):
  519. ego_df = data.ego_data
  520. lane_width = ego_df[abs(ego_df['x_relative_dist']) <= 0.5]['lane_width'].tolist()[0]
  521. if len(lane_width) == 0:
  522. return {"leastLateralDistance_LST": -1}
  523. else:
  524. y_relative_dist = ego_df[abs(ego_df['x_relative_dist']) <= 0.05]['y_relative_dist']
  525. if (abs(y_relative_dist) <= lane_width / 2).all():
  526. return {"leastLateralDistance_LST": 1}
  527. else:
  528. return {"leastLateralDistance_LST": -1}
  529. def waitTimeAtCrosswalkwithPedestrian_LST(data, plot_path):
  530. ego_df = data.ego_data
  531. object_df = data.obj_data
  532. data['in_crosswalk'] = []
  533. position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True)
  534. for cross_id in position_data['cross_id'].tolist():
  535. for posX, posY in object_df['posX', 'posY']:
  536. pedestrian_pos = (posX, posY)
  537. plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0]
  538. if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos):
  539. data[data['playerId'] == 2]['in_crosswalk'] = 1
  540. else:
  541. data[data['playerId'] == 2]['in_crosswalk'] = 0
  542. car_stop_time = ego_df[ego_df['v'] <= 0.1]['simTime']
  543. pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime']
  544. car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist()
  545. return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len(
  546. car_wait_pedestrian) > 0 else 0}
  547. def launchTimewhenPedestrianLeave_LST(data, plot_path):
  548. ego_df = data.ego_data
  549. car_stop_time = ego_df[ego_df['v'] <= 0.1]["simTime"]
  550. if len(car_stop_time) == 0:
  551. return {"launchTimewhenPedestrianLeave_LST": 0}
  552. else:
  553. lane_width = ego_df[ego_df['v'] <= 0.1]['lane_width'].tolist()[0]
  554. car_to_pedestrain = ego_df[abs(ego_df['y_relative_dist']) <= lane_width / 2]["simTime"]
  555. legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist()
  556. return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]}
  557. def noCollision_LST(data, plot_path):
  558. ego_df = data.ego_data
  559. is_dist_min = False
  560. is_sign_change = False
  561. for i in range(1, len(ego_df) - 1):
  562. relative_dist_decreasing = ego_df['relative_dist'][i-1] > ego_df['relative_dist'][i]
  563. relative_dist_increasing = ego_df['relative_dist'][i+1] > ego_df['relative_dist'][i]
  564. is_dist_min = relative_dist_decreasing and relative_dist_increasing and (ego_df['relative_dist'][i] < 0.5)
  565. x_relative = (ego_df['x_relative_dist'][i-1]*ego_df['x_relative_dist'][i]) < 0
  566. y_relative = (ego_df['y_relative_dist'][i-1]*ego_df['y_relative_dist'][i]) < 0
  567. is_sign_change = x_relative or y_relative
  568. if is_dist_min and is_sign_change:
  569. return {"noCollision_LST": -1}
  570. return {"noCollision_LST": 1}
  571. def noReverse_LST(data, plot_path):
  572. ego_df = data.ego_data
  573. if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0:
  574. return {"noReverse_LST": -1}
  575. else:
  576. return {"noReverse_LST": 1}
  577. def turnAround_LST(data, plot_path):
  578. ego_df = data.ego_data
  579. if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and (
  580. ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0):
  581. return {"turnAround_LST": 1}
  582. else:
  583. return {"turnAround_LST": -1}
  584. def laneOffset_LST(data, plot_path):
  585. car_width = data.vehicle_config['CAR_WIDTH']
  586. ego_df_ini = data.ego_data
  587. min_time = ego_df_ini['simTime'].min() + 0
  588. max_time = ego_df_ini['simTime'].max() - 0
  589. ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
  590. laneoffset = abs(ego_df['y_relative_dist']) - car_width / 2
  591. return {"laneOffset_LST": max(laneoffset)}
  592. def maxLongitudeDist_LST(data, plot_path):
  593. ego_df = data.ego_data
  594. longitude_dist = abs(ego_df[ego_df['v'] <= 0.2]['x_relative_dist'])
  595. data.longitude_dist = min(abs(ego_df[ego_df['v'] <= 0.2]['x_relative_dist']), default=0)
  596. stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist()
  597. data.stop_time = min(stop_time)
  598. if len(longitude_dist) == 0:
  599. return {"maxLongitudeDist_LST": -1}
  600. generate_function_chart_data(data, 'maxLongitudeDist_LST', plot_path)
  601. return {"maxLongDist_LST": min(longitude_dist)}
  602. def noEmergencyBraking_LST(data, plot_path):
  603. ego_df = data.ego_data
  604. ego_df['ip_dec'] = ego_df['v'].apply(
  605. get_interpolation, point1=[18, -5], point2=[72, -3.5])
  606. ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply(
  607. lambda x: 1 if x < 0 else 0)
  608. if sum(ego_df['slam_brake']) == 0:
  609. return {"noEmergencyBraking_LST": 1}
  610. else:
  611. return {"noEmergencyBraking_LST": -1}
  612. def rightWarningSignal_PGVIL(data_processed, plot_path) -> dict:
  613. """判断是否发出正确预警信号"""
  614. ego_df = data_processed.ego_data
  615. scenario_name = find_nested_name(data_processed.function_config["function"])
  616. correctwarning = scenario_sign_dict[scenario_name]
  617. if correctwarning is None:
  618. print("无法获取正确的预警信号标志位!")
  619. return None
  620. # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
  621. warning_rows = ego_df[
  622. (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
  623. ]
  624. if warning_rows.empty:
  625. return {"rightWarningSignal_PGVIL": -1}
  626. else:
  627. return {"rightWarningSignal_PGVIL": 1}
  628. def latestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
  629. """预警距离计算流水线"""
  630. ego_df = data_processed.ego_data
  631. obj_df = data_processed.object_df
  632. warning_data = get_first_warning(data_processed)
  633. if warning_data is None:
  634. return {"latestWarningDistance_PGVIL": -11}
  635. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  636. # dis_pos2head = 5.929 / 2 + 1.982
  637. ego, obj = extract_ego_obj(warning_data)
  638. distances = calculate_distance_PGVIL(
  639. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  640. ) - dis_pos2head
  641. # 将计算结果保存到data对象中,供图表生成使用
  642. data_processed.warning_dist = distances
  643. data_processed.warning_time = ego['simTime'].tolist()
  644. if distances.size == 0:
  645. print("没有找到数据!")
  646. return {"latestWarningDistance_PGVIL": -1} # 或返回其他默认值,如0.0
  647. generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL', plot_path)
  648. return {"latestWarningDistance_PGVIL": float(np.min(distances))}
  649. def latestWarningDistancetoConflictPoint_PGVIL(data_processed, plot_path) -> dict:
  650. """预警距离计算流水线"""
  651. ego_df = data_processed.ego_data
  652. warning_data = get_first_warning(data_processed)
  653. if warning_data is None:
  654. return {"latestWarningDistancetoConflictPoint_PGVIL": -11}
  655. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  656. ego, obj = extract_ego_obj(warning_data)
  657. ConflictPoint = obj[obj["type"] == 25].copy()
  658. distances = calculate_distance_PGVIL(
  659. np.array([[ego["posX"], ego["posY"]]]), ConflictPoint[["posX", "posY"]].values
  660. ) - dis_pos2head
  661. # 将计算结果保存到data对象中,供图表生成使用
  662. data_processed.warning_dist = distances
  663. data_processed.warning_time = ego['simTime'].tolist()
  664. if distances.size == 0:
  665. print("没有找到数据!")
  666. return {"latestWarningDistancetoConflictPoint_PGVIL": -1} # 或返回其他默认值,如0.0
  667. generate_function_chart_data(data_processed, 'latestWarningDistancetoConflictPoint_PGVIL', plot_path)
  668. return {"latestWarningDistancetoConflictPoint_PGVIL": float(np.min(distances))}
  669. def latestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
  670. """TTC计算流水线"""
  671. ego_df = data_processed.ego_data
  672. obj_df = data_processed.object_df
  673. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  674. # dis_pos2head = 5.929 / 2 + 1.982
  675. warning_data = get_first_warning(data_processed)
  676. if warning_data is None:
  677. return {"latestWarningDistance_TTC_PGVIL": 0.0}
  678. ego, obj = extract_ego_obj(warning_data)
  679. # 向量化计算
  680. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  681. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  682. obj_pos = obj[["posX", "posY"]].values
  683. obj_speed = obj[["speedX", "speedY"]].values
  684. distances = calculate_distance_PGVIL(ego_pos, obj_pos) - dis_pos2head
  685. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  686. data_processed.warning_dist = distances
  687. data_processed.warning_speed = rel_speeds
  688. data_processed.warning_time = ego['simTime'].tolist()
  689. with np.errstate(divide="ignore", invalid="ignore"):
  690. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  691. if ttc.size == 0:
  692. print("没有找到数据!")
  693. return {"latestWarningDistance_TTC_PGVIL": -2} # 或返回其他默认值,如0.0
  694. data_processed.ttc = ttc
  695. generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL', plot_path)
  696. return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  697. def latestWarningDistancetoConflictPoint_TTC_PGVIL(data_processed, plot_path) -> dict:
  698. """TTC计算流水线"""
  699. ego_df = data_processed.ego_data
  700. warning_data = get_first_warning(data_processed)
  701. if warning_data is None:
  702. return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -11}
  703. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  704. ego, obj = extract_ego_obj(warning_data)
  705. # 向量化计算
  706. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  707. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  708. ConflictPoint = obj[obj["type"] == 25].copy()
  709. if ConflictPoint.empty:
  710. print("未找到 type == 25 的目标物")
  711. return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -22}
  712. ConflictPoint_pos = ConflictPoint[["posX", "posY"]].values
  713. ConflictPoint_speed = obj[["speedX", "speedY"]].values
  714. distances = calculate_distance_PGVIL(ego_pos, ConflictPoint_pos) - dis_pos2head
  715. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, ConflictPoint_speed)
  716. data_processed.warning_dist = distances
  717. data_processed.warning_speed = rel_speeds
  718. data_processed.warning_time = ego['simTime'].tolist()
  719. with np.errstate(divide="ignore", invalid="ignore"):
  720. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  721. if ttc.size == 0:
  722. print("没有找到数据!")
  723. return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -33} # 或返回其他默认值,如0.0
  724. data_processed.ttc = ttc
  725. generate_function_chart_data(data_processed, 'latestWarningDistancetoConflictPoint_TTC_PGVIL', plot_path)
  726. return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": float(np.nanmin(ttc))}
  727. def earliestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
  728. """预警距离计算流水线"""
  729. ego_df = data_processed.ego_data
  730. obj_df = data_processed.object_df
  731. warning_data = get_first_warning(data_processed)
  732. if warning_data is None:
  733. return {"earliestWarningDistance_PGVIL": -11}
  734. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  735. # dis_pos2head = 5.929 / 2 + 1.982
  736. ego, obj = extract_ego_obj(warning_data)
  737. distances = calculate_distance_PGVIL(
  738. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  739. ) - dis_pos2head
  740. # 将计算结果保存到data对象中,供图表生成使用
  741. data_processed.warning_dist = distances
  742. data_processed.warning_time = ego['simTime'].tolist()
  743. if distances.size == 0:
  744. print("没有找到数据!")
  745. return {"earliestWarningDistance_PGVIL": -22} # 或返回其他默认值,如0.0
  746. generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL', plot_path)
  747. return {"earliestWarningDistance_PGVIL": float(np.min(distances))}
  748. def earliestWarningDistancetoConflictPoint_PGVIL(data_processed, plot_path) -> dict:
  749. """预警距离计算流水线"""
  750. ego_df = data_processed.ego_data
  751. warning_data = get_first_warning(data_processed)
  752. if warning_data is None:
  753. return {"earliestWarningDistancetoConflictPoint_PGVIL": -11}
  754. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  755. ego, obj = extract_ego_obj(warning_data)
  756. distances = calculate_distance_PGVIL(
  757. np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
  758. ) - dis_pos2head
  759. # 将计算结果保存到data对象中,供图表生成使用
  760. data_processed.warning_dist = distances
  761. data_processed.warning_time = ego['simTime'].tolist()
  762. if distances.size == 0:
  763. print("没有找到数据!")
  764. return {"earliestWarningDistancetoConflictPoint_PGVIL": -22} # 或返回其他默认值,如0.0
  765. generate_function_chart_data(data_processed, 'earliestWarningDistancetoConflictPoint_PGVIL', plot_path)
  766. return {"earliestWarningDistancetoConflictPoint_PGVIL": float(np.min(distances))}
  767. def earliestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
  768. """TTC计算流水线"""
  769. ego_df = data_processed.ego_data
  770. obj_df = data_processed.object_df
  771. warning_data = get_first_warning(data_processed)
  772. if warning_data is None:
  773. return {"earliestWarningDistance_TTC_PGVIL": -11}
  774. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  775. # dis_pos2head = 5.929 / 2 + 1.982
  776. ego, obj = extract_ego_obj(warning_data)
  777. # 向量化计算
  778. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  779. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  780. obj_pos = obj[["posX", "posY"]].values
  781. obj_speed = obj[["speedX", "speedY"]].values
  782. distances = calculate_distance_PGVIL(ego_pos, obj_pos) - dis_pos2head
  783. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
  784. data_processed.warning_dist = distances
  785. data_processed.warning_speed = rel_speeds
  786. data_processed.warning_time = ego['simTime'].tolist()
  787. with np.errstate(divide="ignore", invalid="ignore"):
  788. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  789. if ttc.size == 0:
  790. print("没有找到数据!")
  791. return {"earliestWarningDistance_TTC_PGVIL": -22} # 或返回其他默认值,如0.0
  792. data_processed.ttc = ttc
  793. generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL', plot_path)
  794. return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
  795. def earliestWarningDistancetoConflictPoint_TTC_PGVIL(data_processed, plot_path) -> dict:
  796. """TTC计算流水线"""
  797. ego_df = data_processed.ego_data
  798. warning_data = get_first_warning(data_processed)
  799. if warning_data is None:
  800. return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": -11}
  801. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  802. ego, obj = extract_ego_obj(warning_data)
  803. ConflictPoint = obj[obj["type"] == 25].copy()
  804. if ConflictPoint.empty:
  805. print("未找到 type == 25 的目标物")
  806. return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -22}
  807. # 向量化计算
  808. ego_pos = np.array([[ego["posX"], ego["posY"]]])
  809. ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
  810. ConflictPoint_pos = ConflictPoint[["posX", "posY"]].values
  811. ConflictPoint_speed = ConflictPoint[["speedX", "speedY"]].values
  812. distances = calculate_distance_PGVIL(ego_pos, ConflictPoint_pos) - dis_pos2head
  813. rel_speeds = calculate_relative_speed_PGVIL(ego_speed, ConflictPoint_speed)
  814. data_processed.warning_dist = distances
  815. data_processed.warning_speed = rel_speeds
  816. data_processed.warning_time = ego['simTime'].tolist()
  817. with np.errstate(divide="ignore", invalid="ignore"):
  818. ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
  819. if ttc.size == 0:
  820. print("没有找到数据!")
  821. return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": -22} # 或返回其他默认值,如0.0
  822. data_processed.ttc = ttc
  823. generate_function_chart_data(data_processed, 'earliestWarningDistancetoConflictPoint_TTC_PGVIL', plot_path)
  824. return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": float(np.nanmin(ttc))}
  825. def warningDelayTimeofReachDecel_PGVIL(data_processed, plot_path) -> dict:
  826. # 预警时机相对背景车辆减速度达到-4m/s2后的时延
  827. ego_df = data_processed.ego_data
  828. obj_df = data_processed.object_df
  829. warning_data = get_first_warning(data_processed)
  830. if warning_data is None:
  831. return {"warningDelayTimeofReachDecel_PGVIL": -1}
  832. try:
  833. ego, _ = extract_ego_obj(warning_data)
  834. warning_time = ego["simTime"] if isinstance(ego, pd.Series) else ego["simTime"].iloc[0]
  835. # 回溯:只取在预警前的背景车轨迹
  836. obj_pre = obj_df[(obj_df["playerId"] == 2) & (obj_df["simTime"] < warning_time)].copy()
  837. if len(obj_pre) < 2:
  838. print("预警前目标车辆轨迹不足2帧,无法计算减速度")
  839. return {"warningDelayTimeofReachDecel_PGVIL": -2}
  840. obj_pre = obj_pre.sort_values(by="simTime").reset_index(drop=True)
  841. obj_speed = obj_pre[["speedX", "speedY"]].values
  842. simtime_gap = obj_pre["simTime"].iloc[1] - obj_pre["simTime"].iloc[0]
  843. obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)
  844. obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
  845. if np.min(obj_deceleration) < -4:
  846. max_index = np.argmin(obj_deceleration)
  847. decel_time = obj_pre["simTime"].iloc[max_index + 1] # 注意 +1 是因为 diff 少1行
  848. delay_time = warning_time - decel_time
  849. return {"warningDelayTimeofReachDecel_PGVIL": round(float(delay_time), 3)}
  850. else:
  851. print("未发现背景车减速度低于 -4 m/s² 的时刻")
  852. return {"warningDelayTimeofReachDecel_PGVIL": -3}
  853. except Exception as e:
  854. print("异常:", e)
  855. return {"warningDelayTimeofReachDecel_PGVIL": -99}
  856. def warningDelayTime_PGVIL(data_processed, plot_path) -> dict:
  857. """车端接收到预警到HMI发出预警的时延"""
  858. ego_df = data_processed.ego_data
  859. # #打印ego_df的列名
  860. # print(ego_df.columns.tolist())
  861. warning_data = get_first_warning(data_processed)
  862. if warning_data is None:
  863. return {"warningDelayTime_PGVIL": -1}
  864. try:
  865. ego, obj = extract_ego_obj(warning_data)
  866. # 找到event_Type不为空,且playerID为1的行
  867. rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
  868. first_time = rosbag_warning_rows["simTime"].iloc[0]
  869. warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
  870. delay_time = warning_time - first_time
  871. return {"warningDelayTime_PGVIL": float(delay_time)}
  872. except Exception as e:
  873. print(f"计算预警时延时发生错误: {e}")
  874. return {"warningDelayTime_PGVIL": -1}
  875. def noWarning_PGVIL(data_processed, plot_path) -> dict:
  876. scenario_name = find_nested_name(data_processed.function_config["function"])
  877. correctwarning = scenario_sign_dict[scenario_name]
  878. ego_df = data_processed.ego_data
  879. if ego_df['ifwarning'].empty:
  880. print("无法获取正确预警信号标志位!")
  881. return
  882. warning_rows = (ego_df['ifwarning'].dropna() == -1).all()
  883. if warning_rows:
  884. return {"noWarning_PGVIL": 1}
  885. else:
  886. return {"noWarning_PGVIL": -1}
  887. def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
  888. """
  889. 计算主车后轴中心点到停止线的距离
  890. :return 距离
  891. """
  892. distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
  893. # 计算停止线的方向向量
  894. line_vector = np.array(
  895. [
  896. stop_line_points[1][0] - stop_line_points[0][0],
  897. stop_line_points[1][1] - stop_line_points[0][1],
  898. ]
  899. )
  900. direction_vector_norm = np.linalg.norm(line_vector)
  901. direction_vector_unit = (
  902. line_vector / direction_vector_norm
  903. if direction_vector_norm != 0
  904. else np.array([0, 0])
  905. )
  906. # 计算主车后轴中心点到停止线投影的坐标(垂足)
  907. projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
  908. perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
  909. # 计算主车后轴中心点到垂足的距离
  910. distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
  911. carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
  912. return carhead_distance_to_foot
  913. def ifCrossingRedLight_PGVIL(data_processed, plot_path) -> dict:
  914. GREEN = 0x100000 # 1048576
  915. YELLOW = 0x1000000 # 16777216
  916. RED = 0x10000000 # 268435456
  917. ego_df = data_processed.ego_data.copy()
  918. stopline_df = data_processed.object_df.copy()
  919. # 过滤出停止线对象(type==25),只保留两端点
  920. stopline_df = stopline_df.loc[
  921. stopline_df["type"] == 25, ["simTime", "playerId", "posX", "posY"]
  922. ]
  923. # 去除无效stateMask帧
  924. ego_df = ego_df[ego_df["stateMask"].notna()].copy()
  925. ego_df.sort_values(by="simTime", inplace=True)
  926. dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
  927. red_light_violation = False
  928. prev_distance = float("inf")
  929. for index, ego in ego_df.iterrows():
  930. simTime = ego["simTime"]
  931. car_point = (ego["posX"], ego["posY"])
  932. stateMask = int(ego["stateMask"])
  933. # 获取该时刻的两个停止线点(playerId 2 和 3)
  934. p1_row = stopline_df[(stopline_df["playerId"] == 2) & (stopline_df["simTime"] == simTime)]
  935. p2_row = stopline_df[(stopline_df["playerId"] == 3) & (stopline_df["simTime"] == simTime)]
  936. # 如果缺失,跳过该帧
  937. if p1_row.empty or p2_row.empty:
  938. continue
  939. p1 = (p1_row["posX"].iloc[0], p1_row["posY"].iloc[0])
  940. p2 = (p2_row["posX"].iloc[0], p2_row["posY"].iloc[0])
  941. stop_line_points = np.array([p1, p2], dtype=float)
  942. distance_to_stopline = get_car_to_stop_line_distance(
  943. ego, car_point, stop_line_points
  944. ) - dis_pos2head
  945. # 如果车头刚刚越线
  946. if prev_distance > 0 and distance_to_stopline <= 0:
  947. # 如果不是绿灯,就视为闯红灯
  948. if stateMask != GREEN:
  949. red_light_violation = True
  950. print(f"[Red Light Violation] simTime={simTime}, stateMask={stateMask}")
  951. break
  952. prev_distance = distance_to_stopline
  953. return {"ifCrossingRedLight_PGVIL": -1 if red_light_violation else 1}
  954. def limitSpeed_PGVIL(data, plot_path):
  955. ego_df = data.ego_data
  956. max_speed = max(ego_df["v"])
  957. if len(ego_df["v"]) == 0:
  958. return {"speedLimit_PGVIL": -1}
  959. else:
  960. return {"speedLimit_PGVIL": round(max_speed * 3.6, 3)}
  961. def leastDistance_PGVIL(data, plot_path):
  962. exclude_seconds = 2.0
  963. ego_df = data.ego_data
  964. stopline_df = data.object_df
  965. stopline_df = stopline_df.loc[
  966. stopline_df["type"] == 25, ["simTime", "playerId", "posX", "posY"]
  967. ]
  968. max_sim_time = ego_df["simTime"].max()
  969. threshold_time = max_sim_time - exclude_seconds
  970. for index, ego in ego_df.iterrows():
  971. if ego["simTime"] >= threshold_time:
  972. continue
  973. if "v" in ego and ego["v"] == 0:
  974. current_time = ego["simTime"]
  975. car_point = (ego["posX"], ego["posY"])
  976. # p1 = (ego["stopline_x1"], ego["stopline_y1"])
  977. # p2 = (ego["stopline_x2"], ego["stopline_y2"])
  978. p1_row = stopline_df[(stopline_df["playerId"] == 2) & (stopline_df["simTime"] == current_time)]
  979. p2_row = stopline_df[(stopline_df["playerId"] == 3) & (stopline_df["simTime"] == current_time)]
  980. if p1_row.empty or p2_row.empty:
  981. continue # 如果找不到,就跳过,继续找下一个停车帧
  982. p1 = (p1_row["posX"].iloc[0], p1_row["posY"].iloc[0])
  983. p2 = (p2_row["posX"].iloc[0], p2_row["posY"].iloc[0])
  984. stop_line_points = np.array([p1, p2], dtype=float)
  985. distance_to_stopline = get_car_to_stop_line_distance(
  986. ego, car_point, stop_line_points
  987. )
  988. # print(f"distance_to_stopline:{distance_to_stopline}")
  989. return {"leastDistance_PGVIL": round(distance_to_stopline, 3)}
  990. return {"leastDistance_PGVIL": -1}
  991. def launchTimeinStopLine_PGVIL(data, plot_path):
  992. ego_df = data.ego_data
  993. # === 先计算有效时间段 ===
  994. max_sim_time = ego_df["simTime"].max()
  995. min_sim_time = ego_df["simTime"].min()
  996. threshold_start = min_sim_time + 3.0
  997. threshold_end = max_sim_time - 3.0
  998. # === 只保留有效时间段 ===
  999. ego_df = ego_df[(ego_df["simTime"] >= threshold_start) & (ego_df["simTime"] <= threshold_end)]
  1000. in_stop = False
  1001. start_time = None
  1002. last_low_time = None
  1003. max_duration = 0.0
  1004. v_thresh = 0.01
  1005. for _, row in ego_df.iterrows():
  1006. simt = row["simTime"]
  1007. v = row.get("v", None)
  1008. if v is None or np.isnan(v) or abs(v) > v_thresh:
  1009. if in_stop:
  1010. duration = last_low_time - start_time
  1011. if duration > max_duration:
  1012. max_duration = duration
  1013. in_stop = False
  1014. else:
  1015. if not in_stop:
  1016. start_time = simt
  1017. in_stop = True
  1018. last_low_time = simt
  1019. if in_stop:
  1020. duration = last_low_time - start_time
  1021. if duration > max_duration:
  1022. max_duration = duration
  1023. if max_duration <= 0:
  1024. return {"launchTimeinStopLine_PGVIL": -1}
  1025. else:
  1026. return {"launchTimeinStopLine_PGVIL": round(float(max_duration), 3)}
  1027. def launchTimeinTrafficLight_PGVIL(data, plot_path):
  1028. GREEN = 1048576 # 0x100000
  1029. RED = 268435456 # 0x10000000
  1030. ego_df = data.ego_data
  1031. mask = (ego_df["stateMask"]).notna()
  1032. ego_df_light = ego_df[mask].copy()
  1033. ego_df_light = ego_df_light.drop_duplicates(subset=["simTime"]).sort_values(by="simTime")
  1034. # 找到第一次红灯 to 绿灯的切换时刻
  1035. is_transition = (
  1036. (ego_df_light["stateMask"] == GREEN) &
  1037. (ego_df_light["stateMask"].shift(1) == RED)
  1038. )
  1039. transition_times = ego_df_light.loc[is_transition, "simTime"]
  1040. if transition_times.empty:
  1041. # print("没有红绿灯切换")
  1042. return {"launchTimeinTrafficLight_PGVIL": -88}
  1043. time_red2green = transition_times.iloc[0]
  1044. car_speed_at_transition = ego_df.loc[ego_df["simTime"] == time_red2green, "v"]
  1045. if not car_speed_at_transition.empty and car_speed_at_transition.iloc[0] > 0.5:
  1046. return {"launchTimeinTrafficLight_PGVIL": -66}
  1047. car_move_times = ego_df.loc[
  1048. (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime"
  1049. ]
  1050. if car_move_times.empty:
  1051. # print("没有车移动")
  1052. return {"launchTimeinTrafficLight_PGVIL": -99}
  1053. time_move = car_move_times.iloc[0]
  1054. return {"launchTimeinTrafficLight_PGVIL": round(time_move - time_red2green, 3)}
  1055. def crossJunctionToTargetLane_PGVIL(data, plot_path):
  1056. ego_df = data.ego_data
  1057. road_ids = set(ego_df["road_id"].dropna())
  1058. try:
  1059. scenario_name = find_nested_name(data.function_config["function"])
  1060. target_lane_id = data.function_config["function"][scenario_name][
  1061. "crossJunctionToTargetLane_PGVIL"
  1062. ]["max"]
  1063. except KeyError:
  1064. raise ValueError("请在配置文件中指定目标车道ID!")
  1065. result = target_lane_id if target_lane_id in road_ids else -1
  1066. return {"crossJunctionToTargetLane_PGVIL": result}
  1067. def crossJunctionToTargetLane_PGVIL(data, plot_path):
  1068. ego_df = data.ego_data
  1069. road_ids = set(ego_df["road_id"].dropna())
  1070. try:
  1071. scenario_name = find_nested_name(data.function_config["function"])
  1072. target_lane_id = data.function_config["function"][scenario_name][
  1073. "crossJunctionToTargetLane_PGVIL"
  1074. ]["max"]
  1075. except KeyError:
  1076. raise ValueError("请在配置文件中指定目标车道ID!")
  1077. result = target_lane_id if target_lane_id in road_ids else -1
  1078. return {"crossJunctionToTargetLane_PGVIL": result}
  1079. def noStop_PGVIL(data, plot_path):
  1080. exclude_end_seconds = 5.0
  1081. exclude_start_seconds = 5.0
  1082. ego_df = data.ego_data
  1083. min_sim_time = ego_df["simTime"].min()
  1084. max_sim_time = ego_df["simTime"].max()
  1085. start_threshold = min_sim_time + exclude_start_seconds
  1086. end_threshold = max_sim_time - exclude_end_seconds
  1087. filtered_df = ego_df[
  1088. (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold)
  1089. ]
  1090. if (filtered_df["v"] == 0).any():
  1091. return {"noStop_PGVIL": -1}
  1092. else:
  1093. return {"noStop_PGVIL": 1}
  1094. def noEmergencyBraking_PGVIL(data, plot_path):
  1095. ego_df = data.ego_data
  1096. ego_df["ip_dec"] = ego_df["v"].apply(
  1097. get_interpolation, point1=[18, -5], point2=[72, -3.5]
  1098. )
  1099. ego_df["slam_brake"] = (ego_df["accelX"] - ego_df["ip_dec"]).apply(
  1100. lambda x: 1 if x < 0 else 0
  1101. )
  1102. if sum(ego_df["slam_brake"]) == 0:
  1103. return {"noEmergencyBraking_PGVIL": 1}
  1104. else:
  1105. return {"noEmergencyBraking_PGVIL": -1}
  1106. def noReverse_PGVIL(data, plot_path):
  1107. ego_df = data.ego_data.copy()
  1108. ego_df["posH"] = (90 - ego_df["posH"]) % 360
  1109. yaw_rad = np.deg2rad(ego_df["posH"])
  1110. # 车头朝向向量
  1111. heading_x = np.cos(yaw_rad)
  1112. heading_y = np.sin(yaw_rad)
  1113. # 速度向量
  1114. speed_x = ego_df["speedX"]
  1115. speed_y = ego_df["speedY"]
  1116. # 点积:判断速度是否与车头方向一致
  1117. dot_product = speed_x * heading_x + speed_y * heading_y
  1118. moving = np.hypot(speed_x, speed_y) > 0.5 # 排除近乎静止的帧(避免误判)
  1119. reverse_flag = (dot_product < 0) & moving # 倒车帧:点积为负 且 在移动
  1120. if reverse_flag.any():
  1121. return {"noReverse_PGVIL": -1} # 存在倒车
  1122. else:
  1123. return {"noReverse_PGVIL": 1} # 全程未倒车
  1124. def laneOffset_PGVIL(data, plot_path):
  1125. ego_df = data.ego_data
  1126. car_width = ego_df["dimY"].loc[0]
  1127. print(f"car_width:{car_width}")
  1128. is_zero = ego_df["v"] == 0
  1129. if not is_zero.any():
  1130. # 全程未停车
  1131. return {"laneOffset_PGVIL": -1}
  1132. groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
  1133. last_zero_group = groups[is_zero].iloc[-1]
  1134. last_stop = ego_df[groups == last_zero_group]
  1135. # 距离右侧车道线
  1136. edge_dist = (last_stop["width"] / 2 + last_stop["laneOffset"]) - (
  1137. car_width / 2
  1138. )
  1139. return {"laneOffset_PGVIL": round(edge_dist.max(), 3)}
  1140. def maxLongitudelDistance_PGVIL(data, plot_path):
  1141. ego_df = data.ego_data
  1142. object_df = data.object_df
  1143. is_zero = ego_df["v"] == 0
  1144. if not is_zero.any():
  1145. # 全程未停车
  1146. return {"maxLongitudelDistance_PGVIL": -1}
  1147. groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
  1148. last_zero_group = groups[is_zero].iloc[-1]
  1149. last_stop = ego_df[groups == last_zero_group]
  1150. stop_point = object_df[object_df["type"] == 25]
  1151. stop_x = stop_point["posX"].iloc[0]
  1152. stop_y = stop_point["posY"].iloc[0]
  1153. dx = last_stop["posX"] - stop_x
  1154. dy = last_stop["posY"] - stop_y
  1155. last_stop["posH"] = (90 - last_stop["posH"]) % 360
  1156. heading_rad = last_stop["posH"] # 弧度制
  1157. longitudinal_offset = np.cos(heading_rad) * dx + np.sin(heading_rad) * dy
  1158. max_longitudinal_offset = np.abs(longitudinal_offset).max()
  1159. return {"maxLongitudelDistance_PGVIL": round(max_longitudinal_offset, 3)}
  1160. def keepInLane_PGVIL(data, plot_path):
  1161. ego_df = data.ego_data.copy()
  1162. ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"])
  1163. if ego_df.empty:
  1164. return {"keepInLane_PGVIL": -1}
  1165. threshold = ego_df["lane_width"] / 2
  1166. if (ego_df["laneOffset"].abs() > threshold).any():
  1167. return {"keepInLane_PGVIL": -1}
  1168. else:
  1169. return {"keepInLane_PGVIL": 1}
  1170. # def launchTimewhenPedestrianLeave_PGVIL(data, plot_path):
  1171. # ego_df = data.ego_data
  1172. # object_df = data.object_df
  1173. # # == 1. 提取行人轨迹 ==
  1174. # ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]]
  1175. # merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  1176. # if merged.empty:
  1177. # return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1178. # # == 2. 判断主车是否在行人完全离开前抢行 ==
  1179. # ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "offX"]].copy()
  1180. # ped = ped_df.rename(columns={"posX": "posX_ped", "posY": "posY_ped"})
  1181. # ego_ped = pd.merge(ego, ped, on="simTime")
  1182. # ego_ped = ego_ped.sort_values("simTime").reset_index(drop=True)
  1183. # front_offset = ego_ped["dimX"] / 2 + ego_ped["offX"]
  1184. # yaw = np.deg2rad(ego_ped["posH"])
  1185. # ego_ped["front_x"] = ego_ped["posX"] + np.cos(yaw) * front_offset
  1186. # ego_ped["front_y"] = ego_ped["posY"] + np.sin(yaw) * front_offset
  1187. # dx = ego_ped["front_x"] - ego_ped["posX_ped"]
  1188. # dy = ego_ped["front_y"] - ego_ped["posY_ped"]
  1189. # ego_ped["proj"] = dx * np.cos(yaw) + dy * np.sin(yaw)
  1190. # ego_ped["lateral_dist"] = np.abs(-np.sin(yaw) * dx + np.cos(yaw) * dy)
  1191. # cross_idx = ego_ped[ego_ped["proj"] > 0].index
  1192. # if not cross_idx.empty and cross_idx[-1] <= len(ego_ped) - 3:
  1193. # i = cross_idx[0]
  1194. # d0 = ego_ped.loc[i, "lateral_dist"]
  1195. # d1 = ego_ped.loc[i + 1, "lateral_dist"]
  1196. # d2 = ego_ped.loc[i + 2, "lateral_dist"]
  1197. # if d1 < d0 and d2 < d1:
  1198. # return {"launchTimewhenPedestrianLeave_PGVIL": -66} # 主车未避让行人
  1199. # # == 3. 判断行人进入与离开车道 ==
  1200. # yaw_rad = np.deg2rad(merged["posH"])
  1201. # dx = merged["posX_ped"] - merged["posX"]
  1202. # dy = merged["posY_ped"] - merged["posY"]
  1203. # merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
  1204. # merged["threshold"] = merged["lane_width"] / 2
  1205. # entry = merged[merged["lateral_dist"] <= merged["threshold"]]
  1206. # if entry.empty:
  1207. # return {"launchTimewhenPedestrianLeave_PGVIL": -11}
  1208. # t_entry = entry["simTime"].iloc[0]
  1209. # after_e = merged[merged["simTime"] > t_entry]
  1210. # leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
  1211. # if leave.empty:
  1212. # return {"launchTimewhenPedestrianLeave_PGVIL": -22}
  1213. # t_leave = leave["simTime"].iloc[0]
  1214. # ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
  1215. # # == 4. 查找 t_leave 之后主车是否在前方合理范围内停车 ==
  1216. # stops = ego_df[(ego_df["simTime"] >= t_leave) & (ego_df["v"] == 0)].copy()
  1217. # if stops.empty:
  1218. # return {"launchTimewhenPedestrianLeave_PGVIL": -33}
  1219. # front_overPos = stops["dimX"] / 2 + stops["offX"]
  1220. # yaw_stop = np.deg2rad(stops["posH"])
  1221. # stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
  1222. # stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
  1223. # dx_s = stops["front_x"] - ped_x
  1224. # dy_s = stops["front_y"] - ped_y
  1225. # proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
  1226. # valid = stops[
  1227. # (proj >= 0) &
  1228. # (proj <= 20) &
  1229. # ((stops["simTime"] - t_leave) <= 5.0)
  1230. # ]
  1231. # if valid.empty:
  1232. # return {"launchTimewhenPedestrianLeave_PGVIL": -88}
  1233. # # == 5. 查找 t_stop 后首次启动 ==
  1234. # t_stop = valid["simTime"].iloc[0]
  1235. # launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"]
  1236. # if launches.empty:
  1237. # return {"launchTimewhenPedestrianLeave_PGVIL": -99}
  1238. # t_launch = launches.iloc[0]
  1239. # return {"launchTimewhenPedestrianLeave_PGVIL": round(t_launch - t_stop, 3)}
  1240. def launchTimewhenPedestrianLeave_PGVIL(data, plot_path):
  1241. ego_df = data.ego_data
  1242. object_df = data.object_df
  1243. ego_df["posH"] = (90 - ego_df["posH"]) % 360
  1244. object_df["posH"] = (90 - object_df["posH"]) % 360
  1245. ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]]
  1246. merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  1247. if merged.empty:
  1248. return {"launchTimewhenPedestrianLeave_PGVIL": -1}
  1249. yaw_rad = np.deg2rad(merged["posH"]) # 度→弧度
  1250. dx = merged["posX_ped"] - merged["posX"]
  1251. dy = merged["posY_ped"] - merged["posY"]
  1252. merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
  1253. merged["threshold"] = merged["lane_width"] / 2
  1254. entry = merged[merged["lateral_dist"] <= merged["threshold"]]
  1255. if entry.empty:
  1256. return {"launchTimewhenPedestrianLeave_PGVIL": -11}
  1257. t_entry = entry["simTime"].iloc[0]
  1258. after_e = merged[merged["simTime"] > t_entry]
  1259. leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
  1260. if leave.empty:
  1261. return {"launchTimewhenPedestrianLeave_PGVIL": -22}
  1262. t_leave = leave["simTime"].iloc[0]
  1263. ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
  1264. ego_after_leave = ego_df[ego_df["simTime"] >= t_leave].copy()
  1265. min_V = ego_after_leave["v"].min()
  1266. if ego_after_leave["v"].min() == 0:
  1267. stops = ego_after_leave[ego_after_leave["v"] == 0].copy()
  1268. elif min_V <= 1:
  1269. stops = ego_after_leave[ego_after_leave["v"] == min_V].copy()
  1270. else:
  1271. stops = pd.DataFrame()
  1272. # stops = ego_df[(ego_df["simTime"] >= t_leave) & (ego_df["v"] <= 0.01)].copy()
  1273. if stops.empty: # 没有停车
  1274. return {"launchTimewhenPedestrianLeave_PGVIL": -33}
  1275. front_overPos = stops["dimX"] / 2 + stops["offX"]
  1276. yaw_stop = np.deg2rad(stops["posH"])
  1277. stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
  1278. stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
  1279. dx_s = stops["front_x"] - ped_x
  1280. dy_s = stops["front_y"] - ped_y
  1281. proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
  1282. valid = stops[
  1283. (proj >= 0) &
  1284. (proj <= 20) &
  1285. ((stops["simTime"] - t_leave) <= 5.0)
  1286. ]
  1287. if valid.empty: # 车辆未在行人离开后停止
  1288. return {"launchTimewhenPedestrianLeave_PGVIL": -88}
  1289. t_stop = valid["simTime"].iloc[0]
  1290. if ego_after_leave["v"].min() == 0:
  1291. launches_data = ego_after_leave[ego_after_leave["v"] > 0].copy()
  1292. else:
  1293. launches_data = ego_after_leave[ego_after_leave["v"] > 1].copy()
  1294. launches = launches_data[launches_data["simTime"] > t_stop]
  1295. # launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0.5)]["simTime"]
  1296. if launches.empty: # 车辆停止后未启动
  1297. return {"launchTimewhenPedestrianLeave_PGVIL": -99}
  1298. # t_launch = launches.iloc[0]
  1299. t_launch = launches["simTime"].iloc[0]
  1300. return {"launchTimewhenPedestrianLeave_PGVIL": round(t_launch - t_stop, 3)}
  1301. def waitTimeAtCrosswalkwithPedestrian_PGVIL(data, plot_path):
  1302. ego_df = data.ego_data
  1303. object_df = data.object_df
  1304. # ego_df["posH"] = (90 - ego_df["posH"]) % 360
  1305. # object_df["posH"] = (90 - object_df["posH"]) % 360
  1306. ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]]
  1307. first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"]
  1308. if first_launch.empty:
  1309. t0_launch = 0.0
  1310. else:
  1311. t0_launch = first_launch.iloc[0]
  1312. merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
  1313. merged["front_offset"] = merged["dimX"] / 2 + merged["offX"]
  1314. yaw = np.deg2rad(merged["posH"])
  1315. merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"]
  1316. merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"]
  1317. dx = merged["posX_ped"] - merged["front_x"]
  1318. dy = merged["posY_ped"] - merged["front_y"]
  1319. valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0.1
  1320. stops_df = merged.loc[
  1321. (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times
  1322. ].sort_values("simTime")
  1323. if stops_df.empty:
  1324. print("no stopping time found!")
  1325. return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1}
  1326. wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0]
  1327. return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": round(wait_time, 3)}
  1328. def noCollision_PGVIL(data, plot_path):
  1329. ego_df = data.ego_data
  1330. object_df = data.object_df
  1331. ego_df['posH'] = (90 - ego_df['posH']) % 360
  1332. object_df['posH'] = (90 - object_df['posH']) % 360
  1333. ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]]
  1334. tar = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]]
  1335. df = ego.merge(tar, on="simTime", suffixes=("", "_tar"))
  1336. df["posH_tar_rad"] = np.deg2rad(df["posH_tar"])
  1337. df["posH_rad"] = np.deg2rad(df["posH"])
  1338. df["collision"] = df.apply(lambda row: funcIsCollision(
  1339. row["dimX"], row["dimY"], row["offX"], 0, row["posX"], row["posY"],
  1340. row["posH_rad"], row["dimX_tar"], row["dimY_tar"], row["offX_tar"], 0,
  1341. row["posX_tar"], row["posY_tar"], row["posH_tar_rad"], ), axis=1, )
  1342. if df["collision"].any():
  1343. return {"noCollision_PGVIL": -1}
  1344. else:
  1345. return {"noCollision_PGVIL": 1}
  1346. def leastLateralDistance_PGVIL(data, plot_path):
  1347. ego_df = data.ego_data
  1348. object_df = data.object_df
  1349. ego_df['posH'] = (90 - ego_df['posH']) % 360
  1350. object_df['posH'] = (90 - object_df['posH']) % 360
  1351. cones = object_df[object_df["type"] == 25].copy()
  1352. ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]].copy()
  1353. ego["posH_rad"] = np.deg2rad(ego["posH"])
  1354. has_collision = False
  1355. for idx, cone in cones.iterrows():
  1356. # 找到与ego同一时刻的数据
  1357. ego_at_time = ego[ego["simTime"] == cone["simTime"]]
  1358. if ego_at_time.empty:
  1359. continue
  1360. posX_tar = cone["posX"]
  1361. posY_tar = cone["posY"]
  1362. dimX_tar = cone["dimX"]
  1363. dimY_tar = cone["dimY"]
  1364. offX_tar = cone.get("offX", 0)
  1365. posH_tar_rad = np.deg2rad(cone["posH"]) if not np.isnan(cone["posH"]) else 0
  1366. for _, row in ego_at_time.iterrows():
  1367. result = funcIsCollision(
  1368. row["dimX"], row["dimY"], row["offX"], 0,
  1369. row["posX"], row["posY"], row["posH_rad"],
  1370. dimX_tar, dimY_tar, offX_tar, 0,
  1371. posX_tar, posY_tar, posH_tar_rad,
  1372. )
  1373. if result:
  1374. has_collision = True
  1375. break
  1376. if has_collision:
  1377. break
  1378. return {"leastLateralDistance_PGVIL": -1 if has_collision else 1}
  1379. class FunctionRegistry:
  1380. """动态函数注册器(支持参数验证)"""
  1381. def __init__(self, data_processed, plot_path):
  1382. self.logger = LogManager().get_logger() # 获取全局日志实例
  1383. self.data = data_processed
  1384. self.plot_path = plot_path
  1385. # 检查function_config是否为空
  1386. if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
  1387. self.logger.warning("功能配置为空,跳过功能指标计算")
  1388. self.fun_config = {}
  1389. self.level_3_merics = []
  1390. self._registry = {}
  1391. return
  1392. self.fun_config = data_processed.function_config.get("function", {})
  1393. self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
  1394. self._registry: Dict[str, Callable] = {}
  1395. self._registry = self._build_registry()
  1396. def _extract_level_3_metrics(self, config_node: dict) -> list:
  1397. """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
  1398. metrics = []
  1399. def _recurse(node):
  1400. if isinstance(node, dict):
  1401. if "name" in node and not any(
  1402. isinstance(v, dict) for v in node.values()
  1403. ):
  1404. metrics.append(node["name"])
  1405. for v in node.values():
  1406. _recurse(v)
  1407. _recurse(config_node)
  1408. self.logger.info(f"评比的功能指标列表:{metrics}")
  1409. return metrics
  1410. def _build_registry(self) -> dict:
  1411. """自动注册指标函数(防御性编程)"""
  1412. registry = {}
  1413. for func_name in self.level_3_merics:
  1414. try:
  1415. registry[func_name] = globals()[func_name]
  1416. except KeyError:
  1417. print(f"未实现指标函数: {func_name}")
  1418. self.logger.error(f"未实现指标函数: {func_name}")
  1419. return registry
  1420. def batch_execute(self) -> dict:
  1421. """批量执行指标计算(带熔断机制)"""
  1422. results = {}
  1423. # 如果配置为空或没有注册的指标,直接返回空结果
  1424. if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry:
  1425. self.logger.info("功能配置为空或无注册指标,返回空结果")
  1426. return results
  1427. for name, func in self._registry.items():
  1428. try:
  1429. result = func(self.data, self.plot_path) # 统一传递数据上下文
  1430. results.update(result)
  1431. except Exception as e:
  1432. print(f"{name} 执行失败: {str(e)}")
  1433. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  1434. results[name] = None
  1435. self.logger.info(f"功能指标计算结果:{results}")
  1436. return results
  1437. class FunctionManager:
  1438. """管理功能指标计算的类"""
  1439. def __init__(self, data_processed, plot_path):
  1440. self.data = data_processed
  1441. self.logger = LogManager().get_logger()
  1442. self.plot_path = plot_path
  1443. # 检查function_config是否为空
  1444. if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
  1445. self.logger.warning("功能配置为空,跳过功能指标计算初始化")
  1446. self.function = None
  1447. else:
  1448. self.function = FunctionRegistry(self.data, self.plot_path)
  1449. def report_statistic(self):
  1450. """
  1451. 计算并报告功能指标结果。
  1452. :return: 评估结果
  1453. """
  1454. # 如果function为None,直接返回空字典
  1455. if self.function is None:
  1456. self.logger.info("功能指标管理器未初始化,返回空结果")
  1457. return {}
  1458. function_result = self.function.batch_execute()
  1459. print("\n[功能性表现及评价结果]")
  1460. return function_result
  1461. # self.logger.info(f'Function Result: {function_result}')
  1462. # 使用示例
  1463. if __name__ == "__main__":
  1464. pass
  1465. # print("\n[功能类表现及得分情况]")