# -*- coding: utf-8 -*-
"""Poll an OSS bucket for parsed scenarios that still lack a simulation video.

For every merged ``.bag`` object under ``key1`` the matching ``data_parse``
prefix is inspected.  When it already holds a scenario file (``xoscName``) but
no ``scenario_orig.mp4``, the scenario is downloaded, rewritten for the local
simulator, the Docker container ``container_name`` is started, and the
resulting video is uploaded back to the bucket.
"""
# ------- Global configuration -------
import os
import time
import oss2
import xml.etree.ElementTree as ET
import shutil
import docker
import logging
from utils import json_utils

key1 = 'pjibot_patrol/'
path1 = '/scenarios4/'
container_name = 'vtd4'
path2 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/'
# Delivery robot: PuJin_distribution; patrol robot: PuJin_patrol_robot
vehicle_name = 'PuJin_patrol_robot'
xoscName = 'scenario.xosc'

logging.basicConfig(filename=path2 + 'log/simulation-pjibot_patrol.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

sleep_time = 30  # Seconds to wait between two scans of the bucket.
error_bag_json = "/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/simulation-errorBag.json"

# 1 Create the Aliyun OSS client objects.
# NOTE(review): the access key pair is hard-coded in source; it should be moved
# to configuration or the environment and rotated.
auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
bucket = oss2.Bucket(auth, endpoint, 'pji-bucket1')


def _copy_asset_next_to_scenario(node, root_path):
    """Copy the file named by ``node``'s ``filepath`` into ``root_path`` and repoint the node at the copy."""
    asset_src = node.get('filepath')
    if asset_src is None:
        # Previously str(None) produced the bogus path 'None'; fail with a clear message instead.
        raise ValueError('%s element has no filepath attribute' % node.tag)
    # os.path.join keeps the result correct whether or not root_path ends with '/'.
    asset_target = os.path.join(root_path, os.path.basename(asset_src))
    shutil.copy(asset_src, asset_target)
    node.set('filepath', asset_target)


def move_xosc_before_simulation(root_path):
    """Rewrite ``scenario_orig.xosc`` in ``root_path`` into ``openx0.xosc``.

    The ``LogicFile`` and ``SceneGraphFile`` entries under ``RoadNetwork`` are
    copied into ``root_path`` and their ``filepath`` attributes updated to the
    copies.  Every ``Vehicle`` child of the entity named ``Ego`` is renamed to
    ``vehicle_name``.  After the new file is written the original
    ``scenario_orig.xosc`` is deleted.

    Errors are logged, not raised.

    :param root_path: directory containing ``scenario_orig.xosc``.
    :return: ``True`` when ``openx0.xosc`` was written, ``False`` on any error.
    """
    try:
        xosc_src = os.path.join(root_path, 'scenario_orig.xosc')
        xosc_target = os.path.join(root_path, 'openx0.xosc')
        tree1 = ET.parse(xosc_src)
        for node0 in tree1.getroot():
            if node0.tag == 'RoadNetwork':
                for node1 in node0:
                    if node1.tag in ('LogicFile', 'SceneGraphFile'):
                        _copy_asset_next_to_scenario(node1, root_path)
            if node0.tag == 'Entities':
                for node1 in node0:
                    if node1.get("name") == 'Ego':
                        for node2 in node1:
                            if node2.tag == 'Vehicle':
                                node2.set('name', vehicle_name)
        tree1.write(xosc_target)
        # Check for existence first so a missing file does not raise.
        if os.path.exists(xosc_src):
            os.remove(xosc_src)
            print("文件已删除:", xosc_src)
        else:
            print("文件不存在:", xosc_src)
        return True
    except Exception as e:
        logging.exception("修改xosc报错: %s", str(e))
        return False


def upload_simulation(parse_prefix, mp41):
    """Upload the simulation video and remove the local working tree.

    The same file is uploaded under two object names
    (``scenario_orig.mp4`` and ``scenario_hmi.mp4``).  ``path1`` is deleted
    only after both uploads succeeded.  Errors are logged, not raised.

    :param parse_prefix: OSS key prefix (ending in ``/``) to upload under.
    :param mp41: local path of the video file.
    """
    try:
        # Upload twice so that both expected object names exist.
        bucket.put_object_from_file(parse_prefix + 'scenario_orig.mp4', mp41)
        bucket.put_object_from_file(parse_prefix + 'scenario_hmi.mp4', mp41)
        logging.info('上传仿真视频到 %s%s', parse_prefix, 'scenario_orig.mp4')
        shutil.rmtree(path1)  # Delete the working tree once the simulation is done.
    except Exception as e:
        logging.exception("上传视频报错 %s", str(e))


def simulation(parse_prefix, mp41):
    """Start the simulator container, wait until it stops, then upload the video.

    The container list is polled every 5 seconds; the wait ends as soon as no
    running container is named ``container_name``.  Errors are logged, not
    raised.

    :param parse_prefix: OSS key prefix passed on to ``upload_simulation``.
    :param mp41: local path where the video is expected.
    """
    try:
        exit_status = os.system('docker start ' + container_name)
        if exit_status != 0:
            # Keep going as before, but leave a trace of why no video may appear.
            logging.error('docker start %s exited with status %s', container_name, exit_status)
        # Instantiate the Docker client.
        client = docker.from_env()
        while True:
            time.sleep(5)
            # containers.list() returns only running containers by default.
            if not any(container.name == container_name for container in client.containers.list()):
                break
        time.sleep(5)
        upload_simulation(parse_prefix, mp41)
    except Exception as e:
        logging.exception("生成仿真视频报错 %s", str(e))


def is_upload_completed(bucket, prefix):
    """Return whether the number of objects under ``prefix`` matches the count encoded in it.

    The expected count is the text after the last ``_`` in ``prefix`` with its
    final character dropped.  The object whose key equals ``prefix`` itself is
    not counted.

    :param bucket: OSS bucket to list.
    :param prefix: key prefix to inspect.
    :raises ValueError: if the encoded count is not an integer.
    """
    target_number = str(prefix).split('_')[-1][:-1]
    count = sum(1 for obj in oss2.ObjectIterator(bucket, prefix=prefix) if obj.key != prefix)
    return count == int(target_number)


def _scenario_state(parse_prefix_full):
    """Return ``(xosc_done, mp4_done)`` for the objects stored under ``parse_prefix_full``."""
    xosc_done = False
    mp4_done = False
    for obj3 in oss2.ObjectIterator(bucket, prefix=parse_prefix_full):
        object_key = str(obj3.key)
        if xoscName in object_key:
            xosc_done = True
        if '/scenario_orig.mp4' in object_key:
            mp4_done = True
    return xosc_done, mp4_done


def _process_merged_bag(merged_bag_object_key):
    """Generate and upload the simulation video for one merged bag, if it is still missing."""
    # 'xxx/data_merge/.../name.bag' -> 'xxx/data_parse/.../name/'
    parse_prefix_full = merged_bag_object_key.replace('data_merge', 'data_parse')[:-4] + '/'
    xosc_done, mp4_done = _scenario_state(parse_prefix_full)
    if not xosc_done or mp4_done:
        return
    logging.info("需要生成仿真视频: %s", parse_prefix_full)
    # Creating the 'orig/' leaf also creates the per-scenario directory above it.
    root_path1 = path1 + parse_prefix_full + 'orig/'
    if not os.path.exists(root_path1):
        os.makedirs(root_path1)
    # Download the scenario file under the name the rewrite step expects.
    bucket.get_object_to_file(parse_prefix_full + xoscName, root_path1 + 'scenario_orig.xosc')
    if not move_xosc_before_simulation(root_path1):
        # The failure is already logged; without openx0.xosc there is nothing to simulate.
        return
    simulation(parse_prefix_full, root_path1 + 'simulation.mp4')


def main():
    """Scan the bucket forever, pausing ``sleep_time`` seconds between scans."""
    while True:
        try:
            logging.info("开始新一轮扫描")
            # 2 Walk every object under key1 and pick the merged bag files.
            for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
                try:
                    merged_bag_object_key = str(obj1.key)
                    if 'data_merge' in merged_bag_object_key and merged_bag_object_key.endswith('.bag'):
                        _process_merged_bag(merged_bag_object_key)
                except Exception as e:
                    logging.exception("局部异常处理: %s", str(e))
        except Exception as e:
            logging.exception("全局异常处理: %s", str(e))
        # Sleep outside the try so a failing scan cannot turn into a busy loop.
        time.sleep(sleep_time)


# ------- Find merged bags whose parsed scenario still needs a simulation video -------
if __name__ == '__main__':
    main()