2callback-pjibot_patrol.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. # -*- coding: utf-8 -*-
  2. import sys
  3. reload(sys)
  4. sys.setdefaultencoding("utf-8")
  5. import json
  6. import time
  7. import urllib2
  8. import oss2
  9. import logging
  10. from datetime import datetime, timedelta
  11. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  12. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  13. bucket_name = 'open-bucket'
  14. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  15. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/'
  16. path3 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/2callback/'
  17. logging.basicConfig(filename=path1 + 'log/2callback-pjibot_patrol.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
  18. key1 = 'pjibot_patrol/'
  19. key2 = 'data/'
  20. key3 = 'data_merge/'
  21. key4 = 'data_parse/'
  22. path2 = 'data/'
  23. path3 = 'data_merge/'
  24. path4 = 'data_parse/'
  25. url1_private = "http://10.14.86.127:9081/device/auth"
  26. url2_private = "http://10.14.86.127:9081/device/data/callback"
  27. def add_hour(date_string, hour_number):
  28. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  29. new_date = original_date + timedelta(hours=hour_number)
  30. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  31. def judge_pcd(callback_json_oss_key):
  32. json_content = bucket.get_object(callback_json_oss_key).read()
  33. json_object = json.loads(json_content)
  34. check = json_object['check']
  35. if '点云缺失' in check:
  36. return True
  37. if __name__ == '__main__':
  38. while True:
  39. logging.info("开始新一轮扫描")
  40. try:
  41. local_delete_list = []
  42. oss_delete_list = []
  43. upload_completed_prefix_list = []
  44. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  45. try:
  46. if 'callback.json' in str(obj1.key):
  47. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  48. file1 = False
  49. file2 = False
  50. file3 = False
  51. pcd_ok = False
  52. file5 = False
  53. file7 = False
  54. file8 = False
  55. file9 = False
  56. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
  57. if '/callback_done.json' in str(obj2.key):
  58. file1 = True
  59. break
  60. if '/ego_pji.csv' in str(obj2.key):
  61. file2 = True
  62. if '/objects_pji.csv' in str(obj2.key):
  63. file3 = True
  64. if '/pcd_overlook.mp4' in str(obj2.key):
  65. pcd_ok = True
  66. if '/drive.csv' in str(obj2.key):
  67. file5 = True
  68. if '/scenario_orig.mp4' in str(obj2.key):
  69. file7 = True
  70. if '/scenario_hmi.xosc' in str(obj2.key):
  71. file8 = True
  72. if '/trajectory_pji.csv' in str(obj2.key):
  73. file9 = True
  74. if not pcd_ok:
  75. pcd_ok = judge_pcd(str(obj1.key))
  76. if file1 or not file2 or not file3 or not pcd_ok or not file5 or not file7 or not file8 or not file9:
  77. continue
  78. time.sleep(1)
  79. logging.info("发送:%s", prefix)
  80. json_content = bucket.get_object(str(obj1.key)).read()
  81. json_object = json.loads(json_content)
  82. data1 = {"equipmentNo": json_object['equipmentNo'],"secretKey": json_object['secretKey']}
  83. json_data1 = json.dumps(data1)
  84. logging.info("授权接口请求中: %s" % url1_private)
  85. logging.info("授权发送参数为: %s" % str(data1))
  86. request1 = urllib2.Request(url1_private, json_data1,headers={'Content-Type': 'application/json'})
  87. response1 = urllib2.urlopen(request1)
  88. result_json1 = response1.read()
  89. result_object1 = json.loads(result_json1)
  90. logging.info("授权接口请求结果为: %s", result_object1)
  91. access_token = result_object1.get('data').get('accessToken')
  92. old_date = json_object['dataName']
  93. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  94. equipment_no = json_object['equipmentNo']
  95. old_file_path = json_object['filePath']
  96. old_ros_bag_path = json_object['rosBagPath']
  97. task_id = json_object['taskId']
  98. trigger_id = json_object['triggerId']
  99. check = json_object['check']
  100. check_order = ['自车数据缺失', '不在道路范围', '无规划路径', '目标点缺失','点云缺失', '点云丢帧', '解析程序错误', '还原程序错误', '评价程序错误']
  101. check_order_dict = dict((item, idx) for idx, item in enumerate(check_order))
  102. check = sorted(check, key=lambda x: check_order_dict.get(x, float('inf')))
  103. check = ','.join(check) # 数组元素拼接成字符串序列
  104. if old_date is None:
  105. old_date = ''
  106. # 将时区统一(室外不需要需要加8,根据机器人终端的时区判断)
  107. # new_date = add_hour(old_date, 8)
  108. new_date = old_date
  109. old_delete_list = []
  110. callback_done_oss_key = ''
  111. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  112. old_delete_list.append(str(obj_old.key))
  113. if 'callback.json' in str(obj_old.key):
  114. callback_done_oss_key = str(obj_old.key).replace(old_date, new_date).replace('callback.json','callback_done.json')
  115. # todo 时区不变也就不需要移动文件了
  116. # else:
  117. # bucket.copy_object(bucket_name, str(obj_old.key),
  118. # str(obj_old.key).replace(old_date, new_date))
  119. # bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  120. # bucket.delete_object(old_ros_bag_path)
  121. # bucket.batch_delete_objects(old_delete_list)
  122. if 'userId' in json_object:
  123. logging.info("json_object 包含 'userId' 字段,值为:", json_object['userId'])
  124. data2 = {
  125. 'userId': json_object['userId'],
  126. "dataName": new_date,
  127. "dataSize": data_size,
  128. "equipmentNo": equipment_no,
  129. "filePath": old_file_path.replace(old_date, new_date),
  130. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  131. "taskId": task_id,
  132. "triggerId": trigger_id,
  133. "check":check
  134. }
  135. else:
  136. logging.info("json_object 不包含 'userId' 字段")
  137. data2 = {
  138. "dataName": new_date,
  139. "dataSize": data_size,
  140. "equipmentNo": equipment_no,
  141. "filePath": old_file_path.replace(old_date, new_date),
  142. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  143. "taskId": task_id,
  144. "triggerId": trigger_id,
  145. "check":check
  146. }
  147. json_data2 = json.dumps(data2)
  148. bucket.put_object(callback_done_oss_key, unicode(json_data2))
  149. logging.info("回调接口请求中:%s" % url2_private)
  150. logging.info("回调接口发送参数为: %s" % str(data2))
  151. request2 = urllib2.Request(url2_private, json_data2,headers={'Content-Type': 'application/json','authorization': access_token})
  152. response2 = urllib2.urlopen(request2)
  153. result_json2 = response2.read()
  154. result_object2 = json.loads(result_json2)
  155. logging.info("回调接口请求结果为: %s", result_object2)
  156. except Exception as e:
  157. logging.exception("局部异常处理: %s" % str(e))
  158. continue
  159. time.sleep(10)
  160. except Exception as e:
  161. logging.exception("全局错误处理: %s" % str(e))