123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
# -*- coding: utf-8 -*-
import json
import logging
import os
import time

import oss2
# Root directory of this module on the processing host; the log file lives
# under its ``log/`` sub-directory.
path1 = '/mnt/disk001/cicv-data-closedloop/python2-pjisuv-module/'

_log_dir = path1 + 'log/'
_log_format = '%(asctime)s - %(levelname)s - %(message)s'
try:
    # basicConfig(filename=...) raises when the directory is missing, so make
    # sure it exists before the file handler is created.
    if not os.path.isdir(_log_dir):
        os.makedirs(_log_dir)
    logging.basicConfig(filename=_log_dir + 'merge.log', level=logging.INFO,
                        format=_log_format)
except (IOError, OSError) as _log_err:
    # The log file is unusable (e.g. missing mount or no permission): keep the
    # process alive and log to stderr instead of failing at import time.
    logging.basicConfig(level=logging.INFO, format=_log_format)
    logging.warning('cannot open log file under %s (%s); logging to stderr',
                    _log_dir, _log_err)

# OSS object-key segments. key1 and key2 are joined as key1 + <car> + '/' + key2
# to build the per-vehicle data prefix; key3 and key4 are not referenced by the
# code visible in this file.
key1 = 'pjisuv/'
key2 = 'data/'
key3 = 'data_merge/'
key4 = 'data_parse/'

sleep_time = 60  # seconds to wait between two scan rounds
car_list = ['pjisuv-007', 'pjisuv-012', 'pjisuv-013']  # vehicles scanned each round
def is_upload_completed(prefix, done_list):
    """Tell whether ``prefix`` belongs to an entry listed in ``done_list``.

    :param prefix: OSS key / prefix string to check.
    :param done_list: iterable of strings; an entry matches when it occurs
        anywhere inside ``prefix`` (substring test).
    :return: True if at least one entry matches, False otherwise (including
        when ``done_list`` is empty).
    """
    # The loop variable is deliberately not called ``time`` so the ``time``
    # module is not shadowed.
    return any(done_time in prefix for done_time in done_list)
def merge(local_bags1, merged_prefix1, local_merged_dir1, merged_bag_name1):
    """Concatenate several local bag files into one bag and upload it to OSS.

    :param local_bags1: paths of the local bag files to merge, in write order.
    :param merged_prefix1: OSS key prefix under which the merged bag is stored.
    :param local_merged_dir1: local directory (with trailing separator) for
        the merged bag; it is concatenated directly with the file name.
    :param merged_bag_name1: file name of the merged bag.
    :return: local path of the merged bag, or None when anything failed (the
        error is logged with its traceback, not raised).

    NOTE(review): ``Bag`` and ``compress_way`` are not defined or imported in
    the visible part of this file (presumably ``rosbag.Bag`` and a rosbag
    compression constant), and ``bucket`` is the module-level OSS bucket that
    is only created when the file runs as a script -- confirm they are in
    scope before calling this function.
    """
    try:
        all_bag_path1 = local_merged_dir1 + merged_bag_name1
        with Bag(all_bag_path1, 'w', compression=compress_way) as merged_bag:
            for local_bag_path in local_bags1:
                with Bag(local_bag_path, 'r') as source_bag:
                    for topic, msg, t in source_bag:
                        merged_bag.write(topic, msg, t)
        # Upload only once the output bag has been closed, so the file on
        # disk is complete.
        merged_key = merged_prefix1 + merged_bag_name1
        bucket.put_object_from_file(merged_key, all_bag_path1)
        logging.info('上传合并包到 %s', merged_key)
        return all_bag_path1
    except Exception as e1:
        logging.exception("bag包合并报错: %s", e1)
        return None
def remove_element_of_oss_json(oss_json_key, time):
    """Remove one entry from a JSON list stored on OSS and write the list back.

    The object is downloaded to a temporary file in the current working
    directory, parsed, the entry is removed, and the updated list is uploaded
    to the same key as UTF-8 JSON.

    :param oss_json_key: OSS key of the JSON file holding a list.
    :param time: the list element to remove (this parameter shadows the
        ``time`` module inside the function).
    :raises ValueError: if ``time`` is not present in the list (nothing is
        uploaded in that case).

    The download/modify/upload sequence is not atomic: another writer updating
    the same object in between will have its change overwritten. It reads the
    module-level ``bucket``, which is only created when the file runs as a
    script.
    """
    temp_file_path_remove = 'remove-node1Done.json'
    try:
        # 1. read the JSON file from OSS into a list
        bucket.get_object_to_file(oss_json_key, temp_file_path_remove)
        with open(temp_file_path_remove, 'r') as f_remove:
            remove_node1_done_list = json.load(f_remove)
    finally:
        # Always clean up, even when the download or the parsing failed.
        if os.path.exists(temp_file_path_remove):
            os.remove(temp_file_path_remove)
    # 2. drop the entry and write the list back
    remove_node1_done_list.remove(time)
    json_str = json.dumps(remove_node1_done_list, indent=4, ensure_ascii=False)
    bucket.put_object(oss_json_key, json_str.encode('utf-8'))
# OSS connection notes
#   cname:             http://open-bucket.oss.icvdc.com
#   preset OSS path:   oss://open-bucket
#   internal endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
#   bucket name:       open-bucket
# The access key id / secret must never be committed to source control; they
# are read from the OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET environment
# variables when the script starts.


def _load_done_list(oss_bucket, json_key, temp_path):
    """Download the JSON object ``json_key`` and return its parsed content.

    The object is fetched to ``temp_path``; that file is removed again whether
    or not the download and the parsing succeed. Errors are propagated.
    """
    try:
        oss_bucket.get_object_to_file(json_key, temp_path)
        with open(temp_path, 'r') as json_file:
            return json.load(json_file)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


def _scan_car(oss_bucket, car):
    """Scan the data prefix of one vehicle and print its node1/node2 keys.

    The vehicle is skipped (after a 2 second pause) when node1Done.json or
    node2Done.json is missing or cannot be downloaded/parsed.
    """
    nodes = ('node1', 'node2')
    data_prefix = key1 + car + '/' + key2
    # todo: how to write the json back still needs to be confirmed; updating
    # it directly would race with the upload program
    json_keys = [data_prefix + node + 'Done.json' for node in nodes]

    # 1. both done-lists must exist
    for json_key in json_keys:
        if not oss_bucket.object_exists(json_key):
            logging.info('不存在文件 %s', json_key)
            time.sleep(2)
            return

    # 2. read both JSON files into lists
    done_lists = {}
    for node, json_key in zip(nodes, json_keys):
        try:
            done_lists[node] = _load_done_list(oss_bucket, json_key,
                                               car + '-' + node + 'Done.json')
        except Exception as err:
            logging.exception("下载json文件【%s】报错: 【%s】", json_key, str(err))
            time.sleep(2)
            return
    # NOTE: the done-lists are loaded (so a broken file skips the vehicle) but
    # the scan below does not consume them yet.

    logging.info("扫描车辆【%s】的数据", car)
    for obj in oss2.ObjectIterator(oss_bucket, prefix=data_prefix, delimiter='/'):
        obj_key = obj.key
        # keep only keys with exactly four '/', e.g.
        # pjisuv/pjisuv-007/data/node1_2023-12-20-02-16-56_obction_10/
        if str(obj_key).count('/') != 4:
            continue
        for node in nodes:
            if node in obj_key:
                print(obj_key)


# ------- find the bags that have not been merged yet and merge them -------
if __name__ == '__main__':
    # 1. create the Aliyun OSS objects
    access_key_id = os.environ.get('OSS_ACCESS_KEY_ID')
    access_key_secret = os.environ.get('OSS_ACCESS_KEY_SECRET')
    if not access_key_id or not access_key_secret:
        raise SystemExit('OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET must be set')
    auth = oss2.Auth(access_key_id, access_key_secret)

    cname = 'http://open-bucket.oss.icvdc.com'
    # Set OSS_ENDPOINT (e.g. oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com) to
    # talk to the internal endpoint; otherwise the CNAME is used.
    endpoint = os.environ.get('OSS_ENDPOINT')
    # ``bucket`` stays a module-level name: merge() and
    # remove_element_of_oss_json() read it as a global.
    if endpoint:
        bucket = oss2.Bucket(auth, endpoint, 'open-bucket')
    else:
        bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)

    while True:
        try:
            logging.info("开始新一轮扫描")
            for car in car_list:
                _scan_car(bucket, car)
        except Exception as e:
            logging.exception("全局异常处理: %s", str(e))
        # Sleep outside the try block so a failing round is also followed by
        # the pause instead of being retried immediately.
        time.sleep(sleep_time)
|