#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2023/06/25 @Last Modified: 2025/05/20 @Summary: Chart generation utilities for metrics visualization """ """ 主要功能模块: 函数指标绘图(generate_function_chart_data) 舒适性指标绘图(generate_comfort_chart_data) 安全性指标绘图(generate_safety_chart_data) 交通指标绘图(generate_traffic_chart_data,待实现) 新增的急加速指标绘图: 在generate_comfort_chart_data中增加了slamaccelerate指标的支持 实现了完整的generate_slam_accelerate_chart函数,用于绘制急加速事件 该函数包含: 数据获取与预处理 CSV数据保存与读取 纵向加速度与速度的双子图绘制 急加速事件的橙色背景标记 阈值线绘制 高质量图表输出(300dpi PNG) 其他关键特性: 统一的日志记录系统 阈值获取函数get_metric_thresholds 错误处理和异常捕获 时间戳管理 数据验证机制 详细的日志输出 辅助函数: calculate_distance 和 calculate_relative_speed(简化实现) scenario_sign_dict 场景签名字典(简化实现) 此代码实现了完整的指标可视化工具,特别针对急加速指标slamAccelerate提供了详细的绘图功能,能够清晰展示急加速事件的发生时间和相关数据变化。 """ # import matplotlib # matplotlib.use('Agg') # 使用非图形界面的后端 # import matplotlib.pyplot as plt import os import numpy as np import pandas as pd import matplotlib.pyplot as plt from typing import Optional, Dict, List, Any, Union from pathlib import Path from modules.lib.log_manager import LogManager def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[ str]: """ Generate chart data for function metrics Args: function_calculator: FunctionCalculator instance metric_name: Metric name output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 确保输出目录存在 if not output_dir: output_dir = os.path.join(os.getcwd(), 'data') os.makedirs(output_dir, exist_ok=True) # 根据指标名称选择不同的图表生成方法 if metric_name.lower() == 'latestwarningdistance_ttc_lst': return generate_latest_warning_ttc_chart(function_calculator, output_dir) elif metric_name.lower() == 'earliestwarningdistance_ttc_lst': return generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir) elif metric_name.lower() == 'earliestwarningdistance_lst': return generate_earliest_warning_distance_chart(function_calculator, output_dir) elif metric_name.lower() == 'latestwarningdistance_lst': return generate_latest_warning_distance_chart(function_calculator, output_dir) elif metric_name.lower() == 'latestwarningdistance_ttc_pgvil': return generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir) elif metric_name.lower() == 'earliestwarningdistance_ttc_pgvil': return generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir) elif metric_name.lower() == 'earliestwarningdistance_pgvil': return generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir) elif metric_name.lower() == 'latestwarningdistance_pgvil': return generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir) elif metric_name.lower() == 'limitspeed_lst': return generate_limit_speed_chart(function_calculator, output_dir) elif metric_name.lower() == 'limitspeedpastlimitsign_lst': return generate_limit_speed_past_sign_chart(function_calculator, output_dir) elif metric_name.lower() == 'maxlongitudedist_lst': return generate_max_longitude_dist_chart(function_calculator, output_dir) else: logger.warning(f"Chart generation not implemented for metric [{metric_name}]") return None except Exception as e: logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True) return None def generate_earliest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate warning distance chart with data visualization. This function creates charts for earliestWarningDistance_LST and latestWarningDistance_LST metrics. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # Get data ego_df = function_calculator.ego_data.copy() # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_LST') max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get calculated warning distance and speed warning_dist = getattr(function_calculator, 'warning_dist', None) if warning_dist.empty: logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data") return None # Calculate metric value metric_value = float(warning_dist.iloc[0]) if len(warning_dist) >= 0.0 else max_threshold # Save CSV data csv_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'], 'warning_distance': warning_dist, 'min_threshold': min_threshold, 'max_threshold': max_threshold, }) df_csv.to_csv(csv_filename, index=False) logger.info(f"earliestWarningDistance_LST data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create single chart for warning distance # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart # # Plot warning distance # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # # Add threshold lines # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # # Mark metric value # if len(df) > 0: # label_text = 'Earliest Warning Distance' # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0], # color='red', s=100, zorder=5, # label=f'{label_text}: {metric_value:.2f}m') # # Set y-axis range # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Distance (m)') # plt.title(f'earliestWarningDistance_LST - Warning Distance Over Time') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"earliestWarningDistance_LST chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate earliestWarningDistance_LST chart: {str(e)}", exc_info=True) return None def generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate warning distance chart with data visualization. This function creates charts for earliestWarningDistance_PGVIL and latestWarningDistance_PGVIL metrics. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # Get data ego_df = function_calculator.ego_data.copy() # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_PGVIL') max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get calculated warning distance and speed warning_dist = getattr(function_calculator, 'warning_dist', None) warning_time = getattr(function_calculator, 'warning_time', None) if len(warning_dist) == 0: logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data") return None # Calculate metric value metric_value = float(warning_dist[0]) if len(warning_dist) >= 0.0 else max_threshold # Save CSV data csv_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_data.csv") df_csv = pd.DataFrame({ 'simTime': warning_time, 'warning_distance': warning_dist, 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"earliestWarningDistance_PGVIL data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create single chart for warning distance # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart # # Plot warning distance # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # # Add threshold lines # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # # Mark metric value # if len(df) > 0: # label_text = 'Earliest Warning Distance' # plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0], # color='red', s=100, zorder=5, # label=f'{label_text}: {metric_value:.2f}m') # # Set y-axis range # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Distance (m)') # plt.title(f'earliestWarningDistance_PGVIL - Warning Distance Over Time') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"earliestWarningDistance_PGVIL chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate earliestWarningDistance_PGVIL chart: {str(e)}", exc_info=True) return None # # 使用function.py中已实现的find_nested_name函数 # from modules.metric.function import find_nested_name def generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate TTC warning chart with data visualization. This version first saves data to CSV, then uses the CSV to generate the chart. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 ego_df = function_calculator.ego_data.copy() correctwarning = getattr(function_calculator, 'correctwarning', None) # 获取配置的阈值 thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_PGVIL') max_threshold = thresholds["max"] min_threshold = thresholds["min"] warning_dist = getattr(function_calculator, 'warning_dist', None) warning_speed = getattr(function_calculator, 'warning_speed', None) warning_time = getattr(function_calculator, 'warning_time', None) ttc = getattr(function_calculator, 'ttc', None) if len(warning_dist) == 0: logger.warning("Cannot generate TTC warning chart: empty data") return None # 生成时间戳 # 保存 CSV 数据 csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_data.csv") df_csv = pd.DataFrame({ 'simTime': warning_time, 'warning_distance': warning_dist, 'warning_speed': warning_speed, 'ttc': ttc, 'min_threshold': min_threshold, 'max_threshold': max_threshold, }) df_csv.to_csv(csv_filename, index=False) logger.info(f"latestwarningdistance_ttc_pgvil data saved to: {csv_filename}") return csv_filename # # 从 CSV 读取数据 # df = pd.read_csv(csv_filename) # # 创建图表 # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:预警距离 # ax1 = plt.subplot(3, 1, 1) # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Distance (m)') # ax1.set_title('Warning Distance Over Time') # ax1.grid(True) # ax1.legend() # # 图 2:相对速度 # ax2 = plt.subplot(3, 1, 2) # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed') # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Speed (m/s)') # ax2.set_title('Relative Speed Over Time') # ax2.grid(True) # ax2.legend() # # 图 3:TTC # ax3 = plt.subplot(3, 1, 3) # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC') # # Add threshold lines # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)') # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # # Calculate metric value (latest TTC) # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold # # Mark latest TTC value # if len(df) > 0: # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1], # color='red', s=100, zorder=5, # label=f'Latest TTC: {metric_value:.2f}s') # ax3.set_xlabel('Time (s)') # ax3.set_ylabel('TTC (s)') # ax3.set_title('Time To Collision (TTC) Over Time') # ax3.grid(True) # ax3.legend() # # 保存图像 # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"latestwarningdistance_ttc_pgvil chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate latestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True) return None def generate_latest_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate TTC warning chart with data visualization. This version first saves data to CSV, then uses the CSV to generate the chart. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 ego_df = function_calculator.ego_data.copy() correctwarning = getattr(function_calculator, 'correctwarning', None) # 获取配置的阈值 thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_LST') max_threshold = thresholds["max"] min_threshold = thresholds["min"] warning_dist = getattr(function_calculator, 'warning_dist', None) warning_speed = getattr(function_calculator, 'warning_speed', None) ttc = getattr(function_calculator, 'ttc', None) if warning_dist.empty: logger.warning("Cannot generate TTC warning chart: empty data") return None # 生成时间戳 # 保存 CSV 数据 csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'], 'warning_distance': warning_dist, 'warning_speed': warning_speed, 'ttc': ttc, 'min_threshold': min_threshold, 'max_threshold': max_threshold, }) df_csv.to_csv(csv_filename, index=False) logger.info(f"latestwarningdistance_ttc_lst data saved to: {csv_filename}") return csv_filename # # 从 CSV 读取数据 # df = pd.read_csv(csv_filename) # # 创建图表 # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:预警距离 # ax1 = plt.subplot(3, 1, 1) # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Distance (m)') # ax1.set_title('Warning Distance Over Time') # ax1.grid(True) # ax1.legend() # # 图 2:相对速度 # ax2 = plt.subplot(3, 1, 2) # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed') # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Speed (m/s)') # ax2.set_title('Relative Speed Over Time') # ax2.grid(True) # ax2.legend() # # 图 3:TTC # ax3 = plt.subplot(3, 1, 3) # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC') # # Add threshold lines # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)') # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # # Calculate metric value (latest TTC) # metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold # # Mark latest TTC value # if len(df) > 0: # ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1], # color='red', s=100, zorder=5, # label=f'Latest TTC: {metric_value:.2f}s') # ax3.set_xlabel('Time (s)') # ax3.set_ylabel('TTC (s)') # ax3.set_title('Time To Collision (TTC) Over Time') # ax3.grid(True) # ax3.legend() # # 保存图像 # chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"latestwarningdistance_ttc_lst chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate latestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True) return None def generate_latest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate warning distance chart with data visualization. This function creates charts for latestWarningDistance_LST metric. Args: function_calculator: FunctionCalculator instance metric_name: Metric name (latestWarningDistance_LST) output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # Get data ego_df = function_calculator.ego_data.copy() # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_LST') max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get calculated warning distance and speed warning_dist = getattr(function_calculator, 'warning_dist', None) if warning_dist.empty: logger.warning(f"Cannot generate latestWarningDistance_LST chart: empty data") return None # Calculate metric value metric_value = float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else max_threshold # Save CSV data csv_filename = os.path.join(output_dir, f"latestWarningDistance_LST_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'], 'warning_distance': warning_dist, 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"latestWarningDistance_LST data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create single chart for warning distance # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart # # Plot warning distance # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # # Add threshold lines # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # # Mark metric value # if len(df) > 0: # label_text = 'Latest Warning Distance' # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1], # color='red', s=100, zorder=5, # label=f'{label_text}: {metric_value:.2f}m') # # Set y-axis range # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Distance (m)') # plt.title(f'latestWarningDistance_LST - Warning Distance Over Time') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"latestWarningDistance_LST_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"latestWarningDistance_LST chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate latestWarningDistance_LST chart: {str(e)}", exc_info=True) return None def generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate warning distance chart with data visualization. This function creates charts for latestWarningDistance_LST metric. Args: function_calculator: FunctionCalculator instance metric_name: Metric name (latestWarningDistance_LST) output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # Get data ego_df = function_calculator.ego_data.copy() # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_PGVIL') max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get calculated warning distance and speed warning_dist = getattr(function_calculator, 'warning_dist', None) warning_time = getattr(function_calculator, 'warning_time', None) if len(warning_dist) == 0: logger.warning(f"Cannot generate latestWarningDistance_PGVIL chart: empty data") return None # Calculate metric value metric_value = float(warning_dist[-1]) if len(warning_dist) > 0 else max_threshold # Save CSV data csv_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_data.csv") df_csv = pd.DataFrame({ 'simTime': warning_time, 'warning_distance': warning_dist, 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"latestWarningDistance_PGVIL data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create single chart for warning distance # plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart # # Plot warning distance # plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # # Add threshold lines # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # # Mark metric value # if len(df) > 0: # label_text = 'Latest Warning Distance' # plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1], # color='red', s=100, zorder=5, # label=f'{label_text}: {metric_value:.2f}m') # # Set y-axis range # plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Distance (m)') # plt.title(f'latestWarningDistance_PGVIL - Warning Distance Over Time') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"latestWarningDistance_PGVIL chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate latestWarningDistance_PGVIL chart: {str(e)}", exc_info=True) return None def generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_LST metric. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() metric_name = 'earliestWarningDistance_TTC_LST' try: # Get data ego_df = function_calculator.ego_data.copy() # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, metric_name) max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get calculated warning distance and speed warning_dist = getattr(function_calculator, 'correctwarning', None) warning_speed = getattr(function_calculator, 'warning_speed', None) ttc = getattr(function_calculator, 'ttc', None) # Calculate metric value metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold # Save CSV data csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'], 'warning_distance': warning_dist, 'warning_speed': warning_speed, 'ttc': ttc, 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"{metric_name} data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create chart # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:预警距离 # ax1 = plt.subplot(3, 1, 1) # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Distance (m)') # ax1.set_title('Warning Distance Over Time') # ax1.grid(True) # ax1.legend() # # 图 2:相对速度 # ax2 = plt.subplot(3, 1, 2) # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed') # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Speed (m/s)') # ax2.set_title('Relative Speed Over Time') # ax2.grid(True) # ax2.legend() # # 图 3:TTC # ax3 = plt.subplot(3, 1, 3) # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC') # # Add threshold lines # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)') # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # # Mark earliest TTC value # if len(df) > 0: # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0], # color='red', s=100, zorder=5, # label=f'Earliest TTC: {metric_value:.2f}s') # ax3.set_xlabel('Time (s)') # ax3.set_ylabel('TTC (s)') # ax3.set_title('Time To Collision (TTC) Over Time') # ax3.grid(True) # ax3.legend() # # Save image # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_lst_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"{metric_name} chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate earliestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True) return None def generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_PGVIL metric. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() metric_name = 'earliestWarningDistance_TTC_PGVIL' try: # Get data ego_df = function_calculator.ego_data.copy() # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, metric_name) max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get calculated warning distance and speed warning_dist = getattr(function_calculator, 'warning_dist', None) warning_speed = getattr(function_calculator, 'warning_speed', None) ttc = getattr(function_calculator, 'ttc', None) warning_time = getattr(function_calculator, 'warning_time', None) # Calculate metric value metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold # Save CSV data csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv") df_csv = pd.DataFrame({ 'simTime': warning_time, 'warning_distance': warning_dist, 'warning_speed': warning_speed, 'ttc': ttc, 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"{metric_name} data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create chart # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:预警距离 # ax1 = plt.subplot(3, 1, 1) # ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Distance (m)') # ax1.set_title('Warning Distance Over Time') # ax1.grid(True) # ax1.legend() # # 图 2:相对速度 # ax2 = plt.subplot(3, 1, 2) # ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed') # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Speed (m/s)') # ax2.set_title('Relative Speed Over Time') # ax2.grid(True) # ax2.legend() # # 图 3:TTC # ax3 = plt.subplot(3, 1, 3) # ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC') # # Add threshold lines # ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)') # ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # # Mark earliest TTC value # if len(df) > 0: # ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0], # color='red', s=100, zorder=5, # label=f'Earliest TTC: {metric_value:.2f}s') # ax3.set_xlabel('Time (s)') # ax3.set_ylabel('TTC (s)') # ax3.set_title('Time To Collision (TTC) Over Time') # ax3.grid(True) # ax3.legend() # # Save image # chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_pgvil_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"{metric_name} chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate earliestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True) return None def generate_limit_speed_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate limit speed chart with data visualization for limitSpeed_LST metric. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() metric_name = 'limitSpeed_LST' try: # Get data ego_df = function_calculator.ego_data.copy() # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, metric_name) max_threshold = thresholds["max"] min_threshold = thresholds["min"] if ego_df.empty: logger.warning(f"Cannot generate {metric_name} chart: empty data") return None # Save CSV data csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df['simTime'], 'speed': ego_df['v'], 'max_threshold': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df))) }) df_csv.to_csv(csv_filename, index=False) logger.info(f"{metric_name} data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create chart # plt.figure(figsize=(12, 6), constrained_layout=True) # # Plot speed # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed') # plt.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Threshold') # # Set y-axis range # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Speed (m/s)') # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"{metric_name} chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True) return None def generate_limit_speed_past_sign_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate limit speed past sign chart with data visualization for limitSpeedPastLimitSign_LST metric. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() metric_name = 'limitSpeedPastLimitSign_LST' try: # Get data ego_df = function_calculator.ego_data.copy() # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, metric_name) max_threshold = thresholds["max"] min_threshold = thresholds["min"] if ego_df.empty: logger.warning(f"Cannot generate {metric_name} chart: empty data") return None # Get sign passing time if available sign_time = getattr(function_calculator, 'sign_pass_time', None) if sign_time is None: # Try to estimate sign passing time (middle of the simulation) sign_time = ego_df['simTime'].iloc[len(ego_df) // 2] # Save CSV data csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df['simTime'], 'speed': ego_df['v'], 'max_threshold': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df))), 'sign_pass_time': sign_time }) df_csv.to_csv(csv_filename, index=False) logger.info(f"{metric_name} data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create chart # plt.figure(figsize=(12, 6), constrained_layout=True) # # Plot speed # plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed') # plt.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Speed') # # Mark sign passing time # plt.axvline(x=sign_time, color='g', linestyle='--', label='Speed Limit Sign') # # Set y-axis range # plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Speed (m/s)') # plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"{metric_name} chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True) return None def generate_max_longitude_dist_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate maximum longitudinal distance chart with data visualization for maxLongitudeDist_LST metric. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() metric_name = 'maxLongitudeDist_LST' try: # Get data ego_df = function_calculator.ego_data.copy() # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, metric_name) max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Get longitudinal distance data longitude_dist = ego_df['longitude_dist'] if 'longitude_dist' in ego_df.columns else None stop_time = ego_df['stop_time'] if 'stop_time' in ego_df.columns else None if longitude_dist is None or longitude_dist.empty: logger.warning(f"Cannot generate {metric_name} chart: missing longitudinal distance data") return None # Calculate metric value metric_value = longitude_dist.max() max_distance_time = ego_df.loc[longitude_dist.idxmax(), 'simTime'] # Save CSV data csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv") df_csv = pd.DataFrame({ 'simTime': ego_df['simTime'], 'x_relative_dist': ego_df['x_relative_dist'], 'stop_time': stop_time, 'longitude_dist': longitude_dist }) df_csv.to_csv(csv_filename, index=False) logger.info(f"{metric_name} data saved to: {csv_filename}") return csv_filename # # Read data from CSV # df = pd.read_csv(csv_filename) # # Create chart # plt.figure(figsize=(12, 6), constrained_layout=True) # # Plot longitudinal distance # plt.plot(df['simTime'], df['x_relative_dist'], 'b-', label='Longitudinal Distance') # # Add threshold lines # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # # Mark maximum longitudinal distance # plt.scatter(max_distance_time, metric_value, # color='red', s=100, zorder=5, # label=f'Maximum Longitudinal Distance: {metric_value:.2f}m') # # Set y-axis range # plt.ylim(bottom=min(0, min_threshold * 0.9), top=max(max_threshold * 1.1, df['longitude_dist'].max() * 1.1)) # plt.xlabel('Time (s)') # plt.ylabel('Longitudinal Distance (m)') # plt.title(f'{metric_name} - Longitudinal Distance Over Time') # plt.grid(True) # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"{metric_name} chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True) return None def generate_warning_delay_time_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate warning delay time chart with data visualization for warningDelayTime_LST metric. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() metric_name = 'warningDelayTime_LST' try: # Get data ego_df = function_calculator.ego_data.copy() # Get configured thresholds thresholds = get_metric_thresholds(function_calculator, metric_name) max_threshold = thresholds["max"] min_threshold = thresholds["min"] # Check if correctwarning is already calculated correctwarning = getattr(function_calculator, 'correctwarning', None) if correctwarning is None: logger.warning(f"Cannot generate {metric_name} chart: missing correctwarning value") return None # Get HMI warning time and rosbag warning time HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist() simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][ 'simTime'].tolist() simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None if (simTime_HMI is None) or (simTime_rosbag is None): logger.warning(f"Cannot generate {metric_name} chart: missing warning time data") return None # Calculate delay time delay_time = abs(simTime_HMI - simTime_rosbag) # Save CSV data csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv") df_csv = pd.DataFrame({ 'HMI_warning_time': [simTime_HMI], 'rosbag_warning_time': [simTime_rosbag], 'delay_time': [delay_time], 'min_threshold': [min_threshold], 'max_threshold': [max_threshold] }) df_csv.to_csv(csv_filename, index=False) logger.info(f"{metric_name} data saved to: {csv_filename}") return csv_filename # # Create chart - bar chart for delay time # plt.figure(figsize=(10, 6), constrained_layout=True) # # Plot delay time as bar # plt.bar(['Warning Delay Time'], [delay_time], color='blue', width=0.4) # # Add threshold lines # plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)') # plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # # Add value label # plt.text(0, delay_time + 0.05, f'{delay_time:.3f}s', ha='center', va='bottom', fontweight='bold') # # Set y-axis range # plt.ylim(bottom=0, top=max(max_threshold * 1.2, delay_time * 1.2)) # plt.ylabel('Delay Time (s)') # plt.title(f'{metric_name} - Warning Delay Time') # plt.grid(True, axis='y') # plt.legend() # # Save image # chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"{metric_name} chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True) return None def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[ str]: """ Generate chart data for comfort metrics Args: comfort_calculator: ComfortCalculator instance metric_name: Metric name output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 确保输出目录存在 if output_dir: os.makedirs(output_dir, exist_ok=True) else: output_dir = os.getcwd() # 根据指标名称选择不同的图表生成方法 if metric_name.lower() == 'shake': return generate_shake_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'zigzag': return generate_zigzag_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'cadence': return generate_cadence_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'slambrake': return generate_slam_brake_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'slamaccelerate': return generate_slam_accelerate_chart(comfort_calculator, output_dir) else: logger.warning(f"Chart generation not implemented for metric [{metric_name}]") return None except Exception as e: logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True) return None def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate shake metric chart with orange background for shake events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() shake_events = comfort_calculator.shake_events if df.empty: logger.warning("Cannot generate shake chart: empty data") return None # 生成时间戳 # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"shake_data.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lat_acc': df['lat_acc'], 'lat_acc_rate': df['lat_acc_rate'], 'speedH_std': df['speedH_std'], 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None] * len(df))), 'lat_acc_rate_threshold': 0.5, 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None] * len(df))), }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Shake data saved to: {csv_filename}") return csv_filename # # 第二步:从 CSV 读取(可验证保存数据无误) # df = pd.read_csv(csv_filename) # # 创建图表(第三步) # import matplotlib.pyplot as plt # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:横向加速度 # ax1 = plt.subplot(3, 1, 1) # ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration') # if 'lat_acc_threshold' in df.columns: # ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold') # for idx, event in enumerate(shake_events): # label = 'Shake Event' if idx == 0 else None # ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Lateral Acceleration (m/s²)') # ax1.set_title('Shake Event Detection - Lateral Acceleration') # ax1.grid(True) # ax1.legend() # # 图 2:lat_acc_rate # ax2 = plt.subplot(3, 1, 2) # ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate') # ax2.axhline( # y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold' # ) # for idx, event in enumerate(shake_events): # label = 'Shake Event' if idx == 0 else None # ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Angular Velocity (m/s³)') # ax2.set_title('Shake Event Detection - lat_acc_rate') # ax2.grid(True) # ax2.legend() # # 图 3:speedH_std # ax3 = plt.subplot(3, 1, 3) # ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std') # if 'speedH_std_threshold' in df.columns: # ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold') # for idx, event in enumerate(shake_events): # label = 'Shake Event' if idx == 0 else None # ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) # ax3.set_xlabel('Time (s)') # ax3.set_ylabel('Angular Velocity (deg/s)') # ax3.set_title('Shake Event Detection - speedH_std') # ax3.grid(True) # ax3.legend() # # 保存图像 # chart_filename = os.path.join(output_dir, f"shake_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Shake chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True) return None def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate zigzag metric chart with orange background for zigzag events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() zigzag_events = comfort_calculator.discomfort_df[ comfort_calculator.discomfort_df['type'] == 'zigzag' ].copy() if df.empty: logger.warning("Cannot generate zigzag chart: empty data") return None # 生成时间戳 # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"zigzag_data.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'speedH': df['speedH'], 'posH': df['posH'], 'min_speedH_threshold': -2.3, # 可替换为动态阈值 'max_speedH_threshold': 2.3 }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Zigzag data saved to: {csv_filename}") return csv_filename # # 第二步:从 CSV 读取(可验证保存数据无误) # df = pd.read_csv(csv_filename) # # 创建图表(第三步) # import matplotlib.pyplot as plt # plt.figure(figsize=(12, 8), constrained_layout=True) # # ===== 子图1:Yaw Rate ===== # ax1 = plt.subplot(2, 1, 1) # ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate') # # 添加 speedH 上下限阈值线 # ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)') # ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)') # # 添加橙色背景:Zigzag Events # for idx, event in zigzag_events.iterrows(): # label = 'Zigzag Event' if idx == 0 else None # ax1.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Angular Velocity (deg/s)') # ax1.set_title('Zigzag Event Detection - Yaw Rate') # ax1.grid(True) # ax1.legend(loc='upper left') # # ===== 子图2:Yaw Angle ===== # ax2 = plt.subplot(2, 1, 2) # ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw') # # 添加橙色背景:Zigzag Events # for idx, event in zigzag_events.iterrows(): # label = 'Zigzag Event' if idx == 0 else None # ax2.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Yaw (deg)') # ax2.set_title('Zigzag Event Detection - Yaw Angle') # ax2.grid(True) # ax2.legend(loc='upper left') # # 保存图像 # chart_filename = os.path.join(output_dir, f"zigzag_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Zigzag chart saved to: {chart_filename}") # return csv_filename except Exception as e: logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True) return None def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate cadence metric chart with orange background for cadence events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy() if df.empty: logger.warning("Cannot generate cadence chart: empty data") return None # 生成时间戳 # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"cadence_data.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lon_acc': df['lon_acc'], 'v': df['v'], 'max_threshold': df.get('ip_acc', pd.Series([None] * len(df))), 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df))) }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Cadence data saved to: {csv_filename}") # # 第二步:从 CSV 读取(可验证保存数据无误) # df = pd.read_csv(csv_filename) # # 创建图表(第三步) # import matplotlib.pyplot as plt # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:纵向加速度 # ax1 = plt.subplot(2, 1, 1) # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration') # if 'max_threshold' in df.columns and 'min_threshold' in df.columns: # ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Threshold') # ax1.plot(df['simTime'], df['min_threshold'], 'g--', label='Min Threshold') # # 添加橙色背景标识顿挫事件 # for idx, event in cadence_events.iterrows(): # label = 'Cadence Event' if idx == 0 else None # ax1.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Longitudinal Acceleration (m/s²)') # ax1.set_title('Cadence Event Detection - Longitudinal Acceleration') # ax1.grid(True) # ax1.legend() # # 图 2:速度 # ax2 = plt.subplot(2, 1, 2) # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity') # # 添加橙色背景标识顿挫事件 # for idx, event in cadence_events.iterrows(): # label = 'Cadence Event' if idx == 0 else None # ax2.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Velocity (m/s)') # ax2.set_title('Cadence Event Detection - Vehicle Speed') # ax2.grid(True) # ax2.legend() # # 保存图像 # chart_filename = os.path.join(output_dir, f"cadence_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Cadence chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True) return None def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate slam brake metric chart with orange background for slam brake events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() slam_brake_events = comfort_calculator.discomfort_df[ comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy() if df.empty: logger.warning("Cannot generate slam brake chart: empty data") return None # 生成时间戳 # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"slam_brake_data.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lon_acc': df['lon_acc'], 'lon_acc_diff': df['lon_acc_diff'], 'v': df['v'], 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df))), 'max_threshold': 0.0 }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Slam brake data saved to: {csv_filename}") # return csv_filename # # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # # 创建图表(第三步) plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:纵向加速度 ax1 = plt.subplot(3, 1, 1) ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration') if 'min_threshold' in df.columns: ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Min Threshold') # # 添加橙色背景标识急刹车事件 for idx, event in slam_brake_events.iterrows(): label = 'Slam Brake Event' if idx == 0 else None ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Longitudinal Acceleration (m/s²)') ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration') ax1.grid(True) ax1.legend() # # 图 2:速度 ax2 = plt.subplot(3, 1, 2) ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity') # # 添加橙色背景标识急刹车事件 for idx, event in slam_brake_events.iterrows(): label = 'Slam Brake Event' if idx == 0 else None ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax2.set_xlabel('Time (s)') ax2.set_ylabel('Velocity (m/s)') ax2.set_title('Slam Brake Event Detection - Vehicle Speed') ax2.grid(True) ax2.legend() # # 图 3:加速度变化率 ax3 = plt.subplot(3, 1, 3) ax3.plot(df['simTime'], df['lon_acc_diff'], 'r-', label='lon_acc_diff') # # 添加橙色背景标识急刹车事件 # for idx, event in slam_brake_events.iterrows(): # label = 'Slam Brake Event' if idx == 0 else None # ax2.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) ax3.set_xlabel('Time (s)') ax3.set_ylabel('lon_acc_diff (m/s^3)') ax3.set_title('Slam Brake Event Detection - Longitudinal Acceleration Difference') ax3.grid(True) ax3.legend() # # 保存图像 chart_filename = os.path.join(output_dir, f"slam_brake_chart.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Slam brake chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True) return None def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate slam accelerate metric chart with orange background for slam accelerate events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() slam_accel_events = comfort_calculator.discomfort_df[ (comfort_calculator.discomfort_df['type'] == 'slam_accel') ].copy() if df.empty: logger.warning("Cannot generate slam accelerate chart: empty data") return None # 生成时间戳 # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"slam_accel_data.csv") # 获取加速度阈值(如果存在) accel_threshold = df.get('ip_acc', pd.Series([None] * len(df))) df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lon_acc': df['lon_acc'], 'v': df['v'], 'min_threshold': 0.0, # 加速度最小阈值设为0 'max_threshold': accel_threshold # 急加速阈值 }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Slam accelerate data saved to: {csv_filename}") return csv_filename # # 第二步:从 CSV 读取(可验证保存数据无误) # df = pd.read_csv(csv_filename) # # 创建图表(第三步) # plt.figure(figsize=(12, 8), constrained_layout=True) # # 图 1:纵向加速度 # ax1 = plt.subplot(2, 1, 1) # ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration') # # 添加加速度阈值线 # if 'max_threshold' in df.columns and not df['max_threshold'].isnull().all(): # ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Max Threshold') # # 添加橙色背景标识急加速事件 # for idx, event in slam_accel_events.iterrows(): # label = 'Slam Accelerate Event' if idx == 0 else None # ax1.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # ax1.set_xlabel('Time (s)') # ax1.set_ylabel('Acceleration (m/s²)') # ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration') # ax1.grid(True) # ax1.legend() # # 图 2:速度 # ax2 = plt.subplot(2, 1, 2) # ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity') # # 添加橙色背景标识急加速事件 # for idx, event in slam_accel_events.iterrows(): # label = 'Slam Accelerate Event' if idx == 0 else None # ax2.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # ax2.set_xlabel('Time (s)') # ax2.set_ylabel('Velocity (m/s)') # ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed') # ax2.grid(True) # ax2.legend() # # 保存图像 # chart_filename = os.path.join(output_dir, f"slam_accel_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Slam accelerate chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True) return None def get_metric_thresholds(calculator, metric_name: str) -> dict: """ 从配置文件中获取指标的阈值 Args: calculator: Calculator instance (FunctionCalculator, SafetyCalculator, ComfortCalculator, EfficientCalculator, TrafficCalculator) metric_name: 指标名称 Returns: dict: 包含min和max阈值的字典 """ logger = LogManager().get_logger() thresholds = {"min": None, "max": None} try: # 根据计算器类型获取配置 if hasattr(calculator, 'data_processed'): # 检查安全性指标配置 if hasattr(calculator.data_processed, 'safety_config') and 'safety' in calculator.data_processed.safety_config: config = calculator.data_processed.safety_config['safety'] metric_type = 'safety' # 检查舒适性指标配置 elif hasattr(calculator.data_processed, 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config: config = calculator.data_processed.comfort_config['comfort'] metric_type = 'comfort' # 检查功能性指标配置 elif hasattr(calculator.data_processed, 'function_config') and 'function' in calculator.data_processed.function_config: config = calculator.data_processed.function_config['function'] metric_type = 'function' # 检查高效性指标配置 elif hasattr(calculator.data_processed, 'efficient_config') and 'efficient' in calculator.data_processed.efficient_config: config = calculator.data_processed.efficient_config['efficient'] metric_type = 'efficient' # 检查交通性指标配置 elif hasattr(calculator.data_processed, 'traffic_config') and 'traffic' in calculator.data_processed.traffic_config: config = calculator.data_processed.traffic_config['traffic'] metric_type = 'traffic' else: # 直接检查calculator是否有function_config属性(针对FunctionCalculator) if hasattr(calculator, 'function_config') and 'function' in calculator.function_config: config = calculator.function_config['function'] metric_type = 'function' else: logger.warning(f"无法找到{metric_name}的配置信息") return thresholds else: # 直接检查calculator是否有function_config属性(针对FunctionCalculator) if hasattr(calculator, 'function_config') and 'function' in calculator.function_config: config = calculator.function_config['function'] metric_type = 'function' else: logger.warning(f"计算器没有data_processed属性或function_config属性") return thresholds # 递归查找指标配置 def find_metric_config(node, target_name): if isinstance(node, dict): if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node: return node for key, value in node.items(): result = find_metric_config(value, target_name) if result: return result return None # 查找指标配置 metric_config = find_metric_config(config, metric_name) if metric_config: thresholds["min"] = metric_config.get("min") thresholds["max"] = metric_config.get("max") logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}") else: logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息") except Exception as e: logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True) return thresholds def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]: """ Generate chart data for safety metrics Args: safety_calculator: SafetyCalculator instance metric_name: Metric name output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 确保输出目录存在 if output_dir: os.makedirs(output_dir, exist_ok=True) else: output_dir = os.getcwd() # 根据指标名称选择不同的图表生成方法 if metric_name.lower() == 'ttc': return generate_ttc_chart(safety_calculator, output_dir) elif metric_name.lower() == 'mttc': return generate_mttc_chart(safety_calculator, output_dir) elif metric_name.lower() == 'thw': return generate_thw_chart(safety_calculator, output_dir) elif metric_name.lower() == 'lonsd': return generate_lonsd_chart(safety_calculator, output_dir) elif metric_name.lower() == 'latsd': return generate_latsd_chart(safety_calculator, output_dir) elif metric_name.lower() == 'btn': return generate_btn_chart(safety_calculator, output_dir) elif metric_name.lower() == 'collisionrisk': return generate_collision_risk_chart(safety_calculator, output_dir) elif metric_name.lower() == 'collisionseverity': return generate_collision_severity_chart(safety_calculator, output_dir) else: logger.warning(f"Chart generation not implemented for metric [{metric_name}]") return None except Exception as e: logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True) return None def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate TTC metric chart with orange background for unsafe events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 ttc_data = safety_calculator.ttc_data if not ttc_data: logger.warning("Cannot generate TTC chart: empty data") return None # 创建DataFrame df = pd.DataFrame(ttc_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'TTC') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 生成时间戳 # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"ttc_data.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'simFrame': df['simFrame'], 'TTC': df['TTC'], 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"TTC data saved to: {csv_filename}") return csv_filename # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于TTC,小于最小阈值视为不安全 unsafe_condition = df['TTC'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_ttc': group['TTC'].min() }) # # 创建图表(第三步) # plt.figure(figsize=(12, 8)) # plt.plot(df['simTime'], df['TTC'], 'b-', label='TTC') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # # 添加橙色背景标识不安全事件 # for idx, event in enumerate(unsafe_events): # label = 'Unsafe TTC Event' if idx == 0 else None # plt.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # plt.xlabel('Time (s)') # plt.ylabel('TTC (s)') # plt.title('Time To Collision (TTC) Trend') # plt.grid(True) # plt.legend() # # 保存图像 # chart_filename = os.path.join(output_dir, f"ttc_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # # 记录不安全事件信息 # if unsafe_events: # logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件") # for i, event in enumerate(unsafe_events): # logger.info( # f"TTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s") # logger.info(f"TTC chart saved to: {chart_filename}") # return chart_filename except Exception as e: logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True) return None def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate MTTC metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 mttc_data = safety_calculator.mttc_data if not mttc_data: logger.warning("Cannot generate MTTC chart: empty data") return None # 创建DataFrame df = pd.DataFrame(mttc_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'MTTC') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于MTTC,小于最小阈值视为不安全 unsafe_condition = df['MTTC'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_mttc': group['MTTC'].min() }) # # 创建图表 # plt.figure(figsize=(12, 6)) # plt.plot(df['simTime'], df['MTTC'], 'g-', label='MTTC') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # # 添加橙色背景标识不安全事件 # for idx, event in enumerate(unsafe_events): # label = 'Unsafe MTTC Event' if idx == 0 else None # plt.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # plt.xlabel('Time (s)') # plt.ylabel('MTTC (s)') # plt.title('Modified Time To Collision (MTTC) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"mttc_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"MTTC chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"mttc_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件") for i, event in enumerate(unsafe_events): logger.info( f"MTTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s") logger.info(f"MTTC data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True) return None def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate THW metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 thw_data = safety_calculator.thw_data if not thw_data: logger.warning("Cannot generate THW chart: empty data") return None # 创建DataFrame df = pd.DataFrame(thw_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'THW') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于THW,小于最小阈值视为不安全 unsafe_condition = df['THW'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_thw': group['THW'].min() }) # # 创建图表 # plt.figure(figsize=(12, 10)) # plt.plot(df['simTime'], df['THW'], 'c-', label='THW') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # # 添加橙色背景标识不安全事件 # for idx, event in enumerate(unsafe_events): # label = 'Unsafe THW Event' if idx == 0 else None # plt.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # plt.xlabel('Time (s)') # plt.ylabel('THW (s)') # plt.title('Time Headway (THW) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"thw_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"THW chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"thw_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件") for i, event in enumerate(unsafe_events): logger.info( f"THW不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s") logger.info(f"THW data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True) return None def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Longitudinal Safe Distance metric chart Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 lonsd_data = safety_calculator.lonsd_data if not lonsd_data: logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data") return None # 创建DataFrame df = pd.DataFrame(lonsd_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'LonSD') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # # 创建图表 # plt.figure(figsize=(12, 6)) # plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # plt.xlabel('Time (s)') # plt.ylabel('Distance (m)') # plt.title('Longitudinal Safe Distance (LonSD) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"lonsd_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"lonsd_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True) return None def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Lateral Safe Distance metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 latsd_data = safety_calculator.latsd_data if not latsd_data: logger.warning("Cannot generate Lateral Safe Distance chart: empty data") return None # 创建DataFrame df = pd.DataFrame(latsd_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'LatSD') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于LatSD,小于最小阈值视为不安全 unsafe_condition = df['LatSD'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_latsd': group['LatSD'].min() }) # # 创建图表 # plt.figure(figsize=(12, 6)) # plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # # 添加橙色背景标识不安全事件 # for idx, event in enumerate(unsafe_events): # label = 'Unsafe LatSD Event' if idx == 0 else None # plt.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # plt.xlabel('Time (s)') # plt.ylabel('Distance (m)') # plt.title('Lateral Safe Distance (LatSD) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"latsd_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"latsd_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件") for i, event in enumerate(unsafe_events): logger.info( f"LatSD不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m") logger.info(f"Lateral Safe Distance data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True) return None def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Brake Threat Number metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 btn_data = safety_calculator.btn_data if not btn_data: logger.warning("Cannot generate Brake Threat Number chart: empty data") return None # 创建DataFrame df = pd.DataFrame(btn_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'BTN') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if max_threshold is not None: # 对于BTN,大于最大阈值视为不安全 unsafe_condition = df['BTN'] > max_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'max_btn': group['BTN'].max() }) # # 创建图表 # plt.figure(figsize=(12, 6)) # plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # # 添加橙色背景标识不安全事件 # for idx, event in enumerate(unsafe_events): # label = 'Unsafe BTN Event' if idx == 0 else None # plt.axvspan(event['start_time'], event['end_time'], # alpha=0.3, color='orange', label=label) # plt.xlabel('Time (s)') # plt.ylabel('BTN') # plt.title('Brake Threat Number (BTN) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"btn_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Brake Threat Number chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"btn_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件") for i, event in enumerate(unsafe_events): logger.info( f"BTN不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}") logger.info(f"Brake Threat Number data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True) return None def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Collision Risk metric chart Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 risk_data = safety_calculator.collision_risk_data if not risk_data: logger.warning("Cannot generate Collision Risk chart: empty data") return None # 创建DataFrame df = pd.DataFrame(risk_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # # 创建图表 # plt.figure(figsize=(12, 6)) # plt.plot(df['simTime'], df['collisionRisk'], 'r-', label='Collision Risk') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)') # plt.xlabel('Time (s)') # plt.ylabel('Risk Value (%)') # plt.title('Collision Risk (collisionRisk) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"collision_risk_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Collision Risk chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"collisionrisk_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) logger.info(f"Collision Risk data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True) return None def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Collision Severity metric chart Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 severity_data = safety_calculator.collision_severity_data if not severity_data: logger.warning("Cannot generate Collision Severity chart: empty data") return None # 创建DataFrame df = pd.DataFrame(severity_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # # 创建图表 # plt.figure(figsize=(12, 6)) # plt.plot(df['simTime'], df['collisionSeverity'], 'r-', label='Collision Severity') # # 添加阈值线 # if min_threshold is not None: # plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)') # if max_threshold is not None: # plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)') # plt.xlabel('Time (s)') # plt.ylabel('Severity (%)') # plt.title('Collision Severity (collisionSeverity) Trend') # plt.grid(True) # plt.legend() # # 保存图表 # chart_filename = os.path.join(output_dir, f"collision_severity_chart.png") # plt.savefig(chart_filename, dpi=300) # plt.close() # logger.info(f"Collision Severity chart saved to: {chart_filename}") # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"collisionseverity_data.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) logger.info(f"Collision Severity data saved to: {csv_filename}") return csv_filename except Exception as e: logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True) return None def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[ str]: """Generate chart data for traffic metrics""" # 待实现 return None def calculate_distance(ego_df, correctwarning): """计算预警距离""" dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist'] return dist def calculate_relative_speed(ego_df, correctwarning): """计算相对速度""" return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v'] # 使用function.py中已实现的scenario_sign_dict from modules.metric.function import scenario_sign_dict if __name__ == "__main__": # 测试代码 print("Metrics visualization utilities loaded.")