# optimized_processor.py
  1. from pathlib import Path
  2. from typing import Optional
  3. import traceback
  4. from .processors.built_in.lst import ZipCSVProcessor, RosbagProcessor, Config
  5. # from .processors.final_processor import FinalDataProcessor
  6. from core.processors.built_in.lst import (
  7. data_precheck,
  8. run_cpp_engine,
  9. FinalDataProcessor,
  10. )
  11. from core.processors.built_in.pgvil import run_pgvil_engine, PGVILProcessor
  12. from core.processors.built_in.pgvil import Config as PGVILConfig
  13. def process_lst_data(
  14. zip_data_path: Path,
  15. output_base_dir: Path,
  16. trafficlight_json_path: Optional[Path] = None,
  17. utm_zone: int = 51,
  18. x_offset: float = 0.0,
  19. y_offset: float = 0.0,
  20. continue_to_iterate: bool = False,
  21. ) -> Optional[Path]:
  22. """
  23. Processes LST data using an optimized pipeline.
  24. Args:
  25. zip_data_path: Path to the input ZIP file
  26. output_base_dir: Base directory for output
  27. trafficlight_json_path: Optional path to traffic light JSON file
  28. utm_zone: UTM zone for coordinate projection
  29. x_offset: X offset for C++ engine
  30. y_offset: Y offset for C++ engine
  31. continue_to_iterate: Flag to control iteration continuation
  32. Returns:
  33. Path to the final merged_ObjState.csv file if successful, None otherwise
  34. """
  35. print(f"Starting LST data processing for: {zip_data_path.name}")
  36. # Validate input paths
  37. if not zip_data_path.exists():
  38. print(f"Error: Input ZIP file not found: {zip_data_path}")
  39. return None
  40. if not trafficlight_json_path:
  41. print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
  42. trafficlight_json_path = None
  43. try:
  44. # Initialize configuration
  45. config = Config(
  46. zip_path=zip_data_path.resolve(),
  47. output_path=output_base_dir.resolve(),
  48. json_path=(
  49. trafficlight_json_path.resolve() if trafficlight_json_path else None
  50. ),
  51. dbc_path=Path("_internal/VBox.dbc").resolve(),
  52. engine_path=Path("_internal/engine").resolve(),
  53. map_path=Path("_internal/data_map").resolve(),
  54. utm_zone=utm_zone,
  55. x_offset=x_offset,
  56. y_offset=y_offset,
  57. )
  58. # Process built-in data types
  59. print("Processing built-in data types...")
  60. zip_processor = ZipCSVProcessor(config)
  61. zip_processor.process_zip()
  62. # Process rosbag data if available
  63. rosbag_processor = RosbagProcessor(config)
  64. rosbag_processor.process_zip_for_rosbags()
  65. # Run C++ engine for additional processing
  66. if not run_cpp_engine(config):
  67. raise RuntimeError("C++ engine execution failed")
  68. # Validate processed data
  69. if not data_precheck(config.output_dir):
  70. raise ValueError("Data quality pre-check failed")
  71. # Final processing of built-in data
  72. print("Processing and merging built-in data...")
  73. final_processor = FinalDataProcessor(config)
  74. if not final_processor.process():
  75. raise RuntimeError("Final data processing failed")
  76. final_csv_path = config.output_dir / "merged_ObjState.csv"
  77. return final_csv_path
  78. except Exception as e:
  79. print(f"Error: Processing failed for {zip_data_path.name}: {str(e)}")
  80. print(f"Debug: Stacktrace: {traceback.format_exc()}")
  81. return None
  82. def process_pgvil_data(
  83. zip_data_path: Path,
  84. output_base_dir: Path,
  85. utm_zone: int = 51,
  86. x_offset: float = 0.0,
  87. y_offset: float = 0.0,
  88. ) -> Optional[Path]:
  89. """处理PGVIL数据
  90. Args:
  91. zip_data_path: ZIP数据文件路径
  92. output_base_dir: 输出基础目录
  93. utm_zone: UTM坐标系区域
  94. x_offset: X坐标偏移量
  95. y_offset: Y坐标偏移量
  96. Returns:
  97. Optional[Path]: 处理后的CSV文件路径,处理失败则返回None
  98. """
  99. pgvil_config = PGVILConfig(
  100. zip_path=zip_data_path,
  101. output_path=output_base_dir,
  102. utm_zone=utm_zone,
  103. x_offset=x_offset,
  104. y_offset=y_offset,
  105. engine_path=Path("_internal/engine").resolve(),
  106. map_path=Path("_internal/data_map").resolve(),
  107. )
  108. if not zip_data_path.exists():
  109. print(f"Error: Input ZIP file not found: {zip_data_path}")
  110. return None
  111. try:
  112. # 确保输出目录存在
  113. output_base_dir.mkdir(parents=True, exist_ok=True)
  114. processor = PGVILProcessor(pgvil_config)
  115. # 解压ZIP文件
  116. pgvil_root = processor.process_zip()
  117. if not pgvil_root.exists():
  118. raise RuntimeError("Failed to extract ZIP file")
  119. # Run C++ engine for additional processing
  120. if not run_pgvil_engine(pgvil_config):
  121. raise RuntimeError("C++ engine execution failed")
  122. merged_csv = processor.merge_csv_files()
  123. if merged_csv is None or not merged_csv.exists():
  124. raise RuntimeError("Failed to merge CSV files")
  125. print(f"merged_csv: {merged_csv}")
  126. return merged_csv
  127. except Exception as e:
  128. print(f"Error: Processing failed for {zip_data_path.name}: {str(e)}")
  129. return None