# optimized_processor.py

from pathlib import Path
from typing import Optional
import json
import os
import traceback

import pandas as pd

from .processors.built_in.lst import ZipCSVProcessor, RosbagProcessor, Config
from core.processors.built_in.lst import data_precheck, run_cpp_engine, FinalDataProcessor
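
import zipfile


# `extract_zip_file` is called by process_pgvil_data but never imported in
# this module; presumably it is provided elsewhere in the package. The
# fallback below is a minimal sketch (an assumption, not the project's
# actual helper) so the module stays self-contained.
def extract_zip_file(zip_path: Path, dest_dir: Path) -> bool:
    """Extract zip_path into dest_dir; return True on success."""
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(dest_dir)
        return True
    except (zipfile.BadZipFile, OSError) as e:
        print(f"Error: Failed to extract {zip_path}: {e}")
        return False
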
def process_lst_data(
    zip_data_path: Path,
    output_base_dir: Path,
    trafficlight_json_path: Optional[Path] = None,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
    continue_to_iterate: bool = False,
) -> Optional[Path]:
  16. """
  17. Processes LST data using an optimized pipeline.
  18. Args:
  19. zip_data_path: Path to the input ZIP file
  20. output_base_dir: Base directory for output
  21. trafficlight_json_path: Optional path to traffic light JSON file
  22. utm_zone: UTM zone for coordinate projection
  23. x_offset: X offset for C++ engine
  24. y_offset: Y offset for C++ engine
  25. continue_to_iterate: Flag to control iteration continuation
  26. Returns:
  27. Path to the final merged_ObjState.csv file if successful, None otherwise
  28. """
  29. print(f"Starting LST data processing for: {zip_data_path.name}")
  30. # Validate input paths
  31. if not zip_data_path.exists():
  32. print(f"Error: Input ZIP file not found: {zip_data_path}")
  33. return None
  34. if not trafficlight_json_path:
  35. print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
  36. trafficlight_json_path = None
    try:
        # Initialize pipeline configuration
        config = Config(
            zip_path=zip_data_path.resolve(),
            output_path=output_base_dir.resolve(),
            json_path=trafficlight_json_path.resolve() if trafficlight_json_path else None,
            dbc_path=Path("_internal/VBox.dbc").resolve(),
            engine_path=Path("_internal/engine").resolve(),
            map_path=Path("_internal/data_map").resolve(),
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
        )
        # Process built-in data types
        print("Processing built-in data types...")
        zip_processor = ZipCSVProcessor(config)
        zip_processor.process_zip()

        # Process rosbag data if available
        rosbag_processor = RosbagProcessor(config)
        rosbag_processor.process_zip_for_rosbags()

        # Run the C++ engine for additional processing
        if not run_cpp_engine(config):
            raise RuntimeError("C++ engine execution failed")

        # Validate the processed data
        if not data_precheck(config.output_dir):
            raise ValueError("Data quality pre-check failed")

        # Final processing and merging of built-in data
        print("Processing and merging built-in data...")
        final_processor = FinalDataProcessor(config)
        if not final_processor.process():
            raise RuntimeError("Final data processing failed")

        final_csv_path = config.output_dir / "merged_ObjState.csv"
        return final_csv_path

    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {e}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
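
# Typical usage of process_lst_data (a minimal sketch; the paths below are
# hypothetical, and the _internal/ assets must exist where Config expects
# them):
#
#   final_csv = process_lst_data(
#       zip_data_path=Path("data/run_001.zip"),
#       output_base_dir=Path("output/run_001"),
#       trafficlight_json_path=Path("config/trafficlight.json"),
#       utm_zone=51,
#   )
#   if final_csv is not None:
#       print(f"Merged object states written to {final_csv}")
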
def process_pgvil_data(
    zip_data_path: Path,
    output_base_dir: Path,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
) -> Optional[Path]:
    """
    Processes PGVIL data.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X coordinate offset.
        y_offset: Y coordinate offset.

    Returns:
        Path to the processed CSV file, or None if processing failed.
    """
    try:
        # Ensure the output directory exists
        output_base_dir.mkdir(parents=True, exist_ok=True)

        # Extract the ZIP file
        if not extract_zip_file(zip_data_path, output_base_dir):
            return None

        # Collect all PGVIL data files
        pgvil_files = []
        for root, _, files in os.walk(output_base_dir):
            for file in files:
                if file.lower().endswith(('.csv', '.json')):
                    pgvil_files.append(Path(root) / file)

        if not pgvil_files:
            print(f"No PGVIL data files found in {output_base_dir}")
            return None
        print(f"Found {len(pgvil_files)} PGVIL data files")

        # Process every PGVIL file
        all_data = []
        for pgvil_file in pgvil_files:
            try:
                # Read the PGVIL file
                if pgvil_file.suffix.lower() == '.csv':
                    df = pd.read_csv(pgvil_file)
                elif pgvil_file.suffix.lower() == '.json':
                    with open(pgvil_file, 'r') as f:
                        data = json.load(f)
                    df = pd.DataFrame(data)
                else:
                    continue  # defensive; only .csv/.json are collected above

                # Ensure the required columns exist
                required_cols = ['simTime', 'simFrame', 'playerId']
                for col in required_cols:
                    if col not in df.columns:
                        df[col] = 0  # fill with a default value

                all_data.append(df)
                print(f"Successfully processed file: {pgvil_file}")
            except Exception as e:
                print(f"Error while processing file {pgvil_file}: {e}")

        if not all_data:
            print("No PGVIL files were processed successfully")
            return None

        # Merge all data
        combined_df = pd.concat(all_data, ignore_index=True)

        # Save the processed data
        output_path = output_base_dir / "processed_pgvil_data.csv"
        combined_df.to_csv(output_path, index=False)
        print(f"Successfully processed all PGVIL data; results saved to: {output_path}")
        return output_path

    except Exception as e:
        print(f"Error while processing PGVIL data: {e}")
        traceback.print_exc()
        return None
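
if __name__ == "__main__":
    # Hypothetical smoke test (an illustration, not part of the original
    # pipeline): adjust the paths to your data layout before running.
    result = process_pgvil_data(
        zip_data_path=Path("data/pgvil_run.zip"),
        output_base_dir=Path("output/pgvil_run"),
    )
    if result is None:
        print("PGVIL processing failed")
    else:
        print(f"PGVIL processing complete: {result}")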