# -*- coding: utf-8 -*-
# callback-pjibot_delivery.py
import sys
# Python 2 only: re-expose and call setdefaultencoding so implicit
# str <-> unicode conversions use UTF-8 instead of ASCII.
reload(sys)
sys.setdefaultencoding("utf-8")
import json
import logging
import os
import time
import urllib2
from datetime import datetime, timedelta

import oss2
  11. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  12. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  13. bucket_name = 'pji-bucket1'
  14. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  15. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/'
  16. path3 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/callback/'
  17. logging.basicConfig(filename=path1 + 'log/callback-pjibot_delivery.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
  18. key1 = 'pjibot_delivery/'
  19. key2 = 'data/'
  20. key3 = 'data_merge/'
  21. key4 = 'data_parse/'
  22. path2 = 'data/'
  23. path3 = 'data_merge/'
  24. path4 = 'data_parse/'
  25. url1_private = "http://10.14.86.147:9081/device/auth"
  26. url2_private = "http://10.14.86.147:9081/device/data/callback"
  27. def add_hour(date_string, hour_number):
  28. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  29. new_date = original_date + timedelta(hours=hour_number)
  30. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  31. def judge_pcd_xosc(callback_json_oss_key):
  32. pcd = False
  33. xosc = False
  34. try:
  35. json_content = bucket.get_object(callback_json_oss_key).read()
  36. json_object = json.loads(json_content)
  37. if 'check' not in json_object:
  38. logging.error("Missing 'check' field in %s", callback_json_oss_key)
  39. return pcd,xosc
  40. check = json_object['check']
  41. if '点云缺失' in check:
  42. pcd = True
  43. if '不在道路范围' in check:
  44. xosc = True
  45. except ValueError as e:
  46. logging.error("Failed to decode JSON from %s", e)
  47. except Exception as e:
  48. logging.error("Error processing %s", e)
  49. return pcd,xosc
  50. if __name__ == '__main__':
  51. while True:
  52. logging.info("开始新一轮扫描")
  53. try:
  54. local_delete_list = []
  55. oss_delete_list = []
  56. upload_completed_prefix_list = []
  57. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  58. try:
  59. if 'callback.json' in str(obj1.key):
  60. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  61. file1 = False
  62. file2 = False
  63. file3 = False
  64. pcd_ok = False
  65. file5 = False
  66. xosc_ok = False
  67. file9 = False
  68. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
  69. if '/callback_done.json' in str(obj2.key):
  70. file1 = True
  71. break
  72. if '/ego_pji.csv' in str(obj2.key):
  73. file2 = True
  74. if '/objects_pji.csv' in str(obj2.key):
  75. file3 = True
  76. if '/drive.csv' in str(obj2.key):
  77. file5 = True
  78. if '/trajectory_pji.csv' in str(obj2.key):
  79. file9 = True
  80. if '/pcd_overlook.mp4' in str(obj2.key):
  81. pcd_ok = True
  82. if '/scenario_hmi.xosc' in str(obj2.key):
  83. xosc_ok = True
  84. if file1 or not file2 or not file3 or not file5 or not file9:
  85. continue
  86. if not pcd_ok:
  87. pcd_ok,temp = judge_pcd_xosc(str(obj1.key))
  88. if not pcd_ok:
  89. continue
  90. if not xosc_ok:
  91. temp,xosc_ok = judge_pcd_xosc(str(obj1.key))
  92. if not xosc_ok:
  93. continue
  94. time.sleep(1)
  95. logging.info("发送:%s", prefix)
  96. json_content = bucket.get_object(str(obj1.key)).read()
  97. json_object = json.loads(json_content)
  98. if 'secretKey' not in json_object:
  99. logging.warning("缺少'secretKey'键")
  100. data1 = {
  101. "equipmentNo": json_object['equipmentNo'],
  102. "secretKey": prefix.split('/')[1].split('-')[1]
  103. }
  104. logging.info("数据已处理:%s", data1)
  105. else:
  106. data1 = {"equipmentNo": json_object['equipmentNo'],"secretKey": json_object['secretKey']}
  107. # 如果任何一个键不存在,则记录日志或执行其他错误处理
  108. json_data1 = json.dumps(data1)
  109. logging.info("授权接口请求中: %s" % url1_private)
  110. logging.info("授权发送参数为: %s" % str(data1))
  111. request1 = urllib2.Request(url1_private, json_data1,headers={'Content-Type': 'application/json'})
  112. response1 = urllib2.urlopen(request1)
  113. result_json1 = response1.read()
  114. result_object1 = json.loads(result_json1)
  115. logging.info("授权接口请求结果为: %s", result_object1)
  116. access_token = result_object1.get('data').get('accessToken')
  117. old_date = json_object['dataName']
  118. data_size = bucket.get_object_meta(json_object['rosBagPath']).content_length
  119. equipment_no = json_object['equipmentNo']
  120. old_file_path = json_object['filePath']
  121. old_ros_bag_path = json_object['rosBagPath']
  122. task_id = json_object['taskId']
  123. trigger_id = json_object['triggerId']
  124. check = []
  125. # 检查 'check' 键的值是列表还是字符串
  126. if 'check' in json_object:
  127. if isinstance(json_object['check'], list):
  128. # 如果 'check' 是列表,则直接使用它(或者根据需要处理)
  129. check = json_object['check'] # 如果不需要修改列表,可以直接赋值
  130. elif isinstance(json_object['check'], basestring):
  131. # 如果 'check' 是字符串,则将其添加到空列表中
  132. check = [json_object['check']]
  133. else:
  134. # 如果 'check' 不是列表也不是字符串,记录日志或进行其他错误处理
  135. logging.warning("'check' 的类型不是列表也不是字符串,类型为:%s", type(json_object['check']))
  136. else:
  137. # 如果 'check' 键不存在,记录日志或进行其他错误处理
  138. logging.warning("'check' 键在 json_object 中不存在")
  139. check_order = ['自车数据缺失', '不在道路范围', '无规划路径', '目标点缺失','点云缺失', '点云丢帧', '解析程序错误', '还原程序错误', '评价程序错误',"正常"]
  140. check_order_dict = dict((item, idx) for idx, item in enumerate(check_order))
  141. check = sorted(check, key=lambda x: check_order_dict.get(x, float('inf')))
  142. if len(check) > 1:
  143. check = [item for item in check if item != "正常"]
  144. check = ','.join(check) # 数组元素拼接成字符串序列
  145. if old_date is None:
  146. old_date = ''
  147. # 将时区统一(室外不需要需要加8,根据机器人终端的时区判断)
  148. # new_date = add_hour(old_date, 8)
  149. new_date = old_date
  150. old_delete_list = []
  151. callback_done_oss_key = ''
  152. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  153. old_delete_list.append(str(obj_old.key))
  154. if 'callback.json' in str(obj_old.key):
  155. callback_done_oss_key = str(obj_old.key).replace(old_date, new_date).replace('callback.json','callback_done.json')
  156. # todo 时区不变也就不需要移动文件了
  157. # else:
  158. # bucket.copy_object(bucket_name, str(obj_old.key),
  159. # str(obj_old.key).replace(old_date, new_date))
  160. # bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  161. # bucket.delete_object(old_ros_bag_path)
  162. # bucket.batch_delete_objects(old_delete_list)
  163. if 'userId' in json_object:
  164. logging.info("json_object 包含 'userId' 字段,值为:", json_object['userId'])
  165. data2 = {
  166. 'userId': json_object['userId'],
  167. "dataName": new_date,
  168. "dataSize": data_size,
  169. "equipmentNo": equipment_no,
  170. "filePath": old_file_path.replace(old_date, new_date),
  171. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  172. "taskId": task_id,
  173. "triggerId": trigger_id,
  174. "check":check
  175. }
  176. else:
  177. logging.info("json_object 不包含 'userId' 字段")
  178. data2 = {
  179. "dataName": new_date,
  180. "dataSize": data_size,
  181. "equipmentNo": equipment_no,
  182. "filePath": old_file_path.replace(old_date, new_date),
  183. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  184. "taskId": task_id,
  185. "triggerId": trigger_id,
  186. "check":check
  187. }
  188. json_data2 = json.dumps(data2)
  189. bucket.put_object(callback_done_oss_key, unicode(json_data2))
  190. logging.info("回调接口请求中:%s" % url2_private)
  191. logging.info("回调接口发送参数为: %s" % str(data2))
  192. request2 = urllib2.Request(url2_private, json_data2,headers={'Content-Type': 'application/json','authorization': access_token})
  193. response2 = urllib2.urlopen(request2)
  194. result_json2 = response2.read()
  195. result_object2 = json.loads(result_json2)
  196. logging.info("回调接口请求结果为: %s", result_object2)
  197. except Exception as e:
  198. logging.exception("局部异常处理: %s" % str(e))
  199. continue
  200. time.sleep(10)
  201. except Exception as e:
  202. logging.exception("全局错误处理: %s" % str(e))