# callback-pjisuv.py
# -*- coding: utf-8 -*-
import json
import logging
import os
import time
import urllib2
from datetime import datetime, timedelta

import oss2
  8. path1 = '/mnt/disk001/dcl_data_process/src/python2/pjisuv/'
  9. logging.basicConfig(filename=path1 + 'log/callback-pjisuv.log', level=logging.INFO,
  10. format='%(asctime)s - %(levelname)s - %(message)s')
  11. key1 = 'pjisuv/'
  12. sleep_time = 30 # 每多少秒扫描一次
  13. url1_private = "http://10.14.86.127:9081/device/auth"
  14. url2_private = "http://10.14.86.127:9081/device/data/callback"
  15. error_bag_json = "/mnt/disk001/dcl_data_process/src/python2/pjisuv/callback-errorBag.json"
  16. def parse_json_to_string_array(file_path):
  17. try:
  18. # 打开并读取JSON文件(Python 2中不支持encoding参数,需要使用codecs模块或处理文件读取后的编码)
  19. with open(file_path, 'r') as file:
  20. # 读取文件内容
  21. file_content = file.read()
  22. # 解析JSON内容(Python 2中json.loads用于解析字符串)
  23. data = json.loads(file_content.decode('utf-8')) # 假设文件是UTF-8编码,这里需要手动解码
  24. # 检查数据是否是一个列表,并且列表中的元素是否是字符串
  25. if isinstance(data, list):
  26. for item in data:
  27. if not isinstance(item, basestring): # Python 2中字符串类型包括str和unicode,用basestring检查
  28. raise ValueError("JSON数组中的元素不是字符串")
  29. return data
  30. else:
  31. return []
  32. except Exception as e:
  33. return []
  34. def list_to_json_file(data, file_path):
  35. """
  36. 将列表转换为JSON格式并写入指定的文件路径。
  37. 如果文件已存在,则覆盖它。
  38. 参数:
  39. data (list): 要转换为JSON的列表。
  40. file_path (str): 要写入JSON数据的文件路径。
  41. """
  42. # 将列表转换为JSON格式的字符串,并确保输出为UTF-8编码的字符串
  43. json_data = json.dumps(data, ensure_ascii=False, indent=4)
  44. json_data_utf8 = json_data.encode('utf-8') # 编码为UTF-8
  45. # 以写入模式打开文件,如果文件已存在则覆盖
  46. with open(file_path, 'w') as file:
  47. # 将UTF-8编码的JSON字符串写入文件
  48. file.write(json_data_utf8)
  49. def add_hour(date_string, hour_number):
  50. original_date = datetime.strptime(date_string, "%Y-%m-%d-%H-%M-%S")
  51. new_date = original_date + timedelta(hours=hour_number)
  52. return new_date.strftime("%Y-%m-%d-%H-%M-%S")
  53. if __name__ == '__main__':
  54. # 1 登录验证 。
  55. auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  56. # 2 填写自定义域名,例如example.com。获取桶。
  57. # 3 填写Bucket名称,并设置is_cname=True来开启CNAME。CNAME是指将自定义域名绑定到存储空间。
  58. endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  59. bucket_name = 'open-bucket'
  60. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  61. while True:
  62. logging.info("开始新一轮扫描")
  63. local_delete_list = []
  64. oss_delete_list = []
  65. upload_completed_prefix_list = []
  66. # 4 获取即将被合并的bag目录
  67. logging.info("开始扫描目录: %s" % str(key1))
  68. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  69. if 'callback.json' in str(obj1.key):
  70. time.sleep(1)
  71. prefix = '/'.join(str(obj1.key).split('/')[:-1])
  72. pos_orig_csv = False
  73. pos_hmi_csv = False
  74. drive_csv = False
  75. camera_mp4 = False
  76. pcd_forwardlook_mp4 = False
  77. pcd_overlook_mp4 = False
  78. scenario_orig_xosc = False
  79. scenario_hmi_xosc = False
  80. scenario_orig_mp4 = False
  81. scenario_hmi_mp4 = False
  82. camera_bag = False
  83. fusion_bag = False
  84. plan_bag = False
  85. control_bag = False
  86. callback_json = False
  87. for obj2 in oss2.ObjectIterator(bucket, prefix=prefix + '/'):
  88. if 'callback.json' in str(obj2.key):
  89. callback_json = True
  90. if 'camera.mp4' in str(obj2.key):
  91. camera_mp4 = True
  92. if 'drive.csv' in str(obj2.key):
  93. drive_csv = True
  94. if 'pcd_forwardlook.mp4' in str(obj2.key):
  95. pcd_forwardlook_mp4 = True
  96. if 'pcd_overlook.mp4' in str(obj2.key):
  97. pcd_overlook_mp4 = True
  98. if 'pos_orig.csv' in str(obj2.key):
  99. pos_orig_csv = True
  100. if 'scenario_orig.mp4' in str(obj2.key):
  101. scenario_orig_mp4 = True
  102. if 'scenario_orig.xosc' in str(obj2.key):
  103. scenario_orig_xosc = True
  104. if not callback_json or not camera_mp4 or not drive_csv or not pcd_forwardlook_mp4 or not pcd_overlook_mp4 or not pos_orig_csv or not scenario_orig_mp4 or not scenario_orig_xosc:
  105. continue
  106. error_bag_list = parse_json_to_string_array(error_bag_json)
  107. if str(obj1.key) in error_bag_list:
  108. continue
  109. time.sleep(2)
  110. logging.info("发送: %s" % str(obj1.key))
  111. # 1 获取json内容
  112. json_content = bucket.get_object(str(obj1.key)).read()
  113. # 2 获取token
  114. json_object = json.loads(json_content)
  115. data1 = {
  116. "equipmentNo": json_object['equipmentNo'],
  117. "secretKey": json_object['secretKey']
  118. }
  119. json_data1 = json.dumps(data1)
  120. request1 = urllib2.Request(url1_private, json_data1,
  121. headers={'Content-Type': 'application/json'})
  122. response1 = urllib2.urlopen(request1)
  123. result_json1 = response1.read()
  124. result_object1 = json.loads(result_json1)
  125. try:
  126. access_token = result_object1.get('data').get('accessToken')
  127. logging.info("bag文件为:%s" % str(json_object['rosBagPath']))
  128. old_date = json_object['dataName']
  129. equipment_no = json_object['equipmentNo']
  130. old_file_path = json_object['filePath']
  131. old_ros_bag_path = json_object['rosBagPath']
  132. task_id = json_object['taskId']
  133. trigger_id = json_object['triggerId']
  134. except Exception as e:
  135. logging.exception("callback报错: %s" % str(e))
  136. continue
  137. upload = False
  138. if 'userId' in json_object:
  139. logging.info("手动上传的数据")
  140. upload = True
  141. old_date = ''
  142. else:
  143. logging.info("自动采集的数据")
  144. upload = False
  145. old_delete_list = []
  146. old_delete_callback = ''
  147. new_date = ''
  148. # 复制 data_parse
  149. for obj_old in oss2.ObjectIterator(bucket, prefix=old_file_path):
  150. old_delete_list.append(str(obj_old.key))
  151. if 'callback.json' in str(obj_old.key):
  152. if not upload:
  153. new_date = add_hour(old_date, 8)
  154. # 将时区统一
  155. bucket.copy_object(bucket_name, str(obj_old.key),
  156. str(obj_old.key).replace(old_date, new_date).replace('callback.json',
  157. 'callback_done.json'))
  158. else:
  159. old_delete_callback = str(obj_old.key)
  160. bucket.copy_object(bucket_name, str(obj_old.key),
  161. str(obj_old.key).replace('callback.json',
  162. 'callback_done.json'))
  163. else:
  164. if not upload:
  165. new_date = add_hour(old_date, 8)
  166. bucket.copy_object(bucket_name, str(obj_old.key),
  167. str(obj_old.key).replace(old_date, new_date))
  168. # 处理是否上传
  169. if not upload:
  170. try:
  171. bucket.copy_object(bucket_name, old_ros_bag_path, old_ros_bag_path.replace(old_date, new_date))
  172. bucket.delete_object(old_ros_bag_path)
  173. bucket.batch_delete_objects(old_delete_list)
  174. except Exception as e:
  175. error_bag_list = parse_json_to_string_array(error_bag_json)
  176. error_bag_list.append(str(obj1.key))
  177. list_to_json_file(error_bag_list, error_bag_json)
  178. continue
  179. else:
  180. bucket.delete_object(old_delete_callback)
  181. if upload:
  182. data2 = {
  183. 'userId': json_object['userId'],
  184. "dataName": json_object['dataName'],
  185. "dataSize": json_object['dataSize'],
  186. "equipmentNo": equipment_no,
  187. "filePath": old_file_path.replace(old_date, new_date),
  188. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  189. "taskId": task_id,
  190. "triggerId": trigger_id
  191. }
  192. else:
  193. data2 = {
  194. "dataName": new_date,
  195. "dataSize": bucket.get_object_meta(json_object['rosBagPath']).content_length,
  196. "equipmentNo": equipment_no,
  197. "filePath": old_file_path.replace(old_date, new_date),
  198. "rosBagPath": old_ros_bag_path.replace(old_date, new_date),
  199. "taskId": task_id,
  200. "triggerId": trigger_id
  201. }
  202. json_data2 = json.dumps(data2)
  203. logging.info("回调接口请求中:%s" % url2_private)
  204. request2 = urllib2.Request(url2_private, json_data2,
  205. headers={'Content-Type': 'application/json',
  206. 'authorization': access_token})
  207. response2 = urllib2.urlopen(request2)
  208. result_json2 = response2.read()
  209. result_object2 = json.loads(result_json2)
  210. logging.info("回调接口请求结果为: %s", result_object2)
  211. time.sleep(sleep_time)