optimization_plan.md 24 KB

数据预处理系统优化方案

概述

通过对当前数据预处理系统的代码分析,我们发现了多个可以优化的方面,包括性能瓶颈、代码结构、错误处理和文档完善等。本文档提供了一个全面的优化方案,旨在提高系统的性能、可维护性和可扩展性。

1. 性能优化

1.1 数据处理流程优化

当前问题

  • merge_data_process_LST.py 中的数据处理流程存在多次不必要的数据转换和中间文件生成
  • 大量使用循环处理数据,而非利用 pandas 的向量化操作
  • 数据库查询未优化,可能导致内存使用过高

优化建议

  1. 减少中间文件生成

    • 移除 OUTPUT_CSV_TEMP_OBJSTATE 等临时文件,直接在内存中处理数据
    • 使用 pandas 的 pipe() 方法创建数据处理管道,减少中间 DataFrame 的创建
  2. 利用 pandas 向量化操作

    • _process_gnss_table_process_can_table_optimized 中的循环替换为向量化操作
    • 示例代码:

      # 替换这样的循环处理:
      processed_data = []
      for row in rows:
      row_dict = dict(zip(db_columns, row))
      record = {}
      # 处理每行数据...
      processed_data.append(record)
      df_final = pd.DataFrame(processed_data)
         
      # 使用向量化操作:
      df = pd.DataFrame(rows, columns=db_columns)
      df['simTime'] = (df['second'] + df['usecond'] / 1e6).round(2)
      df['playerId'] = PLAYER_ID_EGO
      # 其他列的向量化处理...
      
  3. 优化数据库查询

    • 使用分批查询处理大型数据库,避免一次性加载所有数据
    • 示例代码:

      def _process_db_file_in_batches(self, db_path, output_dir, table_type, csv_name, batch_size=10000):
      output_csv_path = output_dir / csv_name
      with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn:
         # 获取总行数
         cursor = conn.cursor()
         cursor.execute(f"SELECT COUNT(*) FROM {table_type}")
         total_rows = cursor.fetchone()[0]
                 
         # 分批处理
         with open(output_csv_path, 'w', newline='') as f:
             writer = None  # 将在第一批数据后初始化
                     
             for offset in range(0, total_rows, batch_size):
                 query = f"SELECT {', '.join(db_columns)} FROM {table_type} LIMIT {batch_size} OFFSET {offset}"
                 cursor.execute(query)
                 batch_rows = cursor.fetchall()
                         
                 # 处理这一批数据
                 batch_df = self._process_batch(batch_rows, db_columns)
                         
                 # 写入CSV(第一批包含表头)
                 if offset == 0:
                     batch_df.to_csv(f, index=False)
                 else:
                     batch_df.to_csv(f, index=False, header=False)
      
  4. 并行处理

    • 使用 concurrent.futuresmultiprocessing 并行处理独立的数据文件
    • 示例代码:

      from concurrent.futures import ProcessPoolExecutor
         
      def process_zip_parallel(self):
      with zipfile.ZipFile(self.config.zip_path, "r") as zip_ref:
         db_files_to_process = []
         # 找出需要处理的文件...
                 
         with tempfile.TemporaryDirectory() as tmp_dir_str:
             tmp_dir = Path(tmp_dir_str)
             # 提取所有文件
             for file_info, table_type, csv_name in db_files_to_process:
                 extracted_path = tmp_dir / Path(file_info.filename).name
                 with zip_ref.open(file_info.filename) as source, open(extracted_path, "wb") as target:
                     shutil.copyfileobj(source, target)
                     
             # 并行处理文件
             with ProcessPoolExecutor(max_workers=min(os.cpu_count(), len(db_files_to_process))) as executor:
                 futures = []
                 for file_info, table_type, csv_name in db_files_to_process:
                     extracted_path = tmp_dir / Path(file_info.filename).name
                     futures.append(executor.submit(
                         self._process_db_file, extracted_path, self.config.output_dir, table_type, csv_name
                     ))
                         
                 # 等待所有任务完成
                 for future in futures:
                     future.result()
      

1.2 内存优化

当前问题

  • 处理大型数据集时可能导致内存溢出
  • 多次创建完整的 DataFrame 副本

优化建议

  1. 使用迭代器和生成器

    • 对于大型文件处理,使用 pandas 的 chunksize 参数分块读取
    • 示例代码:

      def merge_large_csv_files(self, file1, file2, output_file, on_columns, chunksize=10000):
      # 打开输出文件
      with open(output_file, 'w', newline='') as f_out:
         # 读取第一个文件的表头
         df1_header = pd.read_csv(file1, nrows=0)
         df2_header = pd.read_csv(file2, nrows=0)
                 
         # 创建合并后的表头
         merged_columns = list(df1_header.columns)
         for col in df2_header.columns:
             if col not in on_columns and col not in merged_columns:
                 merged_columns.append(col)
                 
         # 写入表头
         writer = csv.writer(f_out)
         writer.writerow(merged_columns)
                 
         # 分块读取和处理第一个文件
         for df1_chunk in pd.read_csv(file1, chunksize=chunksize):
             # 对于每个块,读取第二个文件并合并
             for df2_chunk in pd.read_csv(file2, chunksize=chunksize):
                 merged_chunk = pd.merge(df1_chunk, df2_chunk, on=on_columns, how='outer')
                 # 只写入数据,不写入表头
                 merged_chunk.to_csv(f_out, mode='a', header=False, index=False)
      
  2. 减少 DataFrame 复制

    • 使用 inplace=True 参数进行原地操作
    • 使用 .loc.iloc 进行赋值而不是创建新的 DataFrame
    • 示例代码:

      # 替换这样的代码:
      df_new = df[some_condition]
      df_new['new_column'] = some_value
         
      # 使用这样的代码:
      df.loc[some_condition, 'new_column'] = some_value
      

2. 代码结构优化

2.1 插件系统重构

当前问题

  • 插件加载机制不够灵活,无法动态配置
  • 插件接口定义较为简单,缺乏高级功能(如进度报告、取消操作等)
  • 缺少插件版本控制和依赖管理

优化建议

  1. 增强插件接口

    • 添加进度报告和取消操作的支持
    • 添加插件元数据(作者、版本、依赖等)
    • 示例代码:

      from abc import ABC, abstractmethod
      from pathlib import Path
      import pandas as pd
      from typing import Dict, Any, Optional, Callable
         
      class PluginMetadata:
      """插件元数据类"""
      def __init__(self, name: str, version: str, author: str, description: str, dependencies: Dict[str, str] = None):
         self.name = name
         self.version = version
         self.author = author
         self.description = description
         self.dependencies = dependencies or {}
         
      class CustomDataProcessorPlugin(ABC):
      """增强的插件接口"""
             
      @abstractmethod
      def get_metadata(self) -> PluginMetadata:
         """返回插件元数据"""
         pass
             
      @abstractmethod
      def can_handle(self, zip_path: Path, folder_name: str) -> bool:
         """检查是否可以处理该数据"""
         pass
             
      @abstractmethod
      def process_data(self, zip_path: Path, folder_name: str, output_dir: Path, 
                      progress_callback: Optional[Callable[[float, str], None]] = None,
                      cancel_check: Optional[Callable[[], bool]] = None) -> Optional[Path]:
         """处理数据并支持进度报告和取消"""
         pass
             
      @abstractmethod
      def get_required_columns(self) -> Dict[str, Any]:
         """返回插件提供的列和类型"""
         pass
             
      def validate_dependencies(self) -> bool:
         """验证插件依赖是否满足"""
         metadata = self.get_metadata()
         for package, version in metadata.dependencies.items():
             try:
                 import importlib
                 module = importlib.import_module(package)
                 if hasattr(module, '__version__') and module.__version__ < version:
                     print(f"警告: {package} 版本 {module.__version__} 低于所需的 {version}")
                     return False
             except ImportError:
                 print(f"错误: 缺少依赖 {package} {version}")
                 return False
         return True
      
  2. 改进插件管理器

    • 支持插件热加载和卸载
    • 添加插件配置和优先级管理
    • 示例代码:

      class EnhancedPluginManager:
      """增强的插件管理器"""
             
      def __init__(self, plugin_dir: Path, config_file: Optional[Path] = None):
         self.plugin_dir = plugin_dir
         self.config_file = config_file
         self.plugins: Dict[str, Type[CustomDataProcessorPlugin]] = {}
         self.plugin_instances: Dict[str, CustomDataProcessorPlugin] = {}
         self.plugin_configs: Dict[str, Dict[str, Any]] = {}
         self.plugin_priorities: Dict[str, int] = {}
                 
         self._load_plugin_configs()
         self._load_plugins()
             
      def _load_plugin_configs(self):
         """加载插件配置"""
         if not self.config_file or not self.config_file.exists():
             return
                     
         try:
             import yaml
             with open(self.config_file, 'r') as f:
                 config = yaml.safe_load(f)
                         
             if 'plugins' in config:
                 for plugin_name, plugin_config in config['plugins'].items():
                     self.plugin_configs[plugin_name] = plugin_config
                     if 'priority' in plugin_config:
                         self.plugin_priorities[plugin_name] = plugin_config['priority']
                     if 'enabled' in plugin_config and not plugin_config['enabled']:
                         print(f"插件 {plugin_name} 已禁用")
         except Exception as e:
             print(f"加载插件配置失败: {e}")
             
      def reload_plugins(self):
         """重新加载所有插件"""
         self.plugins.clear()
         self.plugin_instances.clear()
         self._load_plugins()
             
      def get_sorted_plugins(self) -> List[str]:
         """按优先级返回排序后的插件名称"""
         return sorted(self.plugins.keys(), 
                       key=lambda name: self.plugin_priorities.get(name, 0),
                       reverse=True)
      

2.2 模块化重构

当前问题

  • merge_data_process_LST.py 文件过大,包含多个职责
  • 配置管理分散在多个地方
  • 错误处理不一致

优化建议

  1. 拆分大型模块

    • ZipCSVProcessor 拆分为多个专注于单一职责的类
    • 创建专门的配置管理模块
    • 示例结构:

      core/
      __init__.py
      config.py          # 配置管理
      plugin_interface.py
      plugin_manager.py
      resource_manager.py
      processors/
      __init__.py
      base_processor.py  # 处理器基类
      zip_processor.py   # ZIP文件处理
      db_processor.py    # 数据库处理
      gnss_processor.py  # GNSS数据处理
      can_processor.py   # CAN数据处理
      merge_processor.py # 数据合并处理
      utils/
      __init__.py
      logging_utils.py   # 日志工具
      error_utils.py     # 错误处理工具
      projection_utils.py # 坐标投影工具
      
  2. 统一配置管理

    • 创建集中式配置管理类
    • 支持从文件、环境变量和命令行加载配置
    • 示例代码:

      import os
      import argparse
      import yaml
      from pathlib import Path
      from typing import Any, Dict, Optional
         
      class ConfigManager:
      """统一配置管理类"""
             
      def __init__(self):
         self.config: Dict[str, Any] = {}
         self.config_file: Optional[Path] = None
             
      def load_defaults(self):
         """加载默认配置"""
         self.config = {
             'zip_path': None,
             'output_dir': Path('output'),
             'utm_zone': 51,
             'x_offset': 0.0,
             'y_offset': 0.0,
             'plugins_dir': Path('plugins'),
             'resources_dir': Path('resources'),
             'log_level': 'INFO',
         }
             
      def load_from_file(self, config_file: Path):
         """从YAML文件加载配置"""
         if not config_file.exists():
             print(f"配置文件不存在: {config_file}")
             return
                     
         try:
             with open(config_file, 'r') as f:
                 file_config = yaml.safe_load(f)
                 self.config.update(file_config)
             self.config_file = config_file
             print(f"已加载配置文件: {config_file}")
         except Exception as e:
             print(f"加载配置文件失败: {e}")
             
      def load_from_env(self, prefix: str = 'DATA_PROCESS_'):
         """从环境变量加载配置"""
         for key, value in os.environ.items():
             if key.startswith(prefix):
                 config_key = key[len(prefix):].lower()
                 self.config[config_key] = value
             
      def load_from_args(self, args: argparse.Namespace):
         """从命令行参数加载配置"""
         for key, value in vars(args).items():
             if value is not None:  # 只覆盖非None值
                 self.config[key] = value
             
      def get(self, key: str, default: Any = None) -> Any:
         """获取配置项"""
         return self.config.get(key, default)
             
      def set(self, key: str, value: Any):
         """设置配置项"""
         self.config[key] = value
             
      def save(self, file_path: Optional[Path] = None):
         """保存配置到文件"""
         save_path = file_path or self.config_file
         if not save_path:
             print("未指定配置文件路径")
             return False
                     
         try:
             with open(save_path, 'w') as f:
                 yaml.dump(self.config, f, default_flow_style=False)
             print(f"配置已保存到: {save_path}")
             return True
         except Exception as e:
             print(f"保存配置失败: {e}")
             return False
      

3. 错误处理和日志改进

3.1 结构化日志系统

当前问题

  • 使用简单的 print 语句输出日志,难以过滤和分析
  • 缺乏日志级别和上下文信息
  • 无法将日志输出到文件或其他目标

优化建议

  1. 引入 logging 模块

    • 创建结构化的日志系统
    • 支持不同的日志级别和格式
    • 示例代码:

      import logging
      import sys
      from pathlib import Path
      from typing import Optional
         
      class LoggingManager:
      """日志管理类"""
             
      def __init__(self):
         self.logger = logging.getLogger('data_processor')
         self.log_file: Optional[Path] = None
         self.log_level = logging.INFO
         self._configure_default_logger()
             
      def _configure_default_logger(self):
         """配置默认日志器"""
         self.logger.setLevel(logging.DEBUG)  # 设置最低级别
                 
         # 创建控制台处理器
         console_handler = logging.StreamHandler(sys.stdout)
         console_handler.setLevel(self.log_level)
                 
         # 设置格式
         formatter = logging.Formatter(
             '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
         )
         console_handler.setFormatter(formatter)
                 
         # 添加处理器
         self.logger.addHandler(console_handler)
             
      def set_log_level(self, level: str):
         """设置日志级别"""
         level_map = {
             'DEBUG': logging.DEBUG,
             'INFO': logging.INFO,
             'WARNING': logging.WARNING,
             'ERROR': logging.ERROR,
             'CRITICAL': logging.CRITICAL
         }
                 
         if level.upper() in level_map:
             self.log_level = level_map[level.upper()]
             for handler in self.logger.handlers:
                 if isinstance(handler, logging.StreamHandler) and handler.stream == sys.stdout:
                     handler.setLevel(self.log_level)
         else:
             self.logger.warning(f"未知的日志级别: {level},使用默认级别 INFO")
             
      def add_file_handler(self, log_file: Path):
         """添加文件处理器"""
         try:
             # 确保目录存在
             log_file.parent.mkdir(parents=True, exist_ok=True)
                     
             # 创建文件处理器
             file_handler = logging.FileHandler(log_file, encoding='utf-8')
             file_handler.setLevel(logging.DEBUG)  # 文件记录所有级别
                     
             # 设置格式
             formatter = logging.Formatter(
                 '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
             )
             file_handler.setFormatter(formatter)
                     
             # 添加处理器
             self.logger.addHandler(file_handler)
             self.log_file = log_file
             self.logger.info(f"日志文件已设置为: {log_file}")
         except Exception as e:
             self.logger.error(f"设置日志文件失败: {e}")
             
      def get_logger(self, name: Optional[str] = None):
         """获取命名日志器"""
         if name:
             return logging.getLogger(f"data_processor.{name}")
         return self.logger
      
  2. 在代码中使用结构化日志

    • 替换所有 print 语句为适当的日志调用
    • 添加上下文信息和错误详情
    • 示例代码:

      # 创建日志管理器实例
      logging_manager = LoggingManager()
         
      # 在主模块中配置
      def configure_logging(args):
      logging_manager.set_log_level(args.log_level)
      if args.log_file:
         logging_manager.add_file_handler(Path(args.log_file))
         
      # 在各个模块中使用
      class ZipProcessor:
      def __init__(self, config):
         self.config = config
         self.logger = logging_manager.get_logger('zip_processor')
             
      def process_zip(self):
         self.logger.info(f"开始处理ZIP文件: {self.config.zip_path}")
         try:
             # 处理逻辑...
             self.logger.debug(f"找到 {len(db_files)} 个数据库文件")
         except zipfile.BadZipFile:
             self.logger.error(f"无效的ZIP文件: {self.config.zip_path}")
         except Exception as e:
             self.logger.exception(f"处理ZIP文件时发生错误: {e}")
      

3.2 增强错误处理

当前问题

  • 错误处理不一致,有些地方捕获异常但没有提供足够的上下文
  • 缺少错误恢复机制
  • 用户无法轻松理解错误原因

优化建议

  1. 创建自定义异常类

    • 定义特定于应用程序的异常类型
    • 提供更多上下文信息
    • 示例代码:

      class DataProcessError(Exception):
      """数据处理错误的基类"""
      pass
         
      class ConfigError(DataProcessError):
      """配置错误"""
      pass
         
      class DatabaseError(DataProcessError):
      """数据库操作错误"""
      pass
         
      class PluginError(DataProcessError):
      """插件相关错误"""
      pass
         
      class FileProcessError(DataProcessError):
      """文件处理错误"""
      def __init__(self, file_path, message, original_error=None):
         self.file_path = file_path
         self.original_error = original_error
         super().__init__(f"处理文件 {file_path} 时出错: {message}")
      
  2. 统一错误处理策略

    • 创建错误处理工具类
    • 实现错误恢复和重试机制
    • 示例代码:

      import time
      from functools import wraps
      from typing import Callable, TypeVar, Any, Optional
         
      T = TypeVar('T')
         
      class ErrorHandler:
      """错误处理工具类"""
             
      @staticmethod
      def retry(max_attempts: int = 3, delay: float = 1.0, 
               backoff: float = 2.0, exceptions: tuple = (Exception,),
               logger: Optional[logging.Logger] = None):
         """重试装饰器"""
         def decorator(func: Callable[..., T]) -> Callable[..., T]:
             @wraps(func)
             def wrapper(*args, **kwargs) -> T:
                 attempt = 1
                 current_delay = delay
                         
                 while attempt <= max_attempts:
                     try:
                         return func(*args, **kwargs)
                     except exceptions as e:
                         if logger:
                             logger.warning(
                                 f"尝试 {attempt}/{max_attempts} 失败: {e}. "
                                 f"{'重试中...' if attempt < max_attempts else '放弃.'}"
                             )
                                 
                         if attempt == max_attempts:
                             raise
                                 
                         time.sleep(current_delay)
                         current_delay *= backoff
                         attempt += 1
                     
             return wrapper
         return decorator
             
      @staticmethod
      def safe_operation(default_value: Any = None, 
                       logger: Optional[logging.Logger] = None):
         """安全操作装饰器,出错时返回默认值"""
         def decorator(func: Callable[..., T]) -> Callable[..., Optional[T]]:
             @wraps(func)
             def wrapper(*args, **kwargs) -> Optional[T]:
                 try:
                     return func(*args, **kwargs)
                 except Exception as e:
                     if logger:
                         logger.error(f"操作失败: {e}")
                     return default_value
                     
             return wrapper
         return decorator
      

4. 测试和文档完善

4.1 单元测试

当前问题

  • 缺少自动化测试
  • 难以验证代码更改的影响

优化建议

  1. 创建测试框架
    • 使用 pytest 或 unittest 创建测试框架
    • 为核心功能编写单元测试
    • 示例代码: ```python # tests/test_config_manager.py import pytest import tempfile from pathlib import Path import os from core.config import ConfigManager
      class TestConfigManager: def setup_method(self): self.config_manager = ConfigManager() self.config_manager.load_defaults()
      def test_defaults(self): assert self.config_manager.get('utm_zone') == 51 assert self.config_manager.get('x_offset') == 0.0 assert self.config_manager.get('output_dir') == Path('output')
      def test_load_from_file(self): with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: f.write("utm_zone: 52\nx_offset: 1.5\n") temp_path = Path(f.name)
      try: self.config_manager.load_from_file(temp_path) assert self.config_manager.get('utm_zone') == 52 assert self.config_manager.get('x_offset') == 1.5 # 未覆盖的值应保