camera-pjibot_guide.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import time
  4. import oss2
  5. from resource import parse_pji_image
  6. from utils import json_utils
  7. import logging
  # Base directory of the pjibot processing scripts on the worker host.
  # Every other local path in this script is derived from it.
  path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot/'

  # All log records of this script go to one file below <path1>/log/.
  # NOTE(review): basicConfig opens the file immediately, so the log/
  # directory has to exist before the script is started.
  logging.basicConfig(
      filename=path1 + 'log/camera-pjibot_guide.log',
      level=logging.INFO,
      format='%(asctime)s - %(levelname)s - %(message)s',
  )

  # OSS key prefix that is scanned for merged bags.
  key1 = 'pjibot/'

  # Number of seconds to wait between two scans.
  sleep_time = 30

  # JSON file that records the parse prefixes whose video generation failed;
  # the scan loop skips every prefix listed here. Derived from path1 instead
  # of repeating the base directory literal (the resulting value is the same).
  error_bag_json = path1 + 'camera-errorBag.json'
  def get_paths(local_parse_dir, parse_prefix):
      """Build the local and OSS locations of the two camera videos.

      Args:
          local_parse_dir: Local directory that receives the parse results of
              one bag. '/camera1/' and '/camera2/' are appended to it as-is.
          parse_prefix: OSS key prefix of the parse results. It is used
              verbatim, so it has to end with '/' already.

      Returns:
          A 6-tuple ``(local_dir1, local_mp4_path1, oss_key1,
          local_dir2, local_mp4_path2, oss_key2)``. Note the asymmetry that
          callers rely on: the first camera is stored locally as
          ``camera1.mp4`` but uploaded as ``camera.mp4``, while the second
          camera is ``camera2.mp4`` in both places.
      """
      local_mp4_dir1 = local_parse_dir + '/camera1/'
      local_mp4_file_path1 = local_mp4_dir1 + 'camera1.mp4'
      oss_object_key1 = parse_prefix + 'camera.mp4'

      local_mp4_dir2 = local_parse_dir + '/camera2/'
      # Reuse the directory computed above instead of rebuilding the same
      # string from scratch, so both cameras are derived the same way.
      local_mp4_file_path2 = local_mp4_dir2 + 'camera2.mp4'
      oss_object_key2 = parse_prefix + 'camera2.mp4'

      return (local_mp4_dir1, local_mp4_file_path1, oss_object_key1,
              local_mp4_dir2, local_mp4_file_path2, oss_object_key2)
  def parse_to_mp4(merged_bag_file_path, parse_prefix, local_parse_dir, local_delete_list):
      """Render both camera topics of a merged bag to mp4 and upload them.

      Uses the module-level ``bucket`` that the ``__main__`` section creates,
      so it can only be called after that object exists.

      Args:
          merged_bag_file_path: Local path of the downloaded merged bag.
          parse_prefix: OSS key prefix (ending in '/') that receives the
              videos; also the value recorded in the error list on failure.
          local_parse_dir: Local directory below which the videos are written.
          local_delete_list: List that collects local files for the caller to
              delete; the two mp4 paths are appended to it.

      Returns:
          None. Any exception is logged and the prefix is added to
          ``error_bag_json`` via ``json_utils.add_error``; nothing is raised
          for a failed parse or upload.
      """
      try:
          (local_mp4_dir1, local_mp4_file_path1, oss_object_key1,
           local_mp4_dir2, local_mp4_file_path2, oss_object_key2) = get_paths(local_parse_dir, parse_prefix)

          # Queue the outputs for deletion before any work starts. Previously
          # they were only queued after both uploads had succeeded, so a
          # failure in the second half left camera1.mp4 behind on disk.
          local_delete_list.append(local_mp4_file_path1)
          local_delete_list.append(local_mp4_file_path2)

          # presumably parse1/parse2 write <dir>/cameraN.mp4 from the given
          # image topic -- TODO confirm against resource.parse_pji_image
          parse_pji_image.parse1('/ob_camera_01/color/image_raw', merged_bag_file_path, local_mp4_dir1)
          bucket.put_object_from_file(oss_object_key1, local_mp4_file_path1)
          logging.info("上传 camera1.mp4 成功: %s", str(oss_object_key1))

          parse_pji_image.parse2('/ob_camera_02/color/image_raw', merged_bag_file_path, local_mp4_dir2)
          bucket.put_object_from_file(oss_object_key2, local_mp4_file_path2)
          # Log the key that was actually uploaded (this used to print the
          # camera1 key for the camera2 upload).
          logging.info("上传 camera2.mp4 成功: %s", str(oss_object_key2))
      except Exception as e2:
          # Runs whenever any step above fails: record the prefix so that the
          # scan loop does not pick this bag up again.
          logging.exception("生成摄像头视频报错,添加到 camera-errorBag.json: %s", e2)
          json_utils.add_error(parse_prefix, error_bag_json)
  # ------- Fetch every merged bag and render its camera videos -------
  def _camera_videos_done(parse_prefix_full):
      """Return True when camera.mp4 and camera2.mp4 both exist below the prefix."""
      camera_done = False
      camera2_done = False
      for obj2 in oss2.ObjectIterator(bucket, prefix=parse_prefix_full):
          object_key = str(obj2.key)
          if '/camera.mp4' in object_key:
              camera_done = True
          if '/camera2.mp4' in object_key:
              camera2_done = True
          if camera_done and camera2_done:
              # Both found; no need to list the rest of the prefix.
              return True
      return False


  def _remove_local_files(local_paths):
      """Delete temporary files; a failed delete is logged and never raised."""
      for local_path in local_paths:
          # Files that were queued but never created are not an error.
          if not os.path.exists(local_path):
              continue
          try:
              os.remove(local_path)
          except OSError as e:
              logging.warning("删除本地临时文件失败: %s, %s", local_path, e)


  def _process_bag(merged_bag_object_key, parse_prefix_full):
      """Download one merged bag, generate its videos, then clean up locally."""
      local_merged_bag_path = path1 + 'camera/' + merged_bag_object_key
      local_merged_dir = '/'.join(local_merged_bag_path.split('/')[:-1])
      local_parse_dir = local_merged_dir.replace('data_merge', 'data_parse')
      for directory in (local_merged_dir, local_parse_dir):
          if not os.path.exists(directory):
              os.makedirs(directory)

      # Queue the bag up-front so that a partial download is removed as well.
      local_delete_list = [local_merged_bag_path]
      try:
          try:
              bucket.get_object_to_file(merged_bag_object_key, local_merged_bag_path)
          except Exception as e:
              logging.exception("下载合并后的bag包失败: %s", str(e))
              # Without the bag there is nothing to parse. Returning here
              # keeps a transient download error from being recorded in the
              # error list; the bag is simply retried on the next scan.
              return
          # Generate camera.mp4 / camera2.mp4 and upload them.
          parse_to_mp4(local_merged_bag_path, parse_prefix_full, local_parse_dir, local_delete_list)
      finally:
          # Remove local temporary files even when an exception escapes.
          _remove_local_files(local_delete_list)


  def _scan_once():
      """Walk all objects below ``key1`` and process each unfinished merged bag."""
      for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
          merged_bag_object_key = str(obj1.key)
          # Only merged bags are of interest.
          if 'data_merge' not in merged_bag_object_key or not merged_bag_object_key.endswith('.bag'):
              continue
          # data_merge/.../x.bag -> data_parse/.../x/
          parse_prefix_full = merged_bag_object_key.replace('data_merge', 'data_parse')[:-4] + '/'
          if _camera_videos_done(parse_prefix_full):
              continue
          # Re-read on every bag: parse_to_mp4 appends to this file.
          error_bag_list = json_utils.parse_json_to_string_array(error_bag_json)
          if parse_prefix_full in error_bag_list:
              continue
          logging.info("%s 等待处理到: %s", merged_bag_object_key, parse_prefix_full)
          _process_bag(merged_bag_object_key, parse_prefix_full)


  if __name__ == '__main__':
      # 1. Create the OSS client objects. ``bucket`` must stay a module-level
      #    name because parse_to_mp4 and the helpers above read it as a global.
      # NOTE(security): the access key id and secret are hard-coded in source.
      # They should be loaded from the environment or a protected config file,
      # and this key pair should be rotated.
      auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
      endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
      bucket = oss2.Bucket(auth, endpoint, 'pji-bucket1')
      while True:
          logging.info("开始新一轮扫描")
          try:
              # 2. Find finished uploads and process the ones without videos.
              _scan_once()
          except Exception as e:
              logging.exception("全局错误处理: %s", str(e))
          # Sleep after failed scans too; otherwise a persistent error (for
          # example OSS being unreachable) spins the loop and floods the log.
          time.sleep(sleep_time)