123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from abc import ABC, abstractmethod
- from pathlib import Path
- import pandas as pd
- from typing import Dict, Any, Optional, Tuple
- import tempfile
- import zipfile
class CustomDataProcessorPlugin(ABC):
    """Base class for custom data-processing plugins.

    Subclasses implement :meth:`can_handle`, :meth:`get_required_columns`
    and :meth:`_process_extracted_files`.  The base class supplies the
    shared workflow: extracting one folder of a ZIP archive into a
    temporary directory and handing the extracted paths to the subclass,
    plus optional validation / preprocessing helpers.
    """

    @abstractmethod
    def can_handle(self, zip_path: Path, folder_name: str) -> bool:
        """Return whether this plugin can process the given data folder."""

    @abstractmethod
    def get_required_columns(self) -> Dict[str, Any]:
        """Return the required column names mapped to their dtypes."""

    def process_data(self, zip_path: Path, folder_name: str, output_dir: Path) -> Optional[pd.DataFrame]:
        """Extract ``folder_name`` from ``zip_path`` and build a DataFrame.

        Args:
            zip_path: Path of the ZIP archive to read.
            folder_name: Folder inside the archive whose members are extracted.
            output_dir: Not used by this base implementation; kept so the
                signature stays stable for callers and overriding subclasses.

        Returns:
            The DataFrame produced by :meth:`_process_extracted_files`, or
            ``None`` when nothing was extracted, the result is ``None`` or
            empty, or any exception was raised along the way.
        """
        try:
            # Extracted files only live inside this block, so the subclass
            # hook must finish reading them before the directory is removed.
            with tempfile.TemporaryDirectory() as temp_dir:
                extracted = self._extract_files(zip_path, folder_name, Path(temp_dir))
                if not extracted:
                    return None

                df = self._process_extracted_files(extracted)
                if df is None or df.empty:
                    return None

                return df

        except Exception as e:
            # Deliberate best-effort boundary: report the failure and signal
            # it to the caller with None instead of propagating.
            print(f"处理数据时发生错误: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _extract_files(self, zip_path: Path, folder_name: str, temp_path: Path) -> list:
        """Extract every archive member under ``folder_name/`` into ``temp_path``.

        No file-format filter is applied.

        Args:
            zip_path: Path of the ZIP archive to read.
            folder_name: Folder inside the archive to extract.
            temp_path: Existing directory that receives the extracted tree.

        Returns:
            Sorted list of the paths directly inside the extracted folder
            (sub-directories included, their contents not listed).  An empty
            list when no member matches or when extraction fails.
        """
        member_prefix = f"{folder_name}/"
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                members = [name for name in zip_ref.namelist()
                           if name.startswith(member_prefix)]

                if not members:
                    print(f"在{folder_name}中未找到数据文件")
                    return []

                zip_ref.extractall(temp_path, members=members)

            # List the directory literally rather than through glob():
            # a folder name containing glob metacharacters ('[', ']', '*',
            # '?') would otherwise be treated as a pattern and match nothing.
            extracted_root = temp_path / folder_name
            if not extracted_root.is_dir():
                return []
            return sorted(extracted_root.iterdir())

        except Exception as e:
            print(f"提取文件时发生错误: {e}")
            return []

    @abstractmethod
    def _process_extracted_files(self, file_paths: list) -> Optional[pd.DataFrame]:
        """Turn the extracted file paths into a DataFrame (or ``None``)."""

    def _validate_data(self, df: pd.DataFrame) -> bool:
        """Check that ``df`` has the required columns with convertible dtypes.

        Note that this converts each required column of ``df`` in place to
        the dtype given by :meth:`get_required_columns`; columns converted
        before a failure stay converted.

        Returns:
            ``True`` when every required column exists and was converted,
            ``False`` (after printing the reason) otherwise.
        """
        required_cols = self.get_required_columns()

        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            print(f"错误:缺少必需列: {missing_cols}")
            return False

        for col, dtype in required_cols.items():
            try:
                df[col] = df[col].astype(dtype)
            except Exception as e:
                print(f"错误:列 {col} 的数据类型转换失败: {e}")
                return False

        return True

    def _preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a de-duplicated copy of ``df``, normalised on ``simTime``.

        Duplicate rows are dropped.  When a ``simTime`` column exists it is
        rounded to 3 decimal places and the rows are sorted by it.  The
        input frame is not modified and the original index labels are kept.
        """
        deduped = df.drop_duplicates()
        if 'simTime' not in deduped.columns:
            return deduped

        # Build new frames instead of assigning / sorting in place on the
        # drop_duplicates() result, which can raise SettingWithCopyWarning
        # on pandas versions without copy-on-write.
        rounded = deduped.assign(simTime=deduped['simTime'].round(3))
        return rounded.sort_values('simTime')
|