@@ -3,272 +3,432 @@ from typing import Dict, Any, Optional
 import pandas as pd
 import numpy as np
 from pyproj import Proj
+from dataclasses import dataclass, field
+from pathlib import Path
+from core.error_handler import ErrorHandler
+from core.config_manager import get_config
+import sys
+import csv
+import os
+import zipfile
+import argparse
+from genpy import Message
+import shutil
+import tempfile
+import subprocess
+
+@dataclass
+class Config:
+    """Configuration class for the PGVIL processor."""
+
+    zip_path: Path
+    output_path: Path
+    engine_path: Optional[Path] = None
+    map_path: Optional[Path] = None
+    utm_zone: int = 51  # example UTM zone
+    x_offset: float = 0.0
+    y_offset: float = 0.0
-from core.processors.built_in.base import BaseDataProcessor
+
+    def __post_init__(self):
+        # Use output_path directly as output_dir to avoid nested directories
+        self.output_dir = self.output_path
+        self.output_dir.mkdir(parents=True, exist_ok=True)
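+        # Illustrative note (hypothetical paths): Config(zip_path=Path("runs/demo.zip"),
+        # output_path=Path("out")) sets output_dir to Path("out") and creates it eagerly.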
-class PGVILDataProcessor(BaseDataProcessor):
-    """Processor for simulator built-in data"""
+def run_pgvil_engine(config: Config):
+    """Run the external C++ preprocessing engine."""
+    if not config.engine_path or not config.map_path:
+        print("C++ engine path or map path not configured. Skipping C++ engine execution.")
+        return True  # treat the engine as optional when it is not configured
+
+    engine_cmd = [
+        str(config.engine_path),
+        str(config.map_path),
+        str(config.output_dir),
+        str(config.x_offset),
+        str(config.y_offset)
+    ]
+
+    print("--- Running C++ Preprocessing Engine ---")
+    print(f"Command: {' '.join(engine_cmd)}")
+
+    try:
+        result = subprocess.run(
+            engine_cmd,
+            check=True,           # raise an exception on a non-zero exit code
+            capture_output=True,  # capture stdout/stderr
+            text=True,            # decode output as text
+            cwd=config.engine_path.parent  # run from the engine's directory; adjust if needed
+        )
+        print("C++ Engine Output:")
+        print(result.stdout)
+        if result.stderr:
+            print("C++ Engine Error Output:")
+            print(result.stderr)
+        print("--- C++ Engine Finished Successfully ---")
+        return True
+    except FileNotFoundError:
+        print(f"Error: C++ engine executable not found at {config.engine_path}.")
+        return False
+    except subprocess.CalledProcessError as e:
+        print(f"Error: C++ engine failed with exit code {e.returncode}.")
+        print("C++ Engine Output (stdout):")
+        print(e.stdout)
+        print("C++ Engine Output (stderr):")
+        print(e.stderr)
+        return False
+    except Exception as e:
+        print(f"An unexpected error occurred while running the C++ engine: {e}")
+        return False
+
+
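+# Hypothetical invocation sketch (engine binary and map file are deployment-specific):
+#   cfg = Config(zip_path=Path("runs/demo.zip"), output_path=Path("out"),
+#                engine_path=Path("bin/pgvil_engine"), map_path=Path("maps/demo.xodr"))
+#   if not run_pgvil_engine(cfg):
+#       sys.exit(1)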
+def remove_conflicting_columns(df_object, df_csv_info):
+    """
+    Drop from df_csv_info the columns present in both DataFrames, except the
+    merge keys simTime, simFrame and playerId.
+    """
+    conflicting_columns = set(df_object.columns) & set(df_csv_info.columns)
+    for col in conflicting_columns:
+        if col not in ["simTime", "simFrame", "playerId"]:
+            del df_csv_info[col]
+    return df_csv_info
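+# For example, if both frames carry a posX column, df_csv_info loses its posX so a later
+# pd.merge on ["simTime", "simFrame", "playerId"] does not emit posX_x/posX_y suffixes.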
+
+
+def align_simtime_by_simframe(df):
+    # Build a mapping from each simFrame to its representative simTime value
+    sim_frame_to_time_map = df.groupby('simFrame')['simTime'].first().to_dict()
-    def __init__(self):
-        super().__init__("pgvil_processor")
-        self.required_columns = {
-            'simTime': float,
-            'simFrame': int,
-            'playerId': int,
-            'v': float,
-            'speedX': float,
-            'speedY': float,
-            'posH': float,
-            'speedH': float,
-            'posX': float,
-            'posY': float,
-            'accelX': float,
-            'accelY': float
-        }
-        # Initialize the UTM projection
-        self.projection = Proj(proj='utm', zone=51, ellps='WGS84', preserve_units='m')
-
-    def process_data(self,
-                     input_path: Path,
-                     output_path: Path,
-                     **kwargs) -> Optional[Path]:
-        """Process PGVIL data
-
-        Args:
-            input_path: input file path
-            output_path: output directory path
-            **kwargs: extra parameters
-                - utm_zone: UTM zone
-                - x_offset: X offset
-                - y_offset: Y offset
+    # Use the mapping to update the simTime values in the DataFrame
+    df['simTime'] = df['simFrame'].map(sim_frame_to_time_map)
+    # If simFrame is empty or contains non-integer values, return the frame unchanged
+    if df['simFrame'].empty or not df['simFrame'].apply(lambda x: isinstance(x, (int, np.integer))).all():
+        return df
+
+    # Identify the missing simFrames
+    all_frames = np.arange(df['simFrame'].min(), df['simFrame'].max() + 1)
+    missing_frames = set(all_frames) - set(df['simFrame'])
+    new_rows = []
+    # Fill each missing simFrame by interpolating its neighbours
+    for missing_frame in missing_frames:
+        prev_frame = df[df['simFrame'] < missing_frame]['simFrame'].max()
+        next_frame = df[df['simFrame'] > missing_frame]['simFrame'].min()
-
-        Returns:
-            Path of the processed file
-        """
-        try:
-            # Read the data
-            df = pd.read_csv(input_path)
-            if df.empty:
-                print(f"Input file is empty: {input_path}")
-                return None
-
-            # Basic data cleaning
-            df = self._clean_data(df)
-
-            # Coordinate conversion
-            utm_zone = kwargs.get('utm_zone', 51)
-            x_offset = kwargs.get('x_offset', 0.0)
-            y_offset = kwargs.get('y_offset', 0.0)
-
-            df = self._process_coordinates(df, utm_zone, x_offset, y_offset)
-
-            # Compute derived fields
-            df = self._calculate_additional_fields(df)
-
-            # Ensure the output directory exists
-            output_path.parent.mkdir(parents=True, exist_ok=True)
+        # max()/min() on an empty selection yield NaN, not None, so test with pd.notna
+        if pd.notna(prev_frame) and pd.notna(next_frame):
+            prev_row = df[df['simFrame'] == prev_frame].iloc[0]
+            next_row = df[df['simFrame'] == next_frame].iloc[0]
-            # Save the processed data
-            df.to_csv(output_path, index=False)
-            print(f"Processing finished; data saved to: {output_path}")
+            # Average the neighbouring rows to create the new row
+            new_row = prev_row.copy()
+            new_row['simFrame'] = missing_frame
+            for col in df.columns:
+                if col not in ['simTime', 'simFrame']:
+                    new_row[col] = (prev_row[col] + next_row[col]) / 2
-            return output_path
+            # Update the simTime value
+            new_row['simTime'] = sim_frame_to_time_map.get(missing_frame, np.nan)
-        except Exception as e:
-            print(f"Error while processing PGVIL data: {e}")
-            import traceback
-            traceback.print_exc()
-            return None
-
-    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Clean the data"""
-        # Drop duplicate rows
-        df = df.drop_duplicates()
-
-        # Ensure the required columns exist
-        for col, dtype in self.required_columns.items():
-            if col not in df.columns:
-                df[col] = 0 if dtype in (int, float) else ''
-            else:
-                df[col] = df[col].astype(dtype)
-
-        # Handle missing values
-        numeric_cols = [col for col, dtype in self.required_columns.items()
-                        if dtype in (int, float)]
-        df[numeric_cols] = df[numeric_cols].fillna(0)
-
-        # Sort by time and ID
-        df.sort_values(['simTime', 'simFrame', 'playerId'], inplace=True)
+            # Collect the new row
+            new_rows.append(new_row)
+    if new_rows:
+        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
+    return df.sort_values(by='simFrame').reset_index(drop=True)
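+# Worked example for the gap fill above (hypothetical frames): given simFrame [1, 3],
+# simTime [0.1, 0.3] and posX [0.0, 2.0], frame 2 is inserted with posX 1.0 (the mean
+# of its neighbours) and simTime NaN, since frame 2 never had a recorded simTime.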
+
+def mergecopy_by_simtime(merged_df, external_df, ignore_cols, prefix=None):
+    """
+    Copy every column of external_df (except ignore_cols) into merged_df,
+    matched on nearest simTime: every merged_df row whose simTime equals a
+    row's nearest_simTime in external_df receives that row's values.
+    """
+    useful_cols = [col for col in external_df.columns if col not in ignore_cols]
+    for col in useful_cols:
+        col_name = f"{prefix}_{col}" if prefix else col
+        mapping = external_df.set_index('nearest_simTime')[col].to_dict()
+        merged_df[col_name] = merged_df['simTime'].map(mapping)
+    return merged_df
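+# Example sketch (hypothetical data): if external_df carries nearest_simTime [0.1, 0.2]
+# and a column state ['A', 'B'], every merged_df row with simTime 0.1 receives state 'A',
+# rows with simTime 0.2 receive 'B', and all other rows get NaN.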
+
+def read_csv_with_filled_columns(file_path):
+    try:
+        # Make sure file_path is a valid string path
+        if not isinstance(file_path, str):
+            raise ValueError("The given file path is invalid")
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"File {file_path} does not exist")
-            return df
-
-    def _process_coordinates(self,
-                             df: pd.DataFrame,
-                             utm_zone: int,
-                             x_offset: float,
-                             y_offset: float) -> pd.DataFrame:
-        """Process coordinate data"""
-        if 'lat' in df.columns and 'lon' in df.columns:
-            # Convert raw latitude/longitude to UTM
-            projection = Proj(proj='utm', zone=utm_zone, ellps='WGS84', preserve_units='m')
-            x, y = projection(df['lon'].values, df['lat'].values)
-
-            # Apply offsets
-            df['posX'] = x + x_offset
-            df['posY'] = y + y_offset
+        # Skip malformed lines instead of failing the whole read
+        df = pd.read_csv(file_path, on_bad_lines='skip')
+
+        # Normalize missing values to NaN so the column count stays consistent
+        if not df.empty:
+            df.fillna(np.nan, inplace=True)
+        return df
+    except Exception as e:
+        print(f"Error while reading CSV file {file_path}: {str(e)}")
+        return pd.DataFrame()  # return an empty DataFrame so processing can continue
+
+def convert_heading(posH_rad):
+    # Convert radians to degrees
+    angle_deg = np.degrees(posH_rad)
+    # Counterclockwise, east = 0  =>  clockwise, north = 0, i.e. new_angle = (90 - angle_deg) % 360
+    heading_deg = (90 - angle_deg) % 360
+    return round(heading_deg, 3)
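+# Spot checks for the convention above (verifiable by hand):
+#   convert_heading(0.0)       -> 90.0   (ENU east becomes heading 90)
+#   convert_heading(np.pi / 2) -> 0.0    (north stays north)
+#   convert_heading(np.pi)     -> 270.0  (west becomes heading 270)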
+
+
+class PGVILProcessor:
+    """PGVIL data processor implementing the PGVIL-specific logic."""
-    def _calculate_additional_fields(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Compute derived fields"""
-        # Example: compute the resultant speed
-        if all(col in df.columns for col in ['speedX', 'speedY']):
-            df['v'] = np.sqrt(df['speedX']**2 + df['speedY']**2)
+    def __init__(self, config: Config):
+        self.config = config
+
+    def process_zip(self) -> Path:
+        """Extract the input ZIP and return the output directory path.
+
+        Reads self.config.zip_path and extracts into self.config.output_dir.
+        """
+        print(f"Processing ZIP: {self.config.zip_path}")
+        zip_path = self.config.zip_path
+        output_dir = Path(self.config.output_dir)  # make sure we have a Path object
+
+        # Extraction goes straight into output_dir; the ZIP stem is kept for reference
+        zip_name = Path(zip_path).stem
+        output_dir.mkdir(parents=True, exist_ok=True)
+
+        # Extract the CSV files under HMIdata/ and RDBdata/
+        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+            for name in zip_ref.namelist():
+                if ('HMIdata/' in name or 'RDBdata/' in name) and name.endswith('.csv'):
+                    # The last component of the archive path is the file name
+                    filename = os.path.basename(name)
+                    dst_path = output_dir / filename
+                    # print(f"Extracting {name} to {dst_path}")
+                    with zip_ref.open(name) as src, open(dst_path, 'wb') as dst_file:
+                        shutil.copyfileobj(src, dst_file)
+        # Record the output directory back into the config
+        self.config.output_dir = output_dir
+        return output_dir
+
+
+    def merge_csv_files(self):
+        x_offset = self.config.x_offset
+        y_offset = self.config.y_offset
+        data_path = self.config.output_dir
+        # X_OFFSET = 258109.4239876
+        # Y_OFFSET = 4149969.964821
+
+        # Define the CSV file paths (os.path.join never raises here;
+        # missing files are handled by read_csv_with_filled_columns below)
+        obj_state_path = os.path.join(data_path, "ObjState.csv")
+        ego_map_path = os.path.join(data_path, "EgoMap.csv")
+        lane_map_path = os.path.join(data_path, "LaneMap.csv")
+        lane_info_path = os.path.join(data_path, "LaneInfo.csv")
+        road_pos_path = os.path.join(data_path, "RoadPos.csv")
+        vehicle_systems_path = os.path.join(data_path, "VehicleSystems.csv")
+        traffic_light_path = os.path.join(data_path, "TrafficLight.csv")
+        function_path = os.path.join(data_path, "Function.csv")
-        # Example: compute the travelled distance
-        if 'v' in df.columns and 'simTime' in df.columns:
-            df['travelDist'] = df.sort_values('simTime').groupby('playerId')['v'].cumsum() * df['simTime'].diff()
+        df_object = read_csv_with_filled_columns(obj_state_path)
+        df_map_info = read_csv_with_filled_columns(ego_map_path)
+        df_lane_map = read_csv_with_filled_columns(lane_map_path)
+        df_lane_info = read_csv_with_filled_columns(lane_info_path)
+        df_road_pos = read_csv_with_filled_columns(road_pos_path)
+        df_vehicle_systems = read_csv_with_filled_columns(vehicle_systems_path)
+        df_traffic_light = read_csv_with_filled_columns(traffic_light_path)
+        df_function = read_csv_with_filled_columns(function_path)
+
+        # Cast numeric columns to float
+        def convert_numeric_columns(df):
+            numeric_cols = df.select_dtypes(include=['number']).columns
+            df[numeric_cols] = df[numeric_cols].astype(float)
+            return df
+
+        df_object = convert_numeric_columns(df_object)
+        df_map_info = convert_numeric_columns(df_map_info)
+        df_lane_map = convert_numeric_columns(df_lane_map)
+        df_lane_info = convert_numeric_columns(df_lane_info)
+        df_road_pos = convert_numeric_columns(df_road_pos)
+        df_vehicle_systems = convert_numeric_columns(df_vehicle_systems)
+        df_traffic_light = convert_numeric_columns(df_traffic_light)
+        df_function = convert_numeric_columns(df_function)
+
+        # Apply the offsets to posX and posY in df_object
+        df_object['posX'] += x_offset
+        df_object['posY'] += y_offset
+
+        # Align simTime with simFrame
+        df_object = align_simtime_by_simframe(df_object)
+        df_map_info = align_simtime_by_simframe(df_map_info)
+        df_lane_map = align_simtime_by_simframe(df_lane_map)
+        df_lane_info = align_simtime_by_simframe(df_lane_info)
+        df_road_pos = align_simtime_by_simframe(df_road_pos)
+        df_vehicle_systems = align_simtime_by_simframe(df_vehicle_systems)
+        df_traffic_light = align_simtime_by_simframe(df_traffic_light)
-        return df
-
-    def validate_output(self, output_path: Path) -> bool:
-        """Validate the output data
+        del_ego_map = remove_conflicting_columns(df_object, df_map_info)  # drop duplicated columns
+        # Merge the data
+        merged_df = pd.merge(df_object, del_ego_map, on=["simTime", "simFrame", "playerId"], how="left")
+
+
+        # Merge ObjState with LaneMap, LaneInfo, RoadPos, TrafficLight and VehicleSystems
+        # on simTime, simFrame and playerId (TrafficLight has no playerId, so it joins
+        # on simTime and simFrame only)
+        del_lane_map = remove_conflicting_columns(merged_df, df_lane_map)
+        merged_df = pd.merge(merged_df, del_lane_map, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
+        del_lane_info = remove_conflicting_columns(merged_df, df_lane_info)
+        merged_df = pd.merge(merged_df, del_lane_info, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
+        del_road_pos = remove_conflicting_columns(merged_df, df_road_pos)
+        merged_df = pd.merge(merged_df, del_road_pos, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
+        del_traffic_light = remove_conflicting_columns(merged_df, df_traffic_light)
+        merged_df = pd.merge(merged_df, del_traffic_light, on=["simTime", "simFrame"], how="left").drop_duplicates()
+        del_vehicle_systems = remove_conflicting_columns(merged_df, df_vehicle_systems)
+        merged_df = pd.merge(merged_df, del_vehicle_systems, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
-
-        Args:
-            output_path: output file path
-
-        Returns:
-            whether validation passed
+        tolerance = 0.01
+        def find_closest_time(sim_time, sim_time_to_index, tolerance=0.01):
+            # Find the closest time point, accepting it only if it is within tolerance
+            closest_time = min(sim_time_to_index.keys(), key=lambda y: abs(y - sim_time))
+            # Reject matches outside the tolerance so unmatched rows surface as NaN
+            return closest_time if abs(closest_time - sim_time) < tolerance else np.nan
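+        # Example with hypothetical times: for keys {1.00, 1.05} and tolerance 0.01,
+        # find_closest_time(1.004, ...) returns 1.00, while find_closest_time(1.02, ...)
+        # returns NaN because the nearest key is 0.02 away.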
+
+        # Build a mapping from each simTime in merged_df to its row index
+        sim_time_to_index = {row['simTime']: idx for idx, row in merged_df.iterrows()}
+
+        df_function = df_function.sort_values(by='simTime').reset_index(drop=True)  # sort by simTime
+        # For each simTime in Function.csv, find the nearest time point in merged_df
+        df_function['nearest_simTime'] = df_function['simTime'].apply(lambda x: find_closest_time(x, sim_time_to_index, tolerance))
+        df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)
+
+        # Make sure nearest_index is integer-typed and NaN rows are dropped
+        df_function_renamed = df_function.rename(columns={'simTime': 'function_simTime'})  # rename df_function's simTime column
+        df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
+        df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
+
+        ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
+        merged_df = mergecopy_by_simtime(merged_df, df_function_valid, ignore_cols)
+        """
-        try:
-            if not output_path.exists():
-                print(f"Output file does not exist: {output_path}")
-                return False
-
-            df = pd.read_csv(output_path)
-
-            # Validate the required columns
-            missing_cols = [col for col in self.required_columns.keys()
-                            if col not in df.columns]
-            if missing_cols:
-                print(f"Missing required columns: {missing_cols}")
-                return False
+        def check_matching(df_function, sim_time_to_index, tolerance=0.01):
+            # Check whether every row of Function.csv was matched successfully
-            # Validate the data types
-            for col, dtype in self.required_columns.items():
-                if df[col].dtype != dtype:
-                    print(f"Column {col} has the wrong dtype; expected {dtype}")
-                    return False
+            # Compute the nearest_simTime for each simTime
+            df_function['nearest_simTime'] = df_function['simTime'].apply(lambda x: find_closest_time(x, sim_time_to_index, tolerance))
-            # Validate the value ranges
-            if df['simTime'].min() < 0:
-                print("simTime contains negative values")
-                return False
+            # Check for rows that were not matched
+            unmatched_rows = df_function[df_function['nearest_simTime'].isna()]
+            if not unmatched_rows.empty:
+                print(f"Unmatched rows: {unmatched_rows}")
+            else:
+                print("All rows matched successfully!")
-            if df['simFrame'].min() <= 0:
-                print("simFrame contains non-positive values")
-                return False
+
+            # Count the matched and unmatched rows
+            total_rows = len(df_function)
+            matched_rows = len(df_function) - len(unmatched_rows)
+            print(f"Total rows: {total_rows}, matched rows: {matched_rows}, unmatched rows: {len(unmatched_rows)}")
-            return True
+            return unmatched_rows
+        # Run the matching check
+        unmatched_rows = check_matching(df_function, sim_time_to_index, tolerance=0.01)
+        # simTime of the last row
+        last_row_simtime = df_function.iloc[-1]['simTime']
+        print(f"simTime of the last row: {last_row_simtime}")
+        # nearest_simTime of the last row
+        last_row_nearest_simtime = df_function.iloc[-1]['nearest_simTime']
+        print(f"nearest_simTime of the last row: {last_row_nearest_simtime}")
+        """
+        # Convert posH from radians (east = 0, CCW) to heading degrees (north = 0, CW)
+        merged_df['posH'] = merged_df['posH'].apply(convert_heading)
+        if 'posP' in merged_df.columns:
+            merged_df.rename(columns={'posP': 'pitch_rate'}, inplace=True)
+        if 'posR' in merged_df.columns:
+            merged_df.rename(columns={'posR': 'roll_rate'}, inplace=True)
-        except Exception as e:
-            print(f"Error while validating the output data: {e}")
-            return False
+        # Run infer_objects first to make the dtypes consistent
+        merged_df = merged_df.infer_objects()
+        merged_df.fillna(np.nan, inplace=True)  # make sure empty values are stored as NaN
+        merged_csv_path = Path(data_path) / "merged_ObjState.csv"
-from typing import Dict, Optional
-from pathlib import Path
-import pandas as pd
+        # merged_df.to_csv(merged_csv_path, index=False, na_rep="NaN")
+        merged_df.to_csv(merged_csv_path, index=False)
+
+        return merged_csv_path
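+    # Typical call sequence (sketch; paths are illustrative):
+    #   cfg = Config(zip_path=Path("runs/demo.zip"), output_path=Path("out"))
+    #   proc = PGVILProcessor(cfg)
+    #   proc.process_zip()                # extract the HMIdata/RDBdata CSVs
+    #   merged = proc.merge_csv_files()   # -> out/merged_ObjState.csv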
-from .base import BaseProcessor
-from core.error_handler import ErrorHandler
-from core.config_manager import get_config
-
-class Config:
-    """Configuration class for the PGVIL processor"""
-    def __init__(self,
-                 output_dir: Path,
-                 data_dir: Path):
-        self.output_dir = output_dir
-        self.data_dir = data_dir
-
-class PGVILProcessor(BaseProcessor):
-    """PGVIL data processor implementing the PGVIL-specific logic"""
-
-    def __init__(self, config: Config):
-        super().__init__(config.output_dir)
-        self.config = config
-    @ErrorHandler.measure_performance
-    def process_built_in_data(self) -> Dict[str, Path]:
-        """PGVIL-specific built-in data processing logic
+    # @ErrorHandler.measure_performance
+    # def process_built_in_data(self) -> Dict[str, Path]:
+    #     """PGVIL-specific built-in data processing logic
-
-        Processing order:
-        1. Process CAN data
-        2. Process sensor data
-        3. Process other PGVIL-specific data
-        4. Merge built-in data
+    #     Processing order:
+    #     1. Process CAN data
+    #     2. Process sensor data
+    #     3. Process other PGVIL-specific data
+    #     4. Merge built-in data
-
-        Returns:
-            Dictionary of result file paths
-        """
-        result_files = {}
+    #     Returns:
+    #         Dictionary of result file paths
+    #     """
+    #     result_files = {}
-
-        # 1. Process CAN data
-        print("1. Processing CAN data...")
-        can_results = self._process_can_data()
-        if can_results:
-            result_files.update(can_results)
-        else:
-            print("Warning: CAN data processing failed or produced no data")
+    #     # 1. Process CAN data
+    #     print("1. Processing CAN data...")
+    #     can_results = self._process_can_data()
+    #     if can_results:
+    #         result_files.update(can_results)
+    #     else:
+    #         print("Warning: CAN data processing failed or produced no data")
-
-        # 2. Process sensor data
-        print("\n2. Processing sensor data...")
-        sensor_results = self._process_sensor_data()
-        if sensor_results:
-            result_files.update(sensor_results)
-        else:
-            print("Warning: sensor data processing failed or produced no data")
+    #     # 2. Process sensor data
+    #     print("\n2. Processing sensor data...")
+    #     sensor_results = self._process_sensor_data()
+    #     if sensor_results:
+    #         result_files.update(sensor_results)
+    #     else:
+    #         print("Warning: sensor data processing failed or produced no data")
-
-        # 3. Process other PGVIL-specific data
-        print("\n3. Processing other PGVIL data...")
-        other_results = self._process_other_data()
-        if other_results:
-            result_files.update(other_results)
+    #     # 3. Process other PGVIL-specific data
+    #     print("\n3. Processing other PGVIL data...")
+    #     other_results = self._process_other_data()
+    #     if other_results:
+    #         result_files.update(other_results)
-
-        # 4. Merge built-in data
-        print("\n4. Merging built-in data...")
-        if not self._merge_built_in_data(result_files):
-            print("Warning: built-in data merge failed")
+    #     # 4. Merge built-in data
+    #     print("\n4. Merging built-in data...")
+    #     if not self._merge_built_in_data(result_files):
+    #         print("Warning: built-in data merge failed")
-
-        return result_files
+    #     return result_files
-
-    def _process_can_data(self) -> Dict[str, Path]:
-        """Process CAN data"""
-        # TODO: implement CAN data processing
-        return {}
+    # def _process_can_data(self) -> Dict[str, Path]:
+    #     """Process CAN data"""
+    #     # TODO: implement CAN data processing
+    #     return {}
-
-    def _process_sensor_data(self) -> Dict[str, Path]:
-        """Process sensor data"""
-        # TODO: implement sensor data processing
-        return {}
+    # def _process_sensor_data(self) -> Dict[str, Path]:
+    #     """Process sensor data"""
+    #     # TODO: implement sensor data processing
+    #     return {}
-
-    def _process_other_data(self) -> Dict[str, Path]:
-        """Process other PGVIL-specific data"""
-        # TODO: implement other data processing
-        return {}
+    # def _process_other_data(self) -> Dict[str, Path]:
+    #     """Process other PGVIL-specific data"""
+    #     # TODO: implement other data processing
+    #     return {}
-
-    def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
-        """Merge PGVIL built-in data
+    # def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
+    #     """Merge PGVIL built-in data
-
-        Args:
-            result_files: dictionary of result file paths
+    #     Args:
+    #         result_files: dictionary of result file paths
-
-        Returns:
-            whether the merge succeeded
-        """
-        try:
-            # Implement the PGVIL-specific data merge logic
-            return True
-        except Exception as e:
-            print(f"Built-in data merge failed: {e}")
-            return False
+    #     Returns:
+    #         whether the merge succeeded
+    #     """
+    #     try:
+    #         # Implement the PGVIL-specific data merge logic
+    #         return True
+    #     except Exception as e:
+    #         print(f"Built-in data merge failed: {e}")
+    #         return False
+