#!/usr/bin/env python # -*- coding: utf-8 -*- """ LST数据处理系统入口文件 支持多种数据格式和插件扩展 """ import argparse import sys import traceback from pathlib import Path from core.config_manager import load_config, update_config from core.optimized_processor import process_lst_data, process_pgvil_data # 新增导入 from core.plugin_manager import PluginManager from core.resource_manager import ResourceManager def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser( description='数据处理系统,支持多种数据格式和插件扩展' ) # 新增数据类型参数 parser.add_argument( '--data-type', type=str, choices=['lst', 'pgvil'], default='lst', help='要处理的数据类型 (lst 或 pgvil)' ) # 定义参数 parser.add_argument( '--zip-path', type=Path, default=Path('V2I_CSAE53-2020_HazardousLocationW_LST_02-01.zip'), help='输入的ZIP数据文件路径' ) parser.add_argument( '--trafficlight-json', type=Path, default=None, help='交通信号灯JSON配置文件路径' ) parser.add_argument( '--output-dir', type=Path, default=Path('output'), help='输出目录的基础路径' ) parser.add_argument( '--utm-zone', type=int, default=51, help='UTM坐标系区域 (默认: 51)' ) parser.add_argument( '--x-offset', type=float, default=0.0, help='X坐标偏移量' ) parser.add_argument( '--y-offset', type=float, default=0.0, help='Y坐标偏移量' ) parser.add_argument( '--config', type=Path, default=Path('config/config.json'), help='配置文件路径' ) parser.add_argument( '--plugins-dir', type=Path, default=Path('plugins'), help='插件目录路径' ) parser.add_argument( '--resources-dir', type=Path, default=Path('resources'), help='资源文件目录路径' ) parser.add_argument( '--use-parallel', action='store_true', help='启用并行处理' ) parser.add_argument( '--no-parallel', action='store_true', help='禁用并行处理' ) parser.add_argument( '--max-workers', type=int, default=None, help='并行处理的最大工作线程数' ) parser.add_argument( '--batch-size', type=int, default=10000, help='处理大数据集时的批处理大小' ) parser.add_argument( '--log-level', type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='日志级别' ) parser.add_argument( '--log-dir', type=Path, default=Path('logs'), help='日志文件目录' ) parser.add_argument( '--no-log-file', action='store_true', help='禁用文件日志' ) return parser.parse_args() def setup_config(args): """设置配置""" # 根据ZIP文件名创建输出目录 zip_name = args.zip_path.stem output_dir = args.output_dir / zip_name output_dir.mkdir(parents=True, exist_ok=True) print(f"输出目录: {output_dir}") # 加载配置 config = load_config(args.config) # 更新配置中的输出目录为包含zip名称的子目录 config['paths']['output_dir'] = str(output_dir) config['paths']['data_dir'] = str(output_dir) # 确保数据也输出到同一目录 config['paths']['temp_dir'] = str(output_dir) # 确保临时文件也在同一目录 # 使用命令行参数覆盖配置文件中的设置 if args.use_parallel and args.no_parallel: print("警告: 同时指定了 --use-parallel 和 --no-parallel,将使用 --use-parallel") config['processing']['use_parallel'] = True elif args.use_parallel: config['processing']['use_parallel'] = True elif args.no_parallel: config['processing']['use_parallel'] = False if args.max_workers is not None: config['processing']['max_workers'] = args.max_workers if args.batch_size != 10000: # 不等于默认值 config['processing']['batch_size'] = args.batch_size # 更新日志配置 config['logging']['level'] = args.log_level config['logging']['log_dir'] = str(args.log_dir) config['logging']['log_to_file'] = not args.no_log_file # 更新坐标系配置 config['coordinates']['utm_zone'] = args.utm_zone config['coordinates']['x_offset'] = args.x_offset config['coordinates']['y_offset'] = args.y_offset # 更新路径配置 config['paths']['plugins_dir'] = str(args.plugins_dir) config['paths']['resources_dir'] = str(args.resources_dir) # 应用更新后的配置 update_config(config) return output_dir def process_plugins(args, output_dir, final_csv_path): """处理插件数据""" # 初始化插件处理管理器 plugin_manager = PluginManager(args.plugins_dir) resource_manager = ResourceManager(args.resources_dir) # 处理自定义数据 print("处理并合并自定义数据...") folders = resource_manager.list_zip_folders(args.zip_path) for folder in folders: plugin = plugin_manager.get_plugin_for_data(args.zip_path, folder) if not plugin: print(f"未找到文件夹的插件: {folder}") continue print(f"使用插件 '{plugin.__name__}' 处理文件夹 '{folder}'") plugin_instance = plugin() plugin_output = plugin_instance.process_data( args.zip_path, folder, output_dir ) if plugin_output is not None and not plugin_output.empty: output_file = output_dir / f"{folder}_processed.csv" print(f'插件输出文件: {output_file}') plugin_output.to_csv(output_file, index=False) if not resource_manager.validate_plugin_output(output_file): print(f"警告: 插件输出验证失败: {folder}") continue # 合并自定义数据与主数据文件 print(f"合并 {folder} 数据...") if resource_manager.merge_plugin_data( final_csv_path, output_file, final_csv_path ): print(f"成功合并 {folder} 数据") else: print(f"警告: 合并 {folder} 数据失败") else: print(f"警告: 插件处理失败: {folder}") def main(): """主函数""" args = parse_arguments() try: # 设置配置 output_dir = setup_config(args) print("开始数据处理流程") print(f"从以下位置加载配置: {args.config}") # 根据数据类型选择处理流程 if args.data_type == 'lst': final_csv_path = process_lst_data( zip_data_path=args.zip_path, output_base_dir=output_dir, trafficlight_json_path=args.trafficlight_json, utm_zone=args.utm_zone, x_offset=args.x_offset, y_offset=args.y_offset ) elif args.data_type == 'pgvil': final_csv_path = process_pgvil_data( zip_data_path=args.zip_path, output_base_dir=output_dir, utm_zone=args.utm_zone, x_offset=args.x_offset, y_offset=args.y_offset ) else: print(f"不支持的数据类型: {args.data_type}") sys.exit(1) if not final_csv_path: print(f"{args.data_type}内置数据处理失败") sys.exit(1) print(f"\n{args.data_type}内置处理流程成功完成!") # 处理插件数据 process_plugins(args, output_dir, final_csv_path) print("LST数据处理成功完成") print(f"所有处理结果已保存到: {output_dir}") sys.exit(0) except Exception as e: print(f"\n处理过程中出现错误: {e}") traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()