123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- LST数据处理系统入口文件
- 支持多种数据格式和插件扩展
- """
- import argparse
- import sys
- import traceback
- from pathlib import Path
- from core.config_manager import load_config, update_config
- from core.optimized_processor import process_lst_data, process_pgvil_data # 新增导入
- from core.plugin_manager import PluginManager
- from core.resource_manager import ResourceManager
def parse_arguments():
    """Parse command-line arguments for the data-processing pipeline.

    Returns:
        argparse.Namespace: the parsed arguments. All filesystem arguments
        are ``pathlib.Path`` instances.
    """
    parser = argparse.ArgumentParser(
        description='数据处理系统,支持多种数据格式和插件扩展'
    )
    # Which built-in pipeline to run (LST or PGVIL).
    parser.add_argument(
        '--data-type',
        type=str,
        choices=['lst', 'pgvil'],
        default='lst',
        help='要处理的数据类型 (lst 或 pgvil)'
    )
    # Input / output locations.
    parser.add_argument(
        '--zip-path',
        type=Path,
        default=Path('/home/kevin/kevin/zhaoyuan/sqlite3_demo/docker_build/preprocess_run/data/V2I_CSAE53-2020_RedLightViolationW_LST_01-01.zip'),
        help='输入的ZIP数据文件路径'
    )
    parser.add_argument(
        '--trafficlight-json',
        type=Path,
        # Wrapped in Path(...) for consistency with the other path defaults
        # (argparse converts string defaults through `type` anyway, so the
        # resulting value is unchanged).
        default=Path('/home/kevin/kevin/zhaoyuan/sqlite3_demo/docker_build/preprocess_run/data/process_20250421_154131.json'),
        help='交通信号灯JSON配置文件路径'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('./output/'),
        help='输出目录的基础路径'
    )
    # Coordinate-system settings.
    parser.add_argument(
        '--utm-zone',
        type=int,
        default=51,
        help='UTM坐标系区域 (默认: 51)'
    )
    parser.add_argument(
        '--x-offset',
        type=float,
        default=0.0,
        help='X坐标偏移量'
    )
    parser.add_argument(
        '--y-offset',
        type=float,
        default=0.0,
        help='Y坐标偏移量'
    )
    # Configuration, plugin and resource directories.
    parser.add_argument(
        '--config',
        type=Path,
        default=Path('config/config.json'),
        help='配置文件路径'
    )
    parser.add_argument(
        '--plugins-dir',
        type=Path,
        default=Path('plugins'),
        help='插件目录路径'
    )
    parser.add_argument(
        '--resources-dir',
        type=Path,
        default=Path('resources'),
        help='资源文件目录路径'
    )
    # Parallel-processing controls. Both flags may be passed; setup_config
    # resolves the conflict (with --use-parallel winning).
    parser.add_argument(
        '--use-parallel',
        action='store_true',
        help='启用并行处理'
    )
    parser.add_argument(
        '--no-parallel',
        action='store_true',
        help='禁用并行处理'
    )
    parser.add_argument(
        '--max-workers',
        type=int,
        default=None,
        help='并行处理的最大工作线程数'
    )
    parser.add_argument(
        '--batch-size',
        type=int,
        default=10000,
        help='处理大数据集时的批处理大小'
    )
    # Logging controls.
    parser.add_argument(
        '--log-level',
        type=str,
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='日志级别'
    )
    parser.add_argument(
        '--log-dir',
        type=Path,
        default=Path('logs'),
        help='日志文件目录'
    )
    parser.add_argument(
        '--no-log-file',
        action='store_true',
        help='禁用文件日志'
    )
    return parser.parse_args()
def setup_config(args):
    """Load the configuration file and apply command-line overrides.

    Creates a per-zip output directory (named after the input zip's stem),
    routes all output/data/temp paths into it, overlays the CLI settings on
    top of the loaded configuration, and publishes the result through
    ``update_config``.

    Args:
        args: parsed command-line namespace from ``parse_arguments``.

    Returns:
        Path: the output directory dedicated to this zip file.
    """
    # One output directory per input zip, named after the zip file.
    run_output_dir = args.output_dir / args.zip_path.stem
    run_output_dir.mkdir(parents=True, exist_ok=True)
    print(f"输出目录: {run_output_dir}")

    config = load_config(args.config)

    # Route output, data and temp artifacts into the same per-zip directory.
    paths_cfg = config['paths']
    for path_key in ('output_dir', 'data_dir', 'temp_dir'):
        paths_cfg[path_key] = str(run_output_dir)

    # CLI flags override the config file; --use-parallel wins on conflict.
    if args.use_parallel and args.no_parallel:
        print("警告: 同时指定了 --use-parallel 和 --no-parallel,将使用 --use-parallel")
    processing_cfg = config['processing']
    if args.use_parallel:
        processing_cfg['use_parallel'] = True
    elif args.no_parallel:
        processing_cfg['use_parallel'] = False

    if args.max_workers is not None:
        processing_cfg['max_workers'] = args.max_workers
    # Only override batch_size when it differs from the argparse default,
    # so an untouched flag leaves the config file's value in place.
    if args.batch_size != 10000:
        processing_cfg['batch_size'] = args.batch_size

    logging_cfg = config['logging']
    logging_cfg['level'] = args.log_level
    logging_cfg['log_dir'] = str(args.log_dir)
    logging_cfg['log_to_file'] = not args.no_log_file

    coords_cfg = config['coordinates']
    coords_cfg['utm_zone'] = args.utm_zone
    coords_cfg['x_offset'] = args.x_offset
    coords_cfg['y_offset'] = args.y_offset

    paths_cfg['plugins_dir'] = str(args.plugins_dir)
    paths_cfg['resources_dir'] = str(args.resources_dir)

    # Publish the merged configuration for the rest of the pipeline.
    update_config(config)
    return run_output_dir
def process_plugins(args, output_dir, final_csv_path):
    """Run data plugins over each folder in the input zip and merge results.

    For every folder found inside the zip, looks up a matching plugin,
    writes the plugin's DataFrame output as a CSV into ``output_dir``,
    validates that CSV, and merges it into the main data file at
    ``final_csv_path``. Folders without a plugin, failed plugin runs, and
    failed validations are reported and skipped.

    Args:
        args: parsed command-line namespace (zip path, plugin/resource dirs).
        output_dir: directory receiving per-folder ``*_processed.csv`` files.
        final_csv_path: main CSV file that plugin output is merged into.
    """
    plugin_manager = PluginManager(args.plugins_dir)
    resource_manager = ResourceManager(args.resources_dir)

    print("处理并合并自定义数据...")
    for folder in resource_manager.list_zip_folders(args.zip_path):
        plugin_cls = plugin_manager.get_plugin_for_data(args.zip_path, folder)
        if not plugin_cls:
            print(f"未找到文件夹的插件: {folder}")
            continue
        print(f"使用插件 '{plugin_cls.__name__}' 处理文件夹 '{folder}'")

        frame = plugin_cls().process_data(args.zip_path, folder, output_dir)
        # Skip folders whose plugin produced nothing.
        if frame is None or frame.empty:
            print(f"警告: 插件处理失败: {folder}")
            continue

        output_file = output_dir / f"{folder}_processed.csv"
        print(f'插件输出文件: {output_file}')
        frame.to_csv(output_file, index=False)

        if not resource_manager.validate_plugin_output(output_file):
            print(f"警告: 插件输出验证失败: {folder}")
            continue

        # Merge the validated plugin CSV back into the main data file.
        print(f"合并 {folder} 数据...")
        merged_ok = resource_manager.merge_plugin_data(
            final_csv_path,
            output_file,
            final_csv_path
        )
        if merged_ok:
            print(f"成功合并 {folder} 数据")
        else:
            print(f"警告: 合并 {folder} 数据失败")
def main():
    """Entry point: configure, run the selected pipeline, then plugins.

    Exits with status 0 on success and 1 on any failure. ``sys.exit`` raises
    ``SystemExit`` (a ``BaseException``), so the broad ``except Exception``
    below does not swallow the intended exit codes.
    """
    args = parse_arguments()
    try:
        output_dir = setup_config(args)
        print("开始数据处理流程")
        print(f"从以下位置加载配置: {args.config}")

        # Dispatch on the requested data type.
        if args.data_type == 'lst':
            final_csv_path = process_lst_data(
                zip_data_path=args.zip_path,
                output_base_dir=output_dir,
                trafficlight_json_path=args.trafficlight_json,
                utm_zone=args.utm_zone,
                x_offset=args.x_offset,
                y_offset=args.y_offset
            )
        elif args.data_type == 'pgvil':
            final_csv_path = process_pgvil_data(
                zip_data_path=args.zip_path,
                output_base_dir=output_dir,
                utm_zone=args.utm_zone,
                x_offset=args.x_offset,
                y_offset=args.y_offset
            )
        else:
            # Defensive: unreachable in practice because argparse `choices`
            # already restricts --data-type to 'lst' / 'pgvil'.
            print(f"不支持的数据类型: {args.data_type}")
            sys.exit(1)

        if not final_csv_path:
            print(f"{args.data_type}内置数据处理失败")
            sys.exit(1)
        print(f"\n{args.data_type}内置处理流程成功完成!")

        # Plugin post-processing merges extra data into the main CSV.
        process_plugins(args, output_dir, final_csv_path)

        # Fix: this message previously hardcoded "LST" even when the pgvil
        # pipeline ran; report the data type that was actually processed.
        print(f"{args.data_type.upper()}数据处理成功完成")
        print(f"所有处理结果已保存到: {output_dir}")
        sys.exit(0)
    except Exception as e:
        print(f"\n处理过程中出现错误: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
|