from pathlib import Path from typing import Dict, Any, List, Callable, Optional, Union, Tuple import pandas as pd import numpy as np import concurrent.futures import time import os import traceback from core.error_handler import ProcessingError, ErrorHandler class DataPipeline: """数据处理管道,提供高效的数据处理流程""" def __init__(self, name: str = "default_pipeline"): self.name = name self.steps = [] print(f"初始化流水线 '{name}'") def add_step(self, name: str, func: Callable) -> 'DataPipeline': """添加处理步骤 Args: name: 步骤名称 func: 处理函数,接收一个DataFrame和可选参数,返回处理后的DataFrame Returns: 当前管道实例,支持链式调用 """ self.steps.append((name, func)) print(f"添加处理步骤: {name}") return self @ErrorHandler.measure_performance def process(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame: """执行所有处理步骤 Args: data: 输入数据 **kwargs: 传递给处理函数的额外参数 Returns: 处理后的数据 """ result = data.copy() for name, func in self.steps: start_time = time.time() print(f"开始执行步骤: {name}") try: # 执行处理步骤 result = func(result, **kwargs) # 记录处理时间和结果信息 elapsed = time.time() - start_time print(f"步骤 {name} 在 {elapsed:.2f} 秒内完成,结果行数: {len(result)}") except Exception as e: print(f"步骤 {name} 失败: {e}") traceback.print_exc() raise ProcessingError(f"处理步骤 {name} 失败: {e}") return result @ErrorHandler.measure_performance def process_batch(self, data: pd.DataFrame, batch_size: int = 10000, **kwargs) -> pd.DataFrame: """分批处理数据 Args: data: 输入数据 batch_size: 批次大小 **kwargs: 传递给处理函数的额外参数 Returns: 处理后的数据 """ if len(data) <= batch_size: return self.process(data, **kwargs) print(f"开始分批处理数据,总行数: {len(data)},批次大小: {batch_size}") # 计算批次数 num_batches = (len(data) + batch_size - 1) // batch_size results = [] for i in range(num_batches): start_idx = i * batch_size end_idx = min((i + 1) * batch_size, len(data)) batch = data.iloc[start_idx:end_idx].copy() print(f"处理批次 {i+1}/{num_batches},行数: {len(batch)}") batch_result = self.process(batch, **kwargs) results.append(batch_result) # 合并结果 result = pd.concat(results, ignore_index=True) print(f"分批处理完成,总行数: {len(result)}") return result @ErrorHandler.measure_performance def process_files_parallel(self, file_paths: List[Path], read_func: Callable[[Path], pd.DataFrame], write_func: Callable[[pd.DataFrame, Path], None], output_dir: Path, max_workers: Optional[int] = None, **kwargs) -> List[Path]: """并行处理多个文件 Args: file_paths: 输入文件路径列表 read_func: 读取文件的函数,接收文件路径,返回DataFrame write_func: 写入文件的函数,接收DataFrame和输出路径 output_dir: 输出目录 max_workers: 最大工作线程数 **kwargs: 传递给处理函数的额外参数 Returns: 处理后的文件路径列表 """ if max_workers is None or max_workers <= 1 or len(file_paths) <= 1: return self.process_files_sequential(file_paths, read_func, write_func, output_dir, **kwargs) print(f"开始并行处理{len(file_paths)}个文件") output_paths = [] # 确保输出目录存在 output_dir.mkdir(parents=True, exist_ok=True) # 定义处理单个文件的函数 def process_file(file_path): try: # 读取文件 data = read_func(file_path) # 处理数据 processed_data = self.process(data, **kwargs) # 确定输出路径 output_path = output_dir / f"processed_{file_path.name}" # 写入结果 write_func(processed_data, output_path) print(f"文件处理完成: {file_path.name} -> {output_path.name}") return output_path except Exception as e: print(f"处理文件失败: {file_path}: {e}") traceback.print_exc() return None # 使用线程池并行处理文件 with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(process_file, file_path): file_path for file_path in file_paths} for future in concurrent.futures.as_completed(futures): file_path = futures[future] try: output_path = future.result() if output_path: output_paths.append(output_path) except Exception as e: print(f"获取处理结果失败: {file_path}: {e}") traceback.print_exc() print(f"并行处理完成,成功处理: {len(output_paths)}/{len(file_paths)}个文件") return output_paths def process_files_sequential(self, file_paths: List[Path], read_func: Callable[[Path], pd.DataFrame], write_func: Callable[[pd.DataFrame, Path], None], output_dir: Path, **kwargs) -> List[Path]: """顺序处理多个文件 Args: file_paths: 输入文件路径列表 read_func: 读取文件的函数,接收文件路径,返回DataFrame write_func: 写入文件的函数,接收DataFrame和输出路径 output_dir: 输出目录 **kwargs: 传递给处理函数的额外参数 Returns: 处理后的文件路径列表 """ print(f"开始顺序处理{len(file_paths)}个文件") output_paths = [] # 确保输出目录存在 output_dir.mkdir(parents=True, exist_ok=True) for file_path in file_paths: try: # 读取文件 data = read_func(file_path) # 处理数据 processed_data = self.process(data, **kwargs) # 确定输出路径 output_path = output_dir / f"processed_{file_path.name}" # 写入结果 write_func(processed_data, output_path) print(f"文件处理完成: {file_path.name} -> {output_path.name}") output_paths.append(output_path) except Exception as e: print(f"处理文件失败: {file_path}: {e}") traceback.print_exc() print(f"顺序处理完成,成功处理: {len(output_paths)}/{len(file_paths)}个文件") return output_paths @staticmethod def create_csv_reader(usecols: Optional[List[str]] = None, dtype: Optional[Dict[str, Any]] = None, **pandas_kwargs) -> Callable[[Path], pd.DataFrame]: """创建CSV文件读取函数 Args: usecols: 需要读取的列 dtype: 列的数据类型 **pandas_kwargs: 传递给pd.read_csv的额外参数 Returns: 读取函数 """ def read_csv(file_path: Path) -> pd.DataFrame: return pd.read_csv(file_path, usecols=usecols, dtype=dtype, **pandas_kwargs) return read_csv @staticmethod def create_csv_writer(index: bool = False, **pandas_kwargs) -> Callable[[pd.DataFrame, Path], None]: """创建CSV文件写入函数 Args: index: 是否写入索引 **pandas_kwargs: 传递给pd.to_csv的额外参数 Returns: 写入函数 """ def write_csv(data: pd.DataFrame, file_path: Path) -> None: data.to_csv(file_path, index=index, **pandas_kwargs) return write_csv @staticmethod def create_filter_step(filter_func: Callable[[pd.DataFrame], pd.Series], name: str = "filter_data") -> Tuple[str, Callable]: """创建数据过滤步骤 Args: filter_func: 过滤函数,接收DataFrame返回布尔Series name: 步骤名称 Returns: 处理步骤元组(名称, 函数) """ def filter_step(data: pd.DataFrame, **kwargs) -> pd.DataFrame: mask = filter_func(data) filtered_data = data[mask].copy() print(f"过滤前行数: {len(data)}, 过滤后行数: {len(filtered_data)}, 过滤掉: {len(data) - len(filtered_data)}行") return filtered_data return (name, filter_step) @staticmethod def create_transform_step(transform_func: Callable[[pd.DataFrame], pd.DataFrame], name: str = "transform_data") -> Tuple[str, Callable]: """创建数据转换步骤 Args: transform_func: 转换函数,接收DataFrame返回转换后的DataFrame name: 步骤名称 Returns: 处理步骤元组(名称, 函数) """ def transform_step(data: pd.DataFrame, **kwargs) -> pd.DataFrame: result = transform_func(data) print(f"转换完成: {name}, 输入行数: {len(data)}, 输出行数: {len(result)}") return result return (name, transform_step) @staticmethod def create_column_mapping_step(mapping: Dict[str, str], name: str = "map_columns") -> Tuple[str, Callable]: """创建列映射步骤 Args: mapping: 列名映射字典,键为原列名,值为新列名 name: 步骤名称 Returns: 处理步骤元组(名称, 函数) """ def map_columns(data: pd.DataFrame, **kwargs) -> pd.DataFrame: result = data.rename(columns=mapping) print(f"列映射完成,映射了 {len(mapping)} 列") return result return (name, map_columns)