# 数据预处理系统优化方案 ## 概述 通过对当前数据预处理系统的代码分析,我们发现了多个可以优化的方面,包括性能瓶颈、代码结构、错误处理和文档完善等。本文档提供了一个全面的优化方案,旨在提高系统的性能、可维护性和可扩展性。 ## 1. 性能优化 ### 1.1 数据处理流程优化 #### 当前问题 - `merge_data_process_LST.py` 中的数据处理流程存在多次不必要的数据转换和中间文件生成 - 大量使用循环处理数据,而非利用 pandas 的向量化操作 - 数据库查询未优化,可能导致内存使用过高 #### 优化建议 1. **减少中间文件生成** - 移除 `OUTPUT_CSV_TEMP_OBJSTATE` 等临时文件,直接在内存中处理数据 - 使用 pandas 的 `pipe()` 方法创建数据处理管道,减少中间 DataFrame 的创建 2. **利用 pandas 向量化操作** - 将 `_process_gnss_table` 和 `_process_can_table_optimized` 中的循环替换为向量化操作 - 示例代码: ```python # 替换这样的循环处理: processed_data = [] for row in rows: row_dict = dict(zip(db_columns, row)) record = {} # 处理每行数据... processed_data.append(record) df_final = pd.DataFrame(processed_data) # 使用向量化操作: df = pd.DataFrame(rows, columns=db_columns) df['simTime'] = (df['second'] + df['usecond'] / 1e6).round(2) df['playerId'] = PLAYER_ID_EGO # 其他列的向量化处理... ``` 3. **优化数据库查询** - 使用分批查询处理大型数据库,避免一次性加载所有数据 - 示例代码: ```python def _process_db_file_in_batches(self, db_path, output_dir, table_type, csv_name, batch_size=10000): output_csv_path = output_dir / csv_name with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn: # 获取总行数 cursor = conn.cursor() cursor.execute(f"SELECT COUNT(*) FROM {table_type}") total_rows = cursor.fetchone()[0] # 分批处理 with open(output_csv_path, 'w', newline='') as f: writer = None # 将在第一批数据后初始化 for offset in range(0, total_rows, batch_size): query = f"SELECT {', '.join(db_columns)} FROM {table_type} LIMIT {batch_size} OFFSET {offset}" cursor.execute(query) batch_rows = cursor.fetchall() # 处理这一批数据 batch_df = self._process_batch(batch_rows, db_columns) # 写入CSV(第一批包含表头) if offset == 0: batch_df.to_csv(f, index=False) else: batch_df.to_csv(f, index=False, header=False) ``` 4. **并行处理** - 使用 `concurrent.futures` 或 `multiprocessing` 并行处理独立的数据文件 - 示例代码: ```python from concurrent.futures import ProcessPoolExecutor def process_zip_parallel(self): with zipfile.ZipFile(self.config.zip_path, "r") as zip_ref: db_files_to_process = [] # 找出需要处理的文件... with tempfile.TemporaryDirectory() as tmp_dir_str: tmp_dir = Path(tmp_dir_str) # 提取所有文件 for file_info, table_type, csv_name in db_files_to_process: extracted_path = tmp_dir / Path(file_info.filename).name with zip_ref.open(file_info.filename) as source, open(extracted_path, "wb") as target: shutil.copyfileobj(source, target) # 并行处理文件 with ProcessPoolExecutor(max_workers=min(os.cpu_count(), len(db_files_to_process))) as executor: futures = [] for file_info, table_type, csv_name in db_files_to_process: extracted_path = tmp_dir / Path(file_info.filename).name futures.append(executor.submit( self._process_db_file, extracted_path, self.config.output_dir, table_type, csv_name )) # 等待所有任务完成 for future in futures: future.result() ``` ### 1.2 内存优化 #### 当前问题 - 处理大型数据集时可能导致内存溢出 - 多次创建完整的 DataFrame 副本 #### 优化建议 1. **使用迭代器和生成器** - 对于大型文件处理,使用 pandas 的 `chunksize` 参数分块读取 - 示例代码: ```python def merge_large_csv_files(self, file1, file2, output_file, on_columns, chunksize=10000): # 打开输出文件 with open(output_file, 'w', newline='') as f_out: # 读取第一个文件的表头 df1_header = pd.read_csv(file1, nrows=0) df2_header = pd.read_csv(file2, nrows=0) # 创建合并后的表头 merged_columns = list(df1_header.columns) for col in df2_header.columns: if col not in on_columns and col not in merged_columns: merged_columns.append(col) # 写入表头 writer = csv.writer(f_out) writer.writerow(merged_columns) # 分块读取和处理第一个文件 for df1_chunk in pd.read_csv(file1, chunksize=chunksize): # 对于每个块,读取第二个文件并合并 for df2_chunk in pd.read_csv(file2, chunksize=chunksize): merged_chunk = pd.merge(df1_chunk, df2_chunk, on=on_columns, how='outer') # 只写入数据,不写入表头 merged_chunk.to_csv(f_out, mode='a', header=False, index=False) ``` 2. **减少 DataFrame 复制** - 使用 `inplace=True` 参数进行原地操作 - 使用 `.loc` 或 `.iloc` 进行赋值而不是创建新的 DataFrame - 示例代码: ```python # 替换这样的代码: df_new = df[some_condition] df_new['new_column'] = some_value # 使用这样的代码: df.loc[some_condition, 'new_column'] = some_value ``` ## 2. 代码结构优化 ### 2.1 插件系统重构 #### 当前问题 - 插件加载机制不够灵活,无法动态配置 - 插件接口定义较为简单,缺乏高级功能(如进度报告、取消操作等) - 缺少插件版本控制和依赖管理 #### 优化建议 1. **增强插件接口** - 添加进度报告和取消操作的支持 - 添加插件元数据(作者、版本、依赖等) - 示例代码: ```python from abc import ABC, abstractmethod from pathlib import Path import pandas as pd from typing import Dict, Any, Optional, Callable class PluginMetadata: """插件元数据类""" def __init__(self, name: str, version: str, author: str, description: str, dependencies: Dict[str, str] = None): self.name = name self.version = version self.author = author self.description = description self.dependencies = dependencies or {} class CustomDataProcessorPlugin(ABC): """增强的插件接口""" @abstractmethod def get_metadata(self) -> PluginMetadata: """返回插件元数据""" pass @abstractmethod def can_handle(self, zip_path: Path, folder_name: str) -> bool: """检查是否可以处理该数据""" pass @abstractmethod def process_data(self, zip_path: Path, folder_name: str, output_dir: Path, progress_callback: Optional[Callable[[float, str], None]] = None, cancel_check: Optional[Callable[[], bool]] = None) -> Optional[Path]: """处理数据并支持进度报告和取消""" pass @abstractmethod def get_required_columns(self) -> Dict[str, Any]: """返回插件提供的列和类型""" pass def validate_dependencies(self) -> bool: """验证插件依赖是否满足""" metadata = self.get_metadata() for package, version in metadata.dependencies.items(): try: import importlib module = importlib.import_module(package) if hasattr(module, '__version__') and module.__version__ < version: print(f"警告: {package} 版本 {module.__version__} 低于所需的 {version}") return False except ImportError: print(f"错误: 缺少依赖 {package} {version}") return False return True ``` 2. **改进插件管理器** - 支持插件热加载和卸载 - 添加插件配置和优先级管理 - 示例代码: ```python class EnhancedPluginManager: """增强的插件管理器""" def __init__(self, plugin_dir: Path, config_file: Optional[Path] = None): self.plugin_dir = plugin_dir self.config_file = config_file self.plugins: Dict[str, Type[CustomDataProcessorPlugin]] = {} self.plugin_instances: Dict[str, CustomDataProcessorPlugin] = {} self.plugin_configs: Dict[str, Dict[str, Any]] = {} self.plugin_priorities: Dict[str, int] = {} self._load_plugin_configs() self._load_plugins() def _load_plugin_configs(self): """加载插件配置""" if not self.config_file or not self.config_file.exists(): return try: import yaml with open(self.config_file, 'r') as f: config = yaml.safe_load(f) if 'plugins' in config: for plugin_name, plugin_config in config['plugins'].items(): self.plugin_configs[plugin_name] = plugin_config if 'priority' in plugin_config: self.plugin_priorities[plugin_name] = plugin_config['priority'] if 'enabled' in plugin_config and not plugin_config['enabled']: print(f"插件 {plugin_name} 已禁用") except Exception as e: print(f"加载插件配置失败: {e}") def reload_plugins(self): """重新加载所有插件""" self.plugins.clear() self.plugin_instances.clear() self._load_plugins() def get_sorted_plugins(self) -> List[str]: """按优先级返回排序后的插件名称""" return sorted(self.plugins.keys(), key=lambda name: self.plugin_priorities.get(name, 0), reverse=True) ``` ### 2.2 模块化重构 #### 当前问题 - `merge_data_process_LST.py` 文件过大,包含多个职责 - 配置管理分散在多个地方 - 错误处理不一致 #### 优化建议 1. **拆分大型模块** - 将 `ZipCSVProcessor` 拆分为多个专注于单一职责的类 - 创建专门的配置管理模块 - 示例结构: ``` core/ __init__.py config.py # 配置管理 plugin_interface.py plugin_manager.py resource_manager.py processors/ __init__.py base_processor.py # 处理器基类 zip_processor.py # ZIP文件处理 db_processor.py # 数据库处理 gnss_processor.py # GNSS数据处理 can_processor.py # CAN数据处理 merge_processor.py # 数据合并处理 utils/ __init__.py logging_utils.py # 日志工具 error_utils.py # 错误处理工具 projection_utils.py # 坐标投影工具 ``` 2. **统一配置管理** - 创建集中式配置管理类 - 支持从文件、环境变量和命令行加载配置 - 示例代码: ```python import os import argparse import yaml from pathlib import Path from typing import Any, Dict, Optional class ConfigManager: """统一配置管理类""" def __init__(self): self.config: Dict[str, Any] = {} self.config_file: Optional[Path] = None def load_defaults(self): """加载默认配置""" self.config = { 'zip_path': None, 'output_dir': Path('output'), 'utm_zone': 51, 'x_offset': 0.0, 'y_offset': 0.0, 'plugins_dir': Path('plugins'), 'resources_dir': Path('resources'), 'log_level': 'INFO', } def load_from_file(self, config_file: Path): """从YAML文件加载配置""" if not config_file.exists(): print(f"配置文件不存在: {config_file}") return try: with open(config_file, 'r') as f: file_config = yaml.safe_load(f) self.config.update(file_config) self.config_file = config_file print(f"已加载配置文件: {config_file}") except Exception as e: print(f"加载配置文件失败: {e}") def load_from_env(self, prefix: str = 'DATA_PROCESS_'): """从环境变量加载配置""" for key, value in os.environ.items(): if key.startswith(prefix): config_key = key[len(prefix):].lower() self.config[config_key] = value def load_from_args(self, args: argparse.Namespace): """从命令行参数加载配置""" for key, value in vars(args).items(): if value is not None: # 只覆盖非None值 self.config[key] = value def get(self, key: str, default: Any = None) -> Any: """获取配置项""" return self.config.get(key, default) def set(self, key: str, value: Any): """设置配置项""" self.config[key] = value def save(self, file_path: Optional[Path] = None): """保存配置到文件""" save_path = file_path or self.config_file if not save_path: print("未指定配置文件路径") return False try: with open(save_path, 'w') as f: yaml.dump(self.config, f, default_flow_style=False) print(f"配置已保存到: {save_path}") return True except Exception as e: print(f"保存配置失败: {e}") return False ``` ## 3. 错误处理和日志改进 ### 3.1 结构化日志系统 #### 当前问题 - 使用简单的 `print` 语句输出日志,难以过滤和分析 - 缺乏日志级别和上下文信息 - 无法将日志输出到文件或其他目标 #### 优化建议 1. **引入 logging 模块** - 创建结构化的日志系统 - 支持不同的日志级别和格式 - 示例代码: ```python import logging import sys from pathlib import Path from typing import Optional class LoggingManager: """日志管理类""" def __init__(self): self.logger = logging.getLogger('data_processor') self.log_file: Optional[Path] = None self.log_level = logging.INFO self._configure_default_logger() def _configure_default_logger(self): """配置默认日志器""" self.logger.setLevel(logging.DEBUG) # 设置最低级别 # 创建控制台处理器 console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(self.log_level) # 设置格式 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) console_handler.setFormatter(formatter) # 添加处理器 self.logger.addHandler(console_handler) def set_log_level(self, level: str): """设置日志级别""" level_map = { 'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'WARNING': logging.WARNING, 'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL } if level.upper() in level_map: self.log_level = level_map[level.upper()] for handler in self.logger.handlers: if isinstance(handler, logging.StreamHandler) and handler.stream == sys.stdout: handler.setLevel(self.log_level) else: self.logger.warning(f"未知的日志级别: {level},使用默认级别 INFO") def add_file_handler(self, log_file: Path): """添加文件处理器""" try: # 确保目录存在 log_file.parent.mkdir(parents=True, exist_ok=True) # 创建文件处理器 file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.DEBUG) # 文件记录所有级别 # 设置格式 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' ) file_handler.setFormatter(formatter) # 添加处理器 self.logger.addHandler(file_handler) self.log_file = log_file self.logger.info(f"日志文件已设置为: {log_file}") except Exception as e: self.logger.error(f"设置日志文件失败: {e}") def get_logger(self, name: Optional[str] = None): """获取命名日志器""" if name: return logging.getLogger(f"data_processor.{name}") return self.logger ``` 2. **在代码中使用结构化日志** - 替换所有 `print` 语句为适当的日志调用 - 添加上下文信息和错误详情 - 示例代码: ```python # 创建日志管理器实例 logging_manager = LoggingManager() # 在主模块中配置 def configure_logging(args): logging_manager.set_log_level(args.log_level) if args.log_file: logging_manager.add_file_handler(Path(args.log_file)) # 在各个模块中使用 class ZipProcessor: def __init__(self, config): self.config = config self.logger = logging_manager.get_logger('zip_processor') def process_zip(self): self.logger.info(f"开始处理ZIP文件: {self.config.zip_path}") try: # 处理逻辑... self.logger.debug(f"找到 {len(db_files)} 个数据库文件") except zipfile.BadZipFile: self.logger.error(f"无效的ZIP文件: {self.config.zip_path}") except Exception as e: self.logger.exception(f"处理ZIP文件时发生错误: {e}") ``` ### 3.2 增强错误处理 #### 当前问题 - 错误处理不一致,有些地方捕获异常但没有提供足够的上下文 - 缺少错误恢复机制 - 用户无法轻松理解错误原因 #### 优化建议 1. **创建自定义异常类** - 定义特定于应用程序的异常类型 - 提供更多上下文信息 - 示例代码: ```python class DataProcessError(Exception): """数据处理错误的基类""" pass class ConfigError(DataProcessError): """配置错误""" pass class DatabaseError(DataProcessError): """数据库操作错误""" pass class PluginError(DataProcessError): """插件相关错误""" pass class FileProcessError(DataProcessError): """文件处理错误""" def __init__(self, file_path, message, original_error=None): self.file_path = file_path self.original_error = original_error super().__init__(f"处理文件 {file_path} 时出错: {message}") ``` 2. **统一错误处理策略** - 创建错误处理工具类 - 实现错误恢复和重试机制 - 示例代码: ```python import time from functools import wraps from typing import Callable, TypeVar, Any, Optional T = TypeVar('T') class ErrorHandler: """错误处理工具类""" @staticmethod def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0, exceptions: tuple = (Exception,), logger: Optional[logging.Logger] = None): """重试装饰器""" def decorator(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def wrapper(*args, **kwargs) -> T: attempt = 1 current_delay = delay while attempt <= max_attempts: try: return func(*args, **kwargs) except exceptions as e: if logger: logger.warning( f"尝试 {attempt}/{max_attempts} 失败: {e}. " f"{'重试中...' if attempt < max_attempts else '放弃.'}" ) if attempt == max_attempts: raise time.sleep(current_delay) current_delay *= backoff attempt += 1 return wrapper return decorator @staticmethod def safe_operation(default_value: Any = None, logger: Optional[logging.Logger] = None): """安全操作装饰器,出错时返回默认值""" def decorator(func: Callable[..., T]) -> Callable[..., Optional[T]]: @wraps(func) def wrapper(*args, **kwargs) -> Optional[T]: try: return func(*args, **kwargs) except Exception as e: if logger: logger.error(f"操作失败: {e}") return default_value return wrapper return decorator ``` ## 4. 测试和文档完善 ### 4.1 单元测试 #### 当前问题 - 缺少自动化测试 - 难以验证代码更改的影响 #### 优化建议 1. **创建测试框架** - 使用 pytest 或 unittest 创建测试框架 - 为核心功能编写单元测试 - 示例代码: ```python # tests/test_config_manager.py import pytest import tempfile from pathlib import Path import os from core.config import ConfigManager class TestConfigManager: def setup_method(self): self.config_manager = ConfigManager() self.config_manager.load_defaults() def test_defaults(self): assert self.config_manager.get('utm_zone') == 51 assert self.config_manager.get('x_offset') == 0.0 assert self.config_manager.get('output_dir') == Path('output') def test_load_from_file(self): with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: f.write("utm_zone: 52\nx_offset: 1.5\n") temp_path = Path(f.name) try: self.config_manager.load_from_file(temp_path) assert self.config_manager.get('utm_zone') == 52 assert self.config_manager.get('x_offset') == 1.5 # 未覆盖的值应保