123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- from pathlib import Path
- from typing import Optional
- import traceback
- from .processors.built_in.lst import ZipCSVProcessor, RosbagProcessor, Config
- # from .processors.final_processor import FinalDataProcessor
- from core.processors.built_in.lst import (
- data_precheck,
- run_cpp_engine,
- FinalDataProcessor,
- )
- from core.processors.built_in.pgvil import run_pgvil_engine, PGVILProcessor
- from core.processors.built_in.pgvil import Config as PGVILConfig
def process_lst_data(
    zip_data_path: Path,
    output_base_dir: Path,
    trafficlight_json_path: Optional[Path] = None,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
    continue_to_iterate: bool = False,
) -> Optional[Path]:
    """Process LST data using an optimized pipeline.

    Pipeline stages: unpack/convert the ZIP contents, extract rosbag data,
    run the external C++ engine, pre-check data quality, then merge into a
    single CSV.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output.
        trafficlight_json_path: Optional path to a traffic light JSON file.
            Ignored (with a warning) if the file does not exist.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X offset for the C++ engine.
        y_offset: Y offset for the C++ engine.
        continue_to_iterate: Flag to control iteration continuation.
            NOTE(review): currently unused by this function; kept for
            interface compatibility.

    Returns:
        Path to the final merged_ObjState.csv file if successful, None otherwise.
    """
    print(f"Starting LST data processing for: {zip_data_path.name}")

    # Validate input paths before doing any work.
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None

    # Bug fix: the original warned "file not found" whenever the argument was
    # *omitted* and never checked a supplied path. Warn only when a path was
    # given but does not exist, then fall back to no traffic-light data.
    if trafficlight_json_path and not trafficlight_json_path.exists():
        print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
        trafficlight_json_path = None

    try:
        # Initialize configuration; auxiliary resources live under _internal/.
        config = Config(
            zip_path=zip_data_path.resolve(),
            output_path=output_base_dir.resolve(),
            json_path=(
                trafficlight_json_path.resolve() if trafficlight_json_path else None
            ),
            dbc_path=Path("_internal/VBox.dbc").resolve(),
            engine_path=Path("_internal/engine").resolve(),
            map_path=Path("_internal/data_map").resolve(),
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
        )

        # Process built-in data types (CSV extraction from the ZIP).
        print("Processing built-in data types...")
        zip_processor = ZipCSVProcessor(config)
        zip_processor.process_zip()

        # Process rosbag data if available.
        rosbag_processor = RosbagProcessor(config)
        rosbag_processor.process_zip_for_rosbags()

        # Run C++ engine for additional processing.
        if not run_cpp_engine(config):
            raise RuntimeError("C++ engine execution failed")

        # Validate processed data before the final merge.
        if not data_precheck(config.output_dir):
            raise ValueError("Data quality pre-check failed")

        # Final processing and merge of built-in data.
        print("Processing and merging built-in data...")
        final_processor = FinalDataProcessor(config)
        if not final_processor.process():
            raise RuntimeError("Final data processing failed")

        final_csv_path = config.output_dir / "merged_ObjState.csv"
        # Robustness: confirm the expected output actually materialized
        # instead of returning a dangling path.
        if not final_csv_path.exists():
            raise RuntimeError(f"Expected output file missing: {final_csv_path}")
        return final_csv_path

    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {str(e)}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
def process_pgvil_data(
    zip_data_path: Path,
    output_base_dir: Path,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
) -> Optional[Path]:
    """Process PGVIL data.

    Pipeline stages: extract the ZIP contents, run the external C++ engine,
    then merge the resulting CSV files.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X offset for the C++ engine.
        y_offset: Y offset for the C++ engine.

    Returns:
        Optional[Path]: Path to the merged CSV file, or None on failure.
    """
    # Fail fast: validate the input before constructing any configuration
    # (the original built PGVILConfig first, doing needless work on bad input).
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None

    # Resolve paths up front, matching the LST pipeline's behavior.
    pgvil_config = PGVILConfig(
        zip_path=zip_data_path.resolve(),
        output_path=output_base_dir.resolve(),
        utm_zone=utm_zone,
        x_offset=x_offset,
        y_offset=y_offset,
        engine_path=Path("_internal/engine").resolve(),
        map_path=Path("_internal/data_map").resolve(),
    )

    try:
        # Ensure the output directory exists.
        output_base_dir.mkdir(parents=True, exist_ok=True)

        processor = PGVILProcessor(pgvil_config)

        # Extract the ZIP file.
        pgvil_root = processor.process_zip()
        if not pgvil_root.exists():
            raise RuntimeError("Failed to extract ZIP file")

        # Run C++ engine for additional processing.
        if not run_pgvil_engine(pgvil_config):
            raise RuntimeError("C++ engine execution failed")

        merged_csv = processor.merge_csv_files()
        if merged_csv is None or not merged_csv.exists():
            raise RuntimeError("Failed to merge CSV files")

        print(f"merged_csv: {merged_csv}")
        return merged_csv

    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {str(e)}")
        # Consistency with process_lst_data: surface the stack trace for debugging.
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
|