import json
import os
import traceback
from pathlib import Path
from typing import Optional

import pandas as pd

from .processors.built_in.lst import (
    Config,
    FinalDataProcessor,
    RosbagProcessor,
    ZipCSVProcessor,
    data_precheck,
    run_cpp_engine,
)
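# `extract_zip_file` is called by `process_pgvil_data` below but is neither
# defined nor imported in this module as written. The helper below is a
# minimal sketch under that assumption: it unpacks the archive into the
# target directory and returns False instead of raising on failure. Swap in
# the project's real implementation if one exists elsewhere.
import zipfile


def extract_zip_file(zip_path: Path, extract_dir: Path) -> bool:
    """Extract zip_path into extract_dir; return True on success."""
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(extract_dir)
        return True
    except (zipfile.BadZipFile, FileNotFoundError, PermissionError) as exc:
        print(f"Error: Failed to extract {zip_path}: {exc}")
        return False
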
def process_lst_data(
    zip_data_path: Path,
    output_base_dir: Path,
    trafficlight_json_path: Optional[Path] = None,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
    continue_to_iterate: bool = False,
) -> Optional[Path]:
    """Process LST data using an optimized pipeline.

    Args:
        zip_data_path: Path to the input ZIP file
        output_base_dir: Base directory for output
        trafficlight_json_path: Optional path to traffic light JSON file
        utm_zone: UTM zone for coordinate projection
        x_offset: X offset for C++ engine
        y_offset: Y offset for C++ engine
        continue_to_iterate: Flag to control iteration continuation (currently unused)

    Returns:
        Path to the final merged_ObjState.csv file if successful, None otherwise
    """
- print(f"Starting LST data processing for: {zip_data_path.name}")
-
- # Validate input paths
- if not zip_data_path.exists():
- print(f"Error: Input ZIP file not found: {zip_data_path}")
- return None
-
- if not trafficlight_json_path:
- print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
- trafficlight_json_path = None
    try:
        # Initialize configuration
        config = Config(
            zip_path=zip_data_path.resolve(),
            output_path=output_base_dir.resolve(),
            json_path=trafficlight_json_path.resolve() if trafficlight_json_path else None,
            dbc_path=Path("_internal/VBox.dbc").resolve(),
            engine_path=Path("_internal/engine").resolve(),
            map_path=Path("_internal/data_map").resolve(),
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
        )

        # Process built-in data types
        print("Processing built-in data types...")
        zip_processor = ZipCSVProcessor(config)
        zip_processor.process_zip()

        # Process rosbag data if available
        rosbag_processor = RosbagProcessor(config)
        rosbag_processor.process_zip_for_rosbags()

        # Run C++ engine for additional processing
        if not run_cpp_engine(config):
            raise RuntimeError("C++ engine execution failed")

        # Validate processed data
        if not data_precheck(config.output_dir):
            raise ValueError("Data quality pre-check failed")

        # Final processing and merging of built-in data
        print("Processing and merging built-in data...")
        final_processor = FinalDataProcessor(config)
        if not final_processor.process():
            raise RuntimeError("Final data processing failed")

        final_csv_path = config.output_dir / "merged_ObjState.csv"
        if not final_csv_path.exists():
            raise FileNotFoundError(f"Expected output file not found: {final_csv_path}")
        return final_csv_path

    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {e}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
def process_pgvil_data(
    zip_data_path: Path,
    output_base_dir: Path,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
) -> Optional[Path]:
    """Process PGVIL data.

    Args:
        zip_data_path: Path to the input ZIP file
        output_base_dir: Base directory for output
        utm_zone: UTM zone for coordinate projection (currently unused)
        x_offset: X coordinate offset (currently unused)
        y_offset: Y coordinate offset (currently unused)

    Returns:
        Optional[Path]: Path to the processed CSV file, or None if processing failed
    """
    try:
        # Make sure the output directory exists
        output_base_dir.mkdir(parents=True, exist_ok=True)

        # Extract the ZIP file
        if not extract_zip_file(zip_data_path, output_base_dir):
            return None

        # Collect all PGVIL data files (CSV and JSON)
        pgvil_files = []
        for root, _, files in os.walk(output_base_dir):
            for file in files:
                if file.lower().endswith(('.csv', '.json')):
                    pgvil_files.append(Path(root) / file)

        if not pgvil_files:
            print(f"No PGVIL data files found in {output_base_dir}")
            return None

        print(f"Found {len(pgvil_files)} PGVIL data files")

        # Process every PGVIL file
        all_data = []
        for pgvil_file in pgvil_files:
            try:
                # Read the PGVIL file
                if pgvil_file.suffix.lower() == '.csv':
                    df = pd.read_csv(pgvil_file)
                else:  # '.json', the only other extension collected above
                    with open(pgvil_file, 'r') as f:
                        data = json.load(f)
                    df = pd.DataFrame(data)

                # Make sure the required columns exist
                required_cols = ['simTime', 'simFrame', 'playerId']
                for col in required_cols:
                    if col not in df.columns:
                        df[col] = 0  # fill with a default value

                all_data.append(df)
                print(f"Successfully processed file: {pgvil_file}")
            except Exception as e:
                print(f"Error processing file {pgvil_file}: {e}")

        if not all_data:
            print("No PGVIL files were processed successfully")
            return None

        # Merge all data
        combined_df = pd.concat(all_data, ignore_index=True)

        # Save the processed data
        output_path = output_base_dir / "processed_pgvil_data.csv"
        combined_df.to_csv(output_path, index=False)
        print(f"Successfully processed all PGVIL data; results saved to: {output_path}")
        return output_path

    except Exception as e:
        print(f"Error while processing PGVIL data: {e}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
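

if __name__ == "__main__":
    # Usage sketch with assumed sample paths; adjust to the actual data layout.
    lst_result = process_lst_data(
        zip_data_path=Path("data/sample_lst.zip"),
        output_base_dir=Path("output/lst"),
    )
    print(f"LST pipeline result: {lst_result}")

    pgvil_result = process_pgvil_data(
        zip_data_path=Path("data/sample_pgvil.zip"),
        output_base_dir=Path("output/pgvil"),
    )
    print(f"PGVIL pipeline result: {pgvil_result}")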