oss_test.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import time
  4. import oss2
  5. import logging
  6. import json
  7. path1 = '/mnt/disk001/cicv-data-closedloop/python2-pjisuv-module/'
  8. logging.basicConfig(filename=path1 + 'log/merge.log', level=logging.INFO,
  9. format='%(asctime)s - %(levelname)s - %(message)s')
  10. key1 = 'pjisuv/'
  11. key2 = 'data/'
  12. key3 = 'data_merge/'
  13. key4 = 'data_parse/'
  14. sleep_time = 60 # 每多少秒扫描一次
  15. car_list = ['pjisuv-007', 'pjisuv-012', 'pjisuv-013']
  16. # 判断是否在done_list 中
  17. def is_upload_completed(prefix, done_list):
  18. for time in done_list:
  19. if time in prefix:
  20. return True
  21. return True
  22. def merge(local_bags1, merged_prefix1, local_merged_dir1, merged_bag_name1):
  23. try:
  24. all_bag_path1 = local_merged_dir1 + merged_bag_name1
  25. with Bag(all_bag_path1, 'w', compression=compress_way) as o:
  26. for i in range(len(local_bags1)):
  27. with Bag(local_bags1[i], 'r') as ib:
  28. for topic, msg, t in ib:
  29. o.write(topic, msg, t)
  30. bucket.put_object_from_file(merged_prefix1 + merged_bag_name1, all_bag_path1)
  31. logging.info('上传合并包到 %s' % merged_prefix1 + merged_bag_name1)
  32. return all_bag_path1
  33. except Exception as e1:
  34. logging.exception("bag包合并报错: %s", e1)
  35. def remove_element_of_oss_json(oss_json_key, time):
  36. temp_file_path_remove = 'remove-node1Done.json'
  37. # 1 读取oss的json文件为列表
  38. bucket.get_object_to_file(oss_json_key, temp_file_path_remove)
  39. with open(temp_file_path_remove, 'r') as f_remove:
  40. remove_node1_done_list = json.load(f_remove)
  41. remove_node1_done_list.remove(time)
  42. json_str = json.dumps(remove_node1_done_list, indent=4, ensure_ascii=False)
  43. bucket.put_object(oss_json_key, json_str.encode('utf-8'))
  44. os.remove(temp_file_path_remove)
  45. '''
  46. cname:http://open-bucket.oss.icvdc.com
  47. keyid:n8glvFGS25MrLY7j
  48. secret:xZ2Fozoarpfw0z28FUhtg8cu0yDc5d
  49. 预设OSS路径: oss://open-bucket
  50. 内网endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
  51. oss桶名: open-bucket
  52. '''
  53. # ------- 获取未合并的bag包,合并 -------
  54. if __name__ == '__main__':
  55. # 1 创建阿里云对象
  56. cname = 'http://open-bucket.oss.icvdc.com'
  57. bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)
  58. # auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
  59. # endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
  60. bucket = oss2.Bucket(auth, endpoint, 'open-bucket')
  61. while True:
  62. try:
  63. logging.info("开始新一轮扫描")
  64. for car in car_list:
  65. prefix_list = []
  66. upload_completed_prefix_list_node1 = []
  67. upload_completed_prefix_list_node2 = []
  68. # todo 如何往回更新json需要确认一下,直接更新会和上传程序出现数据竞争
  69. node1_done_json_key = 'pjisuv/' + car + '/' + key2 + 'node1Done.json'
  70. node2_done_json_key = 'pjisuv/' + car + '/' + key2 + 'node2Done.json'
  71. # 1 判断两个json是否存在
  72. exist = bucket.object_exists(node1_done_json_key)
  73. if not exist:
  74. logging.info('不存在文件 %s' % node1_done_json_key)
  75. time.sleep(2)
  76. continue
  77. exist = bucket.object_exists(node2_done_json_key)
  78. if not exist:
  79. logging.info('不存在文件 %s' % node2_done_json_key)
  80. time.sleep(2)
  81. continue
  82. # 2 读取两个json为数组
  83. temp_file_path1 = car + '-node1Done.json'
  84. temp_file_path2 = car + '-node2Done.json'
  85. try:
  86. # node1
  87. bucket.get_object_to_file(node1_done_json_key, temp_file_path1)
  88. with open(temp_file_path1, 'r') as f1:
  89. node1_done_list = json.load(f1)
  90. except Exception as e1:
  91. logging.exception("下载json文件【%s】报错: 【%s】", node1_done_json_key, str(e1))
  92. time.sleep(2)
  93. continue
  94. try:
  95. # node2
  96. bucket.get_object_to_file(node2_done_json_key, temp_file_path2)
  97. with open(temp_file_path2, 'r') as f2:
  98. node2_done_list = json.load(f2)
  99. except Exception as e2:
  100. logging.exception("下载json文件【%s】报错: 【%s】", node2_done_json_key, str(e2))
  101. time.sleep(2)
  102. continue
  103. os.remove(temp_file_path1)
  104. os.remove(temp_file_path2)
  105. logging.info("扫描车辆【%s】的数据", car)
  106. for obj1 in oss2.ObjectIterator(bucket, prefix=key1 + car + "/" + key2, delimiter='/'):
  107. if str(obj1.key).count('/') == 4: # pjisuv/pjisuv-007/data/node1_2023-12-20-02-16-56_obction_10/
  108. if 'node1' in obj1.key:
  109. print(obj1.key)
  110. if 'node2' in obj1.key:
  111. print(obj1.key)
  112. time.sleep(sleep_time)
  113. except Exception as e:
  114. logging.exception("全局异常处理: %s", str(e))