123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import os
- import rosbag
- import csv
- import math
- import sys
- import time
- def parsebag(input_dir, output_dir):
-
- dic_object_detection = ['Time','FrameID','HeadingAngle','ID','East', 'North' ,'position_z', 'altitude', 'AbsVel','Type']
- object_detection_file = open(output_dir + "/"+"pos_hmi.csv", 'w')
- writer_object_detection = csv.writer(object_detection_file)
- writer_object_detection.writerow(dic_object_detection)
- frame_max=sys.maxsize
- with rosbag.Bag(input_dir ,'r') as bag:
-
- framenum = 1
- hasLoc = False
- for topic,msg,t in bag.read_messages():
-
- if topic == "/cicv_location":
- hasLoc = True
- ego_speed=math.sqrt(msg.velocity_x**2+msg.velocity_y**2+msg.velocity_z**2)
- message_location =[str(t)[:-6],framenum,msg.yaw,'-1',msg.position_x,msg.position_y,msg.position_z,msg.altitude,ego_speed,'2']
- global pos_x ,pos_y,pos_z
- pos_x=msg.position_x
- pos_y=msg.position_y
- pos_z=msg.position_z
- egocar_yaw=msg.yaw
-
-
- if topic == "/tpperception/hmi":
- output_type=' '
- if not hasLoc :
-
- continue
-
- m=msg.header.sequence_num
- if m<frame_max:
- frame_max=m
- msg_l=len(msg.objs)
-
-
- for i in range(msg_l):
-
- obj_x=msg.objs[i].xabs
- obj_y=msg.objs[i].yabs
- obj_z=msg.objs[i].z+pos_z
-
- if msg.objs[i].type==0:
- output_type='2'
- elif msg.objs[i].type==1:
- output_type='4'
- elif msg.objs[i].type==2:
- output_type='7'
- elif msg.objs[i].type==3:
- output_type='8'
- elif msg.objs[i].type==4:
- output_type='10'
- elif msg.objs[i].type==5:
- output_type='103'
- message_obj=[message_location[0], framenum, msg.objs[i].heading, str(msg.objs[i].id), obj_x, obj_y, obj_z,message_location[-3], msg.objs[i].speed, output_type]
- writer_object_detection.writerow(message_obj)
- writer_object_detection.writerow(message_location)
- framenum+=1
-
-
-
-
- object_detection_file.close()
- if __name__ == "__main__":
- input_dir = sys.argv[1]
-
- bagname=input_dir.split('/')[-1].split('.')[0]
- output_dir = sys.argv[2]
-
- output_dir=os.path.join(output_dir, bagname)
- if not os.path.exists(output_dir):
- os.makedirs(output_dir)
-
- try:
- parsebag(input_dir, output_dir)
- print('successfully analysis '+input_dir)
- except Exception as e:
- print(e)
- def parse(input_dir, output_dir):
-
- bagname=input_dir.split('/')[-1].split('.')[0]
-
- output_dir=os.path.join(output_dir, bagname)
- if not os.path.exists(output_dir):
- os.makedirs(output_dir)
-
- try:
- parsebag(input_dir, output_dir)
- print('successfully analysis '+input_dir)
- return output_dir
- except Exception as e:
- print(e)
|