from pathlib import Path
from typing import Optional
import json
import os
import traceback

import pandas as pd

from .processors.built_in.lst import (
    Config,
    FinalDataProcessor,
    RosbagProcessor,
    ZipCSVProcessor,
    data_precheck,
    run_cpp_engine,
)


def process_lst_data(
    zip_data_path: Path,
    output_base_dir: Path,
    trafficlight_json_path: Optional[Path] = None,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
    continue_to_iterate: bool = False,
) -> Optional[Path]:
    """
    Processes LST data using an optimized pipeline.

    Args:
        zip_data_path: Path to the input ZIP file
        output_base_dir: Base directory for output
        trafficlight_json_path: Optional path to traffic light JSON file
        utm_zone: UTM zone for coordinate projection
        x_offset: X offset for C++ engine
        y_offset: Y offset for C++ engine
        continue_to_iterate: Flag to control iteration continuation (currently unused)

    Returns:
        Path to the final merged_ObjState.csv file if successful, None otherwise
    """
    print(f"Starting LST data processing for: {zip_data_path.name}")

    # Validate input paths
    if not zip_data_path.exists():
        print(f"Error: Input ZIP file not found: {zip_data_path}")
        return None
    if trafficlight_json_path and not trafficlight_json_path.exists():
        print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
        trafficlight_json_path = None

    try:
        # Initialize configuration
        config = Config(
            zip_path=zip_data_path.resolve(),
            output_path=output_base_dir.resolve(),
            json_path=trafficlight_json_path.resolve() if trafficlight_json_path else None,
            dbc_path=Path("_internal/VBox.dbc").resolve(),
            engine_path=Path("_internal/engine").resolve(),
            map_path=Path("_internal/data_map").resolve(),
            utm_zone=utm_zone,
            x_offset=x_offset,
            y_offset=y_offset,
        )

        # Process built-in data types
        print("Processing built-in data types...")
        zip_processor = ZipCSVProcessor(config)
        zip_processor.process_zip()

        # Process rosbag data if available
        rosbag_processor = RosbagProcessor(config)
        rosbag_processor.process_zip_for_rosbags()

        # Run C++ engine for additional processing
        if not run_cpp_engine(config):
            raise RuntimeError("C++ engine execution failed")

        # Validate processed data
        if not data_precheck(config.output_dir):
            raise ValueError("Data quality pre-check failed")

        # Final processing of built-in data
        print("Processing and merging built-in data...")
        final_processor = FinalDataProcessor(config)
        if not final_processor.process():
            raise RuntimeError("Final data processing failed")

        final_csv_path = config.output_dir / "merged_ObjState.csv"
        return final_csv_path

    except Exception as e:
        print(f"Error: Processing failed for {zip_data_path.name}: {e}")
        print(f"Debug: Stacktrace: {traceback.format_exc()}")
        return None
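
# `extract_zip_file` is called by `process_pgvil_data` below but was neither
# defined nor imported in the original module. The following is a minimal
# sketch, assuming the helper only needs to unpack the archive and report
# success or failure; if the project already provides such a helper elsewhere,
# import that instead.
import zipfile


def extract_zip_file(zip_path: Path, dest_dir: Path) -> bool:
    """Extract zip_path into dest_dir, returning True on success."""
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zf.extractall(dest_dir)
        return True
    except (zipfile.BadZipFile, OSError) as e:
        print(f"Error: Failed to extract {zip_path}: {e}")
        return False
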
def process_pgvil_data(
    zip_data_path: Path,
    output_base_dir: Path,
    utm_zone: int = 51,
    x_offset: float = 0.0,
    y_offset: float = 0.0,
) -> Optional[Path]:
    """Processes PGVIL data.

    Args:
        zip_data_path: Path to the input ZIP file
        output_base_dir: Base directory for output
        utm_zone: UTM zone for coordinate projection (currently unused)
        x_offset: X coordinate offset (currently unused)
        y_offset: Y coordinate offset (currently unused)

    Returns:
        Optional[Path]: Path to the processed CSV file, or None on failure
    """
    try:
        # Ensure the output directory exists
        output_base_dir.mkdir(parents=True, exist_ok=True)

        # Extract the ZIP file
        if not extract_zip_file(zip_data_path, output_base_dir):
            return None

        # Collect all PGVIL data files
        pgvil_files = []
        for root, _, files in os.walk(output_base_dir):
            for file in files:
                if file.lower().endswith(('.csv', '.json')):
                    pgvil_files.append(Path(root) / file)

        if not pgvil_files:
            print(f"No PGVIL data files found in {output_base_dir}")
            return None
        print(f"Found {len(pgvil_files)} PGVIL data files")

        # Process each PGVIL file
        all_data = []
        for pgvil_file in pgvil_files:
            try:
                # Read the PGVIL file
                if pgvil_file.suffix.lower() == '.csv':
                    df = pd.read_csv(pgvil_file)
                elif pgvil_file.suffix.lower() == '.json':
                    with open(pgvil_file, 'r') as f:
                        data = json.load(f)
                    df = pd.DataFrame(data)
                else:
                    # Unreachable given the filter above, but keeps df bound
                    continue

                # Ensure the required columns exist
                required_cols = ['simTime', 'simFrame', 'playerId']
                for col in required_cols:
                    if col not in df.columns:
                        df[col] = 0  # fill missing columns with a default value

                all_data.append(df)
                print(f"Successfully processed file: {pgvil_file}")
            except Exception as e:
                print(f"Error while processing file {pgvil_file}: {e}")

        if not all_data:
            print("No PGVIL file was processed successfully")
            return None

        # Merge all data
        combined_df = pd.concat(all_data, ignore_index=True)

        # Save the processed data
        output_path = output_base_dir / "processed_pgvil_data.csv"
        combined_df.to_csv(output_path, index=False)
        print(f"Successfully processed all PGVIL data; results saved to: {output_path}")
        return output_path

    except Exception as e:
        print(f"Error while processing PGVIL data: {e}")
        traceback.print_exc()
        return None
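
# Minimal usage sketch (the paths are hypothetical, for illustration only;
# invoke via `python -m <package>.<module>` so the relative import at the top
# of the file resolves):
if __name__ == "__main__":
    result = process_lst_data(
        zip_data_path=Path("data/sample_lst.zip"),  # hypothetical input archive
        output_base_dir=Path("output"),             # hypothetical output directory
    )
    print(f"LST pipeline result: {result}")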