chart_generator.py 103 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Date: 2023/06/25
  11. @Last Modified: 2025/05/20
  12. @Summary: Chart generation utilities for metrics visualization
  13. """
  14. """
  15. 主要功能模块:
  16. 函数指标绘图(generate_function_chart_data)
  17. 舒适性指标绘图(generate_comfort_chart_data)
  18. 安全性指标绘图(generate_safety_chart_data)
  19. 交通指标绘图(generate_traffic_chart_data,待实现)
  20. 新增的急加速指标绘图:
  21. 在generate_comfort_chart_data中增加了slamaccelerate指标的支持
  22. 实现了完整的generate_slam_accelerate_chart函数,用于绘制急加速事件
  23. 该函数包含:
  24. 数据获取与预处理
  25. CSV数据保存与读取
  26. 纵向加速度与速度的双子图绘制
  27. 急加速事件的橙色背景标记
  28. 阈值线绘制
  29. 高质量图表输出(300dpi PNG)
  30. 其他关键特性:
  31. 统一的日志记录系统
  32. 阈值获取函数get_metric_thresholds
  33. 错误处理和异常捕获
  34. 时间戳管理
  35. 数据验证机制
  36. 详细的日志输出
  37. 辅助函数:
  38. calculate_distance 和 calculate_relative_speed(简化实现)
  39. scenario_sign_dict 场景签名字典(简化实现)
  40. 此代码实现了完整的指标可视化工具,特别针对急加速指标slamAccelerate提供了详细的绘图功能,能够清晰展示急加速事件的发生时间和相关数据变化。
  41. """
  42. # import matplotlib
  43. # matplotlib.use('Agg') # 使用非图形界面的后端
  44. # import matplotlib.pyplot as plt
  45. import os
  46. import numpy as np
  47. import pandas as pd
  48. from typing import Optional, Dict, List, Any, Union
  49. from pathlib import Path
  50. from modules.lib.log_manager import LogManager
  51. def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  52. str]:
  53. """
  54. Generate chart data for function metrics
  55. Args:
  56. function_calculator: FunctionCalculator instance
  57. metric_name: Metric name
  58. output_dir: Output directory
  59. Returns:
  60. str: Chart file path, or None if generation fails
  61. """
  62. logger = LogManager().get_logger()
  63. try:
  64. # 确保输出目录存在
  65. if not output_dir:
  66. output_dir = os.path.join(os.getcwd(), 'data')
  67. os.makedirs(output_dir, exist_ok=True)
  68. # 根据指标名称选择不同的图表生成方法
  69. if metric_name.lower() == 'latestwarningdistance_ttc_lst':
  70. return generate_latest_warning_ttc_chart(function_calculator, output_dir)
  71. elif metric_name.lower() == 'earliestwarningdistance_ttc_lst':
  72. return generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir)
  73. elif metric_name.lower() == 'earliestwarningdistance_lst':
  74. return generate_earliest_warning_distance_chart(function_calculator, output_dir)
  75. elif metric_name.lower() == 'latestwarningdistance_lst':
  76. return generate_latest_warning_distance_chart(function_calculator, output_dir)
  77. elif metric_name.lower() == 'latestwarningdistance_ttc_pgvil':
  78. return generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir)
  79. elif metric_name.lower() == 'earliestwarningdistance_ttc_pgvil':
  80. return generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir)
  81. elif metric_name.lower() == 'earliestwarningdistance_pgvil':
  82. return generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir)
  83. elif metric_name.lower() == 'latestwarningdistance_pgvil':
  84. return generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir)
  85. elif metric_name.lower() == 'limitspeed_lst':
  86. return generate_limit_speed_chart(function_calculator, output_dir)
  87. elif metric_name.lower() == 'limitspeedpastlimitsign_lst':
  88. return generate_limit_speed_past_sign_chart(function_calculator, output_dir)
  89. elif metric_name.lower() == 'maxlongitudedist_lst':
  90. return generate_max_longitude_dist_chart(function_calculator, output_dir)
  91. else:
  92. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  93. return None
  94. except Exception as e:
  95. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  96. return None
  97. def generate_earliest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
  98. """
  99. Generate warning distance chart with data visualization.
  100. This function creates charts for earliestWarningDistance_LST and latestWarningDistance_LST metrics.
  101. Args:
  102. function_calculator: FunctionCalculator instance
  103. output_dir: Output directory
  104. Returns:
  105. str: Chart file path, or None if generation fails
  106. """
  107. logger = LogManager().get_logger()
  108. try:
  109. # Get data
  110. ego_df = function_calculator.ego_data.copy()
  111. # Check if correctwarning is already calculated
  112. correctwarning = getattr(function_calculator, 'correctwarning', None)
  113. # Get configured thresholds
  114. thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_LST')
  115. max_threshold = thresholds["max"]
  116. min_threshold = thresholds["min"]
  117. # Get calculated warning distance and speed
  118. warning_dist = getattr(function_calculator, 'warning_dist', None)
  119. if warning_dist.empty:
  120. logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
  121. return None
  122. # Calculate metric value
  123. metric_value = float(warning_dist.iloc[0]) if len(warning_dist) >= 0.0 else max_threshold
  124. # Save CSV data
  125. csv_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_data.csv")
  126. df_csv = pd.DataFrame({
  127. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  128. 'warning_distance': warning_dist,
  129. 'min_threshold': min_threshold,
  130. 'max_threshold': max_threshold,
  131. })
  132. df_csv.to_csv(csv_filename, index=False)
  133. logger.info(f"earliestWarningDistance_LST data saved to: {csv_filename}")
  134. return csv_filename
  135. # # Read data from CSV
  136. # df = pd.read_csv(csv_filename)
  137. # # Create single chart for warning distance
  138. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  139. # # Plot warning distance
  140. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  141. # # Add threshold lines
  142. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  143. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  144. # # Mark metric value
  145. # if len(df) > 0:
  146. # label_text = 'Earliest Warning Distance'
  147. # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
  148. # color='red', s=100, zorder=5,
  149. # label=f'{label_text}: {metric_value:.2f}m')
  150. # # Set y-axis range
  151. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  152. # plt.xlabel('Time (s)')
  153. # plt.ylabel('Distance (m)')
  154. # plt.title(f'earliestWarningDistance_LST - Warning Distance Over Time')
  155. # plt.grid(True)
  156. # plt.legend()
  157. # # Save image
  158. # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_chart.png")
  159. # plt.savefig(chart_filename, dpi=300)
  160. # plt.close()
  161. # logger.info(f"earliestWarningDistance_LST chart saved to: {chart_filename}")
  162. # return chart_filename
  163. except Exception as e:
  164. logger.error(f"Failed to generate earliestWarningDistance_LST chart: {str(e)}", exc_info=True)
  165. return None
  166. def generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  167. """
  168. Generate warning distance chart with data visualization.
  169. This function creates charts for earliestWarningDistance_PGVIL and latestWarningDistance_PGVIL metrics.
  170. Args:
  171. function_calculator: FunctionCalculator instance
  172. output_dir: Output directory
  173. Returns:
  174. str: Chart file path, or None if generation fails
  175. """
  176. logger = LogManager().get_logger()
  177. try:
  178. # Get data
  179. ego_df = function_calculator.ego_data.copy()
  180. # Check if correctwarning is already calculated
  181. correctwarning = getattr(function_calculator, 'correctwarning', None)
  182. # Get configured thresholds
  183. thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_PGVIL')
  184. max_threshold = thresholds["max"]
  185. min_threshold = thresholds["min"]
  186. # Get calculated warning distance and speed
  187. warning_dist = getattr(function_calculator, 'warning_dist', None)
  188. warning_time = getattr(function_calculator, 'warning_time', None)
  189. if len(warning_dist) == 0:
  190. logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
  191. return None
  192. # Calculate metric value
  193. metric_value = float(warning_dist[0]) if len(warning_dist) >= 0.0 else max_threshold
  194. # Save CSV data
  195. csv_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_data.csv")
  196. df_csv = pd.DataFrame({
  197. 'simTime': warning_time,
  198. 'warning_distance': warning_dist,
  199. 'min_threshold': min_threshold,
  200. 'max_threshold': max_threshold
  201. })
  202. df_csv.to_csv(csv_filename, index=False)
  203. logger.info(f"earliestWarningDistance_PGVIL data saved to: {csv_filename}")
  204. return csv_filename
  205. # # Read data from CSV
  206. # df = pd.read_csv(csv_filename)
  207. # # Create single chart for warning distance
  208. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  209. # # Plot warning distance
  210. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  211. # # Add threshold lines
  212. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  213. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  214. # # Mark metric value
  215. # if len(df) > 0:
  216. # label_text = 'Earliest Warning Distance'
  217. # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
  218. # color='red', s=100, zorder=5,
  219. # label=f'{label_text}: {metric_value:.2f}m')
  220. # # Set y-axis range
  221. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  222. # plt.xlabel('Time (s)')
  223. # plt.ylabel('Distance (m)')
  224. # plt.title(f'earliestWarningDistance_PGVIL - Warning Distance Over Time')
  225. # plt.grid(True)
  226. # plt.legend()
  227. # # Save image
  228. # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_chart.png")
  229. # plt.savefig(chart_filename, dpi=300)
  230. # plt.close()
  231. # logger.info(f"earliestWarningDistance_PGVIL chart saved to: {chart_filename}")
  232. # return chart_filename
  233. except Exception as e:
  234. logger.error(f"Failed to generate earliestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
  235. return None
  236. # # 使用function.py中已实现的find_nested_name函数
  237. # from modules.metric.function import find_nested_name
  238. def generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  239. """
  240. Generate TTC warning chart with data visualization.
  241. This version first saves data to CSV, then uses the CSV to generate the chart.
  242. Args:
  243. function_calculator: FunctionCalculator instance
  244. output_dir: Output directory
  245. Returns:
  246. str: Chart file path, or None if generation fails
  247. """
  248. logger = LogManager().get_logger()
  249. try:
  250. # 获取数据
  251. ego_df = function_calculator.ego_data.copy()
  252. correctwarning = getattr(function_calculator, 'correctwarning', None)
  253. # 获取配置的阈值
  254. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_PGVIL')
  255. max_threshold = thresholds["max"]
  256. min_threshold = thresholds["min"]
  257. warning_dist = getattr(function_calculator, 'warning_dist', None)
  258. warning_speed = getattr(function_calculator, 'warning_speed', None)
  259. warning_time = getattr(function_calculator, 'warning_time', None)
  260. ttc = getattr(function_calculator, 'ttc', None)
  261. if len(warning_dist) == 0:
  262. logger.warning("Cannot generate TTC warning chart: empty data")
  263. return None
  264. # 生成时间戳
  265. # 保存 CSV 数据
  266. csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_data.csv")
  267. df_csv = pd.DataFrame({
  268. 'simTime': warning_time,
  269. 'warning_distance': warning_dist,
  270. 'warning_speed': warning_speed,
  271. 'ttc': ttc,
  272. 'min_threshold': min_threshold,
  273. 'max_threshold': max_threshold,
  274. })
  275. df_csv.to_csv(csv_filename, index=False)
  276. logger.info(f"latestwarningdistance_ttc_pgvil data saved to: {csv_filename}")
  277. return csv_filename
  278. # # 从 CSV 读取数据
  279. # df = pd.read_csv(csv_filename)
  280. # # 创建图表
  281. # plt.figure(figsize=(12, 8), constrained_layout=True)
  282. # # 图 1:预警距离
  283. # ax1 = plt.subplot(3, 1, 1)
  284. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  285. # ax1.set_xlabel('Time (s)')
  286. # ax1.set_ylabel('Distance (m)')
  287. # ax1.set_title('Warning Distance Over Time')
  288. # ax1.grid(True)
  289. # ax1.legend()
  290. # # 图 2:相对速度
  291. # ax2 = plt.subplot(3, 1, 2)
  292. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  293. # ax2.set_xlabel('Time (s)')
  294. # ax2.set_ylabel('Speed (m/s)')
  295. # ax2.set_title('Relative Speed Over Time')
  296. # ax2.grid(True)
  297. # ax2.legend()
  298. # # 图 3:TTC
  299. # ax3 = plt.subplot(3, 1, 3)
  300. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  301. # # Add threshold lines
  302. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  303. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  304. # # Calculate metric value (latest TTC)
  305. # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
  306. # # Mark latest TTC value
  307. # if len(df) > 0:
  308. # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
  309. # color='red', s=100, zorder=5,
  310. # label=f'Latest TTC: {metric_value:.2f}s')
  311. # ax3.set_xlabel('Time (s)')
  312. # ax3.set_ylabel('TTC (s)')
  313. # ax3.set_title('Time To Collision (TTC) Over Time')
  314. # ax3.grid(True)
  315. # ax3.legend()
  316. # # 保存图像
  317. # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_chart.png")
  318. # plt.savefig(chart_filename, dpi=300)
  319. # plt.close()
  320. # logger.info(f"latestwarningdistance_ttc_pgvil chart saved to: {chart_filename}")
  321. # return chart_filename
  322. except Exception as e:
  323. logger.error(f"Failed to generate latestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
  324. return None
  325. def generate_latest_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
  326. """
  327. Generate TTC warning chart with data visualization.
  328. This version first saves data to CSV, then uses the CSV to generate the chart.
  329. Args:
  330. function_calculator: FunctionCalculator instance
  331. output_dir: Output directory
  332. Returns:
  333. str: Chart file path, or None if generation fails
  334. """
  335. logger = LogManager().get_logger()
  336. try:
  337. # 获取数据
  338. ego_df = function_calculator.ego_data.copy()
  339. correctwarning = getattr(function_calculator, 'correctwarning', None)
  340. # 获取配置的阈值
  341. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_LST')
  342. max_threshold = thresholds["max"]
  343. min_threshold = thresholds["min"]
  344. warning_dist = getattr(function_calculator, 'warning_dist', None)
  345. warning_speed = getattr(function_calculator, 'warning_speed', None)
  346. ttc = getattr(function_calculator, 'ttc', None)
  347. if warning_dist.empty:
  348. logger.warning("Cannot generate TTC warning chart: empty data")
  349. return None
  350. # 生成时间戳
  351. # 保存 CSV 数据
  352. csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_data.csv")
  353. df_csv = pd.DataFrame({
  354. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  355. 'warning_distance': warning_dist,
  356. 'warning_speed': warning_speed,
  357. 'ttc': ttc,
  358. 'min_threshold': min_threshold,
  359. 'max_threshold': max_threshold,
  360. })
  361. df_csv.to_csv(csv_filename, index=False)
  362. logger.info(f"latestwarningdistance_ttc_lst data saved to: {csv_filename}")
  363. return csv_filename
  364. # # 从 CSV 读取数据
  365. # df = pd.read_csv(csv_filename)
  366. # # 创建图表
  367. # plt.figure(figsize=(12, 8), constrained_layout=True)
  368. # # 图 1:预警距离
  369. # ax1 = plt.subplot(3, 1, 1)
  370. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  371. # ax1.set_xlabel('Time (s)')
  372. # ax1.set_ylabel('Distance (m)')
  373. # ax1.set_title('Warning Distance Over Time')
  374. # ax1.grid(True)
  375. # ax1.legend()
  376. # # 图 2:相对速度
  377. # ax2 = plt.subplot(3, 1, 2)
  378. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  379. # ax2.set_xlabel('Time (s)')
  380. # ax2.set_ylabel('Speed (m/s)')
  381. # ax2.set_title('Relative Speed Over Time')
  382. # ax2.grid(True)
  383. # ax2.legend()
  384. # # 图 3:TTC
  385. # ax3 = plt.subplot(3, 1, 3)
  386. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  387. # # Add threshold lines
  388. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  389. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  390. # # Calculate metric value (latest TTC)
  391. # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
  392. # # Mark latest TTC value
  393. # if len(df) > 0:
  394. # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
  395. # color='red', s=100, zorder=5,
  396. # label=f'Latest TTC: {metric_value:.2f}s')
  397. # ax3.set_xlabel('Time (s)')
  398. # ax3.set_ylabel('TTC (s)')
  399. # ax3.set_title('Time To Collision (TTC) Over Time')
  400. # ax3.grid(True)
  401. # ax3.legend()
  402. # # 保存图像
  403. # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_chart.png")
  404. # plt.savefig(chart_filename, dpi=300)
  405. # plt.close()
  406. # logger.info(f"latestwarningdistance_ttc_lst chart saved to: {chart_filename}")
  407. # return chart_filename
  408. except Exception as e:
  409. logger.error(f"Failed to generate latestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
  410. return None
  411. def generate_latest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
  412. """
  413. Generate warning distance chart with data visualization.
  414. This function creates charts for latestWarningDistance_LST metric.
  415. Args:
  416. function_calculator: FunctionCalculator instance
  417. metric_name: Metric name (latestWarningDistance_LST)
  418. output_dir: Output directory
  419. Returns:
  420. str: Chart file path, or None if generation fails
  421. """
  422. logger = LogManager().get_logger()
  423. try:
  424. # Get data
  425. ego_df = function_calculator.ego_data.copy()
  426. # Check if correctwarning is already calculated
  427. correctwarning = getattr(function_calculator, 'correctwarning', None)
  428. # Get configured thresholds
  429. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_LST')
  430. max_threshold = thresholds["max"]
  431. min_threshold = thresholds["min"]
  432. # Get calculated warning distance and speed
  433. warning_dist = getattr(function_calculator, 'warning_dist', None)
  434. if warning_dist.empty:
  435. logger.warning(f"Cannot generate latestWarningDistance_LST chart: empty data")
  436. return None
  437. # Calculate metric value
  438. metric_value = float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else max_threshold
  439. # Save CSV data
  440. csv_filename = os.path.join(output_dir, f"latestWarningDistance_LST_data.csv")
  441. df_csv = pd.DataFrame({
  442. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  443. 'warning_distance': warning_dist,
  444. 'min_threshold': min_threshold,
  445. 'max_threshold': max_threshold
  446. })
  447. df_csv.to_csv(csv_filename, index=False)
  448. logger.info(f"latestWarningDistance_LST data saved to: {csv_filename}")
  449. return csv_filename
  450. # # Read data from CSV
  451. # df = pd.read_csv(csv_filename)
  452. # # Create single chart for warning distance
  453. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  454. # # Plot warning distance
  455. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  456. # # Add threshold lines
  457. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  458. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  459. # # Mark metric value
  460. # if len(df) > 0:
  461. # label_text = 'Latest Warning Distance'
  462. # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
  463. # color='red', s=100, zorder=5,
  464. # label=f'{label_text}: {metric_value:.2f}m')
  465. # # Set y-axis range
  466. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  467. # plt.xlabel('Time (s)')
  468. # plt.ylabel('Distance (m)')
  469. # plt.title(f'latestWarningDistance_LST - Warning Distance Over Time')
  470. # plt.grid(True)
  471. # plt.legend()
  472. # # Save image
  473. # chart_filename = os.path.join(output_dir, f"latestWarningDistance_LST_chart.png")
  474. # plt.savefig(chart_filename, dpi=300)
  475. # plt.close()
  476. # logger.info(f"latestWarningDistance_LST chart saved to: {chart_filename}")
  477. # return chart_filename
  478. except Exception as e:
  479. logger.error(f"Failed to generate latestWarningDistance_LST chart: {str(e)}", exc_info=True)
  480. return None
  481. def generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  482. """
  483. Generate warning distance chart with data visualization.
  484. This function creates charts for latestWarningDistance_LST metric.
  485. Args:
  486. function_calculator: FunctionCalculator instance
  487. metric_name: Metric name (latestWarningDistance_LST)
  488. output_dir: Output directory
  489. Returns:
  490. str: Chart file path, or None if generation fails
  491. """
  492. logger = LogManager().get_logger()
  493. try:
  494. # Get data
  495. ego_df = function_calculator.ego_data.copy()
  496. # Check if correctwarning is already calculated
  497. correctwarning = getattr(function_calculator, 'correctwarning', None)
  498. # Get configured thresholds
  499. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_PGVIL')
  500. max_threshold = thresholds["max"]
  501. min_threshold = thresholds["min"]
  502. # Get calculated warning distance and speed
  503. warning_dist = getattr(function_calculator, 'warning_dist', None)
  504. warning_time = getattr(function_calculator, 'warning_time', None)
  505. if len(warning_dist) == 0:
  506. logger.warning(f"Cannot generate latestWarningDistance_PGVIL chart: empty data")
  507. return None
  508. # Calculate metric value
  509. metric_value = float(warning_dist[-1]) if len(warning_dist) > 0 else max_threshold
  510. # Save CSV data
  511. csv_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_data.csv")
  512. df_csv = pd.DataFrame({
  513. 'simTime': warning_time,
  514. 'warning_distance': warning_dist,
  515. 'min_threshold': min_threshold,
  516. 'max_threshold': max_threshold
  517. })
  518. df_csv.to_csv(csv_filename, index=False)
  519. logger.info(f"latestWarningDistance_PGVIL data saved to: {csv_filename}")
  520. return csv_filename
  521. # # Read data from CSV
  522. # df = pd.read_csv(csv_filename)
  523. # # Create single chart for warning distance
  524. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  525. # # Plot warning distance
  526. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  527. # # Add threshold lines
  528. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  529. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  530. # # Mark metric value
  531. # if len(df) > 0:
  532. # label_text = 'Latest Warning Distance'
  533. # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
  534. # color='red', s=100, zorder=5,
  535. # label=f'{label_text}: {metric_value:.2f}m')
  536. # # Set y-axis range
  537. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  538. # plt.xlabel('Time (s)')
  539. # plt.ylabel('Distance (m)')
  540. # plt.title(f'latestWarningDistance_PGVIL - Warning Distance Over Time')
  541. # plt.grid(True)
  542. # plt.legend()
  543. # # Save image
  544. # chart_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_chart.png")
  545. # plt.savefig(chart_filename, dpi=300)
  546. # plt.close()
  547. # logger.info(f"latestWarningDistance_PGVIL chart saved to: {chart_filename}")
  548. # return chart_filename
  549. except Exception as e:
  550. logger.error(f"Failed to generate latestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
  551. return None
  552. def generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
  553. """
  554. Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_LST metric.
  555. Args:
  556. function_calculator: FunctionCalculator instance
  557. output_dir: Output directory
  558. Returns:
  559. str: Chart file path, or None if generation fails
  560. """
  561. logger = LogManager().get_logger()
  562. metric_name = 'earliestWarningDistance_TTC_LST'
  563. try:
  564. # Get data
  565. ego_df = function_calculator.ego_data.copy()
  566. # Check if correctwarning is already calculated
  567. correctwarning = getattr(function_calculator, 'correctwarning', None)
  568. # Get configured thresholds
  569. thresholds = get_metric_thresholds(function_calculator, metric_name)
  570. max_threshold = thresholds["max"]
  571. min_threshold = thresholds["min"]
  572. # Get calculated warning distance and speed
  573. warning_dist = getattr(function_calculator, 'correctwarning', None)
  574. warning_speed = getattr(function_calculator, 'warning_speed', None)
  575. ttc = getattr(function_calculator, 'ttc', None)
  576. # Calculate metric value
  577. metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
  578. # Save CSV data
  579. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  580. df_csv = pd.DataFrame({
  581. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  582. 'warning_distance': warning_dist,
  583. 'warning_speed': warning_speed,
  584. 'ttc': ttc,
  585. 'min_threshold': min_threshold,
  586. 'max_threshold': max_threshold
  587. })
  588. df_csv.to_csv(csv_filename, index=False)
  589. logger.info(f"{metric_name} data saved to: {csv_filename}")
  590. return csv_filename
  591. # # Read data from CSV
  592. # df = pd.read_csv(csv_filename)
  593. # # Create chart
  594. # plt.figure(figsize=(12, 8), constrained_layout=True)
  595. # # 图 1:预警距离
  596. # ax1 = plt.subplot(3, 1, 1)
  597. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  598. # ax1.set_xlabel('Time (s)')
  599. # ax1.set_ylabel('Distance (m)')
  600. # ax1.set_title('Warning Distance Over Time')
  601. # ax1.grid(True)
  602. # ax1.legend()
  603. # # 图 2:相对速度
  604. # ax2 = plt.subplot(3, 1, 2)
  605. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  606. # ax2.set_xlabel('Time (s)')
  607. # ax2.set_ylabel('Speed (m/s)')
  608. # ax2.set_title('Relative Speed Over Time')
  609. # ax2.grid(True)
  610. # ax2.legend()
  611. # # 图 3:TTC
  612. # ax3 = plt.subplot(3, 1, 3)
  613. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  614. # # Add threshold lines
  615. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  616. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  617. # # Mark earliest TTC value
  618. # if len(df) > 0:
  619. # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
  620. # color='red', s=100, zorder=5,
  621. # label=f'Earliest TTC: {metric_value:.2f}s')
  622. # ax3.set_xlabel('Time (s)')
  623. # ax3.set_ylabel('TTC (s)')
  624. # ax3.set_title('Time To Collision (TTC) Over Time')
  625. # ax3.grid(True)
  626. # ax3.legend()
  627. # # Save image
  628. # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_lst_chart.png")
  629. # plt.savefig(chart_filename, dpi=300)
  630. # plt.close()
  631. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  632. # return chart_filename
  633. except Exception as e:
  634. logger.error(f"Failed to generate earliestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
  635. return None
  636. def generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  637. """
  638. Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_PGVIL metric.
  639. Args:
  640. function_calculator: FunctionCalculator instance
  641. output_dir: Output directory
  642. Returns:
  643. str: Chart file path, or None if generation fails
  644. """
  645. logger = LogManager().get_logger()
  646. metric_name = 'earliestWarningDistance_TTC_PGVIL'
  647. try:
  648. # Get data
  649. ego_df = function_calculator.ego_data.copy()
  650. # Check if correctwarning is already calculated
  651. correctwarning = getattr(function_calculator, 'correctwarning', None)
  652. # Get configured thresholds
  653. thresholds = get_metric_thresholds(function_calculator, metric_name)
  654. max_threshold = thresholds["max"]
  655. min_threshold = thresholds["min"]
  656. # Get calculated warning distance and speed
  657. warning_dist = getattr(function_calculator, 'warning_dist', None)
  658. warning_speed = getattr(function_calculator, 'warning_speed', None)
  659. ttc = getattr(function_calculator, 'ttc', None)
  660. warning_time = getattr(function_calculator, 'warning_time', None)
  661. # Calculate metric value
  662. metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
  663. # Save CSV data
  664. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  665. df_csv = pd.DataFrame({
  666. 'simTime': warning_time,
  667. 'warning_distance': warning_dist,
  668. 'warning_speed': warning_speed,
  669. 'ttc': ttc,
  670. 'min_threshold': min_threshold,
  671. 'max_threshold': max_threshold
  672. })
  673. df_csv.to_csv(csv_filename, index=False)
  674. logger.info(f"{metric_name} data saved to: {csv_filename}")
  675. return csv_filename
  676. # # Read data from CSV
  677. # df = pd.read_csv(csv_filename)
  678. # # Create chart
  679. # plt.figure(figsize=(12, 8), constrained_layout=True)
  680. # # 图 1:预警距离
  681. # ax1 = plt.subplot(3, 1, 1)
  682. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  683. # ax1.set_xlabel('Time (s)')
  684. # ax1.set_ylabel('Distance (m)')
  685. # ax1.set_title('Warning Distance Over Time')
  686. # ax1.grid(True)
  687. # ax1.legend()
  688. # # 图 2:相对速度
  689. # ax2 = plt.subplot(3, 1, 2)
  690. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  691. # ax2.set_xlabel('Time (s)')
  692. # ax2.set_ylabel('Speed (m/s)')
  693. # ax2.set_title('Relative Speed Over Time')
  694. # ax2.grid(True)
  695. # ax2.legend()
  696. # # 图 3:TTC
  697. # ax3 = plt.subplot(3, 1, 3)
  698. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  699. # # Add threshold lines
  700. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  701. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  702. # # Mark earliest TTC value
  703. # if len(df) > 0:
  704. # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
  705. # color='red', s=100, zorder=5,
  706. # label=f'Earliest TTC: {metric_value:.2f}s')
  707. # ax3.set_xlabel('Time (s)')
  708. # ax3.set_ylabel('TTC (s)')
  709. # ax3.set_title('Time To Collision (TTC) Over Time')
  710. # ax3.grid(True)
  711. # ax3.legend()
  712. # # Save image
  713. # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_pgvil_chart.png")
  714. # plt.savefig(chart_filename, dpi=300)
  715. # plt.close()
  716. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  717. # return chart_filename
  718. except Exception as e:
  719. logger.error(f"Failed to generate earliestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
  720. return None
  721. def generate_limit_speed_chart(function_calculator, output_dir: str) -> Optional[str]:
  722. """
  723. Generate limit speed chart with data visualization for limitSpeed_LST metric.
  724. Args:
  725. function_calculator: FunctionCalculator instance
  726. output_dir: Output directory
  727. Returns:
  728. str: Chart file path, or None if generation fails
  729. """
  730. logger = LogManager().get_logger()
  731. metric_name = 'limitSpeed_LST'
  732. try:
  733. # Get data
  734. ego_df = function_calculator.ego_data.copy()
  735. # Get configured thresholds
  736. thresholds = get_metric_thresholds(function_calculator, metric_name)
  737. max_threshold = thresholds["max"]
  738. min_threshold = thresholds["min"]
  739. if ego_df.empty:
  740. logger.warning(f"Cannot generate {metric_name} chart: empty data")
  741. return None
  742. # Save CSV data
  743. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  744. df_csv = pd.DataFrame({
  745. 'simTime': ego_df['simTime'],
  746. 'speed': ego_df['v'],
  747. 'speed_limit': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df)))
  748. })
  749. df_csv.to_csv(csv_filename, index=False)
  750. logger.info(f"{metric_name} data saved to: {csv_filename}")
  751. return csv_filename
  752. # # Read data from CSV
  753. # df = pd.read_csv(csv_filename)
  754. # # Create chart
  755. # plt.figure(figsize=(12, 6), constrained_layout=True)
  756. # # Plot speed
  757. # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
  758. # plt.plot(df['simTime'], df['speed_limit'], 'r--', label='Speed Limit')
  759. # # Set y-axis range
  760. # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
  761. # plt.xlabel('Time (s)')
  762. # plt.ylabel('Speed (m/s)')
  763. # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
  764. # plt.grid(True)
  765. # plt.legend()
  766. # # Save image
  767. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  768. # plt.savefig(chart_filename, dpi=300)
  769. # plt.close()
  770. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  771. # return chart_filename
  772. except Exception as e:
  773. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  774. return None
  775. def generate_limit_speed_past_sign_chart(function_calculator, output_dir: str) -> Optional[str]:
  776. """
  777. Generate limit speed past sign chart with data visualization for limitSpeedPastLimitSign_LST metric.
  778. Args:
  779. function_calculator: FunctionCalculator instance
  780. output_dir: Output directory
  781. Returns:
  782. str: Chart file path, or None if generation fails
  783. """
  784. logger = LogManager().get_logger()
  785. metric_name = 'limitSpeedPastLimitSign_LST'
  786. try:
  787. # Get data
  788. ego_df = function_calculator.ego_data.copy()
  789. # Get configured thresholds
  790. thresholds = get_metric_thresholds(function_calculator, metric_name)
  791. max_threshold = thresholds["max"]
  792. min_threshold = thresholds["min"]
  793. if ego_df.empty:
  794. logger.warning(f"Cannot generate {metric_name} chart: empty data")
  795. return None
  796. # Get sign passing time if available
  797. sign_time = getattr(function_calculator, 'sign_pass_time', None)
  798. if sign_time is None:
  799. # Try to estimate sign passing time (middle of the simulation)
  800. sign_time = ego_df['simTime'].iloc[len(ego_df) // 2]
  801. # Save CSV data
  802. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  803. df_csv = pd.DataFrame({
  804. 'simTime': ego_df['simTime'],
  805. 'speed': ego_df['v'],
  806. 'speed_limit': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df))),
  807. 'sign_pass_time': sign_time
  808. })
  809. df_csv.to_csv(csv_filename, index=False)
  810. logger.info(f"{metric_name} data saved to: {csv_filename}")
  811. return csv_filename
  812. # # Read data from CSV
  813. # df = pd.read_csv(csv_filename)
  814. # # Create chart
  815. # plt.figure(figsize=(12, 6), constrained_layout=True)
  816. # # Plot speed
  817. # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
  818. # plt.plot(df['simTime'], df['speed_limit'], 'r--', label='Speed Limit')
  819. # # Mark sign passing time
  820. # plt.axvline(x=sign_time, color='g', linestyle='--', label='Speed Limit Sign')
  821. # # Set y-axis range
  822. # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
  823. # plt.xlabel('Time (s)')
  824. # plt.ylabel('Speed (m/s)')
  825. # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
  826. # plt.grid(True)
  827. # plt.legend()
  828. # # Save image
  829. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  830. # plt.savefig(chart_filename, dpi=300)
  831. # plt.close()
  832. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  833. # return chart_filename
  834. except Exception as e:
  835. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  836. return None
  837. def generate_max_longitude_dist_chart(function_calculator, output_dir: str) -> Optional[str]:
  838. """
  839. Generate maximum longitudinal distance chart with data visualization for maxLongitudeDist_LST metric.
  840. Args:
  841. function_calculator: FunctionCalculator instance
  842. output_dir: Output directory
  843. Returns:
  844. str: Chart file path, or None if generation fails
  845. """
  846. logger = LogManager().get_logger()
  847. metric_name = 'maxLongitudeDist_LST'
  848. try:
  849. # Get data
  850. ego_df = function_calculator.ego_data.copy()
  851. # Get configured thresholds
  852. thresholds = get_metric_thresholds(function_calculator, metric_name)
  853. max_threshold = thresholds["max"]
  854. min_threshold = thresholds["min"]
  855. # Get longitudinal distance data
  856. longitude_dist = ego_df['longitude_dist'] if 'longitude_dist' in ego_df.columns else None
  857. stop_time = ego_df['stop_time'] if 'stop_time' in ego_df.columns else None
  858. if longitude_dist is None or longitude_dist.empty:
  859. logger.warning(f"Cannot generate {metric_name} chart: missing longitudinal distance data")
  860. return None
  861. # Calculate metric value
  862. metric_value = longitude_dist.max()
  863. max_distance_time = ego_df.loc[longitude_dist.idxmax(), 'simTime']
  864. # Save CSV data
  865. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  866. df_csv = pd.DataFrame({
  867. 'simTime': ego_df['simTime'],
  868. 'x_relative_dist': ego_df['x_relative_dist'],
  869. 'stop_time': stop_time,
  870. 'longitude_dist': longitude_dist
  871. })
  872. df_csv.to_csv(csv_filename, index=False)
  873. logger.info(f"{metric_name} data saved to: {csv_filename}")
  874. return csv_filename
  875. # # Read data from CSV
  876. # df = pd.read_csv(csv_filename)
  877. # # Create chart
  878. # plt.figure(figsize=(12, 6), constrained_layout=True)
  879. # # Plot longitudinal distance
  880. # plt.plot(df['simTime'], df['x_relative_dist'], 'b-', label='Longitudinal Distance')
  881. # # Add threshold lines
  882. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  883. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  884. # # Mark maximum longitudinal distance
  885. # plt.scatter(max_distance_time, metric_value,
  886. # color='red', s=100, zorder=5,
  887. # label=f'Maximum Longitudinal Distance: {metric_value:.2f}m')
  888. # # Set y-axis range
  889. # plt.ylim(bottom=min(0, min_threshold * 0.9), top=max(max_threshold * 1.1, df['longitude_dist'].max() * 1.1))
  890. # plt.xlabel('Time (s)')
  891. # plt.ylabel('Longitudinal Distance (m)')
  892. # plt.title(f'{metric_name} - Longitudinal Distance Over Time')
  893. # plt.grid(True)
  894. # plt.legend()
  895. # # Save image
  896. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  897. # plt.savefig(chart_filename, dpi=300)
  898. # plt.close()
  899. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  900. # return chart_filename
  901. except Exception as e:
  902. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  903. return None
  904. def generate_warning_delay_time_chart(function_calculator, output_dir: str) -> Optional[str]:
  905. """
  906. Generate warning delay time chart with data visualization for warningDelayTime_LST metric.
  907. Args:
  908. function_calculator: FunctionCalculator instance
  909. output_dir: Output directory
  910. Returns:
  911. str: Chart file path, or None if generation fails
  912. """
  913. logger = LogManager().get_logger()
  914. metric_name = 'warningDelayTime_LST'
  915. try:
  916. # Get data
  917. ego_df = function_calculator.ego_data.copy()
  918. # Get configured thresholds
  919. thresholds = get_metric_thresholds(function_calculator, metric_name)
  920. max_threshold = thresholds["max"]
  921. min_threshold = thresholds["min"]
  922. # Check if correctwarning is already calculated
  923. correctwarning = getattr(function_calculator, 'correctwarning', None)
  924. if correctwarning is None:
  925. logger.warning(f"Cannot generate {metric_name} chart: missing correctwarning value")
  926. return None
  927. # Get HMI warning time and rosbag warning time
  928. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  929. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  930. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  931. 'simTime'].tolist()
  932. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  933. if (simTime_HMI is None) or (simTime_rosbag is None):
  934. logger.warning(f"Cannot generate {metric_name} chart: missing warning time data")
  935. return None
  936. # Calculate delay time
  937. delay_time = abs(simTime_HMI - simTime_rosbag)
  938. # Save CSV data
  939. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  940. df_csv = pd.DataFrame({
  941. 'HMI_warning_time': [simTime_HMI],
  942. 'rosbag_warning_time': [simTime_rosbag],
  943. 'delay_time': [delay_time],
  944. 'min_threshold': [min_threshold],
  945. 'max_threshold': [max_threshold]
  946. })
  947. df_csv.to_csv(csv_filename, index=False)
  948. logger.info(f"{metric_name} data saved to: {csv_filename}")
  949. return csv_filename
  950. # # Create chart - bar chart for delay time
  951. # plt.figure(figsize=(10, 6), constrained_layout=True)
  952. # # Plot delay time as bar
  953. # plt.bar(['Warning Delay Time'], [delay_time], color='blue', width=0.4)
  954. # # Add threshold lines
  955. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  956. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  957. # # Add value label
  958. # plt.text(0, delay_time + 0.05, f'{delay_time:.3f}s', ha='center', va='bottom', fontweight='bold')
  959. # # Set y-axis range
  960. # plt.ylim(bottom=0, top=max(max_threshold * 1.2, delay_time * 1.2))
  961. # plt.ylabel('Delay Time (s)')
  962. # plt.title(f'{metric_name} - Warning Delay Time')
  963. # plt.grid(True, axis='y')
  964. # plt.legend()
  965. # # Save image
  966. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  967. # plt.savefig(chart_filename, dpi=300)
  968. # plt.close()
  969. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  970. # return chart_filename
  971. except Exception as e:
  972. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  973. return None
  974. def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  975. str]:
  976. """
  977. Generate chart data for comfort metrics
  978. Args:
  979. comfort_calculator: ComfortCalculator instance
  980. metric_name: Metric name
  981. output_dir: Output directory
  982. Returns:
  983. str: Chart file path, or None if generation fails
  984. """
  985. logger = LogManager().get_logger()
  986. try:
  987. # 确保输出目录存在
  988. if output_dir:
  989. os.makedirs(output_dir, exist_ok=True)
  990. else:
  991. output_dir = os.getcwd()
  992. # 根据指标名称选择不同的图表生成方法
  993. if metric_name.lower() == 'shake':
  994. return generate_shake_chart(comfort_calculator, output_dir)
  995. elif metric_name.lower() == 'zigzag':
  996. return generate_zigzag_chart(comfort_calculator, output_dir)
  997. elif metric_name.lower() == 'cadence':
  998. return generate_cadence_chart(comfort_calculator, output_dir)
  999. elif metric_name.lower() == 'slambrake':
  1000. return generate_slam_brake_chart(comfort_calculator, output_dir)
  1001. elif metric_name.lower() == 'slamaccelerate':
  1002. return generate_slam_accelerate_chart(comfort_calculator, output_dir)
  1003. else:
  1004. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  1005. return None
  1006. except Exception as e:
  1007. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  1008. return None
  1009. def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1010. """
  1011. Generate shake metric chart with orange background for shake events.
  1012. This version first saves data to CSV, then uses the CSV to generate the chart.
  1013. Args:
  1014. comfort_calculator: ComfortCalculator instance
  1015. output_dir: Output directory
  1016. Returns:
  1017. str: Chart file path, or None if generation fails
  1018. """
  1019. logger = LogManager().get_logger()
  1020. try:
  1021. # 获取数据
  1022. df = comfort_calculator.ego_df.copy()
  1023. shake_events = comfort_calculator.shake_events
  1024. if df.empty:
  1025. logger.warning("Cannot generate shake chart: empty data")
  1026. return None
  1027. # 生成时间戳
  1028. # 保存 CSV 数据(第一步)
  1029. csv_filename = os.path.join(output_dir, f"shake_data.csv")
  1030. df_csv = pd.DataFrame({
  1031. 'simTime': df['simTime'],
  1032. 'lat_acc': df['lat_acc'],
  1033. 'lat_acc_rate': df['lat_acc_rate'],
  1034. 'speedH_std': df['speedH_std'],
  1035. 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None] * len(df))),
  1036. 'lat_acc_rate_threshold': 0.5,
  1037. 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None] * len(df))),
  1038. })
  1039. df_csv.to_csv(csv_filename, index=False)
  1040. logger.info(f"Shake data saved to: {csv_filename}")
  1041. return csv_filename
  1042. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1043. # df = pd.read_csv(csv_filename)
  1044. # # 创建图表(第三步)
  1045. # import matplotlib.pyplot as plt
  1046. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1047. # # 图 1:横向加速度
  1048. # ax1 = plt.subplot(3, 1, 1)
  1049. # ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
  1050. # if 'lat_acc_threshold' in df.columns:
  1051. # ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold')
  1052. # for idx, event in enumerate(shake_events):
  1053. # label = 'Shake Event' if idx == 0 else None
  1054. # ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1055. # ax1.set_xlabel('Time (s)')
  1056. # ax1.set_ylabel('Lateral Acceleration (m/s²)')
  1057. # ax1.set_title('Shake Event Detection - Lateral Acceleration')
  1058. # ax1.grid(True)
  1059. # ax1.legend()
  1060. # # 图 2:lat_acc_rate
  1061. # ax2 = plt.subplot(3, 1, 2)
  1062. # ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate')
  1063. # ax2.axhline(
  1064. # y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold'
  1065. # )
  1066. # for idx, event in enumerate(shake_events):
  1067. # label = 'Shake Event' if idx == 0 else None
  1068. # ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1069. # ax2.set_xlabel('Time (s)')
  1070. # ax2.set_ylabel('Angular Velocity (m/s³)')
  1071. # ax2.set_title('Shake Event Detection - lat_acc_rate')
  1072. # ax2.grid(True)
  1073. # ax2.legend()
  1074. # # 图 3:speedH_std
  1075. # ax3 = plt.subplot(3, 1, 3)
  1076. # ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std')
  1077. # if 'speedH_std_threshold' in df.columns:
  1078. # ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold')
  1079. # for idx, event in enumerate(shake_events):
  1080. # label = 'Shake Event' if idx == 0 else None
  1081. # ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1082. # ax3.set_xlabel('Time (s)')
  1083. # ax3.set_ylabel('Angular Velocity (deg/s)')
  1084. # ax3.set_title('Shake Event Detection - speedH_std')
  1085. # ax3.grid(True)
  1086. # ax3.legend()
  1087. # # 保存图像
  1088. # chart_filename = os.path.join(output_dir, f"shake_chart.png")
  1089. # plt.savefig(chart_filename, dpi=300)
  1090. # plt.close()
  1091. # logger.info(f"Shake chart saved to: {chart_filename}")
  1092. # return chart_filename
  1093. except Exception as e:
  1094. logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True)
  1095. return None
  1096. def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1097. """
  1098. Generate zigzag metric chart with orange background for zigzag events.
  1099. This version first saves data to CSV, then uses the CSV to generate the chart.
  1100. Args:
  1101. comfort_calculator: ComfortCalculator instance
  1102. output_dir: Output directory
  1103. Returns:
  1104. str: Chart file path, or None if generation fails
  1105. """
  1106. logger = LogManager().get_logger()
  1107. try:
  1108. # 获取数据
  1109. df = comfort_calculator.ego_df.copy()
  1110. zigzag_events = comfort_calculator.discomfort_df[
  1111. comfort_calculator.discomfort_df['type'] == 'zigzag'
  1112. ].copy()
  1113. if df.empty:
  1114. logger.warning("Cannot generate zigzag chart: empty data")
  1115. return None
  1116. # 生成时间戳
  1117. # 保存 CSV 数据(第一步)
  1118. csv_filename = os.path.join(output_dir, f"zigzag_data.csv")
  1119. df_csv = pd.DataFrame({
  1120. 'simTime': df['simTime'],
  1121. 'speedH': df['speedH'],
  1122. 'posH': df['posH'],
  1123. 'min_speedH_threshold': -2.3, # 可替换为动态阈值
  1124. 'max_speedH_threshold': 2.3
  1125. })
  1126. df_csv.to_csv(csv_filename, index=False)
  1127. logger.info(f"Zigzag data saved to: {csv_filename}")
  1128. return csv_filename
  1129. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1130. # df = pd.read_csv(csv_filename)
  1131. # # 创建图表(第三步)
  1132. # import matplotlib.pyplot as plt
  1133. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1134. # # ===== 子图1:Yaw Rate =====
  1135. # ax1 = plt.subplot(2, 1, 1)
  1136. # ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
  1137. # # 添加 speedH 上下限阈值线
  1138. # ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)')
  1139. # ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)')
  1140. # # 添加橙色背景:Zigzag Events
  1141. # for idx, event in zigzag_events.iterrows():
  1142. # label = 'Zigzag Event' if idx == 0 else None
  1143. # ax1.axvspan(event['start_time'], event['end_time'],
  1144. # alpha=0.3, color='orange', label=label)
  1145. # ax1.set_xlabel('Time (s)')
  1146. # ax1.set_ylabel('Angular Velocity (deg/s)')
  1147. # ax1.set_title('Zigzag Event Detection - Yaw Rate')
  1148. # ax1.grid(True)
  1149. # ax1.legend(loc='upper left')
  1150. # # ===== 子图2:Yaw Angle =====
  1151. # ax2 = plt.subplot(2, 1, 2)
  1152. # ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw')
  1153. # # 添加橙色背景:Zigzag Events
  1154. # for idx, event in zigzag_events.iterrows():
  1155. # label = 'Zigzag Event' if idx == 0 else None
  1156. # ax2.axvspan(event['start_time'], event['end_time'],
  1157. # alpha=0.3, color='orange', label=label)
  1158. # ax2.set_xlabel('Time (s)')
  1159. # ax2.set_ylabel('Yaw (deg)')
  1160. # ax2.set_title('Zigzag Event Detection - Yaw Angle')
  1161. # ax2.grid(True)
  1162. # ax2.legend(loc='upper left')
  1163. # # 保存图像
  1164. # chart_filename = os.path.join(output_dir, f"zigzag_chart.png")
  1165. # plt.savefig(chart_filename, dpi=300)
  1166. # plt.close()
  1167. # logger.info(f"Zigzag chart saved to: {chart_filename}")
  1168. # return csv_filename
  1169. except Exception as e:
  1170. logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True)
  1171. return None
  1172. def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1173. """
  1174. Generate cadence metric chart with orange background for cadence events.
  1175. This version first saves data to CSV, then uses the CSV to generate the chart.
  1176. Args:
  1177. comfort_calculator: ComfortCalculator instance
  1178. output_dir: Output directory
  1179. Returns:
  1180. str: Chart file path, or None if generation fails
  1181. """
  1182. logger = LogManager().get_logger()
  1183. try:
  1184. # 获取数据
  1185. df = comfort_calculator.ego_df.copy()
  1186. cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy()
  1187. if df.empty:
  1188. logger.warning("Cannot generate cadence chart: empty data")
  1189. return None
  1190. # 生成时间戳
  1191. # 保存 CSV 数据(第一步)
  1192. csv_filename = os.path.join(output_dir, f"cadence_data.csv")
  1193. df_csv = pd.DataFrame({
  1194. 'simTime': df['simTime'],
  1195. 'lon_acc': df['lon_acc'],
  1196. 'v': df['v'],
  1197. 'ip_acc': df.get('ip_acc', pd.Series([None] * len(df))),
  1198. 'ip_dec': df.get('ip_dec', pd.Series([None] * len(df)))
  1199. })
  1200. df_csv.to_csv(csv_filename, index=False)
  1201. logger.info(f"Cadence data saved to: {csv_filename}")
  1202. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1203. # df = pd.read_csv(csv_filename)
  1204. # # 创建图表(第三步)
  1205. # import matplotlib.pyplot as plt
  1206. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1207. # # 图 1:纵向加速度
  1208. # ax1 = plt.subplot(2, 1, 1)
  1209. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1210. # if 'ip_acc' in df.columns and 'ip_dec' in df.columns:
  1211. # ax1.plot(df['simTime'], df['ip_acc'], 'r--', label='Acceleration Threshold')
  1212. # ax1.plot(df['simTime'], df['ip_dec'], 'g--', label='Deceleration Threshold')
  1213. # # 添加橙色背景标识顿挫事件
  1214. # for idx, event in cadence_events.iterrows():
  1215. # label = 'Cadence Event' if idx == 0 else None
  1216. # ax1.axvspan(event['start_time'], event['end_time'],
  1217. # alpha=0.3, color='orange', label=label)
  1218. # ax1.set_xlabel('Time (s)')
  1219. # ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
  1220. # ax1.set_title('Cadence Event Detection - Longitudinal Acceleration')
  1221. # ax1.grid(True)
  1222. # ax1.legend()
  1223. # # 图 2:速度
  1224. # ax2 = plt.subplot(2, 1, 2)
  1225. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1226. # # 添加橙色背景标识顿挫事件
  1227. # for idx, event in cadence_events.iterrows():
  1228. # label = 'Cadence Event' if idx == 0 else None
  1229. # ax2.axvspan(event['start_time'], event['end_time'],
  1230. # alpha=0.3, color='orange', label=label)
  1231. # ax2.set_xlabel('Time (s)')
  1232. # ax2.set_ylabel('Velocity (m/s)')
  1233. # ax2.set_title('Cadence Event Detection - Vehicle Speed')
  1234. # ax2.grid(True)
  1235. # ax2.legend()
  1236. # # 保存图像
  1237. # chart_filename = os.path.join(output_dir, f"cadence_chart.png")
  1238. # plt.savefig(chart_filename, dpi=300)
  1239. # plt.close()
  1240. # logger.info(f"Cadence chart saved to: {chart_filename}")
  1241. # return chart_filename
  1242. except Exception as e:
  1243. logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True)
  1244. return None
  1245. def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1246. """
  1247. Generate slam brake metric chart with orange background for slam brake events.
  1248. This version first saves data to CSV, then uses the CSV to generate the chart.
  1249. Args:
  1250. comfort_calculator: ComfortCalculator instance
  1251. output_dir: Output directory
  1252. Returns:
  1253. str: Chart file path, or None if generation fails
  1254. """
  1255. logger = LogManager().get_logger()
  1256. try:
  1257. # 获取数据
  1258. df = comfort_calculator.ego_df.copy()
  1259. slam_brake_events = comfort_calculator.discomfort_df[
  1260. comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy()
  1261. if df.empty:
  1262. logger.warning("Cannot generate slam brake chart: empty data")
  1263. return None
  1264. # 生成时间戳
  1265. # 保存 CSV 数据(第一步)
  1266. csv_filename = os.path.join(output_dir, f"slam_brake_data.csv")
  1267. df_csv = pd.DataFrame({
  1268. 'simTime': df['simTime'],
  1269. 'lon_acc': df['lon_acc'],
  1270. 'v': df['v'],
  1271. 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df))),
  1272. 'max_threshold': 0.0
  1273. })
  1274. df_csv.to_csv(csv_filename, index=False)
  1275. logger.info(f"Slam brake data saved to: {csv_filename}")
  1276. return csv_filename
  1277. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1278. # df = pd.read_csv(csv_filename)
  1279. # # 创建图表(第三步)
  1280. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1281. # # 图 1:纵向加速度
  1282. # ax1 = plt.subplot(2, 1, 1)
  1283. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1284. # if 'min_threshold' in df.columns:
  1285. # ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Deceleration Threshold')
  1286. # # 添加橙色背景标识急刹车事件
  1287. # for idx, event in slam_brake_events.iterrows():
  1288. # label = 'Slam Brake Event' if idx == 0 else None
  1289. # ax1.axvspan(event['start_time'], event['end_time'],
  1290. # alpha=0.3, color='orange', label=label)
  1291. # ax1.set_xlabel('Time (s)')
  1292. # ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
  1293. # ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration')
  1294. # ax1.grid(True)
  1295. # ax1.legend()
  1296. # # 图 2:速度
  1297. # ax2 = plt.subplot(2, 1, 2)
  1298. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1299. # # 添加橙色背景标识急刹车事件
  1300. # for idx, event in slam_brake_events.iterrows():
  1301. # label = 'Slam Brake Event' if idx == 0 else None
  1302. # ax2.axvspan(event['start_time'], event['end_time'],
  1303. # alpha=0.3, color='orange', label=label)
  1304. # ax2.set_xlabel('Time (s)')
  1305. # ax2.set_ylabel('Velocity (m/s)')
  1306. # ax2.set_title('Slam Brake Event Detection - Vehicle Speed')
  1307. # ax2.grid(True)
  1308. # ax2.legend()
  1309. # # 保存图像
  1310. # chart_filename = os.path.join(output_dir, f"slam_brake_chart.png")
  1311. # plt.savefig(chart_filename, dpi=300)
  1312. # plt.close()
  1313. # logger.info(f"Slam brake chart saved to: {chart_filename}")
  1314. # return chart_filename
  1315. except Exception as e:
  1316. logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True)
  1317. return None
  1318. def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1319. """
  1320. Generate slam accelerate metric chart with orange background for slam accelerate events.
  1321. This version first saves data to CSV, then uses the CSV to generate the chart.
  1322. Args:
  1323. comfort_calculator: ComfortCalculator instance
  1324. output_dir: Output directory
  1325. Returns:
  1326. str: Chart file path, or None if generation fails
  1327. """
  1328. logger = LogManager().get_logger()
  1329. try:
  1330. # 获取数据
  1331. df = comfort_calculator.ego_df.copy()
  1332. slam_accel_events = comfort_calculator.discomfort_df[
  1333. (comfort_calculator.discomfort_df['type'] == 'slam_accel')
  1334. ].copy()
  1335. if df.empty:
  1336. logger.warning("Cannot generate slam accelerate chart: empty data")
  1337. return None
  1338. # 生成时间戳
  1339. # 保存 CSV 数据(第一步)
  1340. csv_filename = os.path.join(output_dir, f"slam_accel_data.csv")
  1341. # 获取加速度阈值(如果存在)
  1342. accel_threshold = df.get('ip_acc', pd.Series([None] * len(df)))
  1343. df_csv = pd.DataFrame({
  1344. 'simTime': df['simTime'],
  1345. 'lon_acc': df['lon_acc'],
  1346. 'v': df['v'],
  1347. 'min_threshold': 0.0, # 加速度最小阈值设为0
  1348. 'max_threshold': accel_threshold # 急加速阈值
  1349. })
  1350. df_csv.to_csv(csv_filename, index=False)
  1351. logger.info(f"Slam accelerate data saved to: {csv_filename}")
  1352. return csv_filename
  1353. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1354. # df = pd.read_csv(csv_filename)
  1355. # # 创建图表(第三步)
  1356. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1357. # # 图 1:纵向加速度
  1358. # ax1 = plt.subplot(2, 1, 1)
  1359. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1360. # # 添加加速度阈值线
  1361. # if 'max_threshold' in df.columns and not df['max_threshold'].isnull().all():
  1362. # ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Acceleration Threshold')
  1363. # # 添加橙色背景标识急加速事件
  1364. # for idx, event in slam_accel_events.iterrows():
  1365. # label = 'Slam Accelerate Event' if idx == 0 else None
  1366. # ax1.axvspan(event['start_time'], event['end_time'],
  1367. # alpha=0.3, color='orange', label=label)
  1368. # ax1.set_xlabel('Time (s)')
  1369. # ax1.set_ylabel('Acceleration (m/s²)')
  1370. # ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration')
  1371. # ax1.grid(True)
  1372. # ax1.legend()
  1373. # # 图 2:速度
  1374. # ax2 = plt.subplot(2, 1, 2)
  1375. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1376. # # 添加橙色背景标识急加速事件
  1377. # for idx, event in slam_accel_events.iterrows():
  1378. # label = 'Slam Accelerate Event' if idx == 0 else None
  1379. # ax2.axvspan(event['start_time'], event['end_time'],
  1380. # alpha=0.3, color='orange', label=label)
  1381. # ax2.set_xlabel('Time (s)')
  1382. # ax2.set_ylabel('Velocity (m/s)')
  1383. # ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed')
  1384. # ax2.grid(True)
  1385. # ax2.legend()
  1386. # # 保存图像
  1387. # chart_filename = os.path.join(output_dir, f"slam_accel_chart.png")
  1388. # plt.savefig(chart_filename, dpi=300)
  1389. # plt.close()
  1390. # logger.info(f"Slam accelerate chart saved to: {chart_filename}")
  1391. # return chart_filename
  1392. except Exception as e:
  1393. logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True)
  1394. return None
  1395. def get_metric_thresholds(calculator, metric_name: str) -> dict:
  1396. """
  1397. 从配置文件中获取指标的阈值
  1398. Args:
  1399. calculator: Calculator instance (FunctionCalculator, SafetyCalculator, ComfortCalculator, EfficientCalculator, TrafficCalculator)
  1400. metric_name: 指标名称
  1401. Returns:
  1402. dict: 包含min和max阈值的字典
  1403. """
  1404. logger = LogManager().get_logger()
  1405. thresholds = {"min": None, "max": None}
  1406. try:
  1407. # 根据计算器类型获取配置
  1408. if hasattr(calculator, 'data_processed'):
  1409. # 检查安全性指标配置
  1410. if hasattr(calculator.data_processed,
  1411. 'safety_config') and 'safety' in calculator.data_processed.safety_config:
  1412. config = calculator.data_processed.safety_config['safety']
  1413. metric_type = 'safety'
  1414. # 检查舒适性指标配置
  1415. elif hasattr(calculator.data_processed,
  1416. 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config:
  1417. config = calculator.data_processed.comfort_config['comfort']
  1418. metric_type = 'comfort'
  1419. # 检查功能性指标配置
  1420. elif hasattr(calculator.data_processed,
  1421. 'function_config') and 'function' in calculator.data_processed.function_config:
  1422. config = calculator.data_processed.function_config['function']
  1423. metric_type = 'function'
  1424. # 检查高效性指标配置
  1425. elif hasattr(calculator.data_processed,
  1426. 'efficient_config') and 'efficient' in calculator.data_processed.efficient_config:
  1427. config = calculator.data_processed.efficient_config['efficient']
  1428. metric_type = 'efficient'
  1429. # 检查交通性指标配置
  1430. elif hasattr(calculator.data_processed,
  1431. 'traffic_config') and 'traffic' in calculator.data_processed.traffic_config:
  1432. config = calculator.data_processed.traffic_config['traffic']
  1433. metric_type = 'traffic'
  1434. else:
  1435. # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
  1436. if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
  1437. config = calculator.function_config['function']
  1438. metric_type = 'function'
  1439. else:
  1440. logger.warning(f"无法找到{metric_name}的配置信息")
  1441. return thresholds
  1442. else:
  1443. # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
  1444. if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
  1445. config = calculator.function_config['function']
  1446. metric_type = 'function'
  1447. else:
  1448. logger.warning(f"计算器没有data_processed属性或function_config属性")
  1449. return thresholds
  1450. # 递归查找指标配置
  1451. def find_metric_config(node, target_name):
  1452. if isinstance(node, dict):
  1453. if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node:
  1454. return node
  1455. for key, value in node.items():
  1456. result = find_metric_config(value, target_name)
  1457. if result:
  1458. return result
  1459. return None
  1460. # 查找指标配置
  1461. metric_config = find_metric_config(config, metric_name)
  1462. if metric_config:
  1463. thresholds["min"] = metric_config.get("min")
  1464. thresholds["max"] = metric_config.get("max")
  1465. logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}")
  1466. else:
  1467. logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息")
  1468. except Exception as e:
  1469. logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True)
  1470. return thresholds
  1471. def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
  1472. """
  1473. Generate chart data for safety metrics
  1474. Args:
  1475. safety_calculator: SafetyCalculator instance
  1476. metric_name: Metric name
  1477. output_dir: Output directory
  1478. Returns:
  1479. str: Chart file path, or None if generation fails
  1480. """
  1481. logger = LogManager().get_logger()
  1482. try:
  1483. # 确保输出目录存在
  1484. if output_dir:
  1485. os.makedirs(output_dir, exist_ok=True)
  1486. else:
  1487. output_dir = os.getcwd()
  1488. # 根据指标名称选择不同的图表生成方法
  1489. if metric_name.lower() == 'ttc':
  1490. return generate_ttc_chart(safety_calculator, output_dir)
  1491. elif metric_name.lower() == 'mttc':
  1492. return generate_mttc_chart(safety_calculator, output_dir)
  1493. elif metric_name.lower() == 'thw':
  1494. return generate_thw_chart(safety_calculator, output_dir)
  1495. elif metric_name.lower() == 'lonsd':
  1496. return generate_lonsd_chart(safety_calculator, output_dir)
  1497. elif metric_name.lower() == 'latsd':
  1498. return generate_latsd_chart(safety_calculator, output_dir)
  1499. elif metric_name.lower() == 'btn':
  1500. return generate_btn_chart(safety_calculator, output_dir)
  1501. elif metric_name.lower() == 'collisionrisk':
  1502. return generate_collision_risk_chart(safety_calculator, output_dir)
  1503. elif metric_name.lower() == 'collisionseverity':
  1504. return generate_collision_severity_chart(safety_calculator, output_dir)
  1505. else:
  1506. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  1507. return None
  1508. except Exception as e:
  1509. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  1510. return None
  1511. def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1512. """
  1513. Generate TTC metric chart with orange background for unsafe events.
  1514. This version first saves data to CSV, then uses the CSV to generate the chart.
  1515. Args:
  1516. safety_calculator: SafetyCalculator instance
  1517. output_dir: Output directory
  1518. Returns:
  1519. str: Chart file path, or None if generation fails
  1520. """
  1521. logger = LogManager().get_logger()
  1522. try:
  1523. # 获取数据
  1524. ttc_data = safety_calculator.ttc_data
  1525. if not ttc_data:
  1526. logger.warning("Cannot generate TTC chart: empty data")
  1527. return None
  1528. # 创建DataFrame
  1529. df = pd.DataFrame(ttc_data)
  1530. # 获取阈值
  1531. thresholds = get_metric_thresholds(safety_calculator, 'TTC')
  1532. min_threshold = thresholds.get('min')
  1533. max_threshold = thresholds.get('max')
  1534. # 生成时间戳
  1535. # 保存 CSV 数据(第一步)
  1536. csv_filename = os.path.join(output_dir, f"ttc_data.csv")
  1537. df_csv = pd.DataFrame({
  1538. 'simTime': df['simTime'],
  1539. 'simFrame': df['simFrame'],
  1540. 'TTC': df['TTC'],
  1541. 'min_threshold': min_threshold,
  1542. 'max_threshold': max_threshold
  1543. })
  1544. df_csv.to_csv(csv_filename, index=False)
  1545. logger.info(f"TTC data saved to: {csv_filename}")
  1546. return csv_filename
  1547. # 第二步:从 CSV 读取(可验证保存数据无误)
  1548. df = pd.read_csv(csv_filename)
  1549. # 检测超阈值事件
  1550. unsafe_events = []
  1551. if min_threshold is not None:
  1552. # 对于TTC,小于最小阈值视为不安全
  1553. unsafe_condition = df['TTC'] < min_threshold
  1554. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1555. for _, group in df[unsafe_condition].groupby(event_groups):
  1556. if len(group) >= 2: # 至少2帧才算一次事件
  1557. start_time = group['simTime'].iloc[0]
  1558. end_time = group['simTime'].iloc[-1]
  1559. duration = end_time - start_time
  1560. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1561. unsafe_events.append({
  1562. 'start_time': start_time,
  1563. 'end_time': end_time,
  1564. 'start_frame': group['simFrame'].iloc[0],
  1565. 'end_frame': group['simFrame'].iloc[-1],
  1566. 'duration': duration,
  1567. 'min_ttc': group['TTC'].min()
  1568. })
  1569. # # 创建图表(第三步)
  1570. # plt.figure(figsize=(12, 8))
  1571. # plt.plot(df['simTime'], df['TTC'], 'b-', label='TTC')
  1572. # # 添加阈值线
  1573. # if min_threshold is not None:
  1574. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  1575. # if max_threshold is not None:
  1576. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1577. # # 添加橙色背景标识不安全事件
  1578. # for idx, event in enumerate(unsafe_events):
  1579. # label = 'Unsafe TTC Event' if idx == 0 else None
  1580. # plt.axvspan(event['start_time'], event['end_time'],
  1581. # alpha=0.3, color='orange', label=label)
  1582. # plt.xlabel('Time (s)')
  1583. # plt.ylabel('TTC (s)')
  1584. # plt.title('Time To Collision (TTC) Trend')
  1585. # plt.grid(True)
  1586. # plt.legend()
  1587. # # 保存图像
  1588. # chart_filename = os.path.join(output_dir, f"ttc_chart.png")
  1589. # plt.savefig(chart_filename, dpi=300)
  1590. # plt.close()
  1591. # # 记录不安全事件信息
  1592. # if unsafe_events:
  1593. # logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件")
  1594. # for i, event in enumerate(unsafe_events):
  1595. # logger.info(
  1596. # f"TTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s")
  1597. # logger.info(f"TTC chart saved to: {chart_filename}")
  1598. # return chart_filename
  1599. except Exception as e:
  1600. logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True)
  1601. return None
  1602. def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1603. """
  1604. Generate MTTC metric chart with orange background for unsafe events
  1605. Args:
  1606. safety_calculator: SafetyCalculator instance
  1607. output_dir: Output directory
  1608. Returns:
  1609. str: Chart file path, or None if generation fails
  1610. """
  1611. logger = LogManager().get_logger()
  1612. try:
  1613. # 获取数据
  1614. mttc_data = safety_calculator.mttc_data
  1615. if not mttc_data:
  1616. logger.warning("Cannot generate MTTC chart: empty data")
  1617. return None
  1618. # 创建DataFrame
  1619. df = pd.DataFrame(mttc_data)
  1620. # 获取阈值
  1621. thresholds = get_metric_thresholds(safety_calculator, 'MTTC')
  1622. min_threshold = thresholds.get('min')
  1623. max_threshold = thresholds.get('max')
  1624. # 检测超阈值事件
  1625. unsafe_events = []
  1626. if min_threshold is not None:
  1627. # 对于MTTC,小于最小阈值视为不安全
  1628. unsafe_condition = df['MTTC'] < min_threshold
  1629. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1630. for _, group in df[unsafe_condition].groupby(event_groups):
  1631. if len(group) >= 2: # 至少2帧才算一次事件
  1632. start_time = group['simTime'].iloc[0]
  1633. end_time = group['simTime'].iloc[-1]
  1634. duration = end_time - start_time
  1635. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1636. unsafe_events.append({
  1637. 'start_time': start_time,
  1638. 'end_time': end_time,
  1639. 'start_frame': group['simFrame'].iloc[0],
  1640. 'end_frame': group['simFrame'].iloc[-1],
  1641. 'duration': duration,
  1642. 'min_mttc': group['MTTC'].min()
  1643. })
  1644. # # 创建图表
  1645. # plt.figure(figsize=(12, 6))
  1646. # plt.plot(df['simTime'], df['MTTC'], 'g-', label='MTTC')
  1647. # # 添加阈值线
  1648. # if min_threshold is not None:
  1649. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  1650. # if max_threshold is not None:
  1651. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1652. # # 添加橙色背景标识不安全事件
  1653. # for idx, event in enumerate(unsafe_events):
  1654. # label = 'Unsafe MTTC Event' if idx == 0 else None
  1655. # plt.axvspan(event['start_time'], event['end_time'],
  1656. # alpha=0.3, color='orange', label=label)
  1657. # plt.xlabel('Time (s)')
  1658. # plt.ylabel('MTTC (s)')
  1659. # plt.title('Modified Time To Collision (MTTC) Trend')
  1660. # plt.grid(True)
  1661. # plt.legend()
  1662. # # 保存图表
  1663. # chart_filename = os.path.join(output_dir, f"mttc_chart.png")
  1664. # plt.savefig(chart_filename, dpi=300)
  1665. # plt.close()
  1666. # logger.info(f"MTTC chart saved to: {chart_filename}")
  1667. # 保存CSV数据,包含阈值信息
  1668. csv_filename = os.path.join(output_dir, f"mttc_data.csv")
  1669. df_csv = df.copy()
  1670. df_csv['min_threshold'] = min_threshold
  1671. df_csv['max_threshold'] = max_threshold
  1672. df_csv.to_csv(csv_filename, index=False)
  1673. # 记录不安全事件信息
  1674. if unsafe_events:
  1675. logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件")
  1676. for i, event in enumerate(unsafe_events):
  1677. logger.info(
  1678. f"MTTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s")
  1679. logger.info(f"MTTC data saved to: {csv_filename}")
  1680. return csv_filename
  1681. except Exception as e:
  1682. logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True)
  1683. return None
  1684. def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1685. """
  1686. Generate THW metric chart with orange background for unsafe events
  1687. Args:
  1688. safety_calculator: SafetyCalculator instance
  1689. output_dir: Output directory
  1690. Returns:
  1691. str: Chart file path, or None if generation fails
  1692. """
  1693. logger = LogManager().get_logger()
  1694. try:
  1695. # 获取数据
  1696. thw_data = safety_calculator.thw_data
  1697. if not thw_data:
  1698. logger.warning("Cannot generate THW chart: empty data")
  1699. return None
  1700. # 创建DataFrame
  1701. df = pd.DataFrame(thw_data)
  1702. # 获取阈值
  1703. thresholds = get_metric_thresholds(safety_calculator, 'THW')
  1704. min_threshold = thresholds.get('min')
  1705. max_threshold = thresholds.get('max')
  1706. # 检测超阈值事件
  1707. unsafe_events = []
  1708. if min_threshold is not None:
  1709. # 对于THW,小于最小阈值视为不安全
  1710. unsafe_condition = df['THW'] < min_threshold
  1711. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1712. for _, group in df[unsafe_condition].groupby(event_groups):
  1713. if len(group) >= 2: # 至少2帧才算一次事件
  1714. start_time = group['simTime'].iloc[0]
  1715. end_time = group['simTime'].iloc[-1]
  1716. duration = end_time - start_time
  1717. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1718. unsafe_events.append({
  1719. 'start_time': start_time,
  1720. 'end_time': end_time,
  1721. 'start_frame': group['simFrame'].iloc[0],
  1722. 'end_frame': group['simFrame'].iloc[-1],
  1723. 'duration': duration,
  1724. 'min_thw': group['THW'].min()
  1725. })
  1726. # # 创建图表
  1727. # plt.figure(figsize=(12, 10))
  1728. # plt.plot(df['simTime'], df['THW'], 'c-', label='THW')
  1729. # # 添加阈值线
  1730. # if min_threshold is not None:
  1731. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  1732. # if max_threshold is not None:
  1733. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1734. # # 添加橙色背景标识不安全事件
  1735. # for idx, event in enumerate(unsafe_events):
  1736. # label = 'Unsafe THW Event' if idx == 0 else None
  1737. # plt.axvspan(event['start_time'], event['end_time'],
  1738. # alpha=0.3, color='orange', label=label)
  1739. # plt.xlabel('Time (s)')
  1740. # plt.ylabel('THW (s)')
  1741. # plt.title('Time Headway (THW) Trend')
  1742. # plt.grid(True)
  1743. # plt.legend()
  1744. # # 保存图表
  1745. # chart_filename = os.path.join(output_dir, f"thw_chart.png")
  1746. # plt.savefig(chart_filename, dpi=300)
  1747. # plt.close()
  1748. # logger.info(f"THW chart saved to: {chart_filename}")
  1749. # 保存CSV数据,包含阈值信息
  1750. csv_filename = os.path.join(output_dir, f"thw_data.csv")
  1751. df_csv = df.copy()
  1752. df_csv['min_threshold'] = min_threshold
  1753. df_csv['max_threshold'] = max_threshold
  1754. df_csv.to_csv(csv_filename, index=False)
  1755. # 记录不安全事件信息
  1756. if unsafe_events:
  1757. logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件")
  1758. for i, event in enumerate(unsafe_events):
  1759. logger.info(
  1760. f"THW不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s")
  1761. logger.info(f"THW data saved to: {csv_filename}")
  1762. return csv_filename
  1763. except Exception as e:
  1764. logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True)
  1765. return None
  1766. def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1767. """
  1768. Generate Longitudinal Safe Distance metric chart
  1769. Args:
  1770. safety_calculator: SafetyCalculator instance
  1771. output_dir: Output directory
  1772. Returns:
  1773. str: Chart file path, or None if generation fails
  1774. """
  1775. logger = LogManager().get_logger()
  1776. try:
  1777. # 获取数据
  1778. lonsd_data = safety_calculator.lonsd_data
  1779. if not lonsd_data:
  1780. logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data")
  1781. return None
  1782. # 创建DataFrame
  1783. df = pd.DataFrame(lonsd_data)
  1784. # 获取阈值
  1785. thresholds = get_metric_thresholds(safety_calculator, 'LonSD')
  1786. min_threshold = thresholds.get('min')
  1787. max_threshold = thresholds.get('max')
  1788. # # 创建图表
  1789. # plt.figure(figsize=(12, 6))
  1790. # plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance')
  1791. # # 添加阈值线
  1792. # if min_threshold is not None:
  1793. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  1794. # if max_threshold is not None:
  1795. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  1796. # plt.xlabel('Time (s)')
  1797. # plt.ylabel('Distance (m)')
  1798. # plt.title('Longitudinal Safe Distance (LonSD) Trend')
  1799. # plt.grid(True)
  1800. # plt.legend()
  1801. # # 保存图表
  1802. # chart_filename = os.path.join(output_dir, f"lonsd_chart.png")
  1803. # plt.savefig(chart_filename, dpi=300)
  1804. # plt.close()
  1805. # logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}")
  1806. # 保存CSV数据,包含阈值信息
  1807. csv_filename = os.path.join(output_dir, f"lonsd_data.csv")
  1808. df_csv = df.copy()
  1809. df_csv['min_threshold'] = min_threshold
  1810. df_csv['max_threshold'] = max_threshold
  1811. df_csv.to_csv(csv_filename, index=False)
  1812. logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}")
  1813. return csv_filename
  1814. except Exception as e:
  1815. logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True)
  1816. return None
  1817. def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1818. """
  1819. Generate Lateral Safe Distance metric chart with orange background for unsafe events
  1820. Args:
  1821. safety_calculator: SafetyCalculator instance
  1822. output_dir: Output directory
  1823. Returns:
  1824. str: Chart file path, or None if generation fails
  1825. """
  1826. logger = LogManager().get_logger()
  1827. try:
  1828. # 获取数据
  1829. latsd_data = safety_calculator.latsd_data
  1830. if not latsd_data:
  1831. logger.warning("Cannot generate Lateral Safe Distance chart: empty data")
  1832. return None
  1833. # 创建DataFrame
  1834. df = pd.DataFrame(latsd_data)
  1835. # 获取阈值
  1836. thresholds = get_metric_thresholds(safety_calculator, 'LatSD')
  1837. min_threshold = thresholds.get('min')
  1838. max_threshold = thresholds.get('max')
  1839. # 检测超阈值事件
  1840. unsafe_events = []
  1841. if min_threshold is not None:
  1842. # 对于LatSD,小于最小阈值视为不安全
  1843. unsafe_condition = df['LatSD'] < min_threshold
  1844. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1845. for _, group in df[unsafe_condition].groupby(event_groups):
  1846. if len(group) >= 2: # 至少2帧才算一次事件
  1847. start_time = group['simTime'].iloc[0]
  1848. end_time = group['simTime'].iloc[-1]
  1849. duration = end_time - start_time
  1850. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1851. unsafe_events.append({
  1852. 'start_time': start_time,
  1853. 'end_time': end_time,
  1854. 'start_frame': group['simFrame'].iloc[0],
  1855. 'end_frame': group['simFrame'].iloc[-1],
  1856. 'duration': duration,
  1857. 'min_latsd': group['LatSD'].min()
  1858. })
  1859. # # 创建图表
  1860. # plt.figure(figsize=(12, 6))
  1861. # plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance')
  1862. # # 添加阈值线
  1863. # if min_threshold is not None:
  1864. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  1865. # if max_threshold is not None:
  1866. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  1867. # # 添加橙色背景标识不安全事件
  1868. # for idx, event in enumerate(unsafe_events):
  1869. # label = 'Unsafe LatSD Event' if idx == 0 else None
  1870. # plt.axvspan(event['start_time'], event['end_time'],
  1871. # alpha=0.3, color='orange', label=label)
  1872. # plt.xlabel('Time (s)')
  1873. # plt.ylabel('Distance (m)')
  1874. # plt.title('Lateral Safe Distance (LatSD) Trend')
  1875. # plt.grid(True)
  1876. # plt.legend()
  1877. # # 保存图表
  1878. # chart_filename = os.path.join(output_dir, f"latsd_chart.png")
  1879. # plt.savefig(chart_filename, dpi=300)
  1880. # plt.close()
  1881. # logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}")
  1882. # 保存CSV数据,包含阈值信息
  1883. csv_filename = os.path.join(output_dir, f"latsd_data.csv")
  1884. df_csv = df.copy()
  1885. df_csv['min_threshold'] = min_threshold
  1886. df_csv['max_threshold'] = max_threshold
  1887. df_csv.to_csv(csv_filename, index=False)
  1888. # 记录不安全事件信息
  1889. if unsafe_events:
  1890. logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件")
  1891. for i, event in enumerate(unsafe_events):
  1892. logger.info(
  1893. f"LatSD不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m")
  1894. logger.info(f"Lateral Safe Distance data saved to: {csv_filename}")
  1895. return csv_filename
  1896. except Exception as e:
  1897. logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True)
  1898. return None
  1899. def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1900. """
  1901. Generate Brake Threat Number metric chart with orange background for unsafe events
  1902. Args:
  1903. safety_calculator: SafetyCalculator instance
  1904. output_dir: Output directory
  1905. Returns:
  1906. str: Chart file path, or None if generation fails
  1907. """
  1908. logger = LogManager().get_logger()
  1909. try:
  1910. # 获取数据
  1911. btn_data = safety_calculator.btn_data
  1912. if not btn_data:
  1913. logger.warning("Cannot generate Brake Threat Number chart: empty data")
  1914. return None
  1915. # 创建DataFrame
  1916. df = pd.DataFrame(btn_data)
  1917. # 获取阈值
  1918. thresholds = get_metric_thresholds(safety_calculator, 'BTN')
  1919. min_threshold = thresholds.get('min')
  1920. max_threshold = thresholds.get('max')
  1921. # 检测超阈值事件
  1922. unsafe_events = []
  1923. if max_threshold is not None:
  1924. # 对于BTN,大于最大阈值视为不安全
  1925. unsafe_condition = df['BTN'] > max_threshold
  1926. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1927. for _, group in df[unsafe_condition].groupby(event_groups):
  1928. if len(group) >= 2: # 至少2帧才算一次事件
  1929. start_time = group['simTime'].iloc[0]
  1930. end_time = group['simTime'].iloc[-1]
  1931. duration = end_time - start_time
  1932. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1933. unsafe_events.append({
  1934. 'start_time': start_time,
  1935. 'end_time': end_time,
  1936. 'start_frame': group['simFrame'].iloc[0],
  1937. 'end_frame': group['simFrame'].iloc[-1],
  1938. 'duration': duration,
  1939. 'max_btn': group['BTN'].max()
  1940. })
  1941. # # 创建图表
  1942. # plt.figure(figsize=(12, 6))
  1943. # plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number')
  1944. # # 添加阈值线
  1945. # if min_threshold is not None:
  1946. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})')
  1947. # if max_threshold is not None:
  1948. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1949. # # 添加橙色背景标识不安全事件
  1950. # for idx, event in enumerate(unsafe_events):
  1951. # label = 'Unsafe BTN Event' if idx == 0 else None
  1952. # plt.axvspan(event['start_time'], event['end_time'],
  1953. # alpha=0.3, color='orange', label=label)
  1954. # plt.xlabel('Time (s)')
  1955. # plt.ylabel('BTN')
  1956. # plt.title('Brake Threat Number (BTN) Trend')
  1957. # plt.grid(True)
  1958. # plt.legend()
  1959. # # 保存图表
  1960. # chart_filename = os.path.join(output_dir, f"btn_chart.png")
  1961. # plt.savefig(chart_filename, dpi=300)
  1962. # plt.close()
  1963. # logger.info(f"Brake Threat Number chart saved to: {chart_filename}")
  1964. # 保存CSV数据,包含阈值信息
  1965. csv_filename = os.path.join(output_dir, f"btn_data.csv")
  1966. df_csv = df.copy()
  1967. df_csv['min_threshold'] = min_threshold
  1968. df_csv['max_threshold'] = max_threshold
  1969. df_csv.to_csv(csv_filename, index=False)
  1970. # 记录不安全事件信息
  1971. if unsafe_events:
  1972. logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件")
  1973. for i, event in enumerate(unsafe_events):
  1974. logger.info(
  1975. f"BTN不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}")
  1976. logger.info(f"Brake Threat Number data saved to: {csv_filename}")
  1977. return csv_filename
  1978. except Exception as e:
  1979. logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True)
  1980. return None
  1981. def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1982. """
  1983. Generate Collision Risk metric chart
  1984. Args:
  1985. safety_calculator: SafetyCalculator instance
  1986. output_dir: Output directory
  1987. Returns:
  1988. str: Chart file path, or None if generation fails
  1989. """
  1990. logger = LogManager().get_logger()
  1991. try:
  1992. # 获取数据
  1993. risk_data = safety_calculator.collision_risk_data
  1994. if not risk_data:
  1995. logger.warning("Cannot generate Collision Risk chart: empty data")
  1996. return None
  1997. # 创建DataFrame
  1998. df = pd.DataFrame(risk_data)
  1999. # 获取阈值
  2000. thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk')
  2001. min_threshold = thresholds.get('min')
  2002. max_threshold = thresholds.get('max')
  2003. # # 创建图表
  2004. # plt.figure(figsize=(12, 6))
  2005. # plt.plot(df['simTime'], df['collisionRisk'], 'r-', label='Collision Risk')
  2006. # # 添加阈值线
  2007. # if min_threshold is not None:
  2008. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
  2009. # if max_threshold is not None:
  2010. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
  2011. # plt.xlabel('Time (s)')
  2012. # plt.ylabel('Risk Value (%)')
  2013. # plt.title('Collision Risk (collisionRisk) Trend')
  2014. # plt.grid(True)
  2015. # plt.legend()
  2016. # # 保存图表
  2017. # chart_filename = os.path.join(output_dir, f"collision_risk_chart.png")
  2018. # plt.savefig(chart_filename, dpi=300)
  2019. # plt.close()
  2020. # logger.info(f"Collision Risk chart saved to: {chart_filename}")
  2021. # 保存CSV数据,包含阈值信息
  2022. csv_filename = os.path.join(output_dir, f"collisionrisk_data.csv")
  2023. df_csv = df.copy()
  2024. df_csv['min_threshold'] = min_threshold
  2025. df_csv['max_threshold'] = max_threshold
  2026. df_csv.to_csv(csv_filename, index=False)
  2027. logger.info(f"Collision Risk data saved to: {csv_filename}")
  2028. return csv_filename
  2029. except Exception as e:
  2030. logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True)
  2031. return None
  2032. def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]:
  2033. """
  2034. Generate Collision Severity metric chart
  2035. Args:
  2036. safety_calculator: SafetyCalculator instance
  2037. output_dir: Output directory
  2038. Returns:
  2039. str: Chart file path, or None if generation fails
  2040. """
  2041. logger = LogManager().get_logger()
  2042. try:
  2043. # 获取数据
  2044. severity_data = safety_calculator.collision_severity_data
  2045. if not severity_data:
  2046. logger.warning("Cannot generate Collision Severity chart: empty data")
  2047. return None
  2048. # 创建DataFrame
  2049. df = pd.DataFrame(severity_data)
  2050. # 获取阈值
  2051. thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity')
  2052. min_threshold = thresholds.get('min')
  2053. max_threshold = thresholds.get('max')
  2054. # # 创建图表
  2055. # plt.figure(figsize=(12, 6))
  2056. # plt.plot(df['simTime'], df['collisionSeverity'], 'r-', label='Collision Severity')
  2057. # # 添加阈值线
  2058. # if min_threshold is not None:
  2059. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
  2060. # if max_threshold is not None:
  2061. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
  2062. # plt.xlabel('Time (s)')
  2063. # plt.ylabel('Severity (%)')
  2064. # plt.title('Collision Severity (collisionSeverity) Trend')
  2065. # plt.grid(True)
  2066. # plt.legend()
  2067. # # 保存图表
  2068. # chart_filename = os.path.join(output_dir, f"collision_severity_chart.png")
  2069. # plt.savefig(chart_filename, dpi=300)
  2070. # plt.close()
  2071. # logger.info(f"Collision Severity chart saved to: {chart_filename}")
  2072. # 保存CSV数据,包含阈值信息
  2073. csv_filename = os.path.join(output_dir, f"collisionseverity_data.csv")
  2074. df_csv = df.copy()
  2075. df_csv['min_threshold'] = min_threshold
  2076. df_csv['max_threshold'] = max_threshold
  2077. df_csv.to_csv(csv_filename, index=False)
  2078. logger.info(f"Collision Severity data saved to: {csv_filename}")
  2079. return csv_filename
  2080. except Exception as e:
  2081. logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True)
  2082. return None
  2083. def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  2084. str]:
  2085. """Generate chart data for traffic metrics"""
  2086. # 待实现
  2087. return None
  2088. def calculate_distance(ego_df, correctwarning):
  2089. """计算预警距离"""
  2090. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  2091. return dist
  2092. def calculate_relative_speed(ego_df, correctwarning):
  2093. """计算相对速度"""
  2094. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  2095. # 使用function.py中已实现的scenario_sign_dict
  2096. from modules.metric.function import scenario_sign_dict
  2097. if __name__ == "__main__":
  2098. # 测试代码
  2099. print("Metrics visualization utilities loaded.")