"""Utilities for packaging recorded drive data.

The module converts GPS coordinates through the Amap web service, renders
ASCII ``.pcd`` point clouds to JPEG images, assembles image sequences into
videos with ffmpeg, and copies recorded files into an output tree.
"""
import json
import math
import os
import shutil
import subprocess

import cv2
import numpy as np
import pandas as pd
import requests

geocode_base = 'http://restapi.amap.com/v3/geocode/regeo'
CONVERT_BASE = 'https://restapi.amap.com/v3/assistant/coordinate/convert'

# NOTE(review): this credential is committed in source. It can be overridden
# through the AMAP_KEY environment variable; consider rotating the default.
AMAP_KEY = os.environ.get('AMAP_KEY', 'cdf24f471cc579ba6d5dd1f9b856ee31')

# Seconds to wait for the web service before giving up; without a timeout
# ``requests`` can block indefinitely.
REQUEST_TIMEOUT = 10


def geocode(location):
    """Query the reverse-geocoding endpoint and return the decoded JSON.

    Args:
        location: Coordinate string passed as the ``location`` query
            parameter.

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    parameters = {
        'output': 'json',
        'key': AMAP_KEY,
        'location': location,
        'extensions': 'base',
    }
    response = requests.get(geocode_base, parameters, timeout=REQUEST_TIMEOUT)
    # print('HTTP request status: %s' % response.status_code)
    return response.json()


def convert(locations):
    """Convert GPS coordinates and look up the address of the result.

    Args:
        locations: ``|``-separated ``lon,lat`` pairs sent to the coordinate
            conversion endpoint with ``coordsys=gps``.

    Returns:
        A ``(status, payload)`` tuple. On success ``status`` is ``'1'`` and
        ``payload`` is ``(converted_locations, formatted_address)``, where
        ``converted_locations`` is the ``locations`` string returned by the
        conversion endpoint. On any failure ``payload`` is ``None`` and
        ``status`` is either the status reported by the geocoding service or
        ``'0'``.

    Raises:
        requests.RequestException: If a request fails or times out.
    """
    parameters = {
        'output': 'json',
        'key': AMAP_KEY,
        'locations': locations,
        'coordsys': 'gps',
    }
    response_cor = requests.get(
        CONVERT_BASE, parameters, timeout=REQUEST_TIMEOUT
    ).json()

    result_location = response_cor.get('locations')
    if not result_location:
        # The conversion itself failed; report it through the status
        # protocol instead of raising KeyError.
        return '0', None

    para_location = result_location.replace(';', '|')
    response_location = geocode(para_location)
    status = response_location['status']
    if status != '1':
        return status, None

    # Reuse the response fetched above rather than issuing the same request
    # a second time.
    try:
        address = response_location['regeocode']['formatted_address']
    except (KeyError, TypeError):
        return '0', None
    return status, (result_location, address)


def get_position_single(base_df, inset_df):
    """Convert the coordinates in ``inset_df`` in batches and append them.

    Rows are grouped by ``floor(index / 20)`` (so the index is expected to be
    numeric) and each group is sent to :func:`convert` in one request. Groups
    whose conversion does not report status ``'1'`` are skipped.

    Args:
        base_df: Frame the converted rows are appended to. Its first row is
            dropped from the result (presumably a placeholder row — confirm
            against the caller).
        inset_df: Frame with ``Longitude`` and ``Latitude`` columns. It is
            not modified.

    Returns:
        A new frame: ``base_df`` without its first row, followed by the
        converted ``Longitude``/``Latitude`` rows (as strings), with a fresh
        0-based index.
    """
    # Work on a string-typed copy so the caller's frame keeps its dtypes.
    coords = inset_df[['Longitude', 'Latitude']].astype('str')

    frames = [base_df]
    for _, batch in coords.groupby(lambda x: math.floor(x / 20)):
        cor_total = '|'.join(
            f'{lon},{lat}'
            for lon, lat in zip(batch['Longitude'], batch['Latitude'])
        )
        status, convert_result = convert(cor_total)
        if status != '1':
            continue
        pairs = [pair.split(',') for pair in convert_result[0].split(';')]
        frames.append(pd.DataFrame(pairs, columns=['Longitude', 'Latitude']))

    merged = pd.concat(frames).reset_index(drop=True)
    return merged[1:].reset_index(drop=True)


def draw_points(pic, filename):
    """Draw the points of an ASCII point-cloud file onto ``pic`` in place.

    Each line is split on whitespace; the first two fields are read as x and
    y. Lines whose first two fields are not numeric (header lines, blank
    lines) and points with a non-finite coordinate are skipped.

    Args:
        pic: Image array drawn on with ``cv2.circle``.
        filename: Path of the text file to read.
    """
    with open(filename, 'r') as f:
        for line in f:
            fields = line.split()[:4]
            try:
                point_x = float(fields[0])
                point_y = -float(fields[1])
            except (IndexError, ValueError):
                # Not a data row.
                continue
            if not (math.isfinite(point_x) and math.isfinite(point_y)):
                # int() cannot convert NaN or infinity.
                continue
            cv2.circle(
                pic,
                (int(800 - point_x * 10), int(400 - point_y * 10)),
                0,
                (250, 250, 250),
                0,
            )


def pcd_image(pcd_path):
    """Render every ``.pcd`` file found under ``pcd_path`` to a JPEG.

    For each directory whose path ends in ``pcd``, every ``*.pcd`` file in it
    is drawn with :func:`draw_points` onto a blank 800x1600 image and written
    to the sibling directory obtained by replacing the trailing ``pcd`` with
    ``jpg``. That output directory is created when missing.

    Args:
        pcd_path: Root directory to walk.

    Returns:
        The path of the last ``jpg`` output directory encountered, or
        ``None`` when no ``pcd`` directory was found.
    """
    jpg_path = None
    for root, dirs, files in os.walk(pcd_path):
        # Create the jpg folder next to any pcd folder.
        if 'pcd' in dirs:
            jpg_path = os.path.join(root, 'jpg')
            os.makedirs(jpg_path, exist_ok=True)

        if root[-3:] != 'pcd':
            continue

        out_dir = root[:-3] + 'jpg'
        for file in files:
            if file[-4:] != '.pcd':
                # Skip unrelated files without abandoning the rest.
                continue
            # Covers the case where pcd_path is itself the pcd folder.
            os.makedirs(out_dir, exist_ok=True)
            jpg_path = out_dir
            input_file_path = os.path.join(root, file)
            output_file_path = os.path.join(out_dir, file[:-3] + 'jpg')
            pic = np.zeros([800, 1600, 3])
            draw_points(pic, input_file_path)
            cv2.imwrite(output_file_path, pic)
    return jpg_path


def generate_video(img_path, video_name):
    """Build a video from the image sequence in each matching directory.

    Directories are located with :func:`get_file`: every directory at or
    below ``img_path`` that directly contains the image folder (``image``
    when ``video_name`` is ``'CAMERA0.mp4'``, otherwise ``jpg``). For each
    one, ffmpeg encodes ``<dir>/<image folder>/*.jpg`` into
    ``<dir>/<video_name>``. Directories that already contain the video, or
    that have fewer than two usable frames, are skipped.

    The frame rate is ``frame count / ((last stem - first stem) / 1000)``;
    file stems are therefore assumed to be integer millisecond timestamps —
    TODO confirm.

    Args:
        img_path: Root directory to search.
        video_name: File name of the video to create.
    """
    img_name = 'image' if video_name == 'CAMERA0.mp4' else 'jpg'

    for item in get_file(img_path, img_name):
        res_path = os.path.join(item, video_name)
        if os.path.exists(res_path):
            continue

        frame_dir = os.path.join(item, img_name)
        stamps = sorted(
            int(name.split('.')[0])
            for name in os.listdir(frame_dir)
            if name.endswith('.jpg') and name.split('.')[0].isdigit()
        )
        if len(stamps) < 2 or stamps[-1] == stamps[0]:
            # No time span to derive a frame rate from.
            continue

        duration = (stamps[-1] - stamps[0]) / 1000
        frame = round(len(stamps) / duration, 2)

        # ffmpeg expands the glob itself, so no shell is needed; passing an
        # argument list avoids quoting problems in paths.
        command = [
            'ffmpeg', '-f', 'image2', '-r', str(frame),
            '-pattern_type', 'glob', '-i', f'{item}/{img_name}/*.jpg',
            '-y', res_path,
        ]
        # Discard output instead of piping it: an unread pipe can fill up and
        # block the child process forever.
        subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )


def get_file(path, key_file_name):
    """Find the directories that directly contain ``key_file_name``.

    The search starts at ``path`` and descends into subdirectories; it does
    not descend below a directory that already contains the entry.

    Args:
        path: Directory to start from.
        key_file_name: Name of the file or folder to look for.

    Returns:
        A sorted list of matching directory paths without duplicates.
    """
    entries = os.listdir(path)
    if key_file_name in entries:
        return [path]

    found = []
    for name in entries:
        next_path = os.path.join(path, name)
        if os.path.isdir(next_path):
            found.extend(get_file(next_path, key_file_name))
    return sorted(set(found))


def copy_files(source, target, dir_name):
    """Copy ``source/dir_name`` into ``target`` and return the new path.

    Directories are copied with ``shutil.copytree`` and files with
    ``shutil.copy``. The name ``'simulation'`` is upper-cased on the target
    side.

    Args:
        source: Directory containing the entry to copy.
        target: Directory to copy into.
        dir_name: Name of the file or directory inside ``source``.

    Returns:
        The destination path.
    """
    files_source_path = os.path.join(source, dir_name)
    if dir_name == 'simulation':
        dir_name = dir_name.upper()
    files_target_path = os.path.join(target, dir_name)

    if os.path.isdir(files_source_path):
        shutil.copytree(files_source_path, files_target_path)
    else:
        shutil.copy(files_source_path, files_target_path)
    return files_target_path


if __name__ == '__main__':
    root_path = '/media/hancheng/Simulation4/1013/here_bizhang2_2023-10-13-14-32-23'
    output_path = '/media/hancheng/Simulation4/1013/here_bizhang2_2023-10-13-14-32-23/output'
    # xodr_list = ['/home/hancheng/simulation_dynamic/od/jiuzhonglu.xodr', '/home/hancheng/simulation_dynamic/od/gaosu.xodr']
    # osgb_list = ['/home/hancheng/simulation_dynamic/od/GJBM_20km_0822.opt.osgb', '/home/hancheng/simulation_dynamic/od/gaosu.osgb']

    source_files_list = get_file(root_path, 'pos.csv')
    os.makedirs(output_path, exist_ok=True)

    for single_path in source_files_list:
        # if not os.path.exists(os.path.join(single_path, 'simulation')):
        #     continue
        # json_path = os.path.join(single_path, 'autoSliceRes.json')
        # with open(json_path) as f:
        #     sc_json = json.load(f)
        # road_section = sc_json['road_section']
        # target_path = os.path.join(output_path, os.path.basename(single_path))
        # if not os.path.exists(target_path):
        #     os.mkdir(target_path)
        # else:
        #     shutil.rmtree(target_path)
        #
        # # Original photos / video
        # copy_files(single_path, target_path, 'CAMERA0')

        # Point-cloud images / video
        # target_lidar_path = copy_files(single_path, target_path, 'LIDAR0')
        # jpg_path = pcd_image(target_lidar_path)
        generate_video('/media/hancheng/Simulation4/1013/here_bizhang2_2023-10-13-14-32-23/output/here_bizhang2_2023-10-13-14-32-23/LIDAR0', 'LIDAR0.mp4')
        # shutil.rmtree(jpg_path)

        # Scenario and road files
        # target_sc_path = copy_files(single_path, target_path, 'simulation')
        # simulation_video = (os.path.join(target_sc_path, 'simulation_Camera_1.mp4'))
        # os.rename(simulation_video, simulation_video.replace('_Camera_1', ''))
        # xodr_path = os.path.join(target_sc_path, "SIMULATION.xodr")
        # osgb_path = os.path.join(target_sc_path, "SIMULATION.osgb")
        # shutil.copy(xodr_list[road_section-1], xodr_path)
        # shutil.copy(osgb_list[road_section-1], osgb_path)
        # xosc_path = os.path.join(target_sc_path, 'simulation.xosc')
        # xosc_upper = os.path.join(target_sc_path, 'SIMULATION.xosc')
        # os.rename(xosc_path, xosc_upper)

        # Data and description files
        # copy_files(single_path, target_path, 'label.json')
        # data_path = os.path.join(target_path, 'data.csv')
        # autocan = pd.read_csv(os.path.join(os.path.join(single_path, 'autocan.csv')))
        # gps_origin = pd.read_csv(os.path.join(os.path.join(single_path, 'gps.csv')))
        # origin_data = pd.DataFrame(columns=['date', 'time', 'location', 'lon', 'lat'], index=[0])
        # gps = get_position_single(origin_data, gps_origin)
        # data_df = pd.DataFrame(
        #     columns=['Time', 'FrameID', 'Velocity', 'AcceleratorPedalPos', 'BrakePedalSignal',
        #              'SteeringAngle',
        #              'GPSStatus', 'gdlng', 'gdlat'])
        # data_df['Time'] = autocan['Time']
        # data_df['FrameID'] = autocan['FrameID']
        # data_df['Velocity'] = autocan['Velocity']
        # data_df['AcceleratorPedalPos'] = autocan['AcceleratorPedalPos']
        # data_df['BrakePedalSignal'] = autocan['BrakePedalSignal']
        # data_df['SteeringAngle'] = autocan['SteeringAngle']
        # data_df['GPSStatus'] = gps_origin['GPSStatus']
        # data_df['gdlng'] = gps['Longitude']
        # data_df['gdlat'] = gps['Latitude']
        # data_df.reset_index(drop=False)
        # data_df.to_csv(data_path)
        print(f'{single_path} done!')