123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- # -*- coding: utf-8 -*-
- import json
- import time
- import urllib2
- import oss2
- from datetime import datetime, timedelta
- import logging
- path1 = '/mnt/disk001/cicv-data-closedloop/python2-pjisuv-module/'
- logging.basicConfig(filename=path1 + 'log/callback.log', level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s')
- key1 = 'pjisuv/'
- sleep_time = 60 # 每多少秒扫描一次
- url1_public = "http://1.202.169.139:9081/device/auth"
- url1_private = "http://10.14.86.127:9081/device/auth"
- url2_public = " http://1.202.169.139:9081/device/data/callback"
- url2_private = "http://10.14.86.127:9081/device/data/callback"
- def add_hour(date_string, hour_number):
- original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
- new_date = original_date + timedelta(hours=hour_number)
- return new_date.strftime("%Y-%m-%d-%H-%M-%S")
- '''
- cname:http://open-bucket.oss.icvdc.com
- 内网endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
- oss桶名: open-bucket
- keyid:n8glvFGS25MrLY7j
- secret:xZ2Fozoarpfw0z28FUhtg8cu0yDc5d
- '''
- if __name__ == '__main__':
- # 1 登录验证 。
- auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
- # 2 填写自定义域名,例如example.com。获取桶。
- # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。
- # cname = 'http://open-bucket.oss.icvdc.com'
- # bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)
- endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
- bucket_name = 'open-bucket'
- bucket = oss2.Bucket(auth, endpoint, bucket_name)
- while True:
- logging.info("开始新一轮扫描")
- local_delete_list = []
- oss_delete_list = []
- upload_completed_prefix_list = []
- # 4 获取即将被合并的bag目录
- logging.info("开始扫描目录: %s" % str(key1))
- for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
- if 'callback.json' in str(obj1.key):
- time.sleep(1)
- prefix = '/'.join(str(obj1.key).split('/')[:-1])
- pos_orig_csv = False
- pos_hmi_csv = False
- drive_csv = False
- camera_mp4 = False
- pcd_forwardlook_mp4 = False
- pcd_overlook_mp4 = False
- scenario_orig_xosc = False
- scenario_hmi_xosc = False
- scenario_orig_mp4 = False
- scenario_hmi_mp4 = False
- camera_bag = False
- fusion_bag = False
- plan_bag = False
- control_bag = False
- callback_json = False
- for obj2 in oss2.ObjectIterator(bucket, prefix=prefix):
- if 'callback.json' in str(obj2.key):
- callback_json = True
- if 'camera.mp4' in str(obj2.key):
- camera_mp4 = True
- if 'drive.csv' in str(obj2.key):
- drive_csv = True
- if 'pcd_forwardlook.mp4' in str(obj2.key):
- pcd_forwardlook_mp4 = True
- if 'pcd_overlook.mp4' in str(obj2.key):
- pcd_overlook_mp4 = True
- if 'pos_orig.csv' in str(obj2.key):
- pos_orig_csv = True
- if 'scenario_orig.mp4' in str(obj2.key):
- scenario_orig_mp4 = True
- if 'scenario_orig.xosc' in str(obj2.key):
- scenario_orig_xosc = True
- if not callback_json or not camera_mp4 or not drive_csv or not pcd_forwardlook_mp4 or not pcd_overlook_mp4 or not pos_orig_csv or not scenario_orig_mp4 or not scenario_orig_xosc:
- continue
- time.sleep(2)
- logging.info("发送: %s" % str(obj1.key))
- # 1 获取json内容
- json_content = bucket.get_object(str(obj1.key)).read()
- # 2 获取token
- json_object = json.loads(json_content)
- data1 = {
- "equipmentNo": json_object['equipmentNo'],
- "secretKey": json_object['secretKey']
- }
- json_data1 = json.dumps(data1)
- logging.info("授权接口请求中: %s" % url1_private)
- request1 = urllib2.Request(url1_private, json_data1,
- headers={'Content-Type': 'application/json'})
- response1 = urllib2.urlopen(request1)
- result_json1 = response1.read()
- result_object1 = json.loads(result_json1)
- logging.info("授权接口请求结果为: %s", result_object1)
- try:
- access_token = result_object1.get('data').get('accessToken')
- logging.info("bag文件为: %s" % str(json_object['rosBagPath']))
- old_date = json_object['dataName']
- data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
- equipment_no = json_object['equipmentNo']
- old_file_path = json_object['filePath']
- old_ros_bag_path = json_object['rosBagPath']
- task_id = json_object['taskId']
- trigger_id = json_object['triggerId']
- except Exception as e:
- logging.exception("callback报错: %s" % str(e))
- continue
- # 将时区统一
- new_date = add_hour(old_date, 8)
- old_delete_list = []
- for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
- old_delete_list.append(str(obj_old.key))
- if 'callback.json' in str(obj_old.key):
- bucket.copy_object(bucket_name, str(obj_old.key),
- str(obj_old.key).replace(old_date, new_date).replace('callback.json',
- 'callback_done.json'))
- else:
- bucket.copy_object(bucket_name, str(obj_old.key), str(obj_old.key).replace(old_date, new_date))
- bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
- bucket.delete_object(old_ros_bag_path)
- bucket.batch_delete_objects(old_delete_list)
- triggerId = json_object['triggerId']
- data2 = {
- "dataName": new_date,
- "dataSize": data_size,
- "equipmentNo": equipment_no,
- "filePath": old_file_path.replace(old_date, new_date),
- "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
- "taskId": task_id,
- # "triggerId": ["1744180835122155522", "1744178775815360514"]
- "triggerId": triggerId
- }
- json_data2 = json.dumps(data2)
- logging.info("回调接口请求中:%s" % url2_private)
- request2 = urllib2.Request(url2_private, json_data2,
- headers={'Content-Type': 'application/json',
- 'authorization': access_token})
- response2 = urllib2.urlopen(request2)
- result_json2 = response2.read()
- result_object2 = json.loads(result_json2)
- logging.info("回调接口请求结果为: %s", result_object2)
- time.sleep(sleep_time)
|