from pathlib import Path
from typing import Optional
import os
import sys
import traceback

from core.processors.built_in.lst import (
    Config,
    FinalDataProcessor,
    RosbagProcessor,
    ZipCSVProcessor,
    data_precheck,
    run_cpp_engine,
)
from core.processors.built_in.pgvil import Config as PGVILConfig
from core.processors.built_in.pgvil import PGVILProcessor, run_pgvil_engine


def get_base_path():
    """Return the directory the application runs from."""
    # Nuitka injects __compiled__ into the globals of compiled modules, so
    # this check detects a Nuitka-packaged build. (A PyInstaller build would
    # set sys.frozen instead.)
    if "__compiled__" in globals():
        base_path = os.path.dirname(sys.executable)  # directory of the executable
        print("base_path", Path(base_path))
    else:  # development environment: fall back to the current working directory
        base_path = os.path.abspath(".")
        print("file path is", base_path)
    return base_path


def resource_path(relative_path):
    """Return the absolute path to a bundled resource, working in both the
    development environment and PyInstaller's one-file mode."""
    # PyInstaller one-file builds unpack to a temporary directory exposed as
    # sys._MEIPASS. Nuitka does not set this attribute, so a Nuitka build
    # falls back to the current working directory.
    if hasattr(sys, "_MEIPASS"):
        base_path = sys._MEIPASS
    else:
        base_path = os.path.abspath(".")
    return os.path.join(base_path, relative_path)
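
# Illustration (a sketch; the layout is assumed, only VBox.dbc is named in
# this module): in development,
#   resource_path("VBox.dbc")  ->  "<cwd>/VBox.dbc"
# while inside a PyInstaller one-file build it would resolve to
#   resource_path("VBox.dbc")  ->  "<sys._MEIPASS>/VBox.dbc"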


def process_lst_data(
    zip_data_path: Path,
    output_base_dir: Path,
    trafficlight_json_path: Optional[Path] = None,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
    continue_to_iterate: bool = False,
) -> Optional[Path]:
    """
    Process LST data using an optimized pipeline.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output.
        trafficlight_json_path: Optional path to the traffic light JSON file.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X offset for the C++ engine.
        y_offset: Y offset for the C++ engine.
        continue_to_iterate: Flag to control iteration continuation.

    Returns:
        Path to the final merged_ObjState.csv file if successful, None otherwise.
    """
    print(f"Starting LST data processing for: {zip_data_path.name}")

    # Validate input paths
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None
    # Fall back to processing without traffic light data when the JSON file
    # was given but does not exist.
    if trafficlight_json_path and not trafficlight_json_path.exists():
        print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
        trafficlight_json_path = None
    try:
        # Initialize the configuration; resource files are resolved relative
        # to the bundle (see resource_path above).
        config = Config(
            zip_path=zip_data_path.resolve(),
            output_path=output_base_dir.resolve(),
            json_path=(
                trafficlight_json_path.resolve() if trafficlight_json_path else None
            ),
            dbc_path=resource_path("VBox.dbc"),
            engine_path=resource_path("engine"),
            map_path=resource_path("data_map"),
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
        )
        print("engine path is", config.engine_path)

        # Extract and convert the built-in data types
        print("Processing built-in data types...")
        zip_processor = ZipCSVProcessor(config)
        zip_processor.process_zip()

        # Process rosbag data if available
        rosbag_processor = RosbagProcessor(config)
        rosbag_processor.process_zip_for_rosbags()

        # Run the C++ engine for additional processing
        if not run_cpp_engine(config):
            raise RuntimeError("C++ engine execution failed")

        # Validate the processed data
        if not data_precheck(config.output_dir):
            raise ValueError("Data quality pre-check failed")

        # Final processing and merging of the built-in data
        print("Processing and merging built-in data...")
        final_processor = FinalDataProcessor(config)
        if not final_processor.process():
            raise RuntimeError("Final data processing failed")

        final_csv_path = config.output_dir / "merged_ObjState.csv"
        return final_csv_path
    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {e}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
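
# Usage sketch (the paths below are hypothetical examples, not shipped data):
#   csv_path = process_lst_data(
#       zip_data_path=Path("data/run_001.zip"),
#       output_base_dir=Path("output"),
#       trafficlight_json_path=Path("data/trafficlight.json"),
#   )
#   if csv_path is not None:
#       print(csv_path)  # .../merged_ObjState.csv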


def process_pgvil_data(
    zip_data_path: Path,
    output_base_dir: Path,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
) -> Optional[Path]:
    """Process PGVIL data.

    Args:
        zip_data_path: Path to the input ZIP file.
        output_base_dir: Base directory for output.
        utm_zone: UTM zone for coordinate projection.
        x_offset: X coordinate offset.
        y_offset: Y coordinate offset.

    Returns:
        Optional[Path]: Path to the processed CSV file, or None on failure.
    """
    # Validate the input before building the configuration.
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None

    pgvil_config = PGVILConfig(
        zip_path=zip_data_path.resolve(),
        output_path=output_base_dir.resolve(),
        utm_zone=utm_zone,
        x_offset=x_offset,
        y_offset=y_offset,
        engine_path=resource_path("engine"),
        map_path=resource_path("data_map"),
    )
    try:
        # Make sure the output directory exists
        output_base_dir.mkdir(parents=True, exist_ok=True)

        processor = PGVILProcessor(pgvil_config)

        # Extract the ZIP archive
        pgvil_root = processor.process_zip()
        if not pgvil_root.exists():
            raise RuntimeError("Failed to extract ZIP file")

        # Run the C++ engine for additional processing
        if not run_pgvil_engine(pgvil_config):
            raise RuntimeError("C++ engine execution failed")

        merged_csv = processor.merge_csv_files()
        if merged_csv is None or not merged_csv.exists():
            raise RuntimeError("Failed to merge CSV files")

        print(f"merged_csv: {merged_csv}")
        return merged_csv
    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {e}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
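

if __name__ == "__main__":
    # Minimal smoke-test sketch for the PGVIL pipeline. The paths below are
    # hypothetical examples, not files shipped with the project; point them
    # at a real recording before running.
    result = process_pgvil_data(
        zip_data_path=Path("data/pgvil_run_001.zip"),
        output_base_dir=Path("output"),
        utm_zone=51,
    )
    print(f"PGVIL result: {result}")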