#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn) @Data: 2023/06/25 @Last Modified: 2025/04/25 @Summary: Comfort metrics """ import sys import math import scipy.signal import pandas as pd import numpy as np from pathlib import Path from typing import Dict, List, Any, Optional, Callable, Union, Tuple from modules.lib.score import Score from modules.lib.common import get_interpolation, get_frame_with_time from modules.lib import data_process from modules.lib.log_manager import LogManager COMFORT_INFO = [ "simTime", "simFrame", "speedX", "speedY", "accelX", "accelY", "curvHor", "lightMask", "v", "lat_acc", "lon_acc", "time_diff", "lon_acc_diff", "lon_acc_roc", "speedH", "accelH", "posH", # 确保包含航向角 ] # ---------------------- # 独立指标计算函数 # ---------------------- # 添加 Motion Sickness Dose Value (MSDV) 指标 #用于量化乘员因持续振动而产生的晕动风险。以下是需要添加的代码: ## 1. 在独立指标计算函数部分添加新函数 def calculate_ava_vav(data_processed) -> dict: """计算多维度综合加权加速度""" comfort = ComfortCalculator(data_processed) ava_vav_value = comfort.calculate_ava_vav() return {"ava_vav": float(ava_vav_value)} def calculate_msdv(data_processed) -> dict: """计算晕动剂量值(MSDV)指标""" comfort = ComfortCalculator(data_processed) msdv_value = comfort.calculate_msdv() return {"msdv": float(msdv_value)} def calculate_weaving(data_processed) -> dict: """计算蛇行指标""" comfort = ComfortCalculator(data_processed) zigzag_count = comfort.calculate_zigzag_count() return {"weaving": float(zigzag_count)} def calculate_shake(data_processed) -> dict: """计算晃动指标""" comfort = ComfortCalculator(data_processed) shake_count = comfort.calculate_shake_count() return {"shake": float(shake_count)} def calculate_cadence(data_processed) -> dict: """计算顿挫指标""" comfort = ComfortCalculator(data_processed) cadence_count = comfort.calculate_cadence_count() return {"cadence": float(cadence_count)} def calculate_slambrake(data_processed) -> dict: """计算急刹车指标""" comfort = ComfortCalculator(data_processed) slam_brake_count = comfort.calculate_slam_brake_count() return {"slamBrake": float(slam_brake_count)} def calculate_slamaccelerate(data_processed) -> dict: """计算急加速指标""" comfort = ComfortCalculator(data_processed) slam_accel_count = comfort.calculate_slam_accel_count() return {"slamAccelerate": float(slam_accel_count)} # 装饰器保持不变 def peak_valley_decorator(method): def wrapper(self, *args, **kwargs): peak_valley = self._peak_valley_determination(self.df) pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist() if len(pv_list) != 0: flag = True p_last = pv_list[0] for i in range(1, len(pv_list)): p_curr = pv_list[i] if self._peak_valley_judgment(p_last, p_curr): # method(self, p_curr, p_last) method(self, p_curr, p_last, flag, *args, **kwargs) else: p_last = p_curr return method else: flag = False p_curr = [0, 0] p_last = [0, 0] method(self, p_curr, p_last, flag, *args, **kwargs) return method return wrapper class ComfortRegistry: """舒适性指标注册器""" def __init__(self, data_processed): self.logger = LogManager().get_logger() # 获取全局日志实例 self.data = data_processed self.comfort_config = data_processed.comfort_config["comfort"] self.metrics = self._extract_metrics(self.comfort_config) self._registry = self._build_registry() def _extract_metrics(self, config_node: dict) -> list: """DFS遍历提取指标""" metrics = [] def _recurse(node): if isinstance(node, dict): if 'name' in node and not any(isinstance(v, dict) for v in node.values()): metrics.append(node['name']) for v in node.values(): _recurse(v) _recurse(config_node) self.logger.info(f'评比的舒适性指标列表:{metrics}') return metrics def _build_registry(self) -> dict: """自动注册指标函数""" registry = {} for metric_name in self.metrics: func_name = f"calculate_{metric_name.lower()}" try: registry[metric_name] = globals()[func_name] except KeyError: self.logger.error(f"未实现指标函数: {func_name}") return registry def batch_execute(self) -> dict: """批量执行指标计算""" results = {} for name, func in self._registry.items(): try: result = func(self.data) results.update(result) # 新增:将每个指标的结果写入日志 self.logger.info(f'舒适性指标[{name}]计算结果: {result}') except Exception as e: self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True) results[name] = None self.logger.info(f'舒适性指标计算结果:{results}') return results class ComfortCalculator: """舒适性指标计算类 - 提供核心计算功能""" def __init__(self, data_processed): self.data_processed = data_processed self.logger = LogManager().get_logger() self.data = data_processed.ego_data self.ego_df = pd.DataFrame() self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) # 统计指标 self.calculated_value = { 'weaving': 0, 'shake': 0, 'cadence': 0, 'slamBrake': 0, 'slamAccelerate': 0, 'ava_vav': 0, # 添加新指标的默认值 'msdv': 0 # 添加MSDV指标的默认值 } self.time_list = self.data['simTime'].values.tolist() self.frame_list = self.data['simFrame'].values.tolist() self.zigzag_count = 0 self.shake_count = 0 self.cadence_count = 0 self.slam_brake_count = 0 self.slam_accel_count = 0 self.zigzag_time_list = [] self.zigzag_stre_list = [] self._initialize_data() def _initialize_data(self): """初始化数据""" self.ego_df = self.data[COMFORT_INFO].copy() self.df = self.ego_df.reset_index(drop=True) self._prepare_comfort_parameters() def _prepare_comfort_parameters(self): """准备舒适性计算所需参数""" # 计算加减速阈值 self.ego_df['ip_acc'] = self.ego_df['v'].apply(get_interpolation, point1=[18, 4], point2=[72, 2]) self.ego_df['ip_dec'] = self.ego_df['v'].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5]) self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply( lambda x: 1 if x < 0 else 0) self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply( lambda x: 1 if x > 0 else 0) self.ego_df['cadence'] = self.ego_df.apply( lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1) def _cal_cur_ego_path(self, row): """计算车辆轨迹曲率""" try: divide = (row['speedX'] ** 2 + row['speedY'] ** 2) ** (3 / 2) if not divide: res = None else: res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide except: res = None return res def _peak_valley_determination(self, df): """确定角速度的峰谷""" peaks, _ = scipy.signal.find_peaks( df['speedH'], height=2.3, distance=3, prominence=2.3, width=1) valleys, _ = scipy.signal.find_peaks( -df['speedH'], height=2.3, distance=3, prominence=2.3, width=1) return sorted(list(peaks) + list(valleys)) def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6): """判断峰谷是否满足蛇行条件""" t_diff = p_curr[0] - p_last[0] v_diff = abs(p_curr[1] - p_last[1]) s = p_curr[1] * p_last[1] if t_diff < tw and v_diff > avg and s < 0: if [p_last[0], p_curr[0]] not in self.zigzag_time_list: self.zigzag_time_list.append([p_last[0], p_curr[0]]) return True return False def _cadence_process_new(self, lon_acc, ip_acc, ip_dec): """处理顿挫数据""" if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec: return np.nan elif abs(lon_acc) == 0: return 0 elif lon_acc > 0 and lon_acc < ip_acc: return 1 elif lon_acc < 0 and lon_acc > ip_dec: return -1 else: return 0 @peak_valley_decorator def _zigzag_count_func(self, p_curr, p_last, flag=True): """计算蛇行次数""" if flag: self.zigzag_count += 1 else: self.zigzag_count += 0 @peak_valley_decorator def _cal_zigzag_strength(self, p_curr, p_last, flag=True): """计算蛇行强度""" if flag: v_diff = abs(p_curr[1] - p_last[1]) t_diff = p_curr[0] - p_last[0] if t_diff > 0: self.zigzag_stre_list.append(v_diff / t_diff) # 平均角加速度 else: self.zigzag_stre_list = [] def _get_zigzag_times(self): """获取所有蛇行时间点""" all_times = [] for time_range in self.zigzag_time_list: start, end = time_range # 获取这个时间范围内的所有时间点 times_in_range = self.ego_df[(self.ego_df['simTime'] >= start) & (self.ego_df['simTime'] <= end)]['simTime'].tolist() all_times.extend(times_in_range) return all_times def calculate_zigzag_count(self): """计算蛇行指标""" self._zigzag_count_func() return self.zigzag_count def calculate_shake_count(self): """计算晃动指标""" self._shake_detector() return self.shake_count def calculate_cadence_count(self): """计算顿挫指标""" self._cadence_detector() return self.cadence_count def calculate_slam_brake_count(self): """计算急刹车指标""" self._slam_brake_detector() return self.slam_brake_count def calculate_slam_accel_count(self): """计算急加速指标""" self._slam_accel_detector() return self.slam_accel_count def _shake_detector(self, T_diff=0.5): """检测晃动事件 - 改进版本(不使用车辆轨迹曲率)""" # lat_acc已经是车辆坐标系下的横向加速度,由data_process.py计算 time_list = [] frame_list = [] # 复制数据以避免修改原始数据 df = self.ego_df.copy() # 1. 计算横向加速度变化率 df['lat_acc_rate'] = df['lat_acc'].diff() / df['simTime'].diff() # 2. 计算横摆角速度变化率 df['speedH_rate'] = df['speedH'].diff() / df['simTime'].diff() # 3. 计算横摆角速度的短期变化特性 window_size = 5 # 5帧窗口 df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std() # 4. 基于车速的动态阈值 v0 = 20 * 5/18 # ≈5.56 m/s # 递减系数 k = 0.008 * 3.6 # =0.0288 per m/s df['lat_acc_threshold'] = df['v'].apply( lambda speed: max( 1.0, # 下限 1.0 m/s² min( 1.8, # 上限 1.8 m/s² 1.8 - k * (speed - v0) # 线性递减 ) ) ) df['speedH_threshold'] = df['v'].apply( lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60))) ) # 将计算好的阈值和中间变量保存到self.ego_df中,供其他函数使用 self.ego_df['lat_acc_threshold'] = df['lat_acc_threshold'] self.ego_df['speedH_threshold'] = df['speedH_threshold'] self.ego_df['lat_acc_rate'] = df['lat_acc_rate'] self.ego_df['speedH_rate'] = df['speedH_rate'] self.ego_df['speedH_std'] = df['speedH_std'] # 5. 综合判断晃动条件 # 条件A: 横向加速度超过阈值 condition_A = df['lat_acc'].abs() > df['lat_acc_threshold'] # 条件B: 横向加速度变化率超过阈值 lat_acc_rate_threshold = 0.5 # 横向加速度变化率阈值 (m/s³) condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold # 条件C: 横摆角速度有明显变化但不呈现周期性 condition_C = (df['speedH_std'] > df['speedH_threshold']) & (~df['simTime'].isin(self._get_zigzag_times())) # 综合条件: 满足条件A,且满足条件B或条件C shake_condition = condition_A & (condition_B | condition_C) # 筛选满足条件的数据 shake_df = df[shake_condition].copy() # 按照连续帧号分组,确保只有连续帧超过阈值的才被认为是晃动 if not shake_df.empty: shake_df['frame_diff'] = shake_df['simFrame'].diff().fillna(0) shake_df['group'] = (shake_df['frame_diff'] > T_diff).cumsum() # 分组统计 shake_groups = shake_df.groupby('group') for _, group in shake_groups: if len(group) >= 2: # 至少2帧才算一次晃动 time_list.extend(group['simTime'].values) frame_list.extend(group['simFrame'].values) self.shake_count += 1 # 分组处理 TIME_RANGE = 1 t_list = time_list f_list = frame_list group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] if len(f_list) > 0: for i in range(len(f_list)): if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE: sub_group_time.append(t_list[i]) sub_group_frame.append(f_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [t_list[i]] sub_group_frame = [f_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) # 输出图表值 shake_time = [[g[0], g[-1]] for g in group_time] shake_frame = [[g[0], g[-1]] for g in group_frame] self.shake_count = len(shake_time) if shake_time: time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time']) frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame']) discomfort_df = pd.concat([time_df, frame_df], axis=1) discomfort_df['type'] = 'shake' self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True) return time_list def _cadence_detector(self): """顿挫检测器""" data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence']].copy() time_list = data['simTime'].values.tolist() data = data[data['cadence'] != np.nan] data['cadence_diff'] = data['cadence'].diff() data.dropna(subset='cadence_diff', inplace=True) data = data[data['cadence_diff'] != 0] t_list = data['simTime'].values.tolist() f_list = data['simFrame'].values.tolist() TIME_RANGE = 1 group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(f_list)): if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE: # 特征点相邻一秒内的,算作同一组顿挫 sub_group_time.append(t_list[i]) sub_group_frame.append(f_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [t_list[i]] sub_group_frame = [f_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 1] # 有一次特征点则算作一次顿挫 group_frame = [g for g in group_frame if len(g) >= 1] # 输出图表值 cadence_time = [[g[0], g[-1]] for g in group_time] cadence_frame = [[g[0], g[-1]] for g in group_frame] if cadence_time: time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time']) frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame']) discomfort_df = pd.concat([time_df, frame_df], axis=1) discomfort_df['type'] = 'cadence' self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True) # 将顿挫组的起始时间为组重新统计时间 cadence_time_list = [time for pair in cadence_time for time in time_list if pair[0] <= time <= pair[1]] stre_list = [] freq_list = [] for g in group_time: # calculate strength g_df = data[data['simTime'].isin(g)] strength = g_df['lon_acc'].abs().mean() stre_list.append(strength) # calculate frequency cnt = len(g) t_start = g_df['simTime'].iloc[0] t_end = g_df['simTime'].iloc[-1] t_delta = t_end - t_start frequency = cnt / t_delta freq_list.append(frequency) self.cadence_count = len(freq_list) cadence_stre = sum(stre_list) / len(stre_list) if stre_list else 0 return cadence_time_list def _slam_brake_detector(self): """急刹车检测器""" data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake']].copy() res_df = data[data['slam_brake'] == 1] t_list = res_df['simTime'].values f_list = res_df['simFrame'].values.tolist() TIME_RANGE = 1 group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(f_list)): if not sub_group_time or f_list[i] - f_list[i - 1] <= TIME_RANGE: # 连续帧的算作同一组急刹 sub_group_time.append(t_list[i]) sub_group_frame.append(f_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [t_list[i]] sub_group_frame = [f_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] # 达到两帧算作一次急刹 group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 slam_brake_time = [[g[0], g[-1]] for g in group_time] slam_brake_frame = [[g[0], g[-1]] for g in group_frame] if slam_brake_time: time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time']) frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame']) discomfort_df = pd.concat([time_df, frame_df], axis=1) discomfort_df['type'] = 'slam_brake' self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True) time_list = [element for sublist in group_time for element in sublist] self.slam_brake_count = len(group_time) return time_list def _slam_accel_detector(self): """急加速检测器""" data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel']].copy() res_df = data.loc[data['slam_accel'] == 1] t_list = res_df['simTime'].values f_list = res_df['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(f_list)): if not group_time or f_list[i] - f_list[i - 1] <= 1: # 连续帧的算作同一组急加速 sub_group_time.append(t_list[i]) sub_group_frame.append(f_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [t_list[i]] sub_group_frame = [f_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 slam_accel_time = [[g[0], g[-1]] for g in group_time] slam_accel_frame = [[g[0], g[-1]] for g in group_frame] if slam_accel_time: time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time']) frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame']) discomfort_df = pd.concat([time_df, frame_df], axis=1) discomfort_df['type'] = 'slam_accel' self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True) time_list = [element for sublist in group_time for element in sublist] self.slam_accel_count = len(group_time) return time_list class ComfortManager: """舒适性指标计算主类""" def __init__(self, data_processed): self.data = data_processed self.logger = LogManager().get_logger() self.registry = ComfortRegistry(self.data) def report_statistic(self): """生成舒适性评分报告""" comfort_result = self.registry.batch_execute() return comfort_result if __name__ == '__main__': case_name = 'ICA' mode_label = 'PGVIL' data = data_process.DataPreprocessing(case_name, mode_label) comfort_instance = ComfortManager(data) try: comfort_result = comfort_instance.report_statistic() result = {'comfort': comfort_result} print(result) except Exception as e: print(f"An error occurred in Comfort.report_statistic: {e}") # 将之前定义在类外部的方法移动到ComfortCalculator类内部 def _apply_frequency_weighting(self, acceleration_data, weighting_type='Wk', fs=100): """应用ISO 2631-1:1997标准的频率加权滤波 参数: acceleration_data: 加速度时间序列数据 weighting_type: 加权类型,可选值包括: - 'Wk': 垂直方向(Z轴)加权 - 'Wd': 水平方向(X和Y轴)加权 - 'Wf': 运动病相关加权 fs: 采样频率(Hz) 返回: 加权后的加速度数据 """ # 检查数据有效性 if acceleration_data.empty or acceleration_data.isna().all(): return acceleration_data # 根据ISO 2631-1:1997标准设计滤波器 # 这些参数来自标准文档,用于构建数字滤波器 if weighting_type == 'Wk': # 垂直方向(Z轴) # Wk滤波器参数 f1 = 0.4 f2 = 100.0 f3 = 12.5 f4 = 12.5 Q1 = 0.63 Q2 = 0.5 Q3 = 0.63 Q4 = 0.63 K = 0.4 elif weighting_type == 'Wd': # 水平方向(X和Y轴) # Wd滤波器参数 f1 = 0.4 f2 = 100.0 f3 = 2.0 f4 = 2.0 Q1 = 0.63 Q2 = 0.5 Q3 = 0.63 Q4 = 0.63 K = 0.4 elif weighting_type == 'Wf': # 运动病相关 # Wf滤波器参数 f1 = 0.08 f2 = 0.63 f3 = 0.25 f4 = 0.8 Q1 = 0.63 Q2 = 0.86 Q3 = 0.8 Q4 = 0.8 K = 1.0 else: self.logger.warning(f"未知的加权类型: {weighting_type},使用原始数据") return acceleration_data # 将频率转换为角频率 w1 = 2 * np.pi * f1 w2 = 2 * np.pi * f2 w3 = 2 * np.pi * f3 w4 = 2 * np.pi * f4 # 设计高通滤波器(s域) b1 = [K * w1**2, 0] a1 = [1, w1/Q1, w1**2] # 设计低通滤波器(s域) b2 = [K, 0, 0] a2 = [1, w2/Q2, w2**2] # 设计加速度-速度转换滤波器(s域) b3 = [K, 0] a3 = [1, w3/Q3, w3**2] # 设计上升滤波器(s域) b4 = [K, 0, 0] a4 = [1, w4/Q4, w4**2] # 使用双线性变换将s域滤波器转换为z域 b1_z, a1_z = scipy.signal.bilinear(b1, a1, fs) b2_z, a2_z = scipy.signal.bilinear(b2, a2, fs) b3_z, a3_z = scipy.signal.bilinear(b3, a3, fs) b4_z, a4_z = scipy.signal.bilinear(b4, a4, fs) # 应用滤波器链 data_np = acceleration_data.to_numpy() filtered_data = scipy.signal.lfilter(b1_z, a1_z, data_np) filtered_data = scipy.signal.lfilter(b2_z, a2_z, filtered_data) filtered_data = scipy.signal.lfilter(b3_z, a3_z, filtered_data) filtered_data = scipy.signal.lfilter(b4_z, a4_z, filtered_data) return pd.Series(filtered_data, index=acceleration_data.index) def calculate_ava_vav(self): """计算多维度综合加权加速度 基于ISO 2631-1:1997标准,综合考虑车辆在三个平移方向和三个旋转方向的加速度或角速度 Returns: float: 多维度综合加权加速度值 """ # 定义各方向的权重系数 k_x = 1.0 # X方向加速度权重 k_y = 1.0 # Y方向加速度权重 k_z = 1.0 # Z方向加速度权重 k_roll = 0.63 # 横滚角速度权重 k_pitch = 0.8 # 俯仰角速度权重 k_yaw = 0.5 # 偏航角速度权重 # 获取数据 df = self.ego_df.copy() # 确保有必要的列 if 'accelX' not in df.columns or 'accelY' not in df.columns: self.logger.warning("缺少计算多维度综合加权加速度所需的数据列") return self.calculated_value['ava_vav'] # 将东北天坐标系下的加速度转换为车身坐标系下的加速度 # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶 if 'posH' not in df.columns: self.logger.warning("缺少航向角数据,无法进行坐标转换") return self.calculated_value['ava_vav'] df['posH_rad'] = np.radians(df['posH']) # 转换加速度到车身坐标系 # 注意:posH是航向角,北向为0度,顺时针为正 # 车身X轴 = 东向*sin(posH) + 北向*cos(posH) # 车身Y轴 = 东向*cos(posH) - 北向*sin(posH) df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad']) df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad']) # Z方向加速度,如果没有则假设为0 df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df))) # 角速度数据,如果没有则使用角速度变化率代替 # 注意:speedH是航向角速度,需要转换为车身坐标系下的偏航角速度 omega_roll = df['rollRate'] if 'rollRate' in df.columns else pd.Series(np.zeros(len(df))) omega_pitch = df['pitchRate'] if 'pitchRate' in df.columns else pd.Series(np.zeros(len(df))) omega_yaw = df['speedH'] # 使用航向角速度作为偏航角速度 # 应用ISO 2631-1:1997标准的频率加权滤波 # 估计采样频率 - 假设数据是均匀采样的 if len(df) > 1: time_diff = df['simTime'].diff().median() fs = 1.0 / time_diff if time_diff > 0 else 100 # 默认100Hz else: fs = 100 # 默认采样频率 # 对各方向加速度应用适当的频率加权 a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wd', fs) a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wd', fs) a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wk', fs) # 对角速度也应用适当的频率加权 # 注意:ISO标准没有直接指定角速度的加权,这里使用简化处理 omega_roll_weighted = omega_roll # 可以根据需要应用适当的滤波 omega_pitch_weighted = omega_pitch omega_yaw_weighted = omega_yaw # 计算加权均方根值 (r.m.s.) # 对每个方向的加速度/角速度平方后求平均,再开平方根 a_x_rms = np.sqrt(np.mean(a_x_weighted**2)) a_y_rms = np.sqrt(np.mean(a_y_weighted**2)) a_z_rms = np.sqrt(np.mean(a_z_weighted**2)) omega_roll_rms = np.sqrt(np.mean(omega_roll_weighted**2)) omega_pitch_rms = np.sqrt(np.mean(omega_pitch_weighted**2)) omega_yaw_rms = np.sqrt(np.mean(omega_yaw_weighted**2)) # 计算综合加权加速度 ava_vav = np.sqrt( k_x * a_x_rms**2 + k_y * a_y_rms**2 + k_z * a_z_rms**2 + k_roll * omega_roll_rms**2 + k_pitch * omega_pitch_rms**2 + k_yaw * omega_yaw_rms**2 ) # 记录计算结果 self.calculated_value['ava_vav'] = ava_vav self.logger.info(f"多维度综合加权加速度(ava_vav)计算结果: {ava_vav}") return ava_vav def calculate_msdv(self): """计算晕动剂量值(Motion Sickness Dose Value, MSDV) MSDV用于量化乘员因持续振动而产生的晕动风险,其物理意义是 "频率加权后的加速度有效值的平方对时间的累积", 能够反映乘员在一定时间内受到振动刺激的总量。 计算公式: MSDV = ∫[0,T] |a_ω(t)|² dt Returns: float: 晕动剂量值 """ # 获取数据 df = self.ego_df.copy() # 确保有必要的列 if 'accelX' not in df.columns or 'accelY' not in df.columns: self.logger.warning("缺少计算晕动剂量值所需的数据列") return self.calculated_value['msdv'] # 将东北天坐标系下的加速度转换为车身坐标系下的加速度 if 'posH' not in df.columns: self.logger.warning("缺少航向角数据,无法进行坐标转换") return self.calculated_value['msdv'] # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶 df['posH_rad'] = np.radians(df['posH']) # 转换加速度到车身坐标系 # 注意:posH是航向角,北向为0度,顺时针为正 # 车身X轴 = 东向*sin(posH) + 北向*cos(posH) # 车身Y轴 = 东向*cos(posH) - 北向*sin(posH) df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad']) df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad']) # Z方向加速度,如果没有则假设为0 df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df))) # 计算时间差 df['time_diff'] = df['simTime'].diff().fillna(0) # 应用ISO 2631-1:1997标准的频率加权滤波 # 估计采样频率 - 假设数据是均匀采样的 if len(df) > 1: time_diff = df['simTime'].diff().median() fs = 1.0 / time_diff if time_diff > 0 else 100 # 默认100Hz else: fs = 100 # 默认采样频率 # 对各方向加速度应用适当的频率加权 # 对于晕动评估,使用Wf加权滤波器 a_x_weighted = self._apply_frequency_weighting(df['a_x_body'], 'Wf', fs) a_y_weighted = self._apply_frequency_weighting(df['a_y_body'], 'Wf', fs) a_z_weighted = self._apply_frequency_weighting(df['a_z_body'], 'Wf', fs) # 计算MSDV - 对加速度平方进行时间积分 # 对于X方向(前后方向)- 主要影响晕动感 msdv_x = np.sqrt(np.sum(a_x_weighted**2 * df['time_diff'])) # 对于Y方向(左右方向) msdv_y = np.sqrt(np.sum(a_y_weighted**2 * df['time_diff'])) # 对于Z方向(上下方向)- 也对晕动有显著影响 msdv_z = np.sqrt(np.sum(a_z_weighted**2 * df['time_diff'])) # 综合MSDV - 可以使用向量和或加权和 # 根据ISO 2631标准,垂直方向(Z)的权重通常更高 msdv = np.sqrt(msdv_x**2 + msdv_y**2 + (1.4 * msdv_z)**2) # 记录计算结果 self.calculated_value['msdv'] = msdv self.logger.info(f"晕动剂量值(MSDV)计算结果: {msdv}") return msdv