from pathlib import Path
from typing import Dict, Any, List, Callable, Optional, Tuple
import pandas as pd
import concurrent.futures
import time
import traceback

from core.error_handler import ProcessingError, ErrorHandler

class DataPipeline:
    """Data processing pipeline providing an efficient, composable processing flow."""

    def __init__(self, name: str = "default_pipeline"):
        self.name = name
        self.steps = []
        print(f"Initialized pipeline '{name}'")

    def add_step(self, name: str, func: Callable) -> 'DataPipeline':
        """Add a processing step.

        Args:
            name: Name of the step.
            func: Processing function; takes a DataFrame plus optional keyword
                arguments and returns the processed DataFrame.

        Returns:
            The current pipeline instance, enabling method chaining.
        """
        self.steps.append((name, func))
        print(f"Added processing step: {name}")
        return self

    @ErrorHandler.measure_performance
    def process(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame:
        """Run all processing steps in order.

        Args:
            data: Input data.
            **kwargs: Extra arguments passed through to every step function.

        Returns:
            The processed data.
        """
        result = data.copy()

        for name, func in self.steps:
            start_time = time.time()
            print(f"Starting step: {name}")

            try:
                # Execute the processing step
                result = func(result, **kwargs)

                # Log elapsed time and result size
                elapsed = time.time() - start_time
                print(f"Step {name} finished in {elapsed:.2f}s, result rows: {len(result)}")

            except Exception as e:
                print(f"Step {name} failed: {e}")
                traceback.print_exc()
                raise ProcessingError(f"Processing step {name} failed: {e}") from e

        return result

    @ErrorHandler.measure_performance
    def process_batch(self, data: pd.DataFrame, batch_size: int = 10000, **kwargs) -> pd.DataFrame:
        """Process data in batches.

        Args:
            data: Input data.
            batch_size: Number of rows per batch.
            **kwargs: Extra arguments passed through to every step function.

        Returns:
            The processed data.
        """
        if len(data) <= batch_size:
            return self.process(data, **kwargs)

        print(f"Starting batch processing, total rows: {len(data)}, batch size: {batch_size}")

        # Number of batches (ceiling division)
        num_batches = (len(data) + batch_size - 1) // batch_size
        results = []

        for i in range(num_batches):
            start_idx = i * batch_size
            end_idx = min((i + 1) * batch_size, len(data))
            batch = data.iloc[start_idx:end_idx].copy()

            print(f"Processing batch {i + 1}/{num_batches}, rows: {len(batch)}")
            batch_result = self.process(batch, **kwargs)
            results.append(batch_result)

        # Merge the batch results
        result = pd.concat(results, ignore_index=True)
        print(f"Batch processing complete, total rows: {len(result)}")

        return result

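    # Usage note (illustrative, not part of the API): process_batch runs the
    # same steps over fixed-size slices and concatenates the results, which
    # bounds peak memory for large frames, e.g.:
    #
    #     result = pipeline.process_batch(big_df, batch_size=50_000)
    #
    # Because each batch is processed independently, steps that rely on
    # cross-row state (whole-frame deduplication, global sorts) only ever
    # see one batch at a time.
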
    @ErrorHandler.measure_performance
    def process_files_parallel(self, file_paths: List[Path], read_func: Callable[[Path], pd.DataFrame],
                               write_func: Callable[[pd.DataFrame, Path], None], output_dir: Path,
                               max_workers: Optional[int] = None, **kwargs) -> List[Path]:
        """Process multiple files in parallel.

        Args:
            file_paths: List of input file paths.
            read_func: Function that reads a file path into a DataFrame.
            write_func: Function that writes a DataFrame to an output path.
            output_dir: Output directory.
            max_workers: Maximum number of worker threads.
            **kwargs: Extra arguments passed through to every step function.

        Returns:
            List of output paths for the files processed successfully.
        """
        if max_workers is None or max_workers <= 1 or len(file_paths) <= 1:
            return self.process_files_sequential(file_paths, read_func, write_func, output_dir, **kwargs)

        print(f"Starting parallel processing of {len(file_paths)} files")
        output_paths = []

        # Make sure the output directory exists
        output_dir.mkdir(parents=True, exist_ok=True)

        # Helper that processes a single file end to end
        def process_file(file_path):
            try:
                # Read the input file
                data = read_func(file_path)

                # Run the pipeline
                processed_data = self.process(data, **kwargs)

                # Derive the output path
                output_path = output_dir / f"processed_{file_path.name}"

                # Write the result
                write_func(processed_data, output_path)

                print(f"File processed: {file_path.name} -> {output_path.name}")
                return output_path

            except Exception as e:
                print(f"Failed to process file {file_path}: {e}")
                traceback.print_exc()
                return None

        # Process files concurrently with a thread pool
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(process_file, file_path): file_path for file_path in file_paths}

            for future in concurrent.futures.as_completed(futures):
                file_path = futures[future]
                try:
                    output_path = future.result()
                    if output_path:
                        output_paths.append(output_path)
                except Exception as e:
                    print(f"Failed to retrieve result for {file_path}: {e}")
                    traceback.print_exc()

        print(f"Parallel processing complete, succeeded: {len(output_paths)}/{len(file_paths)} files")
        return output_paths

    def process_files_sequential(self, file_paths: List[Path], read_func: Callable[[Path], pd.DataFrame],
                                 write_func: Callable[[pd.DataFrame, Path], None], output_dir: Path,
                                 **kwargs) -> List[Path]:
        """Process multiple files sequentially.

        Args:
            file_paths: List of input file paths.
            read_func: Function that reads a file path into a DataFrame.
            write_func: Function that writes a DataFrame to an output path.
            output_dir: Output directory.
            **kwargs: Extra arguments passed through to every step function.

        Returns:
            List of output paths for the files processed successfully.
        """
        print(f"Starting sequential processing of {len(file_paths)} files")
        output_paths = []

        # Make sure the output directory exists
        output_dir.mkdir(parents=True, exist_ok=True)

        for file_path in file_paths:
            try:
                # Read the input file
                data = read_func(file_path)

                # Run the pipeline
                processed_data = self.process(data, **kwargs)

                # Derive the output path
                output_path = output_dir / f"processed_{file_path.name}"

                # Write the result
                write_func(processed_data, output_path)

                print(f"File processed: {file_path.name} -> {output_path.name}")
                output_paths.append(output_path)

            except Exception as e:
                print(f"Failed to process file {file_path}: {e}")
                traceback.print_exc()

        print(f"Sequential processing complete, succeeded: {len(output_paths)}/{len(file_paths)} files")
        return output_paths

    @staticmethod
    def create_csv_reader(usecols: Optional[List[str]] = None, dtype: Optional[Dict[str, Any]] = None,
                          **pandas_kwargs) -> Callable[[Path], pd.DataFrame]:
        """Create a CSV reading function.

        Args:
            usecols: Columns to read.
            dtype: Data types for the columns.
            **pandas_kwargs: Extra arguments passed to pd.read_csv.

        Returns:
            The reading function.
        """
        def read_csv(file_path: Path) -> pd.DataFrame:
            return pd.read_csv(file_path, usecols=usecols, dtype=dtype, **pandas_kwargs)
        return read_csv

    @staticmethod
    def create_csv_writer(index: bool = False, **pandas_kwargs) -> Callable[[pd.DataFrame, Path], None]:
        """Create a CSV writing function.

        Args:
            index: Whether to write the index.
            **pandas_kwargs: Extra arguments passed to DataFrame.to_csv.

        Returns:
            The writing function.
        """
        def write_csv(data: pd.DataFrame, file_path: Path) -> None:
            data.to_csv(file_path, index=index, **pandas_kwargs)
        return write_csv

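    # Usage sketch (illustrative; the paths and column names below are
    # assumptions, not fixed API values): the two factories above plug
    # directly into the file-processing methods, e.g.:
    #
    #     reader = DataPipeline.create_csv_reader(usecols=["id", "value"])
    #     writer = DataPipeline.create_csv_writer(index=False)
    #     pipeline.process_files_parallel(
    #         sorted(Path("input").glob("*.csv")), reader, writer,
    #         Path("output"), max_workers=4,
    #     )
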
    @staticmethod
    def create_filter_step(filter_func: Callable[[pd.DataFrame], pd.Series], name: str = "filter_data") -> Tuple[str, Callable]:
        """Create a data filtering step.

        Args:
            filter_func: Filter function; takes a DataFrame and returns a boolean Series.
            name: Name of the step.

        Returns:
            A (name, function) step tuple.
        """
        def filter_step(data: pd.DataFrame, **kwargs) -> pd.DataFrame:
            mask = filter_func(data)
            filtered_data = data[mask].copy()
            print(f"Rows before filter: {len(data)}, after: {len(filtered_data)}, removed: {len(data) - len(filtered_data)}")
            return filtered_data
        return (name, filter_step)

    @staticmethod
    def create_transform_step(transform_func: Callable[[pd.DataFrame], pd.DataFrame], name: str = "transform_data") -> Tuple[str, Callable]:
        """Create a data transformation step.

        Args:
            transform_func: Transformation function; takes a DataFrame and returns the transformed DataFrame.
            name: Name of the step.

        Returns:
            A (name, function) step tuple.
        """
        def transform_step(data: pd.DataFrame, **kwargs) -> pd.DataFrame:
            result = transform_func(data)
            print(f"Transform complete: {name}, input rows: {len(data)}, output rows: {len(result)}")
            return result
        return (name, transform_step)

    @staticmethod
    def create_column_mapping_step(mapping: Dict[str, str], name: str = "map_columns") -> Tuple[str, Callable]:
        """Create a column-renaming step.

        Args:
            mapping: Column name mapping; keys are original names, values are new names.
            name: Name of the step.

        Returns:
            A (name, function) step tuple.
        """
        def map_columns(data: pd.DataFrame, **kwargs) -> pd.DataFrame:
            result = data.rename(columns=mapping)
            print(f"Column mapping complete, mapped {len(mapping)} columns")
            return result
        return (name, map_columns)
|