callback-pjibot_patrol.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # -*- coding: utf-8 -*-
  2. import sys
  3. reload(sys)
  4. sys.setdefaultencoding("utf-8")
  5. import json
  6. import time
  7. import urllib2
  8. import oss2
  9. import logging
  10. from datetime import datetime, timedelta
  11. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  12. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  13. bucket_name = 'pji-bucket1'
  14. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  15. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/'
  16. path3 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_patrol/callback/'
  17. logging.basicConfig(filename=path1 + 'log/callback-pjibot_patrol.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
  18. key1 = 'pjibot_patrol/'
  19. key2 = 'data/'
  20. key3 = 'data_merge/'
  21. key4 = 'data_parse/'
  22. path2 = 'data/'
  23. path3 = 'data_merge/'
  24. path4 = 'data_parse/'
  25. url1_private = "http://10.14.86.147:9081/device/auth"
  26. url2_private = "http://10.14.86.147:9081/device/data/callback"
  27. def add_hour(date_string, hour_number):
  28. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  29. new_date = original_date + timedelta(hours=hour_number)
  30. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  31. def judge_pcd_xosc(callback_json_oss_key):
  32. pcd = False
  33. xosc = False
  34. try:
  35. json_content = bucket.get_object(callback_json_oss_key).read()
  36. json_object = json.loads(json_content)
  37. if 'check' not in json_object:
  38. logging.error("Missing 'check' field in %s", callback_json_oss_key)
  39. return pcd,xosc
  40. check = json_object['check']
  41. if '点云缺失' in check:
  42. pcd = True
  43. if '不在道路范围' in check:
  44. xosc = True
  45. except ValueError as e:
  46. logging.error("Failed to decode JSON from %s", e)
  47. except Exception as e:
  48. logging.error("Error processing %s", e)
  49. return pcd,xosc
  50. if __name__ == '__main__':
  51. while True:
  52. logging.info("开始新一轮扫描")
  53. try:
  54. local_delete_list = []
  55. oss_delete_list = []
  56. upload_completed_prefix_list = []
  57. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  58. try:
  59. if 'callback.json' in str(obj1.key):
  60. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  61. file1 = False
  62. file2 = False
  63. file3 = False
  64. pcd_ok = False
  65. file5 = False
  66. file7 = False
  67. xosc_ok = False
  68. file9 = False
  69. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
  70. if '/callback_done.json' in str(obj2.key):
  71. file1 = True
  72. break
  73. if '/ego_pji.csv' in str(obj2.key):
  74. file2 = True
  75. if '/objects_pji.csv' in str(obj2.key):
  76. file3 = True
  77. if '/pcd_overlook.mp4' in str(obj2.key):
  78. pcd_ok = True
  79. if '/drive.csv' in str(obj2.key):
  80. file5 = True
  81. if '/scenario_orig.mp4' in str(obj2.key):
  82. file7 = True
  83. if '/trajectory_pji.csv' in str(obj2.key):
  84. file9 = True
  85. if file1 or not file2 or not file3 or not file5 or not file7 or not file9:
  86. continue
  87. if not pcd_ok:
  88. pcd_ok,xosc_ok = judge_pcd_xosc(str(obj1.key))
  89. if not pcd_ok:
  90. continue
  91. if not xosc_ok:
  92. continue
  93. time.sleep(1)
  94. logging.info("发送:%s", prefix)
  95. json_content = bucket.get_object(str(obj1.key)).read()
  96. json_object = json.loads(json_content)
  97. data1 = {"equipmentNo": json_object['equipmentNo'],"secretKey": json_object['secretKey']}
  98. json_data1 = json.dumps(data1)
  99. logging.info("授权接口请求中: %s" % url1_private)
  100. logging.info("授权发送参数为: %s" % str(data1))
  101. request1 = urllib2.Request(url1_private, json_data1,headers={'Content-Type': 'application/json'})
  102. response1 = urllib2.urlopen(request1)
  103. result_json1 = response1.read()
  104. result_object1 = json.loads(result_json1)
  105. logging.info("授权接口请求结果为: %s", result_object1)
  106. access_token = result_object1.get('data').get('accessToken')
  107. old_date = json_object['dataName']
  108. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  109. equipment_no = json_object['equipmentNo']
  110. old_file_path = json_object['filePath']
  111. old_ros_bag_path = json_object['rosBagPath']
  112. task_id = json_object['taskId']
  113. trigger_id = json_object['triggerId']
  114. check = json_object['check']
  115. check_order = ['自车数据缺失', '不在道路范围', '无规划路径', '目标点缺失','点云缺失', '点云丢帧', '解析程序错误', '还原程序错误', '评价程序错误']
  116. check_order_dict = dict((item, idx) for idx, item in enumerate(check_order))
  117. check = sorted(check, key=lambda x: check_order_dict.get(x, float('inf')))
  118. check = ','.join(check) # 数组元素拼接成字符串序列
  119. if old_date is None:
  120. old_date = ''
  121. # 将时区统一(室外不需要需要加8,根据机器人终端的时区判断)
  122. # new_date = add_hour(old_date, 8)
  123. new_date = old_date
  124. old_delete_list = []
  125. callback_done_oss_key = ''
  126. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  127. old_delete_list.append(str(obj_old.key))
  128. if 'callback.json' in str(obj_old.key):
  129. callback_done_oss_key = str(obj_old.key).replace(old_date, new_date).replace('callback.json','callback_done.json')
  130. # todo 时区不变也就不需要移动文件了
  131. # else:
  132. # bucket.copy_object(bucket_name, str(obj_old.key),
  133. # str(obj_old.key).replace(old_date, new_date))
  134. # bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  135. # bucket.delete_object(old_ros_bag_path)
  136. # bucket.batch_delete_objects(old_delete_list)
  137. if 'userId' in json_object:
  138. logging.info("json_object 包含 'userId' 字段,值为:", json_object['userId'])
  139. data2 = {
  140. 'userId': json_object['userId'],
  141. "dataName": new_date,
  142. "dataSize": data_size,
  143. "equipmentNo": equipment_no,
  144. "filePath": old_file_path.replace(old_date, new_date),
  145. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  146. "taskId": task_id,
  147. "triggerId": trigger_id,
  148. "check":check
  149. }
  150. else:
  151. logging.info("json_object 不包含 'userId' 字段")
  152. data2 = {
  153. "dataName": new_date,
  154. "dataSize": data_size,
  155. "equipmentNo": equipment_no,
  156. "filePath": old_file_path.replace(old_date, new_date),
  157. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  158. "taskId": task_id,
  159. "triggerId": trigger_id,
  160. "check":check
  161. }
  162. json_data2 = json.dumps(data2)
  163. bucket.put_object(callback_done_oss_key, unicode(json_data2))
  164. logging.info("回调接口请求中:%s" % url2_private)
  165. logging.info("回调接口发送参数为: %s" % str(data2))
  166. request2 = urllib2.Request(url2_private, json_data2,headers={'Content-Type': 'application/json','authorization': access_token})
  167. response2 = urllib2.urlopen(request2)
  168. result_json2 = response2.read()
  169. result_object2 = json.loads(result_json2)
  170. logging.info("回调接口请求结果为: %s", result_object2)
  171. except Exception as e:
  172. logging.exception("局部异常处理: %s" % str(e))
  173. continue
  174. time.sleep(10)
  175. except Exception as e:
  176. logging.exception("全局错误处理: %s" % str(e))