chart_generator.py 103 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
@Date: 2023/06/25
  11. @Last Modified: 2025/05/20
  12. @Summary: Chart generation utilities for metrics visualization
  13. """
  14. """
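Main functional modules:
Function metric charting (generate_function_chart_data)
Comfort metric charting (generate_comfort_chart_data)
Safety metric charting (generate_safety_chart_data)
Traffic metric charting (generate_traffic_chart_data, not yet implemented)
Newly added rapid-acceleration metric charting:
Support for the slamaccelerate metric was added to generate_comfort_chart_data
A complete generate_slam_accelerate_chart function was implemented to plot rapid-acceleration events
That function covers:
Data retrieval and preprocessing
Saving and reading CSV data
A two-subplot figure of longitudinal acceleration and speed
Orange background shading for rapid-acceleration events
Threshold lines
High-quality chart output (300 dpi PNG)
Other key features:
A unified logging system
The threshold lookup function get_metric_thresholds
Error handling and exception catching
Timestamp management
Data validation
Detailed log output
Helper functions:
calculate_distance and calculate_relative_speed (simplified implementations)
scenario_sign_dict, the scenario sign dictionary (simplified implementation)
This code implements a complete metric visualization tool. In particular it provides detailed charting for the rapid-acceleration metric slamAccelerate, clearly showing when rapid-acceleration events occur and how the related data change.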
  41. """
  42. # import matplotlib
  43. # matplotlib.use('Agg') # 使用非图形界面的后端
  44. # import matplotlib.pyplot as plt
  45. import os
  46. import numpy as np
  47. import pandas as pd
  48. import matplotlib.pyplot as plt
  49. from typing import Optional, Dict, List, Any, Union
  50. from pathlib import Path
  51. from modules.lib.log_manager import LogManager
  52. def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  53. str]:
  54. """
  55. Generate chart data for function metrics
  56. Args:
  57. function_calculator: FunctionCalculator instance
  58. metric_name: Metric name
  59. output_dir: Output directory
  60. Returns:
  61. str: Chart file path, or None if generation fails
  62. """
  63. logger = LogManager().get_logger()
  64. try:
  65. # 确保输出目录存在
  66. if not output_dir:
  67. output_dir = os.path.join(os.getcwd(), 'data')
  68. os.makedirs(output_dir, exist_ok=True)
  69. # 根据指标名称选择不同的图表生成方法
  70. if metric_name.lower() == 'latestwarningdistance_ttc_lst':
  71. return generate_latest_warning_ttc_chart(function_calculator, output_dir)
  72. elif metric_name.lower() == 'earliestwarningdistance_ttc_lst':
  73. return generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir)
  74. elif metric_name.lower() == 'earliestwarningdistance_lst':
  75. return generate_earliest_warning_distance_chart(function_calculator, output_dir)
  76. elif metric_name.lower() == 'latestwarningdistance_lst':
  77. return generate_latest_warning_distance_chart(function_calculator, output_dir)
  78. elif metric_name.lower() == 'latestwarningdistance_ttc_pgvil':
  79. return generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir)
  80. elif metric_name.lower() == 'earliestwarningdistance_ttc_pgvil':
  81. return generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir)
  82. elif metric_name.lower() == 'earliestwarningdistance_pgvil':
  83. return generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir)
  84. elif metric_name.lower() == 'latestwarningdistance_pgvil':
  85. return generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir)
  86. elif metric_name.lower() == 'limitspeed_lst':
  87. return generate_limit_speed_chart(function_calculator, output_dir)
  88. elif metric_name.lower() == 'limitspeedpastlimitsign_lst':
  89. return generate_limit_speed_past_sign_chart(function_calculator, output_dir)
  90. elif metric_name.lower() == 'maxlongitudedist_lst':
  91. return generate_max_longitude_dist_chart(function_calculator, output_dir)
  92. else:
  93. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  94. return None
  95. except Exception as e:
  96. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  97. return None
  98. def generate_earliest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
  99. """
  100. Generate warning distance chart with data visualization.
  101. This function creates charts for earliestWarningDistance_LST and latestWarningDistance_LST metrics.
  102. Args:
  103. function_calculator: FunctionCalculator instance
  104. output_dir: Output directory
  105. Returns:
  106. str: Chart file path, or None if generation fails
  107. """
  108. logger = LogManager().get_logger()
  109. try:
  110. # Get data
  111. ego_df = function_calculator.ego_data.copy()
  112. # Check if correctwarning is already calculated
  113. correctwarning = getattr(function_calculator, 'correctwarning', None)
  114. # Get configured thresholds
  115. thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_LST')
  116. max_threshold = thresholds["max"]
  117. min_threshold = thresholds["min"]
  118. # Get calculated warning distance and speed
  119. warning_dist = getattr(function_calculator, 'warning_dist', None)
  120. if warning_dist.empty:
  121. logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
  122. return None
  123. # Calculate metric value
  124. metric_value = float(warning_dist.iloc[0]) if len(warning_dist) >= 0.0 else max_threshold
  125. # Save CSV data
  126. csv_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_data.csv")
  127. df_csv = pd.DataFrame({
  128. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  129. 'warning_distance': warning_dist,
  130. 'min_threshold': min_threshold,
  131. 'max_threshold': max_threshold,
  132. })
  133. df_csv.to_csv(csv_filename, index=False)
  134. logger.info(f"earliestWarningDistance_LST data saved to: {csv_filename}")
  135. return csv_filename
  136. # # Read data from CSV
  137. # df = pd.read_csv(csv_filename)
  138. # # Create single chart for warning distance
  139. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  140. # # Plot warning distance
  141. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  142. # # Add threshold lines
  143. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  144. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  145. # # Mark metric value
  146. # if len(df) > 0:
  147. # label_text = 'Earliest Warning Distance'
  148. # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
  149. # color='red', s=100, zorder=5,
  150. # label=f'{label_text}: {metric_value:.2f}m')
  151. # # Set y-axis range
  152. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  153. # plt.xlabel('Time (s)')
  154. # plt.ylabel('Distance (m)')
  155. # plt.title(f'earliestWarningDistance_LST - Warning Distance Over Time')
  156. # plt.grid(True)
  157. # plt.legend()
  158. # # Save image
  159. # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_chart.png")
  160. # plt.savefig(chart_filename, dpi=300)
  161. # plt.close()
  162. # logger.info(f"earliestWarningDistance_LST chart saved to: {chart_filename}")
  163. # return chart_filename
  164. except Exception as e:
  165. logger.error(f"Failed to generate earliestWarningDistance_LST chart: {str(e)}", exc_info=True)
  166. return None
  167. def generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  168. """
  169. Generate warning distance chart with data visualization.
  170. This function creates charts for earliestWarningDistance_PGVIL and latestWarningDistance_PGVIL metrics.
  171. Args:
  172. function_calculator: FunctionCalculator instance
  173. output_dir: Output directory
  174. Returns:
  175. str: Chart file path, or None if generation fails
  176. """
  177. logger = LogManager().get_logger()
  178. try:
  179. # Get data
  180. ego_df = function_calculator.ego_data.copy()
  181. # Check if correctwarning is already calculated
  182. correctwarning = getattr(function_calculator, 'correctwarning', None)
  183. # Get configured thresholds
  184. thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_PGVIL')
  185. max_threshold = thresholds["max"]
  186. min_threshold = thresholds["min"]
  187. # Get calculated warning distance and speed
  188. warning_dist = getattr(function_calculator, 'warning_dist', None)
  189. warning_time = getattr(function_calculator, 'warning_time', None)
  190. if len(warning_dist) == 0:
  191. logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
  192. return None
  193. # Calculate metric value
  194. metric_value = float(warning_dist[0]) if len(warning_dist) >= 0.0 else max_threshold
  195. # Save CSV data
  196. csv_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_data.csv")
  197. df_csv = pd.DataFrame({
  198. 'simTime': warning_time,
  199. 'warning_distance': warning_dist,
  200. 'min_threshold': min_threshold,
  201. 'max_threshold': max_threshold
  202. })
  203. df_csv.to_csv(csv_filename, index=False)
  204. logger.info(f"earliestWarningDistance_PGVIL data saved to: {csv_filename}")
  205. return csv_filename
  206. # # Read data from CSV
  207. # df = pd.read_csv(csv_filename)
  208. # # Create single chart for warning distance
  209. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  210. # # Plot warning distance
  211. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  212. # # Add threshold lines
  213. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  214. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  215. # # Mark metric value
  216. # if len(df) > 0:
  217. # label_text = 'Earliest Warning Distance'
  218. # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
  219. # color='red', s=100, zorder=5,
  220. # label=f'{label_text}: {metric_value:.2f}m')
  221. # # Set y-axis range
  222. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  223. # plt.xlabel('Time (s)')
  224. # plt.ylabel('Distance (m)')
  225. # plt.title(f'earliestWarningDistance_PGVIL - Warning Distance Over Time')
  226. # plt.grid(True)
  227. # plt.legend()
  228. # # Save image
  229. # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_chart.png")
  230. # plt.savefig(chart_filename, dpi=300)
  231. # plt.close()
  232. # logger.info(f"earliestWarningDistance_PGVIL chart saved to: {chart_filename}")
  233. # return chart_filename
  234. except Exception as e:
  235. logger.error(f"Failed to generate earliestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
  236. return None
# # Use the find_nested_name function already implemented in function.py
# from modules.metric.function import find_nested_name
  239. def generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  240. """
  241. Generate TTC warning chart with data visualization.
  242. This version first saves data to CSV, then uses the CSV to generate the chart.
  243. Args:
  244. function_calculator: FunctionCalculator instance
  245. output_dir: Output directory
  246. Returns:
  247. str: Chart file path, or None if generation fails
  248. """
  249. logger = LogManager().get_logger()
  250. try:
  251. # 获取数据
  252. ego_df = function_calculator.ego_data.copy()
  253. correctwarning = getattr(function_calculator, 'correctwarning', None)
  254. # 获取配置的阈值
  255. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_PGVIL')
  256. max_threshold = thresholds["max"]
  257. min_threshold = thresholds["min"]
  258. warning_dist = getattr(function_calculator, 'warning_dist', None)
  259. warning_speed = getattr(function_calculator, 'warning_speed', None)
  260. warning_time = getattr(function_calculator, 'warning_time', None)
  261. ttc = getattr(function_calculator, 'ttc', None)
  262. if len(warning_dist) == 0:
  263. logger.warning("Cannot generate TTC warning chart: empty data")
  264. return None
  265. # 生成时间戳
  266. # 保存 CSV 数据
  267. csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_data.csv")
  268. df_csv = pd.DataFrame({
  269. 'simTime': warning_time,
  270. 'warning_distance': warning_dist,
  271. 'warning_speed': warning_speed,
  272. 'ttc': ttc,
  273. 'min_threshold': min_threshold,
  274. 'max_threshold': max_threshold,
  275. })
  276. df_csv.to_csv(csv_filename, index=False)
  277. logger.info(f"latestwarningdistance_ttc_pgvil data saved to: {csv_filename}")
  278. return csv_filename
  279. # # 从 CSV 读取数据
  280. # df = pd.read_csv(csv_filename)
  281. # # 创建图表
  282. # plt.figure(figsize=(12, 8), constrained_layout=True)
  283. # # 图 1:预警距离
  284. # ax1 = plt.subplot(3, 1, 1)
  285. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  286. # ax1.set_xlabel('Time (s)')
  287. # ax1.set_ylabel('Distance (m)')
  288. # ax1.set_title('Warning Distance Over Time')
  289. # ax1.grid(True)
  290. # ax1.legend()
  291. # # 图 2:相对速度
  292. # ax2 = plt.subplot(3, 1, 2)
  293. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  294. # ax2.set_xlabel('Time (s)')
  295. # ax2.set_ylabel('Speed (m/s)')
  296. # ax2.set_title('Relative Speed Over Time')
  297. # ax2.grid(True)
  298. # ax2.legend()
  299. # # 图 3:TTC
  300. # ax3 = plt.subplot(3, 1, 3)
  301. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  302. # # Add threshold lines
  303. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  304. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  305. # # Calculate metric value (latest TTC)
  306. # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
  307. # # Mark latest TTC value
  308. # if len(df) > 0:
  309. # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
  310. # color='red', s=100, zorder=5,
  311. # label=f'Latest TTC: {metric_value:.2f}s')
  312. # ax3.set_xlabel('Time (s)')
  313. # ax3.set_ylabel('TTC (s)')
  314. # ax3.set_title('Time To Collision (TTC) Over Time')
  315. # ax3.grid(True)
  316. # ax3.legend()
  317. # # 保存图像
  318. # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_chart.png")
  319. # plt.savefig(chart_filename, dpi=300)
  320. # plt.close()
  321. # logger.info(f"latestwarningdistance_ttc_pgvil chart saved to: {chart_filename}")
  322. # return chart_filename
  323. except Exception as e:
  324. logger.error(f"Failed to generate latestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
  325. return None
  326. def generate_latest_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
  327. """
  328. Generate TTC warning chart with data visualization.
  329. This version first saves data to CSV, then uses the CSV to generate the chart.
  330. Args:
  331. function_calculator: FunctionCalculator instance
  332. output_dir: Output directory
  333. Returns:
  334. str: Chart file path, or None if generation fails
  335. """
  336. logger = LogManager().get_logger()
  337. try:
  338. # 获取数据
  339. ego_df = function_calculator.ego_data.copy()
  340. correctwarning = getattr(function_calculator, 'correctwarning', None)
  341. # 获取配置的阈值
  342. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_LST')
  343. max_threshold = thresholds["max"]
  344. min_threshold = thresholds["min"]
  345. warning_dist = getattr(function_calculator, 'warning_dist', None)
  346. warning_speed = getattr(function_calculator, 'warning_speed', None)
  347. ttc = getattr(function_calculator, 'ttc', None)
  348. if warning_dist.empty:
  349. logger.warning("Cannot generate TTC warning chart: empty data")
  350. return None
  351. # 生成时间戳
  352. # 保存 CSV 数据
  353. csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_data.csv")
  354. df_csv = pd.DataFrame({
  355. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  356. 'warning_distance': warning_dist,
  357. 'warning_speed': warning_speed,
  358. 'ttc': ttc,
  359. 'min_threshold': min_threshold,
  360. 'max_threshold': max_threshold,
  361. })
  362. df_csv.to_csv(csv_filename, index=False)
  363. logger.info(f"latestwarningdistance_ttc_lst data saved to: {csv_filename}")
  364. return csv_filename
  365. # # 从 CSV 读取数据
  366. # df = pd.read_csv(csv_filename)
  367. # # 创建图表
  368. # plt.figure(figsize=(12, 8), constrained_layout=True)
  369. # # 图 1:预警距离
  370. # ax1 = plt.subplot(3, 1, 1)
  371. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  372. # ax1.set_xlabel('Time (s)')
  373. # ax1.set_ylabel('Distance (m)')
  374. # ax1.set_title('Warning Distance Over Time')
  375. # ax1.grid(True)
  376. # ax1.legend()
  377. # # 图 2:相对速度
  378. # ax2 = plt.subplot(3, 1, 2)
  379. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  380. # ax2.set_xlabel('Time (s)')
  381. # ax2.set_ylabel('Speed (m/s)')
  382. # ax2.set_title('Relative Speed Over Time')
  383. # ax2.grid(True)
  384. # ax2.legend()
  385. # # 图 3:TTC
  386. # ax3 = plt.subplot(3, 1, 3)
  387. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  388. # # Add threshold lines
  389. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  390. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  391. # # Calculate metric value (latest TTC)
  392. # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
  393. # # Mark latest TTC value
  394. # if len(df) > 0:
  395. # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
  396. # color='red', s=100, zorder=5,
  397. # label=f'Latest TTC: {metric_value:.2f}s')
  398. # ax3.set_xlabel('Time (s)')
  399. # ax3.set_ylabel('TTC (s)')
  400. # ax3.set_title('Time To Collision (TTC) Over Time')
  401. # ax3.grid(True)
  402. # ax3.legend()
  403. # # 保存图像
  404. # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_chart.png")
  405. # plt.savefig(chart_filename, dpi=300)
  406. # plt.close()
  407. # logger.info(f"latestwarningdistance_ttc_lst chart saved to: {chart_filename}")
  408. # return chart_filename
  409. except Exception as e:
  410. logger.error(f"Failed to generate latestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
  411. return None
  412. def generate_latest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
  413. """
  414. Generate warning distance chart with data visualization.
  415. This function creates charts for latestWarningDistance_LST metric.
  416. Args:
  417. function_calculator: FunctionCalculator instance
  418. metric_name: Metric name (latestWarningDistance_LST)
  419. output_dir: Output directory
  420. Returns:
  421. str: Chart file path, or None if generation fails
  422. """
  423. logger = LogManager().get_logger()
  424. try:
  425. # Get data
  426. ego_df = function_calculator.ego_data.copy()
  427. # Check if correctwarning is already calculated
  428. correctwarning = getattr(function_calculator, 'correctwarning', None)
  429. # Get configured thresholds
  430. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_LST')
  431. max_threshold = thresholds["max"]
  432. min_threshold = thresholds["min"]
  433. # Get calculated warning distance and speed
  434. warning_dist = getattr(function_calculator, 'warning_dist', None)
  435. if warning_dist.empty:
  436. logger.warning(f"Cannot generate latestWarningDistance_LST chart: empty data")
  437. return None
  438. # Calculate metric value
  439. metric_value = float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else max_threshold
  440. # Save CSV data
  441. csv_filename = os.path.join(output_dir, f"latestWarningDistance_LST_data.csv")
  442. df_csv = pd.DataFrame({
  443. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  444. 'warning_distance': warning_dist,
  445. 'min_threshold': min_threshold,
  446. 'max_threshold': max_threshold
  447. })
  448. df_csv.to_csv(csv_filename, index=False)
  449. logger.info(f"latestWarningDistance_LST data saved to: {csv_filename}")
  450. return csv_filename
  451. # # Read data from CSV
  452. # df = pd.read_csv(csv_filename)
  453. # # Create single chart for warning distance
  454. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  455. # # Plot warning distance
  456. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  457. # # Add threshold lines
  458. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  459. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  460. # # Mark metric value
  461. # if len(df) > 0:
  462. # label_text = 'Latest Warning Distance'
  463. # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
  464. # color='red', s=100, zorder=5,
  465. # label=f'{label_text}: {metric_value:.2f}m')
  466. # # Set y-axis range
  467. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  468. # plt.xlabel('Time (s)')
  469. # plt.ylabel('Distance (m)')
  470. # plt.title(f'latestWarningDistance_LST - Warning Distance Over Time')
  471. # plt.grid(True)
  472. # plt.legend()
  473. # # Save image
  474. # chart_filename = os.path.join(output_dir, f"latestWarningDistance_LST_chart.png")
  475. # plt.savefig(chart_filename, dpi=300)
  476. # plt.close()
  477. # logger.info(f"latestWarningDistance_LST chart saved to: {chart_filename}")
  478. # return chart_filename
  479. except Exception as e:
  480. logger.error(f"Failed to generate latestWarningDistance_LST chart: {str(e)}", exc_info=True)
  481. return None
  482. def generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  483. """
  484. Generate warning distance chart with data visualization.
  485. This function creates charts for latestWarningDistance_LST metric.
  486. Args:
  487. function_calculator: FunctionCalculator instance
  488. metric_name: Metric name (latestWarningDistance_LST)
  489. output_dir: Output directory
  490. Returns:
  491. str: Chart file path, or None if generation fails
  492. """
  493. logger = LogManager().get_logger()
  494. try:
  495. # Get data
  496. ego_df = function_calculator.ego_data.copy()
  497. # Check if correctwarning is already calculated
  498. correctwarning = getattr(function_calculator, 'correctwarning', None)
  499. # Get configured thresholds
  500. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_PGVIL')
  501. max_threshold = thresholds["max"]
  502. min_threshold = thresholds["min"]
  503. # Get calculated warning distance and speed
  504. warning_dist = getattr(function_calculator, 'warning_dist', None)
  505. warning_time = getattr(function_calculator, 'warning_time', None)
  506. if len(warning_dist) == 0:
  507. logger.warning(f"Cannot generate latestWarningDistance_PGVIL chart: empty data")
  508. return None
  509. # Calculate metric value
  510. metric_value = float(warning_dist[-1]) if len(warning_dist) > 0 else max_threshold
  511. # Save CSV data
  512. csv_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_data.csv")
  513. df_csv = pd.DataFrame({
  514. 'simTime': warning_time,
  515. 'warning_distance': warning_dist,
  516. 'min_threshold': min_threshold,
  517. 'max_threshold': max_threshold
  518. })
  519. df_csv.to_csv(csv_filename, index=False)
  520. logger.info(f"latestWarningDistance_PGVIL data saved to: {csv_filename}")
  521. return csv_filename
  522. # # Read data from CSV
  523. # df = pd.read_csv(csv_filename)
  524. # # Create single chart for warning distance
  525. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  526. # # Plot warning distance
  527. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  528. # # Add threshold lines
  529. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  530. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  531. # # Mark metric value
  532. # if len(df) > 0:
  533. # label_text = 'Latest Warning Distance'
  534. # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
  535. # color='red', s=100, zorder=5,
  536. # label=f'{label_text}: {metric_value:.2f}m')
  537. # # Set y-axis range
  538. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  539. # plt.xlabel('Time (s)')
  540. # plt.ylabel('Distance (m)')
  541. # plt.title(f'latestWarningDistance_PGVIL - Warning Distance Over Time')
  542. # plt.grid(True)
  543. # plt.legend()
  544. # # Save image
  545. # chart_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_chart.png")
  546. # plt.savefig(chart_filename, dpi=300)
  547. # plt.close()
  548. # logger.info(f"latestWarningDistance_PGVIL chart saved to: {chart_filename}")
  549. # return chart_filename
  550. except Exception as e:
  551. logger.error(f"Failed to generate latestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
  552. return None
  553. def generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
  554. """
  555. Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_LST metric.
  556. Args:
  557. function_calculator: FunctionCalculator instance
  558. output_dir: Output directory
  559. Returns:
  560. str: Chart file path, or None if generation fails
  561. """
  562. logger = LogManager().get_logger()
  563. metric_name = 'earliestWarningDistance_TTC_LST'
  564. try:
  565. # Get data
  566. ego_df = function_calculator.ego_data.copy()
  567. # Check if correctwarning is already calculated
  568. correctwarning = getattr(function_calculator, 'correctwarning', None)
  569. # Get configured thresholds
  570. thresholds = get_metric_thresholds(function_calculator, metric_name)
  571. max_threshold = thresholds["max"]
  572. min_threshold = thresholds["min"]
  573. # Get calculated warning distance and speed
  574. warning_dist = getattr(function_calculator, 'correctwarning', None)
  575. warning_speed = getattr(function_calculator, 'warning_speed', None)
  576. ttc = getattr(function_calculator, 'ttc', None)
  577. # Calculate metric value
  578. metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
  579. # Save CSV data
  580. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  581. df_csv = pd.DataFrame({
  582. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  583. 'warning_distance': warning_dist,
  584. 'warning_speed': warning_speed,
  585. 'ttc': ttc,
  586. 'min_threshold': min_threshold,
  587. 'max_threshold': max_threshold
  588. })
  589. df_csv.to_csv(csv_filename, index=False)
  590. logger.info(f"{metric_name} data saved to: {csv_filename}")
  591. return csv_filename
  592. # # Read data from CSV
  593. # df = pd.read_csv(csv_filename)
  594. # # Create chart
  595. # plt.figure(figsize=(12, 8), constrained_layout=True)
  596. # # 图 1:预警距离
  597. # ax1 = plt.subplot(3, 1, 1)
  598. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  599. # ax1.set_xlabel('Time (s)')
  600. # ax1.set_ylabel('Distance (m)')
  601. # ax1.set_title('Warning Distance Over Time')
  602. # ax1.grid(True)
  603. # ax1.legend()
  604. # # 图 2:相对速度
  605. # ax2 = plt.subplot(3, 1, 2)
  606. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  607. # ax2.set_xlabel('Time (s)')
  608. # ax2.set_ylabel('Speed (m/s)')
  609. # ax2.set_title('Relative Speed Over Time')
  610. # ax2.grid(True)
  611. # ax2.legend()
  612. # # 图 3:TTC
  613. # ax3 = plt.subplot(3, 1, 3)
  614. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  615. # # Add threshold lines
  616. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  617. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  618. # # Mark earliest TTC value
  619. # if len(df) > 0:
  620. # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
  621. # color='red', s=100, zorder=5,
  622. # label=f'Earliest TTC: {metric_value:.2f}s')
  623. # ax3.set_xlabel('Time (s)')
  624. # ax3.set_ylabel('TTC (s)')
  625. # ax3.set_title('Time To Collision (TTC) Over Time')
  626. # ax3.grid(True)
  627. # ax3.legend()
  628. # # Save image
  629. # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_lst_chart.png")
  630. # plt.savefig(chart_filename, dpi=300)
  631. # plt.close()
  632. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  633. # return chart_filename
  634. except Exception as e:
  635. logger.error(f"Failed to generate earliestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
  636. return None
  637. def generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  638. """
  639. Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_PGVIL metric.
  640. Args:
  641. function_calculator: FunctionCalculator instance
  642. output_dir: Output directory
  643. Returns:
  644. str: Chart file path, or None if generation fails
  645. """
  646. logger = LogManager().get_logger()
  647. metric_name = 'earliestWarningDistance_TTC_PGVIL'
  648. try:
  649. # Get data
  650. ego_df = function_calculator.ego_data.copy()
  651. # Check if correctwarning is already calculated
  652. correctwarning = getattr(function_calculator, 'correctwarning', None)
  653. # Get configured thresholds
  654. thresholds = get_metric_thresholds(function_calculator, metric_name)
  655. max_threshold = thresholds["max"]
  656. min_threshold = thresholds["min"]
  657. # Get calculated warning distance and speed
  658. warning_dist = getattr(function_calculator, 'warning_dist', None)
  659. warning_speed = getattr(function_calculator, 'warning_speed', None)
  660. ttc = getattr(function_calculator, 'ttc', None)
  661. warning_time = getattr(function_calculator, 'warning_time', None)
  662. # Calculate metric value
  663. metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
  664. # Save CSV data
  665. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  666. df_csv = pd.DataFrame({
  667. 'simTime': warning_time,
  668. 'warning_distance': warning_dist,
  669. 'warning_speed': warning_speed,
  670. 'ttc': ttc,
  671. 'min_threshold': min_threshold,
  672. 'max_threshold': max_threshold
  673. })
  674. df_csv.to_csv(csv_filename, index=False)
  675. logger.info(f"{metric_name} data saved to: {csv_filename}")
  676. return csv_filename
  677. # # Read data from CSV
  678. # df = pd.read_csv(csv_filename)
  679. # # Create chart
  680. # plt.figure(figsize=(12, 8), constrained_layout=True)
  681. # # 图 1:预警距离
  682. # ax1 = plt.subplot(3, 1, 1)
  683. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  684. # ax1.set_xlabel('Time (s)')
  685. # ax1.set_ylabel('Distance (m)')
  686. # ax1.set_title('Warning Distance Over Time')
  687. # ax1.grid(True)
  688. # ax1.legend()
  689. # # 图 2:相对速度
  690. # ax2 = plt.subplot(3, 1, 2)
  691. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  692. # ax2.set_xlabel('Time (s)')
  693. # ax2.set_ylabel('Speed (m/s)')
  694. # ax2.set_title('Relative Speed Over Time')
  695. # ax2.grid(True)
  696. # ax2.legend()
  697. # # 图 3:TTC
  698. # ax3 = plt.subplot(3, 1, 3)
  699. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  700. # # Add threshold lines
  701. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  702. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  703. # # Mark earliest TTC value
  704. # if len(df) > 0:
  705. # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
  706. # color='red', s=100, zorder=5,
  707. # label=f'Earliest TTC: {metric_value:.2f}s')
  708. # ax3.set_xlabel('Time (s)')
  709. # ax3.set_ylabel('TTC (s)')
  710. # ax3.set_title('Time To Collision (TTC) Over Time')
  711. # ax3.grid(True)
  712. # ax3.legend()
  713. # # Save image
  714. # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_pgvil_chart.png")
  715. # plt.savefig(chart_filename, dpi=300)
  716. # plt.close()
  717. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  718. # return chart_filename
  719. except Exception as e:
  720. logger.error(f"Failed to generate earliestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
  721. return None
  722. def generate_limit_speed_chart(function_calculator, output_dir: str) -> Optional[str]:
  723. """
  724. Generate limit speed chart with data visualization for limitSpeed_LST metric.
  725. Args:
  726. function_calculator: FunctionCalculator instance
  727. output_dir: Output directory
  728. Returns:
  729. str: Chart file path, or None if generation fails
  730. """
  731. logger = LogManager().get_logger()
  732. metric_name = 'limitSpeed_LST'
  733. try:
  734. # Get data
  735. ego_df = function_calculator.ego_data.copy()
  736. # Get configured thresholds
  737. thresholds = get_metric_thresholds(function_calculator, metric_name)
  738. max_threshold = thresholds["max"]
  739. min_threshold = thresholds["min"]
  740. if ego_df.empty:
  741. logger.warning(f"Cannot generate {metric_name} chart: empty data")
  742. return None
  743. # Save CSV data
  744. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  745. df_csv = pd.DataFrame({
  746. 'simTime': ego_df['simTime'],
  747. 'speed': ego_df['v'],
  748. 'max_threshold': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df)))
  749. })
  750. df_csv.to_csv(csv_filename, index=False)
  751. logger.info(f"{metric_name} data saved to: {csv_filename}")
  752. return csv_filename
  753. # # Read data from CSV
  754. # df = pd.read_csv(csv_filename)
  755. # # Create chart
  756. # plt.figure(figsize=(12, 6), constrained_layout=True)
  757. # # Plot speed
  758. # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
  759. # plt.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Threshold')
  760. # # Set y-axis range
  761. # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
  762. # plt.xlabel('Time (s)')
  763. # plt.ylabel('Speed (m/s)')
  764. # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
  765. # plt.grid(True)
  766. # plt.legend()
  767. # # Save image
  768. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  769. # plt.savefig(chart_filename, dpi=300)
  770. # plt.close()
  771. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  772. # return chart_filename
  773. except Exception as e:
  774. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  775. return None
  776. def generate_limit_speed_past_sign_chart(function_calculator, output_dir: str) -> Optional[str]:
  777. """
  778. Generate limit speed past sign chart with data visualization for limitSpeedPastLimitSign_LST metric.
  779. Args:
  780. function_calculator: FunctionCalculator instance
  781. output_dir: Output directory
  782. Returns:
  783. str: Chart file path, or None if generation fails
  784. """
  785. logger = LogManager().get_logger()
  786. metric_name = 'limitSpeedPastLimitSign_LST'
  787. try:
  788. # Get data
  789. ego_df = function_calculator.ego_data.copy()
  790. # Get configured thresholds
  791. thresholds = get_metric_thresholds(function_calculator, metric_name)
  792. max_threshold = thresholds["max"]
  793. min_threshold = thresholds["min"]
  794. if ego_df.empty:
  795. logger.warning(f"Cannot generate {metric_name} chart: empty data")
  796. return None
  797. # Get sign passing time if available
  798. sign_time = getattr(function_calculator, 'sign_pass_time', None)
  799. if sign_time is None:
  800. # Try to estimate sign passing time (middle of the simulation)
  801. sign_time = ego_df['simTime'].iloc[len(ego_df) // 2]
  802. # Save CSV data
  803. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  804. df_csv = pd.DataFrame({
  805. 'simTime': ego_df['simTime'],
  806. 'speed': ego_df['v'],
  807. 'max_threshold': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df))),
  808. 'sign_pass_time': sign_time
  809. })
  810. df_csv.to_csv(csv_filename, index=False)
  811. logger.info(f"{metric_name} data saved to: {csv_filename}")
  812. return csv_filename
  813. # # Read data from CSV
  814. # df = pd.read_csv(csv_filename)
  815. # # Create chart
  816. # plt.figure(figsize=(12, 6), constrained_layout=True)
  817. # # Plot speed
  818. # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
  819. # plt.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Speed')
  820. # # Mark sign passing time
  821. # plt.axvline(x=sign_time, color='g', linestyle='--', label='Speed Limit Sign')
  822. # # Set y-axis range
  823. # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
  824. # plt.xlabel('Time (s)')
  825. # plt.ylabel('Speed (m/s)')
  826. # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
  827. # plt.grid(True)
  828. # plt.legend()
  829. # # Save image
  830. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  831. # plt.savefig(chart_filename, dpi=300)
  832. # plt.close()
  833. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  834. # return chart_filename
  835. except Exception as e:
  836. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  837. return None
  838. def generate_max_longitude_dist_chart(function_calculator, output_dir: str) -> Optional[str]:
  839. """
  840. Generate maximum longitudinal distance chart with data visualization for maxLongitudeDist_LST metric.
  841. Args:
  842. function_calculator: FunctionCalculator instance
  843. output_dir: Output directory
  844. Returns:
  845. str: Chart file path, or None if generation fails
  846. """
  847. logger = LogManager().get_logger()
  848. metric_name = 'maxLongitudeDist_LST'
  849. try:
  850. # Get data
  851. ego_df = function_calculator.ego_data.copy()
  852. # Get configured thresholds
  853. thresholds = get_metric_thresholds(function_calculator, metric_name)
  854. max_threshold = thresholds["max"]
  855. min_threshold = thresholds["min"]
  856. # Get longitudinal distance data
  857. longitude_dist = ego_df['longitude_dist'] if 'longitude_dist' in ego_df.columns else None
  858. stop_time = ego_df['stop_time'] if 'stop_time' in ego_df.columns else None
  859. if longitude_dist is None or longitude_dist.empty:
  860. logger.warning(f"Cannot generate {metric_name} chart: missing longitudinal distance data")
  861. return None
  862. # Calculate metric value
  863. metric_value = longitude_dist.max()
  864. max_distance_time = ego_df.loc[longitude_dist.idxmax(), 'simTime']
  865. # Save CSV data
  866. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  867. df_csv = pd.DataFrame({
  868. 'simTime': ego_df['simTime'],
  869. 'x_relative_dist': ego_df['x_relative_dist'],
  870. 'stop_time': stop_time,
  871. 'longitude_dist': longitude_dist
  872. })
  873. df_csv.to_csv(csv_filename, index=False)
  874. logger.info(f"{metric_name} data saved to: {csv_filename}")
  875. return csv_filename
  876. # # Read data from CSV
  877. # df = pd.read_csv(csv_filename)
  878. # # Create chart
  879. # plt.figure(figsize=(12, 6), constrained_layout=True)
  880. # # Plot longitudinal distance
  881. # plt.plot(df['simTime'], df['x_relative_dist'], 'b-', label='Longitudinal Distance')
  882. # # Add threshold lines
  883. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  884. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  885. # # Mark maximum longitudinal distance
  886. # plt.scatter(max_distance_time, metric_value,
  887. # color='red', s=100, zorder=5,
  888. # label=f'Maximum Longitudinal Distance: {metric_value:.2f}m')
  889. # # Set y-axis range
  890. # plt.ylim(bottom=min(0, min_threshold * 0.9), top=max(max_threshold * 1.1, df['longitude_dist'].max() * 1.1))
  891. # plt.xlabel('Time (s)')
  892. # plt.ylabel('Longitudinal Distance (m)')
  893. # plt.title(f'{metric_name} - Longitudinal Distance Over Time')
  894. # plt.grid(True)
  895. # plt.legend()
  896. # # Save image
  897. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  898. # plt.savefig(chart_filename, dpi=300)
  899. # plt.close()
  900. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  901. # return chart_filename
  902. except Exception as e:
  903. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  904. return None
  905. def generate_warning_delay_time_chart(function_calculator, output_dir: str) -> Optional[str]:
  906. """
  907. Generate warning delay time chart with data visualization for warningDelayTime_LST metric.
  908. Args:
  909. function_calculator: FunctionCalculator instance
  910. output_dir: Output directory
  911. Returns:
  912. str: Chart file path, or None if generation fails
  913. """
  914. logger = LogManager().get_logger()
  915. metric_name = 'warningDelayTime_LST'
  916. try:
  917. # Get data
  918. ego_df = function_calculator.ego_data.copy()
  919. # Get configured thresholds
  920. thresholds = get_metric_thresholds(function_calculator, metric_name)
  921. max_threshold = thresholds["max"]
  922. min_threshold = thresholds["min"]
  923. # Check if correctwarning is already calculated
  924. correctwarning = getattr(function_calculator, 'correctwarning', None)
  925. if correctwarning is None:
  926. logger.warning(f"Cannot generate {metric_name} chart: missing correctwarning value")
  927. return None
  928. # Get HMI warning time and rosbag warning time
  929. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  930. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  931. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  932. 'simTime'].tolist()
  933. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  934. if (simTime_HMI is None) or (simTime_rosbag is None):
  935. logger.warning(f"Cannot generate {metric_name} chart: missing warning time data")
  936. return None
  937. # Calculate delay time
  938. delay_time = abs(simTime_HMI - simTime_rosbag)
  939. # Save CSV data
  940. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  941. df_csv = pd.DataFrame({
  942. 'HMI_warning_time': [simTime_HMI],
  943. 'rosbag_warning_time': [simTime_rosbag],
  944. 'delay_time': [delay_time],
  945. 'min_threshold': [min_threshold],
  946. 'max_threshold': [max_threshold]
  947. })
  948. df_csv.to_csv(csv_filename, index=False)
  949. logger.info(f"{metric_name} data saved to: {csv_filename}")
  950. return csv_filename
  951. # # Create chart - bar chart for delay time
  952. # plt.figure(figsize=(10, 6), constrained_layout=True)
  953. # # Plot delay time as bar
  954. # plt.bar(['Warning Delay Time'], [delay_time], color='blue', width=0.4)
  955. # # Add threshold lines
  956. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  957. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  958. # # Add value label
  959. # plt.text(0, delay_time + 0.05, f'{delay_time:.3f}s', ha='center', va='bottom', fontweight='bold')
  960. # # Set y-axis range
  961. # plt.ylim(bottom=0, top=max(max_threshold * 1.2, delay_time * 1.2))
  962. # plt.ylabel('Delay Time (s)')
  963. # plt.title(f'{metric_name} - Warning Delay Time')
  964. # plt.grid(True, axis='y')
  965. # plt.legend()
  966. # # Save image
  967. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  968. # plt.savefig(chart_filename, dpi=300)
  969. # plt.close()
  970. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  971. # return chart_filename
  972. except Exception as e:
  973. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  974. return None
  975. def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  976. str]:
  977. """
  978. Generate chart data for comfort metrics
  979. Args:
  980. comfort_calculator: ComfortCalculator instance
  981. metric_name: Metric name
  982. output_dir: Output directory
  983. Returns:
  984. str: Chart file path, or None if generation fails
  985. """
  986. logger = LogManager().get_logger()
  987. try:
  988. # 确保输出目录存在
  989. if output_dir:
  990. os.makedirs(output_dir, exist_ok=True)
  991. else:
  992. output_dir = os.getcwd()
  993. # 根据指标名称选择不同的图表生成方法
  994. if metric_name.lower() == 'shake':
  995. return generate_shake_chart(comfort_calculator, output_dir)
  996. elif metric_name.lower() == 'zigzag':
  997. return generate_zigzag_chart(comfort_calculator, output_dir)
  998. elif metric_name.lower() == 'cadence':
  999. return generate_cadence_chart(comfort_calculator, output_dir)
  1000. elif metric_name.lower() == 'slambrake':
  1001. return generate_slam_brake_chart(comfort_calculator, output_dir)
  1002. elif metric_name.lower() == 'slamaccelerate':
  1003. return generate_slam_accelerate_chart(comfort_calculator, output_dir)
  1004. else:
  1005. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  1006. return None
  1007. except Exception as e:
  1008. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  1009. return None
  1010. def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1011. """
  1012. Generate shake metric chart with orange background for shake events.
  1013. This version first saves data to CSV, then uses the CSV to generate the chart.
  1014. Args:
  1015. comfort_calculator: ComfortCalculator instance
  1016. output_dir: Output directory
  1017. Returns:
  1018. str: Chart file path, or None if generation fails
  1019. """
  1020. logger = LogManager().get_logger()
  1021. try:
  1022. # 获取数据
  1023. df = comfort_calculator.ego_df.copy()
  1024. shake_events = comfort_calculator.shake_events
  1025. if df.empty:
  1026. logger.warning("Cannot generate shake chart: empty data")
  1027. return None
  1028. # 生成时间戳
  1029. # 保存 CSV 数据(第一步)
  1030. csv_filename = os.path.join(output_dir, f"shake_data.csv")
  1031. df_csv = pd.DataFrame({
  1032. 'simTime': df['simTime'],
  1033. 'lat_acc': df['lat_acc'],
  1034. 'lat_acc_rate': df['lat_acc_rate'],
  1035. 'speedH_std': df['speedH_std'],
  1036. 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None] * len(df))),
  1037. 'lat_acc_rate_threshold': 0.5,
  1038. 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None] * len(df))),
  1039. })
  1040. df_csv.to_csv(csv_filename, index=False)
  1041. logger.info(f"Shake data saved to: {csv_filename}")
  1042. return csv_filename
  1043. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1044. # df = pd.read_csv(csv_filename)
  1045. # # 创建图表(第三步)
  1046. # import matplotlib.pyplot as plt
  1047. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1048. # # 图 1:横向加速度
  1049. # ax1 = plt.subplot(3, 1, 1)
  1050. # ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
  1051. # if 'lat_acc_threshold' in df.columns:
  1052. # ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold')
  1053. # for idx, event in enumerate(shake_events):
  1054. # label = 'Shake Event' if idx == 0 else None
  1055. # ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1056. # ax1.set_xlabel('Time (s)')
  1057. # ax1.set_ylabel('Lateral Acceleration (m/s²)')
  1058. # ax1.set_title('Shake Event Detection - Lateral Acceleration')
  1059. # ax1.grid(True)
  1060. # ax1.legend()
  1061. # # 图 2:lat_acc_rate
  1062. # ax2 = plt.subplot(3, 1, 2)
  1063. # ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate')
  1064. # ax2.axhline(
  1065. # y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold'
  1066. # )
  1067. # for idx, event in enumerate(shake_events):
  1068. # label = 'Shake Event' if idx == 0 else None
  1069. # ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1070. # ax2.set_xlabel('Time (s)')
  1071. # ax2.set_ylabel('Angular Velocity (m/s³)')
  1072. # ax2.set_title('Shake Event Detection - lat_acc_rate')
  1073. # ax2.grid(True)
  1074. # ax2.legend()
  1075. # # 图 3:speedH_std
  1076. # ax3 = plt.subplot(3, 1, 3)
  1077. # ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std')
  1078. # if 'speedH_std_threshold' in df.columns:
  1079. # ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold')
  1080. # for idx, event in enumerate(shake_events):
  1081. # label = 'Shake Event' if idx == 0 else None
  1082. # ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1083. # ax3.set_xlabel('Time (s)')
  1084. # ax3.set_ylabel('Angular Velocity (deg/s)')
  1085. # ax3.set_title('Shake Event Detection - speedH_std')
  1086. # ax3.grid(True)
  1087. # ax3.legend()
  1088. # # 保存图像
  1089. # chart_filename = os.path.join(output_dir, f"shake_chart.png")
  1090. # plt.savefig(chart_filename, dpi=300)
  1091. # plt.close()
  1092. # logger.info(f"Shake chart saved to: {chart_filename}")
  1093. # return chart_filename
  1094. except Exception as e:
  1095. logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True)
  1096. return None
  1097. def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1098. """
  1099. Generate zigzag metric chart with orange background for zigzag events.
  1100. This version first saves data to CSV, then uses the CSV to generate the chart.
  1101. Args:
  1102. comfort_calculator: ComfortCalculator instance
  1103. output_dir: Output directory
  1104. Returns:
  1105. str: Chart file path, or None if generation fails
  1106. """
  1107. logger = LogManager().get_logger()
  1108. try:
  1109. # 获取数据
  1110. df = comfort_calculator.ego_df.copy()
  1111. zigzag_events = comfort_calculator.discomfort_df[
  1112. comfort_calculator.discomfort_df['type'] == 'zigzag'
  1113. ].copy()
  1114. if df.empty:
  1115. logger.warning("Cannot generate zigzag chart: empty data")
  1116. return None
  1117. # 生成时间戳
  1118. # 保存 CSV 数据(第一步)
  1119. csv_filename = os.path.join(output_dir, f"zigzag_data.csv")
  1120. df_csv = pd.DataFrame({
  1121. 'simTime': df['simTime'],
  1122. 'speedH': df['speedH'],
  1123. 'posH': df['posH'],
  1124. 'min_speedH_threshold': -2.3, # 可替换为动态阈值
  1125. 'max_speedH_threshold': 2.3
  1126. })
  1127. df_csv.to_csv(csv_filename, index=False)
  1128. logger.info(f"Zigzag data saved to: {csv_filename}")
  1129. return csv_filename
  1130. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1131. # df = pd.read_csv(csv_filename)
  1132. # # 创建图表(第三步)
  1133. # import matplotlib.pyplot as plt
  1134. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1135. # # ===== 子图1:Yaw Rate =====
  1136. # ax1 = plt.subplot(2, 1, 1)
  1137. # ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
  1138. # # 添加 speedH 上下限阈值线
  1139. # ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)')
  1140. # ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)')
  1141. # # 添加橙色背景:Zigzag Events
  1142. # for idx, event in zigzag_events.iterrows():
  1143. # label = 'Zigzag Event' if idx == 0 else None
  1144. # ax1.axvspan(event['start_time'], event['end_time'],
  1145. # alpha=0.3, color='orange', label=label)
  1146. # ax1.set_xlabel('Time (s)')
  1147. # ax1.set_ylabel('Angular Velocity (deg/s)')
  1148. # ax1.set_title('Zigzag Event Detection - Yaw Rate')
  1149. # ax1.grid(True)
  1150. # ax1.legend(loc='upper left')
  1151. # # ===== 子图2:Yaw Angle =====
  1152. # ax2 = plt.subplot(2, 1, 2)
  1153. # ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw')
  1154. # # 添加橙色背景:Zigzag Events
  1155. # for idx, event in zigzag_events.iterrows():
  1156. # label = 'Zigzag Event' if idx == 0 else None
  1157. # ax2.axvspan(event['start_time'], event['end_time'],
  1158. # alpha=0.3, color='orange', label=label)
  1159. # ax2.set_xlabel('Time (s)')
  1160. # ax2.set_ylabel('Yaw (deg)')
  1161. # ax2.set_title('Zigzag Event Detection - Yaw Angle')
  1162. # ax2.grid(True)
  1163. # ax2.legend(loc='upper left')
  1164. # # 保存图像
  1165. # chart_filename = os.path.join(output_dir, f"zigzag_chart.png")
  1166. # plt.savefig(chart_filename, dpi=300)
  1167. # plt.close()
  1168. # logger.info(f"Zigzag chart saved to: {chart_filename}")
  1169. # return csv_filename
  1170. except Exception as e:
  1171. logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True)
  1172. return None
  1173. def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1174. """
  1175. Generate cadence metric chart with orange background for cadence events.
  1176. This version first saves data to CSV, then uses the CSV to generate the chart.
  1177. Args:
  1178. comfort_calculator: ComfortCalculator instance
  1179. output_dir: Output directory
  1180. Returns:
  1181. str: Chart file path, or None if generation fails
  1182. """
  1183. logger = LogManager().get_logger()
  1184. try:
  1185. # 获取数据
  1186. df = comfort_calculator.ego_df.copy()
  1187. cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy()
  1188. if df.empty:
  1189. logger.warning("Cannot generate cadence chart: empty data")
  1190. return None
  1191. # 生成时间戳
  1192. # 保存 CSV 数据(第一步)
  1193. csv_filename = os.path.join(output_dir, f"cadence_data.csv")
  1194. df_csv = pd.DataFrame({
  1195. 'simTime': df['simTime'],
  1196. 'lon_acc': df['lon_acc'],
  1197. 'v': df['v'],
  1198. 'max_threshold': df.get('ip_acc', pd.Series([None] * len(df))),
  1199. 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df)))
  1200. })
  1201. df_csv.to_csv(csv_filename, index=False)
  1202. logger.info(f"Cadence data saved to: {csv_filename}")
  1203. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1204. # df = pd.read_csv(csv_filename)
  1205. # # 创建图表(第三步)
  1206. # import matplotlib.pyplot as plt
  1207. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1208. # # 图 1:纵向加速度
  1209. # ax1 = plt.subplot(2, 1, 1)
  1210. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1211. # if 'max_threshold' in df.columns and 'min_threshold' in df.columns:
  1212. # ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Threshold')
  1213. # ax1.plot(df['simTime'], df['min_threshold'], 'g--', label='Min Threshold')
  1214. # # 添加橙色背景标识顿挫事件
  1215. # for idx, event in cadence_events.iterrows():
  1216. # label = 'Cadence Event' if idx == 0 else None
  1217. # ax1.axvspan(event['start_time'], event['end_time'],
  1218. # alpha=0.3, color='orange', label=label)
  1219. # ax1.set_xlabel('Time (s)')
  1220. # ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
  1221. # ax1.set_title('Cadence Event Detection - Longitudinal Acceleration')
  1222. # ax1.grid(True)
  1223. # ax1.legend()
  1224. # # 图 2:速度
  1225. # ax2 = plt.subplot(2, 1, 2)
  1226. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1227. # # 添加橙色背景标识顿挫事件
  1228. # for idx, event in cadence_events.iterrows():
  1229. # label = 'Cadence Event' if idx == 0 else None
  1230. # ax2.axvspan(event['start_time'], event['end_time'],
  1231. # alpha=0.3, color='orange', label=label)
  1232. # ax2.set_xlabel('Time (s)')
  1233. # ax2.set_ylabel('Velocity (m/s)')
  1234. # ax2.set_title('Cadence Event Detection - Vehicle Speed')
  1235. # ax2.grid(True)
  1236. # ax2.legend()
  1237. # # 保存图像
  1238. # chart_filename = os.path.join(output_dir, f"cadence_chart.png")
  1239. # plt.savefig(chart_filename, dpi=300)
  1240. # plt.close()
  1241. # logger.info(f"Cadence chart saved to: {chart_filename}")
  1242. # return chart_filename
  1243. except Exception as e:
  1244. logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True)
  1245. return None
  1246. def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1247. """
  1248. Generate slam brake metric chart with orange background for slam brake events.
  1249. This version first saves data to CSV, then uses the CSV to generate the chart.
  1250. Args:
  1251. comfort_calculator: ComfortCalculator instance
  1252. output_dir: Output directory
  1253. Returns:
  1254. str: Chart file path, or None if generation fails
  1255. """
  1256. logger = LogManager().get_logger()
  1257. try:
  1258. # 获取数据
  1259. df = comfort_calculator.ego_df.copy()
  1260. slam_brake_events = comfort_calculator.discomfort_df[
  1261. comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy()
  1262. if df.empty:
  1263. logger.warning("Cannot generate slam brake chart: empty data")
  1264. return None
  1265. # 生成时间戳
  1266. # 保存 CSV 数据(第一步)
  1267. csv_filename = os.path.join(output_dir, f"slam_brake_data.csv")
  1268. df_csv = pd.DataFrame({
  1269. 'simTime': df['simTime'],
  1270. 'lon_acc': df['lon_acc'],
  1271. 'lon_acc_diff': df['lon_acc_diff'],
  1272. 'v': df['v'],
  1273. 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df))),
  1274. 'max_threshold': 0.0
  1275. })
  1276. df_csv.to_csv(csv_filename, index=False)
  1277. logger.info(f"Slam brake data saved to: {csv_filename}")
  1278. # return csv_filename
  1279. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1280. df = pd.read_csv(csv_filename)
  1281. # # 创建图表(第三步)
  1282. plt.figure(figsize=(12, 8), constrained_layout=True)
  1283. # # 图 1:纵向加速度
  1284. ax1 = plt.subplot(3, 1, 1)
  1285. ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1286. if 'min_threshold' in df.columns:
  1287. ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Min Threshold')
  1288. # # 添加橙色背景标识急刹车事件
  1289. for idx, event in slam_brake_events.iterrows():
  1290. label = 'Slam Brake Event' if idx == 0 else None
  1291. ax1.axvspan(event['start_time'], event['end_time'],
  1292. alpha=0.3, color='orange', label=label)
  1293. ax1.set_xlabel('Time (s)')
  1294. ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
  1295. ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration')
  1296. ax1.grid(True)
  1297. ax1.legend()
  1298. # # 图 2:速度
  1299. ax2 = plt.subplot(3, 1, 2)
  1300. ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1301. # # 添加橙色背景标识急刹车事件
  1302. for idx, event in slam_brake_events.iterrows():
  1303. label = 'Slam Brake Event' if idx == 0 else None
  1304. ax2.axvspan(event['start_time'], event['end_time'],
  1305. alpha=0.3, color='orange', label=label)
  1306. ax2.set_xlabel('Time (s)')
  1307. ax2.set_ylabel('Velocity (m/s)')
  1308. ax2.set_title('Slam Brake Event Detection - Vehicle Speed')
  1309. ax2.grid(True)
  1310. ax2.legend()
  1311. # # 图 3:加速度变化率
  1312. ax3 = plt.subplot(3, 1, 3)
  1313. ax3.plot(df['simTime'], df['lon_acc_diff'], 'r-', label='lon_acc_diff')
  1314. # # 添加橙色背景标识急刹车事件
  1315. # for idx, event in slam_brake_events.iterrows():
  1316. # label = 'Slam Brake Event' if idx == 0 else None
  1317. # ax2.axvspan(event['start_time'], event['end_time'],
  1318. # alpha=0.3, color='orange', label=label)
  1319. ax3.set_xlabel('Time (s)')
  1320. ax3.set_ylabel('lon_acc_diff (m/s^3)')
  1321. ax3.set_title('Slam Brake Event Detection - Longitudinal Acceleration Difference')
  1322. ax3.grid(True)
  1323. ax3.legend()
  1324. # # 保存图像
  1325. chart_filename = os.path.join(output_dir, f"slam_brake_chart.png")
  1326. plt.savefig(chart_filename, dpi=300)
  1327. plt.close()
  1328. logger.info(f"Slam brake chart saved to: {chart_filename}")
  1329. return chart_filename
  1330. except Exception as e:
  1331. logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True)
  1332. return None
  1333. def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1334. """
  1335. Generate slam accelerate metric chart with orange background for slam accelerate events.
  1336. This version first saves data to CSV, then uses the CSV to generate the chart.
  1337. Args:
  1338. comfort_calculator: ComfortCalculator instance
  1339. output_dir: Output directory
  1340. Returns:
  1341. str: Chart file path, or None if generation fails
  1342. """
  1343. logger = LogManager().get_logger()
  1344. try:
  1345. # 获取数据
  1346. df = comfort_calculator.ego_df.copy()
  1347. slam_accel_events = comfort_calculator.discomfort_df[
  1348. (comfort_calculator.discomfort_df['type'] == 'slam_accel')
  1349. ].copy()
  1350. if df.empty:
  1351. logger.warning("Cannot generate slam accelerate chart: empty data")
  1352. return None
  1353. # 生成时间戳
  1354. # 保存 CSV 数据(第一步)
  1355. csv_filename = os.path.join(output_dir, f"slam_accel_data.csv")
  1356. # 获取加速度阈值(如果存在)
  1357. accel_threshold = df.get('ip_acc', pd.Series([None] * len(df)))
  1358. df_csv = pd.DataFrame({
  1359. 'simTime': df['simTime'],
  1360. 'lon_acc': df['lon_acc'],
  1361. 'v': df['v'],
  1362. 'min_threshold': 0.0, # 加速度最小阈值设为0
  1363. 'max_threshold': accel_threshold # 急加速阈值
  1364. })
  1365. df_csv.to_csv(csv_filename, index=False)
  1366. logger.info(f"Slam accelerate data saved to: {csv_filename}")
  1367. return csv_filename
  1368. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1369. # df = pd.read_csv(csv_filename)
  1370. # # 创建图表(第三步)
  1371. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1372. # # 图 1:纵向加速度
  1373. # ax1 = plt.subplot(2, 1, 1)
  1374. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1375. # # 添加加速度阈值线
  1376. # if 'max_threshold' in df.columns and not df['max_threshold'].isnull().all():
  1377. # ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Threshold')
  1378. # # 添加橙色背景标识急加速事件
  1379. # for idx, event in slam_accel_events.iterrows():
  1380. # label = 'Slam Accelerate Event' if idx == 0 else None
  1381. # ax1.axvspan(event['start_time'], event['end_time'],
  1382. # alpha=0.3, color='orange', label=label)
  1383. # ax1.set_xlabel('Time (s)')
  1384. # ax1.set_ylabel('Acceleration (m/s²)')
  1385. # ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration')
  1386. # ax1.grid(True)
  1387. # ax1.legend()
  1388. # # 图 2:速度
  1389. # ax2 = plt.subplot(2, 1, 2)
  1390. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1391. # # 添加橙色背景标识急加速事件
  1392. # for idx, event in slam_accel_events.iterrows():
  1393. # label = 'Slam Accelerate Event' if idx == 0 else None
  1394. # ax2.axvspan(event['start_time'], event['end_time'],
  1395. # alpha=0.3, color='orange', label=label)
  1396. # ax2.set_xlabel('Time (s)')
  1397. # ax2.set_ylabel('Velocity (m/s)')
  1398. # ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed')
  1399. # ax2.grid(True)
  1400. # ax2.legend()
  1401. # # 保存图像
  1402. # chart_filename = os.path.join(output_dir, f"slam_accel_chart.png")
  1403. # plt.savefig(chart_filename, dpi=300)
  1404. # plt.close()
  1405. # logger.info(f"Slam accelerate chart saved to: {chart_filename}")
  1406. # return chart_filename
  1407. except Exception as e:
  1408. logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True)
  1409. return None
  1410. def get_metric_thresholds(calculator, metric_name: str) -> dict:
  1411. """
  1412. 从配置文件中获取指标的阈值
  1413. Args:
  1414. calculator: Calculator instance (FunctionCalculator, SafetyCalculator, ComfortCalculator, EfficientCalculator, TrafficCalculator)
  1415. metric_name: 指标名称
  1416. Returns:
  1417. dict: 包含min和max阈值的字典
  1418. """
  1419. logger = LogManager().get_logger()
  1420. thresholds = {"min": None, "max": None}
  1421. try:
  1422. # 根据计算器类型获取配置
  1423. if hasattr(calculator, 'data_processed'):
  1424. # 检查安全性指标配置
  1425. if hasattr(calculator.data_processed,
  1426. 'safety_config') and 'safety' in calculator.data_processed.safety_config:
  1427. config = calculator.data_processed.safety_config['safety']
  1428. metric_type = 'safety'
  1429. # 检查舒适性指标配置
  1430. elif hasattr(calculator.data_processed,
  1431. 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config:
  1432. config = calculator.data_processed.comfort_config['comfort']
  1433. metric_type = 'comfort'
  1434. # 检查功能性指标配置
  1435. elif hasattr(calculator.data_processed,
  1436. 'function_config') and 'function' in calculator.data_processed.function_config:
  1437. config = calculator.data_processed.function_config['function']
  1438. metric_type = 'function'
  1439. # 检查高效性指标配置
  1440. elif hasattr(calculator.data_processed,
  1441. 'efficient_config') and 'efficient' in calculator.data_processed.efficient_config:
  1442. config = calculator.data_processed.efficient_config['efficient']
  1443. metric_type = 'efficient'
  1444. # 检查交通性指标配置
  1445. elif hasattr(calculator.data_processed,
  1446. 'traffic_config') and 'traffic' in calculator.data_processed.traffic_config:
  1447. config = calculator.data_processed.traffic_config['traffic']
  1448. metric_type = 'traffic'
  1449. else:
  1450. # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
  1451. if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
  1452. config = calculator.function_config['function']
  1453. metric_type = 'function'
  1454. else:
  1455. logger.warning(f"无法找到{metric_name}的配置信息")
  1456. return thresholds
  1457. else:
  1458. # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
  1459. if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
  1460. config = calculator.function_config['function']
  1461. metric_type = 'function'
  1462. else:
  1463. logger.warning(f"计算器没有data_processed属性或function_config属性")
  1464. return thresholds
  1465. # 递归查找指标配置
  1466. def find_metric_config(node, target_name):
  1467. if isinstance(node, dict):
  1468. if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node:
  1469. return node
  1470. for key, value in node.items():
  1471. result = find_metric_config(value, target_name)
  1472. if result:
  1473. return result
  1474. return None
  1475. # 查找指标配置
  1476. metric_config = find_metric_config(config, metric_name)
  1477. if metric_config:
  1478. thresholds["min"] = metric_config.get("min")
  1479. thresholds["max"] = metric_config.get("max")
  1480. logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}")
  1481. else:
  1482. logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息")
  1483. except Exception as e:
  1484. logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True)
  1485. return thresholds
  1486. def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
  1487. """
  1488. Generate chart data for safety metrics
  1489. Args:
  1490. safety_calculator: SafetyCalculator instance
  1491. metric_name: Metric name
  1492. output_dir: Output directory
  1493. Returns:
  1494. str: Chart file path, or None if generation fails
  1495. """
  1496. logger = LogManager().get_logger()
  1497. try:
  1498. # 确保输出目录存在
  1499. if output_dir:
  1500. os.makedirs(output_dir, exist_ok=True)
  1501. else:
  1502. output_dir = os.getcwd()
  1503. # 根据指标名称选择不同的图表生成方法
  1504. if metric_name.lower() == 'ttc':
  1505. return generate_ttc_chart(safety_calculator, output_dir)
  1506. elif metric_name.lower() == 'mttc':
  1507. return generate_mttc_chart(safety_calculator, output_dir)
  1508. elif metric_name.lower() == 'thw':
  1509. return generate_thw_chart(safety_calculator, output_dir)
  1510. elif metric_name.lower() == 'lonsd':
  1511. return generate_lonsd_chart(safety_calculator, output_dir)
  1512. elif metric_name.lower() == 'latsd':
  1513. return generate_latsd_chart(safety_calculator, output_dir)
  1514. elif metric_name.lower() == 'btn':
  1515. return generate_btn_chart(safety_calculator, output_dir)
  1516. elif metric_name.lower() == 'collisionrisk':
  1517. return generate_collision_risk_chart(safety_calculator, output_dir)
  1518. elif metric_name.lower() == 'collisionseverity':
  1519. return generate_collision_severity_chart(safety_calculator, output_dir)
  1520. else:
  1521. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  1522. return None
  1523. except Exception as e:
  1524. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  1525. return None
  1526. def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1527. """
  1528. Generate TTC metric chart with orange background for unsafe events.
  1529. This version first saves data to CSV, then uses the CSV to generate the chart.
  1530. Args:
  1531. safety_calculator: SafetyCalculator instance
  1532. output_dir: Output directory
  1533. Returns:
  1534. str: Chart file path, or None if generation fails
  1535. """
  1536. logger = LogManager().get_logger()
  1537. try:
  1538. # 获取数据
  1539. ttc_data = safety_calculator.ttc_data
  1540. if not ttc_data:
  1541. logger.warning("Cannot generate TTC chart: empty data")
  1542. return None
  1543. # 创建DataFrame
  1544. df = pd.DataFrame(ttc_data)
  1545. # 获取阈值
  1546. thresholds = get_metric_thresholds(safety_calculator, 'TTC')
  1547. min_threshold = thresholds.get('min')
  1548. max_threshold = thresholds.get('max')
  1549. # 生成时间戳
  1550. # 保存 CSV 数据(第一步)
  1551. csv_filename = os.path.join(output_dir, f"ttc_data.csv")
  1552. df_csv = pd.DataFrame({
  1553. 'simTime': df['simTime'],
  1554. 'simFrame': df['simFrame'],
  1555. 'TTC': df['TTC'],
  1556. 'min_threshold': min_threshold,
  1557. 'max_threshold': max_threshold
  1558. })
  1559. df_csv.to_csv(csv_filename, index=False)
  1560. logger.info(f"TTC data saved to: {csv_filename}")
  1561. return csv_filename
  1562. # 第二步:从 CSV 读取(可验证保存数据无误)
  1563. df = pd.read_csv(csv_filename)
  1564. # 检测超阈值事件
  1565. unsafe_events = []
  1566. if min_threshold is not None:
  1567. # 对于TTC,小于最小阈值视为不安全
  1568. unsafe_condition = df['TTC'] < min_threshold
  1569. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1570. for _, group in df[unsafe_condition].groupby(event_groups):
  1571. if len(group) >= 2: # 至少2帧才算一次事件
  1572. start_time = group['simTime'].iloc[0]
  1573. end_time = group['simTime'].iloc[-1]
  1574. duration = end_time - start_time
  1575. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1576. unsafe_events.append({
  1577. 'start_time': start_time,
  1578. 'end_time': end_time,
  1579. 'start_frame': group['simFrame'].iloc[0],
  1580. 'end_frame': group['simFrame'].iloc[-1],
  1581. 'duration': duration,
  1582. 'min_ttc': group['TTC'].min()
  1583. })
  1584. # # 创建图表(第三步)
  1585. # plt.figure(figsize=(12, 8))
  1586. # plt.plot(df['simTime'], df['TTC'], 'b-', label='TTC')
  1587. # # 添加阈值线
  1588. # if min_threshold is not None:
  1589. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  1590. # if max_threshold is not None:
  1591. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1592. # # 添加橙色背景标识不安全事件
  1593. # for idx, event in enumerate(unsafe_events):
  1594. # label = 'Unsafe TTC Event' if idx == 0 else None
  1595. # plt.axvspan(event['start_time'], event['end_time'],
  1596. # alpha=0.3, color='orange', label=label)
  1597. # plt.xlabel('Time (s)')
  1598. # plt.ylabel('TTC (s)')
  1599. # plt.title('Time To Collision (TTC) Trend')
  1600. # plt.grid(True)
  1601. # plt.legend()
  1602. # # 保存图像
  1603. # chart_filename = os.path.join(output_dir, f"ttc_chart.png")
  1604. # plt.savefig(chart_filename, dpi=300)
  1605. # plt.close()
  1606. # # 记录不安全事件信息
  1607. # if unsafe_events:
  1608. # logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件")
  1609. # for i, event in enumerate(unsafe_events):
  1610. # logger.info(
  1611. # f"TTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s")
  1612. # logger.info(f"TTC chart saved to: {chart_filename}")
  1613. # return chart_filename
  1614. except Exception as e:
  1615. logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True)
  1616. return None
  1617. def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1618. """
  1619. Generate MTTC metric chart with orange background for unsafe events
  1620. Args:
  1621. safety_calculator: SafetyCalculator instance
  1622. output_dir: Output directory
  1623. Returns:
  1624. str: Chart file path, or None if generation fails
  1625. """
  1626. logger = LogManager().get_logger()
  1627. try:
  1628. # 获取数据
  1629. mttc_data = safety_calculator.mttc_data
  1630. if not mttc_data:
  1631. logger.warning("Cannot generate MTTC chart: empty data")
  1632. return None
  1633. # 创建DataFrame
  1634. df = pd.DataFrame(mttc_data)
  1635. # 获取阈值
  1636. thresholds = get_metric_thresholds(safety_calculator, 'MTTC')
  1637. min_threshold = thresholds.get('min')
  1638. max_threshold = thresholds.get('max')
  1639. # 检测超阈值事件
  1640. unsafe_events = []
  1641. if min_threshold is not None:
  1642. # 对于MTTC,小于最小阈值视为不安全
  1643. unsafe_condition = df['MTTC'] < min_threshold
  1644. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1645. for _, group in df[unsafe_condition].groupby(event_groups):
  1646. if len(group) >= 2: # 至少2帧才算一次事件
  1647. start_time = group['simTime'].iloc[0]
  1648. end_time = group['simTime'].iloc[-1]
  1649. duration = end_time - start_time
  1650. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1651. unsafe_events.append({
  1652. 'start_time': start_time,
  1653. 'end_time': end_time,
  1654. 'start_frame': group['simFrame'].iloc[0],
  1655. 'end_frame': group['simFrame'].iloc[-1],
  1656. 'duration': duration,
  1657. 'min_mttc': group['MTTC'].min()
  1658. })
  1659. # # 创建图表
  1660. # plt.figure(figsize=(12, 6))
  1661. # plt.plot(df['simTime'], df['MTTC'], 'g-', label='MTTC')
  1662. # # 添加阈值线
  1663. # if min_threshold is not None:
  1664. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  1665. # if max_threshold is not None:
  1666. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1667. # # 添加橙色背景标识不安全事件
  1668. # for idx, event in enumerate(unsafe_events):
  1669. # label = 'Unsafe MTTC Event' if idx == 0 else None
  1670. # plt.axvspan(event['start_time'], event['end_time'],
  1671. # alpha=0.3, color='orange', label=label)
  1672. # plt.xlabel('Time (s)')
  1673. # plt.ylabel('MTTC (s)')
  1674. # plt.title('Modified Time To Collision (MTTC) Trend')
  1675. # plt.grid(True)
  1676. # plt.legend()
  1677. # # 保存图表
  1678. # chart_filename = os.path.join(output_dir, f"mttc_chart.png")
  1679. # plt.savefig(chart_filename, dpi=300)
  1680. # plt.close()
  1681. # logger.info(f"MTTC chart saved to: {chart_filename}")
  1682. # 保存CSV数据,包含阈值信息
  1683. csv_filename = os.path.join(output_dir, f"mttc_data.csv")
  1684. df_csv = df.copy()
  1685. df_csv['min_threshold'] = min_threshold
  1686. df_csv['max_threshold'] = max_threshold
  1687. df_csv.to_csv(csv_filename, index=False)
  1688. # 记录不安全事件信息
  1689. if unsafe_events:
  1690. logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件")
  1691. for i, event in enumerate(unsafe_events):
  1692. logger.info(
  1693. f"MTTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s")
  1694. logger.info(f"MTTC data saved to: {csv_filename}")
  1695. return csv_filename
  1696. except Exception as e:
  1697. logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True)
  1698. return None
  1699. def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1700. """
  1701. Generate THW metric chart with orange background for unsafe events
  1702. Args:
  1703. safety_calculator: SafetyCalculator instance
  1704. output_dir: Output directory
  1705. Returns:
  1706. str: Chart file path, or None if generation fails
  1707. """
  1708. logger = LogManager().get_logger()
  1709. try:
  1710. # 获取数据
  1711. thw_data = safety_calculator.thw_data
  1712. if not thw_data:
  1713. logger.warning("Cannot generate THW chart: empty data")
  1714. return None
  1715. # 创建DataFrame
  1716. df = pd.DataFrame(thw_data)
  1717. # 获取阈值
  1718. thresholds = get_metric_thresholds(safety_calculator, 'THW')
  1719. min_threshold = thresholds.get('min')
  1720. max_threshold = thresholds.get('max')
  1721. # 检测超阈值事件
  1722. unsafe_events = []
  1723. if min_threshold is not None:
  1724. # 对于THW,小于最小阈值视为不安全
  1725. unsafe_condition = df['THW'] < min_threshold
  1726. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1727. for _, group in df[unsafe_condition].groupby(event_groups):
  1728. if len(group) >= 2: # 至少2帧才算一次事件
  1729. start_time = group['simTime'].iloc[0]
  1730. end_time = group['simTime'].iloc[-1]
  1731. duration = end_time - start_time
  1732. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1733. unsafe_events.append({
  1734. 'start_time': start_time,
  1735. 'end_time': end_time,
  1736. 'start_frame': group['simFrame'].iloc[0],
  1737. 'end_frame': group['simFrame'].iloc[-1],
  1738. 'duration': duration,
  1739. 'min_thw': group['THW'].min()
  1740. })
  1741. # # 创建图表
  1742. # plt.figure(figsize=(12, 10))
  1743. # plt.plot(df['simTime'], df['THW'], 'c-', label='THW')
  1744. # # 添加阈值线
  1745. # if min_threshold is not None:
  1746. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  1747. # if max_threshold is not None:
  1748. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1749. # # 添加橙色背景标识不安全事件
  1750. # for idx, event in enumerate(unsafe_events):
  1751. # label = 'Unsafe THW Event' if idx == 0 else None
  1752. # plt.axvspan(event['start_time'], event['end_time'],
  1753. # alpha=0.3, color='orange', label=label)
  1754. # plt.xlabel('Time (s)')
  1755. # plt.ylabel('THW (s)')
  1756. # plt.title('Time Headway (THW) Trend')
  1757. # plt.grid(True)
  1758. # plt.legend()
  1759. # # 保存图表
  1760. # chart_filename = os.path.join(output_dir, f"thw_chart.png")
  1761. # plt.savefig(chart_filename, dpi=300)
  1762. # plt.close()
  1763. # logger.info(f"THW chart saved to: {chart_filename}")
  1764. # 保存CSV数据,包含阈值信息
  1765. csv_filename = os.path.join(output_dir, f"thw_data.csv")
  1766. df_csv = df.copy()
  1767. df_csv['min_threshold'] = min_threshold
  1768. df_csv['max_threshold'] = max_threshold
  1769. df_csv.to_csv(csv_filename, index=False)
  1770. # 记录不安全事件信息
  1771. if unsafe_events:
  1772. logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件")
  1773. for i, event in enumerate(unsafe_events):
  1774. logger.info(
  1775. f"THW不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s")
  1776. logger.info(f"THW data saved to: {csv_filename}")
  1777. return csv_filename
  1778. except Exception as e:
  1779. logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True)
  1780. return None
  1781. def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1782. """
  1783. Generate Longitudinal Safe Distance metric chart
  1784. Args:
  1785. safety_calculator: SafetyCalculator instance
  1786. output_dir: Output directory
  1787. Returns:
  1788. str: Chart file path, or None if generation fails
  1789. """
  1790. logger = LogManager().get_logger()
  1791. try:
  1792. # 获取数据
  1793. lonsd_data = safety_calculator.lonsd_data
  1794. if not lonsd_data:
  1795. logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data")
  1796. return None
  1797. # 创建DataFrame
  1798. df = pd.DataFrame(lonsd_data)
  1799. # 获取阈值
  1800. thresholds = get_metric_thresholds(safety_calculator, 'LonSD')
  1801. min_threshold = thresholds.get('min')
  1802. max_threshold = thresholds.get('max')
  1803. # # 创建图表
  1804. # plt.figure(figsize=(12, 6))
  1805. # plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance')
  1806. # # 添加阈值线
  1807. # if min_threshold is not None:
  1808. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  1809. # if max_threshold is not None:
  1810. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  1811. # plt.xlabel('Time (s)')
  1812. # plt.ylabel('Distance (m)')
  1813. # plt.title('Longitudinal Safe Distance (LonSD) Trend')
  1814. # plt.grid(True)
  1815. # plt.legend()
  1816. # # 保存图表
  1817. # chart_filename = os.path.join(output_dir, f"lonsd_chart.png")
  1818. # plt.savefig(chart_filename, dpi=300)
  1819. # plt.close()
  1820. # logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}")
  1821. # 保存CSV数据,包含阈值信息
  1822. csv_filename = os.path.join(output_dir, f"lonsd_data.csv")
  1823. df_csv = df.copy()
  1824. df_csv['min_threshold'] = min_threshold
  1825. df_csv['max_threshold'] = max_threshold
  1826. df_csv.to_csv(csv_filename, index=False)
  1827. logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}")
  1828. return csv_filename
  1829. except Exception as e:
  1830. logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True)
  1831. return None
  1832. def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1833. """
  1834. Generate Lateral Safe Distance metric chart with orange background for unsafe events
  1835. Args:
  1836. safety_calculator: SafetyCalculator instance
  1837. output_dir: Output directory
  1838. Returns:
  1839. str: Chart file path, or None if generation fails
  1840. """
  1841. logger = LogManager().get_logger()
  1842. try:
  1843. # 获取数据
  1844. latsd_data = safety_calculator.latsd_data
  1845. if not latsd_data:
  1846. logger.warning("Cannot generate Lateral Safe Distance chart: empty data")
  1847. return None
  1848. # 创建DataFrame
  1849. df = pd.DataFrame(latsd_data)
  1850. # 获取阈值
  1851. thresholds = get_metric_thresholds(safety_calculator, 'LatSD')
  1852. min_threshold = thresholds.get('min')
  1853. max_threshold = thresholds.get('max')
  1854. # 检测超阈值事件
  1855. unsafe_events = []
  1856. if min_threshold is not None:
  1857. # 对于LatSD,小于最小阈值视为不安全
  1858. unsafe_condition = df['LatSD'] < min_threshold
  1859. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1860. for _, group in df[unsafe_condition].groupby(event_groups):
  1861. if len(group) >= 2: # 至少2帧才算一次事件
  1862. start_time = group['simTime'].iloc[0]
  1863. end_time = group['simTime'].iloc[-1]
  1864. duration = end_time - start_time
  1865. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1866. unsafe_events.append({
  1867. 'start_time': start_time,
  1868. 'end_time': end_time,
  1869. 'start_frame': group['simFrame'].iloc[0],
  1870. 'end_frame': group['simFrame'].iloc[-1],
  1871. 'duration': duration,
  1872. 'min_latsd': group['LatSD'].min()
  1873. })
  1874. # # 创建图表
  1875. # plt.figure(figsize=(12, 6))
  1876. # plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance')
  1877. # # 添加阈值线
  1878. # if min_threshold is not None:
  1879. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  1880. # if max_threshold is not None:
  1881. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  1882. # # 添加橙色背景标识不安全事件
  1883. # for idx, event in enumerate(unsafe_events):
  1884. # label = 'Unsafe LatSD Event' if idx == 0 else None
  1885. # plt.axvspan(event['start_time'], event['end_time'],
  1886. # alpha=0.3, color='orange', label=label)
  1887. # plt.xlabel('Time (s)')
  1888. # plt.ylabel('Distance (m)')
  1889. # plt.title('Lateral Safe Distance (LatSD) Trend')
  1890. # plt.grid(True)
  1891. # plt.legend()
  1892. # # 保存图表
  1893. # chart_filename = os.path.join(output_dir, f"latsd_chart.png")
  1894. # plt.savefig(chart_filename, dpi=300)
  1895. # plt.close()
  1896. # logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}")
  1897. # 保存CSV数据,包含阈值信息
  1898. csv_filename = os.path.join(output_dir, f"latsd_data.csv")
  1899. df_csv = df.copy()
  1900. df_csv['min_threshold'] = min_threshold
  1901. df_csv['max_threshold'] = max_threshold
  1902. df_csv.to_csv(csv_filename, index=False)
  1903. # 记录不安全事件信息
  1904. if unsafe_events:
  1905. logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件")
  1906. for i, event in enumerate(unsafe_events):
  1907. logger.info(
  1908. f"LatSD不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m")
  1909. logger.info(f"Lateral Safe Distance data saved to: {csv_filename}")
  1910. return csv_filename
  1911. except Exception as e:
  1912. logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True)
  1913. return None
  1914. def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1915. """
  1916. Generate Brake Threat Number metric chart with orange background for unsafe events
  1917. Args:
  1918. safety_calculator: SafetyCalculator instance
  1919. output_dir: Output directory
  1920. Returns:
  1921. str: Chart file path, or None if generation fails
  1922. """
  1923. logger = LogManager().get_logger()
  1924. try:
  1925. # 获取数据
  1926. btn_data = safety_calculator.btn_data
  1927. if not btn_data:
  1928. logger.warning("Cannot generate Brake Threat Number chart: empty data")
  1929. return None
  1930. # 创建DataFrame
  1931. df = pd.DataFrame(btn_data)
  1932. # 获取阈值
  1933. thresholds = get_metric_thresholds(safety_calculator, 'BTN')
  1934. min_threshold = thresholds.get('min')
  1935. max_threshold = thresholds.get('max')
  1936. # 检测超阈值事件
  1937. unsafe_events = []
  1938. if max_threshold is not None:
  1939. # 对于BTN,大于最大阈值视为不安全
  1940. unsafe_condition = df['BTN'] > max_threshold
  1941. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1942. for _, group in df[unsafe_condition].groupby(event_groups):
  1943. if len(group) >= 2: # 至少2帧才算一次事件
  1944. start_time = group['simTime'].iloc[0]
  1945. end_time = group['simTime'].iloc[-1]
  1946. duration = end_time - start_time
  1947. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1948. unsafe_events.append({
  1949. 'start_time': start_time,
  1950. 'end_time': end_time,
  1951. 'start_frame': group['simFrame'].iloc[0],
  1952. 'end_frame': group['simFrame'].iloc[-1],
  1953. 'duration': duration,
  1954. 'max_btn': group['BTN'].max()
  1955. })
  1956. # # 创建图表
  1957. # plt.figure(figsize=(12, 6))
  1958. # plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number')
  1959. # # 添加阈值线
  1960. # if min_threshold is not None:
  1961. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})')
  1962. # if max_threshold is not None:
  1963. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1964. # # 添加橙色背景标识不安全事件
  1965. # for idx, event in enumerate(unsafe_events):
  1966. # label = 'Unsafe BTN Event' if idx == 0 else None
  1967. # plt.axvspan(event['start_time'], event['end_time'],
  1968. # alpha=0.3, color='orange', label=label)
  1969. # plt.xlabel('Time (s)')
  1970. # plt.ylabel('BTN')
  1971. # plt.title('Brake Threat Number (BTN) Trend')
  1972. # plt.grid(True)
  1973. # plt.legend()
  1974. # # 保存图表
  1975. # chart_filename = os.path.join(output_dir, f"btn_chart.png")
  1976. # plt.savefig(chart_filename, dpi=300)
  1977. # plt.close()
  1978. # logger.info(f"Brake Threat Number chart saved to: {chart_filename}")
  1979. # 保存CSV数据,包含阈值信息
  1980. csv_filename = os.path.join(output_dir, f"btn_data.csv")
  1981. df_csv = df.copy()
  1982. df_csv['min_threshold'] = min_threshold
  1983. df_csv['max_threshold'] = max_threshold
  1984. df_csv.to_csv(csv_filename, index=False)
  1985. # 记录不安全事件信息
  1986. if unsafe_events:
  1987. logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件")
  1988. for i, event in enumerate(unsafe_events):
  1989. logger.info(
  1990. f"BTN不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}")
  1991. logger.info(f"Brake Threat Number data saved to: {csv_filename}")
  1992. return csv_filename
  1993. except Exception as e:
  1994. logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True)
  1995. return None
  1996. def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1997. """
  1998. Generate Collision Risk metric chart
  1999. Args:
  2000. safety_calculator: SafetyCalculator instance
  2001. output_dir: Output directory
  2002. Returns:
  2003. str: Chart file path, or None if generation fails
  2004. """
  2005. logger = LogManager().get_logger()
  2006. try:
  2007. # 获取数据
  2008. risk_data = safety_calculator.collision_risk_data
  2009. if not risk_data:
  2010. logger.warning("Cannot generate Collision Risk chart: empty data")
  2011. return None
  2012. # 创建DataFrame
  2013. df = pd.DataFrame(risk_data)
  2014. # 获取阈值
  2015. thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk')
  2016. min_threshold = thresholds.get('min')
  2017. max_threshold = thresholds.get('max')
  2018. # # 创建图表
  2019. # plt.figure(figsize=(12, 6))
  2020. # plt.plot(df['simTime'], df['collisionRisk'], 'r-', label='Collision Risk')
  2021. # # 添加阈值线
  2022. # if min_threshold is not None:
  2023. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
  2024. # if max_threshold is not None:
  2025. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
  2026. # plt.xlabel('Time (s)')
  2027. # plt.ylabel('Risk Value (%)')
  2028. # plt.title('Collision Risk (collisionRisk) Trend')
  2029. # plt.grid(True)
  2030. # plt.legend()
  2031. # # 保存图表
  2032. # chart_filename = os.path.join(output_dir, f"collision_risk_chart.png")
  2033. # plt.savefig(chart_filename, dpi=300)
  2034. # plt.close()
  2035. # logger.info(f"Collision Risk chart saved to: {chart_filename}")
  2036. # 保存CSV数据,包含阈值信息
  2037. csv_filename = os.path.join(output_dir, f"collisionrisk_data.csv")
  2038. df_csv = df.copy()
  2039. df_csv['min_threshold'] = min_threshold
  2040. df_csv['max_threshold'] = max_threshold
  2041. df_csv.to_csv(csv_filename, index=False)
  2042. logger.info(f"Collision Risk data saved to: {csv_filename}")
  2043. return csv_filename
  2044. except Exception as e:
  2045. logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True)
  2046. return None
  2047. def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]:
  2048. """
  2049. Generate Collision Severity metric chart
  2050. Args:
  2051. safety_calculator: SafetyCalculator instance
  2052. output_dir: Output directory
  2053. Returns:
  2054. str: Chart file path, or None if generation fails
  2055. """
  2056. logger = LogManager().get_logger()
  2057. try:
  2058. # 获取数据
  2059. severity_data = safety_calculator.collision_severity_data
  2060. if not severity_data:
  2061. logger.warning("Cannot generate Collision Severity chart: empty data")
  2062. return None
  2063. # 创建DataFrame
  2064. df = pd.DataFrame(severity_data)
  2065. # 获取阈值
  2066. thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity')
  2067. min_threshold = thresholds.get('min')
  2068. max_threshold = thresholds.get('max')
  2069. # # 创建图表
  2070. # plt.figure(figsize=(12, 6))
  2071. # plt.plot(df['simTime'], df['collisionSeverity'], 'r-', label='Collision Severity')
  2072. # # 添加阈值线
  2073. # if min_threshold is not None:
  2074. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
  2075. # if max_threshold is not None:
  2076. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
  2077. # plt.xlabel('Time (s)')
  2078. # plt.ylabel('Severity (%)')
  2079. # plt.title('Collision Severity (collisionSeverity) Trend')
  2080. # plt.grid(True)
  2081. # plt.legend()
  2082. # # 保存图表
  2083. # chart_filename = os.path.join(output_dir, f"collision_severity_chart.png")
  2084. # plt.savefig(chart_filename, dpi=300)
  2085. # plt.close()
  2086. # logger.info(f"Collision Severity chart saved to: {chart_filename}")
  2087. # 保存CSV数据,包含阈值信息
  2088. csv_filename = os.path.join(output_dir, f"collisionseverity_data.csv")
  2089. df_csv = df.copy()
  2090. df_csv['min_threshold'] = min_threshold
  2091. df_csv['max_threshold'] = max_threshold
  2092. df_csv.to_csv(csv_filename, index=False)
  2093. logger.info(f"Collision Severity data saved to: {csv_filename}")
  2094. return csv_filename
  2095. except Exception as e:
  2096. logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True)
  2097. return None
  2098. def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  2099. str]:
  2100. """Generate chart data for traffic metrics"""
  2101. # 待实现
  2102. return None
  2103. def calculate_distance(ego_df, correctwarning):
  2104. """计算预警距离"""
  2105. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  2106. return dist
  2107. def calculate_relative_speed(ego_df, correctwarning):
  2108. """计算相对速度"""
  2109. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  2110. # 使用function.py中已实现的scenario_sign_dict
  2111. from modules.metric.function import scenario_sign_dict
  2112. if __name__ == "__main__":
  2113. # 测试代码
  2114. print("Metrics visualization utilities loaded.")