# plugin_interface.py

from abc import ABC, abstractmethod
from pathlib import Path
import pandas as pd
from typing import Dict, Any, Optional, Tuple
import tempfile
import zipfile


class CustomDataProcessorPlugin(ABC):
    """Base class for custom data-processing plugins; provides common helper methods."""

    @abstractmethod
    def can_handle(self, zip_path: Path, folder_name: str) -> bool:
        """Check whether this plugin can handle the given data folder."""
        pass

    @abstractmethod
    def get_required_columns(self) -> Dict[str, Any]:
        """Return the required columns and their expected dtypes."""
        pass

    def process_data(self, zip_path: Path, folder_name: str, output_dir: Path) -> Optional[pd.DataFrame]:
        """Main entry point for processing data; returns the processed DataFrame."""
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_path = Path(temp_dir)
                # Extract the data files
                csv_files = self._extract_files(zip_path, folder_name, temp_path)
                if not csv_files:
                    return None
                # Process the extracted data
                df = self._process_extracted_files(csv_files)
                if df is None or df.empty:
                    return None
                return df
        except Exception as e:
            print(f"Error while processing data: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _extract_files(self, zip_path: Path, folder_name: str, temp_path: Path) -> list:
        """Extract data files from the ZIP archive without restricting the file format."""
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Collect every entry under the given folder, regardless of format
                files = [name for name in zip_ref.namelist()
                         if name.startswith(f"{folder_name}/")]
                if not files:
                    print(f"No data files found in {folder_name}")
                    return []
                # Extract every matching entry
                for file_name in files:
                    zip_ref.extract(file_name, temp_path)
                # Return the paths of all extracted files
                return list(temp_path.glob(f"{folder_name}/*"))
        except Exception as e:
            print(f"Error while extracting files: {e}")
            return []

    @abstractmethod
    def _process_extracted_files(self, file_paths: list) -> Optional[pd.DataFrame]:
        """Process the extracted files into a single DataFrame."""
        pass

    def _validate_data(self, df: pd.DataFrame) -> bool:
        """Validate that the data meets the plugin's requirements."""
        required_cols = self.get_required_columns()
        # Check that all required columns are present
        missing_cols = [col for col in required_cols.keys() if col not in df.columns]
        if missing_cols:
            print(f"Error: missing required columns: {missing_cols}")
            return False
        # Coerce each required column to its expected dtype (modifies df in place)
        for col, dtype in required_cols.items():
            try:
                df[col] = df[col].astype(dtype)
            except Exception as e:
                print(f"Error: failed to convert column {col} to {dtype}: {e}")
                return False
        return True

    def _preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Common preprocessing applied to the extracted data."""
        # Remove duplicate rows
        df = df.drop_duplicates()
        if 'simTime' in df.columns:
            # Round the timestamp column to 3 decimal places and sort by time
            df['simTime'] = df['simTime'].round(3)
            df.sort_values('simTime', inplace=True)
        return df
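

# Example usage (a minimal sketch, not part of the original interface): a hypothetical
# concrete plugin that reads every extracted CSV under its folder and concatenates the
# results. The folder name "obstacle_data", the required column set, and the call
# sequence under the __main__ guard are illustrative assumptions only.
class ExampleCsvPlugin(CustomDataProcessorPlugin):
    """Hypothetical plugin for a folder of CSV files that share a simTime column."""

    FOLDER_NAME = "obstacle_data"  # assumed folder name, for illustration only

    def can_handle(self, zip_path: Path, folder_name: str) -> bool:
        # Handle only the folder this plugin was written for
        return folder_name == self.FOLDER_NAME

    def get_required_columns(self) -> Dict[str, Any]:
        # Required columns and the dtypes they should be coerced to
        return {"simTime": float, "value": float}

    def _process_extracted_files(self, file_paths: list) -> Optional[pd.DataFrame]:
        # Read every extracted CSV and concatenate into one DataFrame
        frames = [pd.read_csv(p) for p in file_paths if p.suffix == ".csv"]
        if not frames:
            return None
        df = pd.concat(frames, ignore_index=True)
        # Reuse the base-class helpers for validation and preprocessing
        if not self._validate_data(df):
            return None
        return self._preprocess_data(df)


if __name__ == "__main__":
    # Illustrative invocation; the paths below are placeholders.
    plugin = ExampleCsvPlugin()
    result = plugin.process_data(Path("data.zip"), ExampleCsvPlugin.FOLDER_NAME, Path("output"))
    if result is not None:
        print(result.head())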