resource_manager.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
import shutil
import traceback
import zipfile
from pathlib import Path
from typing import List, Optional

import pandas as pd
  6. class ResourceManager:
  7. """管理插件资源和数据验证"""
  8. # 内置处理器类型及其对应的关键词
  9. BUILT_IN_PROCESSORS = {
  10. "lst": ["rosbag", "gnss", "can", "hmi"],
  11. "pgvil": ["pgvil", "acu", "radar"] # pgvil处理器支持的数据类型
  12. }
  13. def __init__(self, resources_dir: Path):
  14. self.resources_dir = resources_dir
  15. if not self.resources_dir.exists():
  16. self.resources_dir.mkdir(parents=True)
  17. def list_zip_folders(self, zip_path: Path, processor_type: str = "lst") -> List[str]:
  18. """列出ZIP文件中的顶层文件夹,排除内置处理器的关键词文件夹
  19. Args:
  20. zip_path: ZIP文件路径
  21. processor_type: 内置处理器类型,可选 "lst" 或 "pgvil"
  22. Returns:
  23. 不包含内置处理器关键词的文件夹列表
  24. """
  25. folders = set()
  26. built_in_keywords = self.BUILT_IN_PROCESSORS.get(processor_type, [])
  27. try:
  28. with zipfile.ZipFile(zip_path, 'r') as zip_ref:
  29. for name in zip_ref.namelist():
  30. parts = Path(name).parts
  31. if len(parts) > 1: # 至少包含一个文件夹
  32. folder = parts[0].lower()
  33. # 只返回不包含内置处理器关键词的文件夹
  34. if not any(keyword in folder for keyword in built_in_keywords):
  35. folders.add(parts[0])
  36. except Exception as e:
  37. print(f"读取ZIP文件出错: {e}")
  38. return []
  39. return list(folders)
  40. def list_rosbag_files(self, zip_path: Path) -> List[str]:
  41. """列出ZIP文件中的所有Rosbag文件"""
  42. rosbag_files = set()
  43. try:
  44. with zipfile.ZipFile(zip_path, 'r') as zip_ref:
  45. for name in zip_ref.namelist():
  46. if 'Rosbag/' in name and name.endswith('.bag'):
  47. rosbag_files.add(name)
  48. except Exception as e:
  49. print(f"读取ZIP文件中的Rosbag失败: {e}")
  50. return []
  51. return list(rosbag_files)
  52. def is_rosbag_file(self, zip_path: Path) -> bool:
  53. """检查ZIP文件中是否包含Rosbag数据"""
  54. try:
  55. with zipfile.ZipFile(zip_path, 'r') as zip_ref:
  56. # 查找任何包含'rosbag'关键词且扩展名为.bag的文件
  57. for name in zip_ref.namelist():
  58. if 'rosbag' in name.lower() and name.endswith('.bag'):
  59. return True
  60. return False
  61. except Exception as e:
  62. print(f"检查Rosbag文件失败: {e}")
  63. return False
  64. def validate_rosbag_output(self, output_path: Path) -> bool:
  65. """验证Rosbag处理后的输出文件是否有效"""
  66. try:
  67. if not output_path.exists():
  68. print(f"错误:输出文件不存在: {output_path}")
  69. return False
  70. df = pd.read_csv(output_path)
  71. # Rosbag数据必需列
  72. required_columns = ['simTime', 'event_Type']
  73. # 检查必需列
  74. missing_cols = [col for col in required_columns if col not in df.columns]
  75. if missing_cols:
  76. print(f"错误:缺少必需列: {missing_cols}")
  77. return False
  78. # 检查simTime列的有效性
  79. if df['simTime'].isna().any():
  80. print("错误:simTime列包含空值")
  81. return False
  82. return True
  83. except Exception as e:
  84. print(f"验证Rosbag输出时出错: {e}")
  85. return False
  86. def validate_plugin_output(self, output_path: Path) -> bool:
  87. """验证插件输出文件是否有效"""
  88. try:
  89. if not output_path.exists():
  90. print(f"错误:输出文件不存在: {output_path}")
  91. return False
  92. df = pd.read_csv(output_path)
  93. required_columns = ['simTime', 'playerId', 'simFrame']
  94. # 检查必需列是否存在
  95. missing_cols = [col for col in required_columns if col not in df.columns]
  96. if missing_cols:
  97. print(f"错误:缺少必需列: {missing_cols}")
  98. return False
  99. # 检查空值
  100. for col in required_columns:
  101. if df[col].isna().any():
  102. print(f"错误:{col} 包含空值")
  103. return False
  104. return True
  105. except Exception as e:
  106. print(f"验证输出文件时出错: {e}")
  107. return False
  108. def validate_plugin_df(self, df: pd.DataFrame) -> bool:
  109. """验证插件输出的DataFrame是否符合要求"""
  110. try:
  111. required_columns = ['simTime', 'simFrame', 'playerId']
  112. missing_cols = [col for col in required_columns if col not in df.columns]
  113. if missing_cols:
  114. print(f"错误:缺少必需列: {missing_cols}")
  115. return False
  116. if df['playerId'].isna().any():
  117. print("错误:playerId 包含空值")
  118. return False
  119. if df['simTime'].isna().any():
  120. print("错误:simTime 包含空值")
  121. return False
  122. if df['simFrame'].isna().any():
  123. print("错误:simFrame 包含空值")
  124. return False
  125. return True
  126. except Exception as e:
  127. print(f"验证DataFrame时出错: {e}")
  128. return False
  129. def merge_plugin_data(self, main_file: Path, plugin_file: Path, output_file: Path) -> bool:
  130. try:
  131. df_main = pd.read_csv(main_file)
  132. df_plugin = pd.read_csv(plugin_file)
  133. print(f"主数据形状: {df_main.shape}")
  134. print(f"插件数据形状: {df_plugin.shape}")
  135. # 只保留需要的列进行合并
  136. merge_columns = ['simTime', 'playerId']
  137. # 确保时间戳精度匹配
  138. df_main['simTime'] = df_main['simTime'].round(3)
  139. df_plugin['simTime'] = df_plugin['simTime'].round(3)
  140. # 按时间排序
  141. df_main.sort_values(['simTime', 'playerId'], inplace=True)
  142. df_plugin.sort_values(['simTime', 'playerId'], inplace=True)
  143. # 使用 merge_asof 进行基于时间的合并,只使用 simTime 和 playerId
  144. df_merged = pd.merge_asof(
  145. df_main,
  146. df_plugin.drop('simFrame', axis=1, errors='ignore'), # 删除插件数据中的 simFrame
  147. on='simTime',
  148. by=['playerId'],
  149. direction='nearest',
  150. tolerance=0.01 # 100ms的容差
  151. )
  152. print(f"合并后的数据形状: {df_merged.shape}")
  153. print(f"从插件添加的列: {[col for col in df_plugin.columns if col not in merge_columns and col != 'simFrame']}")
  154. df_merged.to_csv(output_file, index=False)
  155. print(f"成功合并数据到: {output_file}")
  156. return True
  157. except Exception as e:
  158. print(f"合并插件数据时出错: {e}")
  159. import traceback
  160. traceback.print_exc()
  161. return False
  162. def copy_resource(self, resource_name: str, target_dir: Path) -> Optional[Path]:
  163. """复制资源文件到目标目录"""
  164. source_path = self.resources_dir / resource_name
  165. if not source_path.exists():
  166. return None
  167. try:
  168. # 创建目标目录(如果不存在)
  169. target_dir.mkdir(parents=True, exist_ok=True)
  170. # 复制文件
  171. target_path = target_dir / resource_name
  172. shutil.copy2(source_path, target_path)
  173. return target_path
  174. except Exception as e:
  175. print(f"Error copying resource {resource_name}: {e}")
  176. return None