123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- import json
- import math
- import shutil
- import os
- import subprocess
- import pandas as pd
- import cv2
- import numpy as np
- import requests
# AMap reverse-geocoding endpoint. HTTPS is used so the API key is not sent
# in clear text (the coordinate-conversion endpoint in this file already
# uses HTTPS).
geocode_base = 'https://restapi.amap.com/v3/geocode/regeo'


def geocode(location, timeout=10):
    """Query the AMap reverse-geocoding service and return the decoded JSON.

    Args:
        location: Coordinate string passed verbatim as the ``location``
            query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on a stalled connection.

    Returns:
        The parsed JSON body of the response.

    Raises:
        requests.RequestException: On connection errors or timeout.
    """
    # NOTE(review): the API key is hard-coded; consider moving it to
    # configuration or an environment variable.
    parameters = {
        'output': 'json',
        'key': 'cdf24f471cc579ba6d5dd1f9b856ee31',
        'location': location,
        'extensions': 'base',
    }
    response = requests.get(geocode_base, params=parameters, timeout=timeout)
    return response.json()
def convert(locations, timeout=10):
    """Convert GPS coordinates to AMap coordinates and reverse-geocode them.

    Args:
        locations: Coordinate string passed verbatim as the ``locations``
            query parameter of the conversion endpoint.
        timeout: Seconds to wait for the conversion request.

    Returns:
        A ``(status, payload)`` tuple. On success ``status`` is ``'1'`` and
        ``payload`` is ``(converted_locations, formatted_address)``, where
        ``converted_locations`` is the raw ``locations`` string returned by
        the conversion endpoint. On any failure ``payload`` is ``None`` and
        ``status`` is the service status or ``'0'``.
    """
    # NOTE(review): the API key is hard-coded; consider moving it to
    # configuration or an environment variable.
    parameters = {
        'output': 'json',
        'key': 'cdf24f471cc579ba6d5dd1f9b856ee31',
        'locations': locations,
        'coordsys': 'gps',
    }
    base = 'https://restapi.amap.com/v3/assistant/coordinate/convert'
    response_cor = requests.get(base, params=parameters, timeout=timeout).json()

    # A failed conversion carries no 'locations' field; report it through the
    # status code instead of raising KeyError.
    result_location = response_cor.get('locations')
    if response_cor.get('status') != '1' or not result_location:
        return '0', None

    para_location = result_location.replace(';', '|')
    response_location = geocode(para_location)
    status = response_location['status']
    if status != '1':
        return status, None

    # Reuse the response already fetched rather than issuing the same
    # request a second time.
    try:
        address = response_location['regeocode']['formatted_address']
    except (KeyError, TypeError):
        return '0', None
    return status, (result_location, address)
def get_position_single(base_df, inset_df):
    """Convert the coordinates in ``inset_df`` in batches of 20 rows.

    Args:
        base_df: DataFrame whose first row is treated as a placeholder; it is
            dropped from the result.
        inset_df: DataFrame with ``Longitude`` and ``Latitude`` columns. Its
            index labels must be numeric because rows are grouped by
            ``floor(label / 20)``. The frame is not modified.

    Returns:
        ``base_df`` (minus its first row) followed by the converted
        ``Longitude``/``Latitude`` rows, with a fresh 0-based index.
        Batches whose conversion fails are skipped, so the result may hold
        fewer rows than ``inset_df``.
    """
    # Work on a copy so the caller's columns are not silently turned into
    # strings.
    coords = inset_df[['Longitude', 'Latitude']].astype('str')

    converted_frames = []
    for _, batch in coords.groupby(lambda label: math.floor(label / 20)):
        cor_total = '|'.join(
            f'{lon},{lat}'
            for lon, lat in zip(batch['Longitude'], batch['Latitude'])
        )
        status, convert_result = convert(cor_total)
        if status != '1':
            continue
        # The converted coordinates come back as "lon,lat;lon,lat;...".
        pairs = [pair.split(',') for pair in convert_result[0].split(';')]
        converted_frames.append(
            pd.DataFrame(pairs, columns=['Longitude', 'Latitude'])
        )

    # Concatenate once instead of re-copying the accumulated frame per batch.
    combined = pd.concat([base_df] + converted_frames).reset_index(drop=True)
    return combined[1:].reset_index(drop=True)
def draw_points(pic, filename):
    """Draw the points listed in a text file onto ``pic``.

    Each line is split on whitespace; the first field is read as x and the
    second as y. A point is drawn at pixel
    ``(800 - 10 * x, 400 + 10 * y)`` with colour ``(250, 250, 250)``.

    Lines that have fewer than two fields, that are not numeric (for example
    header lines), or whose coordinates are NaN/infinite are skipped.

    Args:
        pic: Image array modified in place by ``cv2.circle``.
        filename: Path of the text file to read.
    """
    # Iterate over lines; the previous readline() call returned a single
    # string, so the loop walked over characters and never parsed a point.
    with open(filename, "r") as point_file:
        for line in point_file:
            fields = line.split()[:4]
            if len(fields) < 2:
                continue
            try:
                point_x = float(fields[0])
                point_y = -float(fields[1])
            except ValueError:
                continue
            # int() cannot convert NaN or infinity, so check both axes.
            if not (math.isfinite(point_x) and math.isfinite(point_y)):
                continue
            cv2.circle(
                pic,
                (int(800 - point_x * 10), int(400 - point_y * 10)),
                0,
                (250, 250, 250),
                0,
            )
def pcd_image(pcd_path):
    """Render every ``.pcd`` file under ``pcd_path`` to a JPEG image.

    The tree is walked with ``os.walk``. For each directory named ``pcd``, a
    sibling directory ``jpg`` is created if needed and every ``*.pcd`` file
    is drawn onto a blank 800x1600 image and written to
    ``jpg/<same name>.jpg``.

    Args:
        pcd_path: Root directory to search.

    Returns:
        The path of the ``jpg`` directory for the last ``pcd`` directory
        visited, or ``None`` when no ``pcd`` directory exists. Previously
        the function raised ``UnboundLocalError`` when none was found or
        when ``jpg`` already existed.
    """
    jpg_path = None
    for root, _dirs, files in os.walk(pcd_path):
        # Match the directory name exactly; a suffix test would also accept
        # names such as "xpcd".
        if os.path.basename(root) != 'pcd':
            continue

        jpg_path = os.path.join(os.path.dirname(root), 'jpg')
        os.makedirs(jpg_path, exist_ok=True)

        for file in files:
            # Skip unrelated files instead of aborting the whole directory.
            if not file.endswith('.pcd'):
                continue
            input_file_path = os.path.join(root, file)
            output_file_path = os.path.join(jpg_path, file[:-3] + 'jpg')
            pic = np.zeros([800, 1600, 3])
            draw_points(pic, input_file_path)
            cv2.imwrite(output_file_path, pic)
    return jpg_path
def generate_video(img_path, video_name):
    """Encode the image sequences found under ``img_path`` with ffmpeg.

    Every directory returned by ``get_file`` (one that contains the frame
    folder) gets a video named ``video_name`` written next to that folder.
    The frame folder is ``image`` when ``video_name`` is ``'CAMERA0.mp4'``
    and ``jpg`` otherwise. Frame file names are parsed as integers and the
    frame rate is ``frame_count / ((last - first) / 1000)``.

    Directories that already hold the video, or that have too few frames to
    derive a frame rate, are skipped.

    Args:
        img_path: Root directory searched for frame folders.
        video_name: File name of the video to create.
    """
    img_name = 'image' if video_name == 'CAMERA0.mp4' else 'jpg'

    for item in get_file(img_path, img_name):
        res_path = os.path.join(item, video_name)
        if os.path.exists(res_path):
            # Skip only this directory; the others may still need a video.
            continue

        # List the frames of the directory being encoded (not of img_path),
        # restricted to the *.jpg files that ffmpeg's glob will pick up.
        frame_dir = os.path.join(item, img_name)
        all_image = sorted(
            int(name.split('.')[0])
            for name in os.listdir(frame_dir)
            if name.endswith('.jpg')
        )
        if len(all_image) < 2 or all_image[-1] == all_image[0]:
            print(f'{frame_dir}: not enough frames to compute a frame rate, skipped')
            continue

        frame = round(len(all_image) / ((all_image[-1] - all_image[0]) / 1000), 2)
        # An argument list avoids shell quoting problems with unusual paths;
        # ffmpeg expands the glob pattern itself.
        command = [
            'ffmpeg', '-f', 'image2', '-r', str(frame),
            '-pattern_type', 'glob',
            '-i', os.path.join(frame_dir, '*.jpg'),
            '-y', res_path,
        ]
        # Discard ffmpeg's output: an undrained PIPE combined with wait()
        # can block forever once the pipe buffer fills.
        completed = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
        if completed.returncode != 0:
            print(f'ffmpeg failed for {frame_dir} (exit code {completed.returncode})')
def get_file(path, key_file_name):
    """Find the directories under ``path`` that contain ``key_file_name``.

    The search is depth-first and stops descending as soon as a directory
    contains an entry (file or folder) named ``key_file_name``.

    Args:
        path: Directory to start from.
        key_file_name: Entry name to look for.

    Returns:
        A sorted list of matching directory paths without duplicates;
        ``[path]`` when ``path`` itself contains the entry. Sorting keeps the
        order deterministic from run to run.

    Raises:
        OSError: If ``path`` cannot be listed.
    """
    entries = os.listdir(path)
    if key_file_name in entries:
        return [path]

    found = set()
    for name in entries:
        next_path = os.path.join(path, name)
        if os.path.isdir(next_path):
            found.update(get_file(next_path, key_file_name))
    return sorted(found)
def copy_files(source, target, dir_name):
    """Copy ``source/dir_name`` (a file or a directory) into ``target``.

    The entry keeps its name, except that ``'simulation'`` is copied as
    ``'SIMULATION'``.

    Args:
        source: Directory that contains the entry to copy.
        target: Destination directory; created if it does not exist.
        dir_name: Name of the file or directory inside ``source``.

    Returns:
        The path of the copied entry inside ``target``.

    Raises:
        FileExistsError: If a directory is copied and the destination
            already exists.
        FileNotFoundError: If ``source/dir_name`` does not exist.
    """
    files_source_path = os.path.join(source, dir_name)
    target_name = dir_name.upper() if dir_name == 'simulation' else dir_name
    files_target_path = os.path.join(target, target_name)

    if os.path.isdir(files_source_path):
        shutil.copytree(files_source_path, files_target_path)
    else:
        # copytree creates missing parents but copy does not; create the
        # target so both kinds of entry behave the same way.
        os.makedirs(target, exist_ok=True)
        shutil.copy(files_source_path, files_target_path)
    return files_target_path
if __name__ == '__main__':
    # Script entry point. Only the LIDAR video generation step is currently
    # enabled; the remaining pipeline stages are kept below as disabled code.
    root_path = '/media/hancheng/Simulation4/1013/here_bizhang2_2023-10-13-14-32-23'
    # Derived from root_path so the three locations cannot drift apart.
    output_path = os.path.join(root_path, 'output')
    lidar_path = os.path.join(output_path, os.path.basename(root_path), 'LIDAR0')
    # xodr_list = ['/home/hancheng/simulation_dynamic/od/jiuzhonglu.xodr', '/home/hancheng/simulation_dynamic/od/gaosu.xodr']
    # osgb_list = ['/home/hancheng/simulation_dynamic/od/GJBM_20km_0822.opt.osgb', '/home/hancheng/simulation_dynamic/od/gaosu.osgb']

    source_files_list = get_file(root_path, 'pos.csv')
    os.makedirs(output_path, exist_ok=True)

    for single_path in source_files_list:
        # if not os.path.exists(os.path.join(single_path, 'simulation')):
        #     continue
        # json_path = os.path.join(single_path, 'autoSliceRes.json')
        # with open(json_path) as f:
        #     sc_json = json.load(f)
        # road_section = sc_json['road_section']
        # target_path = os.path.join(output_path, os.path.basename(single_path))
        # if not os.path.exists(target_path):
        #     os.mkdir(target_path)
        # else:
        #     shutil.rmtree(target_path)
        #
        # # Raw photos / video
        # copy_files(single_path, target_path, 'CAMERA0')

        # Point-cloud images / video
        # target_lidar_path = copy_files(single_path, target_path, 'LIDAR0')
        # jpg_path = pcd_image(target_lidar_path)
        # NOTE(review): the arguments do not depend on single_path, so every
        # iteration targets the same LIDAR0 directory.
        generate_video(lidar_path, 'LIDAR0.mp4')
        # shutil.rmtree(jpg_path)

        # Scenario and road files
        # target_sc_path = copy_files(single_path, target_path, 'simulation')
        # simulation_video = (os.path.join(target_sc_path, 'simulation_Camera_1.mp4'))
        # os.rename(simulation_video, simulation_video.replace('_Camera_1', ''))
        # xodr_path = os.path.join(target_sc_path, "SIMULATION.xodr")
        # osgb_path = os.path.join(target_sc_path, "SIMULATION.osgb")
        # shutil.copy(xodr_list[road_section-1], xodr_path)
        # shutil.copy(osgb_list[road_section-1], osgb_path)
        # xosc_path = os.path.join(target_sc_path, 'simulation.xosc')
        # xosc_upper = os.path.join(target_sc_path, 'SIMULATION.xosc')
        # os.rename(xosc_path, xosc_upper)

        # Data and description files
        # copy_files(single_path, target_path, 'label.json')
        # data_path = os.path.join(target_path, 'data.csv')
        # autocan = pd.read_csv(os.path.join(os.path.join(single_path, 'autocan.csv')))
        # gps_origin = pd.read_csv(os.path.join(os.path.join(single_path, 'gps.csv')))
        # origin_data = pd.DataFrame(columns=['date', 'time', 'location', 'lon', 'lat'], index=[0])
        # gps = get_position_single(origin_data, gps_origin)
        # data_df = pd.DataFrame(
        #     columns=['Time', 'FrameID', 'Velocity', 'AcceleratorPedalPos', 'BrakePedalSignal',
        #              'SteeringAngle',
        #              'GPSStatus', 'gdlng', 'gdlat'])
        # data_df['Time'] = autocan['Time']
        # data_df['FrameID'] = autocan['FrameID']
        # data_df['Velocity'] = autocan['Velocity']
        # data_df['AcceleratorPedalPos'] = autocan['AcceleratorPedalPos']
        # data_df['BrakePedalSignal'] = autocan['BrakePedalSignal']
        # data_df['SteeringAngle'] = autocan['SteeringAngle']
        # data_df['GPSStatus'] = gps_origin['GPSStatus']
        # data_df['gdlng'] = gps['Longitude']
        # data_df['gdlat'] = gps['Latitude']
        # data_df.reset_index(drop=False)
        # data_df.to_csv(data_path)
        print(f'{single_path} done!')
|