# csv-pjibot_delivery.py
# -*- coding: utf-8 -*-
# ------- Global configuration -------
import sys
reload(sys)  # Python 2 only: restore setdefaultencoding() removed by site.py
sys.setdefaultencoding('utf8')
import os
import subprocess
import time
import oss2
import json
import io
import logging
from resource import bagtocsv_robot
from utils import json_utils
# Create the Aliyun OSS client.
# NOTE(review): access key id/secret are hard-coded here — consider loading
# them from environment variables or a config file kept out of version control.
auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
bucket = oss2.Bucket(auth, endpoint, 'pji-bucket1')
# path1: script home (logs live under it); path2: pdf generator working dir;
# path3: local scratch dir mirroring the OSS key layout for downloaded bags.
path1 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/'
path2 = '/mnt/disk001/pdf_outdoor/run/'
path3 = '/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/csv/'
logging.basicConfig(filename=path1 + 'log/csv-pjibot_delivery.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
# OSS key prefix scanned for delivery-robot bags.
key1 = 'pjibot_delivery/'
sleep_time = 30  # seconds between scans
# JSON file listing parse prefixes that already failed (used as a skip list).
error_bag_json = "/mnt/disk001/dcl_data_process/src/python2/pjibot_delivery/csv-errorBag.json"
# ------- Global configuration -------
  27. def parse_csv(data_bag, parse_prefix, local_parse_dir, local_delete_list):
  28. try:
  29. bagtocsv_robot.parse(data_bag, local_parse_dir + '/csv/')
  30. bagname = data_bag.split('/')[-1].split('.')[0]
  31. local_csv_dir = os.path.join(local_parse_dir + '/csv/', bagname) # 最终生成四个csv文件和output.json的目录
  32. # ------- 处理 output.json - 开始 -------
  33. outputs = []
  34. try:
  35. output_json_path = str(local_csv_dir)+'/output.json'
  36. if os.path.exists(output_json_path):
  37. outputs = json_utils.parse_json_to_string_array(output_json_path)
  38. # 2 将 output.json 添加到 callback.json 的 check 字段
  39. callback_json_oss_key = parse_prefix+'callback.json'
  40. callback_json_local = local_csv_dir+'/callback.json'
  41. bucket.get_object_to_file(callback_json_oss_key, callback_json_local)
  42. with io.open(callback_json_local, 'r', encoding='utf-8') as f:
  43. data = json.load(f)
  44. if 'check' not in data:
  45. data['check'] = []
  46. data['check'].extend(outputs)
  47. data['check'] = list(set(data['check'])) # 去重
  48. json_data = json.dumps(data, ensure_ascii=False, indent=4)
  49. with io.open(callback_json_local, 'w', encoding='utf-8') as f:
  50. f.write(unicode(json_data))
  51. bucket.put_object_from_file(callback_json_oss_key, callback_json_local)
  52. except Exception as e3:
  53. pass
  54. # ------- 处理 output.json - 结束 -------
  55. csv_file_name1 = 'trajectory_pji'
  56. local_csv_file_path1 = str(local_csv_dir) + '/' + str(csv_file_name1) + '.csv'
  57. oss_csv_object_key1 = parse_prefix + csv_file_name1 + '.csv'
  58. if os.path.exists(local_csv_file_path1):
  59. bucket.put_object_from_file(oss_csv_object_key1, local_csv_file_path1)
  60. else:
  61. logging.error("没有 trajectory_pji.csv")
  62. json_utils.add_error(parse_prefix,error_bag_json)
  63. csv_file_name2 = 'ego_pji'
  64. local_csv_file_path2 = str(local_csv_dir) + '/' + str(csv_file_name2) + '.csv'
  65. oss_csv_object_key2 = parse_prefix + csv_file_name2 + '.csv'
  66. if os.path.exists(local_csv_file_path2):
  67. bucket.put_object_from_file(oss_csv_object_key2, local_csv_file_path2)
  68. else:
  69. logging.error("没有 ego_pji.csv")
  70. json_utils.add_error(parse_prefix,error_bag_json)
  71. if '目标点缺失' in outputs:
  72. logging.error("报错【目标点缺失】,不上传targetposition.csv了")
  73. json_utils.add_error(parse_prefix,error_bag_json)
  74. else:
  75. csv_file_name3 = 'targetposition'
  76. local_csv_file_path3 = str(local_csv_dir) + '/' + str(csv_file_name3) + '.csv'
  77. oss_csv_object_key3 = parse_prefix + csv_file_name3 + '.csv'
  78. if os.path.exists(local_csv_file_path3):
  79. bucket.put_object_from_file(oss_csv_object_key3, local_csv_file_path3)
  80. else:
  81. logging.error("没有 targetposition.csv")
  82. json_utils.add_error(parse_prefix,error_bag_json)
  83. csv_file_name4 = 'objects_pji'
  84. local_csv_file_path4 = str(local_csv_dir) + '/' + str(csv_file_name4) + '.csv'
  85. oss_csv_object_key4 = parse_prefix + csv_file_name4 + '.csv'
  86. if os.path.exists(local_csv_file_path4):
  87. bucket.put_object_from_file(oss_csv_object_key4, local_csv_file_path4)
  88. else:
  89. logging.error("没有 objects_pji.csv")
  90. json_utils.add_error(parse_prefix,error_bag_json)
  91. csv_file_name5 = 'drive'
  92. local_csv_file_path5 = str(local_csv_dir) + '/' + str(csv_file_name5) + '.csv'
  93. oss_csv_object_key5 = parse_prefix + csv_file_name5 + '.csv'
  94. if os.path.exists(local_csv_file_path5):
  95. bucket.put_object_from_file(oss_csv_object_key5, local_csv_file_path5)
  96. else:
  97. logging.error("没有 drive.csv")
  98. json_utils.add_error(parse_prefix,error_bag_json)
  99. # ------- 生成pdf - 开始 -------
  100. pdf_local_path = str(local_csv_dir) + '/report.pdf'
  101. can_pdf = True
  102. for output in outputs:
  103. if str(output) in ['自车数据缺失','无规划路径']:
  104. logging.error("【自车数据缺失】或【无规划路径】导致无法生成评价报告PDF")
  105. can_pdf = False
  106. if can_pdf:
  107. os.chdir(path2)
  108. command1 = ['./pji_outdoor_real',
  109. os.path.join(local_csv_dir, ''), # 注意:这里可能不需要末尾的 '/',取决于程序要求
  110. os.path.join(local_csv_dir, ''), # 同上
  111. os.path.join(local_csv_dir, 'trajectory.png'),
  112. bagname]
  113. logging.info("调用生成pdf 报告命令: %s" % ' '.join(command1))
  114. process = subprocess.Popen(command1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  115. stdout, stderr = process.communicate() # 等待命令执行完成
  116. if stdout:
  117. logging.info("命令的标准输出:")
  118. logging.info(stdout.decode('utf-8')) # Python 2 中需要解码
  119. if stderr:
  120. logging.error("命令的错误输出:")
  121. logging.error(stderr.decode('utf-8')) # Python 2 中需要解码
  122. if process.returncode != 0:
  123. logging.error("命令执行失败,退出码: %s" % process.returncode)
  124. oss_csv_object_key5 = parse_prefix + 'report.pdf'
  125. bucket.put_object_from_file(oss_csv_object_key5, pdf_local_path)
  126. logging.info("pdf 报告生成并上传完成。")
  127. # ------- 生成pdf - 结束 -------
  128. # 记得删除
  129. local_delete_list.append(local_csv_file_path1)
  130. local_delete_list.append(local_csv_file_path2)
  131. local_delete_list.append(local_csv_file_path4)
  132. local_delete_list.append(output_json_path)
  133. local_delete_list.append(pdf_local_path)
  134. local_delete_list.append(str(local_csv_dir) + '/trajectory.png')
  135. except Exception as e2:
  136. logging.exception("生成csv报错: %s", e2)
  137. json_utils.add_error(parse_prefix,error_bag_json)
  138. if __name__ == '__main__':
  139. while True:
  140. logging.info("开始新一轮扫描:%s " % key1)
  141. try:
  142. local_delete_list = []
  143. oss_delete_list = []
  144. prefix_list = []
  145. # 2 获取已经上传完成的所有目录并分组
  146. for obj1 in oss2.ObjectIterator(bucket, prefix=key1):
  147. if 'data_merge' in str(obj1.key) and str(obj1.key).endswith('.bag'): # data_merge下的bag是等待解析的
  148. # 获取合并后的包
  149. merged_bag_object_key = str(obj1.key)
  150. merged_bag_object_key_split = merged_bag_object_key.split('/')
  151. merged_prefix = '/'.join(merged_bag_object_key_split[:-1]) # data_merge 目录
  152. parse_prefix = merged_prefix.replace('data_merge', 'data_parse')
  153. parse_prefix_full = merged_bag_object_key.replace('data_merge', 'data_parse').replace('.bag',
  154. '/') # data_parse 目录
  155. csv1_done = False
  156. csv2_done = False
  157. csv3_done = False
  158. csv4_done = False
  159. csv5_done = False
  160. for obj2 in oss2.ObjectIterator(bucket, prefix=parse_prefix_full): # 判断 data_parse 目录下是否有解析后的文件
  161. if '/trajectory_pji.csv' in str(obj2.key):
  162. csv1_done = True
  163. if '/ego_pji.csv' in str(obj2.key):
  164. csv2_done = True
  165. if '/targetposition.csv' in str(obj2.key):
  166. csv3_done = True
  167. if '/objects_pji.csv' in str(obj2.key):
  168. csv4_done = True
  169. if '/drive.csv' in str(obj2.key):
  170. csv5_done = True
  171. if csv1_done and csv2_done and csv3_done and csv4_done and csv5_done:
  172. continue
  173. error_bag_list = json_utils.parse_json_to_string_array(error_bag_json)
  174. if parse_prefix_full in error_bag_list:
  175. continue
  176. logging.info("------- 生成场景还原csv - 开始: %s -------" % str(obj1.key))
  177. local_merged_bag_path = path3 + merged_bag_object_key
  178. local_merged_dir = '/'.join(local_merged_bag_path.split('/')[:-1])
  179. local_parse_dir = local_merged_dir.replace('data_merge', 'data_parse')
  180. if not os.path.exists(local_merged_dir):
  181. os.makedirs(local_merged_dir)
  182. if not os.path.exists(local_parse_dir):
  183. os.makedirs(local_parse_dir)
  184. merged_bag_full_name = merged_bag_object_key_split[-1]
  185. merged_bag_name = merged_bag_full_name.split('.')[0]
  186. bucket.get_object_to_file(merged_bag_object_key, local_merged_bag_path)
  187. local_delete_list.append(local_merged_bag_path)
  188. # 2 生成 pos_orig.csv 和 pos_hmi.csv
  189. parse_csv(local_merged_bag_path, parse_prefix_full, local_parse_dir, local_delete_list)
  190. logging.info("------- 生成场景还原csv - 结束: %s -------" % str(obj1.key))
  191. # 删除本地临时文件
  192. if len(local_delete_list) > 0:
  193. for local_delete in local_delete_list:
  194. try:
  195. os.remove(local_delete)
  196. except Exception as e:
  197. pass
  198. except Exception as e:
  199. logging.exception("全局错误处理: %s" % str(e))
  200. time.sleep(sleep_time)