Our analysis of the current data preprocessing system's code surfaced several areas for optimization, covering performance bottlenecks, code structure, error handling, and documentation. This document lays out a comprehensive optimization plan aimed at improving the system's performance, maintainability, and extensibility.
The data processing flow in merge_data_process_LST.py performs multiple unnecessary data conversions and generates intermediate files.

Reduce intermediate file generation

- Avoid temporary files such as OUTPUT_CSV_TEMP_OBJSTATE and process the data directly in memory.
- Use the pipe() method to build a data-processing pipeline, reducing the creation of intermediate DataFrames (a sketch follows below).
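A minimal sketch of the pipe() approach, assuming rows and db_columns come from the database query shown later; the step functions _add_sim_time and _round_numeric are hypothetical stand-ins for the project's real processing steps:

import pandas as pd

# Hypothetical pipeline steps; each takes and returns a DataFrame,
# so steps chain without naming intermediate results.
def _add_sim_time(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out['simTime'] = (out['second'] + out['usecond'] / 1e6).round(2)
    return out

def _round_numeric(df: pd.DataFrame, decimals: int = 2) -> pd.DataFrame:
    # DataFrame.round only affects numeric columns
    return df.round(decimals)

df_final = (
    pd.DataFrame(rows, columns=db_columns)  # raw rows from the database
    .pipe(_add_sim_time)
    .pipe(_round_numeric, decimals=2)
)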
Leverage pandas vectorized operations

Replace the row loops in _process_gnss_table and _process_can_table_optimized with vectorized operations. Example code:
# Replace loop-based row processing like this:
processed_data = []
for row in rows:
    row_dict = dict(zip(db_columns, row))
    record = {}
    # ... process each row ...
    processed_data.append(record)
df_final = pd.DataFrame(processed_data)

# with vectorized operations:
df = pd.DataFrame(rows, columns=db_columns)
df['simTime'] = (df['second'] + df['usecond'] / 1e6).round(2)
df['playerId'] = PLAYER_ID_EGO
# ... vectorized handling of the remaining columns ...
Optimize database queries

For large tables, read and write in batches instead of loading the whole table at once, so memory use stays bounded. Example code:
def _process_db_file_in_batches(self, db_path, output_dir, table_type,
                                csv_name, db_columns, batch_size=10000):
    output_csv_path = output_dir / csv_name
    with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn:
        cursor = conn.cursor()
        # Get the total row count
        cursor.execute(f"SELECT COUNT(*) FROM {table_type}")
        total_rows = cursor.fetchone()[0]
        # Process in batches
        with open(output_csv_path, 'w', newline='') as f:
            for offset in range(0, total_rows, batch_size):
                query = (f"SELECT {', '.join(db_columns)} FROM {table_type} "
                         f"LIMIT {batch_size} OFFSET {offset}")
                cursor.execute(query)
                batch_rows = cursor.fetchall()
                # Process this batch
                batch_df = self._process_batch(batch_rows, db_columns)
                # Write to CSV (only the first batch writes the header)
                batch_df.to_csv(f, index=False, header=(offset == 0))
Parallel processing

Use concurrent.futures or multiprocessing to process independent data files in parallel. Example code:
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor

def process_zip_parallel(self):
    with zipfile.ZipFile(self.config.zip_path, "r") as zip_ref:
        db_files_to_process = []
        # ... collect the files that need processing ...
        with tempfile.TemporaryDirectory() as tmp_dir_str:
            tmp_dir = Path(tmp_dir_str)
            # Extract all files first
            for file_info, table_type, csv_name in db_files_to_process:
                extracted_path = tmp_dir / Path(file_info.filename).name
                with zip_ref.open(file_info.filename) as source, \
                        open(extracted_path, "wb") as target:
                    shutil.copyfileobj(source, target)
            # Process the files in parallel (self must be picklable for
            # ProcessPoolExecutor to submit its bound methods)
            max_workers = min(os.cpu_count(), len(db_files_to_process))
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for file_info, table_type, csv_name in db_files_to_process:
                    extracted_path = tmp_dir / Path(file_info.filename).name
                    futures.append(executor.submit(
                        self._process_db_file, extracted_path,
                        self.config.output_dir, table_type, csv_name
                    ))
                # Wait for all tasks to finish
                for future in futures:
                    future.result()
Use iterators and generators

Read large CSV files in chunks with pandas' chunksize parameter instead of loading them whole. Example code:
import csv
import pandas as pd

def merge_large_csv_files(self, file1, file2, output_file, on_columns, chunksize=10000):
    # Build the merged header from both files' columns
    df1_header = pd.read_csv(file1, nrows=0)
    df2_header = pd.read_csv(file2, nrows=0)
    merged_columns = list(df1_header.columns)
    for col in df2_header.columns:
        if col not in on_columns and col not in merged_columns:
            merged_columns.append(col)
    with open(output_file, 'w', newline='') as f_out:
        # Write the header once
        writer = csv.writer(f_out)
        writer.writerow(merged_columns)
        # file2 is assumed to fit in memory; re-merging it chunk-by-chunk
        # against every chunk of file1 would duplicate rows in the output.
        df2 = pd.read_csv(file2)
        # Read file1 in chunks and merge each chunk against file2
        for df1_chunk in pd.read_csv(file1, chunksize=chunksize):
            merged_chunk = pd.merge(df1_chunk, df2, on=on_columns, how='left')
            # Write data rows only; the header is already in place
            merged_chunk.reindex(columns=merged_columns).to_csv(
                f_out, header=False, index=False)
Reduce DataFrame copying

- Use the inplace=True parameter for in-place operations.
- Assign with .loc or .iloc instead of creating a new DataFrame.

Example code:
# Replace code like this (a copy plus chained assignment):
df_new = df[some_condition]
df_new['new_column'] = some_value

# with a single .loc assignment on the original DataFrame:
df.loc[some_condition, 'new_column'] = some_value
Enhance the plugin interface

Give plugins metadata, a capability check, progress reporting, and cancellation support. Example code:
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Any, Optional, Callable
import importlib

import pandas as pd
# Assumes the third-party `packaging` library is available; plain string
# comparison of version numbers is unreliable (e.g. "10.0" < "9.0").
from packaging.version import Version

class PluginMetadata:
    """Plugin metadata."""
    def __init__(self, name: str, version: str, author: str, description: str,
                 dependencies: Dict[str, str] = None):
        self.name = name
        self.version = version
        self.author = author
        self.description = description
        self.dependencies = dependencies or {}

class CustomDataProcessorPlugin(ABC):
    """Enhanced plugin interface."""

    @abstractmethod
    def get_metadata(self) -> PluginMetadata:
        """Return the plugin's metadata."""
        pass

    @abstractmethod
    def can_handle(self, zip_path: Path, folder_name: str) -> bool:
        """Check whether this plugin can handle the given data."""
        pass

    @abstractmethod
    def process_data(self, zip_path: Path, folder_name: str, output_dir: Path,
                     progress_callback: Optional[Callable[[float, str], None]] = None,
                     cancel_check: Optional[Callable[[], bool]] = None) -> Optional[Path]:
        """Process the data, with progress reporting and cancellation support."""
        pass

    @abstractmethod
    def get_required_columns(self) -> Dict[str, Any]:
        """Return the columns (and their types) this plugin provides."""
        pass

    def validate_dependencies(self) -> bool:
        """Check whether the plugin's dependencies are satisfied."""
        metadata = self.get_metadata()
        for package, min_version in metadata.dependencies.items():
            try:
                module = importlib.import_module(package)
                installed = getattr(module, '__version__', None)
                if installed and Version(installed) < Version(min_version):
                    print(f"Warning: {package} version {installed} is below the required {min_version}")
                    return False
            except ImportError:
                print(f"Error: missing dependency {package} {min_version}")
                return False
        return True
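To make the contract concrete, here is a hedged sketch of a minimal plugin; the name LidarPlugin, the folder-name check, and the output file are invented for illustration and are not part of the project's actual plugin set:

class LidarPlugin(CustomDataProcessorPlugin):
    """Hypothetical plugin illustrating the interface."""

    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="lidar", version="0.1.0", author="example",
            description="Illustrative LiDAR data processor",
            dependencies={"pandas": "1.3.0"},
        )

    def can_handle(self, zip_path: Path, folder_name: str) -> bool:
        # Invented convention: handle folders whose name mentions lidar
        return "lidar" in folder_name.lower()

    def process_data(self, zip_path, folder_name, output_dir,
                     progress_callback=None, cancel_check=None):
        if cancel_check and cancel_check():
            return None  # cooperative cancellation
        if progress_callback:
            progress_callback(0.5, "processing lidar data")
        output_csv = output_dir / "lidar.csv"
        # ... real extraction and conversion would happen here ...
        if progress_callback:
            progress_callback(1.0, "done")
        return output_csv

    def get_required_columns(self) -> Dict[str, Any]:
        return {"simTime": float, "playerId": int}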
Improve the plugin manager

Support per-plugin configuration, priorities, enable/disable flags, and hot reloading. Example code:
from pathlib import Path
from typing import Dict, Any, Optional, Type, List

import yaml

class EnhancedPluginManager:
    """Enhanced plugin manager."""

    def __init__(self, plugin_dir: Path, config_file: Optional[Path] = None):
        self.plugin_dir = plugin_dir
        self.config_file = config_file
        self.plugins: Dict[str, Type[CustomDataProcessorPlugin]] = {}
        self.plugin_instances: Dict[str, CustomDataProcessorPlugin] = {}
        self.plugin_configs: Dict[str, Dict[str, Any]] = {}
        self.plugin_priorities: Dict[str, int] = {}
        self._load_plugin_configs()
        self._load_plugins()  # discovery/instantiation, defined elsewhere

    def _load_plugin_configs(self):
        """Load per-plugin configuration."""
        if not self.config_file or not self.config_file.exists():
            return
        try:
            with open(self.config_file, 'r') as f:
                config = yaml.safe_load(f) or {}
            if 'plugins' in config:
                for plugin_name, plugin_config in config['plugins'].items():
                    self.plugin_configs[plugin_name] = plugin_config
                    if 'priority' in plugin_config:
                        self.plugin_priorities[plugin_name] = plugin_config['priority']
                    if 'enabled' in plugin_config and not plugin_config['enabled']:
                        print(f"Plugin {plugin_name} is disabled")
        except Exception as e:
            print(f"Failed to load plugin configuration: {e}")

    def reload_plugins(self):
        """Reload all plugins."""
        self.plugins.clear()
        self.plugin_instances.clear()
        self._load_plugins()

    def get_sorted_plugins(self) -> List[str]:
        """Return plugin names sorted by priority (highest first)."""
        return sorted(self.plugins.keys(),
                      key=lambda name: self.plugin_priorities.get(name, 0),
                      reverse=True)
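A hedged usage sketch of the priority-ordered dispatch; the file paths are illustrative, and it assumes _load_plugins populates plugin_instances:

manager = EnhancedPluginManager(Path('plugins'), config_file=Path('plugins.yaml'))
zip_path, folder_name, output_dir = Path('data.zip'), 'lidar_run_01', Path('output')

# Ask plugins in priority order; the first one that can handle the data wins.
for name in manager.get_sorted_plugins():
    plugin = manager.plugin_instances[name]  # populated by _load_plugins
    if plugin.can_handle(zip_path, folder_name):
        result = plugin.process_data(zip_path, folder_name, output_dir)
        print(f"{name} produced {result}")
        break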
merge_data_process_LST.py is too large and mixes multiple responsibilities.

Split large modules

Split the ZipCSVProcessor class into several classes, each focused on a single responsibility. Example structure:
core/
    __init__.py
    config.py              # configuration management
    plugin_interface.py    # plugin interface definitions
    plugin_manager.py      # plugin discovery and lifecycle
    resource_manager.py    # shared resource management
processors/
    __init__.py
    base_processor.py      # processor base class
    zip_processor.py       # ZIP file handling
    db_processor.py        # database handling
    gnss_processor.py      # GNSS data processing
    can_processor.py       # CAN data processing
    merge_processor.py     # data merging
utils/
    __init__.py
    logging_utils.py       # logging utilities
    error_utils.py         # error-handling utilities
    projection_utils.py    # coordinate projection utilities
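As a sketch of how the split might look, here is a hypothetical base_processor.py; the method name and signature are assumptions for illustration, not the project's actual API:

# processors/base_processor.py (illustrative)
from abc import ABC, abstractmethod
from pathlib import Path

class BaseProcessor(ABC):
    """Common contract for the single-responsibility processors."""

    def __init__(self, config):
        self.config = config

    @abstractmethod
    def process(self, input_path: Path, output_dir: Path) -> Path:
        """Process one input artifact and return the produced file."""
        ...

# processors/gnss_processor.py would then hold only GNSS logic:
class GnssProcessor(BaseProcessor):
    def process(self, input_path: Path, output_dir: Path) -> Path:
        output_csv = output_dir / "gnss.csv"
        # ... GNSS-specific extraction and conversion ...
        return output_csv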
Unified configuration management

Load configuration in layers: built-in defaults, a YAML file, environment variables, and command-line arguments, with later sources overriding earlier ones. Example code:
import os
import argparse
import yaml
from pathlib import Path
from typing import Any, Dict, Optional

class ConfigManager:
    """Unified configuration management."""

    def __init__(self):
        self.config: Dict[str, Any] = {}
        self.config_file: Optional[Path] = None

    def load_defaults(self):
        """Load the default configuration."""
        self.config = {
            'zip_path': None,
            'output_dir': Path('output'),
            'utm_zone': 51,
            'x_offset': 0.0,
            'y_offset': 0.0,
            'plugins_dir': Path('plugins'),
            'resources_dir': Path('resources'),
            'log_level': 'INFO',
        }

    def load_from_file(self, config_file: Path):
        """Load configuration from a YAML file."""
        if not config_file.exists():
            print(f"Configuration file does not exist: {config_file}")
            return
        try:
            with open(config_file, 'r') as f:
                file_config = yaml.safe_load(f) or {}
            self.config.update(file_config)
            self.config_file = config_file
            print(f"Loaded configuration file: {config_file}")
        except Exception as e:
            print(f"Failed to load configuration file: {e}")

    def load_from_env(self, prefix: str = 'DATA_PROCESS_'):
        """Load configuration from environment variables.
        Note: values arrive as strings and may need type conversion."""
        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                self.config[config_key] = value

    def load_from_args(self, args: argparse.Namespace):
        """Load configuration from command-line arguments."""
        for key, value in vars(args).items():
            if value is not None:  # only override with non-None values
                self.config[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """Get a configuration value."""
        return self.config.get(key, default)

    def set(self, key: str, value: Any):
        """Set a configuration value."""
        self.config[key] = value

    def save(self, file_path: Optional[Path] = None):
        """Save the configuration to a file."""
        save_path = file_path or self.config_file
        if not save_path:
            print("No configuration file path specified")
            return False
        try:
            with open(save_path, 'w') as f:
                # Convert Path values to strings so yaml.dump can represent them
                dumpable = {k: str(v) if isinstance(v, Path) else v
                            for k, v in self.config.items()}
                yaml.dump(dumpable, f, default_flow_style=False)
            print(f"Configuration saved to: {save_path}")
            return True
        except Exception as e:
            print(f"Failed to save configuration: {e}")
            return False
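A brief usage sketch of the layering, with later sources overriding earlier ones; the argument names are assumptions:

parser = argparse.ArgumentParser()
parser.add_argument('--zip-path', dest='zip_path')
parser.add_argument('--log-level', dest='log_level', default=None)
args = parser.parse_args()

config = ConfigManager()
config.load_defaults()                       # 1. built-in defaults
config.load_from_file(Path('config.yaml'))   # 2. project YAML file
config.load_from_env()                       # 3. DATA_PROCESS_* variables
config.load_from_args(args)                  # 4. CLI flags win

print(config.get('log_level'))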
The current code emits log output with print statements, which makes the logs hard to filter and analyze.

Introduce the logging module

Example code:
import logging
import sys
from pathlib import Path
from typing import Optional

class LoggingManager:
    """Logging management."""

    def __init__(self):
        self.logger = logging.getLogger('data_processor')
        self.log_file: Optional[Path] = None
        self.log_level = logging.INFO
        self._configure_default_logger()

    def _configure_default_logger(self):
        """Configure the default logger."""
        self.logger.setLevel(logging.DEBUG)  # lowest level; handlers filter
        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(self.log_level)
        # Format
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(formatter)
        # Attach the handler
        self.logger.addHandler(console_handler)

    def set_log_level(self, level: str):
        """Set the console log level."""
        level_map = {
            'DEBUG': logging.DEBUG,
            'INFO': logging.INFO,
            'WARNING': logging.WARNING,
            'ERROR': logging.ERROR,
            'CRITICAL': logging.CRITICAL
        }
        if level.upper() in level_map:
            self.log_level = level_map[level.upper()]
            for handler in self.logger.handlers:
                if isinstance(handler, logging.StreamHandler) and handler.stream == sys.stdout:
                    handler.setLevel(self.log_level)
        else:
            self.logger.warning(f"Unknown log level: {level}; keeping the default INFO")

    def add_file_handler(self, log_file: Path):
        """Add a file handler."""
        try:
            # Make sure the directory exists
            log_file.parent.mkdir(parents=True, exist_ok=True)
            # Create the file handler
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)  # the file records every level
            # Format
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
            )
            file_handler.setFormatter(formatter)
            # Attach the handler
            self.logger.addHandler(file_handler)
            self.log_file = log_file
            self.logger.info(f"Log file set to: {log_file}")
        except Exception as e:
            self.logger.error(f"Failed to set up the log file: {e}")

    def get_logger(self, name: Optional[str] = None):
        """Get a named child logger."""
        if name:
            return logging.getLogger(f"data_processor.{name}")
        return self.logger
Use structured logging throughout the code

Replace print statements with the appropriate logging calls. Example code:
import zipfile

# Create the logging manager instance
logging_manager = LoggingManager()

# Configure it in the main module
def configure_logging(args):
    logging_manager.set_log_level(args.log_level)
    if args.log_file:
        logging_manager.add_file_handler(Path(args.log_file))

# Use it in the individual modules
class ZipProcessor:
    def __init__(self, config):
        self.config = config
        self.logger = logging_manager.get_logger('zip_processor')

    def process_zip(self):
        self.logger.info(f"Starting to process ZIP file: {self.config.zip_path}")
        try:
            # ... processing logic (db_files comes from it) ...
            self.logger.debug(f"Found {len(db_files)} database files")
        except zipfile.BadZipFile:
            self.logger.error(f"Invalid ZIP file: {self.config.zip_path}")
        except Exception as e:
            self.logger.exception(f"Error while processing the ZIP file: {e}")
Create custom exception classes

Define a small exception hierarchy so callers can catch errors at the right granularity. Example code:
class DataProcessError(Exception):
    """Base class for data processing errors."""
    pass

class ConfigError(DataProcessError):
    """Configuration error."""
    pass

class DatabaseError(DataProcessError):
    """Database operation error."""
    pass

class PluginError(DataProcessError):
    """Plugin-related error."""
    pass

class FileProcessError(DataProcessError):
    """File processing error."""
    def __init__(self, file_path, message, original_error=None):
        self.file_path = file_path
        self.original_error = original_error
        super().__init__(f"Error while processing file {file_path}: {message}")
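A short usage sketch, assuming a hypothetical _parse helper, showing how the hierarchy lets callers catch broadly or narrowly:

def load_file(path):
    try:
        return _parse(path)  # hypothetical parser
    except OSError as e:
        # Wrap the low-level error, keeping it for diagnostics
        raise FileProcessError(path, "could not read file", original_error=e) from e

try:
    load_file('data/run01.db')
except FileProcessError as e:
    print(e, '| caused by:', e.original_error)
except DataProcessError:
    # Any other processing error from the same hierarchy
    raise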
Unify the error-handling strategy

Centralize cross-cutting behaviors such as retries and safe fallbacks in reusable decorators. Example code:
import logging
import time
from functools import wraps
from typing import Callable, TypeVar, Any, Optional

T = TypeVar('T')

class ErrorHandler:
    """Error-handling utilities."""

    @staticmethod
    def retry(max_attempts: int = 3, delay: float = 1.0,
              backoff: float = 2.0, exceptions: tuple = (Exception,),
              logger: Optional[logging.Logger] = None):
        """Retry decorator with exponential backoff."""
        def decorator(func: Callable[..., T]) -> Callable[..., T]:
            @wraps(func)
            def wrapper(*args, **kwargs) -> T:
                attempt = 1
                current_delay = delay
                while attempt <= max_attempts:
                    try:
                        return func(*args, **kwargs)
                    except exceptions as e:
                        if logger:
                            logger.warning(
                                f"Attempt {attempt}/{max_attempts} failed: {e}. "
                                f"{'Retrying...' if attempt < max_attempts else 'Giving up.'}"
                            )
                        if attempt == max_attempts:
                            raise
                        time.sleep(current_delay)
                        current_delay *= backoff
                        attempt += 1
            return wrapper
        return decorator

    @staticmethod
    def safe_operation(default_value: Any = None,
                       logger: Optional[logging.Logger] = None):
        """Decorator that returns a default value when the operation fails."""
        def decorator(func: Callable[..., T]) -> Callable[..., Optional[T]]:
            @wraps(func)
            def wrapper(*args, **kwargs) -> Optional[T]:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if logger:
                        logger.error(f"Operation failed: {e}")
                    return default_value
            return wrapper
        return decorator
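A hedged usage sketch of the two decorators; the count_rows function and the sqlite call are illustrative, not part of the existing codebase:

import sqlite3

logger = logging.getLogger('data_processor.db')

@ErrorHandler.retry(max_attempts=3, delay=0.5, backoff=2.0,
                    exceptions=(sqlite3.OperationalError,), logger=logger)
def count_rows(db_path, table):
    # Retried up to 3 times on transient sqlite errors (e.g. a locked file)
    with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn:
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]

@ErrorHandler.safe_operation(default_value=0, logger=logger)
def count_rows_or_zero(db_path, table):
    # Falls back to 0 instead of raising if all retries fail
    return count_rows(db_path, table)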