callback-pjisuv.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import time
  4. import urllib2
  5. import oss2
  6. from datetime import datetime, timedelta
  7. import logging
  8. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjisuv/'
  9. logging.basicConfig(filename=path1 + 'log/callback-pjisuv.log', level=logging.INFO,
  10. format='%(asctime)s - %(levelname)s - %(message)s')
  11. key1 = 'pjisuv/'
  12. sleep_time = 60 # 每多少秒扫描一次
  13. url1_private = "http://10.14.86.127:9081/device/auth"
  14. url2_private = "http://10.14.86.127:9081/device/data/callback"
  15. def add_hour(date_string, hour_number):
  16. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  17. new_date = original_date + timedelta(hours=hour_number)
  18. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  19. if __name__ == '__main__':
  20. # 1 登录验证 。
  21. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  22. # 2 填写自定义域名,例如example.com。获取桶。
  23. # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。
  24. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  25. bucket_name = 'open-bucket'
  26. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  27. while True:
  28. logging.info("开始新一轮扫描")
  29. local_delete_list = []
  30. oss_delete_list = []
  31. upload_completed_prefix_list = []
  32. # 4 获取即将被合并的bag目录
  33. logging.info("开始扫描目录: %s" % str(key1))
  34. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  35. if 'callback.json' in str(obj1.key):
  36. time.sleep(1)
  37. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  38. pos_orig_csv = False
  39. pos_hmi_csv = False
  40. drive_csv = False
  41. camera_mp4 = False
  42. pcd_forwardlook_mp4 = False
  43. pcd_overlook_mp4 = False
  44. scenario_orig_xosc = False
  45. scenario_hmi_xosc = False
  46. scenario_orig_mp4 = False
  47. scenario_hmi_mp4 = False
  48. camera_bag = False
  49. fusion_bag = False
  50. plan_bag = False
  51. control_bag = False
  52. callback_json = False
  53. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
  54. if 'callback.json' in str(obj2.key):
  55. callback_json = True
  56. if 'camera.mp4' in str(obj2.key):
  57. camera_mp4 = True
  58. if 'drive.csv' in str(obj2.key):
  59. drive_csv = True
  60. if 'pcd_forwardlook.mp4' in str(obj2.key):
  61. pcd_forwardlook_mp4 = True
  62. if 'pcd_overlook.mp4' in str(obj2.key):
  63. pcd_overlook_mp4 = True
  64. if 'pos_orig.csv' in str(obj2.key):
  65. pos_orig_csv = True
  66. if 'scenario_orig.mp4' in str(obj2.key):
  67. scenario_orig_mp4 = True
  68. if 'scenario_orig.xosc' in str(obj2.key):
  69. scenario_orig_xosc = True
  70. if not callback_json or not camera_mp4 or not drive_csv or not pcd_forwardlook_mp4 or not pcd_overlook_mp4 or not pos_orig_csv or not scenario_orig_mp4 or not scenario_orig_xosc:
  71. continue
  72. time.sleep(2)
  73. logging.info("发送: %s" % str(obj1.key))
  74. # 1 获取json内容
  75. json_content = bucket.get_object(str(obj1.key)).read()
  76. # 2 获取token
  77. json_object = json.loads(json_content)
  78. data1 = {
  79. "equipmentNo": json_object['equipmentNo'],
  80. "secretKey": json_object['secretKey']
  81. }
  82. json_data1 = json.dumps(data1)
  83. # logging.info("授权接口请求中: %s" % url1_private)
  84. request1 = urllib2.Request(url1_private, json_data1,
  85. headers={'Content-Type': 'application/json'})
  86. response1 = urllib2.urlopen(request1)
  87. result_json1 = response1.read()
  88. result_object1 = json.loads(result_json1)
  89. # logging.info("授权接口请求结果为: %s", result_object1)
  90. try:
  91. access_token = result_object1.get('data').get('accessToken')
  92. logging.info("bag文件为:%s" % str(json_object['rosBagPath']))
  93. old_date = json_object['dataName']
  94. equipment_no = json_object['equipmentNo']
  95. old_file_path = json_object['filePath']
  96. old_ros_bag_path = json_object['rosBagPath']
  97. task_id = json_object['taskId']
  98. trigger_id = json_object['triggerId']
  99. except Exception as e:
  100. logging.exception("callback报错: %s" % str(e))
  101. continue
  102. upload = False
  103. if old_date is None:
  104. logging.info("手动上传的数据")
  105. upload = True
  106. old_date = ''
  107. elif str(old_date).endswith('.bag'):
  108. logging.info("手动上传的数据")
  109. upload = True
  110. pass
  111. else:
  112. logging.info("自动采集的数据")
  113. upload = False
  114. old_delete_list = []
  115. new_date = ''
  116. # 复制 data_parse
  117. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  118. old_delete_list.append(str(obj_old.key))
  119. if 'callback.json' in str(obj_old.key):
  120. if not upload:
  121. new_date = add_hour(old_date, 8)
  122. # 将时区统一
  123. bucket.copy_object(bucket_name, str(obj_old.key),
  124. str(obj_old.key).replace(old_date, new_date).replace('callback.json',
  125. 'callback_done.json'))
  126. else:
  127. bucket.copy_object(bucket_name, str(obj_old.key),
  128. str(obj_old.key).replace('callback.json',
  129. 'callback_done.json'))
  130. else:
  131. if not upload:
  132. new_date = add_hour(old_date, 8)
  133. bucket.copy_object(bucket_name, str(obj_old.key),
  134. str(obj_old.key).replace(old_date, new_date))
  135. # 处理是否上传
  136. if not upload:
  137. bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  138. bucket.delete_object(old_ros_bag_path)
  139. bucket.batch_delete_objects(old_delete_list)
  140. if 'userId' in json_object:
  141. logging.info("json_object 包含 'userId' 字段,是手动上传的包,userId为:", json_object['userId'])
  142. data2 = {
  143. 'userId': json_object['userId'],
  144. "dataName": new_date,
  145. "dataSize": json_object['dataSize'],
  146. "equipmentNo": equipment_no,
  147. "filePath": old_file_path.replace(old_date, new_date),
  148. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  149. "taskId": task_id,
  150. "triggerId": trigger_id
  151. }
  152. else:
  153. logging.info("json_object 不包含 'userId' 字段,是自动采集的包")
  154. data2 = {
  155. "dataName": new_date,
  156. "dataSize": bucket.get_object_meta(json_object['rosBagPath']).content_length,
  157. "equipmentNo": equipment_no,
  158. "filePath": old_file_path.replace(old_date, new_date),
  159. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  160. "taskId": task_id,
  161. "triggerId": trigger_id
  162. }
  163. json_data2 = json.dumps(data2)
  164. logging.info("回调接口请求中:%s" % url2_private)
  165. request2 = urllib2.Request(url2_private, json_data2,
  166. headers={'Content-Type': 'application/json',
  167. 'authorization': access_token})
  168. response2 = urllib2.urlopen(request2)
  169. result_json2 = response2.read()
  170. result_object2 = json.loads(result_json2)
  171. logging.info("回调接口请求结果为: %s", result_object2)
  172. time.sleep(sleep_time)