chart_generator.py 97 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
  10. @Date: 2023/06/25
  11. @Last Modified: 2025/05/20
  12. @Summary: Chart generation utilities for metrics visualization
  13. """
  14. """
  15. 主要功能模块:
  16. 函数指标绘图(generate_function_chart_data)
  17. 舒适性指标绘图(generate_comfort_chart_data)
  18. 安全性指标绘图(generate_safety_chart_data)
  19. 交通指标绘图(generate_traffic_chart_data,待实现)
  20. 新增的急加速指标绘图:
  21. 在generate_comfort_chart_data中增加了slamaccelerate指标的支持
  22. 实现了完整的generate_slam_accelerate_chart函数,用于绘制急加速事件
  23. 该函数包含:
  24. 数据获取与预处理
  25. CSV数据保存与读取
  26. 纵向加速度与速度的双子图绘制
  27. 急加速事件的橙色背景标记
  28. 阈值线绘制
  29. 高质量图表输出(300dpi PNG)
  30. 其他关键特性:
  31. 统一的日志记录系统
  32. 阈值获取函数get_metric_thresholds
  33. 错误处理和异常捕获
  34. 时间戳管理
  35. 数据验证机制
  36. 详细的日志输出
  37. 辅助函数:
  38. calculate_distance 和 calculate_relative_speed(简化实现)
  39. scenario_sign_dict 场景签名字典(简化实现)
  40. 此代码实现了完整的指标可视化工具,特别针对急加速指标slamAccelerate提供了详细的绘图功能,能够清晰展示急加速事件的发生时间和相关数据变化。
  41. """
  42. # import matplotlib
  43. # matplotlib.use('Agg') # 使用非图形界面的后端
  44. # import matplotlib.pyplot as plt
  45. import os
  46. import numpy as np
  47. import pandas as pd
  48. from typing import Optional, Dict, List, Any, Union
  49. from pathlib import Path
  50. import matplotlib
  51. matplotlib.use('Agg')
  52. from modules.lib.log_manager import LogManager
  53. def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  54. str]:
  55. """
  56. Generate chart data for function metrics
  57. Args:
  58. function_calculator: FunctionCalculator instance
  59. metric_name: Metric name
  60. output_dir: Output directory
  61. Returns:
  62. str: Chart file path, or None if generation fails
  63. """
  64. logger = LogManager().get_logger()
  65. try:
  66. # 确保输出目录存在
  67. if not output_dir:
  68. output_dir = os.path.join(os.getcwd(), 'data')
  69. os.makedirs(output_dir, exist_ok=True)
  70. # 根据指标名称选择不同的图表生成方法
  71. if metric_name.lower() == 'latestwarningdistance_ttc_lst':
  72. return generate_latest_warning_ttc_chart(function_calculator, output_dir)
  73. elif metric_name.lower() == 'earliestwarningdistance_ttc_lst':
  74. return generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir)
  75. elif metric_name.lower() == 'earliestwarningdistance_lst':
  76. return generate_earliest_warning_distance_chart(function_calculator, output_dir)
  77. elif metric_name.lower() == 'latestwarningdistance_lst':
  78. return generate_latest_warning_distance_chart(function_calculator, output_dir)
  79. elif metric_name.lower() == 'latestwarningdistance_ttc_pgvil':
  80. return generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir)
  81. elif metric_name.lower() == 'earliestwarningdistance_ttc_pgvil':
  82. return generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir)
  83. elif metric_name.lower() == 'earliestwarningdistance_pgvil':
  84. return generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir)
  85. elif metric_name.lower() == 'latestwarningdistance_pgvil':
  86. return generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir)
  87. elif metric_name.lower() == 'limitspeed_lst':
  88. return generate_limit_speed_chart(function_calculator, output_dir)
  89. elif metric_name.lower() == 'limitspeedpastlimitsign_lst':
  90. return generate_limit_speed_past_sign_chart(function_calculator, output_dir)
  91. elif metric_name.lower() == 'maxlongitudedist_lst':
  92. return generate_max_longitude_dist_chart(function_calculator, output_dir)
  93. else:
  94. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  95. return None
  96. except Exception as e:
  97. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  98. return None
  99. def generate_earliest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
  100. """
  101. Generate warning distance chart with data visualization.
  102. This function creates charts for earliestWarningDistance_LST and latestWarningDistance_LST metrics.
  103. Args:
  104. function_calculator: FunctionCalculator instance
  105. output_dir: Output directory
  106. Returns:
  107. str: Chart file path, or None if generation fails
  108. """
  109. logger = LogManager().get_logger()
  110. try:
  111. # Get data
  112. ego_df = function_calculator.ego_data.copy()
  113. # Check if correctwarning is already calculated
  114. correctwarning = getattr(function_calculator, 'correctwarning', None)
  115. # Get configured thresholds
  116. thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_LST')
  117. max_threshold = thresholds["max"]
  118. min_threshold = thresholds["min"]
  119. # Get calculated warning distance and speed
  120. warning_dist = getattr(function_calculator, 'warning_dist', None)
  121. if warning_dist.empty:
  122. logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
  123. return None
  124. # Calculate metric value
  125. metric_value = float(warning_dist.iloc[0]) if len(warning_dist) >= 0.0 else max_threshold
  126. # Save CSV data
  127. csv_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_data.csv")
  128. df_csv = pd.DataFrame({
  129. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  130. 'warning_distance': warning_dist,
  131. 'min_threshold': min_threshold,
  132. 'max_threshold': max_threshold,
  133. })
  134. df_csv.to_csv(csv_filename, index=False)
  135. logger.info(f"earliestWarningDistance_LST data saved to: {csv_filename}")
  136. return csv_filename
  137. # # Read data from CSV
  138. # df = pd.read_csv(csv_filename)
  139. # # Create single chart for warning distance
  140. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  141. # # Plot warning distance
  142. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  143. # # Add threshold lines
  144. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  145. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  146. # # Mark metric value
  147. # if len(df) > 0:
  148. # label_text = 'Earliest Warning Distance'
  149. # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
  150. # color='red', s=100, zorder=5,
  151. # label=f'{label_text}: {metric_value:.2f}m')
  152. # # Set y-axis range
  153. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  154. # plt.xlabel('Time (s)')
  155. # plt.ylabel('Distance (m)')
  156. # plt.title(f'earliestWarningDistance_LST - Warning Distance Over Time')
  157. # plt.grid(True)
  158. # plt.legend()
  159. # # Save image
  160. # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_chart.png")
  161. # plt.savefig(chart_filename, dpi=300)
  162. # plt.close()
  163. # logger.info(f"earliestWarningDistance_LST chart saved to: {chart_filename}")
  164. # return chart_filename
  165. except Exception as e:
  166. logger.error(f"Failed to generate earliestWarningDistance_LST chart: {str(e)}", exc_info=True)
  167. return None
  168. def generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  169. """
  170. Generate warning distance chart with data visualization.
  171. This function creates charts for earliestWarningDistance_PGVIL and latestWarningDistance_PGVIL metrics.
  172. Args:
  173. function_calculator: FunctionCalculator instance
  174. output_dir: Output directory
  175. Returns:
  176. str: Chart file path, or None if generation fails
  177. """
  178. logger = LogManager().get_logger()
  179. try:
  180. # Get data
  181. ego_df = function_calculator.ego_data.copy()
  182. # Check if correctwarning is already calculated
  183. correctwarning = getattr(function_calculator, 'correctwarning', None)
  184. # Get configured thresholds
  185. thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_PGVIL')
  186. max_threshold = thresholds["max"]
  187. min_threshold = thresholds["min"]
  188. # Get calculated warning distance and speed
  189. warning_dist = getattr(function_calculator, 'warning_dist', None)
  190. warning_time = getattr(function_calculator, 'warning_time', None)
  191. if len(warning_dist) == 0:
  192. logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
  193. return None
  194. # Calculate metric value
  195. metric_value = float(warning_dist[0]) if len(warning_dist) >= 0.0 else max_threshold
  196. # Save CSV data
  197. csv_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_data.csv")
  198. df_csv = pd.DataFrame({
  199. 'simTime': warning_time,
  200. 'warning_distance': warning_dist,
  201. 'min_threshold': min_threshold,
  202. 'max_threshold': max_threshold
  203. })
  204. df_csv.to_csv(csv_filename, index=False)
  205. logger.info(f"earliestWarningDistance_PGVIL data saved to: {csv_filename}")
  206. return csv_filename
  207. # # Read data from CSV
  208. # df = pd.read_csv(csv_filename)
  209. # # Create single chart for warning distance
  210. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  211. # # Plot warning distance
  212. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  213. # # Add threshold lines
  214. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  215. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  216. # # Mark metric value
  217. # if len(df) > 0:
  218. # label_text = 'Earliest Warning Distance'
  219. # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
  220. # color='red', s=100, zorder=5,
  221. # label=f'{label_text}: {metric_value:.2f}m')
  222. # # Set y-axis range
  223. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  224. # plt.xlabel('Time (s)')
  225. # plt.ylabel('Distance (m)')
  226. # plt.title(f'earliestWarningDistance_PGVIL - Warning Distance Over Time')
  227. # plt.grid(True)
  228. # plt.legend()
  229. # # Save image
  230. # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_chart.png")
  231. # plt.savefig(chart_filename, dpi=300)
  232. # plt.close()
  233. # logger.info(f"earliestWarningDistance_PGVIL chart saved to: {chart_filename}")
  234. # return chart_filename
  235. except Exception as e:
  236. logger.error(f"Failed to generate earliestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
  237. return None
  238. # # 使用function.py中已实现的find_nested_name函数
  239. # from modules.metric.function import find_nested_name
  240. def generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  241. """
  242. Generate TTC warning chart with data visualization.
  243. This version first saves data to CSV, then uses the CSV to generate the chart.
  244. Args:
  245. function_calculator: FunctionCalculator instance
  246. output_dir: Output directory
  247. Returns:
  248. str: Chart file path, or None if generation fails
  249. """
  250. logger = LogManager().get_logger()
  251. try:
  252. # 获取数据
  253. ego_df = function_calculator.ego_data.copy()
  254. correctwarning = getattr(function_calculator, 'correctwarning', None)
  255. # 获取配置的阈值
  256. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_PGVIL')
  257. max_threshold = thresholds["max"]
  258. min_threshold = thresholds["min"]
  259. warning_dist = getattr(function_calculator, 'warning_dist', None)
  260. warning_speed = getattr(function_calculator, 'warning_speed', None)
  261. warning_time = getattr(function_calculator, 'warning_time', None)
  262. ttc = getattr(function_calculator, 'ttc', None)
  263. if len(warning_dist) == 0:
  264. logger.warning("Cannot generate TTC warning chart: empty data")
  265. return None
  266. # 生成时间戳
  267. # 保存 CSV 数据
  268. csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_data.csv")
  269. df_csv = pd.DataFrame({
  270. 'simTime': warning_time,
  271. 'warning_distance': warning_dist,
  272. 'warning_speed': warning_speed,
  273. 'ttc': ttc,
  274. 'min_threshold': min_threshold,
  275. 'max_threshold': max_threshold,
  276. })
  277. df_csv.to_csv(csv_filename, index=False)
  278. logger.info(f"latestwarningdistance_ttc_pgvil data saved to: {csv_filename}")
  279. return csv_filename
  280. # # 从 CSV 读取数据
  281. # df = pd.read_csv(csv_filename)
  282. # # 创建图表
  283. # plt.figure(figsize=(12, 8), constrained_layout=True)
  284. # # 图 1:预警距离
  285. # ax1 = plt.subplot(3, 1, 1)
  286. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  287. # ax1.set_xlabel('Time (s)')
  288. # ax1.set_ylabel('Distance (m)')
  289. # ax1.set_title('Warning Distance Over Time')
  290. # ax1.grid(True)
  291. # ax1.legend()
  292. # # 图 2:相对速度
  293. # ax2 = plt.subplot(3, 1, 2)
  294. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  295. # ax2.set_xlabel('Time (s)')
  296. # ax2.set_ylabel('Speed (m/s)')
  297. # ax2.set_title('Relative Speed Over Time')
  298. # ax2.grid(True)
  299. # ax2.legend()
  300. # # 图 3:TTC
  301. # ax3 = plt.subplot(3, 1, 3)
  302. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  303. # # Add threshold lines
  304. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  305. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  306. # # Calculate metric value (latest TTC)
  307. # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
  308. # # Mark latest TTC value
  309. # if len(df) > 0:
  310. # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
  311. # color='red', s=100, zorder=5,
  312. # label=f'Latest TTC: {metric_value:.2f}s')
  313. # ax3.set_xlabel('Time (s)')
  314. # ax3.set_ylabel('TTC (s)')
  315. # ax3.set_title('Time To Collision (TTC) Over Time')
  316. # ax3.grid(True)
  317. # ax3.legend()
  318. # # 保存图像
  319. # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_chart.png")
  320. # plt.savefig(chart_filename, dpi=300)
  321. # plt.close()
  322. # logger.info(f"latestwarningdistance_ttc_pgvil chart saved to: {chart_filename}")
  323. # return chart_filename
  324. except Exception as e:
  325. logger.error(f"Failed to generate latestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
  326. return None
  327. def generate_latest_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
  328. """
  329. Generate TTC warning chart with data visualization.
  330. This version first saves data to CSV, then uses the CSV to generate the chart.
  331. Args:
  332. function_calculator: FunctionCalculator instance
  333. output_dir: Output directory
  334. Returns:
  335. str: Chart file path, or None if generation fails
  336. """
  337. logger = LogManager().get_logger()
  338. try:
  339. # 获取数据
  340. ego_df = function_calculator.ego_data.copy()
  341. correctwarning = getattr(function_calculator, 'correctwarning', None)
  342. # 获取配置的阈值
  343. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_LST')
  344. max_threshold = thresholds["max"]
  345. min_threshold = thresholds["min"]
  346. warning_dist = getattr(function_calculator, 'warning_dist', None)
  347. warning_speed = getattr(function_calculator, 'warning_speed', None)
  348. ttc = getattr(function_calculator, 'ttc', None)
  349. if warning_dist.empty:
  350. logger.warning("Cannot generate TTC warning chart: empty data")
  351. return None
  352. # 生成时间戳
  353. # 保存 CSV 数据
  354. csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_data.csv")
  355. df_csv = pd.DataFrame({
  356. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  357. 'warning_distance': warning_dist,
  358. 'warning_speed': warning_speed,
  359. 'ttc': ttc,
  360. 'min_threshold': min_threshold,
  361. 'max_threshold': max_threshold,
  362. })
  363. df_csv.to_csv(csv_filename, index=False)
  364. logger.info(f"latestwarningdistance_ttc_lst data saved to: {csv_filename}")
  365. return csv_filename
  366. # # 从 CSV 读取数据
  367. # df = pd.read_csv(csv_filename)
  368. # # 创建图表
  369. # plt.figure(figsize=(12, 8), constrained_layout=True)
  370. # # 图 1:预警距离
  371. # ax1 = plt.subplot(3, 1, 1)
  372. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  373. # ax1.set_xlabel('Time (s)')
  374. # ax1.set_ylabel('Distance (m)')
  375. # ax1.set_title('Warning Distance Over Time')
  376. # ax1.grid(True)
  377. # ax1.legend()
  378. # # 图 2:相对速度
  379. # ax2 = plt.subplot(3, 1, 2)
  380. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  381. # ax2.set_xlabel('Time (s)')
  382. # ax2.set_ylabel('Speed (m/s)')
  383. # ax2.set_title('Relative Speed Over Time')
  384. # ax2.grid(True)
  385. # ax2.legend()
  386. # # 图 3:TTC
  387. # ax3 = plt.subplot(3, 1, 3)
  388. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  389. # # Add threshold lines
  390. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  391. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  392. # # Calculate metric value (latest TTC)
  393. # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
  394. # # Mark latest TTC value
  395. # if len(df) > 0:
  396. # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
  397. # color='red', s=100, zorder=5,
  398. # label=f'Latest TTC: {metric_value:.2f}s')
  399. # ax3.set_xlabel('Time (s)')
  400. # ax3.set_ylabel('TTC (s)')
  401. # ax3.set_title('Time To Collision (TTC) Over Time')
  402. # ax3.grid(True)
  403. # ax3.legend()
  404. # # 保存图像
  405. # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_chart.png")
  406. # plt.savefig(chart_filename, dpi=300)
  407. # plt.close()
  408. # logger.info(f"latestwarningdistance_ttc_lst chart saved to: {chart_filename}")
  409. # return chart_filename
  410. except Exception as e:
  411. logger.error(f"Failed to generate latestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
  412. return None
  413. def generate_latest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
  414. """
  415. Generate warning distance chart with data visualization.
  416. This function creates charts for latestWarningDistance_LST metric.
  417. Args:
  418. function_calculator: FunctionCalculator instance
  419. metric_name: Metric name (latestWarningDistance_LST)
  420. output_dir: Output directory
  421. Returns:
  422. str: Chart file path, or None if generation fails
  423. """
  424. logger = LogManager().get_logger()
  425. try:
  426. # Get data
  427. ego_df = function_calculator.ego_data.copy()
  428. # Check if correctwarning is already calculated
  429. correctwarning = getattr(function_calculator, 'correctwarning', None)
  430. # Get configured thresholds
  431. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_LST')
  432. max_threshold = thresholds["max"]
  433. min_threshold = thresholds["min"]
  434. # Get calculated warning distance and speed
  435. warning_dist = getattr(function_calculator, 'warning_dist', None)
  436. if warning_dist.empty:
  437. logger.warning(f"Cannot generate latestWarningDistance_LST chart: empty data")
  438. return None
  439. # Calculate metric value
  440. metric_value = float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else max_threshold
  441. # Save CSV data
  442. csv_filename = os.path.join(output_dir, f"latestWarningDistance_LST_data.csv")
  443. df_csv = pd.DataFrame({
  444. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  445. 'warning_distance': warning_dist,
  446. 'min_threshold': min_threshold,
  447. 'max_threshold': max_threshold
  448. })
  449. df_csv.to_csv(csv_filename, index=False)
  450. logger.info(f"latestWarningDistance_LST data saved to: {csv_filename}")
  451. return csv_filename
  452. # # Read data from CSV
  453. # df = pd.read_csv(csv_filename)
  454. # # Create single chart for warning distance
  455. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  456. # # Plot warning distance
  457. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  458. # # Add threshold lines
  459. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  460. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  461. # # Mark metric value
  462. # if len(df) > 0:
  463. # label_text = 'Latest Warning Distance'
  464. # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
  465. # color='red', s=100, zorder=5,
  466. # label=f'{label_text}: {metric_value:.2f}m')
  467. # # Set y-axis range
  468. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  469. # plt.xlabel('Time (s)')
  470. # plt.ylabel('Distance (m)')
  471. # plt.title(f'latestWarningDistance_LST - Warning Distance Over Time')
  472. # plt.grid(True)
  473. # plt.legend()
  474. # # Save image
  475. # chart_filename = os.path.join(output_dir, f"latestWarningDistance_LST_chart.png")
  476. # plt.savefig(chart_filename, dpi=300)
  477. # plt.close()
  478. # logger.info(f"latestWarningDistance_LST chart saved to: {chart_filename}")
  479. # return chart_filename
  480. except Exception as e:
  481. logger.error(f"Failed to generate latestWarningDistance_LST chart: {str(e)}", exc_info=True)
  482. return None
  483. def generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  484. """
  485. Generate warning distance chart with data visualization.
  486. This function creates charts for latestWarningDistance_LST metric.
  487. Args:
  488. function_calculator: FunctionCalculator instance
  489. metric_name: Metric name (latestWarningDistance_LST)
  490. output_dir: Output directory
  491. Returns:
  492. str: Chart file path, or None if generation fails
  493. """
  494. logger = LogManager().get_logger()
  495. try:
  496. # Get data
  497. ego_df = function_calculator.ego_data.copy()
  498. # Check if correctwarning is already calculated
  499. correctwarning = getattr(function_calculator, 'correctwarning', None)
  500. # Get configured thresholds
  501. thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_PGVIL')
  502. max_threshold = thresholds["max"]
  503. min_threshold = thresholds["min"]
  504. # Get calculated warning distance and speed
  505. warning_dist = getattr(function_calculator, 'warning_dist', None)
  506. warning_time = getattr(function_calculator, 'warning_time', None)
  507. if len(warning_dist) == 0:
  508. logger.warning(f"Cannot generate latestWarningDistance_PGVIL chart: empty data")
  509. return None
  510. # Calculate metric value
  511. metric_value = float(warning_dist[-1]) if len(warning_dist) > 0 else max_threshold
  512. # Save CSV data
  513. csv_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_data.csv")
  514. df_csv = pd.DataFrame({
  515. 'simTime': warning_time,
  516. 'warning_distance': warning_dist,
  517. 'min_threshold': min_threshold,
  518. 'max_threshold': max_threshold
  519. })
  520. df_csv.to_csv(csv_filename, index=False)
  521. logger.info(f"latestWarningDistance_PGVIL data saved to: {csv_filename}")
  522. return csv_filename
  523. # # Read data from CSV
  524. # df = pd.read_csv(csv_filename)
  525. # # Create single chart for warning distance
  526. # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
  527. # # Plot warning distance
  528. # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  529. # # Add threshold lines
  530. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  531. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  532. # # Mark metric value
  533. # if len(df) > 0:
  534. # label_text = 'Latest Warning Distance'
  535. # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
  536. # color='red', s=100, zorder=5,
  537. # label=f'{label_text}: {metric_value:.2f}m')
  538. # # Set y-axis range
  539. # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
  540. # plt.xlabel('Time (s)')
  541. # plt.ylabel('Distance (m)')
  542. # plt.title(f'latestWarningDistance_PGVIL - Warning Distance Over Time')
  543. # plt.grid(True)
  544. # plt.legend()
  545. # # Save image
  546. # chart_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_chart.png")
  547. # plt.savefig(chart_filename, dpi=300)
  548. # plt.close()
  549. # logger.info(f"latestWarningDistance_PGVIL chart saved to: {chart_filename}")
  550. # return chart_filename
  551. except Exception as e:
  552. logger.error(f"Failed to generate latestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
  553. return None
  554. def generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
  555. """
  556. Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_LST metric.
  557. Args:
  558. function_calculator: FunctionCalculator instance
  559. output_dir: Output directory
  560. Returns:
  561. str: Chart file path, or None if generation fails
  562. """
  563. logger = LogManager().get_logger()
  564. metric_name = 'earliestWarningDistance_TTC_LST'
  565. try:
  566. # Get data
  567. ego_df = function_calculator.ego_data.copy()
  568. # Check if correctwarning is already calculated
  569. correctwarning = getattr(function_calculator, 'correctwarning', None)
  570. # Get configured thresholds
  571. thresholds = get_metric_thresholds(function_calculator, metric_name)
  572. max_threshold = thresholds["max"]
  573. min_threshold = thresholds["min"]
  574. # Get calculated warning distance and speed
  575. warning_dist = getattr(function_calculator, 'correctwarning', None)
  576. warning_speed = getattr(function_calculator, 'warning_speed', None)
  577. ttc = getattr(function_calculator, 'ttc', None)
  578. # Calculate metric value
  579. metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
  580. # Save CSV data
  581. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  582. df_csv = pd.DataFrame({
  583. 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
  584. 'warning_distance': warning_dist,
  585. 'warning_speed': warning_speed,
  586. 'ttc': ttc,
  587. 'min_threshold': min_threshold,
  588. 'max_threshold': max_threshold
  589. })
  590. df_csv.to_csv(csv_filename, index=False)
  591. logger.info(f"{metric_name} data saved to: {csv_filename}")
  592. return csv_filename
  593. # # Read data from CSV
  594. # df = pd.read_csv(csv_filename)
  595. # # Create chart
  596. # plt.figure(figsize=(12, 8), constrained_layout=True)
  597. # # 图 1:预警距离
  598. # ax1 = plt.subplot(3, 1, 1)
  599. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  600. # ax1.set_xlabel('Time (s)')
  601. # ax1.set_ylabel('Distance (m)')
  602. # ax1.set_title('Warning Distance Over Time')
  603. # ax1.grid(True)
  604. # ax1.legend()
  605. # # 图 2:相对速度
  606. # ax2 = plt.subplot(3, 1, 2)
  607. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  608. # ax2.set_xlabel('Time (s)')
  609. # ax2.set_ylabel('Speed (m/s)')
  610. # ax2.set_title('Relative Speed Over Time')
  611. # ax2.grid(True)
  612. # ax2.legend()
  613. # # 图 3:TTC
  614. # ax3 = plt.subplot(3, 1, 3)
  615. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  616. # # Add threshold lines
  617. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  618. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  619. # # Mark earliest TTC value
  620. # if len(df) > 0:
  621. # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
  622. # color='red', s=100, zorder=5,
  623. # label=f'Earliest TTC: {metric_value:.2f}s')
  624. # ax3.set_xlabel('Time (s)')
  625. # ax3.set_ylabel('TTC (s)')
  626. # ax3.set_title('Time To Collision (TTC) Over Time')
  627. # ax3.grid(True)
  628. # ax3.legend()
  629. # # Save image
  630. # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_lst_chart.png")
  631. # plt.savefig(chart_filename, dpi=300)
  632. # plt.close()
  633. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  634. # return chart_filename
  635. except Exception as e:
  636. logger.error(f"Failed to generate earliestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
  637. return None
  638. def generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
  639. """
  640. Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_PGVIL metric.
  641. Args:
  642. function_calculator: FunctionCalculator instance
  643. output_dir: Output directory
  644. Returns:
  645. str: Chart file path, or None if generation fails
  646. """
  647. logger = LogManager().get_logger()
  648. metric_name = 'earliestWarningDistance_TTC_PGVIL'
  649. try:
  650. # Get data
  651. ego_df = function_calculator.ego_data.copy()
  652. # Check if correctwarning is already calculated
  653. correctwarning = getattr(function_calculator, 'correctwarning', None)
  654. # Get configured thresholds
  655. thresholds = get_metric_thresholds(function_calculator, metric_name)
  656. max_threshold = thresholds["max"]
  657. min_threshold = thresholds["min"]
  658. # Get calculated warning distance and speed
  659. warning_dist = getattr(function_calculator, 'warning_dist', None)
  660. warning_speed = getattr(function_calculator, 'warning_speed', None)
  661. ttc = getattr(function_calculator, 'ttc', None)
  662. warning_time = getattr(function_calculator, 'warning_time', None)
  663. # Calculate metric value
  664. metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
  665. # Save CSV data
  666. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  667. df_csv = pd.DataFrame({
  668. 'simTime': warning_time,
  669. 'warning_distance': warning_dist,
  670. 'warning_speed': warning_speed,
  671. 'ttc': ttc,
  672. 'min_threshold': min_threshold,
  673. 'max_threshold': max_threshold
  674. })
  675. df_csv.to_csv(csv_filename, index=False)
  676. logger.info(f"{metric_name} data saved to: {csv_filename}")
  677. return csv_filename
  678. # # Read data from CSV
  679. # df = pd.read_csv(csv_filename)
  680. # # Create chart
  681. # plt.figure(figsize=(12, 8), constrained_layout=True)
  682. # # 图 1:预警距离
  683. # ax1 = plt.subplot(3, 1, 1)
  684. # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
  685. # ax1.set_xlabel('Time (s)')
  686. # ax1.set_ylabel('Distance (m)')
  687. # ax1.set_title('Warning Distance Over Time')
  688. # ax1.grid(True)
  689. # ax1.legend()
  690. # # 图 2:相对速度
  691. # ax2 = plt.subplot(3, 1, 2)
  692. # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
  693. # ax2.set_xlabel('Time (s)')
  694. # ax2.set_ylabel('Speed (m/s)')
  695. # ax2.set_title('Relative Speed Over Time')
  696. # ax2.grid(True)
  697. # ax2.legend()
  698. # # 图 3:TTC
  699. # ax3 = plt.subplot(3, 1, 3)
  700. # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
  701. # # Add threshold lines
  702. # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  703. # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  704. # # Mark earliest TTC value
  705. # if len(df) > 0:
  706. # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
  707. # color='red', s=100, zorder=5,
  708. # label=f'Earliest TTC: {metric_value:.2f}s')
  709. # ax3.set_xlabel('Time (s)')
  710. # ax3.set_ylabel('TTC (s)')
  711. # ax3.set_title('Time To Collision (TTC) Over Time')
  712. # ax3.grid(True)
  713. # ax3.legend()
  714. # # Save image
  715. # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_pgvil_chart.png")
  716. # plt.savefig(chart_filename, dpi=300)
  717. # plt.close()
  718. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  719. # return chart_filename
  720. except Exception as e:
  721. logger.error(f"Failed to generate earliestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
  722. return None
  723. def generate_limit_speed_chart(function_calculator, output_dir: str) -> Optional[str]:
  724. """
  725. Generate limit speed chart with data visualization for limitSpeed_LST metric.
  726. Args:
  727. function_calculator: FunctionCalculator instance
  728. output_dir: Output directory
  729. Returns:
  730. str: Chart file path, or None if generation fails
  731. """
  732. logger = LogManager().get_logger()
  733. metric_name = 'limitSpeed_LST'
  734. try:
  735. # Get data
  736. ego_df = function_calculator.ego_data.copy()
  737. # Get configured thresholds
  738. thresholds = get_metric_thresholds(function_calculator, metric_name)
  739. max_threshold = thresholds["max"]
  740. min_threshold = thresholds["min"]
  741. if ego_df.empty:
  742. logger.warning(f"Cannot generate {metric_name} chart: empty data")
  743. return None
  744. # Save CSV data
  745. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  746. df_csv = pd.DataFrame({
  747. 'simTime': ego_df['simTime'],
  748. 'speed': ego_df['v'],
  749. 'speed_limit': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df)))
  750. })
  751. df_csv.to_csv(csv_filename, index=False)
  752. logger.info(f"{metric_name} data saved to: {csv_filename}")
  753. return csv_filename
  754. # # Read data from CSV
  755. # df = pd.read_csv(csv_filename)
  756. # # Create chart
  757. # plt.figure(figsize=(12, 6), constrained_layout=True)
  758. # # Plot speed
  759. # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
  760. # plt.plot(df['simTime'], df['speed_limit'], 'r--', label='Speed Limit')
  761. # # Set y-axis range
  762. # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
  763. # plt.xlabel('Time (s)')
  764. # plt.ylabel('Speed (m/s)')
  765. # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
  766. # plt.grid(True)
  767. # plt.legend()
  768. # # Save image
  769. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  770. # plt.savefig(chart_filename, dpi=300)
  771. # plt.close()
  772. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  773. # return chart_filename
  774. except Exception as e:
  775. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  776. return None
  777. def generate_limit_speed_past_sign_chart(function_calculator, output_dir: str) -> Optional[str]:
  778. """
  779. Generate limit speed past sign chart with data visualization for limitSpeedPastLimitSign_LST metric.
  780. Args:
  781. function_calculator: FunctionCalculator instance
  782. output_dir: Output directory
  783. Returns:
  784. str: Chart file path, or None if generation fails
  785. """
  786. logger = LogManager().get_logger()
  787. metric_name = 'limitSpeedPastLimitSign_LST'
  788. try:
  789. # Get data
  790. ego_df = function_calculator.ego_data.copy()
  791. # Get configured thresholds
  792. thresholds = get_metric_thresholds(function_calculator, metric_name)
  793. max_threshold = thresholds["max"]
  794. min_threshold = thresholds["min"]
  795. if ego_df.empty:
  796. logger.warning(f"Cannot generate {metric_name} chart: empty data")
  797. return None
  798. # Get sign passing time if available
  799. sign_time = getattr(function_calculator, 'sign_pass_time', None)
  800. if sign_time is None:
  801. # Try to estimate sign passing time (middle of the simulation)
  802. sign_time = ego_df['simTime'].iloc[len(ego_df) // 2]
  803. # Save CSV data
  804. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  805. df_csv = pd.DataFrame({
  806. 'simTime': ego_df['simTime'],
  807. 'speed': ego_df['v'],
  808. 'speed_limit': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df))),
  809. 'sign_pass_time': sign_time
  810. })
  811. df_csv.to_csv(csv_filename, index=False)
  812. logger.info(f"{metric_name} data saved to: {csv_filename}")
  813. return csv_filename
  814. # # Read data from CSV
  815. # df = pd.read_csv(csv_filename)
  816. # # Create chart
  817. # plt.figure(figsize=(12, 6), constrained_layout=True)
  818. # # Plot speed
  819. # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
  820. # plt.plot(df['simTime'], df['speed_limit'], 'r--', label='Speed Limit')
  821. # # Mark sign passing time
  822. # plt.axvline(x=sign_time, color='g', linestyle='--', label='Speed Limit Sign')
  823. # # Set y-axis range
  824. # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
  825. # plt.xlabel('Time (s)')
  826. # plt.ylabel('Speed (m/s)')
  827. # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
  828. # plt.grid(True)
  829. # plt.legend()
  830. # # Save image
  831. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  832. # plt.savefig(chart_filename, dpi=300)
  833. # plt.close()
  834. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  835. # return chart_filename
  836. except Exception as e:
  837. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  838. return None
  839. def generate_max_longitude_dist_chart(function_calculator, output_dir: str) -> Optional[str]:
  840. """
  841. Generate maximum longitudinal distance chart with data visualization for maxLongitudeDist_LST metric.
  842. Args:
  843. function_calculator: FunctionCalculator instance
  844. output_dir: Output directory
  845. Returns:
  846. str: Chart file path, or None if generation fails
  847. """
  848. logger = LogManager().get_logger()
  849. metric_name = 'maxLongitudeDist_LST'
  850. try:
  851. # Get data
  852. ego_df = function_calculator.ego_data.copy()
  853. # Get configured thresholds
  854. thresholds = get_metric_thresholds(function_calculator, metric_name)
  855. max_threshold = thresholds["max"]
  856. min_threshold = thresholds["min"]
  857. # Get longitudinal distance data
  858. longitude_dist = ego_df['longitude_dist'] if 'longitude_dist' in ego_df.columns else None
  859. stop_time = ego_df['stop_time'] if 'stop_time' in ego_df.columns else None
  860. if longitude_dist is None or longitude_dist.empty:
  861. logger.warning(f"Cannot generate {metric_name} chart: missing longitudinal distance data")
  862. return None
  863. # Calculate metric value
  864. metric_value = longitude_dist.max()
  865. max_distance_time = ego_df.loc[longitude_dist.idxmax(), 'simTime']
  866. # Save CSV data
  867. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  868. df_csv = pd.DataFrame({
  869. 'simTime': ego_df['simTime'],
  870. 'x_relative_dist': ego_df['x_relative_dist'],
  871. 'stop_time': stop_time,
  872. 'longitude_dist': longitude_dist
  873. })
  874. df_csv.to_csv(csv_filename, index=False)
  875. logger.info(f"{metric_name} data saved to: {csv_filename}")
  876. return csv_filename
  877. # # Read data from CSV
  878. # df = pd.read_csv(csv_filename)
  879. # # Create chart
  880. # plt.figure(figsize=(12, 6), constrained_layout=True)
  881. # # Plot longitudinal distance
  882. # plt.plot(df['simTime'], df['x_relative_dist'], 'b-', label='Longitudinal Distance')
  883. # # Add threshold lines
  884. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  885. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  886. # # Mark maximum longitudinal distance
  887. # plt.scatter(max_distance_time, metric_value,
  888. # color='red', s=100, zorder=5,
  889. # label=f'Maximum Longitudinal Distance: {metric_value:.2f}m')
  890. # # Set y-axis range
  891. # plt.ylim(bottom=min(0, min_threshold * 0.9), top=max(max_threshold * 1.1, df['longitude_dist'].max() * 1.1))
  892. # plt.xlabel('Time (s)')
  893. # plt.ylabel('Longitudinal Distance (m)')
  894. # plt.title(f'{metric_name} - Longitudinal Distance Over Time')
  895. # plt.grid(True)
  896. # plt.legend()
  897. # # Save image
  898. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  899. # plt.savefig(chart_filename, dpi=300)
  900. # plt.close()
  901. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  902. # return chart_filename
  903. except Exception as e:
  904. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  905. return None
  906. def generate_warning_delay_time_chart(function_calculator, output_dir: str) -> Optional[str]:
  907. """
  908. Generate warning delay time chart with data visualization for warningDelayTime_LST metric.
  909. Args:
  910. function_calculator: FunctionCalculator instance
  911. output_dir: Output directory
  912. Returns:
  913. str: Chart file path, or None if generation fails
  914. """
  915. logger = LogManager().get_logger()
  916. metric_name = 'warningDelayTime_LST'
  917. try:
  918. # Get data
  919. ego_df = function_calculator.ego_data.copy()
  920. # Get configured thresholds
  921. thresholds = get_metric_thresholds(function_calculator, metric_name)
  922. max_threshold = thresholds["max"]
  923. min_threshold = thresholds["min"]
  924. # Check if correctwarning is already calculated
  925. correctwarning = getattr(function_calculator, 'correctwarning', None)
  926. if correctwarning is None:
  927. logger.warning(f"Cannot generate {metric_name} chart: missing correctwarning value")
  928. return None
  929. # Get HMI warning time and rosbag warning time
  930. HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
  931. simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
  932. rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
  933. 'simTime'].tolist()
  934. simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
  935. if (simTime_HMI is None) or (simTime_rosbag is None):
  936. logger.warning(f"Cannot generate {metric_name} chart: missing warning time data")
  937. return None
  938. # Calculate delay time
  939. delay_time = abs(simTime_HMI - simTime_rosbag)
  940. # Save CSV data
  941. csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
  942. df_csv = pd.DataFrame({
  943. 'HMI_warning_time': [simTime_HMI],
  944. 'rosbag_warning_time': [simTime_rosbag],
  945. 'delay_time': [delay_time],
  946. 'min_threshold': [min_threshold],
  947. 'max_threshold': [max_threshold]
  948. })
  949. df_csv.to_csv(csv_filename, index=False)
  950. logger.info(f"{metric_name} data saved to: {csv_filename}")
  951. return csv_filename
  952. # # Create chart - bar chart for delay time
  953. # plt.figure(figsize=(10, 6), constrained_layout=True)
  954. # # Plot delay time as bar
  955. # plt.bar(['Warning Delay Time'], [delay_time], color='blue', width=0.4)
  956. # # Add threshold lines
  957. # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
  958. # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
  959. # # Add value label
  960. # plt.text(0, delay_time + 0.05, f'{delay_time:.3f}s', ha='center', va='bottom', fontweight='bold')
  961. # # Set y-axis range
  962. # plt.ylim(bottom=0, top=max(max_threshold * 1.2, delay_time * 1.2))
  963. # plt.ylabel('Delay Time (s)')
  964. # plt.title(f'{metric_name} - Warning Delay Time')
  965. # plt.grid(True, axis='y')
  966. # plt.legend()
  967. # # Save image
  968. # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
  969. # plt.savefig(chart_filename, dpi=300)
  970. # plt.close()
  971. # logger.info(f"{metric_name} chart saved to: {chart_filename}")
  972. # return chart_filename
  973. except Exception as e:
  974. logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
  975. return None
  976. def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  977. str]:
  978. """
  979. Generate chart data for comfort metrics
  980. Args:
  981. comfort_calculator: ComfortCalculator instance
  982. metric_name: Metric name
  983. output_dir: Output directory
  984. Returns:
  985. str: Chart file path, or None if generation fails
  986. """
  987. logger = LogManager().get_logger()
  988. try:
  989. # 确保输出目录存在
  990. if output_dir:
  991. os.makedirs(output_dir, exist_ok=True)
  992. else:
  993. output_dir = os.getcwd()
  994. # 根据指标名称选择不同的图表生成方法
  995. if metric_name.lower() == 'shake':
  996. return generate_shake_chart(comfort_calculator, output_dir)
  997. elif metric_name.lower() == 'zigzag':
  998. return generate_zigzag_chart(comfort_calculator, output_dir)
  999. elif metric_name.lower() == 'cadence':
  1000. return generate_cadence_chart(comfort_calculator, output_dir)
  1001. elif metric_name.lower() == 'slambrake':
  1002. return generate_slam_brake_chart(comfort_calculator, output_dir)
  1003. elif metric_name.lower() == 'slamaccelerate':
  1004. return generate_slam_accelerate_chart(comfort_calculator, output_dir)
  1005. else:
  1006. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  1007. return None
  1008. except Exception as e:
  1009. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  1010. return None
  1011. def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1012. """
  1013. Generate shake metric chart with orange background for shake events.
  1014. This version first saves data to CSV, then uses the CSV to generate the chart.
  1015. Args:
  1016. comfort_calculator: ComfortCalculator instance
  1017. output_dir: Output directory
  1018. Returns:
  1019. str: Chart file path, or None if generation fails
  1020. """
  1021. logger = LogManager().get_logger()
  1022. try:
  1023. # 获取数据
  1024. df = comfort_calculator.ego_df.copy()
  1025. shake_events = comfort_calculator.shake_events
  1026. if df.empty:
  1027. logger.warning("Cannot generate shake chart: empty data")
  1028. return None
  1029. # 生成时间戳
  1030. # 保存 CSV 数据(第一步)
  1031. csv_filename = os.path.join(output_dir, f"shake_data.csv")
  1032. # logger.info(f"self.data列名对象:: {df.columns}")
  1033. df_csv = pd.DataFrame({
  1034. 'simTime': df['simTime'],
  1035. 'lat_acc': df['lat_acc_vehicle'],
  1036. 'lat_acc_rate': df['lat_acc_rate'],
  1037. 'speedH_std': df['speedH_std'],
  1038. 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None] * len(df))),
  1039. 'lat_acc_rate_threshold': 0.5,
  1040. 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None] * len(df))),
  1041. })
  1042. df_csv.to_csv(csv_filename, index=False)
  1043. logger.info(f"Shake data saved to: {csv_filename}")
  1044. return csv_filename
  1045. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1046. # df = pd.read_csv(csv_filename)
  1047. # # 创建图表(第三步)
  1048. # import matplotlib.pyplot as plt
  1049. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1050. # # 图 1:横向加速度
  1051. # ax1 = plt.subplot(3, 1, 1)
  1052. # ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
  1053. # if 'lat_acc_threshold' in df.columns:
  1054. # ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold')
  1055. # for idx, event in enumerate(shake_events):
  1056. # label = 'Shake Event' if idx == 0 else None
  1057. # ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1058. # ax1.set_xlabel('Time (s)')
  1059. # ax1.set_ylabel('Lateral Acceleration (m/s²)')
  1060. # ax1.set_title('Shake Event Detection - Lateral Acceleration')
  1061. # ax1.grid(True)
  1062. # ax1.legend()
  1063. # # 图 2:lat_acc_rate
  1064. # ax2 = plt.subplot(3, 1, 2)
  1065. # ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate')
  1066. # ax2.axhline(
  1067. # y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold'
  1068. # )
  1069. # for idx, event in enumerate(shake_events):
  1070. # label = 'Shake Event' if idx == 0 else None
  1071. # ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1072. # ax2.set_xlabel('Time (s)')
  1073. # ax2.set_ylabel('Angular Velocity (m/s³)')
  1074. # ax2.set_title('Shake Event Detection - lat_acc_rate')
  1075. # ax2.grid(True)
  1076. # ax2.legend()
  1077. # # 图 3:speedH_std
  1078. # ax3 = plt.subplot(3, 1, 3)
  1079. # ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std')
  1080. # if 'speedH_std_threshold' in df.columns:
  1081. # ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold')
  1082. # for idx, event in enumerate(shake_events):
  1083. # label = 'Shake Event' if idx == 0 else None
  1084. # ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
  1085. # ax3.set_xlabel('Time (s)')
  1086. # ax3.set_ylabel('Angular Velocity (deg/s)')
  1087. # ax3.set_title('Shake Event Detection - speedH_std')
  1088. # ax3.grid(True)
  1089. # ax3.legend()
  1090. # # 保存图像
  1091. # chart_filename = os.path.join(output_dir, f"shake_chart.png")
  1092. # plt.savefig(chart_filename, dpi=300)
  1093. # plt.close()
  1094. # logger.info(f"Shake chart saved to: {chart_filename}")
  1095. # return chart_filename
  1096. except Exception as e:
  1097. logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True)
  1098. return None
  1099. def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1100. """
  1101. Generate zigzag metric chart with orange background for zigzag events.
  1102. This version first saves data to CSV, then uses the CSV to generate the chart.
  1103. Args:
  1104. comfort_calculator: ComfortCalculator instance
  1105. output_dir: Output directory
  1106. Returns:
  1107. str: Chart file path, or None if generation fails
  1108. """
  1109. logger = LogManager().get_logger()
  1110. try:
  1111. # 获取数据
  1112. df = comfort_calculator.ego_df.copy()
  1113. zigzag_events = comfort_calculator.discomfort_df[
  1114. comfort_calculator.discomfort_df['type'] == 'zigzag'
  1115. ].copy()
  1116. if df.empty:
  1117. logger.warning("Cannot generate zigzag chart: empty data")
  1118. return None
  1119. # 生成时间戳
  1120. # 保存 CSV 数据(第一步)
  1121. csv_filename = os.path.join(output_dir, f"zigzag_data.csv")
  1122. df_csv = pd.DataFrame({
  1123. 'simTime': df['simTime'],
  1124. 'speedH': df['speedH'],
  1125. 'posH': df['posH'],
  1126. 'min_threshold': -10, # 可替换为动态阈值0.02618
  1127. 'max_threshold': 10
  1128. })
  1129. df_csv.to_csv(csv_filename, index=False)
  1130. logger.info(f"Zigzag data saved to: {csv_filename}")
  1131. return csv_filename
  1132. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1133. # df = pd.read_csv(csv_filename)
  1134. # # 创建图表(第三步)
  1135. # import matplotlib.pyplot as plt
  1136. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1137. # # ===== 子图1:Yaw Rate =====
  1138. # ax1 = plt.subplot(2, 1, 1)
  1139. # ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
  1140. # # 添加 speedH 上下限阈值线
  1141. # ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)')
  1142. # ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)')
  1143. # # 添加橙色背景:Zigzag Events
  1144. # for idx, event in zigzag_events.iterrows():
  1145. # label = 'Zigzag Event' if idx == 0 else None
  1146. # ax1.axvspan(event['start_time'], event['end_time'],
  1147. # alpha=0.3, color='orange', label=label)
  1148. # ax1.set_xlabel('Time (s)')
  1149. # ax1.set_ylabel('Angular Velocity (deg/s)')
  1150. # ax1.set_title('Zigzag Event Detection - Yaw Rate')
  1151. # ax1.grid(True)
  1152. # ax1.legend(loc='upper left')
  1153. # # ===== 子图2:Yaw Angle =====
  1154. # ax2 = plt.subplot(2, 1, 2)
  1155. # ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw')
  1156. # # 添加橙色背景:Zigzag Events
  1157. # for idx, event in zigzag_events.iterrows():
  1158. # label = 'Zigzag Event' if idx == 0 else None
  1159. # ax2.axvspan(event['start_time'], event['end_time'],
  1160. # alpha=0.3, color='orange', label=label)
  1161. # ax2.set_xlabel('Time (s)')
  1162. # ax2.set_ylabel('Yaw (deg)')
  1163. # ax2.set_title('Zigzag Event Detection - Yaw Angle')
  1164. # ax2.grid(True)
  1165. # ax2.legend(loc='upper left')
  1166. # # 保存图像
  1167. # chart_filename = os.path.join(output_dir, f"zigzag_chart.png")
  1168. # plt.savefig(chart_filename, dpi=300)
  1169. # plt.close()
  1170. # logger.info(f"Zigzag chart saved to: {chart_filename}")
  1171. # return csv_filename
  1172. except Exception as e:
  1173. logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True)
  1174. return None
  1175. def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1176. """
  1177. Generate cadence metric chart with orange background for cadence events.
  1178. This version first saves data to CSV, then uses the CSV to generate the chart.
  1179. Args:
  1180. comfort_calculator: ComfortCalculator instance
  1181. output_dir: Output directory
  1182. Returns:
  1183. str: Chart file path, or None if generation fails
  1184. """
  1185. logger = LogManager().get_logger()
  1186. try:
  1187. # 获取数据
  1188. df = comfort_calculator.ego_df.copy()
  1189. cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy()
  1190. if df.empty:
  1191. logger.warning("Cannot generate cadence chart: empty data")
  1192. return None
  1193. # 生成时间戳
  1194. # 保存 CSV 数据(第一步)
  1195. csv_filename = os.path.join(output_dir, f"cadence_data.csv")
  1196. df_csv = pd.DataFrame({
  1197. 'simTime': df['simTime'],
  1198. 'lon_acc': df['lon_acc_vehicle'],
  1199. 'v': df['v'],
  1200. 'ip_acc': df.get('ip_acc', pd.Series([None] * len(df))),
  1201. 'ip_dec': df.get('ip_dec', pd.Series([None] * len(df)))
  1202. })
  1203. df_csv.to_csv(csv_filename, index=False)
  1204. logger.info(f"Cadence data saved to: {csv_filename}")
  1205. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1206. # df = pd.read_csv(csv_filename)
  1207. # # 创建图表(第三步)
  1208. # import matplotlib.pyplot as plt
  1209. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1210. # # 图 1:纵向加速度
  1211. # ax1 = plt.subplot(2, 1, 1)
  1212. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1213. # if 'ip_acc' in df.columns and 'ip_dec' in df.columns:
  1214. # ax1.plot(df['simTime'], df['ip_acc'], 'r--', label='Acceleration Threshold')
  1215. # ax1.plot(df['simTime'], df['ip_dec'], 'g--', label='Deceleration Threshold')
  1216. # # 添加橙色背景标识顿挫事件
  1217. # for idx, event in cadence_events.iterrows():
  1218. # label = 'Cadence Event' if idx == 0 else None
  1219. # ax1.axvspan(event['start_time'], event['end_time'],
  1220. # alpha=0.3, color='orange', label=label)
  1221. # ax1.set_xlabel('Time (s)')
  1222. # ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
  1223. # ax1.set_title('Cadence Event Detection - Longitudinal Acceleration')
  1224. # ax1.grid(True)
  1225. # ax1.legend()
  1226. # # 图 2:速度
  1227. # ax2 = plt.subplot(2, 1, 2)
  1228. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1229. # # 添加橙色背景标识顿挫事件
  1230. # for idx, event in cadence_events.iterrows():
  1231. # label = 'Cadence Event' if idx == 0 else None
  1232. # ax2.axvspan(event['start_time'], event['end_time'],
  1233. # alpha=0.3, color='orange', label=label)
  1234. # ax2.set_xlabel('Time (s)')
  1235. # ax2.set_ylabel('Velocity (m/s)')
  1236. # ax2.set_title('Cadence Event Detection - Vehicle Speed')
  1237. # ax2.grid(True)
  1238. # ax2.legend()
  1239. # # 保存图像
  1240. # chart_filename = os.path.join(output_dir, f"cadence_chart.png")
  1241. # plt.savefig(chart_filename, dpi=300)
  1242. # plt.close()
  1243. # logger.info(f"Cadence chart saved to: {chart_filename}")
  1244. # return chart_filename
  1245. except Exception as e:
  1246. logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True)
  1247. return None
  1248. def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1249. """
  1250. Generate slam brake metric chart with orange background for slam brake events.
  1251. This version first saves data to CSV, then uses the CSV to generate the chart.
  1252. Args:
  1253. comfort_calculator: ComfortCalculator instance
  1254. output_dir: Output directory
  1255. Returns:
  1256. str: Chart file path, or None if generation fails
  1257. """
  1258. logger = LogManager().get_logger()
  1259. try:
  1260. # 获取数据
  1261. df = comfort_calculator.ego_df.copy()
  1262. slam_brake_events = comfort_calculator.discomfort_df[
  1263. comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy()
  1264. if df.empty:
  1265. logger.warning("Cannot generate slam brake chart: empty data")
  1266. return None
  1267. # 生成时间戳
  1268. # 保存 CSV 数据(第一步)
  1269. csv_filename = os.path.join(output_dir, f"slam_brake_data.csv")
  1270. df_csv = pd.DataFrame({
  1271. 'simTime': df['simTime'],
  1272. 'lon_acc': df['lon_acc_vehicle'],
  1273. 'v': df['v'],
  1274. 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df))),
  1275. 'max_threshold': 0.0
  1276. })
  1277. df_csv.to_csv(csv_filename, index=False)
  1278. logger.info(f"Slam brake data saved to: {csv_filename}")
  1279. return csv_filename
  1280. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1281. # df = pd.read_csv(csv_filename)
  1282. # # 创建图表(第三步)
  1283. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1284. # # 图 1:纵向加速度
  1285. # ax1 = plt.subplot(2, 1, 1)
  1286. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1287. # if 'min_threshold' in df.columns:
  1288. # ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Deceleration Threshold')
  1289. # # 添加橙色背景标识急刹车事件
  1290. # for idx, event in slam_brake_events.iterrows():
  1291. # label = 'Slam Brake Event' if idx == 0 else None
  1292. # ax1.axvspan(event['start_time'], event['end_time'],
  1293. # alpha=0.3, color='orange', label=label)
  1294. # ax1.set_xlabel('Time (s)')
  1295. # ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
  1296. # ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration')
  1297. # ax1.grid(True)
  1298. # ax1.legend()
  1299. # # 图 2:速度
  1300. # ax2 = plt.subplot(2, 1, 2)
  1301. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1302. # # 添加橙色背景标识急刹车事件
  1303. # for idx, event in slam_brake_events.iterrows():
  1304. # label = 'Slam Brake Event' if idx == 0 else None
  1305. # ax2.axvspan(event['start_time'], event['end_time'],
  1306. # alpha=0.3, color='orange', label=label)
  1307. # ax2.set_xlabel('Time (s)')
  1308. # ax2.set_ylabel('Velocity (m/s)')
  1309. # ax2.set_title('Slam Brake Event Detection - Vehicle Speed')
  1310. # ax2.grid(True)
  1311. # ax2.legend()
  1312. # # 保存图像
  1313. # chart_filename = os.path.join(output_dir, f"slam_brake_chart.png")
  1314. # plt.savefig(chart_filename, dpi=300)
  1315. # plt.close()
  1316. # logger.info(f"Slam brake chart saved to: {chart_filename}")
  1317. # return chart_filename
  1318. except Exception as e:
  1319. logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True)
  1320. return None
  1321. def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]:
  1322. """
  1323. Generate slam accelerate metric chart with orange background for slam accelerate events.
  1324. This version first saves data to CSV, then uses the CSV to generate the chart.
  1325. Args:
  1326. comfort_calculator: ComfortCalculator instance
  1327. output_dir: Output directory
  1328. Returns:
  1329. str: Chart file path, or None if generation fails
  1330. """
  1331. logger = LogManager().get_logger()
  1332. try:
  1333. # 获取数据
  1334. df = comfort_calculator.ego_df.copy()
  1335. slam_accel_events = comfort_calculator.discomfort_df[
  1336. (comfort_calculator.discomfort_df['type'] == 'slam_accel')
  1337. ].copy()
  1338. if df.empty:
  1339. logger.warning("Cannot generate slam accelerate chart: empty data")
  1340. return None
  1341. # 生成时间戳
  1342. # 保存 CSV 数据(第一步)
  1343. csv_filename = os.path.join(output_dir, f"slam_accel_data.csv")
  1344. # 获取加速度阈值(如果存在)
  1345. accel_threshold = df.get('ip_acc', pd.Series([None] * len(df)))
  1346. df_csv = pd.DataFrame({
  1347. 'simTime': df['simTime'],
  1348. 'lon_acc': df['lon_acc_vehicle'],
  1349. 'v': df['v'],
  1350. 'min_threshold': 0.0, # 加速度最小阈值设为0
  1351. 'max_threshold': accel_threshold # 急加速阈值
  1352. })
  1353. df_csv.to_csv(csv_filename, index=False)
  1354. logger.info(f"Slam accelerate data saved to: {csv_filename}")
  1355. return csv_filename
  1356. # # 第二步:从 CSV 读取(可验证保存数据无误)
  1357. # df = pd.read_csv(csv_filename)
  1358. # # 创建图表(第三步)
  1359. # plt.figure(figsize=(12, 8), constrained_layout=True)
  1360. # # 图 1:纵向加速度
  1361. # ax1 = plt.subplot(2, 1, 1)
  1362. # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
  1363. # # 添加加速度阈值线
  1364. # if 'max_threshold' in df.columns and not df['max_threshold'].isnull().all():
  1365. # ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Acceleration Threshold')
  1366. # # 添加橙色背景标识急加速事件
  1367. # for idx, event in slam_accel_events.iterrows():
  1368. # label = 'Slam Accelerate Event' if idx == 0 else None
  1369. # ax1.axvspan(event['start_time'], event['end_time'],
  1370. # alpha=0.3, color='orange', label=label)
  1371. # ax1.set_xlabel('Time (s)')
  1372. # ax1.set_ylabel('Acceleration (m/s²)')
  1373. # ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration')
  1374. # ax1.grid(True)
  1375. # ax1.legend()
  1376. # # 图 2:速度
  1377. # ax2 = plt.subplot(2, 1, 2)
  1378. # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
  1379. # # 添加橙色背景标识急加速事件
  1380. # for idx, event in slam_accel_events.iterrows():
  1381. # label = 'Slam Accelerate Event' if idx == 0 else None
  1382. # ax2.axvspan(event['start_time'], event['end_time'],
  1383. # alpha=0.3, color='orange', label=label)
  1384. # ax2.set_xlabel('Time (s)')
  1385. # ax2.set_ylabel('Velocity (m/s)')
  1386. # ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed')
  1387. # ax2.grid(True)
  1388. # ax2.legend()
  1389. # # 保存图像
  1390. # chart_filename = os.path.join(output_dir, f"slam_accel_chart.png")
  1391. # plt.savefig(chart_filename, dpi=300)
  1392. # plt.close()
  1393. # logger.info(f"Slam accelerate chart saved to: {chart_filename}")
  1394. # return chart_filename
  1395. except Exception as e:
  1396. logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True)
  1397. return None
  1398. def get_metric_thresholds(calculator, metric_name: str) -> dict:
  1399. """
  1400. 从配置文件中获取指标的阈值
  1401. Args:
  1402. calculator: Calculator instance (FunctionCalculator, SafetyCalculator, ComfortCalculator, EfficientCalculator, TrafficCalculator)
  1403. metric_name: 指标名称
  1404. Returns:
  1405. dict: 包含min和max阈值的字典
  1406. """
  1407. logger = LogManager().get_logger()
  1408. thresholds = {"min": None, "max": None}
  1409. try:
  1410. # 根据计算器类型获取配置
  1411. if hasattr(calculator, 'data_processed'):
  1412. # 检查安全性指标配置
  1413. if hasattr(calculator.data_processed,
  1414. 'safety_config') and 'safety' in calculator.data_processed.safety_config:
  1415. config = calculator.data_processed.safety_config['safety']
  1416. metric_type = 'safety'
  1417. # 检查舒适性指标配置
  1418. elif hasattr(calculator.data_processed,
  1419. 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config:
  1420. config = calculator.data_processed.comfort_config['comfort']
  1421. metric_type = 'comfort'
  1422. # 检查功能性指标配置
  1423. elif hasattr(calculator.data_processed,
  1424. 'function_config') and 'function' in calculator.data_processed.function_config:
  1425. config = calculator.data_processed.function_config['function']
  1426. metric_type = 'function'
  1427. # 检查高效性指标配置
  1428. elif hasattr(calculator.data_processed,
  1429. 'efficient_config') and 'efficient' in calculator.data_processed.efficient_config:
  1430. config = calculator.data_processed.efficient_config['efficient']
  1431. metric_type = 'efficient'
  1432. # 检查交通性指标配置
  1433. elif hasattr(calculator.data_processed,
  1434. 'traffic_config') and 'traffic' in calculator.data_processed.traffic_config:
  1435. config = calculator.data_processed.traffic_config['traffic']
  1436. metric_type = 'traffic'
  1437. else:
  1438. # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
  1439. if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
  1440. config = calculator.function_config['function']
  1441. metric_type = 'function'
  1442. else:
  1443. logger.warning(f"无法找到{metric_name}的配置信息")
  1444. return thresholds
  1445. else:
  1446. # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
  1447. if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
  1448. config = calculator.function_config['function']
  1449. metric_type = 'function'
  1450. else:
  1451. logger.warning(f"计算器没有data_processed属性或function_config属性")
  1452. return thresholds
  1453. # 递归查找指标配置
  1454. def find_metric_config(node, target_name):
  1455. if isinstance(node, dict):
  1456. if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node:
  1457. return node
  1458. for key, value in node.items():
  1459. result = find_metric_config(value, target_name)
  1460. if result:
  1461. return result
  1462. return None
  1463. # 查找指标配置
  1464. metric_config = find_metric_config(config, metric_name)
  1465. if metric_config:
  1466. thresholds["min"] = metric_config.get("min")
  1467. thresholds["max"] = metric_config.get("max")
  1468. logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}")
  1469. else:
  1470. logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息")
  1471. except Exception as e:
  1472. logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True)
  1473. return thresholds
  1474. def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
  1475. """
  1476. Generate chart data for safety metrics
  1477. Args:
  1478. safety_calculator: SafetyCalculator instance
  1479. metric_name: Metric name
  1480. output_dir: Output directory
  1481. Returns:
  1482. str: Chart file path, or None if generation fails
  1483. """
  1484. logger = LogManager().get_logger()
  1485. try:
  1486. # 确保输出目录存在
  1487. if output_dir:
  1488. os.makedirs(output_dir, exist_ok=True)
  1489. else:
  1490. output_dir = os.getcwd()
  1491. # 根据指标名称选择不同的图表生成方法
  1492. if metric_name.lower() == 'ttc':
  1493. return generate_ttc_chart(safety_calculator, output_dir)
  1494. elif metric_name.lower() == 'mttc':
  1495. return generate_mttc_chart(safety_calculator, output_dir)
  1496. elif metric_name.lower() == 'thw':
  1497. return generate_thw_chart(safety_calculator, output_dir)
  1498. elif metric_name.lower() == 'lonsd':
  1499. return generate_lonsd_chart(safety_calculator, output_dir)
  1500. elif metric_name.lower() == 'latsd':
  1501. return generate_latsd_chart(safety_calculator, output_dir)
  1502. elif metric_name.lower() == 'btn':
  1503. return generate_btn_chart(safety_calculator, output_dir)
  1504. elif metric_name.lower() == 'collisionrisk':
  1505. return generate_collision_risk_chart(safety_calculator, output_dir)
  1506. elif metric_name.lower() == 'collisionseverity':
  1507. return generate_collision_severity_chart(safety_calculator, output_dir)
  1508. else:
  1509. logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
  1510. return None
  1511. except Exception as e:
  1512. logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
  1513. return None
  1514. def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1515. """
  1516. Generate TTC metric chart with orange background for unsafe events.
  1517. This version first saves data to CSV, then uses the CSV to generate the chart.
  1518. Args:
  1519. safety_calculator: SafetyCalculator instance
  1520. output_dir: Output directory
  1521. Returns:
  1522. str: Chart file path, or None if generation fails
  1523. """
  1524. logger = LogManager().get_logger()
  1525. try:
  1526. # 获取数据
  1527. ttc_data = safety_calculator.ttc_data
  1528. if not ttc_data:
  1529. logger.warning("Cannot generate TTC chart: empty data")
  1530. return None
  1531. # 创建DataFrame
  1532. df = pd.DataFrame(ttc_data)
  1533. # 获取阈值
  1534. thresholds = get_metric_thresholds(safety_calculator, 'TTC')
  1535. min_threshold = thresholds.get('min')
  1536. max_threshold = thresholds.get('max')
  1537. # 生成时间戳
  1538. # 保存 CSV 数据(第一步)
  1539. csv_filename = os.path.join(output_dir, f"ttc_data.csv")
  1540. df_csv = pd.DataFrame({
  1541. 'simTime': df['simTime'],
  1542. 'simFrame': df['simFrame'],
  1543. 'TTC': df['TTC'],
  1544. 'min_threshold': min_threshold,
  1545. 'max_threshold': max_threshold
  1546. })
  1547. df_csv.to_csv(csv_filename, index=False)
  1548. logger.info(f"TTC data saved to: {csv_filename}")
  1549. return csv_filename
  1550. # 第二步:从 CSV 读取(可验证保存数据无误)
  1551. df = pd.read_csv(csv_filename)
  1552. # 检测超阈值事件
  1553. unsafe_events = []
  1554. if min_threshold is not None:
  1555. # 对于TTC,小于最小阈值视为不安全
  1556. unsafe_condition = df['TTC'] < min_threshold
  1557. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1558. for _, group in df[unsafe_condition].groupby(event_groups):
  1559. if len(group) >= 2: # 至少2帧才算一次事件
  1560. start_time = group['simTime'].iloc[0]
  1561. end_time = group['simTime'].iloc[-1]
  1562. duration = end_time - start_time
  1563. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1564. unsafe_events.append({
  1565. 'start_time': start_time,
  1566. 'end_time': end_time,
  1567. 'start_frame': group['simFrame'].iloc[0],
  1568. 'end_frame': group['simFrame'].iloc[-1],
  1569. 'duration': duration,
  1570. 'min_ttc': group['TTC'].min()
  1571. })
  1572. # 记录不安全事件信息
  1573. if unsafe_events:
  1574. logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件")
  1575. for i, event in enumerate(unsafe_events):
  1576. logger.info(
  1577. f"TTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s")
  1578. logger.info(f"TTC chart saved to: {chart_filename}")
  1579. return chart_filename
  1580. except Exception as e:
  1581. logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True)
  1582. return None
  1583. def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1584. """
  1585. Generate MTTC metric chart with orange background for unsafe events
  1586. Args:
  1587. safety_calculator: SafetyCalculator instance
  1588. output_dir: Output directory
  1589. Returns:
  1590. str: Chart file path, or None if generation fails
  1591. """
  1592. logger = LogManager().get_logger()
  1593. try:
  1594. # 获取数据
  1595. mttc_data = safety_calculator.mttc_data
  1596. if not mttc_data:
  1597. logger.warning("Cannot generate MTTC chart: empty data")
  1598. return None
  1599. # 创建DataFrame
  1600. df = pd.DataFrame(mttc_data)
  1601. # 获取阈值
  1602. thresholds = get_metric_thresholds(safety_calculator, 'MTTC')
  1603. min_threshold = thresholds.get('min')
  1604. max_threshold = thresholds.get('max')
  1605. # 检测超阈值事件
  1606. unsafe_events = []
  1607. if min_threshold is not None:
  1608. # 对于MTTC,小于最小阈值视为不安全
  1609. unsafe_condition = df['MTTC'] < min_threshold
  1610. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1611. for _, group in df[unsafe_condition].groupby(event_groups):
  1612. if len(group) >= 2: # 至少2帧才算一次事件
  1613. start_time = group['simTime'].iloc[0]
  1614. end_time = group['simTime'].iloc[-1]
  1615. duration = end_time - start_time
  1616. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1617. unsafe_events.append({
  1618. 'start_time': start_time,
  1619. 'end_time': end_time,
  1620. 'start_frame': group['simFrame'].iloc[0],
  1621. 'end_frame': group['simFrame'].iloc[-1],
  1622. 'duration': duration,
  1623. 'min_mttc': group['MTTC'].min()
  1624. })
  1625. # 保存CSV数据,包含阈值信息
  1626. csv_filename = os.path.join(output_dir, f"mttc_data.csv")
  1627. df_csv = df.copy()
  1628. df_csv['min_threshold'] = min_threshold
  1629. df_csv['max_threshold'] = max_threshold
  1630. df_csv.to_csv(csv_filename, index=False)
  1631. # 记录不安全事件信息
  1632. if unsafe_events:
  1633. logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件")
  1634. for i, event in enumerate(unsafe_events):
  1635. logger.info(
  1636. f"MTTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s")
  1637. logger.info(f"MTTC data saved to: {csv_filename}")
  1638. return csv_filename
  1639. except Exception as e:
  1640. logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True)
  1641. return None
  1642. def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1643. """
  1644. Generate THW metric chart with orange background for unsafe events
  1645. Args:
  1646. safety_calculator: SafetyCalculator instance
  1647. output_dir: Output directory
  1648. Returns:
  1649. str: Chart file path, or None if generation fails
  1650. """
  1651. logger = LogManager().get_logger()
  1652. try:
  1653. # 获取数据
  1654. thw_data = safety_calculator.thw_data
  1655. if not thw_data:
  1656. logger.warning("Cannot generate THW chart: empty data")
  1657. return None
  1658. # 创建DataFrame
  1659. df = pd.DataFrame(thw_data)
  1660. # 获取阈值
  1661. thresholds = get_metric_thresholds(safety_calculator, 'THW')
  1662. min_threshold = thresholds.get('min')
  1663. max_threshold = 60
  1664. # 检测超阈值事件
  1665. unsafe_events = []
  1666. if min_threshold is not None:
  1667. # 对于THW,小于最小阈值视为不安全
  1668. unsafe_condition = df['THW'] < min_threshold
  1669. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1670. for _, group in df[unsafe_condition].groupby(event_groups):
  1671. if len(group) >= 2: # 至少2帧才算一次事件
  1672. start_time = group['simTime'].iloc[0]
  1673. end_time = group['simTime'].iloc[-1]
  1674. duration = end_time - start_time
  1675. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1676. unsafe_events.append({
  1677. 'start_time': start_time,
  1678. 'end_time': end_time,
  1679. 'start_frame': group['simFrame'].iloc[0],
  1680. 'end_frame': group['simFrame'].iloc[-1],
  1681. 'duration': duration,
  1682. 'min_thw': group['THW'].min()
  1683. })
  1684. # 保存CSV数据,包含阈值信息
  1685. csv_filename = os.path.join(output_dir, f"thw_data.csv")
  1686. df_csv = df.copy()
  1687. df_csv['min_threshold'] = min_threshold
  1688. df_csv['max_threshold'] = max_threshold
  1689. df_csv.to_csv(csv_filename, index=False)
  1690. # 记录不安全事件信息
  1691. if unsafe_events:
  1692. logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件")
  1693. for i, event in enumerate(unsafe_events):
  1694. logger.info(
  1695. f"THW不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s")
  1696. logger.info(f"THW data saved to: {csv_filename}")
  1697. return csv_filename
  1698. except Exception as e:
  1699. logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True)
  1700. return None
  1701. def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1702. """
  1703. Generate Longitudinal Safe Distance metric chart
  1704. Args:
  1705. safety_calculator: SafetyCalculator instance
  1706. output_dir: Output directory
  1707. Returns:
  1708. str: Chart file path, or None if generation fails
  1709. """
  1710. logger = LogManager().get_logger()
  1711. try:
  1712. # 获取数据
  1713. lonsd_data = safety_calculator.lonsd_data
  1714. if not lonsd_data:
  1715. logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data")
  1716. return None
  1717. # 创建DataFrame
  1718. df = pd.DataFrame(lonsd_data)
  1719. # 获取阈值
  1720. thresholds = get_metric_thresholds(safety_calculator, 'LonSD')
  1721. min_threshold = thresholds.get('min')
  1722. max_threshold = thresholds.get('max')
  1723. # # 创建图表
  1724. # plt.figure(figsize=(12, 6))
  1725. # plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance')
  1726. # # 添加阈值线
  1727. # if min_threshold is not None:
  1728. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  1729. # if max_threshold is not None:
  1730. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  1731. # plt.xlabel('Time (s)')
  1732. # plt.ylabel('Distance (m)')
  1733. # plt.title('Longitudinal Safe Distance (LonSD) Trend')
  1734. # plt.grid(True)
  1735. # plt.legend()
  1736. # # 保存图表
  1737. # chart_filename = os.path.join(output_dir, f"lonsd_chart.png")
  1738. # plt.savefig(chart_filename, dpi=300)
  1739. # plt.close()
  1740. # logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}")
  1741. # 保存CSV数据,包含阈值信息
  1742. csv_filename = os.path.join(output_dir, f"lonsd_data.csv")
  1743. df_csv = df.copy()
  1744. df_csv['min_threshold'] = min_threshold
  1745. df_csv['max_threshold'] = max_threshold
  1746. df_csv.to_csv(csv_filename, index=False)
  1747. logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}")
  1748. return csv_filename
  1749. except Exception as e:
  1750. logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True)
  1751. return None
  1752. def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1753. """
  1754. Generate Lateral Safe Distance metric chart with orange background for unsafe events
  1755. Args:
  1756. safety_calculator: SafetyCalculator instance
  1757. output_dir: Output directory
  1758. Returns:
  1759. str: Chart file path, or None if generation fails
  1760. """
  1761. logger = LogManager().get_logger()
  1762. try:
  1763. # 获取数据
  1764. latsd_data = safety_calculator.latsd_data
  1765. if not latsd_data:
  1766. logger.warning("Cannot generate Lateral Safe Distance chart: empty data")
  1767. return None
  1768. # 创建DataFrame
  1769. df = pd.DataFrame(latsd_data)
  1770. # 获取阈值
  1771. thresholds = get_metric_thresholds(safety_calculator, 'LatSD')
  1772. min_threshold = thresholds.get('min')
  1773. max_threshold = thresholds.get('max')
  1774. # 检测超阈值事件
  1775. unsafe_events = []
  1776. if min_threshold is not None:
  1777. # 对于LatSD,小于最小阈值视为不安全
  1778. unsafe_condition = df['LatSD'] < min_threshold
  1779. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1780. for _, group in df[unsafe_condition].groupby(event_groups):
  1781. if len(group) >= 2: # 至少2帧才算一次事件
  1782. start_time = group['simTime'].iloc[0]
  1783. end_time = group['simTime'].iloc[-1]
  1784. duration = end_time - start_time
  1785. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1786. unsafe_events.append({
  1787. 'start_time': start_time,
  1788. 'end_time': end_time,
  1789. 'start_frame': group['simFrame'].iloc[0],
  1790. 'end_frame': group['simFrame'].iloc[-1],
  1791. 'duration': duration,
  1792. 'min_latsd': group['LatSD'].min()
  1793. })
  1794. # # 创建图表
  1795. # plt.figure(figsize=(12, 6))
  1796. # plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance')
  1797. # # 添加阈值线
  1798. # if min_threshold is not None:
  1799. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
  1800. # if max_threshold is not None:
  1801. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
  1802. # # 添加橙色背景标识不安全事件
  1803. # for idx, event in enumerate(unsafe_events):
  1804. # label = 'Unsafe LatSD Event' if idx == 0 else None
  1805. # plt.axvspan(event['start_time'], event['end_time'],
  1806. # alpha=0.3, color='orange', label=label)
  1807. # plt.xlabel('Time (s)')
  1808. # plt.ylabel('Distance (m)')
  1809. # plt.title('Lateral Safe Distance (LatSD) Trend')
  1810. # plt.grid(True)
  1811. # plt.legend()
  1812. # # 保存图表
  1813. # chart_filename = os.path.join(output_dir, f"latsd_chart.png")
  1814. # plt.savefig(chart_filename, dpi=300)
  1815. # plt.close()
  1816. # logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}")
  1817. # 保存CSV数据,包含阈值信息
  1818. csv_filename = os.path.join(output_dir, f"latsd_data.csv")
  1819. df_csv = df.copy()
  1820. df_csv['min_threshold'] = min_threshold
  1821. df_csv['max_threshold'] = max_threshold
  1822. df_csv.to_csv(csv_filename, index=False)
  1823. # 记录不安全事件信息
  1824. if unsafe_events:
  1825. logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件")
  1826. for i, event in enumerate(unsafe_events):
  1827. logger.info(
  1828. f"LatSD不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m")
  1829. logger.info(f"Lateral Safe Distance data saved to: {csv_filename}")
  1830. return csv_filename
  1831. except Exception as e:
  1832. logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True)
  1833. return None
  1834. def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1835. """
  1836. Generate Brake Threat Number metric chart with orange background for unsafe events
  1837. Args:
  1838. safety_calculator: SafetyCalculator instance
  1839. output_dir: Output directory
  1840. Returns:
  1841. str: Chart file path, or None if generation fails
  1842. """
  1843. logger = LogManager().get_logger()
  1844. try:
  1845. # 获取数据
  1846. btn_data = safety_calculator.btn_data
  1847. if not btn_data:
  1848. logger.warning("Cannot generate Brake Threat Number chart: empty data")
  1849. return None
  1850. # 创建DataFrame
  1851. df = pd.DataFrame(btn_data)
  1852. # 获取阈值
  1853. thresholds = get_metric_thresholds(safety_calculator, 'BTN')
  1854. min_threshold = thresholds.get('min')
  1855. max_threshold = thresholds.get('max')
  1856. # 检测超阈值事件
  1857. unsafe_events = []
  1858. if max_threshold is not None:
  1859. # 对于BTN,大于最大阈值视为不安全
  1860. unsafe_condition = df['BTN'] > max_threshold
  1861. event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
  1862. for _, group in df[unsafe_condition].groupby(event_groups):
  1863. if len(group) >= 2: # 至少2帧才算一次事件
  1864. start_time = group['simTime'].iloc[0]
  1865. end_time = group['simTime'].iloc[-1]
  1866. duration = end_time - start_time
  1867. if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
  1868. unsafe_events.append({
  1869. 'start_time': start_time,
  1870. 'end_time': end_time,
  1871. 'start_frame': group['simFrame'].iloc[0],
  1872. 'end_frame': group['simFrame'].iloc[-1],
  1873. 'duration': duration,
  1874. 'max_btn': group['BTN'].max()
  1875. })
  1876. # # 创建图表
  1877. # plt.figure(figsize=(12, 6))
  1878. # plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number')
  1879. # # 添加阈值线
  1880. # if min_threshold is not None:
  1881. # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})')
  1882. # if max_threshold is not None:
  1883. # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
  1884. # # 添加橙色背景标识不安全事件
  1885. # for idx, event in enumerate(unsafe_events):
  1886. # label = 'Unsafe BTN Event' if idx == 0 else None
  1887. # plt.axvspan(event['start_time'], event['end_time'],
  1888. # alpha=0.3, color='orange', label=label)
  1889. # plt.xlabel('Time (s)')
  1890. # plt.ylabel('BTN')
  1891. # plt.title('Brake Threat Number (BTN) Trend')
  1892. # plt.grid(True)
  1893. # plt.legend()
  1894. # # 保存图表
  1895. # chart_filename = os.path.join(output_dir, f"btn_chart.png")
  1896. # plt.savefig(chart_filename, dpi=300)
  1897. # plt.close()
  1898. # logger.info(f"Brake Threat Number chart saved to: {chart_filename}")
  1899. # 保存CSV数据,包含阈值信息
  1900. csv_filename = os.path.join(output_dir, f"btn_data.csv")
  1901. df_csv = df.copy()
  1902. df_csv['min_threshold'] = min_threshold
  1903. df_csv['max_threshold'] = max_threshold
  1904. df_csv.to_csv(csv_filename, index=False)
  1905. # 记录不安全事件信息
  1906. if unsafe_events:
  1907. logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件")
  1908. for i, event in enumerate(unsafe_events):
  1909. logger.info(
  1910. f"BTN不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}")
  1911. logger.info(f"Brake Threat Number data saved to: {csv_filename}")
  1912. return csv_filename
  1913. except Exception as e:
  1914. logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True)
  1915. return None
  1916. def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1917. """
  1918. Generate Collision Risk metric chart
  1919. Args:
  1920. safety_calculator: SafetyCalculator instance
  1921. output_dir: Output directory
  1922. Returns:
  1923. str: Chart file path, or None if generation fails
  1924. """
  1925. logger = LogManager().get_logger()
  1926. try:
  1927. # 获取数据
  1928. risk_data = safety_calculator.collision_risk_data
  1929. if not risk_data:
  1930. logger.warning("Cannot generate Collision Risk chart: empty data")
  1931. return None
  1932. # 创建DataFrame
  1933. df = pd.DataFrame(risk_data)
  1934. # 获取阈值
  1935. thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk')
  1936. min_threshold = thresholds.get('min')
  1937. max_threshold = thresholds.get('max')
  1938. # 保存CSV数据,包含阈值信息
  1939. csv_filename = os.path.join(output_dir, f"collisionrisk_data.csv")
  1940. df_csv = df.copy()
  1941. df_csv['min_threshold'] = min_threshold
  1942. df_csv['max_threshold'] = max_threshold
  1943. df_csv.to_csv(csv_filename, index=False)
  1944. logger.info(f"Collision Risk data saved to: {csv_filename}")
  1945. return csv_filename
  1946. except Exception as e:
  1947. logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True)
  1948. return None
  1949. def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]:
  1950. """
  1951. Generate Collision Severity metric chart
  1952. Args:
  1953. safety_calculator: SafetyCalculator instance
  1954. output_dir: Output directory
  1955. Returns:
  1956. str: Chart file path, or None if generation fails
  1957. """
  1958. logger = LogManager().get_logger()
  1959. try:
  1960. # 获取数据
  1961. severity_data = safety_calculator.collision_severity_data
  1962. if not severity_data:
  1963. logger.warning("Cannot generate Collision Severity chart: empty data")
  1964. return None
  1965. # 创建DataFrame
  1966. df = pd.DataFrame(severity_data)
  1967. # 获取阈值
  1968. thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity')
  1969. min_threshold = thresholds.get('min')
  1970. max_threshold = thresholds.get('max')
  1971. # 保存CSV数据,包含阈值信息
  1972. csv_filename = os.path.join(output_dir, f"collisionseverity_data.csv")
  1973. df_csv = df.copy()
  1974. df_csv['min_threshold'] = min_threshold
  1975. df_csv['max_threshold'] = max_threshold
  1976. df_csv.to_csv(csv_filename, index=False)
  1977. logger.info(f"Collision Severity data saved to: {csv_filename}")
  1978. return csv_filename
  1979. except Exception as e:
  1980. logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True)
  1981. return None
  1982. def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
  1983. str]:
  1984. """Generate chart data for traffic metrics"""
  1985. # 待实现
  1986. return None
  1987. def calculate_distance(ego_df, correctwarning):
  1988. """计算预警距离"""
  1989. dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
  1990. return dist
  1991. def calculate_relative_speed(ego_df, correctwarning):
  1992. """计算相对速度"""
  1993. return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
  1994. # 使用function.py中已实现的scenario_sign_dict
  1995. from modules.metric.function import scenario_sign_dict
  1996. if __name__ == "__main__":
  1997. # 测试代码
  1998. print("Metrics visualization utilities loaded.")