from abc import ABC, abstractmethod from pathlib import Path import pandas as pd from typing import Dict, Any, Optional, Tuple import tempfile import zipfile class CustomDataProcessorPlugin(ABC): """自定义数据处理插件的基类,提供通用方法""" @abstractmethod def can_handle(self, zip_path: Path, folder_name: str) -> bool: """检查是否可以处理指定的数据文件夹""" pass @abstractmethod def get_required_columns(self) -> Dict[str, Any]: """返回必需的列和它们的类型""" pass def process_data(self, zip_path: Path, folder_name: str, output_dir: Path) -> Optional[pd.DataFrame]: """处理数据的主方法,返回处理后的DataFrame""" try: with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) # 提取数据文件 csv_files = self._extract_files(zip_path, folder_name, temp_path) if not csv_files: return None # 处理数据 df = self._process_extracted_files(csv_files) if df is None or df.empty: return None return df except Exception as e: print(f"处理数据时发生错误: {e}") import traceback traceback.print_exc() return None def _extract_files(self, zip_path: Path, folder_name: str, temp_path: Path) -> list: """从ZIP文件中提取数据文件,不限制文件格式""" try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: # 获取指定文件夹下的所有文件,不限制格式 files = [name for name in zip_ref.namelist() if name.startswith(f"{folder_name}/")] if not files: print(f"在{folder_name}中未找到数据文件") return [] # 解压所有文件 for file_name in files: zip_ref.extract(file_name, temp_path) # 返回解压后的所有文件路径 return list(temp_path.glob(f"{folder_name}/*")) except Exception as e: print(f"提取文件时发生错误: {e}") return [] @abstractmethod def _process_extracted_files(self, file_paths: list) -> Optional[pd.DataFrame]: """处理提取的文件""" pass def _validate_data(self, df: pd.DataFrame) -> bool: """验证数据是否满足要求""" required_cols = self.get_required_columns() # 检查必需列是否存在 missing_cols = [col for col in required_cols.keys() if col not in df.columns] if missing_cols: print(f"错误:缺少必需列: {missing_cols}") return False # 检查数据类型 for col, dtype in required_cols.items(): try: df[col] = df[col].astype(dtype) except Exception as e: print(f"错误:列 {col} 的数据类型转换失败: {e}") return False return True def _preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame: """数据预处理的通用方法""" # 移除重复行 df = df.drop_duplicates() # 确保时间戳列的精度为3位 if 'simTime' in df.columns: df['simTime'] = df['simTime'].round(3) # 按时间排序 if 'simTime' in df.columns: df.sort_values('simTime', inplace=True) return df