123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/06/25
- @Last Modified: 2025/04/25
- @Summary: 舒适性指标计算模块
- """
- import scipy.signal
- import pandas as pd
- import numpy as np
- import os
- from pathlib import Path
- from typing import Dict, List, Any, Optional, Callable, Union, Tuple
- from modules.lib.score import Score
- from modules.lib.common import get_interpolation
- from modules.lib import data_process
- from modules.lib.log_manager import LogManager
- from modules.lib.chart_generator import generate_comfort_chart_data
- # ----------------------
- # 指标计算函数
- # ----------------------
- def calculate_zigzag(data_processed, plot_path) -> dict:
- """计算蛇行指标"""
- comfort = ComfortCalculator(data_processed)
- zigzag_count = comfort.calculate_zigzag_count(plot_path)
- return {"zigzag": float(zigzag_count)}
- def calculate_shake(data_processed, plot_path) -> dict:
- """计算晃动指标"""
- comfort = ComfortCalculator(data_processed)
- shake_count = comfort.calculate_shake_count(plot_path)
- return {"shake": float(shake_count)}
- def calculate_cadence(data_processed, plot_path) -> dict:
- """计算顿挫指标"""
- comfort = ComfortCalculator(data_processed)
- cadence_count = comfort.calculate_cadence_count(plot_path)
- return {"cadence": float(cadence_count)}
- def calculate_topbrake(data_processed, plot_path) -> dict:
- """计算点杀指标"""
- comfort = ComfortCalculator(data_processed)
- topBrake_count = comfort.calculate_top_brake_count(plot_path)
- return {"topBrake": float(topBrake_count)}
- def calculate_slambrake(data_processed, plot_path) -> dict:
- """计算急刹车指标"""
- comfort = ComfortCalculator(data_processed)
- slam_brake_count = comfort.calculate_slam_brake_count(plot_path)
- return {"slamBrake": float(slam_brake_count)}
- def calculate_slamaccelerate(data_processed, plot_path) -> dict:
- """计算急加速指标"""
- comfort = ComfortCalculator(data_processed)
- slam_accel_count = comfort.calculate_slam_accel_count(plot_path)
- return {"slamAccelerate": float(slam_accel_count)}
- def calculate_sampling_frequency(df, time_column='simTime', default_fs=25):
- """计算时间序列数据的采样频率"""
- if len(df) < 2 or time_column not in df.columns:
- return default_fs
-
- if not df[time_column].is_monotonic_increasing:
- df = df.sort_values(time_column)
-
- time_diffs = df[time_column].diff().dropna()
-
- if time_diffs.empty or (time_diffs <= 0).any():
- return default_fs
-
- median_time_diff = time_diffs.median()
- return 1.0 / median_time_diff if median_time_diff > 0 else default_fs
- # ----------------------
- # 注册器与管理类
- # ----------------------
- class ComfortRegistry:
- """舒适性指标注册器"""
- def __init__(self, data_processed, plot_path):
- self.logger = LogManager().get_logger()
- self.data = data_processed
- self.output_dir = plot_path
- if not hasattr(data_processed, 'comfort_config') or not data_processed.comfort_config:
- self.logger.warning("舒适性配置为空,跳过舒适性指标计算")
- self.comfort_config = {}
- self.metrics = []
- self._registry = {}
- return
- self.comfort_config = data_processed.comfort_config.get("comfort", {})
- self.metrics = self._extract_metrics(self.comfort_config)
- self._registry = self._build_registry()
- def _extract_metrics(self, config_node: dict) -> list:
- """DFS遍历提取指标"""
- metrics = []
- def _recurse(node):
- if isinstance(node, dict):
- if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
- metrics.append(node['name'])
- for v in node.values():
- _recurse(v)
- _recurse(config_node)
- return metrics
- def _build_registry(self) -> dict:
- """自动注册指标函数"""
- registry = {}
- for metric_name in self.metrics:
- func_name = f"calculate_{metric_name.lower()}"
- try:
- registry[metric_name] = globals()[func_name]
- except KeyError:
- self.logger.error(f"未实现指标函数: {func_name}")
- return registry
- def batch_execute(self) -> dict:
- """批量执行指标计算"""
- results = {}
- for name, func in self._registry.items():
- try:
- result = func(self.data, self.output_dir)
- results.update(result)
- except Exception as e:
- self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
- results[name] = None
- return results
- class ComfortManager:
- """舒适性指标计算主类"""
- def __init__(self, data_processed, plot_path):
- self.data = data_processed
- self.logger = LogManager().get_logger()
- self.plot_path = plot_path
- if not hasattr(data_processed, 'comfort_config') or not data_processed.comfort_config:
- self.logger.warning("舒适性配置为空,跳过舒适性指标计算初始化")
- self.registry = None
- else:
- self.registry = ComfortRegistry(self.data, self.plot_path)
- def report_statistic(self):
- """生成舒适性评分报告"""
- if self.registry is None:
- self.logger.info("舒适性指标管理器未初始化,返回空结果")
- return {}
- return self.registry.batch_execute()
- # ----------------------
- # 舒适性计算核心类
- # ----------------------
- class ComfortCalculator:
- """舒适性指标计算核心类"""
- def __init__(self, data_processed):
- self.data_processed = data_processed
- self.logger = LogManager().get_logger()
- self.data = data_processed.ego_data
- self.ego_df = pd.DataFrame()
- self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.COMFORT_INFO = [
- "simTime", "simFrame", "speedX", "speedY", "accelX", "accelY",
- "curvHor", "lightMask", "v", "speedH", "accelH", "posH",
- "lon_v_vehicle", "lat_v_vehicle", "lat_acc_vehicle",
- "lon_acc_vehicle", "lat_acc_rate", "lon_acc_rate"
- ]
- self.calculated_value = {
- 'zigzag': 0, 'shake': 0, 'cadence': 0,
- 'topBrake': 0, 'slamBrake': 0, 'slamAccelerate': 0
- }
- self._initialize_data()
- self.fs = calculate_sampling_frequency(self.ego_df)
- self.logger.info(f"采样频率: {self.fs} Hz")
- self.zigzag_count = 0
- self.shake_count = 0
- self.cadence_count = 0
- self.slam_brake_count = 0
- self.slam_accel_count = 0
- self.zigzag_time_list = []
- self.zigzag_stre_list = []
- self.shake_events = []
- def _initialize_data(self):
- """初始化数据"""
- self.ego_df = self.data[self.COMFORT_INFO].copy()
- self.df = self.ego_df.reset_index(drop=True)
- self._prepare_comfort_parameters()
- def _prepare_comfort_parameters(self):
- """准备舒适性计算所需参数"""
- speed_field = 'lon_v_vehicle'
- self.ego_df['ip_acc'] = self.ego_df[speed_field].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
- self.ego_df['ip_dec'] = self.ego_df[speed_field].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
- acc_field = 'lon_acc_vehicle'
- self.ego_df['slam_brake'] = (self.ego_df[acc_field] - self.ego_df['ip_dec']).apply(
- lambda x: 1 if x < 0 else 0)
- self.ego_df['slam_accel'] = (self.ego_df[acc_field] - self.ego_df['ip_acc']).apply(
- lambda x: 1 if x > 0 else 0)
- # ----------------------
- # 指标计算公共接口
- # ----------------------
- def calculate_zigzag_count(self, plot_path):
- """计算蛇行指标"""
- zigzag_events = self._detect_zigzag_events_new()
- self.log_events('zigzag', zigzag_events)
- self.generate_metric_chart('zigzag', plot_path)
- return len(zigzag_events)
- def calculate_shake_count(self, plot_path):
- """计算晃动指标"""
- shake_events = self._shake_detector()
- self.log_events('shake', shake_events)
- self.generate_metric_chart('shake', plot_path)
- return len(shake_events)
- def calculate_cadence_count(self, plot_path):
- """计算顿挫指标"""
- cadence_events = self._cadence_detector()
- self.log_events('cadence', cadence_events)
- self.generate_metric_chart('cadence', plot_path)
- return len(cadence_events)
-
- def calculate_top_brake_count(self, plot_path):
- """计算点杀指标"""
- top_brake_events = self._top_brake_detector()
- self.log_events('topBrake', top_brake_events)
- return len(top_brake_events)
-
- def calculate_slam_brake_count(self, plot_path):
- """计算急刹车指标"""
- slam_brake_events = self._slam_brake_detector()
- self.log_events('slamBrake', slam_brake_events)
- self.generate_metric_chart('slamBrake', plot_path)
- return len(slam_brake_events)
- def calculate_slam_accel_count(self, plot_path):
- """计算急加速指标"""
- slam_accel_events = self._slam_accel_detector()
- self.log_events('slamAccelerate', slam_accel_events)
- self.generate_metric_chart('slamaccelerate', plot_path)
- return len(slam_accel_events)
- # ----------------------
- # 事件检测核心方法
- # ----------------------
- def _detect_zigzag_events_new(self, window_size=10.0, min_zcr=2,
- min_theta_range=5.0, max_theta_range=30.0):
- """检测画龙事件"""
- df = self.ego_df.copy()
- if 'speedH' not in df.columns or 'posH' not in df.columns:
- self.logger.warning("缺少航向角速度或航向角度数据,无法进行画龙检测")
- return []
- df = df.sort_values('simTime')
- df['time_diff'] = df['simTime'].diff().fillna(0)
- fs = self.fs
- df['theta'] = df['posH']
- df['omega'] = df['speedH']
- if 'v' in df.columns:
- df['speed_kmh'] = df['v'] * 3.6
- else:
- df['speed_kmh'] = 0.0
- if len(df) > 10:
- b, a = scipy.signal.butter(2, 2 / (fs / 2), btype='low')
- df['theta_filtered'] = scipy.signal.filtfilt(b, a, df['theta'])
- df['omega_filtered'] = scipy.signal.filtfilt(b, a, df['omega'])
- else:
- df['theta_filtered'] = df['theta']
- df['omega_filtered'] = df['omega']
- df['theta_diff'] = df['theta_filtered'].diff().fillna(0)
- df['theta_diff2'] = df['theta_diff'].diff().fillna(0)
- def get_min_omega_threshold(speed_kmh):
- if speed_kmh < 10: return 10.0
- elif speed_kmh < 30: return 6.0
- elif speed_kmh < 60: return 4.0
- elif speed_kmh < 90: return 3.0
- else: return 2.5
- window_points = int(window_size * fs)
- if window_points < 5: window_points = 5
- zigzag_events = []
- current_event = None
- for i in range(window_points, len(df)):
- window_data = df.iloc[i - window_points:i]
- omega_sign = np.sign(window_data['omega_filtered'])
- sign_changes = np.sum(np.abs(np.diff(omega_sign)) > 1.5)
- if sign_changes == 0: continue
- avg_speed_kmh = window_data['speed_kmh'].mean()
- dynamic_min_omega = get_min_omega_threshold(avg_speed_kmh)
- omega_over_threshold = np.any(np.abs(window_data['omega_filtered']) > dynamic_min_omega)
- theta_range = np.max(window_data['theta_filtered']) - np.min(window_data['theta_filtered'])
- theta_diff_sum = window_data['theta_diff'].sum()
- theta_diff_abs_sum = np.abs(window_data['theta_diff']).sum()
- direction_consistency = np.abs(theta_diff_sum) / theta_diff_abs_sum if theta_diff_abs_sum > 0 else 0
-
- theta_diff2_sign = np.sign(window_data['theta_diff2'])
- theta_diff2_sign_changes = np.sum(np.abs(np.diff(theta_diff2_sign)) > 1.5)
-
- is_likely_turn = direction_consistency > 0.7 and theta_range > 10.0
- has_zigzag_pattern = direction_consistency < 0.5 and theta_diff2_sign_changes >= 3
- is_zigzag = (
- sign_changes >= min_zcr and
- omega_over_threshold and
- min_theta_range <= theta_range <= max_theta_range and
- not is_likely_turn and
- has_zigzag_pattern
- )
- current_time = df.iloc[i]['simTime']
- if is_zigzag and current_event is None:
- current_event = {
- 'start_time': current_time - window_size,
- 'start_frame': df.iloc[i - window_points]['simFrame'],
- 'end_time': current_time,
- 'end_frame': df.iloc[i]['simFrame'],
- }
- elif is_zigzag and current_event is not None:
- current_event['end_time'] = current_time
- current_event['end_frame'] = df.iloc[i]['simFrame']
- elif not is_zigzag and current_event is not None:
- duration = current_event['end_time'] - current_event['start_time']
- if duration >= window_size:
- zigzag_events.append(current_event)
- self._add_event_to_df(current_event, 'zigzag')
- current_event = None
- if current_event is not None:
- duration = current_event['end_time'] - current_event['start_time']
- if duration >= window_size:
- zigzag_events.append(current_event)
- self._add_event_to_df(current_event, 'zigzag')
- self.zigzag_count = len(zigzag_events)
- return zigzag_events
- def _shake_detector(self, T_diff=0.5):
- """检测晃动事件"""
- df = self.ego_df.copy()
- if 'lat_acc_vehicle' not in df.columns:
- self.logger.warning("缺少计算晃动指标所需的数据列")
- return []
- window_size = 25
- df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
- v0 = 20 * 5 / 18
- k = 0.008 * 3.6
- df['lat_acc_threshold'] = df['v'].apply(
- lambda speed: max(1.0, min(1.8, 1.8 - k * (speed - v0))
- ))
- df['speedH_threshold'] = df['v'].apply(
- lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60))
- ))
- condition_A = df['lat_acc_vehicle'].abs() > df['lat_acc_threshold']
- condition_B = df['lat_acc_rate'].abs() > 0.5
- condition_C = (df['speedH_std'] > df['speedH_threshold'])
- shake_condition = condition_A & (condition_B | condition_C)
- event_groups = (shake_condition != shake_condition.shift()).cumsum()
- shake_events = []
- for _, group in df[shake_condition].groupby(event_groups):
- if len(group) >= 10:
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
- if duration >= T_diff:
- shake_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- })
- self._add_event_to_df({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- }, 'shake')
- self.shake_count = len(shake_events)
- self.ego_df = df.copy()
- self.shake_events = shake_events
- return shake_events
- def _cadence_detector(self):
- """检测顿挫事件 - 通过连续的加减速动作来识别顿挫。
- 要求:
- 1. 连续加减速动作间隔不超过2个采样周期
- 2. 每组连续加减速动作至少要有3个及以上
- 3. 两个顿挫事件之间的时间间隔至少要1秒以上
- """
- # import matplotlib.pyplot as plt
- # import matplotlib.dates as mdates
- # from matplotlib.patches import Rectangle
-
- df = self.ego_df.copy()
- required_fields = ['simTime', 'simFrame', 'lon_v_vehicle', 'lon_acc_vehicle', 'lon_acc_rate']
- if not all(field in df.columns for field in required_fields):
- missing_fields = [field for field in required_fields if field not in df.columns]
- self.logger.warning(f"顿挫检测缺少必要字段: {missing_fields}")
- return []
- # 确保数据按时间排序
- df = df.sort_values('simTime')
- if len(df) < 10:
- return []
- # 计算采样频率和周期
- fs = self.fs
- sample_period = 1.0 / fs
- max_interval = 2 * sample_period # 最大允许间隔为2个采样周期
- min_event_interval = 1.0 # 两个顿挫事件之间的最小间隔(秒)
-
- # 数据预处理和平滑
- if len(df) > 10:
- # b, a = scipy.signal.butter(2, 2 / (fs / 2), btype='low')
- # df['acc_filtered'] = scipy.signal.filtfilt(b, a, df['lon_acc_vehicle'])
- # ============ 双通道滤波 ============
- cutoff = 5 # Hz,具体可根据采样频率和车辆动力特性调整
- b, a = scipy.signal.butter(2, cutoff / (fs / 2), btype='low')
- df['acc_filtered'] = scipy.signal.filtfilt(b, a, df['lon_acc_vehicle'])
- else:
- df['acc_filtered'] = df['lon_acc_vehicle']
-
- # 检测加速度方向变化
- df['acc_direction'] = np.sign(df['acc_filtered'])
- df['acc_direction_change'] = (df['acc_direction'] != df['acc_direction'].shift(1)).astype(int)
-
- # 设置阈值
- acc_threshold = 0.05 # 加速度阈值
- min_changes = 3 # 最小连续变化次数
-
- # 检测顿挫事件
- cadence_events = []
- current_changes = []
- last_event_end = df['simTime'].iloc[0] - min_event_interval # 初始化为第一个时间点减去间隔
-
- for i in range(1, len(df)):
- current_time = df.iloc[i]['simTime']
- current_frame = df.iloc[i]['simFrame']
-
- if df.iloc[i]['acc_direction_change'] and abs(df.iloc[i]['acc_filtered']) > acc_threshold:
- if not current_changes or (current_time - current_changes[-1]['time'] <= max_interval):
- current_changes.append({
- 'time': current_time,
- 'frame': current_frame
- })
- else:
- # 如果间隔过大,检查之前的变化是否构成顿挫事件
- if len(current_changes) >= min_changes:
- start_time = current_changes[0]['time']
- end_time = current_changes[-1]['time']
-
- # 检查与上一个事件的时间间隔
- if start_time - last_event_end >= min_event_interval:
- event = {
- 'start_time': start_time,
- 'start_frame': current_changes[0]['frame'],
- 'end_time': end_time,
- 'end_frame': current_changes[-1]['frame']
- }
- cadence_events.append(event)
- self._add_event_to_df(event, 'cadence')
- last_event_end = end_time
-
- # 开始新的变化序列
- current_changes = [{
- 'time': current_time,
- 'frame': current_frame
- }]
-
- # 处理最后一组变化
- if len(current_changes) >= min_changes:
- start_time = current_changes[0]['time']
- end_time = current_changes[-1]['time']
- if start_time - last_event_end >= min_event_interval:
- event = {
- 'start_time': start_time,
- 'start_frame': current_changes[0]['frame'],
- 'end_time': end_time,
- 'end_frame': current_changes[-1]['frame']
- }
- cadence_events.append(event)
- self._add_event_to_df(event, 'cadence')
-
- self.cadence_count = len(cadence_events)
-
- # # ======================== New plotting code added ========================
- # if len(df) > 0:
- # try:
- # # Create figure and axes
- # plt.figure(figsize=(15, 8))
-
- # # Plot original and filtered acceleration
- # plt.plot(df['simTime'], df['lon_acc_vehicle'],
- # label='Original Acceleration', alpha=0.6, color='blue')
- # plt.plot(df['simTime'], df['acc_filtered'],
- # label='Filtered Acceleration', linewidth=2, color='red')
-
- # # Mark direction change points
- # change_points = df[df['acc_direction_change'] == 1]
- # plt.scatter(change_points['simTime'], change_points['acc_filtered'],
- # color='green', s=50, zorder=5, label='Direction Change Points')
-
- # # Mark points exceeding threshold
- # threshold_points = df[abs(df['acc_filtered']) > acc_threshold]
- # plt.scatter(threshold_points['simTime'], threshold_points['acc_filtered'],
- # color='purple', s=20, zorder=4, alpha=0.5, label='Exceeds Threshold Points')
-
- # # Plot event regions
- # for event in cadence_events:
- # start = event['start_time']
- # end = event['end_time']
- # plt.axvspan(start, end, alpha=0.3, color='orange', label='cadence_events' if 'cadence' not in plt.gca().get_legend_handles_labels()[1] else "")
-
- # # Add event labels
- # mid_time = start + (end - start) / 2
- # plt.text(mid_time, plt.ylim()[1]*0.9, f'Cadence {len(cadence_events.index(event))+1}',
- # ha='center', fontsize=10, bbox=dict(facecolor='white', alpha=0.8))
-
- # # Add threshold lines
- # plt.axhline(y=acc_threshold, color='gray', linestyle='--', alpha=0.7)
- # plt.axhline(y=-acc_threshold, color='gray', linestyle='--', alpha=0.7)
- # plt.text(df['simTime'].iloc[-1], acc_threshold+0.05, f'Threshold: {acc_threshold} m/s²',
- # ha='right', color='gray')
-
- # # Set legend and title
- # plt.title(f'Cadence Event Detection (Total Events Detected: {len(cadence_events)})')
- # plt.xlabel('Time (s)')
- # plt.ylabel('Longitudinal Acceleration (m/s²)')
- # plt.legend(loc='upper right')
- # plt.grid(True, linestyle='--', alpha=0.6)
-
- # # Auto-adjust time axis format
- # if (df['simTime'].max() - df['simTime'].min()) > 60:
- # plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
- # else:
- # plt.gca().xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f'{x:.1f}s'))
-
- # plt.tight_layout()
-
- # # Save plot
- # plot_path = f"./cadence_detection.png"
- # plt.savefig(plot_path, dpi=150)
- # plt.close()
-
- # self.logger.info(f"Cadence detection results saved to: {plot_path}")
- # except Exception as e:
- # self.logger.error(f"Error plotting cadence detection graph: {str(e)}")
- # # ======================== End of plotting code ========================
-
- return cadence_events
- def _top_brake_detector(self):
- """
- Point Brake Detector (Top Brake) - Waveform analysis based on jerk signal
-
- This function identifies "point brake" events, defined as independent, sudden braking actions
- with intensity between normal braking and emergency braking. It distinguishes from continuous
- acceleration/deceleration "cadence" phenomena by analyzing individual complete waveforms in
- the jerk signal.
-
- Point Brake Characteristics:
- 1. Independent event: Starts and ends in a relatively stable state (acceleration near zero).
- 2. Braking intensity: Deceleration stronger than normal braking but weaker than emergency braking.
- 3. Suddenness: Peak jerk must be large enough to indicate rapid acceleration change.
- 4. Duration: Reasonable duration (e.g., 0.2 to 1.5 seconds).
- """
- # import matplotlib.pyplot as plt
- # from matplotlib.patches import Rectangle
- # import matplotlib.dates as mdates
-
- # 1. Data Preparation & Checks
- df = self.ego_df.copy()
- required_cols = ['simTime', 'simFrame', 'lon_acc_rate', 'lon_acc_vehicle', 'v', 'ip_dec']
- if any(col not in df.columns for col in required_cols):
- missing = [c for c in required_cols if c not in df.columns]
- self.logger.warning(f"Point brake detection missing required fields: {missing}")
- return []
- fs = self.fs
- if len(df) < fs: # Need at least 1 second of data
- return []
-
- # Define Key Thresholds
- NORMAL_BRAKE_THRESHOLD = -1.2 # m/s^2
- MIN_PEAK_JERK = 3.0 # m/s^3
- MERGE_TIME_GAP_THRESHOLD = (1/fs)*2 # seconds
- # 2. Smooth Jerk Signal
- window_length = max(5, int(fs * 0.2) // 2 * 2 + 1) # Must be odd
- if len(df) > window_length:
- df['jerk_filtered'] = scipy.signal.savgol_filter(df['lon_acc_rate'], window_length, 2)
- else:
- df['jerk_filtered'] = df['lon_acc_rate']
- # 3. Find Jerk Zero-Crossings
- df['jerk_sign'] = np.sign(df['jerk_filtered'])
- zero_crossings = df.index[df['jerk_sign'].ne(df['jerk_sign'].shift(1)) & df['jerk_sign'].ne(0)].tolist()
- if len(zero_crossings) < 2:
- return []
- # 4. Identify Waveforms and Validate Events
- top_brake_events = []
- for i in range(len(zero_crossings) - 1):
- start_idx = zero_crossings[i]
- end_idx = zero_crossings[i+1]
-
- event_slice = df.loc[start_idx:end_idx]
- if len(event_slice) < 2:
- continue
- # a. Duration Validation
- duration = event_slice['simTime'].iloc[-1] - event_slice['simTime'].iloc[0]
- if not ((1/fs)*3 <= duration <= 1.5):
- continue
- min_acc = event_slice['lon_acc_vehicle'].min()
- emergency_brake_threshold = df.loc[start_idx, 'ip_dec']
-
- # b. Braking Intensity Validation
- is_top_brake_strength = min_acc < NORMAL_BRAKE_THRESHOLD and min_acc > emergency_brake_threshold
- if not is_top_brake_strength:
- continue
- # c. Waveform Amplitude Validation
- peak_jerk = event_slice['jerk_filtered'].abs().max()
- if peak_jerk < MIN_PEAK_JERK:
- continue
- # d. Isolated Event Validation
- acc_at_start = df.loc[start_idx, 'lon_acc_vehicle']
- acc_at_end = df.loc[end_idx, 'lon_acc_vehicle']
- if not (abs(acc_at_start) < 0.5 or abs(acc_at_end) < 0.5):
- continue
- # 5. Create Valid Event
- event = {
- 'start_time': df.loc[start_idx, 'simTime'],
- 'end_time': df.loc[end_idx, 'simTime'],
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- 'peak_jerk': peak_jerk,
- 'min_acc': min_acc
- }
- top_brake_events.append(event)
- # 6. Post-processing: Merge very close events
- if not top_brake_events:
- return []
- merged_events = []
- current_event = top_brake_events[0]
- for next_event in top_brake_events[1:]:
- time_gap = next_event['start_time'] - current_event['end_time']
- if time_gap < MERGE_TIME_GAP_THRESHOLD:
- current_event['end_time'] = next_event['end_time']
- current_event['end_frame'] = next_event['end_frame']
- current_event['peak_jerk'] = max(current_event['peak_jerk'], next_event['peak_jerk'])
- current_event['min_acc'] = min(current_event['min_acc'], next_event['min_acc'])
- else:
- merged_events.append(current_event)
- current_event = next_event
-
- merged_events.append(current_event)
-
- for event in merged_events:
- self._add_event_to_df(event, 'topBrake_merged')
-
- # # ===================== Visualization =====================
- # try:
- # plt.figure(figsize=(14, 10))
- # plt.suptitle(f'Point Brake Detection Analysis (Detected: {len(merged_events)} events)', fontsize=16)
-
- # # 1. Jerk Analysis
- # ax1 = plt.subplot(3, 1, 1)
- # plt.plot(df['simTime'], df['lon_acc_rate'], 'b-', alpha=0.5, label='Raw Jerk')
- # plt.plot(df['simTime'], df['jerk_filtered'], 'r-', label='Filtered Jerk', linewidth=1.5)
-
- # # Mark zero crossings
- # zero_points = df.loc[zero_crossings]
- # plt.scatter(zero_points['simTime'], zero_points['jerk_filtered'],
- # c='green', s=40, marker='o', label='Zero Crossings', zorder=5)
-
- # # Threshold lines
- # plt.axhline(y=MIN_PEAK_JERK, color='gray', linestyle='--', alpha=0.7)
- # plt.axhline(y=-MIN_PEAK_JERK, color='gray', linestyle='--', alpha=0.7)
- # plt.text(df['simTime'].iloc[-1], MIN_PEAK_JERK+0.2, f'Jerk Threshold: ±{MIN_PEAK_JERK} m/s³',
- # ha='right', color='gray')
-
- # plt.ylabel('Jerk (m/s³)')
- # plt.title('Jerk Signal Analysis')
- # plt.grid(True, linestyle='--', alpha=0.6)
- # plt.legend(loc='upper right')
-
- # # 2. Acceleration Analysis
- # ax2 = plt.subplot(3, 1, 2, sharex=ax1)
- # plt.plot(df['simTime'], df['lon_acc_vehicle'], 'g-', label='Longitudinal Acceleration')
-
- # # Threshold lines
- # plt.axhline(y=0, color='k', linestyle='-', alpha=0.5)
- # plt.axhline(y=NORMAL_BRAKE_THRESHOLD, color='orange', linestyle='--', alpha=0.8)
-
- # # Mark emergency brake threshold (varies over time)
- # plt.plot(df['simTime'], df['ip_dec'], 'm--', label='Emergency Brake Threshold')
-
- # plt.ylabel('Acceleration (m/s²)')
- # plt.title('Acceleration Analysis')
- # plt.grid(True, linestyle='--', alpha=0.6)
- # plt.legend(loc='upper right')
-
- # # 3. Velocity Analysis
- # ax3 = plt.subplot(3, 1, 3, sharex=ax1)
- # plt.plot(df['simTime'], df['v'], 'b-', label='Vehicle Speed')
- # plt.ylabel('Speed (m/s)')
- # plt.xlabel('Time (s)')
- # plt.title('Vehicle Speed')
- # plt.grid(True, linestyle='--', alpha=0.6)
-
- # # Highlight detected events across all subplots
- # for event in merged_events:
- # start = event['start_time']
- # end = event['end_time']
- # duration = end - start
-
- # # Add shaded region
- # for ax in [ax1, ax2, ax3]:
- # ax.axvspan(start, end, alpha=0.2, color='red')
-
- # # Add event info on acceleration plot
- # ax2.text((start+end)/2, event['min_acc']-0.2,
- # f"Min Acc: {event['min_acc']:.2f} m/s²\nPeak Jerk: {event['peak_jerk']:.2f} m/s³",
- # ha='center', va='top', fontsize=9,
- # bbox=dict(facecolor='white', alpha=0.8, edgecolor='red'))
-
- # # Add event info on speed plot
- # ax3.text((start+end)/2, df['v'].min() + 0.1*(df['v'].max()-df['v'].min()),
- # f"Point Brake\n{duration:.2f}s",
- # ha='center', fontsize=10, color='red',
- # bbox=dict(facecolor='white', alpha=0.8))
-
- # # Format x-axis for time
- # if (df['simTime'].max() - df['simTime'].min()) > 60:
- # plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
- # else:
- # plt.gca().xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f'{x:.1f}s'))
-
- # plt.tight_layout(rect=[0, 0, 1, 0.96]) # Make room for suptitle
-
- # # Save the plot
- # plot_path = f"./top_brake_analysis.png"
- # plt.savefig(plot_path, dpi=150)
- # plt.close()
-
- # self.logger.info(f"Point brake analysis plot saved to: {plot_path}")
- # except Exception as e:
- # self.logger.error(f"Error generating point brake analysis plot: {str(e)}")
- # # ===================== End Visualization =====================
-
- return merged_events
- def _slam_brake_detector(self):
- """检测急刹车事件"""
- df = self.ego_df.copy()
- if 'slam_brake' not in df.columns:
- self.logger.warning("缺少计算急刹车指标所需的数据列")
- return
- min_duration = 0.5
- slam_brake_events = []
- in_event = False
- start_idx = 0
- for i, row in df.iterrows():
- if row['slam_brake'] == 1 and not in_event:
- in_event = True
- start_idx = i
- elif row['slam_brake'] == 0 and in_event:
- in_event = False
- end_idx = i - 1
- start_time = df.loc[start_idx, 'simTime']
- end_time = df.loc[end_idx, 'simTime']
- duration = end_time - start_time
- if duration >= min_duration:
- slam_brake_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- })
- self._add_event_to_df({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- }, 'slam_brake')
- if in_event:
- end_idx = len(df) - 1
- start_time = df.loc[start_idx, 'simTime']
- end_time = df.loc[end_idx, 'simTime']
- duration = end_time - start_time
- if duration >= min_duration:
- slam_brake_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- })
- self._add_event_to_df({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- }, 'slam_brake')
- self.slam_brake_count = len(slam_brake_events)
- return slam_brake_events
- def _slam_accel_detector(self):
- """检测急加速事件"""
- df = self.ego_df.copy()
- if 'slam_accel' not in df.columns:
- self.logger.warning("缺少计算急加速指标所需的数据列")
- return
- min_duration = 0.5
- slam_accel_events = []
- in_event = False
- start_idx = 0
- for i, row in df.iterrows():
- if row['slam_accel'] == 1 and not in_event:
- in_event = True
- start_idx = i
- elif row['slam_accel'] == 0 and in_event:
- in_event = False
- end_idx = i - 1
- start_time = df.loc[start_idx, 'simTime']
- end_time = df.loc[end_idx, 'simTime']
- duration = end_time - start_time
- if duration >= min_duration:
- slam_accel_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- })
- self._add_event_to_df({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- }, 'slam_accel')
- if in_event:
- end_idx = len(df) - 1
- start_time = df.loc[start_idx, 'simTime']
- end_time = df.loc[end_idx, 'simTime']
- duration = end_time - start_time
- if duration >= min_duration:
- slam_accel_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- })
- self._add_event_to_df({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': df.loc[start_idx, 'simFrame'],
- 'end_frame': df.loc[end_idx, 'simFrame'],
- }, 'slam_accel')
- self.slam_accel_count = len(slam_accel_events)
- return slam_accel_events
- # ----------------------
- # 辅助方法
- # ----------------------
- def _add_event_to_df(self, event, event_type):
- """添加事件到数据框"""
- new_row = pd.DataFrame([{
- 'start_time': event['start_time'],
- 'end_time': event['end_time'],
- 'start_frame': event['start_frame'],
- 'end_frame': event['end_frame'],
- 'type': event_type
- }])
- self.discomfort_df = pd.concat([self.discomfort_df, new_row], ignore_index=True)
- def log_events(self, metric_name: str, events: list):
- """记录指标事件到日志"""
- if not events:
- self.logger.info(f"未检测到 {metric_name} 事件")
- return
-
- self.logger.info(f"检测到 {len(events)} 个 {metric_name} 事件:")
- for i, event in enumerate(events):
- duration = event.get('end_time', 0) - event.get('start_time', 0)
- self.logger.info(
- f"{metric_name} 事件 #{i+1}: "
- f"开始时间={event.get('start_time', 'N/A'):.2f}s, "
- f"结束时间={event.get('end_time', 'N/A'):.2f}s, "
- f"持续时间={duration:.2f}s, "
- f"开始帧={event.get('start_frame', 'N/A')}, "
- f"结束帧={event.get('end_frame', 'N/A')}"
- )
- def generate_metric_chart(self, metric_name: str, plot_path: Path) -> None:
- """生成指标图表"""
- if not plot_path:
- plot_path = os.path.join(os.getcwd(), 'data')
- os.makedirs(plot_path, exist_ok=True)
- chart_path = generate_comfort_chart_data(self, metric_name, plot_path)
- if chart_path:
- self.logger.info(f"{metric_name}图表已生成: {chart_path}")
|