pcdtovideo_pji_lidar.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 功能:将ROS bag文件中的点云数据转换为PCD文件,并将PCD文件转换为图像,最后生成视频。
  5. 输入:rosbag文件路径,输出路径
  6. 输出:生成的视频文件
  7. 注意:需要安装pyntcloud、matplotlib、opencv库
  8. """
  9. from __future__ import print_function
  10. import sys
  11. import matplotlib.pyplot as plt
  12. import cv2
  13. import os
  14. import subprocess
  15. import matplotlib.pyplot as plt
  16. from pyntcloud import PyntCloud
  17. import rosbag
  18. import shutil
  19. def process_pcd_to_top_view_image(pcd_path, output_path):
  20. # 使用 PyntCloud 读取点云数据
  21. cloud = PyntCloud.from_file(pcd_path)
  22. points = cloud.points
  23. plt.figure(figsize=(10, 10))
  24. ax = plt.axes(projection='3d')
  25. ax.scatter(points['x'], points['y'], points['z'], s=1, color='blue')
  26. ax.view_init(elev=90, azim=90)
  27. ax.set_xlabel('X')
  28. ax.set_ylabel('Y')
  29. ax.set_zlabel('Z')
  30. ax.set_xticks([])
  31. ax.set_yticks([])
  32. ax.set_zticks([])
  33. plt.axis('off') # 隐藏坐标轴
  34. plt.savefig(output_path, bbox_inches='tight', pad_inches=0)
  35. plt.close()
  36. def create_video_from_images(image_folder, output_video_path, frame_rate):
  37. images = [img for img in sorted(os.listdir(image_folder)) if img.endswith(".jpg")]
  38. if not images:
  39. print("没有找到任何JPG图像,请检查图像文件夹!")
  40. return
  41. frame = cv2.imread(os.path.join(image_folder, images[0]))
  42. height, width, layers = frame.shape
  43. video = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), float(frame_rate), (width, height))
  44. for image in images:
  45. frame = cv2.imread(os.path.join(image_folder, image))
  46. if frame is not None:
  47. video.write(frame)
  48. video.release()
  49. print("视频已成功保存在: ", output_video_path)
  50. def convert_bag_to_pcd(input_dir, output_dir):
  51. """ 将ROS bag文件中的点云数据转换为PCD文件 """
  52. command = 'rosrun pcl_ros bag_to_pcd ' + input_dir + ' /scan_map_icp_amcl_node/scan_point_transformed ' + output_dir
  53. process = subprocess.run(command, shell=True, text=True, capture_output=True)
  54. if process.returncode != 0:
  55. print("转换失败:", process.stderr)
  56. else:
  57. print("PCD文件已成功生成")
  58. def parse(rosbag_path, output_base_path):
  59. bag_base_name = rosbag_path.split('/')[-1].split('.')[0]
  60. lidar_output_base = os.path.join(output_base_path, bag_base_name + '_pcd_lidar')
  61. pcd_folder_path = os.path.join(lidar_output_base, 'pcd')
  62. images_folder_path = os.path.join(lidar_output_base, 'images')
  63. if not os.path.exists(pcd_folder_path):
  64. os.makedirs(pcd_folder_path)
  65. if not os.path.exists(images_folder_path):
  66. os.makedirs(images_folder_path)
  67. with rosbag.Bag(rosbag_path, 'r') as bag:
  68. num_count = 0
  69. for topic, msg, t in bag.read_messages():
  70. if topic == "/scan_map_icp_amcl_node/scan_point_transformed":
  71. num_count += 1
  72. start_time = bag.get_start_time()
  73. end_time = bag.get_end_time()
  74. # 计算rosbag的持续时间
  75. duration = end_time - start_time
  76. bagtime = duration
  77. hz = str(float(float(num_count) / bagtime))
  78. print(hz)
  79. # 转换ROS bag到PCD
  80. convert_bag_to_pcd(rosbag_path, pcd_folder_path)
  81. pcd_files = [f for f in os.listdir(pcd_folder_path) if f.endswith('.pcd')]
  82. for file_name in pcd_files:
  83. pcd_file_path = os.path.join(pcd_folder_path, file_name)
  84. image_file_path = os.path.join(images_folder_path, file_name[:-4] + ".jpg")
  85. process_pcd_to_top_view_image(pcd_file_path, image_file_path)
  86. # 从图像生成视频
  87. video_path = os.path.join(lidar_output_base, 'output_video.mp4')
  88. create_video_from_images(images_folder_path, video_path, hz)
  89. # 清理临时文件
  90. shutil.rmtree(pcd_folder_path)
  91. shutil.rmtree(images_folder_path)