pgvil.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. from pathlib import Path
  2. from typing import Dict, Any, Optional
  3. import pandas as pd
  4. import numpy as np
  5. from pyproj import Proj
  6. from core.processors.built_in.base import BaseDataProcessor
  7. class PGVILDataProcessor(BaseDataProcessor):
  8. """处理仿真内置数据的处理器"""
  9. def __init__(self):
  10. super().__init__("pgvil_processor")
  11. self.required_columns = {
  12. 'simTime': float,
  13. 'simFrame': int,
  14. 'playerId': int,
  15. 'v': float,
  16. 'speedX': float,
  17. 'speedY': float,
  18. 'posH': float,
  19. 'speedH': float,
  20. 'posX': float,
  21. 'posY': float,
  22. 'accelX': float,
  23. 'accelY': float
  24. }
  25. # 初始化UTM投影
  26. self.projection = Proj(proj='utm', zone=51, ellps='WGS84', preserve_units='m')
  27. def process_data(self,
  28. input_path: Path,
  29. output_path: Path,
  30. **kwargs) -> Optional[Path]:
  31. """处理PGVIL数据
  32. Args:
  33. input_path: 输入文件路径
  34. output_path: 输出目录路径
  35. **kwargs: 额外参数
  36. - utm_zone: UTM区域
  37. - x_offset: X偏移
  38. - y_offset: Y偏移
  39. Returns:
  40. 处理后的文件路径
  41. """
  42. try:
  43. # 读取数据
  44. df = pd.read_csv(input_path)
  45. if df.empty:
  46. print(f"输入文件为空: {input_path}")
  47. return None
  48. # 基本数据清理
  49. df = self._clean_data(df)
  50. # 坐标转换
  51. utm_zone = kwargs.get('utm_zone', 51)
  52. x_offset = kwargs.get('x_offset', 0.0)
  53. y_offset = kwargs.get('y_offset', 0.0)
  54. df = self._process_coordinates(df, utm_zone, x_offset, y_offset)
  55. # 计算额外字段
  56. df = self._calculate_additional_fields(df)
  57. # 确保输出目录存在
  58. output_path.parent.mkdir(parents=True, exist_ok=True)
  59. # 保存处理后的数据
  60. df.to_csv(output_path, index=False)
  61. print(f"数据处理完成,已保存至: {output_path}")
  62. return output_path
  63. except Exception as e:
  64. print(f"处理PGVIL数据时出错: {e}")
  65. import traceback
  66. traceback.print_exc()
  67. return None
  68. def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
  69. """清理数据"""
  70. # 删除重复行
  71. df = df.drop_duplicates()
  72. # 确保所需列存在
  73. for col, dtype in self.required_columns.items():
  74. if col not in df.columns:
  75. df[col] = 0 if dtype in (int, float) else ''
  76. else:
  77. df[col] = df[col].astype(dtype)
  78. # 处理空值
  79. numeric_cols = [col for col, dtype in self.required_columns.items()
  80. if dtype in (int, float)]
  81. df[numeric_cols] = df[numeric_cols].fillna(0)
  82. # 按时间和ID排序
  83. df.sort_values(['simTime', 'simFrame', 'playerId'], inplace=True)
  84. return df
  85. def _process_coordinates(self,
  86. df: pd.DataFrame,
  87. utm_zone: int,
  88. x_offset: float,
  89. y_offset: float) -> pd.DataFrame:
  90. """处理坐标数据"""
  91. if 'lat' in df.columns and 'lon' in df.columns:
  92. # 原始经纬度转UTM
  93. projection = Proj(proj='utm', zone=utm_zone, ellps='WGS84', preserve_units='m')
  94. x, y = projection(df['lon'].values, df['lat'].values)
  95. # 应用偏移
  96. df['posX'] = x + x_offset
  97. df['posY'] = y + y_offset
  98. return df
  99. def _calculate_additional_fields(self, df: pd.DataFrame) -> pd.DataFrame:
  100. """计算额外的字段"""
  101. # 示例:计算合成速度
  102. if all(col in df.columns for col in ['speedX', 'speedY']):
  103. df['v'] = np.sqrt(df['speedX']**2 + df['speedY']**2)
  104. # 示例:计算行驶距离
  105. if 'v' in df.columns and 'simTime' in df.columns:
  106. df['travelDist'] = df.sort_values('simTime').groupby('playerId')['v'].cumsum() * df['simTime'].diff()
  107. return df
  108. def validate_output(self, output_path: Path) -> bool:
  109. """验证输出数据
  110. Args:
  111. output_path: 输出文件路径
  112. Returns:
  113. 验证是否通过
  114. """
  115. try:
  116. if not output_path.exists():
  117. print(f"输出文件不存在: {output_path}")
  118. return False
  119. df = pd.read_csv(output_path)
  120. # 验证所需列
  121. missing_cols = [col for col in self.required_columns.keys()
  122. if col not in df.columns]
  123. if missing_cols:
  124. print(f"缺少必需列: {missing_cols}")
  125. return False
  126. # 验证数据类型
  127. for col, dtype in self.required_columns.items():
  128. if df[col].dtype != dtype:
  129. print(f"列 {col} 的数据类型错误,应为 {dtype}")
  130. return False
  131. # 验证数值范围
  132. if df['simTime'].min() < 0:
  133. print("simTime存在负值")
  134. return False
  135. if df['simFrame'].min() <= 0:
  136. print("simFrame存在非正值")
  137. return False
  138. return True
  139. except Exception as e:
  140. print(f"验证输出数据时出错: {e}")
  141. return False
  142. from typing import Dict, Optional
  143. from pathlib import Path
  144. import pandas as pd
  145. from .base import BaseProcessor
  146. from core.error_handler import ErrorHandler
  147. from core.config_manager import get_config
  148. class Config:
  149. """PGVIL处理器配置类"""
  150. def __init__(self,
  151. output_dir: Path,
  152. data_dir: Path):
  153. self.output_dir = output_dir
  154. self.data_dir = data_dir
  155. class PGVILProcessor(BaseProcessor):
  156. """PGVIL数据处理器,实现PGVIL特有的处理逻辑"""
  157. def __init__(self, config: Config):
  158. super().__init__(config.output_dir)
  159. self.config = config
  160. @ErrorHandler.measure_performance
  161. def process_built_in_data(self) -> Dict[str, Path]:
  162. """实现PGVIL特有的内置数据处理逻辑
  163. 处理顺序:
  164. 1. 处理CAN数据
  165. 2. 处理传感器数据
  166. 3. 处理其他PGVIL特有数据
  167. 4. 合并内置数据
  168. Returns:
  169. 处理结果文件路径字典
  170. """
  171. result_files = {}
  172. # 1. 处理CAN数据
  173. print("1. 处理CAN数据...")
  174. can_results = self._process_can_data()
  175. if can_results:
  176. result_files.update(can_results)
  177. else:
  178. print("警告: CAN数据处理失败或无数据")
  179. # 2. 处理传感器数据
  180. print("\n2. 处理传感器数据...")
  181. sensor_results = self._process_sensor_data()
  182. if sensor_results:
  183. result_files.update(sensor_results)
  184. else:
  185. print("警告: 传感器数据处理失败或无数据")
  186. # 3. 处理其他PGVIL特有数据
  187. print("\n3. 处理其他PGVIL数据...")
  188. other_results = self._process_other_data()
  189. if other_results:
  190. result_files.update(other_results)
  191. # 4. 合并内置数据
  192. print("\n4. 合并内置数据...")
  193. if not self._merge_built_in_data(result_files):
  194. print("警告: 内置数据合并失败")
  195. return result_files
  196. def _process_can_data(self) -> Dict[str, Path]:
  197. """处理CAN数据"""
  198. # TODO: 实现CAN数据处理逻辑
  199. return {}
  200. def _process_sensor_data(self) -> Dict[str, Path]:
  201. """处理传感器数据"""
  202. # TODO: 实现传感器数据处理逻辑
  203. return {}
  204. def _process_other_data(self) -> Dict[str, Path]:
  205. """处理其他PGVIL特有数据"""
  206. # TODO: 实现其他数据处理逻辑
  207. return {}
  208. def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
  209. """合并PGVIL内置数据
  210. Args:
  211. result_files: 处理结果文件路径字典
  212. Returns:
  213. 合并是否成功
  214. """
  215. try:
  216. # 实现PGVIL特有的数据合并逻辑
  217. return True
  218. except Exception as e:
  219. print(f"内置数据合并失败: {e}")
  220. return False