# -*-ing:utf-8-*- import os import sys from datetime import datetime import traceback import json import math import pandas as pd import subprocess import time import multiprocessing # 导入进程模块 from functools import partial # 导入偏函数 import warnings warnings.filterwarnings("ignore") from openx import Scenario, formatThree, formatTwo, change_CDATA import osgbGenerator from utils import smooth_1, get_coordinate_cow, get_map_name, reverse_gyb_gps class Batchrun: def __init__(self, path, keyFileName): """"初始化方法""" self.path = path self.keyFileName = keyFileName def getFile(self, path, keyFileName): ''' 获得可以进行轨迹提取的所有片段文件夹路径 Parameters ---------- path : TYPE DESCRIPTION. Returns ------- FileList : TYPE 可以进行轨迹提取的所有片段文件夹路径. ''' files = os.listdir(path) # 得到文件夹下的所有文件,包含文件夹名称 FileList = [] if keyFileName not in files: for name in files: if os.path.isdir(path + '/' + name): FileList.extend(self.getFile(path + '/' + name, keyFileName)) # 回调函数,对所有子文件夹进行搜索 else: FileList.append(path) FileList = list(set(FileList)) return FileList def generateScenarios_raw(self, absPath, param): ''' 原始自然驾驶场景还原 ''' # 读取自车数据 posPath = os.path.join(absPath, param) # 用来生成场景中自车和他车的路径 posdata = pd.read_csv(posPath) # 读取仿真地图数据 offset_x = -457000.0 offset_y = -4400000.0 # offset_h = 90 # 初始headinga 和 VTD 有 90偏角 offset_h = 0 # 对感知数据进行后处理 # 对公园北环路进行坐标转换 # 根据pos文件规范时间戳 posdata['Time'] = posdata['Time'].apply(lambda x: (x // 100) * 100) # 绝对高程与相对高程的差值 posdata['altitude'] = 19.27 # posdata['Type'].replace({8:2, 10:2}, inplace=True) pos_ego = posdata.loc[posdata['ID'] == -1, ['Time', 'East', 'North', 'HeadingAngle', 'altitude', 'Type']] # pos_ego[['East', 'North']] = pos_ego.apply(get_coordinate_cow, axis=1) # pos_ego = posdata.loc[posdata['ID'] == -1, ['Time', 'position_x', 'position_y', 'HeadingAngle', 'Type']] pos_ego = pos_ego.reset_index(drop=True) # offset_x = -float(pos_ego.at[0, 'East']) # 初始East 设为0 # offset_y = -float(pos_ego.at[0, 'North']) # 初始North 设为0 start_time = pos_ego.at[0, 'Time'] ego_points, gps_time, ego_type = Scenario.getXoscPosition(pos_ego, 'Time', 'East', 'North', 'HeadingAngle', 'altitude', 'Type', offset_x, offset_y, offset_h, start_time) # ego_points, gps_time, ego_type = Scenario.getXoscPosition(pos_ego, 'Time', 'position_x', 'position_y', 'HeadingAngle', 'Type', offset_x, offset_y, offset_h, start_time) # 读取目标数据 pos_obs = posdata # 排除非目标物的物体 # pos_obs = posdata[posdata['Type'] != 10] # 排除非目标物的物体 pos_obs = pos_obs.loc[ pos_obs['ID'] != -1, ['Time', 'ID', 'East', 'North', 'HeadingAngle', 'AbsVel', 'altitude', 'Type']] # pos_obs = pos_obs.loc[pos_obs['ID'] != -1, ['Time', 'ID', 'position_x', 'position_y', 'HeadingAngle', 'Type']] # !!!!自定义修改 # pos_obs.loc[pos_obs['ID'] == 1200, 'type'] = 7 # pos_obs.loc[pos_obs['ID'] == 1197, 'type'] = 7 # pos_obs.loc[pos_obs['ID'] == 1200, 'position_x'] += 1 # pos_obs.loc[pos_obs['ID'] == 1200, 'position_y'] -= 2 # pos_obs.loc[pos_obs['ID'] == 1197, 'position_x'] += 1 # pos_obs.loc[pos_obs['ID'] == 1197, 'position_y'] -= 1 # pos_obs.loc[pos_obs['ID'] == 1203, 'type'] = 4 # pos_obs.loc[pos_obs['ID'] == 1203, 'position_x'] = 456319 # pos_obs.loc[pos_obs['ID'] == 1203, 'position_y'] = 4397979 # pos_obs.loc[pos_obs['ID'] == 1203, 'HeadingAngle'] = 1 # offset_h = 30 # pos_obs.loc[pos_obs['ID'] == 1194, 'type'] = 7 # pos_obs.loc[pos_obs['ID'] == 1188, 'type'] = 2 # pos_obs.loc[pos_obs['ID'] == 1189, 'type'] = 2 # pos_obs.loc[pos_obs['ID'] == 1195, 'type'] = 2 # obsPath = os.path.join(absPath, 'obs.csv') # 用来筛选场景中不需要的他车 # obsdata = pd.read_csv(obsPath) # obsdata = obsdata[(obsdata['ObjectPosY'] < 5) & (obsdata['ObjectPosY'] > -5) & (obsdata['ObjectPosX'] > -10) & (obsdata['ObjectPosX'] < 100)] # 排除车道线范围外且前向距离较远的目标物 # idlist = obsdata['ObjectID'].tolist() # 筛选出符合条件的的目标物ID # pos_obs = pos_obs[(pos_obs['ID'].isin(idlist))] pos_obs = pos_obs.reset_index(drop=True) groups = pos_obs.groupby('ID') object_points = [] for key, value in groups: if len(value) < 5: continue # value = smooth_1(value) object_points.append( Scenario.getXoscPosition(value, 'Time', 'East', 'North', 'HeadingAngle', 'altitude', 'Type', offset_x, offset_y, offset_h, start_time)) # object_points.append(Scenario.getXoscPosition(value, 'Time', 'position_x', 'position_y', 'HeadingAngle', 'type', offset_x, offset_y, offset_h,start_time)) ego_speed = 5 period = math.ceil(gps_time[-1] - gps_time[0]) work_mode = 0 # 0为CICV车端数据 hour = int(absPath.split('/')[-1].split('-')[3]) if hour + 8 >= 24: hour = hour - 16 else: hour = hour + 8 time_of_day = hour * 3600 # 没有路灯防止天过暗 if time_of_day >= 64800: time_of_day = 64800 s = Scenario(ego_points, object_points, gps_time, ego_speed, work_mode, period, absPath, time_of_day) filename = absPath + '/simulation' + '_' + param.split("_")[1].split(".")[0] files = s.generate(filename) change_CDATA(files[0][0]) print(files) # 生成osgb # osgbGenerator.formatTwo(filename) # 修改xosc路径 # formatThree(filename) # # # 生成每个场景的描述文件 json # # getLabel(output_path, scenario_series['场景编号'], scenario_series['场景名称']) # 拷贝到vtd路径下 # vtd_path = os.path.join('/home/lxj/VIRES/VTD.2021.3/Data/Projects/Current/Scenarios/myScenario', datetime.now().strftime("%Y%m%d%H%M%S")) # if not os.path.exists(vtd_path): # os.makedirs(vtd_path) # os.system('cp '+ files[0][0] + ' ' + vtd_path + '/') def multiRun(self, path, param): files = self.getFile(path, self.keyFileName) print('程序开始,%s 个数据包' % len(files)) t1 = time.time() # 无参数时,使用所有cpu核; 有参数时,使用CPU核数量为参数值 pool = multiprocessing.Pool(processes=10) pfunc = partial(self.generateScenarios_raw, param) pool.map(pfunc, files) # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 pool.close() pool.join() t2 = time.time() print("程序结束,并行执行时间:%s s" % int(t2 - t1)) def batchRun(self, path, param): files = self.getFile(path, self.keyFileName) print('程序开始,%s 个数据包' % len(files)) for di, absPath in enumerate(sorted(files)): print(absPath) # try: self.generateScenarios_raw(absPath, param) # except: # print('Augmentation failed!!!!') # error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()} # with open('error.log','a+') as f: # json.dump(error, f, indent=4) # f.write('\n') # print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") return None def GenerateVideo(self, root_path): # 批量生成视频 imagelist = self.getFile(root_path, 'image') for item in imagelist: strs = item.split('/') respath = os.path.join(item, "video.mp4") # respath = os.path.join(res_path,strs[-3],strs[-2],"video.mp4") print('---------------------') # os.makedirs(os.path.join(res_path,strs[-3],strs[-2])) command = "ffmpeg -f image2 -r 10 -pattern_type glob -i '" + item + "/image/*.jpg" + "' -y '" + respath + "'" print(command) process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) process.wait() if __name__ == "__main__": # rootPath = "/media/hancheng/Simulation5/pjisuv_pjisuv95_data_merge_2024-01-31-07-53-50_TTC_11" # 跟车 rootPath = sys.argv[1] # 跟车 # 生成场景 a = Batchrun(rootPath, "pos_orig.csv") a.batchRun(rootPath, 'pos_orig.csv') # a.batchRun(rootPath, 'pos_hmi.csv') # a.multiRun(rootPath, 0) # 生成视频 # a.GenerateVideo(rootPath)