callback.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # -*- coding: utf-8 -*-
  2. # 造数据
  3. import json
  4. import time
  5. import urllib2
  6. import oss2
  7. from datetime import datetime, timedelta
  8. key1 = 'kinglong/'
  9. def add_hour(date_string, hour_number):
  10. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  11. new_date = original_date + timedelta(hours=hour_number)
  12. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  13. '''
  14. cname:http://open-bucket.oss.icvdc.com
  15. 内网endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
  16. oss桶名: open-bucket
  17. keyid:n8glvFGS25MrLY7j
  18. secret:xZ2Fozoarpfw0z28FUhtg8cu0yDc5d
  19. '''
  20. if __name__ == '__main__':
  21. # 1 登录验证 。
  22. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  23. # 2 填写自定义域名,例如example.com。获取桶。
  24. # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。
  25. # cname = 'http://open-bucket.oss.icvdc.com'
  26. # bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)
  27. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  28. bucket_name = 'open-bucket'
  29. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  30. count = 1
  31. while True:
  32. local_delete_list = []
  33. oss_delete_list = []
  34. upload_completed_prefix_list = []
  35. # 4 获取即将被合并的bag目录
  36. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  37. if 'callback.json' in str(obj1.key):
  38. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  39. pos_orig_csv = False
  40. pos_hmi_csv = False
  41. drive_csv = False
  42. camera_mp4 = False
  43. pcd_forwardlook_mp4 = False
  44. pcd_overlook_mp4 = False
  45. scenario_orig_xosc = False
  46. scenario_hmi_xosc = False
  47. scenario_orig_mp4 = False
  48. scenario_hmi_mp4 = False
  49. camera_bag = False
  50. fusion_bag = False
  51. plan_bag = False
  52. control_bag = False
  53. print '检测是否文件完整:', prefix
  54. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix):
  55. if 'pos_orig.csv' in str(obj2.key):
  56. pos_orig_csv = True
  57. if 'pos_hmi.csv' in str(obj2.key):
  58. pos_hmi_csv = True
  59. if 'drive.csv' in str(obj2.key):
  60. drive_csv = True
  61. if 'camera.mp4' in str(obj2.key):
  62. camera_mp4 = True
  63. if 'pcd_forwardlook.mp4' in str(obj2.key):
  64. pcd_forwardlook_mp4 = True
  65. if 'pcd_overlook.mp4' in str(obj2.key):
  66. pcd_overlook_mp4 = True
  67. if 'scenario_orig.xosc' in str(obj2.key):
  68. scenario_orig_xosc = True
  69. if 'scenario_hmi.xosc' in str(obj2.key):
  70. scenario_hmi_xosc = True
  71. if 'scenario_orig.mp4' in str(obj2.key):
  72. scenario_orig_mp4 = True
  73. if 'scenario_hmi.mp4' in str(obj2.key):
  74. scenario_hmi_mp4 = True
  75. if 'camera.bag' in str(obj2.key):
  76. camera_bag = True
  77. if 'fusion.bag' in str(obj2.key):
  78. fusion_bag = True
  79. if 'plan.bag' in str(obj2.key):
  80. plan_bag = True
  81. if 'control.bag' in str(obj2.key):
  82. control_bag = True
  83. if not pos_orig_csv or not pos_hmi_csv or not drive_csv or not camera_mp4 or not pcd_forwardlook_mp4 or not pcd_overlook_mp4 or not scenario_orig_xosc or not scenario_hmi_xosc or not scenario_orig_mp4 or not scenario_hmi_mp4:
  84. continue
  85. time.sleep(2)
  86. print '发送:', str(obj1.key)
  87. # 1 获取json内容
  88. json_content = bucket.get_object(str(obj1.key)).read()
  89. # 2 获取token
  90. json_object = json.loads(json_content)
  91. data1 = {
  92. "equipmentNo": json_object['equipmentNo'],
  93. "secretKey": json_object['secretKey']
  94. }
  95. json_data1 = json.dumps(data1)
  96. request1 = urllib2.Request("http://139.9.199.227:30991/device/auth", json_data1,
  97. headers={'Content-Type': 'application/json'})
  98. response1 = urllib2.urlopen(request1)
  99. result_json1 = response1.read()
  100. result_object1 = json.loads(result_json1)
  101. access_token = result_object1.get('data').get('accessToken')
  102. try:
  103. print 'bag文件为:', json_object['rosBagPath']
  104. old_date = json_object['dataName']
  105. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  106. equipment_no = json_object['equipmentNo']
  107. old_file_path = json_object['filePath']
  108. old_ros_bag_path = json_object['rosBagPath']
  109. task_id = json_object['taskId']
  110. trigger_id = json_object['triggerId']
  111. except Exception as e:
  112. print 'callback报错:%s' % str(e)
  113. continue
  114. # 将时区统一
  115. new_date = add_hour(old_date, 8)
  116. old_delete_list = []
  117. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  118. old_delete_list.append(str(obj_old.key))
  119. if 'callback.json' in str(obj_old.key):
  120. bucket.copy_object(bucket_name, str(obj_old.key),
  121. str(obj_old.key).replace(old_date, new_date).replace('callback.json',
  122. 'callback_done.json'))
  123. else:
  124. bucket.copy_object(bucket_name, str(obj_old.key), str(obj_old.key).replace(old_date, new_date))
  125. bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  126. bucket.delete_object(old_ros_bag_path)
  127. bucket.batch_delete_objects(old_delete_list)
  128. # triggerId = json_object['triggerId']
  129. data2 = {
  130. "dataName": new_date,
  131. "dataSize": data_size,
  132. "equipmentNo": equipment_no,
  133. "filePath": old_file_path.replace(old_date, new_date),
  134. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  135. "taskId": task_id,
  136. "triggerId": ["1744180835122155522", "1744178775815360514"]
  137. }
  138. json_data2 = json.dumps(data2)
  139. request2 = urllib2.Request("http://139.9.199.227:30991/device/data/callback", json_data2,
  140. headers={'Content-Type': 'application/json',
  141. 'authorization': access_token})
  142. response2 = urllib2.urlopen(request2)
  143. result_json2 = response2.read()
  144. result_object2 = json.loads(result_json2)
  145. time.sleep(3)