run.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Entry point of the LST data processing system.
Supports multiple data formats and plugin extensions.
"""
import argparse
import sys
import traceback
from pathlib import Path

from core.config_manager import load_config, update_config
from core.optimized_processor import process_lst_data, process_pgvil_data  # newly added import
from core.plugin_manager import PluginManager
from core.resource_manager import ResourceManager


def parse_arguments():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description='Data processing system supporting multiple data formats and plugin extensions'
    )
    # Data-type argument (newly added)
    parser.add_argument(
        '--data-type',
        type=str,
        choices=['lst', 'pgvil'],
        default='lst',
        help='Type of data to process (lst or pgvil)'
    )
    # Core arguments
    parser.add_argument(
        '--zip-path',
        type=Path,
        default=Path('/home/server/桌面/XGJ/dataprocess/data_0421/V2V_CSAE53-2020_ForwardCollisionW_LST_01-01.zip'),
        # default=Path('/home/server/桌面/XGJ/dataprocess/data_0421/V2I_CSAE53-2020_LeftTurnAssist_LST_01-04.zip'),
        # default=Path('/home/server/桌面/XGJ/dataprocess/data_0421/V2I_CSAE53-2020_LeftTurnAssist_LST_01-01.zip'),
        help='Path to the input ZIP data file'
    )
    parser.add_argument(
        '--trafficlight-json',
        type=Path,
        default=None,
        help='Path to the traffic light JSON configuration file'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('./data_zhaoyuan3/'),
        help='Base path of the output directory'
    )
    parser.add_argument(
        '--utm-zone',
        type=int,
        default=51,
        help='UTM coordinate zone (default: 51)'
    )
    parser.add_argument(
        '--x-offset',
        type=float,
        default=0.0,
        help='X coordinate offset'
    )
    parser.add_argument(
        '--y-offset',
        type=float,
        default=0.0,
        help='Y coordinate offset'
    )
    parser.add_argument(
        '--config',
        type=Path,
        default=Path('config/config.json'),
        help='Path to the configuration file'
    )
    parser.add_argument(
        '--plugins-dir',
        type=Path,
        default=Path('plugins'),
        help='Path to the plugins directory'
    )
    parser.add_argument(
        '--resources-dir',
        type=Path,
        default=Path('resources'),
        help='Path to the resources directory'
    )
    parser.add_argument(
        '--use-parallel',
        action='store_true',
        help='Enable parallel processing'
    )
    parser.add_argument(
        '--no-parallel',
        action='store_true',
        help='Disable parallel processing'
    )
    parser.add_argument(
        '--max-workers',
        type=int,
        default=None,
        help='Maximum number of worker threads for parallel processing'
    )
    parser.add_argument(
        '--batch-size',
        type=int,
        default=10000,
        help='Batch size when processing large datasets'
    )
    parser.add_argument(
        '--log-level',
        type=str,
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='Logging level'
    )
    parser.add_argument(
        '--log-dir',
        type=Path,
        default=Path('logs'),
        help='Directory for log files'
    )
    parser.add_argument(
        '--no-log-file',
        action='store_true',
        help='Disable logging to a file'
    )
    return parser.parse_args()
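
# Illustrative invocation (a sketch only: the ZIP and output paths below are placeholders,
# and the hard-coded defaults above are machine-specific):
#
#   python run.py --data-type lst \
#       --zip-path ./data/V2V_CSAE53-2020_ForwardCollisionW_LST_01-01.zip \
#       --output-dir ./output --utm-zone 51 --use-parallel --max-workers 4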


def setup_config(args):
    """Set up the configuration."""
    # Create an output directory named after the ZIP file
    zip_name = args.zip_path.stem
    output_dir = args.output_dir / zip_name
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"Output directory: {output_dir}")
    # Load the configuration
    config = load_config(args.config)
    # Point the configured output directory at the per-ZIP subdirectory
    config['paths']['output_dir'] = str(output_dir)
    config['paths']['data_dir'] = str(output_dir)  # keep data output in the same directory
    config['paths']['temp_dir'] = str(output_dir)  # keep temporary files in the same directory
    # Command-line arguments override settings from the configuration file
    if args.use_parallel and args.no_parallel:
        print("Warning: both --use-parallel and --no-parallel were given; using --use-parallel")
        config['processing']['use_parallel'] = True
    elif args.use_parallel:
        config['processing']['use_parallel'] = True
    elif args.no_parallel:
        config['processing']['use_parallel'] = False
    if args.max_workers is not None:
        config['processing']['max_workers'] = args.max_workers
    if args.batch_size != 10000:  # differs from the default value
        config['processing']['batch_size'] = args.batch_size
    # Update the logging configuration
    config['logging']['level'] = args.log_level
    config['logging']['log_dir'] = str(args.log_dir)
    config['logging']['log_to_file'] = not args.no_log_file
    # Update the coordinate-system configuration
    config['coordinates']['utm_zone'] = args.utm_zone
    config['coordinates']['x_offset'] = args.x_offset
    config['coordinates']['y_offset'] = args.y_offset
    # Update the path configuration
    config['paths']['plugins_dir'] = str(args.plugins_dir)
    config['paths']['resources_dir'] = str(args.resources_dir)
    # Apply the updated configuration
    update_config(config)
    return output_dir
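
# Minimal sketch of the config.json structure that setup_config() assumes, inferred from
# the keys written above; the actual file may define additional settings:
#
#   {
#     "paths":       {"output_dir": "...", "data_dir": "...", "temp_dir": "...",
#                     "plugins_dir": "plugins", "resources_dir": "resources"},
#     "processing":  {"use_parallel": false, "max_workers": 4, "batch_size": 10000},
#     "logging":     {"level": "INFO", "log_dir": "logs", "log_to_file": true},
#     "coordinates": {"utm_zone": 51, "x_offset": 0.0, "y_offset": 0.0}
#   }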


def process_plugins(args, output_dir, final_csv_path):
    """Process plugin data."""
    # Initialise the plugin and resource managers
    plugin_manager = PluginManager(args.plugins_dir)
    resource_manager = ResourceManager(args.resources_dir)
    # Process and merge custom data
    print("Processing and merging custom data...")
    folders = resource_manager.list_zip_folders(args.zip_path)
    for folder in folders:
        plugin = plugin_manager.get_plugin_for_data(args.zip_path, folder)
        if not plugin:
            print(f"No plugin found for folder: {folder}")
            continue
        print(f"Processing folder '{folder}' with plugin '{plugin.__name__}'")
        plugin_instance = plugin()
        plugin_output = plugin_instance.process_data(
            args.zip_path,
            folder,
            output_dir
        )
        if plugin_output is not None and not plugin_output.empty:
            output_file = output_dir / f"{folder}_processed.csv"
            print(f'Plugin output file: {output_file}')
            plugin_output.to_csv(output_file, index=False)
            if not resource_manager.validate_plugin_output(output_file):
                print(f"Warning: plugin output validation failed: {folder}")
                continue
            # Merge the custom data into the main data file
            print(f"Merging {folder} data...")
            if resource_manager.merge_plugin_data(
                final_csv_path,
                output_file,
                final_csv_path
            ):
                print(f"Successfully merged {folder} data")
            else:
                print(f"Warning: failed to merge {folder} data")
        else:
            print(f"Warning: plugin processing failed: {folder}")
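
# Minimal sketch of the plugin interface that process_plugins() relies on, inferred only
# from the calls above (the class name is hypothetical; the real base class is defined by
# the plugins framework):
#
#   import pandas as pd
#
#   class ExamplePlugin:
#       def process_data(self, zip_path, folder, output_dir) -> pd.DataFrame:
#           """Return the processed rows as a DataFrame; None or an empty DataFrame
#           is treated as a failure for this folder."""
#           ...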


def main():
    """Main entry point."""
    args = parse_arguments()
    try:
        # Set up the configuration
        output_dir = setup_config(args)
        print("Starting the data processing pipeline")
        print(f"Loading configuration from: {args.config}")
        # Select the processing pipeline based on the data type
        if args.data_type == 'lst':
            final_csv_path = process_lst_data(
                zip_data_path=args.zip_path,
                output_base_dir=output_dir,
                trafficlight_json_path=args.trafficlight_json,
                utm_zone=args.utm_zone,
                x_offset=args.x_offset,
                y_offset=args.y_offset
            )
        elif args.data_type == 'pgvil':
            final_csv_path = process_pgvil_data(
                zip_data_path=args.zip_path,
                output_base_dir=output_dir,
                utm_zone=args.utm_zone,
                x_offset=args.x_offset,
                y_offset=args.y_offset
            )
        else:
            print(f"Unsupported data type: {args.data_type}")
            sys.exit(1)
        if not final_csv_path:
            print(f"Built-in {args.data_type} data processing failed")
            sys.exit(1)
        print(f"\nBuilt-in {args.data_type} processing pipeline completed successfully!")
        # Process plugin data
        process_plugins(args, output_dir, final_csv_path)
        print("LST data processing completed successfully")
        print(f"All results have been saved to: {output_dir}")
        sys.exit(0)
    except Exception as e:
        print(f"\nAn error occurred during processing: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
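
# Note that the pgvil branch in main() does not forward --trafficlight-json. An
# illustrative pgvil run (the ZIP path is a placeholder):
#
#   python run.py --data-type pgvil --zip-path ./data/example_pgvil.zip --output-dir ./output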