|
# -*- coding: utf-8 -*-
"""Callback daemon setup for the pjibot_delivery pipeline (Python 2).

Creates the OSS bucket handle, configures file logging and defines the key
prefixes and service URLs used by the rest of this script.
"""
import sys
# Python 2 only: make implicit str <-> unicode conversions use UTF-8 so the
# Chinese literals in this file compare correctly with JSON-decoded text.
reload(sys)
sys.setdefaultencoding("utf-8")
import json
import os
import time
import urllib2
import oss2
import logging
from datetime import datetime, timedelta

# SECURITY NOTE(review): the fallback credentials below are committed to source
# control. They are kept only so existing deployments keep working; set
# OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET in the environment, rotate the key
# pair and then delete the literals.
auth = oss2.Auth(
    os.environ.get('OSS_ACCESS_KEY_ID', 'n8glvFGS25MrLY7j'),
    os.environ.get('OSS_ACCESS_KEY_SECRET', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d'),
)
endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
bucket_name = 'pji-bucket1'
bucket = oss2.Bucket(auth, endpoint, bucket_name)

# Local working directory; the log file lives in its "log/" sub-directory.
path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/'
logging.basicConfig(
    filename=path1 + 'log/callback-pjibot_delivery.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# OSS key prefixes. key1 is the root prefix scanned for callback.json files.
key1 = 'pjibot_delivery/'
key2 = 'data/'
key3 = 'data_merge/'
key4 = 'data_parse/'

# Relative sub-directory names. path3 used to be assigned twice; only this
# final value was ever observable, so the shadowed assignment is not repeated.
path2 = 'data/'
path3 = 'data_merge/'
path4 = 'data_parse/'

# Device-platform endpoints: token issuance and data callback.
url1_private = "http://10.14.86.147:9081/device/auth"
url2_private = "http://10.14.86.147:9081/device/data/callback"
def add_hour(date_string, hour_number):
    """Shift a ``YYYY-mm-dd-HH-MM-SS`` timestamp string by a number of hours.

    :param date_string: timestamp formatted as ``%Y-%m-%d-%H-%M-%S``.
    :param hour_number: hours to add; may be negative or zero.
    :return: the shifted timestamp in the same format.
    :raises ValueError: if ``date_string`` does not match the format.
    """
    stamp_format = "%Y-%m-%d-%H-%M-%S"
    shifted = datetime.strptime(date_string, stamp_format) + timedelta(hours=hour_number)
    return shifted.strftime(stamp_format)
def judge_pcd_xosc(callback_json_oss_key):
    """Read a callback JSON from OSS and report which 'check' markers it has.

    :param callback_json_oss_key: OSS key of the callback JSON object.
    :return: tuple ``(pcd, xosc)``; ``pcd`` is True when the ``check`` field
        contains the "point cloud missing" marker, ``xosc`` is True when it
        contains the "not within road range" marker. Both are False when the
        object cannot be read or parsed, or when ``check`` is absent.

    Errors are logged rather than raised so a single bad object does not stop
    the caller's scan.
    """
    pcd = False
    xosc = False
    try:
        json_content = bucket.get_object(callback_json_oss_key).read()
        json_object = json.loads(json_content)
        if 'check' not in json_object:
            logging.error("Missing 'check' field in %s", callback_json_oss_key)
            return pcd, xosc
        check = json_object['check']
        # Unicode literals: json.loads yields unicode text, so matching no
        # longer relies on the process-wide default-encoding override.
        # Works for both a list of markers and a single string.
        if u'点云缺失' in check:
            pcd = True
        if u'不在道路范围' in check:
            xosc = True
    except ValueError as e:
        # Previously only the exception was logged where the message implied
        # the key, so the failing object could not be identified.
        logging.error("Failed to decode JSON from %s: %s", callback_json_oss_key, e)
    except Exception as e:
        logging.error("Error processing %s: %s", callback_json_oss_key, e)
    return pcd, xosc
# Result files that must all exist under a prefix before its callback is sent.
_REQUIRED_SUFFIXES = ('/ego_pji.csv', '/objects_pji.csv', '/drive.csv', '/trajectory_pji.csv')

# Reporting order for 'check' entries. Unicode literals are required: the
# entries come from json.loads as unicode, and in Python 2 a non-ASCII unicode
# string never hashes like its UTF-8 byte string, so a dict keyed by byte
# strings would miss every lookup and leave the list unsorted.
_CHECK_ORDER = [u'自车数据缺失', u'不在道路范围', u'无规划路径', u'目标点缺失', u'点云缺失',
                u'点云丢帧', u'解析程序错误', u'还原程序错误', u'评价程序错误', u'正常']
_CHECK_RANK = dict((item, idx) for idx, item in enumerate(_CHECK_ORDER))
_CHECK_NORMAL = u'正常'

try:
    _TEXT_TYPES = basestring  # Python 2: matches both str and unicode
except NameError:
    _TEXT_TYPES = str


def summarize_check(json_object):
    """Return the callback's 'check' entries as one comma-joined string.

    A list is used as-is and a single string is wrapped in a list; any other
    type, or a missing key, is logged and treated as empty. Entries are
    ordered by ``_CHECK_ORDER`` (unknown entries last), and the "normal"
    marker is dropped whenever more than one entry is present.
    """
    entries = []
    if 'check' in json_object:
        raw = json_object['check']
        if isinstance(raw, list):
            entries = raw
        elif isinstance(raw, _TEXT_TYPES):
            entries = [raw]
        else:
            logging.warning("'check' 的类型不是列表也不是字符串,类型为:%s", type(raw))
    else:
        logging.warning("'check' 键在 json_object 中不存在")
    entries = sorted(entries, key=lambda item: _CHECK_RANK.get(item, float('inf')))
    if len(entries) > 1:
        entries = [item for item in entries if item != _CHECK_NORMAL]
    return ','.join(entries)


def _is_ready_for_callback(prefix, callback_key):
    """Tell whether the directory at ``prefix`` should get its callback now.

    False when a callback_done.json already exists or a required CSV is
    missing. The point-cloud video and the xosc scenario must either exist or
    be excused by the matching marker in the callback JSON's 'check' field.
    """
    required_seen = set()
    has_pcd_video = False
    has_xosc = False
    for obj in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
        key = str(obj.key)
        if '/callback_done.json' in key:
            return False
        for suffix in _REQUIRED_SUFFIXES:
            if suffix in key:
                required_seen.add(suffix)
        if '/pcd_overlook.mp4' in key:
            has_pcd_video = True
        if '/scenario_hmi.xosc' in key:
            has_xosc = True
    if len(required_seen) != len(_REQUIRED_SUFFIXES):
        return False
    if has_pcd_video and has_xosc:
        return True
    # One download answers both questions (previously fetched once per flag).
    pcd_excused, xosc_excused = judge_pcd_xosc(callback_key)
    return (has_pcd_video or pcd_excused) and (has_xosc or xosc_excused)


def _post_json(url, body, extra_headers=None):
    """POST an encoded JSON body to ``url`` and return the decoded JSON reply."""
    headers = {'Content-Type': 'application/json'}
    if extra_headers:
        headers.update(extra_headers)
    response = urllib2.urlopen(urllib2.Request(url, body, headers=headers))
    try:
        return json.loads(response.read())
    finally:
        response.close()


def _request_access_token(json_object, prefix):
    """Call the auth endpoint and return the access token for this device.

    When the callback JSON has no 'secretKey', the second '-'-separated field
    of the second path component of ``prefix`` is used instead.

    :raises ValueError: if the auth reply carries no access token.
    """
    if 'secretKey' not in json_object:
        logging.warning("缺少'secretKey'键")
        data1 = {
            "equipmentNo": json_object['equipmentNo'],
            "secretKey": prefix.split('/')[1].split('-')[1],
        }
        logging.info("数据已处理:%s", data1)
    else:
        data1 = {"equipmentNo": json_object['equipmentNo'], "secretKey": json_object['secretKey']}
    logging.info("授权接口请求中: %s", url1_private)
    logging.info("授权发送参数为: %s", data1)
    result_object1 = _post_json(url1_private, json.dumps(data1))
    logging.info("授权接口请求结果为: %s", result_object1)
    access_token = (result_object1.get('data') or {}).get('accessToken')
    if not access_token:
        # Fail with a clear message instead of an AttributeError on None or
        # a None header value further down.
        raise ValueError("auth response contains no accessToken: %r" % (result_object1,))
    return access_token


def _send_callback(callback_key, prefix):
    """Authorize, write callback_done.json and POST the data callback."""
    time.sleep(1)  # short pause before each send, as in the original flow
    logging.info("发送:%s", prefix)
    json_object = json.loads(bucket.get_object(callback_key).read())
    access_token = _request_access_token(json_object, prefix)

    old_date = json_object['dataName']
    old_ros_bag_path = json_object['rosBagPath']
    data_size = bucket.get_object_meta(old_ros_bag_path).content_length
    equipment_no = json_object['equipmentNo']
    old_file_path = json_object['filePath']
    task_id = json_object['taskId']
    trigger_id = json_object['triggerId']
    check = summarize_check(json_object)

    if old_date is None:
        old_date = ''
    # The time zone is left untouched (no +8h shift), so names and paths stay
    # the same and no objects need to be moved.
    new_date = old_date

    callback_done_oss_key = ''
    for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
        old_key = str(obj_old.key)
        if 'callback.json' in old_key:
            callback_done_oss_key = old_key.replace(old_date, new_date).replace(
                'callback.json', 'callback_done.json')
    if not callback_done_oss_key:
        raise ValueError("no callback.json found under filePath %r" % (old_file_path,))

    data2 = {
        "dataName": new_date,
        "dataSize": data_size,
        "equipmentNo": equipment_no,
        "filePath": old_file_path.replace(old_date, new_date),
        "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
        "taskId": task_id,
        "triggerId": trigger_id,
        "check": check,
    }
    if 'userId' in json_object:
        # The format string previously had no placeholder for the value, so
        # the log record failed to format and the message was lost.
        logging.info("json_object 包含 'userId' 字段,值为:%s", json_object['userId'])
        data2['userId'] = json_object['userId']
    else:
        logging.info("json_object 不包含 'userId' 字段")

    json_data2 = json.dumps(data2)
    # NOTE(review): the done marker is written before the callback POST, so a
    # failed POST is not retried on the next scan. Kept as in the original;
    # confirm this at-most-once delivery is intended.
    bucket.put_object(callback_done_oss_key, unicode(json_data2))
    logging.info("回调接口请求中:%s", url2_private)
    logging.info("回调接口发送参数为: %s", data2)
    result_object2 = _post_json(url2_private, json_data2, {'authorization': access_token})
    logging.info("回调接口请求结果为: %s", result_object2)


def main():
    """Scan the bucket forever, sending callbacks for completed directories."""
    while True:
        logging.info("开始新一轮扫描")
        try:
            for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
                try:
                    callback_key = str(obj1.key)
                    if 'callback.json' not in callback_key:
                        continue
                    prefix = '/'.join(callback_key.split('/')[:-1])
                    if _is_ready_for_callback(prefix, callback_key):
                        _send_callback(callback_key, prefix)
                except Exception as e:
                    # One bad directory must not abort the rest of the scan.
                    logging.exception("局部异常处理: %s" % str(e))
        except Exception as e:
            logging.exception("全局错误处理: %s" % str(e))
        # Sleep after every round, including failed ones, so a persistent
        # listing error cannot turn into a tight retry loop.
        time.sleep(10)


if __name__ == '__main__':
    main()
|