#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Dec 25 17:35:56 2023 @author: dell """ # coding: utf-8 # !/usr/bin/env python2 import os import rosbag import csv import math import sys import time def parsebag(input_dir, output_dir): dic_object_detection = ['Time', 'Strg_Angle', 'Accel_Pos', 'BrkPel_Pos', 'Real_Speed', 'latitude', 'longitude', 'angular_velocity_z', 'accel_x'] object_detection_file = open(output_dir + "/" + "drive.csv", 'w') writer_object_detection = csv.writer(object_detection_file) writer_object_detection.writerow(dic_object_detection) frame_max = sys.maxsize count_data_read = 1 count_cicv_location = 1 with rosbag.Bag(input_dir, 'r') as bag: latitude = 0 longitude = 0 speed = 0 ActStrWhAng = 0 AccPed1 = 0 BrkPedSts = 0 angular_velocity_z = 0 accel_x = 0 for topic, msg, t in bag.read_messages(topics=['/cicv_location', '/data_read']): # t代表时间 # print(topic) # print('-------------------------------------------------') if topic == "/cicv_location": # /pji_gps #/cicv_location count_cicv_location += 1 latitude = msg.latitude longitude = msg.longitude speed = pow(pow(msg.velocity_x, 2) + pow(msg.velocity_y, 2), 0.5) * 3.6 accel_x = msg.accel_x * 9.8 angular_velocity_z = msg.angular_velocity_z if count_cicv_location % 10 == 0 and latitude != 0 and longitude != 0: message_location = [str(t)[:-6], ActStrWhAng, AccPed1, BrkPedSts, speed, latitude, longitude, angular_velocity_z, accel_x] writer_object_detection.writerow(message_location) if topic == "/data_read": # 10hz ActStrWhAng = msg.ActStrWhAng AccPed1 = msg.AccPed1 BrkPedSts = msg.BrkPedSts count_data_read += 1 if count_data_read % 10 == 0 and latitude != 0 and longitude != 0: message_data = [str(t)[:-6], msg.ActStrWhAng, msg.AccPed1, msg.BrkPedSts, speed, latitude, longitude, angular_velocity_z, accel_x] writer_object_detection.writerow(message_data) object_detection_file.close() # location_file.close() def parse(input_dir, output_dir): bagname = input_dir.split('/')[-1].split('.bag')[0] output_dir = os.path.join(output_dir, bagname) if not os.path.exists(output_dir): os.makedirs(output_dir) parsebag(input_dir, output_dir) return output_dir