from pathlib import Path
from typing import Dict, Any, Optional

import pandas as pd
import numpy as np
from pyproj import Proj

from core.processors.built_in.base import BaseDataProcessor


class PGVILDataProcessor(BaseDataProcessor):
    """Processor for simulation built-in (PGVIL) data.

    Reads a CSV file, normalises the required columns, optionally converts
    lat/lon to UTM, derives extra fields and writes the result back to CSV.
    """

    def __init__(self):
        super().__init__("pgvil_processor")
        # Column name -> Python type the column is normalised to by _clean_data
        # and checked against by validate_output.
        self.required_columns = {
            'simTime': float,
            'simFrame': int,
            'playerId': int,
            'v': float,
            'speedX': float,
            'speedY': float,
            'posH': float,
            'speedH': float,
            'posX': float,
            'posY': float,
            'accelX': float,
            'accelY': float,
        }

        # Default UTM projection (zone 51, WGS84).
        # NOTE(review): _process_coordinates builds its own projection from its
        # utm_zone argument and does not read this attribute; it is kept for
        # any external users. preserve_units is a boolean flag (the original
        # passed the truthy string 'm', which had the same effect as True).
        self.projection = Proj(proj='utm', zone=51, ellps='WGS84', preserve_units=True)

    def process_data(self, input_path: Path, output_path: Path, **kwargs) -> Optional[Path]:
        """Process a PGVIL CSV file and write the result as CSV.

        Args:
            input_path: Path of the input CSV file.
            output_path: Path of the output CSV *file*; its parent directory is
                created if it does not exist.
            **kwargs: Optional parameters:
                - utm_zone: UTM zone used for lat/lon conversion (default 51).
                - x_offset: Offset added to the projected X coordinate (default 0.0).
                - y_offset: Offset added to the projected Y coordinate (default 0.0).

        Returns:
            ``output_path`` on success; ``None`` if the input is empty or any
            exception occurs (the error and traceback are printed).
        """
        try:
            df = pd.read_csv(input_path)
            if df.empty:
                print(f"输入文件为空: {input_path}")
                return None

            # Basic cleaning: dedupe, normalise required columns, sort.
            df = self._clean_data(df)

            # Coordinate conversion (only applied when lat/lon columns exist).
            utm_zone = kwargs.get('utm_zone', 51)
            x_offset = kwargs.get('x_offset', 0.0)
            y_offset = kwargs.get('y_offset', 0.0)
            df = self._process_coordinates(df, utm_zone, x_offset, y_offset)

            # Derived fields (v, travelDist).
            df = self._calculate_additional_fields(df)

            output_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_csv(output_path, index=False)
            print(f"数据处理完成,已保存至: {output_path}")
            return output_path

        except Exception as e:
            # Best-effort boundary: report and signal failure with None.
            print(f"处理PGVIL数据时出错: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Deduplicate rows, normalise required columns and sort.

        Missing required columns are added, filled with zero of their declared
        type. For existing numeric columns, NaN is replaced by 0 *before* the
        cast, because casting a column containing NaN to ``int`` raises.
        Rows are sorted by ``simTime``, ``simFrame``, ``playerId``.
        """
        df = df.drop_duplicates()

        for col, dtype in self.required_columns.items():
            if col not in df.columns:
                # dtype(0) keeps float columns as 0.0 (not int 0), so the CSV
                # written later reads back with the declared type.
                df[col] = dtype(0) if dtype in (int, float) else ''
            elif dtype in (int, float):
                df[col] = df[col].fillna(0).astype(dtype)
            else:
                df[col] = df[col].astype(dtype)

        return df.sort_values(['simTime', 'simFrame', 'playerId'])

    def _process_coordinates(self, df: pd.DataFrame, utm_zone: int,
                             x_offset: float, y_offset: float) -> pd.DataFrame:
        """Convert ``lat``/``lon`` columns to UTM ``posX``/``posY`` with offsets.

        If either ``lat`` or ``lon`` is absent, ``df`` is returned unchanged
        (the offsets are then not applied).
        """
        if 'lat' in df.columns and 'lon' in df.columns:
            projection = Proj(proj='utm', zone=utm_zone, ellps='WGS84', preserve_units=True)
            x, y = projection(df['lon'].values, df['lat'].values)
            df['posX'] = x + x_offset
            df['posY'] = y + y_offset
        return df

    def _calculate_additional_fields(self, df: pd.DataFrame) -> pd.DataFrame:
        """Derive extra columns.

        - ``v``: planar speed ``sqrt(speedX**2 + speedY**2)``; overwrites any
          existing ``v`` when both velocity components are present.
        - ``travelDist``: per-player cumulative distance, integrating ``v``
          over each player's own ``simTime`` step (rectangle rule; the first
          sample of each player contributes 0).
        """
        if all(col in df.columns for col in ('speedX', 'speedY')):
            df['v'] = np.sqrt(df['speedX'] ** 2 + df['speedY'] ** 2)

        if 'v' in df.columns and 'simTime' in df.columns:
            # Stable sort keeps the existing order among equal timestamps.
            ordered = df.sort_values('simTime', kind='stable')
            if 'playerId' in ordered.columns:
                keys = ordered['playerId']
            else:
                # No player column: treat the whole frame as a single track.
                keys = pd.Series(0, index=ordered.index)
            # Time step measured within each player's own track, not across players.
            dt = ordered['simTime'].groupby(keys).diff().fillna(0.0)
            step_dist = ordered['v'] * dt
            # Assignment aligns on the index, so the unsorted df gets the right rows.
            df['travelDist'] = step_dist.groupby(keys).cumsum()

        return df

    @staticmethod
    def _dtype_matches(series: pd.Series, expected: type) -> bool:
        """Return whether ``series`` has a dtype compatible with ``expected``.

        Uses pandas dtype predicates instead of comparing against ``int`` /
        ``float`` directly, because ``np.dtype(int)`` is platform-dependent.
        Integer columns are accepted where floats are expected.
        """
        if expected is int:
            return pd.api.types.is_integer_dtype(series)
        if expected is float:
            return (pd.api.types.is_float_dtype(series)
                    or pd.api.types.is_integer_dtype(series))
        if expected is str:
            return (pd.api.types.is_object_dtype(series)
                    or pd.api.types.is_string_dtype(series))
        return series.dtype == expected

    def validate_output(self, output_path: Path) -> bool:
        """Validate a CSV file written by :meth:`process_data`.

        Checks that the file exists, that all required columns are present
        with compatible dtypes, that ``simTime`` has no negative values and
        that ``simFrame`` has no non-positive values.

        Args:
            output_path: Path of the output CSV file.

        Returns:
            True if all checks pass, False otherwise (including on any
            exception, which is printed).
        """
        try:
            if not output_path.exists():
                print(f"输出文件不存在: {output_path}")
                return False

            df = pd.read_csv(output_path)

            missing_cols = [col for col in self.required_columns if col not in df.columns]
            if missing_cols:
                print(f"缺少必需列: {missing_cols}")
                return False

            for col, dtype in self.required_columns.items():
                if not self._dtype_matches(df[col], dtype):
                    print(f"列 {col} 的数据类型错误,应为 {dtype}")
                    return False

            if df['simTime'].min() < 0:
                print("simTime存在负值")
                return False

            if df['simFrame'].min() <= 0:
                print("simFrame存在非正值")
                return False

            return True

        except Exception as e:
            print(f"验证输出数据时出错: {e}")
            return False


# NOTE(review): everything below appears to be a second module (its own import
# group, relative import of `.base`) concatenated into this file. Imports are
# kept in place to avoid changing import-time ordering; consider splitting.
from typing import Dict, Optional
from pathlib import Path
import pandas as pd
from .base import BaseProcessor
from core.error_handler import ErrorHandler
from core.config_manager import get_config


class Config:
    """Configuration for the PGVIL processor."""

    def __init__(self, output_dir: Path, data_dir: Path):
        self.output_dir = output_dir
        self.data_dir = data_dir


class PGVILProcessor(BaseProcessor):
    """PGVIL data processor implementing the PGVIL-specific processing steps."""

    def __init__(self, config: Config):
        super().__init__(config.output_dir)
        self.config = config

    @ErrorHandler.measure_performance
    def process_built_in_data(self) -> Dict[str, Path]:
        """Run the PGVIL-specific built-in data pipeline.

        Steps, in order:
            1. Process CAN data.
            2. Process sensor data.
            3. Process other PGVIL-specific data.
            4. Merge the built-in data.

        Returns:
            Mapping of result name -> output file path, accumulated from the
            steps that produced results.
        """
        result_files: Dict[str, Path] = {}

        print("1. 处理CAN数据...")
        can_results = self._process_can_data()
        if can_results:
            result_files.update(can_results)
        else:
            print("警告: CAN数据处理失败或无数据")

        print("\n2. 处理传感器数据...")
        sensor_results = self._process_sensor_data()
        if sensor_results:
            result_files.update(sensor_results)
        else:
            print("警告: 传感器数据处理失败或无数据")

        print("\n3. 处理其他PGVIL数据...")
        other_results = self._process_other_data()
        if other_results:
            result_files.update(other_results)

        print("\n4. 合并内置数据...")
        if not self._merge_built_in_data(result_files):
            print("警告: 内置数据合并失败")

        return result_files

    def _process_can_data(self) -> Dict[str, Path]:
        """Process CAN data. Currently a stub that returns no results."""
        # TODO: implement CAN data processing.
        return {}

    def _process_sensor_data(self) -> Dict[str, Path]:
        """Process sensor data. Currently a stub that returns no results."""
        # TODO: implement sensor data processing.
        return {}

    def _process_other_data(self) -> Dict[str, Path]:
        """Process other PGVIL-specific data. Currently a stub that returns no results."""
        # TODO: implement processing of other data.
        return {}

    def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
        """Merge the PGVIL built-in data.

        Args:
            result_files: Mapping of result name -> output file path.

        Returns:
            Whether the merge succeeded. The merge logic is not implemented
            yet, so this currently always returns True.
        """
        try:
            # TODO: implement the PGVIL-specific merge logic.
            return True
        except Exception as e:
            print(f"内置数据合并失败: {e}")
            return False