# -*- coding: utf-8 -*-
"""Callback daemon for pjibot_patrol data stored in OSS (Python 2).

The script repeatedly lists every object below ``key1`` in the bucket.  For
each ``callback.json`` it checks that the sibling result files exist and that
no ``callback_done.json`` marker is present, then:

1. requests an access token from ``url1_private``;
2. writes ``callback_done.json`` next to the original ``callback.json``;
3. posts the callback payload to ``url2_private``.
"""
import sys

# Python 2 only: make UTF-8 the implicit str<->unicode codec.  The membership
# tests on the ``check`` field below compare byte-string literals with the
# unicode strings returned by ``json.loads`` and rely on this setting.
reload(sys)
sys.setdefaultencoding("utf-8")

import json
import time
import urllib2
import oss2
import logging
from datetime import datetime, timedelta

# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
bucket_name = 'pji-bucket1'
bucket = oss2.Bucket(auth, endpoint, bucket_name)

path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/'
logging.basicConfig(filename=path1 + 'log/callback-pjibot_patrol.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

key1 = 'pjibot_patrol/'
key2 = 'data/'
key3 = 'data_merge/'
key4 = 'data_parse/'
path2 = 'data/'
# ``path3`` used to be assigned the local ".../callback/" directory first and
# was then immediately overwritten with this value; only the effective value
# is kept.
path3 = 'data_merge/'
path4 = 'data_parse/'

url1_private = "http://10.14.86.147:9081/device/auth"
url2_private = "http://10.14.86.147:9081/device/data/callback"

# Seconds to wait between two full scans of the bucket.
SCAN_INTERVAL_SECONDS = 10

# Result files that must all exist before a callback is sent.
_REQUIRED_MARKERS = (
    '/ego_pji.csv',
    '/objects_pji.csv',
    '/drive.csv',
    '/scenario_orig.mp4',
    '/trajectory_pji.csv',
)
_PCD_MARKER = '/pcd_overlook.mp4'
_DONE_MARKER = '/callback_done.json'

# Display order of the ``check`` labels.  The keys are unicode because
# ``json.loads`` yields unicode strings, and in Python 2 a non-ASCII byte
# string does not hash like its unicode counterpart, so byte-string keys
# would never be found by the lookup.
_CHECK_ORDER = [u'自车数据缺失', u'不在道路范围', u'无规划路径', u'目标点缺失',
                u'点云缺失', u'点云丢帧', u'解析程序错误', u'还原程序错误', u'评价程序错误']
_CHECK_RANK = dict((item, idx) for idx, item in enumerate(_CHECK_ORDER))


def add_hour(date_string, hour_number):
    """Shift a ``%Y-%m-%d-%H-%M-%S`` timestamp string by ``hour_number`` hours.

    Returns the shifted timestamp in the same format.  Raises ``ValueError``
    (from ``datetime.strptime``) when ``date_string`` does not match it.
    """
    original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
    new_date = original_date + timedelta(hours=hour_number)
    return new_date.strftime("%Y-%m-%d-%H-%M-%S")


def judge_pcd_xosc(callback_json_oss_key):
    """Inspect the ``check`` field of a callback.json stored in OSS.

    Returns a ``(pcd, xosc)`` tuple of booleans: ``pcd`` is True when
    ``check`` contains '点云缺失' and ``xosc`` is True when it contains
    '不在道路范围'.  Any read or parse failure is logged and reported as
    ``(False, False)`` (or whatever was determined before the failure).
    """
    pcd = False
    xosc = False
    try:
        json_content = bucket.get_object(callback_json_oss_key).read()
        json_object = json.loads(json_content)
        if 'check' not in json_object:
            logging.error("Missing 'check' field in %s", callback_json_oss_key)
            return pcd, xosc
        check = json_object['check']
        if '点云缺失' in check:
            pcd = True
        if '不在道路范围' in check:
            xosc = True
    except ValueError as e:
        # Log the object key as well as the error so the bad file can be found.
        logging.error("Failed to decode JSON from %s: %s", callback_json_oss_key, e)
    except Exception as e:
        logging.error("Error processing %s: %s", callback_json_oss_key, e)
    return pcd, xosc


def _check_rank(item):
    """Return the sort rank of one ``check`` label; unknown labels sort last."""
    if isinstance(item, bytes):
        # Python 2 byte string: decode so it matches the unicode keys.
        item = item.decode('utf-8')
    return _CHECK_RANK.get(item, float('inf'))


def _post_json(url, body, headers):
    """POST ``body`` to ``url`` and return the decoded JSON response."""
    # NOTE(review): no timeout is set, so a stalled server blocks the daemon.
    response = urllib2.urlopen(urllib2.Request(url, body, headers=headers))
    try:
        return json.loads(response.read())
    finally:
        response.close()


def _scan_outputs(prefix):
    """List the objects below ``prefix + '/'``.

    Returns ``(already_done, found)`` where ``already_done`` is True as soon
    as a callback_done.json is seen (the listing stops there) and ``found``
    is the set of known marker suffixes that occurred in the listed keys.
    """
    found = set()
    for obj in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
        object_key = str(obj.key)
        if _DONE_MARKER in object_key:
            return True, found
        for marker in _REQUIRED_MARKERS + (_PCD_MARKER,):
            if marker in object_key:
                found.add(marker)
    return False, found


def _process_callback(callback_key):
    """Send the callback for one callback.json if its data set is complete."""
    prefix = '/'.join(callback_key.split('/')[:-1])
    already_done, found = _scan_outputs(prefix)
    if already_done:
        return
    if not all(marker in found for marker in _REQUIRED_MARKERS):
        return

    # NOTE(review): ``xosc_ok`` can only become True through judge_pcd_xosc,
    # which is consulted only when pcd_overlook.mp4 is absent.  A data set
    # that already has pcd_overlook.mp4 is therefore never called back.
    # This mirrors the original logic; confirm whether it is intended.
    pcd_ok = _PCD_MARKER in found
    xosc_ok = False
    if not pcd_ok:
        pcd_ok, xosc_ok = judge_pcd_xosc(callback_key)
    if not (pcd_ok and xosc_ok):
        return

    time.sleep(1)
    logging.info("发送:%s", prefix)
    json_object = json.loads(bucket.get_object(callback_key).read())

    # Step 1: obtain an access token.
    data1 = {"equipmentNo": json_object['equipmentNo'], "secretKey": json_object['secretKey']}
    json_data1 = json.dumps(data1)
    logging.info("授权接口请求中: %s" % url1_private)
    # NOTE(review): this writes the device secretKey to the log in clear text.
    logging.info("授权发送参数为: %s" % str(data1))
    result_object1 = _post_json(url1_private, json_data1, {'Content-Type': 'application/json'})
    logging.info("授权接口请求结果为: %s", result_object1)
    access_token = result_object1.get('data').get('accessToken')

    # Step 2: collect the payload fields.
    old_date = json_object['dataName']
    data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
    equipment_no = json_object['equipmentNo']
    old_file_path = json_object['filePath']
    old_ros_bag_path = json_object['rosBagPath']
    task_id = json_object['taskId']
    trigger_id = json_object['triggerId']
    # Order the labels canonically, then join the array into one string.
    check = ','.join(sorted(json_object['check'], key=_check_rank))
    if old_date is None:
        old_date = ''
    # Time-zone normalisation is disabled for this robot type (the original
    # comment says whether +8h is needed depends on the robot terminal's
    # time zone); re-enable with ``new_date = add_hour(old_date, 8)``.
    new_date = old_date

    # The time zone is unchanged, so no objects are moved; only the key of
    # the done-marker is derived from the existing callback.json key.
    callback_done_oss_key = ''
    for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
        old_key = str(obj_old.key)
        if 'callback.json' in old_key:
            callback_done_oss_key = old_key.replace(old_date, new_date).replace(
                'callback.json', 'callback_done.json')

    data2 = {
        "dataName": new_date,
        "dataSize": data_size,
        "equipmentNo": equipment_no,
        "filePath": old_file_path.replace(old_date, new_date),
        "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
        "taskId": task_id,
        "triggerId": trigger_id,
        "check": check,
    }
    if 'userId' in json_object:
        logging.info("json_object 包含 'userId' 字段,值为:%s", json_object['userId'])
        data2['userId'] = json_object['userId']
    else:
        logging.info("json_object 不包含 'userId' 字段")
    json_data2 = json.dumps(data2)

    # Step 3: write the done-marker, then send the callback.
    # NOTE(review): the marker is written before the request, so a failed
    # callback is never retried on a later scan.  Kept as in the original.
    bucket.put_object(callback_done_oss_key, unicode(json_data2))
    logging.info("回调接口请求中:%s" % url2_private)
    logging.info("回调接口发送参数为: %s" % str(data2))
    result_object2 = _post_json(
        url2_private, json_data2,
        {'Content-Type': 'application/json', 'authorization': access_token})
    logging.info("回调接口请求结果为: %s", result_object2)


def _scan_once():
    """Walk every object below ``key1`` and process each callback.json."""
    for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
        try:
            callback_key = str(obj1.key)
            if 'callback.json' in callback_key:
                _process_callback(callback_key)
        except Exception as e:
            # One broken data set must not abort the whole scan.
            logging.exception("局部异常处理: %s" % str(e))


if __name__ == '__main__':
    while True:
        logging.info("开始新一轮扫描")
        try:
            _scan_once()
        except Exception as e:
            logging.exception("全局错误处理: %s" % str(e))
        # Sleep outside the try block so a failing scan (e.g. OSS being
        # unreachable) is also throttled instead of retrying in a tight loop.
        time.sleep(SCAN_INTERVAL_SECONDS)