#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Date: 2023/06/25
@Last Modified: 2025/04/25
@Summary: Comfort metric calculation module
"""

import scipy.signal
import pandas as pd
import numpy as np
import os
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable, Union, Tuple

from modules.lib.score import Score
from modules.lib.common import get_interpolation
from modules.lib import data_process
from modules.lib.log_manager import LogManager
from modules.lib.chart_generator import generate_comfort_chart_data


# ----------------------
# Metric calculation functions
# ----------------------
def calculate_zigzag(data_processed, plot_path) -> dict:
    """Compute the zigzag (snaking) metric."""
    comfort = ComfortCalculator(data_processed)
    zigzag_count = comfort.calculate_zigzag_count(plot_path)
    return {"zigzag": float(zigzag_count)}


def calculate_shake(data_processed, plot_path) -> dict:
    """Compute the shake metric."""
    comfort = ComfortCalculator(data_processed)
    shake_count = comfort.calculate_shake_count(plot_path)
    return {"shake": float(shake_count)}


def calculate_cadence(data_processed, plot_path) -> dict:
    """Compute the cadence (repeated surge) metric."""
    comfort = ComfortCalculator(data_processed)
    cadence_count = comfort.calculate_cadence_count(plot_path)
    return {"cadence": float(cadence_count)}


def calculate_topbrake(data_processed, plot_path) -> dict:
    """Compute the point-brake (top brake) metric."""
    comfort = ComfortCalculator(data_processed)
    topBrake_count = comfort.calculate_top_brake_count(plot_path)
    return {"topBrake": float(topBrake_count)}


def calculate_slambrake(data_processed, plot_path) -> dict:
    """Compute the slam-brake metric."""
    comfort = ComfortCalculator(data_processed)
    slam_brake_count = comfort.calculate_slam_brake_count(plot_path)
    return {"slamBrake": float(slam_brake_count)}


def calculate_slamaccelerate(data_processed, plot_path) -> dict:
    """Compute the slam-accelerate metric."""
    comfort = ComfortCalculator(data_processed)
    slam_accel_count = comfort.calculate_slam_accel_count(plot_path)
    return {"slamAccelerate": float(slam_accel_count)}


def calculate_sampling_frequency(df, time_column='simTime', default_fs=25):
    """Estimate the sampling frequency of time-series data."""
    if len(df) < 2 or time_column not in df.columns:
        return default_fs
    if not df[time_column].is_monotonic_increasing:
        df = df.sort_values(time_column)
    time_diffs = df[time_column].diff().dropna()
    if time_diffs.empty or (time_diffs <= 0).any():
        return default_fs
    median_time_diff = time_diffs.median()
    return 1.0 / median_time_diff if median_time_diff > 0 else default_fs


# ----------------------
# Registry and manager classes
# ----------------------
class ComfortRegistry:
    """Comfort metric registry."""

    def __init__(self, data_processed, plot_path):
        self.logger = LogManager().get_logger()
        self.data = data_processed
        self.output_dir = plot_path
        if not hasattr(data_processed, 'comfort_config') or not data_processed.comfort_config:
            self.logger.warning("舒适性配置为空,跳过舒适性指标计算")
            self.comfort_config = {}
            self.metrics = []
            self._registry = {}
            return
        self.comfort_config = data_processed.comfort_config.get("comfort", {})
        self.metrics = self._extract_metrics(self.comfort_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Extract metric names via DFS traversal of the config tree."""
        metrics = []

        def _recurse(node):
            if isinstance(node, dict):
                if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
                    metrics.append(node['name'])
                for v in node.values():
                    _recurse(v)

        _recurse(config_node)
        return metrics
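
    # Illustrative config shape consumed by _extract_metrics above (an assumption, not taken
    # from the original config files): given
    #   {"comfortLat": {"zigzag": {"name": "zigzag", "priority": 0}}}
    # the 'name' field of every leaf node (a dict with no dict children) is collected, so the
    # example yields ["zigzag"]. Actual field names may differ in the real configuration.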
    def _build_registry(self) -> dict:
        """Automatically register metric functions."""
        registry = {}
        for metric_name in self.metrics:
            func_name = f"calculate_{metric_name.lower()}"
            try:
                registry[metric_name] = globals()[func_name]
            except KeyError:
                self.logger.error(f"未实现指标函数: {func_name}")
        return registry

    def batch_execute(self) -> dict:
        """Execute all registered metric calculations."""
        results = {}
        for name, func in self._registry.items():
            try:
                result = func(self.data, self.output_dir)
                results.update(result)
            except Exception as e:
                self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
                results[name] = None
        return results


class ComfortManager:
    """Main entry class for comfort metric calculation."""

    def __init__(self, data_processed, plot_path):
        self.data = data_processed
        self.logger = LogManager().get_logger()
        self.plot_path = plot_path
        if not hasattr(data_processed, 'comfort_config') or not data_processed.comfort_config:
            self.logger.warning("舒适性配置为空,跳过舒适性指标计算初始化")
            self.registry = None
        else:
            self.registry = ComfortRegistry(self.data, self.plot_path)

    def report_statistic(self):
        """Generate the comfort scoring report."""
        if self.registry is None:
            self.logger.info("舒适性指标管理器未初始化,返回空结果")
            return {}
        return self.registry.batch_execute()


# ----------------------
# Core comfort calculation class
# ----------------------
class ComfortCalculator:
    """Core comfort metric calculator."""

    def __init__(self, data_processed):
        self.data_processed = data_processed
        self.logger = LogManager().get_logger()
        self.data = data_processed.ego_data
        self.ego_df = pd.DataFrame()
        self.discomfort_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.COMFORT_INFO = [
            "simTime", "simFrame", "speedX", "speedY", "accelX", "accelY",
            "curvHor", "lightMask", "v", "speedH", "accelH", "posH",
            "lon_v_vehicle", "lat_v_vehicle", "lat_acc_vehicle", "lon_acc_vehicle",
            "lat_acc_rate", "lon_acc_rate"
        ]
        self.calculated_value = {
            'zigzag': 0, 'shake': 0, 'cadence': 0,
            'topBrake': 0, 'slamBrake': 0, 'slamAccelerate': 0
        }
        self._initialize_data()
        self.fs = calculate_sampling_frequency(self.ego_df)
        self.logger.info(f"采样频率: {self.fs} Hz")
        self.zigzag_count = 0
        self.shake_count = 0
        self.cadence_count = 0
        self.slam_brake_count = 0
        self.slam_accel_count = 0
        self.zigzag_time_list = []
        self.zigzag_stre_list = []
        self.shake_events = []

    def _initialize_data(self):
        """Initialize data."""
        self.ego_df = self.data[self.COMFORT_INFO].copy()
        self.df = self.ego_df.reset_index(drop=True)
        self._prepare_comfort_parameters()

    def _prepare_comfort_parameters(self):
        """Prepare parameters required for comfort calculations."""
        speed_field = 'lon_v_vehicle'
        self.ego_df['ip_acc'] = self.ego_df[speed_field].apply(
            get_interpolation, point1=[18, 4], point2=[72, 2])
        self.ego_df['ip_dec'] = self.ego_df[speed_field].apply(
            get_interpolation, point1=[18, -5], point2=[72, -3.5])
        acc_field = 'lon_acc_vehicle'
        self.ego_df['slam_brake'] = (self.ego_df[acc_field] - self.ego_df['ip_dec']).apply(
            lambda x: 1 if x < 0 else 0)
        self.ego_df['slam_accel'] = (self.ego_df[acc_field] - self.ego_df['ip_acc']).apply(
            lambda x: 1 if x > 0 else 0)
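
    # Note on the thresholds prepared above (a sketch, assuming get_interpolation is a
    # two-point linear interpolation and lon_v_vehicle shares the unit of the endpoints 18/72):
    #   ip_acc follows (18, 4) -> (72, 2); e.g. at speed 45 it is roughly
    #   4 + (45 - 18) * (2 - 4) / (72 - 18) = 3.0 m/s².
    #   ip_dec follows (18, -5) -> (72, -3.5); the faster the vehicle, the stricter the
    #   slam-accelerate / slam-brake thresholds become.
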
    # ----------------------
    # Public metric calculation interface
    # ----------------------
    def calculate_zigzag_count(self, plot_path):
        """Count zigzag events."""
        zigzag_events = self._detect_zigzag_events_new()
        self.log_events('zigzag', zigzag_events)
        self.generate_metric_chart('zigzag', plot_path)
        return len(zigzag_events)

    def calculate_shake_count(self, plot_path):
        """Count shake events."""
        shake_events = self._shake_detector()
        self.log_events('shake', shake_events)
        self.generate_metric_chart('shake', plot_path)
        return len(shake_events)

    def calculate_cadence_count(self, plot_path):
        """Count cadence events."""
        cadence_events = self._cadence_detector()
        self.log_events('cadence', cadence_events)
        self.generate_metric_chart('cadence', plot_path)
        return len(cadence_events)

    def calculate_top_brake_count(self, plot_path):
        """Count point-brake events."""
        top_brake_events = self._top_brake_detector()
        self.log_events('topBrake', top_brake_events)
        return len(top_brake_events)

    def calculate_slam_brake_count(self, plot_path):
        """Count slam-brake events."""
        slam_brake_events = self._slam_brake_detector()
        self.log_events('slamBrake', slam_brake_events)
        self.generate_metric_chart('slamBrake', plot_path)
        return len(slam_brake_events)

    def calculate_slam_accel_count(self, plot_path):
        """Count slam-accelerate events."""
        slam_accel_events = self._slam_accel_detector()
        self.log_events('slamAccelerate', slam_accel_events)
        self.generate_metric_chart('slamaccelerate', plot_path)
        return len(slam_accel_events)

    # ----------------------
    # Core event detection methods
    # ----------------------
    def _detect_zigzag_events_new(self, window_size=10.0, min_zcr=2,
                                  min_theta_range=5.0, max_theta_range=30.0):
        """Detect zigzag (snaking) events."""
        df = self.ego_df.copy()
        if 'speedH' not in df.columns or 'posH' not in df.columns:
            self.logger.warning("缺少航向角速度或航向角度数据,无法进行画龙检测")
            return []
        df = df.sort_values('simTime')
        df['time_diff'] = df['simTime'].diff().fillna(0)
        fs = self.fs
        df['theta'] = df['posH']
        df['omega'] = df['speedH']
        if 'v' in df.columns:
            df['speed_kmh'] = df['v'] * 3.6
        else:
            df['speed_kmh'] = 0.0
        if len(df) > 10:
            b, a = scipy.signal.butter(2, 2 / (fs / 2), btype='low')
            df['theta_filtered'] = scipy.signal.filtfilt(b, a, df['theta'])
            df['omega_filtered'] = scipy.signal.filtfilt(b, a, df['omega'])
        else:
            df['theta_filtered'] = df['theta']
            df['omega_filtered'] = df['omega']
        df['theta_diff'] = df['theta_filtered'].diff().fillna(0)
        df['theta_diff2'] = df['theta_diff'].diff().fillna(0)

        def get_min_omega_threshold(speed_kmh):
            if speed_kmh < 10:
                return 10.0
            elif speed_kmh < 30:
                return 6.0
            elif speed_kmh < 60:
                return 4.0
            elif speed_kmh < 90:
                return 3.0
            else:
                return 2.5

        window_points = int(window_size * fs)
        if window_points < 5:
            window_points = 5
        zigzag_events = []
        current_event = None
        for i in range(window_points, len(df)):
            window_data = df.iloc[i - window_points:i]
            omega_sign = np.sign(window_data['omega_filtered'])
            sign_changes = np.sum(np.abs(np.diff(omega_sign)) > 1.5)
            if sign_changes == 0:
                continue
            avg_speed_kmh = window_data['speed_kmh'].mean()
            dynamic_min_omega = get_min_omega_threshold(avg_speed_kmh)
            omega_over_threshold = np.any(np.abs(window_data['omega_filtered']) > dynamic_min_omega)
            theta_range = np.max(window_data['theta_filtered']) - np.min(window_data['theta_filtered'])
            theta_diff_sum = window_data['theta_diff'].sum()
            theta_diff_abs_sum = np.abs(window_data['theta_diff']).sum()
            direction_consistency = np.abs(theta_diff_sum) / theta_diff_abs_sum if theta_diff_abs_sum > 0 else 0
            theta_diff2_sign = np.sign(window_data['theta_diff2'])
            theta_diff2_sign_changes = np.sum(np.abs(np.diff(theta_diff2_sign)) > 1.5)
            is_likely_turn = direction_consistency > 0.7 and theta_range > 10.0
            has_zigzag_pattern = direction_consistency < 0.5 and theta_diff2_sign_changes >= 3
            is_zigzag = (
                sign_changes >= min_zcr
                and omega_over_threshold
                and min_theta_range <= theta_range <= max_theta_range
                and not is_likely_turn
                and has_zigzag_pattern
            )
            current_time = df.iloc[i]['simTime']
            if is_zigzag and current_event is None:
                current_event = {
                    'start_time': current_time - window_size,
                    'start_frame': df.iloc[i - window_points]['simFrame'],
                    'end_time': current_time,
                    'end_frame': df.iloc[i]['simFrame'],
                }
            elif is_zigzag and current_event is not None:
                current_event['end_time'] = current_time
                current_event['end_frame'] = df.iloc[i]['simFrame']
            elif not is_zigzag and current_event is not None:
                duration = current_event['end_time'] - current_event['start_time']
                if duration >= window_size:
                    zigzag_events.append(current_event)
                    self._add_event_to_df(current_event, 'zigzag')
                current_event = None
        if current_event is not None:
            duration = current_event['end_time'] - current_event['start_time']
            if duration >= window_size:
                zigzag_events.append(current_event)
                self._add_event_to_df(current_event, 'zigzag')
        self.zigzag_count = len(zigzag_events)
        return zigzag_events
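
    # Speed-adaptive thresholds used by _shake_detector below (formulas taken directly from
    # its code; v is assumed to be in m/s):
    #   lat_acc_threshold = clip(1.8 - 0.0288 * (v - 5.56), 1.0, 1.8) -> about 1.53 m/s² at v = 15 m/s
    #   speedH_threshold  = clip(2.0 * (1 + (v - 20) / 60), 1.5, 3.0) -> about 1.83 at v = 15 m/s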
    def _shake_detector(self, T_diff=0.5):
        """Detect shake events."""
        df = self.ego_df.copy()
        if 'lat_acc_vehicle' not in df.columns:
            self.logger.warning("缺少计算晃动指标所需的数据列")
            return []
        window_size = 25
        df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
        v0 = 20 * 5 / 18
        k = 0.008 * 3.6
        df['lat_acc_threshold'] = df['v'].apply(
            lambda speed: max(1.0, min(1.8, 1.8 - k * (speed - v0))))
        df['speedH_threshold'] = df['v'].apply(
            lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60))))
        condition_A = df['lat_acc_vehicle'].abs() > df['lat_acc_threshold']
        condition_B = df['lat_acc_rate'].abs() > 0.5
        condition_C = (df['speedH_std'] > df['speedH_threshold'])
        shake_condition = condition_A & (condition_B | condition_C)
        event_groups = (shake_condition != shake_condition.shift()).cumsum()
        shake_events = []
        for _, group in df[shake_condition].groupby(event_groups):
            if len(group) >= 10:
                start_time = group['simTime'].iloc[0]
                end_time = group['simTime'].iloc[-1]
                duration = end_time - start_time
                if duration >= T_diff:
                    shake_events.append({
                        'start_time': start_time,
                        'end_time': end_time,
                        'start_frame': group['simFrame'].iloc[0],
                        'end_frame': group['simFrame'].iloc[-1],
                    })
                    self._add_event_to_df({
                        'start_time': start_time,
                        'end_time': end_time,
                        'start_frame': group['simFrame'].iloc[0],
                        'end_frame': group['simFrame'].iloc[-1],
                    }, 'shake')
        self.shake_count = len(shake_events)
        self.ego_df = df.copy()
        self.shake_events = shake_events
        return shake_events
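
    # Time-scale sketch of the cadence criteria below (illustrative, assuming fs = 25 Hz):
    #   sampling period 0.04 s, so max_interval between direction flips is 0.08 s;
    #   at least 3 flips with |acc_filtered| > 0.05 m/s² form one event, and a new event
    #   must start at least 1.0 s after the previous event ended.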
    def _cadence_detector(self):
        """Detect cadence events - identified via consecutive alternating acceleration/deceleration actions.

        Requirements:
        1. Consecutive acceleration/deceleration actions are no more than 2 sampling periods apart.
        2. Each group contains at least 3 such consecutive actions.
        3. Two cadence events are separated by at least 1 second.
        """
        # import matplotlib.pyplot as plt
        # import matplotlib.dates as mdates
        # from matplotlib.patches import Rectangle
        df = self.ego_df.copy()
        required_fields = ['simTime', 'simFrame', 'lon_v_vehicle', 'lon_acc_vehicle', 'lon_acc_rate']
        if not all(field in df.columns for field in required_fields):
            missing_fields = [field for field in required_fields if field not in df.columns]
            self.logger.warning(f"顿挫检测缺少必要字段: {missing_fields}")
            return []

        # Ensure data is sorted by time
        df = df.sort_values('simTime')
        if len(df) < 10:
            return []

        # Sampling frequency and period
        fs = self.fs
        sample_period = 1.0 / fs
        max_interval = 2 * sample_period  # max allowed gap: 2 sampling periods
        min_event_interval = 1.0  # minimum interval between two cadence events (s)

        # Preprocessing and smoothing
        if len(df) > 10:
            # b, a = scipy.signal.butter(2, 2 / (fs / 2), btype='low')
            # df['acc_filtered'] = scipy.signal.filtfilt(b, a, df['lon_acc_vehicle'])
            # ============ Low-pass filtering ============
            cutoff = 5  # Hz; can be tuned to the sampling rate and vehicle dynamics
            b, a = scipy.signal.butter(2, cutoff / (fs / 2), btype='low')
            df['acc_filtered'] = scipy.signal.filtfilt(b, a, df['lon_acc_vehicle'])
        else:
            df['acc_filtered'] = df['lon_acc_vehicle']

        # Detect acceleration direction changes
        df['acc_direction'] = np.sign(df['acc_filtered'])
        df['acc_direction_change'] = (df['acc_direction'] != df['acc_direction'].shift(1)).astype(int)

        # Thresholds
        acc_threshold = 0.05  # acceleration threshold
        min_changes = 3  # minimum number of consecutive direction changes

        # Detect cadence events
        cadence_events = []
        current_changes = []
        last_event_end = df['simTime'].iloc[0] - min_event_interval  # initialize to first timestamp minus the interval

        for i in range(1, len(df)):
            current_time = df.iloc[i]['simTime']
            current_frame = df.iloc[i]['simFrame']
            if df.iloc[i]['acc_direction_change'] and abs(df.iloc[i]['acc_filtered']) > acc_threshold:
                if not current_changes or (current_time - current_changes[-1]['time'] <= max_interval):
                    current_changes.append({
                        'time': current_time,
                        'frame': current_frame
                    })
                else:
                    # Gap too large: check whether the previous changes form a cadence event
                    if len(current_changes) >= min_changes:
                        start_time = current_changes[0]['time']
                        end_time = current_changes[-1]['time']
                        # Check interval to the previous event
                        if start_time - last_event_end >= min_event_interval:
                            event = {
                                'start_time': start_time,
                                'start_frame': current_changes[0]['frame'],
                                'end_time': end_time,
                                'end_frame': current_changes[-1]['frame']
                            }
                            cadence_events.append(event)
                            self._add_event_to_df(event, 'cadence')
                            last_event_end = end_time
                    # Start a new change sequence
                    current_changes = [{
                        'time': current_time,
                        'frame': current_frame
                    }]

        # Handle the final group of changes
        if len(current_changes) >= min_changes:
            start_time = current_changes[0]['time']
            end_time = current_changes[-1]['time']
            if start_time - last_event_end >= min_event_interval:
                event = {
                    'start_time': start_time,
                    'start_frame': current_changes[0]['frame'],
                    'end_time': end_time,
                    'end_frame': current_changes[-1]['frame']
                }
                cadence_events.append(event)
                self._add_event_to_df(event, 'cadence')

        self.cadence_count = len(cadence_events)

        # # ======================== New plotting code added ========================
        # if len(df) > 0:
        #     try:
        #         # Create figure and axes
        #         plt.figure(figsize=(15, 8))
        #         # Plot original and filtered acceleration
        #         plt.plot(df['simTime'], df['lon_acc_vehicle'],
        #                  label='Original Acceleration', alpha=0.6, color='blue')
        #         plt.plot(df['simTime'], df['acc_filtered'],
        #                  label='Filtered Acceleration', linewidth=2, color='red')
        #         # Mark direction change points
        #         change_points = df[df['acc_direction_change'] == 1]
        #         plt.scatter(change_points['simTime'], change_points['acc_filtered'],
        #                     color='green', s=50, zorder=5, label='Direction Change Points')
        #         # Mark points exceeding threshold
        #         threshold_points = df[abs(df['acc_filtered']) > acc_threshold]
        #         plt.scatter(threshold_points['simTime'], threshold_points['acc_filtered'],
        #                     color='purple', s=20, zorder=4, alpha=0.5, label='Exceeds Threshold Points')
        #         # Plot event regions
        #         for event in cadence_events:
        #             start = event['start_time']
        #             end = event['end_time']
        #             plt.axvspan(start, end, alpha=0.3, color='orange',
        #                         label='cadence_events' if 'cadence' not in plt.gca().get_legend_handles_labels()[1] else "")
        #             # Add event labels
        #             mid_time = start + (end - start) / 2
        #             plt.text(mid_time, plt.ylim()[1] * 0.9, f'Cadence {cadence_events.index(event) + 1}',
        #                      ha='center', fontsize=10, bbox=dict(facecolor='white', alpha=0.8))
        #         # Add threshold lines
        #         plt.axhline(y=acc_threshold, color='gray', linestyle='--', alpha=0.7)
        #         plt.axhline(y=-acc_threshold, color='gray', linestyle='--', alpha=0.7)
        #         plt.text(df['simTime'].iloc[-1], acc_threshold + 0.05, f'Threshold: {acc_threshold} m/s²',
        #                  ha='right', color='gray')
        #         # Set legend and title
        #         plt.title(f'Cadence Event Detection (Total Events Detected: {len(cadence_events)})')
        #         plt.xlabel('Time (s)')
        #         plt.ylabel('Longitudinal Acceleration (m/s²)')
        #         plt.legend(loc='upper right')
        #         plt.grid(True, linestyle='--', alpha=0.6)
        #         # Auto-adjust time axis format
        #         if (df['simTime'].max() - df['simTime'].min()) > 60:
        #             plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
        #         else:
        #             plt.gca().xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f'{x:.1f}s'))
        #         plt.tight_layout()
        #         # Save plot
        #         plot_path = f"./cadence_detection.png"
        #         plt.savefig(plot_path, dpi=150)
        #         plt.close()
        #         self.logger.info(f"Cadence detection results saved to: {plot_path}")
        #     except Exception as e:
        #         self.logger.error(f"Error plotting cadence detection graph: {str(e)}")
        # # ======================== End of plotting code ========================

        return cadence_events
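
    # Order-of-magnitude sketch of the point-brake parameters below (illustrative, assuming fs = 25 Hz):
    #   Savitzky-Golay window = max(5, int(25 * 0.2) // 2 * 2 + 1) = 5 samples (~0.2 s);
    #   a valid waveform lasts between 3 sampling periods (0.12 s) and 1.5 s, needs a peak jerk
    #   of at least 3 m/s³, and events closer than 2 sampling periods (0.08 s) are merged.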
    def _top_brake_detector(self):
        """
        Point Brake Detector (Top Brake) - Waveform analysis based on jerk signal

        This function identifies "point brake" events, defined as independent, sudden braking
        actions with intensity between normal braking and emergency braking. It distinguishes
        from continuous acceleration/deceleration "cadence" phenomena by analyzing individual
        complete waveforms in the jerk signal.

        Point Brake Characteristics:
        1. Independent event: Starts and ends in a relatively stable state (acceleration near zero).
        2. Braking intensity: Deceleration stronger than normal braking but weaker than emergency braking.
        3. Suddenness: Peak jerk must be large enough to indicate rapid acceleration change.
        4. Duration: Reasonable duration (e.g., 0.2 to 1.5 seconds).
        """
        # import matplotlib.pyplot as plt
        # from matplotlib.patches import Rectangle
        # import matplotlib.dates as mdates

        # 1. Data Preparation & Checks
        df = self.ego_df.copy()
        required_cols = ['simTime', 'simFrame', 'lon_acc_rate', 'lon_acc_vehicle', 'v', 'ip_dec']
        if any(col not in df.columns for col in required_cols):
            missing = [c for c in required_cols if c not in df.columns]
            self.logger.warning(f"Point brake detection missing required fields: {missing}")
            return []
        fs = self.fs
        if len(df) < fs:  # Need at least 1 second of data
            return []

        # Define Key Thresholds
        NORMAL_BRAKE_THRESHOLD = -1.2  # m/s^2
        MIN_PEAK_JERK = 3.0  # m/s^3
        MERGE_TIME_GAP_THRESHOLD = (1 / fs) * 2  # seconds

        # 2. Smooth Jerk Signal
        window_length = max(5, int(fs * 0.2) // 2 * 2 + 1)  # Must be odd
        if len(df) > window_length:
            df['jerk_filtered'] = scipy.signal.savgol_filter(df['lon_acc_rate'], window_length, 2)
        else:
            df['jerk_filtered'] = df['lon_acc_rate']

        # 3. Find Jerk Zero-Crossings
        df['jerk_sign'] = np.sign(df['jerk_filtered'])
        zero_crossings = df.index[df['jerk_sign'].ne(df['jerk_sign'].shift(1)) & df['jerk_sign'].ne(0)].tolist()
        if len(zero_crossings) < 2:
            return []

        # 4. Identify Waveforms and Validate Events
        top_brake_events = []
        for i in range(len(zero_crossings) - 1):
            start_idx = zero_crossings[i]
            end_idx = zero_crossings[i + 1]
            event_slice = df.loc[start_idx:end_idx]
            if len(event_slice) < 2:
                continue
            # a. Duration Validation
            duration = event_slice['simTime'].iloc[-1] - event_slice['simTime'].iloc[0]
            if not ((1 / fs) * 3 <= duration <= 1.5):
                continue
            min_acc = event_slice['lon_acc_vehicle'].min()
            emergency_brake_threshold = df.loc[start_idx, 'ip_dec']
            # b. Braking Intensity Validation
            is_top_brake_strength = min_acc < NORMAL_BRAKE_THRESHOLD and min_acc > emergency_brake_threshold
            if not is_top_brake_strength:
                continue
            # c. Waveform Amplitude Validation
            peak_jerk = event_slice['jerk_filtered'].abs().max()
            if peak_jerk < MIN_PEAK_JERK:
                continue
            # d. Isolated Event Validation
            acc_at_start = df.loc[start_idx, 'lon_acc_vehicle']
            acc_at_end = df.loc[end_idx, 'lon_acc_vehicle']
            if not (abs(acc_at_start) < 0.5 or abs(acc_at_end) < 0.5):
                continue
            # 5. Create Valid Event
            event = {
                'start_time': df.loc[start_idx, 'simTime'],
                'end_time': df.loc[end_idx, 'simTime'],
                'start_frame': df.loc[start_idx, 'simFrame'],
                'end_frame': df.loc[end_idx, 'simFrame'],
                'peak_jerk': peak_jerk,
                'min_acc': min_acc
            }
            top_brake_events.append(event)

        # 6. Post-processing: Merge very close events
        if not top_brake_events:
            return []
        merged_events = []
        current_event = top_brake_events[0]
        for next_event in top_brake_events[1:]:
            time_gap = next_event['start_time'] - current_event['end_time']
            if time_gap < MERGE_TIME_GAP_THRESHOLD:
                current_event['end_time'] = next_event['end_time']
                current_event['end_frame'] = next_event['end_frame']
                current_event['peak_jerk'] = max(current_event['peak_jerk'], next_event['peak_jerk'])
                current_event['min_acc'] = min(current_event['min_acc'], next_event['min_acc'])
            else:
                merged_events.append(current_event)
                current_event = next_event
        merged_events.append(current_event)
        for event in merged_events:
            self._add_event_to_df(event, 'topBrake_merged')

        # # ===================== Visualization =====================
        # try:
        #     plt.figure(figsize=(14, 10))
        #     plt.suptitle(f'Point Brake Detection Analysis (Detected: {len(merged_events)} events)', fontsize=16)
        #     # 1. Jerk Analysis
        #     ax1 = plt.subplot(3, 1, 1)
        #     plt.plot(df['simTime'], df['lon_acc_rate'], 'b-', alpha=0.5, label='Raw Jerk')
        #     plt.plot(df['simTime'], df['jerk_filtered'], 'r-', label='Filtered Jerk', linewidth=1.5)
        #     # Mark zero crossings
        #     zero_points = df.loc[zero_crossings]
        #     plt.scatter(zero_points['simTime'], zero_points['jerk_filtered'],
        #                 c='green', s=40, marker='o', label='Zero Crossings', zorder=5)
        #     # Threshold lines
        #     plt.axhline(y=MIN_PEAK_JERK, color='gray', linestyle='--', alpha=0.7)
        #     plt.axhline(y=-MIN_PEAK_JERK, color='gray', linestyle='--', alpha=0.7)
        #     plt.text(df['simTime'].iloc[-1], MIN_PEAK_JERK + 0.2, f'Jerk Threshold: ±{MIN_PEAK_JERK} m/s³',
        #              ha='right', color='gray')
        #     plt.ylabel('Jerk (m/s³)')
        #     plt.title('Jerk Signal Analysis')
        #     plt.grid(True, linestyle='--', alpha=0.6)
        #     plt.legend(loc='upper right')
        #     # 2. Acceleration Analysis
        #     ax2 = plt.subplot(3, 1, 2, sharex=ax1)
        #     plt.plot(df['simTime'], df['lon_acc_vehicle'], 'g-', label='Longitudinal Acceleration')
        #     # Threshold lines
        #     plt.axhline(y=0, color='k', linestyle='-', alpha=0.5)
        #     plt.axhline(y=NORMAL_BRAKE_THRESHOLD, color='orange', linestyle='--', alpha=0.8)
        #     # Mark emergency brake threshold (varies over time)
        #     plt.plot(df['simTime'], df['ip_dec'], 'm--', label='Emergency Brake Threshold')
        #     plt.ylabel('Acceleration (m/s²)')
        #     plt.title('Acceleration Analysis')
        #     plt.grid(True, linestyle='--', alpha=0.6)
        #     plt.legend(loc='upper right')
        #     # 3. Velocity Analysis
        #     ax3 = plt.subplot(3, 1, 3, sharex=ax1)
        #     plt.plot(df['simTime'], df['v'], 'b-', label='Vehicle Speed')
        #     plt.ylabel('Speed (m/s)')
        #     plt.xlabel('Time (s)')
        #     plt.title('Vehicle Speed')
        #     plt.grid(True, linestyle='--', alpha=0.6)
        #     # Highlight detected events across all subplots
        #     for event in merged_events:
        #         start = event['start_time']
        #         end = event['end_time']
        #         duration = end - start
        #         # Add shaded region
        #         for ax in [ax1, ax2, ax3]:
        #             ax.axvspan(start, end, alpha=0.2, color='red')
        #         # Add event info on acceleration plot
        #         ax2.text((start + end) / 2, event['min_acc'] - 0.2,
        #                  f"Min Acc: {event['min_acc']:.2f} m/s²\nPeak Jerk: {event['peak_jerk']:.2f} m/s³",
        #                  ha='center', va='top', fontsize=9,
        #                  bbox=dict(facecolor='white', alpha=0.8, edgecolor='red'))
        #         # Add event info on speed plot
        #         ax3.text((start + end) / 2, df['v'].min() + 0.1 * (df['v'].max() - df['v'].min()),
        #                  f"Point Brake\n{duration:.2f}s",
        #                  ha='center', fontsize=10, color='red',
        #                  bbox=dict(facecolor='white', alpha=0.8))
        #     # Format x-axis for time
        #     if (df['simTime'].max() - df['simTime'].min()) > 60:
        #         plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
        #     else:
        #         plt.gca().xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f'{x:.1f}s'))
        #     plt.tight_layout(rect=[0, 0, 1, 0.96])  # Make room for suptitle
        #     # Save the plot
        #     plot_path = f"./top_brake_analysis.png"
        #     plt.savefig(plot_path, dpi=150)
        #     plt.close()
        #     self.logger.info(f"Point brake analysis plot saved to: {plot_path}")
        # except Exception as e:
        #     self.logger.error(f"Error generating point brake analysis plot: {str(e)}")
        # # ===================== End Visualization =====================

        return merged_events
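
    # Slam-brake criterion sketch (uses ip_dec from _prepare_comfort_parameters; assumes the
    # speed shares the unit of the interpolation endpoints):
    #   e.g. at lon_v_vehicle = 45, ip_dec = -5 + (45 - 18) * (-3.5 - (-5)) / (72 - 18) = -4.25 m/s²;
    #   frames with longitudinal acceleration below that value get slam_brake = 1, and only runs
    #   of flagged frames lasting at least 0.5 s count as an event.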
    def _slam_brake_detector(self):
        """Detect slam-brake events."""
        df = self.ego_df.copy()
        if 'slam_brake' not in df.columns:
            self.logger.warning("缺少计算急刹车指标所需的数据列")
            return []
        min_duration = 0.5
        slam_brake_events = []
        in_event = False
        start_idx = 0
        for i, row in df.iterrows():
            if row['slam_brake'] == 1 and not in_event:
                in_event = True
                start_idx = i
            elif row['slam_brake'] == 0 and in_event:
                in_event = False
                end_idx = i - 1
                start_time = df.loc[start_idx, 'simTime']
                end_time = df.loc[end_idx, 'simTime']
                duration = end_time - start_time
                if duration >= min_duration:
                    slam_brake_events.append({
                        'start_time': start_time,
                        'end_time': end_time,
                        'start_frame': df.loc[start_idx, 'simFrame'],
                        'end_frame': df.loc[end_idx, 'simFrame'],
                    })
                    self._add_event_to_df({
                        'start_time': start_time,
                        'end_time': end_time,
                        'start_frame': df.loc[start_idx, 'simFrame'],
                        'end_frame': df.loc[end_idx, 'simFrame'],
                    }, 'slam_brake')
        if in_event:
            end_idx = len(df) - 1
            start_time = df.loc[start_idx, 'simTime']
            end_time = df.loc[end_idx, 'simTime']
            duration = end_time - start_time
            if duration >= min_duration:
                slam_brake_events.append({
                    'start_time': start_time,
                    'end_time': end_time,
                    'start_frame': df.loc[start_idx, 'simFrame'],
                    'end_frame': df.loc[end_idx, 'simFrame'],
                })
                self._add_event_to_df({
                    'start_time': start_time,
                    'end_time': end_time,
                    'start_frame': df.loc[start_idx, 'simFrame'],
                    'end_frame': df.loc[end_idx, 'simFrame'],
                }, 'slam_brake')
        self.slam_brake_count = len(slam_brake_events)
        return slam_brake_events
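
    # Duration gate sketch (assuming fs = 25 Hz): 0.5 s corresponds to roughly 13 consecutive
    # sampling intervals, i.e. the slam_accel / slam_brake flag must hold for about 14 frames
    # before a run is counted as an event.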
    def _slam_accel_detector(self):
        """Detect slam-accelerate events."""
        df = self.ego_df.copy()
        if 'slam_accel' not in df.columns:
            self.logger.warning("缺少计算急加速指标所需的数据列")
            return []
        min_duration = 0.5
        slam_accel_events = []
        in_event = False
        start_idx = 0
        for i, row in df.iterrows():
            if row['slam_accel'] == 1 and not in_event:
                in_event = True
                start_idx = i
            elif row['slam_accel'] == 0 and in_event:
                in_event = False
                end_idx = i - 1
                start_time = df.loc[start_idx, 'simTime']
                end_time = df.loc[end_idx, 'simTime']
                duration = end_time - start_time
                if duration >= min_duration:
                    slam_accel_events.append({
                        'start_time': start_time,
                        'end_time': end_time,
                        'start_frame': df.loc[start_idx, 'simFrame'],
                        'end_frame': df.loc[end_idx, 'simFrame'],
                    })
                    self._add_event_to_df({
                        'start_time': start_time,
                        'end_time': end_time,
                        'start_frame': df.loc[start_idx, 'simFrame'],
                        'end_frame': df.loc[end_idx, 'simFrame'],
                    }, 'slam_accel')
        if in_event:
            end_idx = len(df) - 1
            start_time = df.loc[start_idx, 'simTime']
            end_time = df.loc[end_idx, 'simTime']
            duration = end_time - start_time
            if duration >= min_duration:
                slam_accel_events.append({
                    'start_time': start_time,
                    'end_time': end_time,
                    'start_frame': df.loc[start_idx, 'simFrame'],
                    'end_frame': df.loc[end_idx, 'simFrame'],
                })
                self._add_event_to_df({
                    'start_time': start_time,
                    'end_time': end_time,
                    'start_frame': df.loc[start_idx, 'simFrame'],
                    'end_frame': df.loc[end_idx, 'simFrame'],
                }, 'slam_accel')
        self.slam_accel_count = len(slam_accel_events)
        return slam_accel_events
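
    # Example row of discomfort_df (columns defined in __init__; the numbers are made up,
    # shown only to illustrate the format):
    #   {'start_time': 12.3, 'end_time': 13.1, 'start_frame': 308, 'end_frame': 328, 'type': 'shake'}
    # 'type' takes values such as 'zigzag', 'shake', 'cadence', 'topBrake_merged',
    # 'slam_brake' and 'slam_accel'.
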
    # ----------------------
    # Helper methods
    # ----------------------
    def _add_event_to_df(self, event, event_type):
        """Append an event to the discomfort DataFrame."""
        new_row = pd.DataFrame([{
            'start_time': event['start_time'],
            'end_time': event['end_time'],
            'start_frame': event['start_frame'],
            'end_frame': event['end_frame'],
            'type': event_type
        }])
        self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)

    def log_events(self, metric_name: str, events: list):
        """Log detected metric events."""
        if not events:
            self.logger.info(f"未检测到 {metric_name} 事件")
            return
        self.logger.info(f"检测到 {len(events)} 个 {metric_name} 事件:")
        for i, event in enumerate(events):
            duration = event.get('end_time', 0) - event.get('start_time', 0)
            self.logger.info(
                f"{metric_name} 事件 #{i + 1}: "
                f"开始时间={event.get('start_time', 'N/A'):.2f}s, "
                f"结束时间={event.get('end_time', 'N/A'):.2f}s, "
                f"持续时间={duration:.2f}s, "
                f"开始帧={event.get('start_frame', 'N/A')}, "
                f"结束帧={event.get('end_frame', 'N/A')}"
            )

    def generate_metric_chart(self, metric_name: str, plot_path: Path) -> None:
        """Generate a chart for the given metric."""
        if not plot_path:
            plot_path = os.path.join(os.getcwd(), 'data')
        os.makedirs(plot_path, exist_ok=True)
        chart_path = generate_comfort_chart_data(self, metric_name, plot_path)
        if chart_path:
            self.logger.info(f"{metric_name}图表已生成: {chart_path}")
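

# ----------------------
# Usage sketch (illustrative, not part of the original module)
# ----------------------
if __name__ == "__main__":
    # Only the standalone helper calculate_sampling_frequency is demonstrated here;
    # ComfortManager / ComfortCalculator require an external data_processed object.
    _demo_df = pd.DataFrame({
        "simTime": np.arange(0.0, 2.0, 0.04),  # evenly spaced at 0.04 s, i.e. 25 Hz
        "simFrame": np.arange(50),
    })
    print(f"demo sampling frequency: {calculate_sampling_frequency(_demo_df):.1f} Hz")  # expect ~25.0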