#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2023/06/25 @Last Modified: 2025/05/20 @Summary: Chart generation utilities for metrics visualization """ import os import numpy as np import pandas as pd import matplotlib matplotlib.use('Agg') # 使用非图形界面的后端 import matplotlib.pyplot as plt from typing import Optional, Dict, List, Any, Union from pathlib import Path from modules.lib.log_manager import LogManager def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]: """ Generate chart data for function metrics Args: function_calculator: FunctionCalculator instance metric_name: Metric name output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 确保输出目录存在 if output_dir: os.makedirs(output_dir, exist_ok=True) else: output_dir = os.getcwd() # 根据指标名称选择不同的图表生成方法 if metric_name.lower() == 'latestwarningdistance_ttc_lst': return generate_warning_ttc_chart(function_calculator, output_dir) else: logger.warning(f"Chart generation not implemented for metric [{metric_name}]") return None except Exception as e: logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True) return None def generate_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]: """ Generate TTC warning chart with data visualization. This version first saves data to CSV, then uses the CSV to generate the chart. Args: function_calculator: FunctionCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 ego_df = function_calculator.ego_data.copy() scenario_name = function_calculator.data.function_config["function"]["scenario"]["name"] correctwarning = scenario_sign_dict[scenario_name] warning_dist = calculate_distance(ego_df, correctwarning) warning_speed = calculate_relative_speed(ego_df, correctwarning) if warning_dist.empty: logger.warning("Cannot generate TTC warning chart: empty data") return None # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据 csv_filename = os.path.join(output_dir, f"warning_ttc_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': ego_df['simTime'], 'warning_distance': warning_dist, 'warning_speed': warning_speed, 'ttc': warning_dist / warning_speed }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Warning TTC data saved to: {csv_filename}") # 从 CSV 读取数据 df = pd.read_csv(csv_filename) # 创建图表 plt.figure(figsize=(12, 8), constrained_layout=True) # 图 1:预警距离 ax1 = plt.subplot(3, 1, 1) ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance') ax1.set_xlabel('Time (s)') ax1.set_ylabel('Distance (m)') ax1.set_title('Warning Distance Over Time') ax1.grid(True) ax1.legend() # 图 2:相对速度 ax2 = plt.subplot(3, 1, 2) ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed') ax2.set_xlabel('Time (s)') ax2.set_ylabel('Speed (m/s)') ax2.set_title('Relative Speed Over Time') ax2.grid(True) ax2.legend() # 图 3:TTC ax3 = plt.subplot(3, 1, 3) ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC') ax3.set_xlabel('Time (s)') ax3.set_ylabel('TTC (s)') ax3.set_title('Time To Collision (TTC) Over Time') ax3.grid(True) ax3.legend() # 保存图像 chart_filename = os.path.join(output_dir, f"warning_ttc_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Warning TTC chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate warning TTC chart: {str(e)}", exc_info=True) return None def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]: """ Generate chart data for comfort metrics Args: comfort_calculator: ComfortCalculator instance metric_name: Metric name output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 确保输出目录存在 if output_dir: os.makedirs(output_dir, exist_ok=True) else: output_dir = os.getcwd() # 根据指标名称选择不同的图表生成方法 if metric_name.lower() == 'shake': return generate_shake_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'zigzag': return generate_zigzag_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'cadence': return generate_cadence_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'slambrake': return generate_slam_brake_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'slamaccelerate': return generate_slam_accelerate_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'vdv': return generate_vdv_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'ava_vav': return generate_ava_vav_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'msdv': return generate_msdv_chart(comfort_calculator, output_dir) elif metric_name.lower() == 'motionsickness': return generate_motion_sickness_chart(comfort_calculator, output_dir) else: logger.warning(f"Chart generation not implemented for metric [{metric_name}]") return None except Exception as e: logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True) return None def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate shake metric chart with orange background for shake events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() shake_events = comfort_calculator.shake_events if df.empty: logger.warning("Cannot generate shake chart: empty data") return None # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"shake_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lat_acc': df['lat_acc'], 'lat_acc_rate': df['lat_acc_rate'], 'speedH_std': df['speedH_std'], 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None]*len(df))), 'lat_acc_rate_threshold': 0.5, 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None]*len(df))), }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Shake data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) import matplotlib.pyplot as plt plt.figure(figsize=(12, 8), constrained_layout=True) # 图 1:横向加速度 ax1 = plt.subplot(3, 1, 1) ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration') if 'lat_acc_threshold' in df.columns: ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold') for idx, event in enumerate(shake_events): label = 'Shake Event' if idx == 0 else None ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Lateral Acceleration (m/s²)') ax1.set_title('Shake Event Detection - Lateral Acceleration') ax1.grid(True) ax1.legend() # 图 2:lat_acc_rate ax2 = plt.subplot(3, 1, 2) ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate') ax2.axhline( y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold' ) for idx, event in enumerate(shake_events): label = 'Shake Event' if idx == 0 else None ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax2.set_xlabel('Time (s)') ax2.set_ylabel('Angular Velocity (m/s³)') ax2.set_title('Shake Event Detection - lat_acc_rate') ax2.grid(True) ax2.legend() # 图 3:speedH_std ax3 = plt.subplot(3, 1, 3) ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std') if 'speedH_std_threshold' in df.columns: ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold') for idx, event in enumerate(shake_events): label = 'Shake Event' if idx == 0 else None ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax3.set_xlabel('Time (s)') ax3.set_ylabel('Angular Velocity (deg/s)') ax3.set_title('Shake Event Detection - speedH_std') ax3.grid(True) ax3.legend() # 保存图像 chart_filename = os.path.join(output_dir, f"shake_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Shake chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True) return None def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate zigzag metric chart with orange background for zigzag events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() zigzag_events = comfort_calculator.discomfort_df[ comfort_calculator.discomfort_df['type'] == 'zigzag' ].copy() if df.empty: logger.warning("Cannot generate zigzag chart: empty data") return None # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"zigzag_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'speedH': df['speedH'], 'posH': df['posH'], 'min_speedH_threshold': -2.3, # 可替换为动态阈值 'max_speedH_threshold': 2.3 }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Zigzag data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) import matplotlib.pyplot as plt plt.figure(figsize=(12, 8), constrained_layout=True) # ===== 子图1:Yaw Rate ===== ax1 = plt.subplot(2, 1, 1) ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate') # 添加 speedH 上下限阈值线 ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)') ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)') # 添加橙色背景:Zigzag Events for idx, event in zigzag_events.iterrows(): label = 'Zigzag Event' if idx == 0 else None ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Angular Velocity (deg/s)') ax1.set_title('Zigzag Event Detection - Yaw Rate') ax1.grid(True) ax1.legend(loc='upper left') # ===== 子图2:Yaw Angle ===== ax2 = plt.subplot(2, 1, 2) ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw') # 添加橙色背景:Zigzag Events for idx, event in zigzag_events.iterrows(): label = 'Zigzag Event' if idx == 0 else None ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax2.set_xlabel('Time (s)') ax2.set_ylabel('Yaw (deg)') ax2.set_title('Zigzag Event Detection - Yaw Angle') ax2.grid(True) ax2.legend(loc='upper left') # 保存图像 chart_filename = os.path.join(output_dir, f"zigzag_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Zigzag chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True) return None def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate cadence metric chart with orange background for cadence events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy() if df.empty: logger.warning("Cannot generate cadence chart: empty data") return None # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"cadence_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lon_acc': df['lon_acc'], 'v': df['v'], 'ip_acc': df.get('ip_acc', pd.Series([None]*len(df))), 'ip_dec': df.get('ip_dec', pd.Series([None]*len(df))) }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Cadence data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) import matplotlib.pyplot as plt plt.figure(figsize=(12, 8), constrained_layout=True) # 图 1:纵向加速度 ax1 = plt.subplot(2, 1, 1) ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration') if 'ip_acc' in df.columns and 'ip_dec' in df.columns: ax1.plot(df['simTime'], df['ip_acc'], 'r--', label='Acceleration Threshold') ax1.plot(df['simTime'], df['ip_dec'], 'g--', label='Deceleration Threshold') # 添加橙色背景标识顿挫事件 for idx, event in cadence_events.iterrows(): label = 'Cadence Event' if idx == 0 else None ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Longitudinal Acceleration (m/s²)') ax1.set_title('Cadence Event Detection - Longitudinal Acceleration') ax1.grid(True) ax1.legend() # 图 2:速度 ax2 = plt.subplot(2, 1, 2) ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity') # 添加橙色背景标识顿挫事件 for idx, event in cadence_events.iterrows(): label = 'Cadence Event' if idx == 0 else None ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax2.set_xlabel('Time (s)') ax2.set_ylabel('Velocity (m/s)') ax2.set_title('Cadence Event Detection - Vehicle Speed') ax2.grid(True) ax2.legend() # 保存图像 chart_filename = os.path.join(output_dir, f"cadence_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Cadence chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True) return None def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate slam brake metric chart with orange background for slam brake events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() slam_brake_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy() if df.empty: logger.warning("Cannot generate slam brake chart: empty data") return None # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"slam_brake_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lon_acc': df['lon_acc'], 'v': df['v'], 'min_threshold': df.get('ip_dec', pd.Series([None]*len(df))), 'max_threshold': 0.0 }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Slam brake data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) plt.figure(figsize=(12, 8), constrained_layout=True) # 图 1:纵向加速度 ax1 = plt.subplot(2, 1, 1) ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration') if 'min_threshold' in df.columns: ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Deceleration Threshold') # 添加橙色背景标识急刹车事件 for idx, event in slam_brake_events.iterrows(): label = 'Slam Brake Event' if idx == 0 else None ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Longitudinal Acceleration (m/s²)') ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration') ax1.grid(True) ax1.legend() # 图 2:速度 ax2 = plt.subplot(2, 1, 2) ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity') # 添加橙色背景标识急刹车事件 for idx, event in slam_brake_events.iterrows(): label = 'Slam Brake Event' if idx == 0 else None ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax2.set_xlabel('Time (s)') ax2.set_ylabel('Velocity (m/s)') ax2.set_title('Slam Brake Event Detection - Vehicle Speed') ax2.grid(True) ax2.legend() # 保存图像 chart_filename = os.path.join(output_dir, f"slam_brake_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Slam brake chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True) return None def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate slam accelerate metric chart with orange background for slam accelerate events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() slam_accel_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'slam_accel'].copy() if df.empty: logger.warning("Cannot generate slam accelerate chart: empty data") return None # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"slam_accel_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'lon_acc': df['lon_acc'], 'v': df['v'], 'min_threshold': 0.0, 'max_threshold': df.get('ip_acc', pd.Series([None]*len(df))) }) df_csv.to_csv(csv_filename, index=False) logger.info(f"Slam accelerate data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) plt.figure(figsize=(12, 8), constrained_layout=True) # 图 1:纵向加速度 ax1 = plt.subplot(2, 1, 1) ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration') if 'max_threshold' in df.columns: ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Acceleration Threshold') # 添加橙色背景标识急加速事件 for idx, event in slam_accel_events.iterrows(): label = 'Slam Accelerate Event' if idx == 0 else None ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Longitudinal Acceleration (m/s²)') ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration') ax1.grid(True) ax1.legend() # 图 2:速度 ax2 = plt.subplot(2, 1, 2) ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity') # 添加橙色背景标识急加速事件 for idx, event in slam_accel_events.iterrows(): label = 'Slam Accelerate Event' if idx == 0 else None ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) ax2.set_xlabel('Time (s)') ax2.set_ylabel('Velocity (m/s)') ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed') ax2.grid(True) ax2.legend() # 保存图像 chart_filename = os.path.join(output_dir, f"slam_accel_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"Slam accelerate chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True) return None def get_metric_thresholds(calculator, metric_name: str) -> dict: """ 从配置文件中获取指标的阈值 Args: calculator: Calculator instance (SafetyCalculator or ComfortCalculator) metric_name: 指标名称 Returns: dict: 包含min和max阈值的字典 """ logger = LogManager().get_logger() thresholds = {"min": None, "max": None} try: # 根据计算器类型获取配置 if hasattr(calculator, 'data_processed'): if hasattr(calculator.data_processed, 'safety_config') and 'safety' in calculator.data_processed.safety_config: config = calculator.data_processed.safety_config['safety'] metric_type = 'safety' elif hasattr(calculator.data_processed, 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config: config = calculator.data_processed.comfort_config['comfort'] metric_type = 'comfort' else: logger.warning(f"无法找到{metric_name}的配置信息") return thresholds else: logger.warning(f"计算器没有data_processed属性") return thresholds # 递归查找指标配置 def find_metric_config(node, target_name): if isinstance(node, dict): if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node: return node for key, value in node.items(): result = find_metric_config(value, target_name) if result: return result return None # 查找指标配置 metric_config = find_metric_config(config, metric_name) if metric_config: thresholds["min"] = metric_config.get("min") thresholds["max"] = metric_config.get("max") logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}") else: logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息") except Exception as e: logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True) return thresholds def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]: """ Generate chart data for safety metrics Args: safety_calculator: SafetyCalculator instance metric_name: Metric name output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 确保输出目录存在 if output_dir: os.makedirs(output_dir, exist_ok=True) else: output_dir = os.getcwd() # 根据指标名称选择不同的图表生成方法 if metric_name.lower() == 'ttc': return generate_ttc_chart(safety_calculator, output_dir) elif metric_name.lower() == 'mttc': return generate_mttc_chart(safety_calculator, output_dir) elif metric_name.lower() == 'thw': return generate_thw_chart(safety_calculator, output_dir) elif metric_name.lower() == 'lonsd': return generate_lonsd_chart(safety_calculator, output_dir) elif metric_name.lower() == 'latsd': return generate_latsd_chart(safety_calculator, output_dir) elif metric_name.lower() == 'btn': return generate_btn_chart(safety_calculator, output_dir) elif metric_name.lower() == 'collisionrisk': return generate_collision_risk_chart(safety_calculator, output_dir) elif metric_name.lower() == 'collisionseverity': return generate_collision_severity_chart(safety_calculator, output_dir) else: logger.warning(f"Chart generation not implemented for metric [{metric_name}]") return None except Exception as e: logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True) return None def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate TTC metric chart with orange background for unsafe events. This version first saves data to CSV, then uses the CSV to generate the chart. Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 ttc_data = safety_calculator.ttc_data if not ttc_data: logger.warning("Cannot generate TTC chart: empty data") return None # 创建DataFrame df = pd.DataFrame(ttc_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'TTC') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"ttc_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'simFrame': df['simFrame'], 'TTC': df['TTC'], 'min_threshold': min_threshold, 'max_threshold': max_threshold }) df_csv.to_csv(csv_filename, index=False) logger.info(f"TTC data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于TTC,小于最小阈值视为不安全 unsafe_condition = df['TTC'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_ttc': group['TTC'].min() }) # 创建图表(第三步) plt.figure(figsize=(12, 8)) plt.plot(df['simTime'], df['TTC'], 'b-', label='TTC') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # 添加橙色背景标识不安全事件 for idx, event in enumerate(unsafe_events): label = 'Unsafe TTC Event' if idx == 0 else None plt.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) plt.xlabel('Time (s)') plt.ylabel('TTC (s)') plt.title('Time To Collision (TTC) Trend') plt.grid(True) plt.legend() # 保存图像 chart_filename = os.path.join(output_dir, f"ttc_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件") for i, event in enumerate(unsafe_events): logger.info(f"TTC不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s") logger.info(f"TTC chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True) return None def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate MTTC metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 mttc_data = safety_calculator.mttc_data if not mttc_data: logger.warning("Cannot generate MTTC chart: empty data") return None # 创建DataFrame df = pd.DataFrame(mttc_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'MTTC') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于MTTC,小于最小阈值视为不安全 unsafe_condition = df['MTTC'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_mttc': group['MTTC'].min() }) # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['MTTC'], 'g-', label='MTTC') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # 添加橙色背景标识不安全事件 for idx, event in enumerate(unsafe_events): label = 'Unsafe MTTC Event' if idx == 0 else None plt.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) plt.xlabel('Time (s)') plt.ylabel('MTTC (s)') plt.title('Modified Time To Collision (MTTC) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"mttc_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"mttc_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件") for i, event in enumerate(unsafe_events): logger.info(f"MTTC不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s") logger.info(f"MTTC chart saved to: {chart_filename}") logger.info(f"MTTC data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True) return None def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate THW metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 thw_data = safety_calculator.thw_data if not thw_data: logger.warning("Cannot generate THW chart: empty data") return None # 创建DataFrame df = pd.DataFrame(thw_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'THW') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于THW,小于最小阈值视为不安全 unsafe_condition = df['THW'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_thw': group['THW'].min() }) # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['THW'], 'c-', label='THW') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # 添加橙色背景标识不安全事件 for idx, event in enumerate(unsafe_events): label = 'Unsafe THW Event' if idx == 0 else None plt.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) plt.xlabel('Time (s)') plt.ylabel('THW (s)') plt.title('Time Headway (THW) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"thw_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"thw_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件") for i, event in enumerate(unsafe_events): logger.info(f"THW不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s") logger.info(f"THW chart saved to: {chart_filename}") logger.info(f"THW data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True) return None def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Longitudinal Safe Distance metric chart Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 lonsd_data = safety_calculator.lonsd_data if not lonsd_data: logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data") return None # 创建DataFrame df = pd.DataFrame(lonsd_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'LonSD') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)') plt.xlabel('Time (s)') plt.ylabel('Distance (m)') plt.title('Longitudinal Safe Distance (LonSD) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"lonsd_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"lonsd_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}") logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True) return None def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Lateral Safe Distance metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 latsd_data = safety_calculator.latsd_data if not latsd_data: logger.warning("Cannot generate Lateral Safe Distance chart: empty data") return None # 创建DataFrame df = pd.DataFrame(latsd_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'LatSD') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if min_threshold is not None: # 对于LatSD,小于最小阈值视为不安全 unsafe_condition = df['LatSD'] < min_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'min_latsd': group['LatSD'].min() }) # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)') # 添加橙色背景标识不安全事件 for idx, event in enumerate(unsafe_events): label = 'Unsafe LatSD Event' if idx == 0 else None plt.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) plt.xlabel('Time (s)') plt.ylabel('Distance (m)') plt.title('Lateral Safe Distance (LatSD) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"latsd_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"latsd_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件") for i, event in enumerate(unsafe_events): logger.info(f"LatSD不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m") logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}") logger.info(f"Lateral Safe Distance data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True) return None def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Brake Threat Number metric chart with orange background for unsafe events Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 btn_data = safety_calculator.btn_data if not btn_data: logger.warning("Cannot generate Brake Threat Number chart: empty data") return None # 创建DataFrame df = pd.DataFrame(btn_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'BTN') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 检测超阈值事件 unsafe_events = [] if max_threshold is not None: # 对于BTN,大于最大阈值视为不安全 unsafe_condition = df['BTN'] > max_threshold event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum() for _, group in df[unsafe_condition].groupby(event_groups): if len(group) >= 2: # 至少2帧才算一次事件 start_time = group['simTime'].iloc[0] end_time = group['simTime'].iloc[-1] duration = end_time - start_time if duration >= 0.1: # 只记录持续时间超过0.1秒的事件 unsafe_events.append({ 'start_time': start_time, 'end_time': end_time, 'start_frame': group['simFrame'].iloc[0], 'end_frame': group['simFrame'].iloc[-1], 'duration': duration, 'max_btn': group['BTN'].max() }) # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})') # 添加橙色背景标识不安全事件 for idx, event in enumerate(unsafe_events): label = 'Unsafe BTN Event' if idx == 0 else None plt.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label) plt.xlabel('Time (s)') plt.ylabel('BTN') plt.title('Brake Threat Number (BTN) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"btn_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"btn_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) # 记录不安全事件信息 if unsafe_events: logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件") for i, event in enumerate(unsafe_events): logger.info(f"BTN不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}") logger.info(f"Brake Threat Number chart saved to: {chart_filename}") logger.info(f"Brake Threat Number data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True) return None def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Collision Risk metric chart Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 risk_data = safety_calculator.collision_risk_data if not risk_data: logger.warning("Cannot generate Collision Risk chart: empty data") return None # 创建DataFrame df = pd.DataFrame(risk_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['collisionRisk'], 'r-', label='Collision Risk') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)') plt.xlabel('Time (s)') plt.ylabel('Risk Value (%)') plt.title('Collision Risk (collisionRisk) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"collision_risk_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"collisionrisk_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) logger.info(f"Collision Risk chart saved to: {chart_filename}") logger.info(f"Collision Risk data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True) return None def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]: """ Generate Collision Severity metric chart Args: safety_calculator: SafetyCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 severity_data = safety_calculator.collision_severity_data if not severity_data: logger.warning("Cannot generate Collision Severity chart: empty data") return None # 创建DataFrame df = pd.DataFrame(severity_data) # 获取阈值 thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 创建图表 plt.figure(figsize=(12, 6)) plt.plot(df['simTime'], df['collisionSeverity'], 'r-', label='Collision Severity') # 添加阈值线 if min_threshold is not None: plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)') if max_threshold is not None: plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)') plt.xlabel('Time (s)') plt.ylabel('Severity (%)') plt.title('Collision Severity (collisionSeverity) Trend') plt.grid(True) plt.legend() # 保存图表 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") chart_filename = os.path.join(output_dir, f"collision_severity_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() # 保存CSV数据,包含阈值信息 csv_filename = os.path.join(output_dir, f"collisionseverity_data_{timestamp}.csv") df_csv = df.copy() df_csv['min_threshold'] = min_threshold df_csv['max_threshold'] = max_threshold df_csv.to_csv(csv_filename, index=False) logger.info(f"Collision Severity chart saved to: {chart_filename}") logger.info(f"Collision Severity data saved to: {csv_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True) return None def generate_vdv_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate VDV (Vibration Dose Value) metric chart with data saved to CSV first. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() vdv_value = comfort_calculator.calculated_value.get('vdv', 0) if df.empty: logger.warning("Cannot generate VDV chart: empty data") return None # 确保有必要的列 if 'accelX' not in df.columns or 'accelY' not in df.columns: logger.warning("Missing required columns for VDV chart") return None # 获取阈值 thresholds = get_metric_thresholds(comfort_calculator, 'vdv') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 将东北天坐标系下的加速度转换为车身坐标系下的加速度 if 'posH' not in df.columns: logger.warning("Missing heading angle data for coordinate transformation") return None # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶 df['posH_rad'] = np.radians(df['posH']) # 转换加速度到车身坐标系 df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad']) df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad']) df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df))) # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"vdv_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'a_x_body': df['a_x_body'], 'a_y_body': df['a_y_body'], 'a_z_body': df['a_z_body'], 'v': df['v'], 'min_threshold': min_threshold, 'max_threshold': max_threshold, 'vdv_value': vdv_value }) df_csv.to_csv(csv_filename, index=False) logger.info(f"VDV data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) plt.figure(figsize=(12, 8)) # 绘制三轴加速度 plt.subplot(3, 1, 1) plt.plot(df['simTime'], df['a_x_body'], 'r-', label='X-axis Acceleration') # 添加阈值线 if 'min_threshold' in df.columns and df['min_threshold'].iloc[0] is not None: min_threshold = df['min_threshold'].iloc[0] plt.axhline(y=min_threshold, color='r', linestyle=':', label=f'Min Threshold ({min_threshold})') if 'max_threshold' in df.columns and df['max_threshold'].iloc[0] is not None: max_threshold = df['max_threshold'].iloc[0] plt.axhline(y=max_threshold, color='g', linestyle=':', label=f'Max Threshold ({max_threshold})') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body X-axis Acceleration (Longitudinal)') plt.grid(True) plt.legend() plt.subplot(3, 1, 2) plt.plot(df['simTime'], df['a_y_body'], 'g-', label='Y-axis Acceleration') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body Y-axis Acceleration (Lateral)') plt.grid(True) plt.legend() plt.subplot(3, 1, 3) plt.plot(df['simTime'], df['a_z_body'], 'b-', label='Z-axis Acceleration') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') vdv_value = df['vdv_value'].iloc[0] if 'vdv_value' in df.columns else 0 plt.title(f'Body Z-axis Acceleration (Vertical) - VDV value: {vdv_value:.4f}') plt.grid(True) plt.legend() plt.tight_layout() # 保存图像 chart_filename = os.path.join(output_dir, f"vdv_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"VDV chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate VDV chart: {str(e)}", exc_info=True) return None def generate_ava_vav_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate AVA_VAV (Average Vibration Acceleration Value) metric chart with data saved to CSV first. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() ava_vav_value = comfort_calculator.calculated_value.get('ava_vav', 0) if df.empty: logger.warning("Cannot generate AVA_VAV chart: empty data") return None # 确保有必要的列 if 'accelX' not in df.columns or 'accelY' not in df.columns: logger.warning("Missing required columns for AVA_VAV chart") return None # 获取阈值 thresholds = get_metric_thresholds(comfort_calculator, 'ava_vav') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 将东北天坐标系下的加速度转换为车身坐标系下的加速度 if 'posH' not in df.columns: logger.warning("Missing heading angle data for coordinate transformation") return None # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶 df['posH_rad'] = np.radians(df['posH']) # 转换加速度到车身坐标系 df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad']) df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad']) df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df))) # 角速度数据 df['omega_roll'] = df['rollRate'] if 'rollRate' in df.columns else pd.Series(np.zeros(len(df))) df['omega_pitch'] = df['pitchRate'] if 'pitchRate' in df.columns else pd.Series(np.zeros(len(df))) df['omega_yaw'] = df['speedH'] # 使用航向角速度作为偏航角速度 # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"ava_vav_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'a_x_body': df['a_x_body'], 'a_y_body': df['a_y_body'], 'a_z_body': df['a_z_body'], 'omega_roll': df['omega_roll'], 'omega_pitch': df['omega_pitch'], 'omega_yaw': df['omega_yaw'], 'min_threshold': min_threshold, 'max_threshold': max_threshold, 'ava_vav_value': ava_vav_value }) df_csv.to_csv(csv_filename, index=False) logger.info(f"AVA_VAV data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) plt.figure(figsize=(12, 10)) # 绘制三轴加速度 plt.subplot(3, 2, 1) plt.plot(df['simTime'], df['a_x_body'], 'r-', label='X-axis Acceleration') # 添加阈值线 if 'min_threshold' in df.columns and df['min_threshold'].iloc[0] is not None: min_threshold = df['min_threshold'].iloc[0] plt.axhline(y=min_threshold, color='r', linestyle=':', label=f'Min Threshold ({min_threshold})') if 'max_threshold' in df.columns and df['max_threshold'].iloc[0] is not None: max_threshold = df['max_threshold'].iloc[0] plt.axhline(y=max_threshold, color='g', linestyle=':', label=f'Max Threshold ({max_threshold})') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body X-axis Acceleration (Longitudinal)') plt.grid(True) plt.legend() plt.subplot(3, 2, 3) plt.plot(df['simTime'], df['a_y_body'], 'g-', label='Y-axis Acceleration') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body Y-axis Acceleration (Lateral)') plt.grid(True) plt.legend() plt.subplot(3, 2, 5) plt.plot(df['simTime'], df['a_z_body'], 'b-', label='Z-axis Acceleration') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body Z-axis Acceleration (Vertical)') plt.grid(True) plt.legend() # 绘制三轴角速度 plt.subplot(3, 2, 2) plt.plot(df['simTime'], df['omega_roll'], 'r-', label='Roll Rate') plt.xlabel('Time (s)') plt.ylabel('Angular Velocity (deg/s)') plt.title('Roll Rate') plt.grid(True) plt.legend() plt.subplot(3, 2, 4) plt.plot(df['simTime'], df['omega_pitch'], 'g-', label='Pitch Rate') plt.xlabel('Time (s)') plt.ylabel('Angular Velocity (deg/s)') plt.title('Pitch Rate') plt.grid(True) plt.legend() plt.subplot(3, 2, 6) plt.plot(df['simTime'], df['omega_yaw'], 'b-', label='Yaw Rate') plt.xlabel('Time (s)') plt.ylabel('Angular Velocity (deg/s)') ava_vav_value = df['ava_vav_value'].iloc[0] if 'ava_vav_value' in df.columns else 0 plt.title(f'Yaw Rate - AVA_VAV value: {ava_vav_value:.4f}') plt.grid(True) plt.legend() plt.tight_layout() # 保存图像 chart_filename = os.path.join(output_dir, f"ava_vav_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"AVA_VAV chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate AVA_VAV chart: {str(e)}", exc_info=True) return None def generate_msdv_chart(comfort_calculator, output_dir: str) -> Optional[str]: """ Generate MSDV (Motion Sickness Dose Value) metric chart with data saved to CSV first. This version first saves data to CSV, then uses the CSV to generate the chart. Args: comfort_calculator: ComfortCalculator instance output_dir: Output directory Returns: str: Chart file path, or None if generation fails """ logger = LogManager().get_logger() try: # 获取数据 df = comfort_calculator.ego_df.copy() msdv_value = comfort_calculator.calculated_value.get('msdv', 0) motion_sickness_prob = comfort_calculator.calculated_value.get('motionSickness', 0) if df.empty: logger.warning("Cannot generate MSDV chart: empty data") return None # 确保有必要的列 if 'accelX' not in df.columns or 'accelY' not in df.columns: logger.warning("Missing required columns for MSDV chart") return None # 获取阈值 thresholds = get_metric_thresholds(comfort_calculator, 'msdv') min_threshold = thresholds.get('min') max_threshold = thresholds.get('max') # 将东北天坐标系下的加速度转换为车身坐标系下的加速度 if 'posH' not in df.columns: logger.warning("Missing heading angle data for coordinate transformation") return None # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶 df['posH_rad'] = np.radians(df['posH']) # 转换加速度到车身坐标系 df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad']) df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad']) df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df))) # 生成时间戳 import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 保存 CSV 数据(第一步) csv_filename = os.path.join(output_dir, f"msdv_data_{timestamp}.csv") df_csv = pd.DataFrame({ 'simTime': df['simTime'], 'a_x_body': df['a_x_body'], 'a_y_body': df['a_y_body'], 'a_z_body': df['a_z_body'], 'v': df['v'], 'min_threshold': min_threshold, 'max_threshold': max_threshold, 'msdv_value': msdv_value, 'motion_sickness_prob': motion_sickness_prob }) df_csv.to_csv(csv_filename, index=False) logger.info(f"MSDV data saved to: {csv_filename}") # 第二步:从 CSV 读取(可验证保存数据无误) df = pd.read_csv(csv_filename) # 创建图表(第三步) plt.figure(figsize=(12, 8)) # 绘制三轴加速度 plt.subplot(3, 1, 1) plt.plot(df['simTime'], df['a_x_body'], 'r-', label='X-axis Acceleration') # 添加阈值线 if 'min_threshold' in df.columns and df['min_threshold'].iloc[0] is not None: min_threshold = df['min_threshold'].iloc[0] plt.axhline(y=min_threshold, color='r', linestyle=':', label=f'Min Threshold ({min_threshold})') if 'max_threshold' in df.columns and df['max_threshold'].iloc[0] is not None: max_threshold = df['max_threshold'].iloc[0] plt.axhline(y=max_threshold, color='g', linestyle=':', label=f'Max Threshold ({max_threshold})') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body X-axis Acceleration (Longitudinal)') plt.grid(True) plt.legend() plt.subplot(3, 1, 2) plt.plot(df['simTime'], df['a_y_body'], 'g-', label='Y-axis Acceleration') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') plt.title('Body Y-axis Acceleration (Lateral)') plt.grid(True) plt.legend() plt.subplot(3, 1, 3) plt.plot(df['simTime'], df['a_z_body'], 'b-', label='Z-axis Acceleration') plt.xlabel('Time (s)') plt.ylabel('Acceleration (m/s²)') msdv_value = df['msdv_value'].iloc[0] if 'msdv_value' in df.columns else 0 motion_sickness_prob = df['motion_sickness_prob'].iloc[0] if 'motion_sickness_prob' in df.columns else 0 plt.title(f'Body Z-axis Acceleration (Vertical) - MSDV: {msdv_value:.4f}, Motion Sickness Probability: {motion_sickness_prob:.2f}%') plt.grid(True) plt.legend() plt.tight_layout() # 保存图像 chart_filename = os.path.join(output_dir, f"msdv_chart_{timestamp}.png") plt.savefig(chart_filename, dpi=300) plt.close() logger.info(f"MSDV chart saved to: {chart_filename}") return chart_filename except Exception as e: logger.error(f"Failed to generate MSDV chart: {str(e)}", exc_info=True) return None def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]: """Generate chart data for traffic metrics""" # 待实现 return None def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]: """Generate chart data for function metrics""" # 待实现 return None