import os import numpy as np import pandas as pd import yaml from modules.lib.log_manager import LogManager class DataPreprocessing: def __init__(self, data_path, config_path): self.data_path = data_path self.case_name = os.path.basename(os.path.normpath(data_path)) self.config_path = config_path # 初始化配置容器 self.vehicle_config = {} self.safety_config = {} self.comfort_config = {} self.efficient_config = {} self.function_config = {} self.traffic_config = {} # 数据容器 self.object_df = pd.DataFrame() self.driver_ctrl_df = pd.DataFrame() self.vehicle_sys_df = pd.DataFrame() self.ego_data_df = pd.DataFrame() self.obj_data = {} self.ego_data = {} self.obj_id_list = [] self.data_quality_level = 15 # 加载配置并处理数据 self._get_yaml_config() self._process_mode() self.report_info = self._get_report_info(self.ego_data) def _process_mode(self): """当前支持默认处理方式""" self._real_process_object_df() def _get_yaml_config(self): with open(self.config_path, 'r') as f: full_config = yaml.safe_load(f) T_threshold_config = {"T_threshold": full_config.get("T_threshold", {})} self.vehicle_config = full_config.get("vehicle", {}) self.safety_config = {"safety": full_config.get("safety", {})} self.safety_config.update(T_threshold_config) self.comfort_config = {"comfort": full_config.get("comfort", {})} self.comfort_config.update(T_threshold_config) self.efficient_config = {"efficient": full_config.get("efficient", {})} self.efficient_config.update(T_threshold_config) self.function_config = {"function": full_config.get("function", {})} self.function_config.update(T_threshold_config) self.traffic_config = {"traffic": full_config.get("traffic", {})} self.traffic_config.update(T_threshold_config) def _plot_coordinate_comparison(self, ego_df): """绘制全局坐标系与自车坐标系的运动学量对比图""" try: import matplotlib.pyplot as plt from matplotlib.gridspec import GridSpec import os plt.figure(figsize=(18, 10)) gs = GridSpec(3, 3, figure=plt.gcf()) # 改为 3 行 3 列布局 # 1. 速度对比 - 纵向分量 ax1 = plt.subplot(gs[0, 0]) # ax1.plot(ego_df['simTime'], ego_df['speedX'], 'r-', label='Global Vx') # ax1.plot(ego_df['simTime'], ego_df['speedY'], 'g-', label='Global Vy') ax1.plot(ego_df['simTime'], ego_df['lon_v_vehicle'], 'b-', label='Ego Frame Lon V (2D)') # ax1.plot(ego_df['simTime'], ego_df['lon_v_vehicle_3D'], 'y--', label='Ego Frame Lon V (3D)') ax1.set_title('Longitudinal Velocity Comparison') ax1.set_ylabel('Velocity (m/s)') ax1.legend() ax1.grid(True) # 2. 速度对比 - 横向分量 ax2 = plt.subplot(gs[0, 1]) # ax2.plot(ego_df['simTime'], ego_df['speedX'], 'r-', label='Global Vx') # ax2.plot(ego_df['simTime'], ego_df['speedY'], 'g-', label='Global Vy') ax2.plot(ego_df['simTime'], ego_df['lat_v_vehicle'], 'b-', label='Ego Frame Lat V (2D)') # ax2.plot(ego_df['simTime'], ego_df['lat_v_vehicle_3D'], 'y--', label='Ego Frame Lat V (3D)') ax2.set_title('Lateral Velocity Comparison') ax2.legend() ax2.grid(True) # 3. 航向角速度(新增子图) ax3 = plt.subplot(gs[0, 2]) ax3.plot(ego_df['simTime'], ego_df['speedH'], 'm-', label='Heading Rate') ax3.set_title('Ego Heading Rate') ax3.set_ylabel('Yaw Rate (deg/s)') ax3.grid(True) # 4. 加速度对比 - 纵向分量 ax4 = plt.subplot(gs[1, 0]) # ax4.plot(ego_df['simTime'], ego_df['accelX'], 'r-', label='Global Ax') # ax4.plot(ego_df['simTime'], ego_df['accelY'], 'g-', label='Global Ay') ax4.plot(ego_df['simTime'], ego_df['lon_acc_vehicle'], 'b-', label='Ego Frame Lon A (2D)') # ax4.plot(ego_df['simTime'], ego_df['lon_acc_vehicle_3D'], 'y--', label='Ego Frame Lon A (3D)') ax4.set_title('Longitudinal Acceleration Comparison') ax4.set_ylabel('Acceleration (m/s²)') ax4.legend() ax4.grid(True) # 5. 加速度对比 - 横向分量 ax5 = plt.subplot(gs[1, 1]) # ax5.plot(ego_df['simTime'], ego_df['accelX'], 'r-', label='Global Ax') # ax5.plot(ego_df['simTime'], ego_df['accelY'], 'g-', label='Global Ay') ax5.plot(ego_df['simTime'], ego_df['lon_acc_rate'], 'b-', label='Ego Frame Lat A (2D)') # ax5.plot(ego_df['simTime'], ego_df['lat_acc_vehicle_3D'], 'y--', label='Ego Frame Lat A (3D)') ax5.set_title('Lateral Acceleration Comparison') ax5.legend() ax5.grid(True) # 6. 航向角变化(原图,平移至右侧) ax6 = plt.subplot(gs[1, 2]) ax6.plot(ego_df['simTime'], ego_df['ego_posH'], 'b-', label='Ego Heading') ax6.set_title('Ego Vehicle Heading') ax6.set_ylabel('Heading (deg)') ax6.grid(True) plt.tight_layout() # 保存图像到当前目录 plot_path = os.path.join(os.getcwd(), 'coordinate_transform_comparison.png') plt.savefig(plot_path) plt.close() logger = LogManager().get_logger() logger.info(f"坐标系转换对比图已保存至: {plot_path}") except ImportError: logger = LogManager().get_logger() logger.warning("Matplotlib未安装,无法生成坐标系转换对比图") except Exception as e: logger = LogManager().get_logger() logger.error(f"生成坐标系转换对比图时出错: {e}", exc_info=True) def _real_process_object_df(self): """读取并处理对象数据(含自车)""" try: merged_csv_path = os.path.join(self.data_path, "merged_ObjState.csv") if not os.path.exists(merged_csv_path): logger = LogManager().get_logger() logger.error(f"文件不存在: {merged_csv_path}") raise FileNotFoundError(f"文件不存在: {merged_csv_path}") df = pd.read_csv( merged_csv_path, dtype={"simTime": float}, on_bad_lines="skip", na_values=["", "NA", "null", "NaN"] ).drop_duplicates(subset=["simTime", "simFrame", "playerId"]) # 列名标准化 df.columns = [col.replace("+AF8-", "_") for col in df.columns] df = df.dropna(subset=["type"]).reset_index(drop=True) # ===== 修正航向角处理 ===== # 确保航向角在-180到180度范围内(0度正北,90度正东) if 'posH' in df.columns: df['posH'] = df['posH'] % 360 # 规范化到0-360度 df['posH'] = np.where(df['posH'] > 180, df['posH'] - 360, df['posH']) # ===== 提取自车数据 ===== # 自车ID固定为1 EGO_PLAYER_ID = 1 ego_mask = df['playerId'] == EGO_PLAYER_ID # 确保所有车辆都有pitch和roll列 for col in ['pitch', 'roll']: if col not in df.columns: df[col] = 0.0 # 提取自车姿态数据 ego_cols = ['simTime', 'posX', 'posY', 'posH', 'pitch', 'roll'] ego_df = df[ego_mask][ego_cols].copy() ego_df.columns = ['simTime'] + [f'ego_{col}' for col in ego_cols[1:]] # 合并自车姿态信息到主表 df = pd.merge(df, ego_df, on='simTime', how='left') # ===== 计算相对位置(在自车坐标系下)===== # 计算全局坐标差 dx_global = df['posX'] - df['ego_posX'] dy_global = df['posY'] - df['ego_posY'] # 转换为弧度并计算三角函数 ego_heading_rad = np.deg2rad(df['ego_posH']) cos_h = np.cos(ego_heading_rad) sin_h = np.sin(ego_heading_rad) # 向量化计算相对坐标(在自车坐标系下) df['x_relative_start_dist'] = dx_global * sin_h + dy_global * cos_h df['y_relative_start_dist'] = -dx_global * cos_h + dy_global * sin_h # 自车相对位置设为0 df.loc[ego_mask, 'x_relative_start_dist'] = 0.0 df.loc[ego_mask, 'y_relative_start_dist'] = 0.0 # 保存原始数据 self.object_df = df.copy() # 处理运动学数据(所有车辆在自车坐标系下) all_processed_dfs = [] self.obj_data = {} for obj_id, obj_df in df.groupby("playerId"): # 使用自车姿态计算运动学量 processed_df = self._calculate_kinematics_in_ego_frame(obj_df) self.obj_data[obj_id] = processed_df all_processed_dfs.append(processed_df) # 合并处理后的数据 if all_processed_dfs: combined_processed = pd.concat(all_processed_dfs) # 生成唯一键 self.object_df['merge_key'] = self.object_df['playerId'].astype(str) + '_' + self.object_df[ 'simTime'].astype(str) combined_processed['merge_key'] = combined_processed['playerId'].astype(str) + '_' + combined_processed[ 'simTime'].astype(str) # 合并新列 new_columns = [col for col in combined_processed.columns if col not in self.object_df.columns or col == 'merge_key'] self.object_df = pd.merge( self.object_df, combined_processed[new_columns], on='merge_key', how='left' ).drop(columns=['merge_key']) # 设置自车数据 self.obj_id_list = list(self.obj_data.keys()) self.ego_data = self.obj_data.get(EGO_PLAYER_ID, None) if self.ego_data is not None: # self._plot_coordinate_comparison(self.ego_data) pass except Exception as e: logger = LogManager().get_logger() logger.error(f"处理对象数据帧时出错: {e}", exc_info=True) raise def _calculate_kinematics_in_ego_frame(self, df): """计算车辆在自车坐标系下的运动学量,优化版本""" # 创建副本避免修改原始数据,只复制必要的列以减少内存使用 needed_cols = ['simTime', 'ego_posH', 'ego_pitch', 'ego_roll', 'speedX', 'speedY', 'accelX', 'accelY', 'speedH'] df = df[needed_cols + [col for col in df.columns if col not in needed_cols]].copy() # 确保有必要的列,使用向量化操作 for col in ['speedZ', 'accelZ']: if col not in df.columns: df[col] = 0.0 # 处理时间序列 - 使用向量化操作 df.sort_values(by="simTime", inplace=True) df.reset_index(drop=True, inplace=True) df["time_diff"] = df["simTime"].diff().replace(0, np.nan).fillna(method="bfill").fillna(0.01) # 预计算所有三角函数值,避免重复计算 # ===== 1. 航向角处理 ===== ego_yaw_rad = np.deg2rad(90 - df['ego_posH'].values) # 转换为数学坐标系 ego_pitch_rad = np.deg2rad(df['ego_pitch'].values) ego_roll_rad = np.deg2rad(df['ego_roll'].values) yaw_rad = np.deg2rad(df['ego_posH'].values) # 预计算所有三角函数值 cy_math, sy_math = np.cos(ego_yaw_rad), np.sin(ego_yaw_rad) cp, sp = np.cos(ego_pitch_rad), np.sin(ego_pitch_rad) cr, sr = np.cos(ego_roll_rad), np.sin(ego_roll_rad) cy, sy = np.cos(yaw_rad), np.sin(yaw_rad) # 获取全局速度/加速度向量 vx = df['speedX'].values # 东 vy = df['speedY'].values # 北 vz = df['speedZ'].values # 天 ax = df['accelX'].values ay = df['accelY'].values az = df['accelZ'].values # ===== 2. 三维转换 ===== # 构建旋转矩阵 - 使用批量计算 r00 = cy_math * cp r01 = cy_math * sp * sr - sy_math * cr r02 = cy_math * sp * cr + sy_math * sr r10 = sy_math * cp r11 = sy_math * sp * sr + cy_math * cr r12 = sy_math * sp * cr - cy_math * sr r20 = -sp r21 = cp * sr r22 = cp * cr # 转换速度向量 - 使用向量化计算代替einsum df['lon_v_vehicle_3D'] = r00 * vx + r01 * vy + r02 * vz df['lat_v_vehicle_3D'] = r10 * vx + r11 * vy + r12 * vz df['vel_z_vehicle_3D'] = r20 * vx + r21 * vy + r22 * vz # 转换加速度向量 - 使用向量化计算 df['lon_acc_vehicle_3D'] = r00 * ax + r01 * ay + r02 * az df['lat_acc_vehicle_3D'] = r10 * ax + r11 * ay + r12 * az df['acc_z_vehicle_3D'] = r20 * ax + r21 * ay + r22 * az # ===== 3. 二维转换(与3D水平面一致)===== # 使用原始航向角定义(0°=北,90°=东,顺时针为正) # 使用向量化计算 df['lon_v_vehicle'] = vx * sy - vy * cy df['lat_v_vehicle'] = vx * cy + vy * sy df['lon_acc_vehicle'] = ax * sy - ay * cy df['lat_acc_vehicle'] = ax * cy + ay * sy # ===== 5. 计算其他运动学量 ===== # 速度大小 - 使用向量化计算 df["v"] = np.sqrt(vx ** 2 + vy ** 2) # 加速度变化率 - 使用向量化计算 lat_acc_diff = np.diff(df["lat_acc_vehicle"].values, prepend=df["lat_acc_vehicle"].values[0]) lon_acc_diff = np.diff(df["lon_acc_vehicle"].values, prepend=df["lon_acc_vehicle"].values[0]) yawrate_diff = np.diff(df["speedH"].values, prepend=df["speedH"].values[0]) time_diff = df["time_diff"].values # 使用numpy操作代替pandas操作 df["lat_acc_rate"] = np.where(time_diff == 0, 0, lat_acc_diff / time_diff) df["lon_acc_rate"] = np.where(time_diff == 0, 0, lon_acc_diff / time_diff) df["accelH"] = np.where(time_diff == 0, 0, yawrate_diff / time_diff) # 替换无穷值 df.replace([np.inf, -np.inf], np.nan, inplace=True) df.fillna(0, inplace=True) return df def _get_report_info(self, df): # 提取里程与时长 return { "mileage": self._mileage_cal(df), "duration": self._duration_cal(df) } def _mileage_cal(self, df): if len(df) < 2: return 0.0 if df["travelDist"].nunique() == 1: temp_df = df.copy() temp_df["time_diff"] = temp_df["simTime"].diff().fillna(0) temp_df["avg_speed"] = (temp_df["v"] + temp_df["v"].shift()).fillna(0) / 2 temp_df["distance_increment"] = temp_df["avg_speed"] * temp_df["time_diff"] / 3.6 temp_df["travelDist"] = temp_df["distance_increment"].cumsum().fillna(0) return round(temp_df["travelDist"].iloc[-1] - temp_df["travelDist"].iloc[0], 2) else: return round(df["travelDist"].max() - df["travelDist"].min(), 2) def _duration_cal(self, df): return df["simTime"].iloc[-1] - df["simTime"].iloc[0]