|
- import zipfile
- from pathlib import Path
- import pandas as pd
- from typing import List, Optional
- import shutil
- class ResourceManager:
- """管理插件资源和数据验证"""
-
- # 内置处理器类型及其对应的关键词
- BUILT_IN_PROCESSORS = {
- "lst": ["rosbag", "gnss", "can", "hmi"],
- "pgvil": ["pgvil", "acu", "radar"] # pgvil处理器支持的数据类型
- }
-
- def __init__(self, resources_dir: Path):
- self.resources_dir = resources_dir
- if not self.resources_dir.exists():
- self.resources_dir.mkdir(parents=True)
-
- def list_zip_folders(self, zip_path: Path, processor_type: str = "lst") -> List[str]:
- """列出ZIP文件中的顶层文件夹,排除内置处理器的关键词文件夹
-
- Args:
- zip_path: ZIP文件路径
- processor_type: 内置处理器类型,可选 "lst" 或 "pgvil"
-
- Returns:
- 不包含内置处理器关键词的文件夹列表
- """
- folders = set()
- built_in_keywords = self.BUILT_IN_PROCESSORS.get(processor_type, [])
-
- try:
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
- for name in zip_ref.namelist():
- parts = Path(name).parts
- if len(parts) > 1: # 至少包含一个文件夹
- folder = parts[0].lower()
- # 只返回不包含内置处理器关键词的文件夹
- if not any(keyword in folder for keyword in built_in_keywords):
- folders.add(parts[0])
-
- except Exception as e:
- print(f"读取ZIP文件出错: {e}")
- return []
-
- return list(folders)
-
- def list_rosbag_files(self, zip_path: Path) -> List[str]:
- """列出ZIP文件中的所有Rosbag文件"""
- rosbag_files = set()
- try:
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
- for name in zip_ref.namelist():
- if 'Rosbag/' in name and name.endswith('.bag'):
- rosbag_files.add(name)
- except Exception as e:
- print(f"读取ZIP文件中的Rosbag失败: {e}")
- return []
- return list(rosbag_files)
- def is_rosbag_file(self, zip_path: Path) -> bool:
- """检查ZIP文件中是否包含Rosbag数据"""
- try:
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
- # 查找任何包含'rosbag'关键词且扩展名为.bag的文件
- for name in zip_ref.namelist():
- if 'rosbag' in name.lower() and name.endswith('.bag'):
- return True
- return False
-
- except Exception as e:
- print(f"检查Rosbag文件失败: {e}")
- return False
- def validate_rosbag_output(self, output_path: Path) -> bool:
- """验证Rosbag处理后的输出文件是否有效"""
- try:
- if not output_path.exists():
- print(f"错误:输出文件不存在: {output_path}")
- return False
- df = pd.read_csv(output_path)
-
- # Rosbag数据必需列
- required_columns = ['simTime', 'event_Type']
-
- # 检查必需列
- missing_cols = [col for col in required_columns if col not in df.columns]
- if missing_cols:
- print(f"错误:缺少必需列: {missing_cols}")
- return False
- # 检查simTime列的有效性
- if df['simTime'].isna().any():
- print("错误:simTime列包含空值")
- return False
- return True
-
- except Exception as e:
- print(f"验证Rosbag输出时出错: {e}")
- return False
-
- def validate_plugin_output(self, output_path: Path) -> bool:
- """验证插件输出文件是否有效"""
- try:
- if not output_path.exists():
- print(f"错误:输出文件不存在: {output_path}")
- return False
- df = pd.read_csv(output_path)
- required_columns = ['simTime', 'playerId', 'simFrame']
-
- # 检查必需列是否存在
- missing_cols = [col for col in required_columns if col not in df.columns]
- if missing_cols:
- print(f"错误:缺少必需列: {missing_cols}")
- return False
-
- # 检查空值
- for col in required_columns:
- if df[col].isna().any():
- print(f"错误:{col} 包含空值")
- return False
-
- return True
-
- except Exception as e:
- print(f"验证输出文件时出错: {e}")
- return False
- def validate_plugin_df(self, df: pd.DataFrame) -> bool:
- """验证插件输出的DataFrame是否符合要求"""
- try:
- required_columns = ['simTime', 'simFrame', 'playerId']
-
- missing_cols = [col for col in required_columns if col not in df.columns]
- if missing_cols:
- print(f"错误:缺少必需列: {missing_cols}")
- return False
-
- if df['playerId'].isna().any():
- print("错误:playerId 包含空值")
- return False
- if df['simTime'].isna().any():
- print("错误:simTime 包含空值")
- return False
- if df['simFrame'].isna().any():
- print("错误:simFrame 包含空值")
- return False
-
- return True
-
- except Exception as e:
- print(f"验证DataFrame时出错: {e}")
- return False
-
- def merge_plugin_data(self, main_file: Path, plugin_file: Path, output_file: Path) -> bool:
- try:
- df_main = pd.read_csv(main_file)
- df_plugin = pd.read_csv(plugin_file)
-
- print(f"主数据形状: {df_main.shape}")
- print(f"插件数据形状: {df_plugin.shape}")
-
- # 只保留需要的列进行合并
- merge_columns = ['simTime', 'playerId']
-
- # 确保时间戳精度匹配
- df_main['simTime'] = df_main['simTime'].round(3)
- df_plugin['simTime'] = df_plugin['simTime'].round(3)
-
- # 按时间排序
- df_main.sort_values(['simTime', 'playerId'], inplace=True)
- df_plugin.sort_values(['simTime', 'playerId'], inplace=True)
- # 使用 merge_asof 进行基于时间的合并,只使用 simTime 和 playerId
- df_merged = pd.merge_asof(
- df_main,
- df_plugin.drop('simFrame', axis=1, errors='ignore'), # 删除插件数据中的 simFrame
- on='simTime',
- by=['playerId'],
- direction='nearest',
- tolerance=0.01 # 100ms的容差
- )
-
- print(f"合并后的数据形状: {df_merged.shape}")
- print(f"从插件添加的列: {[col for col in df_plugin.columns if col not in merge_columns and col != 'simFrame']}")
-
- df_merged.to_csv(output_file, index=False)
- print(f"成功合并数据到: {output_file}")
-
- return True
-
- except Exception as e:
- print(f"合并插件数据时出错: {e}")
- import traceback
- traceback.print_exc()
- return False
-
- def copy_resource(self, resource_name: str, target_dir: Path) -> Optional[Path]:
- """复制资源文件到目标目录"""
- source_path = self.resources_dir / resource_name
- if not source_path.exists():
- return None
-
- try:
- # 创建目标目录(如果不存在)
- target_dir.mkdir(parents=True, exist_ok=True)
-
- # 复制文件
- target_path = target_dir / resource_name
- shutil.copy2(source_path, target_path)
-
- return target_path
-
- except Exception as e:
- print(f"Error copying resource {resource_name}: {e}")
- return None
|