from pathlib import Path
from typing import Optional
import traceback

from .processors.built_in.lst import ZipCSVProcessor, RosbagProcessor, Config

# from .processors.final_processor import FinalDataProcessor
from core.processors.built_in.lst import (
    data_precheck,
    run_cpp_engine,
    FinalDataProcessor,
)
from core.processors.built_in.pgvil import run_pgvil_engine, PGVILProcessor
from core.processors.built_in.pgvil import Config as PGVILConfig


def process_lst_data(
    zip_data_path: Path,
    output_base_dir: Path,
    trafficlight_json_path: Optional[Path] = None,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
    continue_to_iterate: bool = False,
) -> Optional[Path]:
    """Run the LST processing pipeline on one input ZIP archive.

    The pipeline runs, in order: ``ZipCSVProcessor.process_zip``,
    ``RosbagProcessor.process_zip_for_rosbags``, ``run_cpp_engine``,
    ``data_precheck`` and ``FinalDataProcessor.process``. Any failure is
    reported on stdout (with a stack trace) and turned into a ``None``
    return value; no exception escapes this function.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output.
        trafficlight_json_path: Optional path to a traffic light JSON file.
            A path that does not exist on disk is reported with a warning
            and then treated as if it had not been supplied.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X offset passed to the C++ engine.
        y_offset: Y offset passed to the C++ engine.
        continue_to_iterate: Accepted for interface compatibility; it is
            not read anywhere in this function.

    Returns:
        Path to ``merged_ObjState.csv`` inside the configured output
        directory if every stage succeeded, ``None`` otherwise.
    """
    print(f"Starting LST data processing for: {zip_data_path.name}")

    # Validate input paths
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None

    if not trafficlight_json_path:
        # Nothing was supplied: the file is optional, so there is nothing
        # to warn about.
        trafficlight_json_path = None
    elif not trafficlight_json_path.exists():
        # A path was supplied but is missing on disk: warn and continue
        # without traffic light data rather than failing later.
        print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
        trafficlight_json_path = None

    try:
        # Initialize configuration. The "_internal/..." paths are relative,
        # so resolve() anchors them at the current working directory.
        config = Config(
            zip_path=zip_data_path.resolve(),
            output_path=output_base_dir.resolve(),
            json_path=(
                trafficlight_json_path.resolve() if trafficlight_json_path else None
            ),
            dbc_path=Path("_internal/VBox.dbc").resolve(),
            engine_path=Path("_internal/engine").resolve(),
            map_path=Path("_internal/data_map").resolve(),
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
        )

        # Process built-in data types
        print("Processing built-in data types...")
        zip_processor = ZipCSVProcessor(config)
        zip_processor.process_zip()

        # Process rosbag data if available
        rosbag_processor = RosbagProcessor(config)
        rosbag_processor.process_zip_for_rosbags()

        # Run C++ engine for additional processing
        if not run_cpp_engine(config):
            raise RuntimeError("C++ engine execution failed")

        # Validate processed data
        if not data_precheck(config.output_dir):
            raise ValueError("Data quality pre-check failed")

        # Final processing of built-in data
        print("Processing and merging built-in data...")
        final_processor = FinalDataProcessor(config)
        if not final_processor.process():
            raise RuntimeError("Final data processing failed")

        return config.output_dir / "merged_ObjState.csv"

    except Exception as e:
        # Top-level boundary: callers rely on the Optional[Path] contract,
        # so report the failure and signal it with None.
        print(f"Error: Processing failed for {zip_data_path.name}: {str(e)}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None


def process_pgvil_data(
    zip_data_path: Path,
    output_base_dir: Path,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
) -> Optional[Path]:
    """Run the PGVIL processing pipeline on one input ZIP archive.

    The pipeline runs, in order: ``PGVILProcessor.process_zip``,
    ``run_pgvil_engine`` and ``PGVILProcessor.merge_csv_files``. Any
    failure, including a failure while building the configuration, is
    reported on stdout (with a stack trace) and turned into a ``None``
    return value.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output; created if missing.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X coordinate offset.
        y_offset: Y coordinate offset.

    Returns:
        Path to the merged CSV file if processing succeeded, ``None``
        otherwise.
    """
    # Validate the input before building any configuration, mirroring
    # process_lst_data.
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None

    try:
        # The "_internal/..." paths are relative, so resolve() anchors them
        # at the current working directory.
        pgvil_config = PGVILConfig(
            zip_path=zip_data_path,
            output_path=output_base_dir,
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
            engine_path=Path("_internal/engine").resolve(),
            map_path=Path("_internal/data_map").resolve(),
        )

        # Make sure the output directory exists
        output_base_dir.mkdir(parents=True, exist_ok=True)

        processor = PGVILProcessor(pgvil_config)

        # Extract the ZIP file
        pgvil_root = processor.process_zip()
        if pgvil_root is None or not pgvil_root.exists():
            raise RuntimeError("Failed to extract ZIP file")

        # Run C++ engine for additional processing
        if not run_pgvil_engine(pgvil_config):
            raise RuntimeError("C++ engine execution failed")

        merged_csv = processor.merge_csv_files()
        if merged_csv is None or not merged_csv.exists():
            raise RuntimeError("Failed to merge CSV files")

        print(f"merged_csv: {merged_csv}")
        return merged_csv

    except Exception as e:
        # Top-level boundary: report the failure and signal it with None.
        print(f"Error: Processing failed for {zip_data_path.name}: {str(e)}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None