# -*- coding: utf-8 -*-
"""Poll an OSS bucket for completed pjisuv data bags and report them.

Python 2 daemon script. Every ``sleep_time`` seconds it scans the
``pjisuv/`` prefix for ``callback.json`` markers. Once every required
artifact exists next to a marker, the data set is copied to a renamed
prefix (auto-collected bags get +8h added to their timestamp to unify
the timezone), the originals are deleted, and the result is posted to
the private callback endpoint.
"""
import json
import logging
import time
import urllib2
from datetime import datetime, timedelta

import oss2

path1 = '/mnt/disk001/dcl_data_process/src/python2/pjisuv/'
logging.basicConfig(filename=path1 + 'log/callback-pjisuv.log',
                    level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

key1 = 'pjisuv/'
sleep_time = 60  # seconds between scan rounds
url1_private = "http://10.14.86.127:9081/device/auth"
url2_private = "http://10.14.86.127:9081/device/data/callback"
# Persistent list of prefixes whose move/delete failed, so they can be retried.
error_bag_json = "/mnt/disk001/dcl_data_process/src/python2/pjisuv/callback-errorBag.json"

# Every one of these artifacts must exist under a bag's prefix before the
# callback is sent (matched by substring, same as the original flag checks).
REQUIRED_SUFFIXES = (
    'callback.json',
    'camera.mp4',
    'drive.csv',
    'pcd_forwardlook.mp4',
    'pcd_overlook.mp4',
    'pos_orig.csv',
    'scenario_orig.mp4',
    'scenario_orig.xosc',
)


def parse_json_to_string_array(file_path):
    """Read a JSON file expected to hold a list of strings.

    :param file_path: path of the UTF-8 encoded JSON file.
    :return: the parsed list, or ``[]`` when the file is missing, is not
        valid JSON, is not a list, or contains non-string elements.
        Errors are deliberately swallowed (best-effort) so a corrupt
        error-bag file can never stop the scan loop.
    """
    try:
        with open(file_path, 'r') as fh:
            # Python 2: file content is bytes; decode explicitly.
            data = json.loads(fh.read().decode('utf-8'))
        if not isinstance(data, list):
            return []
        for item in data:
            # Python 2 strings are str or unicode; basestring covers both.
            if not isinstance(item, basestring):
                raise ValueError("JSON array element is not a string")
        return data
    except Exception:
        return []


def list_to_json_file(data, file_path):
    """Serialize *data* as UTF-8 JSON to *file_path*, overwriting it.

    :param data: list to serialize.
    :param file_path: destination file path.
    """
    json_data = json.dumps(data, ensure_ascii=False, indent=4)
    with open(file_path, 'w') as fh:
        fh.write(json_data.encode('utf-8'))


def add_hour(date_string, hour_number):
    """Shift a ``YYYY-mm-dd-HH-MM-SS`` timestamp by *hour_number* hours.

    :param date_string: timestamp formatted as ``%Y-%m-%d-%H-%M-%S``.
    :param hour_number: number of hours to add (may be negative).
    :return: the shifted timestamp in the same format.
    """
    original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
    new_date = original_date + timedelta(hours=hour_number)
    return new_date.strftime("%Y-%m-%d-%H-%M-%S")


if __name__ == '__main__':
    # NOTE(review): credentials are hard-coded; move them to environment
    # variables or a config file outside version control.
    auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
    endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
    bucket_name = 'open-bucket'
    bucket = oss2.Bucket(auth, endpoint, bucket_name)

    while True:
        logging.info("开始新一轮扫描")
        logging.info("开始扫描目录: %s" % str(key1))
        for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
            if 'callback.json' not in str(obj1.key):
                continue
            time.sleep(1)
            prefix = '/'.join(str(obj1.key).split('/')[:-1])

            # Skip this bag until every required artifact has been uploaded.
            found = set()
            for obj2 in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
                key2 = str(obj2.key)
                for suffix in REQUIRED_SUFFIXES:
                    if suffix in key2:
                        found.add(suffix)
            if len(found) < len(REQUIRED_SUFFIXES):
                continue

            time.sleep(2)
            logging.info("发送: %s" % str(obj1.key))

            # 1) Fetch and parse the callback descriptor.
            json_content = bucket.get_object(str(obj1.key)).read()
            json_object = json.loads(json_content)

            # 2) Authenticate the device to obtain an access token.
            data1 = {
                "equipmentNo": json_object['equipmentNo'],
                "secretKey": json_object['secretKey']
            }
            request1 = urllib2.Request(url1_private, json.dumps(data1),
                                       headers={'Content-Type': 'application/json'})
            response1 = urllib2.urlopen(request1)
            try:
                result_object1 = json.loads(response1.read())
            finally:
                response1.close()  # avoid leaking sockets in the endless loop

            try:
                access_token = result_object1.get('data').get('accessToken')
                logging.info("bag文件为:%s" % str(json_object['rosBagPath']))
                old_date = json_object['dataName']
                equipment_no = json_object['equipmentNo']
                old_file_path = json_object['filePath']
                old_ros_bag_path = json_object['rosBagPath']
                task_id = json_object['taskId']
                trigger_id = json_object['triggerId']
            except Exception as e:
                # Malformed descriptor or failed auth: log and move on.
                logging.exception("callback报错: %s" % str(e))
                continue

            # Manually uploaded data carries a userId and keeps its name;
            # auto-collected data is renamed with a +8h timestamp shift.
            if 'userId' in json_object:
                logging.info("手动上传的数据")
                upload = True
                old_date = ''
            else:
                logging.info("自动采集的数据")
                upload = False

            old_delete_list = []
            new_date = ''
            # Copy the data set to its new prefix; the callback.json marker
            # is renamed to callback_done.json so it is never picked up again.
            for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
                key_old = str(obj_old.key)
                old_delete_list.append(key_old)
                if 'callback.json' in key_old:
                    if not upload:
                        new_date = add_hour(old_date, 8)  # unify timezone (+8h)
                        bucket.copy_object(
                            bucket_name, key_old,
                            key_old.replace(old_date, new_date)
                                   .replace('callback.json', 'callback_done.json'))
                    else:
                        bucket.copy_object(
                            bucket_name, key_old,
                            key_old.replace('callback.json', 'callback_done.json'))
                elif not upload:
                    new_date = add_hour(old_date, 8)
                    bucket.copy_object(bucket_name, key_old,
                                       key_old.replace(old_date, new_date))

            # Auto-collected bags: move the rosbag and delete the originals.
            if not upload:
                try:
                    new_ros_bag_path = old_ros_bag_path.replace(old_date, new_date)
                    bucket.copy_object(bucket_name, old_ros_bag_path, new_ros_bag_path)
                    bucket.delete_object(old_ros_bag_path)
                    bucket.batch_delete_objects(old_delete_list)
                except Exception:
                    # BUG FIX: the original appended the undefined name
                    # `parse_prefix` here, raising NameError instead of
                    # recording the failed prefix for retry.
                    error_bag_list = parse_json_to_string_array(error_bag_json)
                    error_bag_list.append(prefix)
                    list_to_json_file(error_bag_list, error_bag_json)
                    continue

            # 3) Build the callback payload.
            if upload:
                data2 = {
                    'userId': json_object['userId'],
                    # NOTE(review): new_date is '' for manual uploads —
                    # confirm the server derives dataName itself in this case.
                    "dataName": new_date,
                    "dataSize": json_object['dataSize'],
                    "equipmentNo": equipment_no,
                    "filePath": old_file_path.replace(old_date, new_date),
                    "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
                    "taskId": task_id,
                    "triggerId": trigger_id
                }
            else:
                data2 = {
                    "dataName": new_date,
                    # BUG FIX: read the size from the copied object — the
                    # original queried json_object['rosBagPath'], which was
                    # deleted a few lines above and would raise NoSuchKey.
                    "dataSize": bucket.get_object_meta(
                        old_ros_bag_path.replace(old_date, new_date)).content_length,
                    "equipmentNo": equipment_no,
                    "filePath": old_file_path.replace(old_date, new_date),
                    "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
                    "taskId": task_id,
                    "triggerId": trigger_id
                }

            # 4) Notify the platform that the data set is ready.
            logging.info("回调接口请求中:%s" % url2_private)
            request2 = urllib2.Request(url2_private, json.dumps(data2),
                                       headers={'Content-Type': 'application/json',
                                                'authorization': access_token})
            response2 = urllib2.urlopen(request2)
            try:
                result_object2 = json.loads(response2.read())
            finally:
                response2.close()
            logging.info("回调接口请求结果为: %s", result_object2)

        time.sleep(sleep_time)