kinglong_callback.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import time
  4. import urllib2
  5. import oss2
  6. from datetime import datetime, timedelta
  7. key1 = 'kinglong/'
  8. def add_hour(date_string, hour_number):
  9. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  10. new_date = original_date + timedelta(hours=hour_number)
  11. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  12. '''
  13. cname:http://open-bucket.oss.icvdc.com
  14. 内网endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
  15. oss桶名: open-bucket
  16. keyid:n8glvFGS25MrLY7j
  17. secret:xZ2Fozoarpfw0z28FUhtg8cu0yDc5d
  18. '''
  19. if __name__ == '__main__':
  20. # 1 登录验证 。
  21. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  22. # 2 填写自定义域名,例如example.com。获取桶。
  23. # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。
  24. # cname = 'http://open-bucket.oss.icvdc.com'
  25. # bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)
  26. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  27. bucket_name = 'open-bucket'
  28. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  29. count = 1
  30. while True:
  31. local_delete_list = []
  32. oss_delete_list = []
  33. upload_completed_prefix_list = []
  34. # 4 获取即将被合并的bag目录
  35. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  36. if 'callback.json' in str(obj1.key):
  37. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  38. pos_orig_csv = False
  39. pos_hmi_csv = False
  40. drive_csv = False
  41. camera_mp4 = False
  42. pcd_forwardlook_mp4 = False
  43. pcd_overlook_mp4 = False
  44. scenario_orig_xosc = False
  45. scenario_hmi_xosc = False
  46. scenario_orig_mp4 = False
  47. scenario_hmi_mp4 = False
  48. camera_bag = False
  49. fusion_bag = False
  50. plan_bag = False
  51. control_bag = False
  52. print '检测是否文件完整:', prefix
  53. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix):
  54. if 'pos_orig.csv' in str(obj2.key):
  55. pos_orig_csv = True
  56. if 'pos_hmi.csv' in str(obj2.key):
  57. pos_hmi_csv = True
  58. if 'drive.csv' in str(obj2.key):
  59. drive_csv = True
  60. if 'camera.mp4' in str(obj2.key):
  61. camera_mp4 = True
  62. if 'pcd_forwardlook.mp4' in str(obj2.key):
  63. pcd_forwardlook_mp4 = True
  64. if 'pcd_overlook.mp4' in str(obj2.key):
  65. pcd_overlook_mp4 = True
  66. if 'scenario_orig.xosc' in str(obj2.key):
  67. scenario_orig_xosc = True
  68. if 'scenario_hmi.xosc' in str(obj2.key):
  69. scenario_hmi_xosc = True
  70. if 'scenario_orig.mp4' in str(obj2.key):
  71. scenario_orig_mp4 = True
  72. if 'scenario_hmi.mp4' in str(obj2.key):
  73. scenario_hmi_mp4 = True
  74. if 'camera.bag' in str(obj2.key):
  75. camera_bag = True
  76. if 'fusion.bag' in str(obj2.key):
  77. fusion_bag = True
  78. if 'plan.bag' in str(obj2.key):
  79. plan_bag = True
  80. if 'control.bag' in str(obj2.key):
  81. control_bag = True
  82. if not pos_orig_csv or not pos_hmi_csv or not drive_csv or not camera_mp4 or not pcd_forwardlook_mp4 or not pcd_overlook_mp4 or not scenario_orig_xosc or not scenario_hmi_xosc or not scenario_orig_mp4 or not scenario_hmi_mp4:
  83. continue
  84. time.sleep(2)
  85. print '发送:', str(obj1.key)
  86. # 1 获取json内容
  87. json_content = bucket.get_object(str(obj1.key)).read()
  88. # 2 获取token
  89. json_object = json.loads(json_content)
  90. data1 = {
  91. "equipmentNo": json_object['equipmentNo'],
  92. "secretKey": json_object['secretKey']
  93. }
  94. json_data1 = json.dumps(data1)
  95. request1 = urllib2.Request("http://139.9.199.227:30991/device/auth", json_data1,
  96. headers={'Content-Type': 'application/json'})
  97. response1 = urllib2.urlopen(request1)
  98. result_json1 = response1.read()
  99. result_object1 = json.loads(result_json1)
  100. access_token = result_object1.get('data').get('accessToken')
  101. try:
  102. print 'bag文件为:', json_object['rosBagPath']
  103. old_date = json_object['dataName']
  104. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  105. equipment_no = json_object['equipmentNo']
  106. old_file_path = json_object['filePath']
  107. old_ros_bag_path = json_object['rosBagPath']
  108. task_id = json_object['taskId']
  109. trigger_id = json_object['triggerId']
  110. except Exception as e:
  111. print 'callback报错:%s' % str(e)
  112. continue
  113. # 将时区统一
  114. new_date = add_hour(old_date, 8)
  115. old_delete_list = []
  116. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  117. old_delete_list.append(str(obj_old.key))
  118. if 'callback.json' in str(obj_old.key):
  119. bucket.copy_object(bucket_name, str(obj_old.key),
  120. str(obj_old.key).replace(old_date, new_date).replace('callback.json',
  121. 'callback_done.json'))
  122. else:
  123. bucket.copy_object(bucket_name, str(obj_old.key), str(obj_old.key).replace(old_date, new_date))
  124. bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  125. bucket.delete_object(old_ros_bag_path)
  126. bucket.batch_delete_objects(old_delete_list)
  127. # triggerId = json_object['triggerId']
  128. data2 = {
  129. "dataName": new_date,
  130. "dataSize": data_size,
  131. "equipmentNo": equipment_no,
  132. "filePath": old_file_path.replace(old_date, new_date),
  133. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  134. "taskId": task_id,
  135. "triggerId": ["1744180835122155522", "1744178775815360514"]
  136. }
  137. json_data2 = json.dumps(data2)
  138. request2 = urllib2.Request("http://139.9.199.227:30991/device/data/callback", json_data2,
  139. headers={'Content-Type': 'application/json',
  140. 'authorization': access_token})
  141. response2 = urllib2.urlopen(request2)
  142. result_json2 = response2.read()
  143. result_object2 = json.loads(result_json2)
  144. time.sleep(3)