123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
# -*- coding: utf-8 -*-
# ------- Global configuration -------
# NOTE: this script targets Python 2 (it relies on reload(sys),
# sys.setdefaultencoding and the unicode builtin).
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
import os
import time
import oss2
import logging
import json
import subprocess
import io
from utils import json_utils

# Root of this service: the log file and the locally downloaded parse
# directories are created underneath it.
path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/'
# Directory from which the Python 3 generator script jiqiren_outdoor.py is run.
path2 = '/mnt/disk001/dcl_data_process/src/python3/pjibot_outdoor/'
logging.basicConfig(
    filename=path1 + 'log/xosc-pjibot_delivery.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# OSS key prefix that is scanned for merged bags.
key1 = 'pjibot_delivery/'
sleep_time = 10  # seconds to wait between two scans
# JSON file holding the parse prefixes whose generation failed; those prefixes
# are skipped by later scans.
error_bag_json = "/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/xosc-errorBag.json"

# 1. Create the Aliyun OSS client.
# SECURITY NOTE(review): the access key pair used to be hard-coded only. It can
# now be supplied through OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET; the
# literals below remain solely as a backward-compatible fallback and should be
# rotated and removed from source control.
auth = oss2.Auth(
    os.environ.get('OSS_ACCESS_KEY_ID', 'n8glvFGS25MrLY7j'),
    os.environ.get('OSS_ACCESS_KEY_SECRET', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d'),
)
endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
bucket = oss2.Bucket(auth, endpoint, 'pji-bucket1')
# ------- Global configuration -------
def _append_output_to_callback(parse_prefix, local_parse_dir, callback_json_local):
    """Append the entries of output.json to the ``check`` list of callback.json.

    Best effort: any failure is logged and swallowed so that it never
    invalidates the scenario files that were already uploaded.
    """
    output_json_path = local_parse_dir + 'simulation/xosc/output.json'
    callback_json_oss_key = parse_prefix + 'callback.json'
    try:
        # 1. Nothing to merge when the generator produced no output.json.
        if not os.path.exists(output_json_path):
            return
        outputs = json_utils.parse_json_to_string_array(output_json_path)
        # 2. Download callback.json, extend its "check" field and upload it back.
        bucket.get_object_to_file(callback_json_oss_key, callback_json_local)
        with io.open(callback_json_local, 'r', encoding='utf-8') as f:
            data = json.load(f)
        data.setdefault('check', []).extend(outputs)
        json_data = json.dumps(data, ensure_ascii=False, indent=4)
        with io.open(callback_json_local, 'w', encoding='utf-8') as f:
            # io.open in text mode requires unicode objects under Python 2.
            f.write(unicode(json_data))
        bucket.put_object_from_file(callback_json_oss_key, callback_json_local)
    except Exception as e3:
        # TODO: callback.json may be missing because it was already turned
        # into callback_done.json; ignored for now.
        logging.exception("处理 output.json报错: %s" % str(e3))


def generate_xosc(parse_prefix, local_parse_dir, local_delete_list):
    """Generate the scenario file for one parsed bag and upload it to OSS.

    Runs ``jiqiren_outdoor.py`` with ``python3`` from ``path2`` against the
    local parse directory, uploads the resulting ``openx_outdoor0.xosc`` as
    both ``scenario.xosc`` and ``scenario_hmi.xosc`` under ``parse_prefix``,
    and merges ``output.json`` (when present) into the remote
    ``callback.json``.

    Args:
        parse_prefix: OSS key prefix of the parse directory, ending with '/'.
        local_parse_dir: Local counterpart of that directory, ending with '/'.
        local_delete_list: List mutated in place; temporary local files that
            exist when this function finishes are appended to it.

    Failures are logged and ``parse_prefix`` is recorded in the error-bag
    JSON file; no exception is propagated to the caller.
    """
    local_xosc_path2 = local_parse_dir + 'simulation/xosc/openx_outdoor0.xosc'
    # local_parse_dir already ends with '/', so no extra separator is needed.
    callback_json_local = local_parse_dir + 'callback.json'
    try:
        # Argument list + cwd instead of a shell string + os.chdir: the path is
        # derived from OSS object keys and must not be interpreted by a shell,
        # and the daemon's own working directory stays untouched.
        # Trailing argument: 0 = delivery robot, 1 = inspection robot.
        command2 = ['python3', 'jiqiren_outdoor.py', local_parse_dir[:-1], '0']
        logging.info("进入目录 %s 调用命令2: %s", path2, ' '.join(command2))
        process = subprocess.Popen(command2, cwd=path2,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()  # wait for completion and collect output
        # "replace" keeps a stray non-UTF-8 byte in the generator output from
        # aborting the upload below.
        if process.returncode == 0:
            logging.info("命令输出: %s", stdout.decode("utf-8", "replace"))
        else:
            logging.error("命令执行失败,错误码: %d", process.returncode)
            logging.error("命令错误输出: %s", stderr.decode("utf-8", "replace"))
        # A missing result file makes the upload raise, which routes the
        # prefix to the error bag in the handler below.
        bucket.put_object_from_file(parse_prefix + 'scenario.xosc', local_xosc_path2)
        bucket.put_object_from_file(parse_prefix + 'scenario_hmi.xosc', local_xosc_path2)
        logging.info("上传 scenario_hmi.xosc 成功: %s", parse_prefix + 'scenario_hmi.xosc')

        _append_output_to_callback(parse_prefix, local_parse_dir, callback_json_local)
    except Exception as e:
        # Log first so the root cause survives even if the error bag cannot be
        # read or written.
        logging.exception("生成xosc报错: %s", str(e))
        try:
            error_bag_list = json_utils.parse_json_to_string_array(error_bag_json)
            if parse_prefix not in error_bag_list:
                error_bag_list.append(parse_prefix)
                json_utils.list_to_json_file(error_bag_list, error_bag_json)
        except Exception as e2:
            logging.exception("写入 errorBag 报错: %s", str(e2))
    finally:
        # Schedule only files that really exist, on success and on failure, so
        # the cleanup pass neither leaks them nor logs spurious removal errors.
        for temp_path in (local_xosc_path2, callback_json_local):
            if os.path.exists(temp_path):
                local_delete_list.append(temp_path)
def _needs_xosc(parse_prefix_full):
    """Tell whether a parse prefix is ready for generation.

    Returns True when both ``objects_pji.csv`` and ``ego_pji.csv`` are present
    under the prefix and ``scenario_hmi.xosc`` is not.
    """
    csv1_done = False
    csv2_done = False
    for obj3 in oss2.ObjectIterator(bucket, prefix=str(parse_prefix_full)):
        obj3_key = str(obj3.key)
        # The simulation consumes scenario_hmi.xosc, so its presence means the
        # prefix is already done; stop listing right away.
        if '/scenario_hmi.xosc' in obj3_key:
            return False
        if '/objects_pji.csv' in obj3_key:
            csv1_done = True
        if '/ego_pji.csv' in obj3_key:
            csv2_done = True
    return csv1_done and csv2_done


def _scan_once(local_delete_list):
    """Scan OSS once and generate scenario files for every merged bag lacking one."""
    # 2. Walk all uploaded objects and pick the merged bags.
    for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
        try:
            merged_bag_object_key = str(obj1.key)
            if 'data_merge' not in merged_bag_object_key:
                continue
            if not merged_bag_object_key.endswith('.bag'):
                continue
            # .../data_merge/<name>.bag  ->  .../data_parse/<name>/
            parse_prefix_full = merged_bag_object_key.replace('data_merge', 'data_parse')[:-4] + '/'
            if not _needs_xosc(parse_prefix_full):
                continue
            # Prefixes that failed before are recorded in the error bag and skipped.
            error_bag_list = json_utils.parse_json_to_string_array(error_bag_json)
            if str(parse_prefix_full) in error_bag_list:
                continue
            logging.info("需要生成 scenario_hmi.xosc: %s" % str(parse_prefix_full))
            local_dir_full = path1 + parse_prefix_full
            if not os.path.exists(local_dir_full):
                os.makedirs(local_dir_full)
            bucket.get_object_to_file(parse_prefix_full + 'objects_pji.csv', local_dir_full + 'objects_pji.csv')
            bucket.get_object_to_file(parse_prefix_full + 'ego_pji.csv', local_dir_full + 'ego_pji.csv')
            generate_xosc(parse_prefix_full, local_dir_full, local_delete_list)
        except Exception as e:
            # One broken bag must not stop the rest of the scan.
            logging.exception("局部异常处理: %s", str(e))


def _remove_local_files(local_delete_list):
    """Delete the given local temporary files, logging (not raising) failures."""
    for local_delete in local_delete_list:
        try:
            os.remove(local_delete)
        except Exception as e:
            logging.exception("删除本地临时文件报错: %s" % str(e))


if __name__ == '__main__':
    while True:
        local_delete_list = []
        try:
            logging.info("开始新一轮扫描")
            _scan_once(local_delete_list)
        except Exception as e:
            logging.exception("全局异常处理: %s", str(e))
        finally:
            # Clean up even when the scan itself failed half-way.
            _remove_local_files(local_delete_list)
        # Sleep outside the try block: a failing scan (e.g. OSS unreachable)
        # must not turn into a tight retry loop.
        time.sleep(sleep_time)
|