# -*- coding: utf-8 -*- import os import time import oss2 import logging import json path1 = '/mnt/disk001/cicv-data-closedloop/python2-pjisuv-module/' logging.basicConfig(filename=path1 + 'log/merge.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') key1 = 'pjisuv/' key2 = 'data/' key3 = 'data_merge/' key4 = 'data_parse/' sleep_time = 60 # 每多少秒扫描一次 car_list = ['pjisuv-007', 'pjisuv-012', 'pjisuv-013'] # 判断是否在done_list 中 def is_upload_completed(prefix, done_list): for time in done_list: if time in prefix: return True return True def merge(local_bags1, merged_prefix1, local_merged_dir1, merged_bag_name1): try: all_bag_path1 = local_merged_dir1 + merged_bag_name1 with Bag(all_bag_path1, 'w', compression=compress_way) as o: for i in range(len(local_bags1)): with Bag(local_bags1[i], 'r') as ib: for topic, msg, t in ib: o.write(topic, msg, t) bucket.put_object_from_file(merged_prefix1 + merged_bag_name1, all_bag_path1) logging.info('上传合并包到 %s' % merged_prefix1 + merged_bag_name1) return all_bag_path1 except Exception as e1: logging.exception("bag包合并报错: %s", e1) def remove_element_of_oss_json(oss_json_key, time): temp_file_path_remove = 'remove-node1Done.json' # 1 读取oss的json文件为列表 bucket.get_object_to_file(oss_json_key, temp_file_path_remove) with open(temp_file_path_remove, 'r') as f_remove: remove_node1_done_list = json.load(f_remove) remove_node1_done_list.remove(time) json_str = json.dumps(remove_node1_done_list, indent=4, ensure_ascii=False) bucket.put_object(oss_json_key, json_str.encode('utf-8')) os.remove(temp_file_path_remove) ''' cname:http://open-bucket.oss.icvdc.com keyid:n8glvFGS25MrLY7j secret:xZ2Fozoarpfw0z28FUhtg8cu0yDc5d 预设OSS路径: oss://open-bucket 内网endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com oss桶名: open-bucket ''' # ------- 获取未合并的bag包,合并 ------- if __name__ == '__main__': # 1 创建阿里云对象 cname = 'http://open-bucket.oss.icvdc.com' bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True) # auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d') # endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com' bucket = oss2.Bucket(auth, endpoint, 'open-bucket') while True: try: logging.info("开始新一轮扫描") for car in car_list: prefix_list = [] upload_completed_prefix_list_node1 = [] upload_completed_prefix_list_node2 = [] # todo 如何往回更新json需要确认一下,直接更新会和上传程序出现数据竞争 node1_done_json_key = 'pjisuv/' + car + '/' + key2 + 'node1Done.json' node2_done_json_key = 'pjisuv/' + car + '/' + key2 + 'node2Done.json' # 1 判断两个json是否存在 exist = bucket.object_exists(node1_done_json_key) if not exist: logging.info('不存在文件 %s' % node1_done_json_key) time.sleep(2) continue exist = bucket.object_exists(node2_done_json_key) if not exist: logging.info('不存在文件 %s' % node2_done_json_key) time.sleep(2) continue # 2 读取两个json为数组 temp_file_path1 = car + '-node1Done.json' temp_file_path2 = car + '-node2Done.json' try: # node1 bucket.get_object_to_file(node1_done_json_key, temp_file_path1) with open(temp_file_path1, 'r') as f1: node1_done_list = json.load(f1) except Exception as e1: logging.exception("下载json文件【%s】报错: 【%s】", node1_done_json_key, str(e1)) time.sleep(2) continue try: # node2 bucket.get_object_to_file(node2_done_json_key, temp_file_path2) with open(temp_file_path2, 'r') as f2: node2_done_list = json.load(f2) except Exception as e2: logging.exception("下载json文件【%s】报错: 【%s】", node2_done_json_key, str(e2)) time.sleep(2) continue os.remove(temp_file_path1) os.remove(temp_file_path2) logging.info("扫描车辆【%s】的数据", car) for obj1 in oss2.ObjectIterator(bucket, prefix=key1 + car + "/" + key2, delimiter='/'): if str(obj1.key).count('/') == 4: # pjisuv/pjisuv-007/data/node1_2023-12-20-02-16-56_obction_10/ if 'node1' in obj1.key: print(obj1.key) if 'node2' in obj1.key: print(obj1.key) time.sleep(sleep_time) except Exception as e: logging.exception("全局异常处理: %s", str(e))