import zipfile from pathlib import Path import pandas as pd from typing import List, Optional import shutil class ResourceManager: """管理插件资源和数据验证""" # 内置处理器类型及其对应的关键词 BUILT_IN_PROCESSORS = { "lst": ["rosbag", "gnss", "can", "hmi"], "pgvil": ["pgvil", "acu", "radar"] # pgvil处理器支持的数据类型 } def __init__(self, resources_dir: Path): self.resources_dir = resources_dir if not self.resources_dir.exists(): self.resources_dir.mkdir(parents=True) def list_zip_folders(self, zip_path: Path, processor_type: str = "lst") -> List[str]: """列出ZIP文件中的顶层文件夹,排除内置处理器的关键词文件夹 Args: zip_path: ZIP文件路径 processor_type: 内置处理器类型,可选 "lst" 或 "pgvil" Returns: 不包含内置处理器关键词的文件夹列表 """ folders = set() built_in_keywords = self.BUILT_IN_PROCESSORS.get(processor_type, []) try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: for name in zip_ref.namelist(): parts = Path(name).parts if len(parts) > 1: # 至少包含一个文件夹 folder = parts[0].lower() # 只返回不包含内置处理器关键词的文件夹 if not any(keyword in folder for keyword in built_in_keywords): folders.add(parts[0]) except Exception as e: print(f"读取ZIP文件出错: {e}") return [] return list(folders) def list_rosbag_files(self, zip_path: Path) -> List[str]: """列出ZIP文件中的所有Rosbag文件""" rosbag_files = set() try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: for name in zip_ref.namelist(): if 'Rosbag/' in name and name.endswith('.bag'): rosbag_files.add(name) except Exception as e: print(f"读取ZIP文件中的Rosbag失败: {e}") return [] return list(rosbag_files) def is_rosbag_file(self, zip_path: Path) -> bool: """检查ZIP文件中是否包含Rosbag数据""" try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: # 查找任何包含'rosbag'关键词且扩展名为.bag的文件 for name in zip_ref.namelist(): if 'rosbag' in name.lower() and name.endswith('.bag'): return True return False except Exception as e: print(f"检查Rosbag文件失败: {e}") return False def validate_rosbag_output(self, output_path: Path) -> bool: """验证Rosbag处理后的输出文件是否有效""" try: if not output_path.exists(): print(f"错误:输出文件不存在: {output_path}") return False df = pd.read_csv(output_path) # Rosbag数据必需列 required_columns = ['simTime', 'event_Type'] # 检查必需列 missing_cols = [col for col in required_columns if col not in df.columns] if missing_cols: print(f"错误:缺少必需列: {missing_cols}") return False # 检查simTime列的有效性 if df['simTime'].isna().any(): print("错误:simTime列包含空值") return False return True except Exception as e: print(f"验证Rosbag输出时出错: {e}") return False def validate_plugin_output(self, output_path: Path) -> bool: """验证插件输出文件是否有效""" try: if not output_path.exists(): print(f"错误:输出文件不存在: {output_path}") return False df = pd.read_csv(output_path) required_columns = ['simTime', 'playerId', 'simFrame'] # 检查必需列是否存在 missing_cols = [col for col in required_columns if col not in df.columns] if missing_cols: print(f"错误:缺少必需列: {missing_cols}") return False # 检查空值 for col in required_columns: if df[col].isna().any(): print(f"错误:{col} 包含空值") return False return True except Exception as e: print(f"验证输出文件时出错: {e}") return False def validate_plugin_df(self, df: pd.DataFrame) -> bool: """验证插件输出的DataFrame是否符合要求""" try: required_columns = ['simTime', 'simFrame', 'playerId'] missing_cols = [col for col in required_columns if col not in df.columns] if missing_cols: print(f"错误:缺少必需列: {missing_cols}") return False if df['playerId'].isna().any(): print("错误:playerId 包含空值") return False if df['simTime'].isna().any(): print("错误:simTime 包含空值") return False if df['simFrame'].isna().any(): print("错误:simFrame 包含空值") return False return True except Exception as e: print(f"验证DataFrame时出错: {e}") return False def merge_plugin_data(self, main_file: Path, plugin_file: Path, output_file: Path) -> bool: try: df_main = pd.read_csv(main_file) df_plugin = pd.read_csv(plugin_file) print(f"主数据形状: {df_main.shape}") print(f"插件数据形状: {df_plugin.shape}") # 只保留需要的列进行合并 merge_columns = ['simTime', 'playerId'] # 确保时间戳精度匹配 df_main['simTime'] = df_main['simTime'].round(3) df_plugin['simTime'] = df_plugin['simTime'].round(3) # 按时间排序 df_main.sort_values(['simTime', 'playerId'], inplace=True) df_plugin.sort_values(['simTime', 'playerId'], inplace=True) # 使用 merge_asof 进行基于时间的合并,只使用 simTime 和 playerId df_merged = pd.merge_asof( df_main, df_plugin.drop('simFrame', axis=1, errors='ignore'), # 删除插件数据中的 simFrame on='simTime', by=['playerId'], direction='nearest', tolerance=0.01 # 100ms的容差 ) print(f"合并后的数据形状: {df_merged.shape}") print(f"从插件添加的列: {[col for col in df_plugin.columns if col not in merge_columns and col != 'simFrame']}") df_merged.to_csv(output_file, index=False) print(f"成功合并数据到: {output_file}") return True except Exception as e: print(f"合并插件数据时出错: {e}") import traceback traceback.print_exc() return False def copy_resource(self, resource_name: str, target_dir: Path) -> Optional[Path]: """复制资源文件到目标目录""" source_path = self.resources_dir / resource_name if not source_path.exists(): return None try: # 创建目标目录(如果不存在) target_dir.mkdir(parents=True, exist_ok=True) # 复制文件 target_path = target_dir / resource_name shutil.copy2(source_path, target_path) return target_path except Exception as e: print(f"Error copying resource {resource_name}: {e}") return None