# xosc-pjibot_delivery.py
  1. # -*- coding: utf-8 -*-
  2. # ------- 全局配置 -------
  3. import sys
  4. reload(sys)
  5. sys.setdefaultencoding("utf-8")
  6. import os
  7. import time
  8. import oss2
  9. import logging
  10. import json
  11. import subprocess
  12. import io
  13. from utils import json_utils
  14. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/'
  15. path2 = '/mnt/disk001/dcl_data_process/src/python3/pjibot_outdoor/'
  16. logging.basicConfig(filename=path1 + 'log/xosc-pjibot_delivery.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
  17. key1 = 'pjibot_delivery/'
  18. sleep_time = 10 # 每多少秒扫描一次
  19. error_bag_json = "/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/xosc-errorBag.json"
  20. # 1 创建阿里云对象
  21. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  22. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  23. bucket = oss2.Bucket(auth, endpoint, 'pji-bucket1')
  24. # ------- 全局配置 -------
  25. def generate_xosc(parse_prefix, local_parse_dir, local_delete_list):
  26. try:
  27. os.chdir(path2)
  28. command2 = 'python3 jiqiren_outdoor.py {} 0'.format(local_parse_dir[:-1]) # 配送机器人0 巡检机器人1
  29. logging.info("进入目录 %s 调用命令2: %s", path2, command2)
  30. process = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  31. stdout, stderr = process.communicate() # 等待进程执行完成并获取输出
  32. if process.returncode == 0:
  33. logging.info("命令输出: %s", stdout.decode("utf-8"))
  34. else:
  35. logging.error("命令执行失败,错误码: %d", process.returncode)
  36. logging.error("命令错误输出: %s", stderr.decode("utf-8"))
  37. local_xosc_path2 = local_parse_dir + 'simulation/xosc/openx_outdoor0.xosc'
  38. bucket.put_object_from_file(parse_prefix + 'scenario.xosc', local_xosc_path2)
  39. bucket.put_object_from_file(parse_prefix + 'scenario_hmi.xosc', local_xosc_path2)
  40. logging.info("上传 scenario_hmi.xosc 成功: %s" % str(parse_prefix + 'scenario.xosc'))
  41. # ------- 处理 output.json - 开始 -------
  42. output_json_path = local_parse_dir + 'simulation/xosc/output.json'
  43. callback_json_oss_key = parse_prefix+'callback.json'
  44. callback_json_local = local_parse_dir+'/callback.json'
  45. try:
  46. # 1 解析 output.json
  47. if os.path.exists(output_json_path):
  48. outputs = json_utils.parse_json_to_string_array(output_json_path)
  49. # 2 将 output.json 添加到 callback.json 的 check 字段
  50. bucket.get_object_to_file(callback_json_oss_key, callback_json_local)
  51. with io.open(callback_json_local, 'r', encoding='utf-8') as f:
  52. data = json.load(f)
  53. if 'check' not in data:
  54. data['check'] = []
  55. data['check'].extend(outputs)
  56. json_data = json.dumps(data, ensure_ascii=False, indent=4)
  57. with io.open(callback_json_local, 'w', encoding='utf-8') as f:
  58. f.write(unicode(json_data))
  59. bucket.put_object_from_file(callback_json_oss_key, callback_json_local)
  60. except Exception as e3:
  61. # todo 可能没有callback.json,已经处理成 callback_done.json了,暂时不管
  62. logging.exception("处理 output.json报错: %s" % str(e3))
  63. pass
  64. # ------- 处理 output.json - 结束 -------
  65. # 处理删除
  66. local_delete_list.append(local_xosc_path2)
  67. local_delete_list.append(callback_json_local)
  68. except Exception as e:
  69. error_bag_list = json_utils.parse_json_to_string_array(error_bag_json)
  70. error_bag_list.append(parse_prefix)
  71. json_utils.list_to_json_file(error_bag_list, error_bag_json)
  72. logging.exception("生成xosc报错: %s" % str(e))
  73. if __name__ == '__main__':
  74. while True:
  75. try:
  76. logging.info("开始新一轮扫描")
  77. local_delete_list = []
  78. oss_delete_list = []
  79. prefix_list = []
  80. # 2 获取已经上传完成的所有目录并分组
  81. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  82. try:
  83. # 获取合并后的包
  84. merged_bag_object_key = str(obj1.key)
  85. if 'data_merge' in str(obj1.key) and str(obj1.key).endswith('.bag'):
  86. merged_bag_object_key_split = merged_bag_object_key.split('/')
  87. merged_prefix = '/'.join(merged_bag_object_key_split[:-1])
  88. parse_prefix = merged_prefix.replace('data_merge', 'data_parse')
  89. parse_prefix_full = merged_bag_object_key.replace('data_merge', 'data_parse')[:-4] + '/'
  90. xosc_done = False
  91. csv1_done = False
  92. csv2_done = False
  93. for obj3 in oss2.ObjectIterator(bucket, prefix=str(parse_prefix_full)):
  94. if '/scenario_hmi.xosc' in str(obj3.key): # 仿真使用的 scenario_hmi.xosc 所以必须有
  95. xosc_done = True
  96. if '/objects_pji.csv' in str(obj3.key):
  97. csv1_done = True
  98. if '/ego_pji.csv' in str(obj3.key):
  99. csv2_done = True
  100. if xosc_done:
  101. continue
  102. if not csv1_done:
  103. continue
  104. if not csv2_done:
  105. continue
  106. error_bag_list = json_utils.parse_json_to_string_array(error_bag_json)
  107. if str(parse_prefix_full) in error_bag_list:
  108. continue
  109. logging.info("需要生成 scenario_hmi.xosc: %s" % str(parse_prefix_full))
  110. local_dir_full = path1 + parse_prefix_full
  111. if not os.path.exists(local_dir_full):
  112. os.makedirs(local_dir_full)
  113. bucket.get_object_to_file(parse_prefix_full + 'objects_pji.csv',local_dir_full + 'objects_pji.csv')
  114. bucket.get_object_to_file(parse_prefix_full + 'ego_pji.csv', local_dir_full + 'ego_pji.csv')
  115. generate_xosc(parse_prefix_full, local_dir_full, local_delete_list)
  116. except Exception as e:
  117. logging.exception("局部异常处理: %s", str(e))
  118. # 删除本地临时文件
  119. if len(local_delete_list) > 0:
  120. for local_delete in local_delete_list:
  121. try:
  122. os.remove(local_delete)
  123. except Exception as e:
  124. logging.exception("删除本地临时文件报错: %s" % str(e))
  125. time.sleep(sleep_time)
  126. except Exception as e:
  127. logging.exception("全局异常处理: %s", str(e))