123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- import os
- import numpy as np
- import pandas as pd
- import yaml
- from modules.lib.log_manager import LogManager
- class DataPreprocessing:
- def __init__(self, data_path, config_path):
- self.data_path = data_path
- self.case_name = os.path.basename(os.path.normpath(data_path))
- self.config_path = config_path
- # 初始化配置容器
- self.vehicle_config = {}
- self.safety_config = {}
- self.comfort_config = {}
- self.efficient_config = {}
- self.function_config = {}
- self.traffic_config = {}
- # 数据容器
- self.object_df = pd.DataFrame()
- self.driver_ctrl_df = pd.DataFrame()
- self.vehicle_sys_df = pd.DataFrame()
- self.ego_data_df = pd.DataFrame()
- self.obj_data = {}
- self.ego_data = {}
- self.obj_id_list = []
- self.data_quality_level = 15
- # 加载配置并处理数据
- self._get_yaml_config()
- self._process_mode()
- self.report_info = self._get_report_info(self.ego_data)
- def _process_mode(self):
- """当前支持默认处理方式"""
- self._real_process_object_df()
- def _get_yaml_config(self):
- with open(self.config_path, 'r') as f:
- full_config = yaml.safe_load(f)
- T_threshold_config = {"T_threshold": full_config.get("T_threshold", {})}
- self.vehicle_config = full_config.get("vehicle", {})
- self.safety_config = {"safety": full_config.get("safety", {})}
- self.safety_config.update(T_threshold_config)
- self.comfort_config = {"comfort": full_config.get("comfort", {})}
- self.comfort_config.update(T_threshold_config)
- self.efficient_config = {"efficient": full_config.get("efficient", {})}
- self.efficient_config.update(T_threshold_config)
- self.function_config = {"function": full_config.get("function", {})}
- self.function_config.update(T_threshold_config)
- self.traffic_config = {"traffic": full_config.get("traffic", {})}
- self.traffic_config.update(T_threshold_config)
- def _plot_coordinate_comparison(self, ego_df):
- """绘制全局坐标系与自车坐标系的运动学量对比图"""
- try:
- import matplotlib.pyplot as plt
- from matplotlib.gridspec import GridSpec
- import os
- plt.figure(figsize=(18, 10))
- gs = GridSpec(3, 3, figure=plt.gcf()) # 改为 3 行 3 列布局
- # 1. 速度对比 - 纵向分量
- ax1 = plt.subplot(gs[0, 0])
- # ax1.plot(ego_df['simTime'], ego_df['speedX'], 'r-', label='Global Vx')
- # ax1.plot(ego_df['simTime'], ego_df['speedY'], 'g-', label='Global Vy')
- ax1.plot(ego_df['simTime'], ego_df['lon_v_vehicle'], 'b-', label='Ego Frame Lon V (2D)')
- # ax1.plot(ego_df['simTime'], ego_df['lon_v_vehicle_3D'], 'y--', label='Ego Frame Lon V (3D)')
- ax1.set_title('Longitudinal Velocity Comparison')
- ax1.set_ylabel('Velocity (m/s)')
- ax1.legend()
- ax1.grid(True)
- # 2. 速度对比 - 横向分量
- ax2 = plt.subplot(gs[0, 1])
- # ax2.plot(ego_df['simTime'], ego_df['speedX'], 'r-', label='Global Vx')
- # ax2.plot(ego_df['simTime'], ego_df['speedY'], 'g-', label='Global Vy')
- ax2.plot(ego_df['simTime'], ego_df['lat_v_vehicle'], 'b-', label='Ego Frame Lat V (2D)')
- # ax2.plot(ego_df['simTime'], ego_df['lat_v_vehicle_3D'], 'y--', label='Ego Frame Lat V (3D)')
- ax2.set_title('Lateral Velocity Comparison')
- ax2.legend()
- ax2.grid(True)
- # 3. 航向角速度(新增子图)
- ax3 = plt.subplot(gs[0, 2])
- ax3.plot(ego_df['simTime'], ego_df['speedH'], 'm-', label='Heading Rate')
- ax3.set_title('Ego Heading Rate')
- ax3.set_ylabel('Yaw Rate (deg/s)')
- ax3.grid(True)
- # 4. 加速度对比 - 纵向分量
- ax4 = plt.subplot(gs[1, 0])
- # ax4.plot(ego_df['simTime'], ego_df['accelX'], 'r-', label='Global Ax')
- # ax4.plot(ego_df['simTime'], ego_df['accelY'], 'g-', label='Global Ay')
- ax4.plot(ego_df['simTime'], ego_df['lon_acc_vehicle'], 'b-', label='Ego Frame Lon A (2D)')
- # ax4.plot(ego_df['simTime'], ego_df['lon_acc_vehicle_3D'], 'y--', label='Ego Frame Lon A (3D)')
- ax4.set_title('Longitudinal Acceleration Comparison')
- ax4.set_ylabel('Acceleration (m/s²)')
- ax4.legend()
- ax4.grid(True)
- # 5. 加速度对比 - 横向分量
- ax5 = plt.subplot(gs[1, 1])
- # ax5.plot(ego_df['simTime'], ego_df['accelX'], 'r-', label='Global Ax')
- # ax5.plot(ego_df['simTime'], ego_df['accelY'], 'g-', label='Global Ay')
- ax5.plot(ego_df['simTime'], ego_df['lon_acc_rate'], 'b-', label='Ego Frame Lat A (2D)')
- # ax5.plot(ego_df['simTime'], ego_df['lat_acc_vehicle_3D'], 'y--', label='Ego Frame Lat A (3D)')
- ax5.set_title('Lateral Acceleration Comparison')
- ax5.legend()
- ax5.grid(True)
- # 6. 航向角变化(原图,平移至右侧)
- ax6 = plt.subplot(gs[1, 2])
- ax6.plot(ego_df['simTime'], ego_df['ego_posH'], 'b-', label='Ego Heading')
- ax6.set_title('Ego Vehicle Heading')
- ax6.set_ylabel('Heading (deg)')
- ax6.grid(True)
- plt.tight_layout()
- # 保存图像到当前目录
- plot_path = os.path.join(os.getcwd(), 'coordinate_transform_comparison.png')
- plt.savefig(plot_path)
- plt.close()
- logger = LogManager().get_logger()
- logger.info(f"坐标系转换对比图已保存至: {plot_path}")
- except ImportError:
- logger = LogManager().get_logger()
- logger.warning("Matplotlib未安装,无法生成坐标系转换对比图")
- except Exception as e:
- logger = LogManager().get_logger()
- logger.error(f"生成坐标系转换对比图时出错: {e}", exc_info=True)
- def _real_process_object_df(self):
- """读取并处理对象数据(含自车)"""
- try:
- merged_csv_path = os.path.join(self.data_path, "merged_ObjState.csv")
- if not os.path.exists(merged_csv_path):
- logger = LogManager().get_logger()
- logger.error(f"文件不存在: {merged_csv_path}")
- raise FileNotFoundError(f"文件不存在: {merged_csv_path}")
- df = pd.read_csv(
- merged_csv_path,
- dtype={"simTime": float},
- on_bad_lines="skip",
- na_values=["", "NA", "null", "NaN"]
- ).drop_duplicates(subset=["simTime", "simFrame", "playerId"])
- # 列名标准化
- df.columns = [col.replace("+AF8-", "_") for col in df.columns]
- df = df.dropna(subset=["type"]).reset_index(drop=True)
- # ===== 修正航向角处理 =====
- # 确保航向角在-180到180度范围内(0度正北,90度正东)
- if 'posH' in df.columns:
- df['posH'] = df['posH'] % 360 # 规范化到0-360度
- df['posH'] = np.where(df['posH'] > 180, df['posH'] - 360, df['posH'])
- # ===== 提取自车数据 =====
- # 自车ID固定为1
- EGO_PLAYER_ID = 1
- ego_mask = df['playerId'] == EGO_PLAYER_ID
- # 确保所有车辆都有pitch和roll列
- for col in ['pitch', 'roll']:
- if col not in df.columns:
- df[col] = 0.0
- # 提取自车姿态数据
- ego_cols = ['simTime', 'posX', 'posY', 'posH', 'pitch', 'roll']
- ego_df = df[ego_mask][ego_cols].copy()
- ego_df.columns = ['simTime'] + [f'ego_{col}' for col in ego_cols[1:]]
- # 合并自车姿态信息到主表
- df = pd.merge(df, ego_df, on='simTime', how='left')
- # ===== 计算相对位置(在自车坐标系下)=====
- # 计算全局坐标差
- dx_global = df['posX'] - df['ego_posX']
- dy_global = df['posY'] - df['ego_posY']
- # 转换为弧度并计算三角函数
- ego_heading_rad = np.deg2rad(df['ego_posH'])
- cos_h = np.cos(ego_heading_rad)
- sin_h = np.sin(ego_heading_rad)
- # 向量化计算相对坐标(在自车坐标系下)
- df['x_relative_start_dist'] = dx_global * sin_h + dy_global * cos_h
- df['y_relative_start_dist'] = -dx_global * cos_h + dy_global * sin_h
- # 自车相对位置设为0
- df.loc[ego_mask, 'x_relative_start_dist'] = 0.0
- df.loc[ego_mask, 'y_relative_start_dist'] = 0.0
- # 保存原始数据
- self.object_df = df.copy()
- # 处理运动学数据(所有车辆在自车坐标系下)
- all_processed_dfs = []
- self.obj_data = {}
- for obj_id, obj_df in df.groupby("playerId"):
- # 使用自车姿态计算运动学量
- processed_df = self._calculate_kinematics_in_ego_frame(obj_df)
- self.obj_data[obj_id] = processed_df
- all_processed_dfs.append(processed_df)
- # 合并处理后的数据
- if all_processed_dfs:
- combined_processed = pd.concat(all_processed_dfs)
- # 生成唯一键
- self.object_df['merge_key'] = self.object_df['playerId'].astype(str) + '_' + self.object_df[
- 'simTime'].astype(str)
- combined_processed['merge_key'] = combined_processed['playerId'].astype(str) + '_' + combined_processed[
- 'simTime'].astype(str)
- # 合并新列
- new_columns = [col for col in combined_processed.columns
- if col not in self.object_df.columns or col == 'merge_key']
- self.object_df = pd.merge(
- self.object_df,
- combined_processed[new_columns],
- on='merge_key',
- how='left'
- ).drop(columns=['merge_key'])
- # 设置自车数据
- self.obj_id_list = list(self.obj_data.keys())
- self.ego_data = self.obj_data.get(EGO_PLAYER_ID, None)
- if self.ego_data is not None:
- # self._plot_coordinate_comparison(self.ego_data)
- pass
- except Exception as e:
- logger = LogManager().get_logger()
- logger.error(f"处理对象数据帧时出错: {e}", exc_info=True)
- raise
- def _calculate_kinematics_in_ego_frame(self, df):
- """计算车辆在自车坐标系下的运动学量,优化版本"""
- # 创建副本避免修改原始数据,只复制必要的列以减少内存使用
- needed_cols = ['simTime', 'ego_posH', 'ego_pitch', 'ego_roll', 'speedX', 'speedY', 'accelX', 'accelY', 'speedH']
- df = df[needed_cols + [col for col in df.columns if col not in needed_cols]].copy()
- # 确保有必要的列,使用向量化操作
- for col in ['speedZ', 'accelZ']:
- if col not in df.columns:
- df[col] = 0.0
- # 处理时间序列 - 使用向量化操作
- df.sort_values(by="simTime", inplace=True)
- df.reset_index(drop=True, inplace=True)
- df["time_diff"] = df["simTime"].diff().replace(0, np.nan).fillna(method="bfill").fillna(0.01)
- # 预计算所有三角函数值,避免重复计算
- # ===== 1. 航向角处理 =====
- ego_yaw_rad = np.deg2rad(90 - df['ego_posH'].values) # 转换为数学坐标系
- ego_pitch_rad = np.deg2rad(df['ego_pitch'].values)
- ego_roll_rad = np.deg2rad(df['ego_roll'].values)
- yaw_rad = np.deg2rad(df['ego_posH'].values)
- # 预计算所有三角函数值
- cy_math, sy_math = np.cos(ego_yaw_rad), np.sin(ego_yaw_rad)
- cp, sp = np.cos(ego_pitch_rad), np.sin(ego_pitch_rad)
- cr, sr = np.cos(ego_roll_rad), np.sin(ego_roll_rad)
- cy, sy = np.cos(yaw_rad), np.sin(yaw_rad)
- # 获取全局速度/加速度向量
- vx = df['speedX'].values # 东
- vy = df['speedY'].values # 北
- vz = df['speedZ'].values # 天
- ax = df['accelX'].values
- ay = df['accelY'].values
- az = df['accelZ'].values
- # ===== 2. 三维转换 =====
- # 构建旋转矩阵 - 使用批量计算
- r00 = cy_math * cp
- r01 = cy_math * sp * sr - sy_math * cr
- r02 = cy_math * sp * cr + sy_math * sr
- r10 = sy_math * cp
- r11 = sy_math * sp * sr + cy_math * cr
- r12 = sy_math * sp * cr - cy_math * sr
- r20 = -sp
- r21 = cp * sr
- r22 = cp * cr
- # 转换速度向量 - 使用向量化计算代替einsum
- df['lon_v_vehicle_3D'] = r00 * vx + r01 * vy + r02 * vz
- df['lat_v_vehicle_3D'] = r10 * vx + r11 * vy + r12 * vz
- df['vel_z_vehicle_3D'] = r20 * vx + r21 * vy + r22 * vz
- # 转换加速度向量 - 使用向量化计算
- df['lon_acc_vehicle_3D'] = r00 * ax + r01 * ay + r02 * az
- df['lat_acc_vehicle_3D'] = r10 * ax + r11 * ay + r12 * az
- df['acc_z_vehicle_3D'] = r20 * ax + r21 * ay + r22 * az
- # ===== 3. 二维转换(与3D水平面一致)=====
- # 使用原始航向角定义(0°=北,90°=东,顺时针为正)
- # 使用向量化计算
- df['lon_v_vehicle'] = vx * sy - vy * cy
- df['lat_v_vehicle'] = vx * cy + vy * sy
- df['lon_acc_vehicle'] = ax * sy - ay * cy
- df['lat_acc_vehicle'] = ax * cy + ay * sy
- # ===== 5. 计算其他运动学量 =====
- # 速度大小 - 使用向量化计算
- df["v"] = np.sqrt(vx ** 2 + vy ** 2)
- # 加速度变化率 - 使用向量化计算
- lat_acc_diff = np.diff(df["lat_acc_vehicle"].values, prepend=df["lat_acc_vehicle"].values[0])
- lon_acc_diff = np.diff(df["lon_acc_vehicle"].values, prepend=df["lon_acc_vehicle"].values[0])
- yawrate_diff = np.diff(df["speedH"].values, prepend=df["speedH"].values[0])
- time_diff = df["time_diff"].values
- # 使用numpy操作代替pandas操作
- df["lat_acc_rate"] = np.where(time_diff == 0, 0, lat_acc_diff / time_diff)
- df["lon_acc_rate"] = np.where(time_diff == 0, 0, lon_acc_diff / time_diff)
- df["accelH"] = np.where(time_diff == 0, 0, yawrate_diff / time_diff)
- # 替换无穷值
- df.replace([np.inf, -np.inf], np.nan, inplace=True)
- df.fillna(0, inplace=True)
- return df
- def _get_report_info(self, df):
- # 提取里程与时长
- return {
- "mileage": self._mileage_cal(df),
- "duration": self._duration_cal(df)
- }
- def _mileage_cal(self, df):
- if len(df) < 2:
- return 0.0
- if df["travelDist"].nunique() == 1:
- temp_df = df.copy()
- temp_df["time_diff"] = temp_df["simTime"].diff().fillna(0)
- temp_df["avg_speed"] = (temp_df["v"] + temp_df["v"].shift()).fillna(0) / 2
- temp_df["distance_increment"] = temp_df["avg_speed"] * temp_df["time_diff"] / 3.6
- temp_df["travelDist"] = temp_df["distance_increment"].cumsum().fillna(0)
- return round(temp_df["travelDist"].iloc[-1] - temp_df["travelDist"].iloc[0], 2)
- else:
- return round(df["travelDist"].max() - df["travelDist"].min(), 2)
- def _duration_cal(self, df):
- return df["simTime"].iloc[-1] - df["simTime"].iloc[0]
|