# coding: utf-8 # !/usr/bin/env python2 import os import rosbag import csv import math import rospy import sys import time import numpy as np from datetime import datetime import argparse import pandas as pd def quaternion_to_euler(x, y, z, w): # 将四元数归一化 try: length = np.sqrt(x ** 2 + y ** 2 + z ** 2 + w ** 2) x /= length y /= length z /= length w /= length # 计算欧拉角 # roll = np.arctan2(2*(w*x + y*z), 1 - 2*(x**2 + y**2)) # pitch = np.arcsin(2*(w*y - z*x)) yaw = np.arctan2(2 * (w * z + x * y), 1 - 2 * (y ** 2 + z ** 2)) return yaw except: return 0 def parsehancheng(input_dir, output_dir): dic_trajectory = ['Time', 'simTime', 'simFrame', 'PointNumber', 'posX', 'posY', 'posZ', 'posH', 'TargetX', 'TargetY', 'TargetZ', 'TargetH'] trajectory_file = open(output_dir + "/" + "trajectory_pji.csv", 'w') writer_trajectory = csv.writer(trajectory_file) writer_trajectory.writerow(dic_trajectory) dic_EgoState = ['Time', 'simTime', 'simFrame', 'posX', 'posY', 'posZ', 'posH', 'speedX', 'speedH', 'cmd_speedX', 'cmd_speedH', 'obstacle', 'task_error_code'] EgoState_file = open(output_dir + "/" + "ego_pji.csv", 'w') writer_EgoState = csv.writer(EgoState_file) writer_EgoState.writerow(dic_EgoState) dic_robot_pos = ['Time', 'simtime', 'FrameID', 'HeadingAngle', 'X', 'Y', 'Z', 'latitude', 'longitude'] robot_pos_file = open(output_dir + "/" + "pos_pji.csv", 'w') writer_robot_pos = csv.writer(robot_pos_file) writer_robot_pos.writerow(dic_robot_pos) dic_objects = ['Time', 'simtime', 'FrameID', 'playerId', 'posH', 'posX', 'posY', 'posZ', 'label', 'dimX', 'dimY', 'dimZ', 'speedX', 'speedY', 'speedZ', 'speedH_X', 'speedH_Y', 'speedH_Z', 'accelX', 'accelY', 'accelZ', 'accelH_X', 'accelH_Y', 'accelH_Z'] objects_file = open(output_dir + "/" + "objects_pji.csv", 'w') writer_objects = csv.writer(objects_file) writer_objects.writerow(dic_objects) frame_max = sys.maxsize with rosbag.Bag(input_dir, 'r') as bag: # wheel_odom_flag=False # cmd_vel_flag=False first_message_time = None Frame_robot_pose = 1 obstacle_state = 0 speed_linear = 0 speed_angular = 0 cmd_speed_angular = 0 cmd_speed_linear = 0 first_message_time = None framenum_ego = 1 task_error_code = 0 # framenum_obj = 1 locationsimtime = ' ' date_time = ' ' simFrame = 1 latitude = 0 longitude = 0 ego_posx, ego_posy, ego_posz, ego_posH = 0, 0, 0, 0 for topic, msg, t in bag.read_messages( topics=['/obstacle_detection', '/wheel_odom', '/cmd_vel', '/robot_pose', '/tracking/objects', '/nav/task_feedback_info', '/robot/final_trajectory', '/gnss']): if first_message_time is None: first_message_time = t first_message_time = rospy.Time.to_sec(first_message_time) first_message_time = datetime.fromtimestamp(first_message_time) if topic == "/gnss": latitude = msg.latitude / 10000000 longitude = msg.longitude / 10000000 if topic == "/obstacle_detection": obstacle_state = msg.data # print(msg.data) if topic == "/wheel_odom": # wheel_odom_flag=True speed_linear = msg.twist.twist.linear.x * 3.6 speed_angular = msg.twist.twist.angular.z if topic == "/cmd_vel": # cmd_vel_flag=True cmd_speed_linear = msg.linear.x * 3.6 cmd_speed_angular = msg.angular.z if topic == "/nav/task_feedback_info": task_error_code = msg.task_error_code if topic == "/robot_pose": timestamp = rospy.Time.to_sec(t) date_time = datetime.fromtimestamp(timestamp) locationsimtime = (date_time - first_message_time).total_seconds() ego_posx = msg.pose.position.x poseX = ego_posx # poseX=ego_posx-88.96626338170609 ego_posy = msg.pose.position.y poseY = ego_posy # poseY=ego_posy-40.553671177476645 poseZ = msg.pose.position.z orientationX = msg.pose.orientation.x orientationY = msg.pose.orientation.y orientationZ = msg.pose.orientation.z orientationW = msg.pose.orientation.w egoyaw = quaternion_to_euler(orientationX, orientationY, orientationZ, orientationW) message_location = [date_time, locationsimtime, framenum_ego, egoyaw, poseX, poseY, poseZ, latitude, longitude] # print(f'date_time={date_time}, locationtime={locationsimtime}') writer_robot_pos.writerow(message_location) # if wheel_odom_flag and cmd_vel_flag: message_EgoState = [date_time, locationsimtime, Frame_robot_pose, poseX, poseY, poseZ, egoyaw, speed_linear, speed_angular, cmd_speed_linear, cmd_speed_angular, obstacle_state, task_error_code] writer_EgoState.writerow(message_EgoState) framenum_ego += 1 Frame_robot_pose += 1 if topic == "/tracking/objects": msg_l = len(msg.objects) if msg_l and (locationsimtime != ' ') and (date_time != ' '): # timestamp = rospy.Time.to_sec(t) # date_time = datetime.fromtimestamp(timestamp) # simtime=(date_time-first_message_time).total_seconds() for i in range(msg_l): obj_ID = msg.objects[i].id obj_x = msg.objects[i].pose.position.x # obj_x = msg.objects[i].pose.position.x-88.96626338170609 obj_y = msg.objects[i].pose.position.y # obj_y = msg.objects[i].pose.position.y-40.553671177476645 obj_z = msg.objects[i].pose.position.z obj_orientationX = msg.objects[i].pose.orientation.x obj_orientationY = msg.objects[i].pose.orientation.y obj_orientationZ = msg.objects[i].pose.orientation.z obj_orientationW = msg.objects[i].pose.orientation.w objyaw = quaternion_to_euler(obj_orientationX, obj_orientationY, obj_orientationZ, obj_orientationW) obj_lable = msg.objects[i].label obj_dimX = msg.objects[i].dimensions.x obj_dimY = msg.objects[i].dimensions.y obj_dimZ = msg.objects[i].dimensions.z velocity_linear_x = msg.objects[i].velocity.linear.x velocity_linear_y = msg.objects[i].velocity.linear.y velocity_linear_z = msg.objects[i].velocity.linear.z velocity_angular_x = msg.objects[i].velocity.angular.x velocity_angular_y = msg.objects[i].velocity.angular.y velocity_angular_z = msg.objects[i].velocity.angular.z acceleration_linear_x = msg.objects[i].acceleration.linear.x acceleration_linear_y = msg.objects[i].acceleration.linear.y acceleration_linear_z = msg.objects[i].acceleration.linear.z acceleration_angular_x = msg.objects[i].acceleration.angular.x acceleration_angular_y = msg.objects[i].acceleration.angular.y acceleration_angular_z = msg.objects[i].acceleration.angular.z message_obj = [date_time, locationsimtime, Frame_robot_pose, obj_ID, objyaw, obj_x, obj_y, obj_z, obj_lable, obj_dimX, obj_dimY, obj_dimZ, velocity_linear_x, velocity_linear_y, velocity_linear_z, velocity_angular_x, velocity_angular_y, velocity_angular_z, acceleration_linear_x, acceleration_linear_y, acceleration_linear_z, acceleration_angular_x, acceleration_angular_y, acceleration_angular_z] # print(f'{date_time}, {locationsimtime}') writer_objects.writerow(message_obj) # framenum_obj+=1 if topic == "/robot/final_trajectory": PointNumber = 1 timestamp1 = rospy.Time.to_sec(t) date_time1 = datetime.fromtimestamp(timestamp1) locationsimtime1 = (date_time1 - first_message_time).total_seconds() if ego_posx != 0 or ego_posy != 0: for i in range(len(msg.waypoints)): Targetx = msg.waypoints[i].pose.pose.position.x Targety = msg.waypoints[i].pose.pose.position.y Targetz = msg.waypoints[i].pose.pose.position.z orientationX = msg.waypoints[i].pose.pose.orientation.x orientationY = msg.waypoints[i].pose.pose.orientation.y orientationZ = msg.waypoints[i].pose.pose.orientation.z orientationW = msg.waypoints[i].pose.pose.orientation.w TargetH = quaternion_to_euler(orientationX, orientationY, orientationZ, orientationW) message_trajectory = [date_time1, locationsimtime1, simFrame, PointNumber, ego_posx, ego_posy, poseZ, egoyaw, Targetx, Targety, Targetz, TargetH] writer_trajectory.writerow(message_trajectory) PointNumber += 1 simFrame += 1 robot_pos_file.close() objects_file.close() EgoState_file.close() trajectory_file.close() # if __name__ == '__main__': def parse(input_dir, output_dir): # input_dir='/media/dell/HIKSEMI/pji_DGNC/pjioutrobot_2024-08-21-15-12-04.bag' # output_dir='/media/dell/HIKSEMI/pji_DGNC' # input_dir=sys.argv[1] # output_dir = sys.argv[2] bagname = input_dir.split('/')[-1].split('.')[0] output_dir = os.path.join(output_dir, bagname) if not os.path.exists(output_dir): os.makedirs(output_dir) parsehancheng(input_dir, output_dir)