run.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. LST数据处理系统入口文件
  5. 支持多种数据格式和插件扩展
  6. """
  7. import argparse
  8. import sys
  9. import traceback
  10. from pathlib import Path
  11. from core.config_manager import load_config, update_config
  12. from core.optimized_processor import process_lst_data, process_pgvil_data # 新增导入
  13. from core.plugin_manager import PluginManager
  14. from core.resource_manager import ResourceManager
  15. def parse_arguments():
  16. """解析命令行参数"""
  17. parser = argparse.ArgumentParser(
  18. description='数据处理系统,支持多种数据格式和插件扩展'
  19. )
  20. # 新增数据类型参数
  21. parser.add_argument(
  22. '--data-type',
  23. type=str,
  24. choices=['lst', 'pgvil'],
  25. default='lst',
  26. help='要处理的数据类型 (lst 或 pgvil)'
  27. )
  28. # 定义参数
  29. parser.add_argument(
  30. '--zip-path',
  31. type=Path,
  32. default=Path('/home/kevin/kevin/zhaoyuan/sqlite3_demo/docker_build/preprocess_run/data/V2I_CSAE53-2020_RedLightViolationW_LST_01-01.zip'),
  33. help='输入的ZIP数据文件路径'
  34. )
  35. parser.add_argument(
  36. '--trafficlight-json',
  37. type=Path,
  38. default='/home/kevin/kevin/zhaoyuan/sqlite3_demo/docker_build/preprocess_run/data/process_20250421_154131.json',
  39. help='交通信号灯JSON配置文件路径'
  40. )
  41. parser.add_argument(
  42. '--output-dir',
  43. type=Path,
  44. default=Path('./output/'),
  45. help='输出目录的基础路径'
  46. )
  47. parser.add_argument(
  48. '--utm-zone',
  49. type=int,
  50. default=51,
  51. help='UTM坐标系区域 (默认: 51)'
  52. )
  53. parser.add_argument(
  54. '--x-offset',
  55. type=float,
  56. default=0.0,
  57. help='X坐标偏移量'
  58. )
  59. parser.add_argument(
  60. '--y-offset',
  61. type=float,
  62. default=0.0,
  63. help='Y坐标偏移量'
  64. )
  65. parser.add_argument(
  66. '--config',
  67. type=Path,
  68. default=Path('config/config.json'),
  69. help='配置文件路径'
  70. )
  71. parser.add_argument(
  72. '--plugins-dir',
  73. type=Path,
  74. default=Path('plugins'),
  75. help='插件目录路径'
  76. )
  77. parser.add_argument(
  78. '--resources-dir',
  79. type=Path,
  80. default=Path('resources'),
  81. help='资源文件目录路径'
  82. )
  83. parser.add_argument(
  84. '--use-parallel',
  85. action='store_true',
  86. help='启用并行处理'
  87. )
  88. parser.add_argument(
  89. '--no-parallel',
  90. action='store_true',
  91. help='禁用并行处理'
  92. )
  93. parser.add_argument(
  94. '--max-workers',
  95. type=int,
  96. default=None,
  97. help='并行处理的最大工作线程数'
  98. )
  99. parser.add_argument(
  100. '--batch-size',
  101. type=int,
  102. default=10000,
  103. help='处理大数据集时的批处理大小'
  104. )
  105. parser.add_argument(
  106. '--log-level',
  107. type=str,
  108. choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
  109. default='INFO',
  110. help='日志级别'
  111. )
  112. parser.add_argument(
  113. '--log-dir',
  114. type=Path,
  115. default=Path('logs'),
  116. help='日志文件目录'
  117. )
  118. parser.add_argument(
  119. '--no-log-file',
  120. action='store_true',
  121. help='禁用文件日志'
  122. )
  123. return parser.parse_args()
  124. def setup_config(args):
  125. """设置配置"""
  126. # 根据ZIP文件名创建输出目录
  127. zip_name = args.zip_path.stem
  128. output_dir = args.output_dir / zip_name
  129. output_dir.mkdir(parents=True, exist_ok=True)
  130. print(f"输出目录: {output_dir}")
  131. # 加载配置
  132. config = load_config(args.config)
  133. # 更新配置中的输出目录为包含zip名称的子目录
  134. config['paths']['output_dir'] = str(output_dir)
  135. config['paths']['data_dir'] = str(output_dir) # 确保数据也输出到同一目录
  136. config['paths']['temp_dir'] = str(output_dir) # 确保临时文件也在同一目录
  137. # 使用命令行参数覆盖配置文件中的设置
  138. if args.use_parallel and args.no_parallel:
  139. print("警告: 同时指定了 --use-parallel 和 --no-parallel,将使用 --use-parallel")
  140. config['processing']['use_parallel'] = True
  141. elif args.use_parallel:
  142. config['processing']['use_parallel'] = True
  143. elif args.no_parallel:
  144. config['processing']['use_parallel'] = False
  145. if args.max_workers is not None:
  146. config['processing']['max_workers'] = args.max_workers
  147. if args.batch_size != 10000: # 不等于默认值
  148. config['processing']['batch_size'] = args.batch_size
  149. # 更新日志配置
  150. config['logging']['level'] = args.log_level
  151. config['logging']['log_dir'] = str(args.log_dir)
  152. config['logging']['log_to_file'] = not args.no_log_file
  153. # 更新坐标系配置
  154. config['coordinates']['utm_zone'] = args.utm_zone
  155. config['coordinates']['x_offset'] = args.x_offset
  156. config['coordinates']['y_offset'] = args.y_offset
  157. # 更新路径配置
  158. config['paths']['plugins_dir'] = str(args.plugins_dir)
  159. config['paths']['resources_dir'] = str(args.resources_dir)
  160. # 应用更新后的配置
  161. update_config(config)
  162. return output_dir
  163. def process_plugins(args, output_dir, final_csv_path):
  164. """处理插件数据"""
  165. # 初始化插件处理管理器
  166. plugin_manager = PluginManager(args.plugins_dir)
  167. resource_manager = ResourceManager(args.resources_dir)
  168. # 处理自定义数据
  169. print("处理并合并自定义数据...")
  170. folders = resource_manager.list_zip_folders(args.zip_path)
  171. for folder in folders:
  172. plugin = plugin_manager.get_plugin_for_data(args.zip_path, folder)
  173. if not plugin:
  174. print(f"未找到文件夹的插件: {folder}")
  175. continue
  176. print(f"使用插件 '{plugin.__name__}' 处理文件夹 '{folder}'")
  177. plugin_instance = plugin()
  178. plugin_output = plugin_instance.process_data(
  179. args.zip_path,
  180. folder,
  181. output_dir
  182. )
  183. if plugin_output is not None and not plugin_output.empty:
  184. output_file = output_dir / f"{folder}_processed.csv"
  185. print(f'插件输出文件: {output_file}')
  186. plugin_output.to_csv(output_file, index=False)
  187. if not resource_manager.validate_plugin_output(output_file):
  188. print(f"警告: 插件输出验证失败: {folder}")
  189. continue
  190. # 合并自定义数据与主数据文件
  191. print(f"合并 {folder} 数据...")
  192. if resource_manager.merge_plugin_data(
  193. final_csv_path,
  194. output_file,
  195. final_csv_path
  196. ):
  197. print(f"成功合并 {folder} 数据")
  198. else:
  199. print(f"警告: 合并 {folder} 数据失败")
  200. else:
  201. print(f"警告: 插件处理失败: {folder}")
  202. def main():
  203. """主函数"""
  204. args = parse_arguments()
  205. try:
  206. # 设置配置
  207. output_dir = setup_config(args)
  208. print("开始数据处理流程")
  209. print(f"从以下位置加载配置: {args.config}")
  210. # 根据数据类型选择处理流程
  211. if args.data_type == 'lst':
  212. final_csv_path = process_lst_data(
  213. zip_data_path=args.zip_path,
  214. output_base_dir=output_dir,
  215. trafficlight_json_path=args.trafficlight_json,
  216. utm_zone=args.utm_zone,
  217. x_offset=args.x_offset,
  218. y_offset=args.y_offset
  219. )
  220. elif args.data_type == 'pgvil':
  221. final_csv_path = process_pgvil_data(
  222. zip_data_path=args.zip_path,
  223. output_base_dir=output_dir,
  224. utm_zone=args.utm_zone,
  225. x_offset=args.x_offset,
  226. y_offset=args.y_offset
  227. )
  228. else:
  229. print(f"不支持的数据类型: {args.data_type}")
  230. sys.exit(1)
  231. if not final_csv_path:
  232. print(f"{args.data_type}内置数据处理失败")
  233. sys.exit(1)
  234. print(f"\n{args.data_type}内置处理流程成功完成!")
  235. # 处理插件数据
  236. process_plugins(args, output_dir, final_csv_path)
  237. print("LST数据处理成功完成")
  238. print(f"所有处理结果已保存到: {output_dir}")
  239. sys.exit(0)
  240. except Exception as e:
  241. print(f"\n处理过程中出现错误: {e}")
  242. traceback.print_exc()
  243. sys.exit(1)
  244. if __name__ == "__main__":
  245. main()