# -*- coding: utf-8 -*-
# callback-pjibot_patrol.py
import json
import logging
import time
import urllib2
from datetime import datetime, timedelta

import oss2
  8. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/'
  9. logging.basicConfig(filename=path1 + 'log/callback.log', level=logging.INFO,
  10. format='%(asctime)s - %(levelname)s - %(message)s')
  11. key1 = 'pjibot_patrol/'
  12. key2 = 'data/'
  13. key3 = 'data_merge/'
  14. key4 = 'data_parse/'
  15. path2 = 'data/'
  16. path3 = 'data_merge/'
  17. path4 = 'data_parse/'
  18. url1_private = "http://10.14.86.147:9081/device/auth"
  19. url2_private = "http://10.14.86.147:9081/device/data/callback"
  20. def add_hour(date_string, hour_number):
  21. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  22. new_date = original_date + timedelta(hours=hour_number)
  23. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
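'''
OSS connection reference
cname: http://open-bucket.oss.icvdc.com
intranet endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
OSS bucket name: open-bucket
keyid: n8glvFGS25MrLY7j
secret: xZ2Fozoarpfw0z28FUhtg8cu0yDc5d
NOTE(review): these credentials are stored in plain text; consider moving
them to deployment configuration and rotating the key.
'''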
  31. if __name__ == '__main__':
  32. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  33. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  34. bucket_name = 'pji-bucket1'
  35. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  36. while True:
  37. logging.info("开始新一轮扫描")
  38. try:
  39. local_delete_list = []
  40. oss_delete_list = []
  41. upload_completed_prefix_list = []
  42. # 4 获取即将被合并的bag目录
  43. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  44. try:
  45. if 'callback.json' in str(obj1.key):
  46. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  47. file1 = False
  48. file2 = False
  49. file3 = False
  50. file4 = False
  51. file5 = False
  52. file7 = False
  53. file8 = False
  54. file9 = False
  55. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix):
  56. if '/callback.json' in str(obj2.key):
  57. file1 = True
  58. if '/ego_pji.csv' in str(obj2.key):
  59. file2 = True
  60. if '/objects_pji.csv' in str(obj2.key):
  61. file3 = True
  62. if '/pcd_overlook.mp4' in str(obj2.key):
  63. file4 = True
  64. if '/pos_pji.csv' in str(obj2.key):
  65. file5 = True
  66. if '/scenario_orig.mp4' in str(obj2.key):
  67. file7 = True
  68. if '/simulation.xosc' in str(obj2.key):
  69. file8 = True
  70. if '/trajectory_pji.csv' in str(obj2.key):
  71. file9 = True
  72. if not file1 or not file2 or not file3 or not file4 or not file5 or not file7 or not file8 or not file9:
  73. continue
  74. time.sleep(1)
  75. logging.info("发送:%s", str(obj1.key))
  76. # 1 获取json内容
  77. json_content = bucket.get_object(str(obj1.key)).read()
  78. # 2 获取token
  79. json_object = json.loads(json_content)
  80. data1 = {
  81. "equipmentNo": json_object['equipmentNo'],
  82. "secretKey": json_object['secretKey']
  83. }
  84. json_data1 = json.dumps(data1)
  85. logging.info("授权接口请求中: %s" % url1_private)
  86. logging.info("授权发送参数为: %s" % str(data1))
  87. request1 = urllib2.Request(url1_private, json_data1,
  88. headers={'Content-Type': 'application/json'})
  89. response1 = urllib2.urlopen(request1)
  90. result_json1 = response1.read()
  91. result_object1 = json.loads(result_json1)
  92. logging.info("授权接口请求结果为: %s", result_object1)
  93. access_token = result_object1.get('data').get('accessToken')
  94. # 要发送的JSON参数
  95. try:
  96. # logging.info("bag文件为: %s", json_object['rosBagPath'])
  97. old_date = json_object['dataName']
  98. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  99. equipment_no = json_object['equipmentNo']
  100. old_file_path = json_object['filePath']
  101. old_ros_bag_path = json_object['rosBagPath']
  102. task_id = json_object['taskId']
  103. trigger_id = json_object['triggerId']
  104. except Exception as e:
  105. logging.exception("callback报错:%s", str(e))
  106. continue
  107. # 将时区统一(室外不需要需要加8,根据机器人终端的时区判断)
  108. # new_date = add_hour(old_date, 8)
  109. new_date = old_date
  110. old_delete_list = []
  111. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  112. old_delete_list.append(str(obj_old.key))
  113. if 'callback.json' in str(obj_old.key):
  114. bucket.copy_object(bucket_name, str(obj_old.key),
  115. str(obj_old.key).replace(old_date, new_date).replace('callback.json',
  116. 'callback_done.json'))
  117. bucket.delete_object(str(obj_old.key)) # 删除 callback.json
  118. # todo 时区不变也就不需要移动文件了
  119. # else:
  120. # bucket.copy_object(bucket_name, str(obj_old.key),
  121. # str(obj_old.key).replace(old_date, new_date))
  122. # bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  123. # bucket.delete_object(old_ros_bag_path)
  124. # bucket.batch_delete_objects(old_delete_list)
  125. data2 = {
  126. "dataName": new_date,
  127. "dataSize": data_size,
  128. "equipmentNo": equipment_no,
  129. "filePath": old_file_path.replace(old_date, new_date),
  130. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  131. "taskId": task_id,
  132. "triggerId": trigger_id
  133. }
  134. json_data2 = json.dumps(data2)
  135. logging.info("回调接口请求中:%s" % url2_private)
  136. request2 = urllib2.Request(url2_private, json_data2,
  137. headers={'Content-Type': 'application/json',
  138. 'authorization': access_token})
  139. response2 = urllib2.urlopen(request2)
  140. result_json2 = response2.read()
  141. result_object2 = json.loads(result_json2)
  142. logging.info("回调接口请求结果为: %s", result_object2)
  143. except Exception as e:
  144. logging.exception("局部异常处理: %s" % str(e))
  145. time.sleep(2)
  146. except Exception as e:
  147. logging.exception("全局错误处理: %s" % str(e))