123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- from pathlib import Path
- from typing import Dict, Any, Optional
- import pandas as pd
- import numpy as np
- from pyproj import Proj
- from core.processors.built_in.base import BaseDataProcessor
class PGVILDataProcessor(BaseDataProcessor):
    """Processor for the built-in PGVIL simulation data (CSV in, CSV out).

    The pipeline run by :meth:`process_data` is: read the CSV, clean it,
    optionally convert ``lat``/``lon`` to UTM coordinates, derive extra
    columns, and write the result back to disk.
    """

    def __init__(self):
        super().__init__("pgvil_processor")
        # Columns every processed file must contain, mapped to the Python
        # type each column is cast to during cleaning.
        self.required_columns = {
            'simTime': float,
            'simFrame': int,
            'playerId': int,
            'v': float,
            'speedX': float,
            'speedY': float,
            'posH': float,
            'speedH': float,
            'posX': float,
            'posY': float,
            'accelX': float,
            'accelY': float
        }
        # Default UTM projection (zone 51, WGS84 ellipsoid, metres).
        # _process_coordinates builds its own projection for the zone it is given.
        self.projection = Proj(proj='utm', zone=51, ellps='WGS84', preserve_units='m')

    def process_data(self,
                     input_path: Path,
                     output_path: Path,
                     **kwargs) -> Optional[Path]:
        """Process one PGVIL CSV file and write the cleaned result.

        Args:
            input_path: Path of the input CSV file.
            output_path: Path of the output CSV *file*; its parent directory
                is created when missing.
            **kwargs: Optional settings:
                - utm_zone: UTM zone used for lat/lon conversion (default 51).
                - x_offset: Offset added to the projected X (default 0.0).
                - y_offset: Offset added to the projected Y (default 0.0).

        Returns:
            ``output_path`` on success; ``None`` when the input has no rows
            or any step raises (the error and traceback are printed).
        """
        try:
            # Load the raw data.
            df = pd.read_csv(input_path)
            if df.empty:
                print(f"输入文件为空: {input_path}")
                return None

            # Basic cleaning: duplicates, required columns, dtypes, ordering.
            df = self._clean_data(df)

            # Coordinate conversion parameters.
            utm_zone = kwargs.get('utm_zone', 51)
            x_offset = kwargs.get('x_offset', 0.0)
            y_offset = kwargs.get('y_offset', 0.0)

            df = self._process_coordinates(df, utm_zone, x_offset, y_offset)

            # Derived columns.
            df = self._calculate_additional_fields(df)

            # Make sure the directory that will hold the output file exists.
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Persist the processed data.
            df.to_csv(output_path, index=False)
            print(f"数据处理完成,已保存至: {output_path}")

            return output_path

        except Exception as e:
            # Boundary of the processing step: report the failure and signal
            # it to the caller through the None return value.
            print(f"处理PGVIL数据时出错: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a de-duplicated, typed and sorted copy of ``df``.

        Missing required columns are added as zeros of the declared type.
        Existing required columns have missing values replaced by 0 and are
        then cast to the declared type. Rows are ordered by ``simTime``,
        ``simFrame`` and ``playerId``.
        """
        # drop_duplicates returns a new frame, so the caller's frame is untouched.
        df = df.drop_duplicates()

        for col, dtype in self.required_columns.items():
            if col not in df.columns:
                # dtype(0) gives 0 for int columns and 0.0 for float columns,
                # so the column keeps its declared type through a CSV round trip.
                df[col] = dtype(0) if dtype in (int, float) else ''
            else:
                # Fill gaps BEFORE casting: casting NaN to int raises.
                df[col] = df[col].fillna(0).astype(dtype)

        # Order by time, frame and player.
        df = df.sort_values(['simTime', 'simFrame', 'playerId'])

        return df

    def _process_coordinates(self,
                             df: pd.DataFrame,
                             utm_zone: int,
                             x_offset: float,
                             y_offset: float) -> pd.DataFrame:
        """Overwrite ``posX``/``posY`` with UTM coordinates when lat/lon exist.

        When ``df`` has both ``lat`` and ``lon`` columns they are projected to
        UTM in ``utm_zone`` (WGS84) and the offsets are added. Otherwise the
        frame is returned unchanged.
        """
        if 'lat' in df.columns and 'lon' in df.columns:
            # Project the raw longitude/latitude to UTM.
            projection = Proj(proj='utm', zone=utm_zone, ellps='WGS84', preserve_units='m')
            x, y = projection(df['lon'].values, df['lat'].values)

            # Apply the offsets.
            df['posX'] = x + x_offset
            df['posY'] = y + y_offset

        return df

    def _calculate_additional_fields(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add derived columns to ``df`` and return it.

        * ``v``: magnitude of (``speedX``, ``speedY``), when both exist.
        * ``travelDist``: per-player cumulative sum of ``v * dt``, where
          ``dt`` is the ``simTime`` step between consecutive samples of the
          same player; each player's first sample contributes 0.
        """
        # Resultant speed.
        if all(col in df.columns for col in ['speedX', 'speedY']):
            df['v'] = np.sqrt(df['speedX']**2 + df['speedY']**2)

        # Travelled distance per player.
        if all(col in df.columns for col in ['v', 'simTime', 'playerId']):
            ordered = df.sort_values('simTime', kind='stable')
            # The time step must be taken within each player; a plain diff
            # would mix samples of different players.
            dt = ordered.groupby('playerId')['simTime'].diff().fillna(0.0)
            step_dist = ordered['v'] * dt
            # The assignment aligns on the index, so the original row order is kept.
            df['travelDist'] = step_dist.groupby(ordered['playerId']).cumsum()

        return df

    def validate_output(self, output_path: Path) -> bool:
        """Validate a processed output file.

        Checks that the file exists, that every required column is present
        with an integer or floating dtype as declared, that ``simTime`` has
        no negative value and that ``simFrame`` has no non-positive value.

        Args:
            output_path: Path of the output CSV file.

        Returns:
            ``True`` when all checks pass; ``False`` otherwise, including when
            reading the file raises.
        """
        try:
            if not output_path.exists():
                print(f"输出文件不存在: {output_path}")
                return False

            df = pd.read_csv(output_path)

            # Required columns.
            missing_cols = [col for col in self.required_columns
                            if col not in df.columns]
            if missing_cols:
                print(f"缺少必需列: {missing_cols}")
                return False

            # Column dtypes. Test the dtype family rather than comparing with
            # the Python builtin, whose numpy equivalent is platform dependent.
            for col, dtype in self.required_columns.items():
                if dtype is int:
                    type_ok = pd.api.types.is_integer_dtype(df[col])
                elif dtype is float:
                    type_ok = pd.api.types.is_float_dtype(df[col])
                else:
                    type_ok = df[col].dtype == dtype
                if not type_ok:
                    print(f"列 {col} 的数据类型错误,应为 {dtype}")
                    return False

            # Value ranges.
            if df['simTime'].min() < 0:
                print("simTime存在负值")
                return False

            if df['simFrame'].min() <= 0:
                print("simFrame存在非正值")
                return False

            return True

        except Exception as e:
            print(f"验证输出数据时出错: {e}")
            return False
- from typing import Dict, Optional
- from pathlib import Path
- import pandas as pd
- from .base import BaseProcessor
- from core.error_handler import ErrorHandler
- from core.config_manager import get_config
class Config:
    """Configuration holder for the PGVIL processor."""

    def __init__(self, output_dir: Path, data_dir: Path) -> None:
        """Store the two directories exactly as given.

        Args:
            output_dir: Directory handed to the processor as its output location.
            data_dir: Directory of the input data.
        """
        self.output_dir: Path = output_dir
        self.data_dir: Path = data_dir
class PGVILProcessor(BaseProcessor):
    """PGVIL data processor implementing the PGVIL-specific processing steps."""

    def __init__(self, config: Config):
        """Initialise the base processor with ``config.output_dir`` and keep ``config``."""
        super().__init__(config.output_dir)
        self.config = config

    @ErrorHandler.measure_performance
    def process_built_in_data(self) -> Dict[str, Path]:
        """Run the PGVIL built-in data pipeline.

        Stages, in order:
            1. CAN data
            2. Sensor data
            3. Other PGVIL-specific data
            4. Merge of the built-in data

        Returns:
            Dictionary of result file paths collected from stages 1-3.
        """
        collected: Dict[str, Path] = {}

        # (banner, stage callable, warning printed on an empty result or None)
        stages = (
            ("1. 处理CAN数据...",
             self._process_can_data,
             "警告: CAN数据处理失败或无数据"),
            ("\n2. 处理传感器数据...",
             self._process_sensor_data,
             "警告: 传感器数据处理失败或无数据"),
            # The third stage reports nothing when it yields no files.
            ("\n3. 处理其他PGVIL数据...",
             self._process_other_data,
             None),
        )

        for banner, run_stage, empty_warning in stages:
            print(banner)
            produced = run_stage()
            if produced:
                collected.update(produced)
            elif empty_warning is not None:
                print(empty_warning)

        # Stage 4: merge whatever the previous stages produced.
        print("\n4. 合并内置数据...")
        merged = self._merge_built_in_data(collected)
        if not merged:
            print("警告: 内置数据合并失败")

        return collected

    def _process_can_data(self) -> Dict[str, Path]:
        """Process CAN data (not implemented yet; returns an empty dict)."""
        # TODO: implement the CAN data processing logic
        can_files: Dict[str, Path] = {}
        return can_files

    def _process_sensor_data(self) -> Dict[str, Path]:
        """Process sensor data (not implemented yet; returns an empty dict)."""
        # TODO: implement the sensor data processing logic
        sensor_files: Dict[str, Path] = {}
        return sensor_files

    def _process_other_data(self) -> Dict[str, Path]:
        """Process other PGVIL-specific data (not implemented yet; returns an empty dict)."""
        # TODO: implement the processing logic for the remaining data
        other_files: Dict[str, Path] = {}
        return other_files

    def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
        """Merge the PGVIL built-in data.

        Args:
            result_files: Dictionary of result file paths.

        Returns:
            Whether the merge succeeded. The merge itself is still a
            placeholder, so this currently always returns ``True``.
        """
        try:
            # Placeholder for the PGVIL-specific merge logic.
            merged_ok = True
        except Exception as e:
            print(f"内置数据合并失败: {e}")
            merged_ok = False
        return merged_ok
|