pjibot_callback.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import time
  4. import urllib2
  5. import oss2
  6. from datetime import datetime, timedelta
  7. key1 = 'pji/'
  8. key2 = 'data/'
  9. key3 = 'data_merge/'
  10. key4 = 'data_parse/'
  11. path1 = '/root/'
  12. path2 = 'data/'
  13. path3 = 'data_merge/'
  14. path4 = 'data_parse/'
  15. def add_hour(date_string, hour_number):
  16. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  17. new_date = original_date + timedelta(hours=hour_number)
  18. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
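# OSS connection reference:
#   cname:             http://open-bucket.oss.icvdc.com
#   internal endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
#   OSS bucket name:   open-bucket
#   key id:            n8glvFGS25MrLY7j
#   secret:            xZ2Fozoarpfw0z28FUhtg8cu0yDc5d
# NOTE(review): plaintext credentials should not live in source control --
# rotate this key pair and load it from the environment or a secrets store.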
  26. if __name__ == '__main__':
  27. # 1 登录验证 。
  28. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  29. # 2 填写自定义域名,例如example.com。获取桶。
  30. # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。
  31. # cname = 'http://open-bucket.oss.icvdc.com'
  32. # bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)
  33. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  34. bucket_name = 'open-bucket'
  35. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  36. while True:
  37. local_delete_list = []
  38. oss_delete_list = []
  39. upload_completed_prefix_list = []
  40. # 4 获取即将被合并的bag目录
  41. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  42. if 'callback.json' in str(obj1.key):
  43. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  44. camera_mp4 = False
  45. pcd_depthcamera_mp4 = False
  46. pcd_lidar_mp4 = False
  47. # print '检测是否文件完整:', prefix
  48. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix):
  49. if '/camera.mp4' in str(obj2.key):
  50. camera_mp4 = True
  51. if '/pcd_depthcamera.mp4' in str(obj2.key):
  52. pcd_depthcamera_mp4 = True
  53. if '/pcd_lidar.mp4' in str(obj2.key):
  54. pcd_lidar_mp4 = True
  55. if not camera_mp4 or not pcd_depthcamera_mp4 or not pcd_lidar_mp4:
  56. continue
  57. time.sleep(1)
  58. print '发送:', str(obj1.key)
  59. # 1 获取json内容
  60. json_content = bucket.get_object(str(obj1.key)).read()
  61. # 2 获取token
  62. json_object = json.loads(json_content)
  63. data1 = {
  64. "equipmentNo": json_object['equipmentNo'],
  65. "secretKey": json_object['secretKey']
  66. }
  67. json_data1 = json.dumps(data1)
  68. request1 = urllib2.Request("http://139.9.199.227:30991/device/auth", json_data1,
  69. headers={'Content-Type': 'application/json'})
  70. response1 = urllib2.urlopen(request1)
  71. result_json1 = response1.read()
  72. result_object1 = json.loads(result_json1)
  73. access_token = result_object1.get('data').get('accessToken')
  74. # 要发送的JSON参数
  75. try:
  76. print 'bag文件为:', json_object['rosBagPath']
  77. old_date = json_object['dataName']
  78. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  79. equipment_no = json_object['equipmentNo']
  80. old_file_path = json_object['filePath']
  81. old_ros_bag_path = json_object['rosBagPath']
  82. task_id = json_object['taskId']
  83. trigger_id = json_object['triggerId']
  84. except Exception as e:
  85. print 'callback报错:%s' % str(e)
  86. continue
  87. # 将时区统一
  88. new_date = add_hour(old_date, 8)
  89. old_delete_list = []
  90. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  91. old_delete_list.append(str(obj_old.key))
  92. if 'callback.json' in str(obj_old.key):
  93. bucket.copy_object(bucket_name, str(obj_old.key),
  94. str(obj_old.key).replace(old_date, new_date).replace('callback.json',
  95. 'callback_done.json'))
  96. else:
  97. bucket.copy_object(bucket_name, str(obj_old.key), str(obj_old.key).replace(old_date, new_date))
  98. bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  99. bucket.delete_object(old_ros_bag_path)
  100. bucket.batch_delete_objects(old_delete_list)
  101. data2 = {
  102. "dataName": new_date,
  103. "dataSize": data_size,
  104. "equipmentNo": equipment_no,
  105. "filePath": old_file_path.replace(old_date, new_date),
  106. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  107. "taskId": task_id,
  108. "triggerId": trigger_id
  109. }
  110. json_data2 = json.dumps(data2)
  111. request2 = urllib2.Request("http://139.9.199.227:30991/device/data/callback", json_data2,
  112. headers={'Content-Type': 'application/json',
  113. 'authorization': access_token})
  114. response2 = urllib2.urlopen(request2)
  115. result_json2 = response2.read()
  116. result_object2 = json.loads(result_json2)
  117. time.sleep(2)