# -*- coding: utf-8 -*- import json import time import urllib2 import oss2 from datetime import datetime, timedelta key1 = 'pji/' key2 = 'data/' key3 = 'data_merge/' key4 = 'data_parse/' path1 = '/root/' path2 = 'data/' path3 = 'data_merge/' path4 = 'data_parse/' def add_hour(date_string, hour_number): original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S") new_date = original_date + timedelta(hours=hour_number) return new_date.strftime("%Y-%m-%d-%H-%M-%S") ''' cname:http://open-bucket.oss.icvdc.com 内网endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com oss桶名: open-bucket keyid:n8glvFGS25MrLY7j secret:xZ2Fozoarpfw0z28FUhtg8cu0yDc5d ''' if __name__ == '__main__': # 1 登录验证 。 auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d') # 2 填写自定义域名,例如example.com。获取桶。 # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。 # cname = 'http://open-bucket.oss.icvdc.com' # bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True) endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com' bucket_name = 'open-bucket' bucket = oss2.Bucket(auth, endpoint, bucket_name) while True: local_delete_list = [] oss_delete_list = [] upload_completed_prefix_list = [] # 4 获取即将被合并的bag目录 for obj1 in oss2.ObjectIterator(bucket, prefix=key1): if 'callback.json' in str(obj1.key): prefix = '/'.join(str(obj1.key).split('/')[:-1]) camera_mp4 = False pcd_depthcamera_mp4 = False pcd_lidar_mp4 = False # print '检测是否文件完整:', prefix for obj2 in oss2.ObjectIterator(bucket, prefix=prefix): if '/camera.mp4' in str(obj2.key): camera_mp4 = True if '/pcd_depthcamera.mp4' in str(obj2.key): pcd_depthcamera_mp4 = True if '/pcd_lidar.mp4' in str(obj2.key): pcd_lidar_mp4 = True if not camera_mp4 or not pcd_depthcamera_mp4 or not pcd_lidar_mp4: continue time.sleep(1) print '发送:', str(obj1.key) # 1 获取json内容 json_content = bucket.get_object(str(obj1.key)).read() # 2 获取token json_object = json.loads(json_content) data1 = { "equipmentNo": json_object['equipmentNo'], "secretKey": json_object['secretKey'] } json_data1 = json.dumps(data1) request1 = urllib2.Request("http://139.9.199.227:30991/device/auth", json_data1, headers={'Content-Type': 'application/json'}) response1 = urllib2.urlopen(request1) result_json1 = response1.read() result_object1 = json.loads(result_json1) access_token = result_object1.get('data').get('accessToken') # 要发送的JSON参数 try: print 'bag文件为:', json_object['rosBagPath'] old_date = json_object['dataName'] data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length equipment_no = json_object['equipmentNo'] old_file_path = json_object['filePath'] old_ros_bag_path = json_object['rosBagPath'] task_id = json_object['taskId'] trigger_id = json_object['triggerId'] except Exception as e: print 'callback报错:%s' % str(e) continue # 将时区统一 new_date = add_hour(old_date, 8) old_delete_list = [] for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path): old_delete_list.append(str(obj_old.key)) if 'callback.json' in str(obj_old.key): bucket.copy_object(bucket_name, str(obj_old.key), str(obj_old.key).replace(old_date, new_date).replace('callback.json', 'callback_done.json')) else: bucket.copy_object(bucket_name, str(obj_old.key), str(obj_old.key).replace(old_date, new_date)) bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date)) bucket.delete_object(old_ros_bag_path) bucket.batch_delete_objects(old_delete_list) data2 = { "dataName": new_date, "dataSize": data_size, "equipmentNo": equipment_no, "filePath": old_file_path.replace(old_date, new_date), "rosBagPath": old_ros_bag_path.replace(old_date, new_date), "taskId": task_id, "triggerId": trigger_id } json_data2 = json.dumps(data2) request2 = urllib2.Request("http://139.9.199.227:30991/device/data/callback", json_data2, headers={'Content-Type': 'application/json', 'authorization': access_token}) response2 = urllib2.urlopen(request2) result_json2 = response2.read() result_object2 = json.loads(result_json2) time.sleep(2)