import os import math import numpy as np import pandas as pd import subprocess import time import multiprocessing from pyproj import CRS, Transformer from scipy.spatial.transform import Rotation as R from scipy.signal import savgol_filter from datetime import datetime import xml.etree.ElementTree as ET import sys import re import json # 导入进程模块 from functools import partial # 导入偏函数 from openx_outdoor import Scenario, formatThree, formatTwo, change_CDATA import warnings warnings.filterwarnings("ignore") xodr_list = ['/mnt/disk001/simulation_outdoor/thq_1116.xodr', '/mnt/disk001/simulation_outdoor/anqing.xodr'] map_engine = '/mnt/disk001/dcl_data_process/src/python3/pjibot_outdoor/a.out' class Batchrun: def __init__(self, path, keyFileName): """"初始化方法""" self.path = path self.keyFileName = keyFileName def getFile(self, path, keyFileName): ''' 获得可以进行轨迹提取的所有片段文件夹路径 Parameters ---------- path : TYPE DESCRIPTION. Returns ------- FileList : TYPE 可以进行轨迹提取的所有片段文件夹路径. ''' files = os.listdir(path) # 得到文件夹下的所有文件,包含文件夹名称 FileList = [] if keyFileName not in files: for name in files: if os.path.isdir(path + '/' + name): FileList.extend(self.getFile(path + '/' + name, keyFileName)) # 回调函数,对所有子文件夹进行搜索 else: FileList.append(path) FileList = list(set(FileList)) return FileList # 自定义一个函数来过滤行 def generateScenarios_raw(self, absPath, vehicle_type): ''' 原始自然驾驶场景还原 ''' # 读取自车数据 posObs = os.path.join(absPath, 'objects_pji.csv') # 用来生成场景中障碍物的路径 posEgo = os.path.join(absPath, "ego_pji.csv") posdata_obs = pd.read_csv(posObs) posdata_ego = pd.read_csv(posEgo) timestamp_s = posdata_ego['Time'].iloc[0] / 1000 dt_object = datetime.fromtimestamp(timestamp_s) hour = dt_object.hour - 8 posdata_obs = posdata_obs.drop('Time', axis=1) posdata_ego = posdata_ego.drop('Time', axis=1) posdata_obs.rename(columns={'posX': 'pos_x'}, inplace=True) posdata_obs.rename(columns={'posY': 'pos_y'}, inplace=True) posdata_obs.rename(columns={'posH': 'HeadingAngle'}, inplace=True) posdata_obs.rename(columns={'speedX': 'AbsVel'}, inplace=True) posdata_obs.rename(columns={'playerId': 'ID'}, inplace=True) posdata_ego.rename(columns={'posX': 'pos_x'}, inplace=True) posdata_ego.rename(columns={'posY': 'pos_y'}, inplace=True) posdata_ego.rename(columns={'posH': 'HeadingAngle'}, inplace=True) def get_road_info(command): # 使用 subprocess.run 运行命令 result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # 获取命令的输出 output = result.stdout if result.stdout else result.stderr if output == "not found\n": return "不在道路范围" # 使用正则表达式提取数字 track_match = re.search(r'track\s*=\s*(-?\d+)', output) lane_match = re.search(r'lane\s*=\s*(-?\d+)', output) offset_match = re.search(r'offset\s*=\s*(-?\d+\.?\d*)', output) h_match = re.search(r'h\s*=\s*(-?\d+\.?\d*)', output) # 如果匹配成功,提取对应的值 track_id = int(track_match.group(1)) if track_match else None lane_id = int(lane_match.group(1)) if lane_match else None offset = float(offset_match.group(1)) if offset_match else None h_value = float(h_match.group(1)) if h_match else None return track_id, lane_id, offset, h_value def get_map_id(df): # 太和桥园区经纬度 taiheqiao_latitude = 39.7286896 taiheqiao_longitude = 116.4885025 # 安庆经纬度 anqing_latitude = 30.5585070 anqing_longitude = 117.1850840 points = [(taiheqiao_latitude, taiheqiao_longitude), (anqing_latitude, anqing_longitude)] if len(df.loc[(df['latitude'] != 0) & (df['longitude'] != 0), 'latitude']) == 0: nearest_index = 0 else: cur_latitude = df.loc[(df['latitude'] != 0) & (df['longitude'] != 0), 'latitude'].iloc[0] cur_longitude = df.loc[(df['latitude'] != 0) & (df['longitude'] != 0), 'longitude'].iloc[0] # Calculate distances to each point distances = [] for lat, lon in points: distance = np.sqrt((lon - cur_longitude) ** 2 + (lat - cur_latitude) ** 2) distances.append(distance) # Find the index of the nearest point nearest_index = np.argmin(distances) return nearest_index def trans_pos(posdata, map_id): theta_dict = {0:1.4, 1: 1.055} trans_input = {0: (39.7286896, 116.4885025), 1: (30.5585070, 117.1850840)} trans_utm = {0: (456256.260152, 4397809.886833), 1: (517696.8132810, 3380770.294203)} transformer = Transformer.from_crs("epsg:4326", "epsg:32650") utm50n_x1, utm50n_y1 = transformer.transform(trans_input[map_id][0], trans_input[map_id][1]) translation = np.array([utm50n_x1 - trans_utm[map_id][0], utm50n_y1 - trans_utm[map_id][1]]) rotation_matrix = R.from_euler('z', theta_dict[map_id]).as_matrix() # 提取原始坐标 original_positions = posdata[['pos_x', 'pos_y']].values # 应用旋转和平移 rotated_positions =original_positions @ rotation_matrix[:2,:2].T # 只应用 2D 旋转 rotated_positions += translation # 更新 posdata posdata['pos_x'] = rotated_positions[:, 0] posdata['pos_y'] = rotated_positions[:, 1] posdata['HeadingAngle'] += theta_dict[map_id] return posdata map_id = get_map_id(posdata_ego) # map_id = 0 posdata_ego = trans_pos(posdata_ego, map_id) ego_init_x = posdata_ego['pos_x'].iloc[0] ego_init_y = posdata_ego['pos_y'].iloc[0] # 检查自车起始位置是否在地图外,如果在地图外抛出错误信息 command = [ map_engine, xodr_list[map_id], f'{ego_init_x}', f'{ego_init_y}' ] if get_road_info(command) == "不在道路范围": # 定义日志文件路径 log_file_path = os.path.join(absPath, "output.json") # 日志文件存储在 rootPath 下 # 写入错误信息数组 error_message = ["不在道路范围"] with open(log_file_path, "w") as log_file: json.dump(error_message, log_file, ensure_ascii=False) # 使用 json.dump 写入数组格式 print(f"自车位置不在道路范围,已记录到 {log_file_path}") # 终止程序运行 sys.exit() posdata_obs['simTime'] = np.round(posdata_obs['simTime'], decimals=1) posdata_ego['simTime'] = np.round(posdata_ego['simTime'], decimals=1) # 对障碍物数据进行处理 def filter_rows(group): # 首先对x进行排序 group = group.sort_values(by='pos_x') # 算出差值 diffs = group['pos_x'].diff().fillna(1) # 我们填充1(或任何大于0.2的值),确保第一行不被标记为删除 # 标记差值小于0.2的行为True group = group[diffs >= 0.2] return group def interpolate_missing_values(df): # 1. 生成一个完整的以0.1秒为间隔的simTime new_simTime = np.arange(df['simTime'].min(), df['simTime'].max() + 0.1, 0.1) # 2. 创建新的DataFrame,并与原始DataFrame对齐 new_df = pd.DataFrame(new_simTime, columns=['simTime']) new_df['simTime'] = np.round(new_df['simTime'], decimals=1) # 3. 将原始DataFrame与新simTime进行合并 new_df = pd.merge(new_df, df, on='simTime', how='left') # 4. 对列进行插值(线性插值) new_df = new_df.interpolate(method='linear') return new_df posdata_obs = trans_pos(posdata_obs, map_id) posdata_obs = posdata_obs.groupby(['simTime', 'pos_y'], group_keys=False).apply(filter_rows) posdata_obs.reset_index(drop=True, inplace=True) posdata_obs['Type'] = 10 # start_time_ego = posdata_ego['Time'].iloc[0] # posdata_ego['Time'] = posdata_ego['Time'] - start_time_ego # posdata_ego['Time'] = (posdata_ego['Time'] - (posdata_ego['Time'] % 100)) / 1000 posdata_ego = posdata_ego.drop_duplicates(subset='simTime', keep='first') posdata_ego = interpolate_missing_values(posdata_ego) posdata_ego['Type'] = 2 posdata_ego['ID'] = -1 # posdata_ego['pos_x'] = posdata_ego['pos_x'] - 88.96626 # posdata_ego['pos_y'] = posdata_ego['pos_y'] - 40.55367 # posdata_ego['HeadingAngle'] -= 90 posdata_ego['HeadingAngle'] = np.degrees(posdata_ego['HeadingAngle']) # 将负角度值转换为正值 while (posdata_ego['HeadingAngle'] < 0).any(): # 将负角度值转换为正值 posdata_ego['HeadingAngle'] = posdata_ego['HeadingAngle'].apply(lambda x: x + 360 if x < 0 else x) # posdata_obs['ID'] = range(len(posdata_obs)) posdata_obs = posdata_obs[['simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'ID', 'Type', 'AbsVel']] # posdata_obs['Time'] = posdata_obs['Time'].rank(method='dense') - 1 # posdata_obs['Time'] = posdata_obs['Time'] * 1 # 获取唯一时间点 # unique_times = posdata_obs['simTime'].unique() # # new_rows = [] # 用于存储新行的列表 # # # 遍历唯一时间点的索引,留一个空位以检查“下一组” # for i in range(len(unique_times) - 1): # current_time = unique_times[i] # next_time = unique_times[i + 1] # time_diff = next_time - current_time # 计算时间差 # # # 获取当前时间点的所有行 # current_group = posdata_obs[posdata_obs['simTime'] == current_time] # # # 为当前组基于时间差生成新行 # # 时间差一半的新行 # half_time_diff_rows = current_group.copy() # half_time_diff_rows['simTime'] = current_time + time_diff / 2 # # # 与下一组时间相同的新行(保留其他数据不变) # same_time_diff_rows = current_group.copy() # same_time_diff_rows['simTime'] = next_time # # # 收集新行 # new_rows.append(half_time_diff_rows) # new_rows.append(same_time_diff_rows) # # # 将新行添加到原始DataFrame # new_rows_df = pd.concat(new_rows, ignore_index=True) # posdata_obs = pd.concat([posdata_obs, new_rows_df], ignore_index=True).sort_values(by='simTime').reset_index( # drop=True) posdata = pd.concat([posdata_obs, posdata_ego], ignore_index=True) posdata = posdata.sort_values(by='simTime') altitude_dict = {0: 19.27, 1: 8.48332} posdata['altitude'] = altitude_dict[map_id] posdata['AbsVel'] = 0 pos_ego = posdata.loc[posdata['ID'] == -1, ['simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'altitude', 'Type']] pos_ego = pos_ego.reset_index(drop=True) # offset_x = -float(pos_ego.at[0, 'East']) # 初始East 设为0 # offset_y = -float(pos_ego.at[0, 'North']) # 初始North 设为0 start_time = pos_ego.at[0, 'simTime'] ego_points, gps_time, ego_type = Scenario.getXoscPosition(pos_ego, 'simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'altitude', 'Type', 0, 0, 0, start_time) # 读取目标数据 pos_obs = posdata[posdata['Type'] != 100] # 排除非目标物的物体 # pos_obs = posdata[posdata['Type'] != 10] # 排除非目标物的物体 pos_obs['Type'].replace([103, 10], 7, inplace=True) pos_obs = pos_obs.loc[ pos_obs['ID'] != -1, ['simTime', 'ID', 'pos_x', 'pos_y', 'HeadingAngle', 'AbsVel', 'altitude', 'Type']] pos_obs = pos_obs.reset_index(drop=True) # def calculate_heading_angle(group): # start_idx = 0 # length = len(group) # prev_heading_angle = None # 用于存储上一个计算的航向角 # # while start_idx < length: # start_pos_x, start_pos_y = group.iloc[start_idx]['pos_x'], group.iloc[start_idx]['pos_y'] # # # 找到与当前帧距离超过1米的帧 # for i in range(start_idx + 1, length): # current_pos_x, current_pos_y = group.iloc[i]['pos_x'], group.iloc[i]['pos_y'] # distance = np.sqrt((current_pos_x - start_pos_x) ** 2 + (current_pos_y - start_pos_y) ** 2) # # if distance >= 1: # # 计算航向角 # heading_angle = np.degrees(np.arctan2(current_pos_y - start_pos_y, current_pos_x - start_pos_x)) # # # 检查航向角与前一个航向角的差值 # if prev_heading_angle is not None: # angle_diff = np.abs(heading_angle - prev_heading_angle) # if angle_diff > 180: # angle_diff = 360 - angle_diff # # if angle_diff > 20: # # 如果航向角差值大于25度,跳过当前点 # continue # # # 将 start_idx 到 i 之间的航向角设置为 heading_angle # group.iloc[start_idx:i, group.columns.get_loc('HeadingAngle')] = heading_angle # prev_heading_angle = heading_angle # start_idx = i # break # else: # # 如果没有找到超过1米的距离,使用最后一帧的数据计算航向角 # last_pos_x, last_pos_y = group.iloc[-1]['pos_x'], group.iloc[-1]['pos_y'] # heading_angle = np.degrees(np.arctan2(last_pos_y - start_pos_y, last_pos_x - start_pos_x)) # # # 检查航向角与前一个航向角的差值 # if prev_heading_angle is not None: # angle_diff = np.abs(heading_angle - prev_heading_angle) # if angle_diff > 180: # angle_diff = 360 - angle_diff # # if angle_diff <= 20: # group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = heading_angle # else: # # 如果差值大于25度,跳过最后的点 # group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = prev_heading_angle # else: # group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = heading_angle # # break # # # 确保最后一行的航向角等于倒数第二行 # if length > 1: # group.iloc[-1, group.columns.get_loc('HeadingAngle')] = group.iloc[-2]['HeadingAngle'] # # return group def calculate_heading_angle(group): start_idx = 0 length = len(group) prev_heading_angle = None # 用于存储上一个计算的航向角 while start_idx < length: start_pos_x, start_pos_y = group.iloc[start_idx]['pos_x'], group.iloc[start_idx]['pos_y'] # 找到与当前帧距离超过1米的帧 for i in range(start_idx + 1, length): current_pos_x, current_pos_y = group.iloc[i]['pos_x'], group.iloc[i]['pos_y'] distance = np.sqrt((current_pos_x - start_pos_x) ** 2 + (current_pos_y - start_pos_y) ** 2) if distance >= 0.5: # 计算航向角 heading_angle = np.degrees(np.arctan2(current_pos_y - start_pos_y, current_pos_x - start_pos_x)) # 检查航向角与前一个航向角的差值 if prev_heading_angle is not None: angle_diff = np.abs(heading_angle - prev_heading_angle) if angle_diff > 180: if heading_angle < 0: heading_angle += 360 else: heading_angle -= 360 angle_diff = 360 - angle_diff if angle_diff > 30: # 如果航向角差值大于30度,跳过当前点 continue # 将 start_idx 到 i 之间的航向角设置为 heading_angle group.iloc[start_idx:i, group.columns.get_loc('HeadingAngle')] = heading_angle prev_heading_angle = heading_angle start_idx = i break else: # 如果没有找到超过1米的距离,使用最后一帧的数据计算航向角 last_pos_x, last_pos_y = group.iloc[-1]['pos_x'], group.iloc[-1]['pos_y'] heading_angle = np.degrees(np.arctan2(last_pos_y - start_pos_y, last_pos_x - start_pos_x)) # 检查航向角与前一个航向角的差值 if prev_heading_angle is not None: angle_diff = np.abs(heading_angle - prev_heading_angle) if angle_diff > 180: if heading_angle < 0: heading_angle += angle_diff else: heading_angle -= angle_diff angle_diff = 360 - angle_diff if angle_diff <= 30: group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = heading_angle else: # 如果差值大于20度,跳过最后的点 group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = prev_heading_angle else: group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = group.iloc[0]['HeadingAngle'] break # 确保最后一行的航向角等于倒数第二行 if length > 1: group.iloc[-1, group.columns.get_loc('HeadingAngle')] = group.iloc[-2]['HeadingAngle'] return group def apply_savgol(row, window_size, poly_order): if len(row) < window_size: return row else: return savgol_filter(row, window_size, poly_order) # 应用到分组数据 pos_obs = pos_obs.groupby('ID').apply(calculate_heading_angle).reset_index(drop=True) # while (pos_obs['HeadingAngle'] < 0).any(): # # 将负角度值转换为正值 # pos_obs['HeadingAngle'] = pos_obs['HeadingAngle'].apply(lambda x: x + 360 if x < 0 else x) pos_obs['HeadingAngle'] = pos_obs.groupby('ID')['HeadingAngle'].transform(lambda x: apply_savgol(x, 45, 2)) groups = pos_obs.groupby('ID') object_points = [] for key, value in groups: if len(value) < 5: continue # value = smooth_1(value) object_points.append( Scenario.getXoscPosition(value, 'simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'altitude', 'Type', 0, 0, 0, start_time)) # object_points.append(Scenario.getXoscPosition(value, 'Time', 'position_x', 'position_y', 'HeadingAngle', 'type', offset_x, offset_y, offset_h,start_time)) ego_speed = 5 period = math.ceil(gps_time[-1] - gps_time[0]) work_mode = 0 # 0为CICV车端数据 if hour + 8 >= 24: hour = hour - 16 else: hour = hour + 8 time_of_day = hour * 3600 # 没有路灯防止天过暗 if time_of_day >= 64800: time_of_day = 64800 s = Scenario(ego_points, object_points, gps_time, ego_speed, work_mode, period, absPath, 0, vehicle_type, map_id) filename = absPath + '/simulation' print(filename) files = s.generate(filename) change_CDATA(files[0][0]) print(files) # dynamic_path = filename + '/simulation.xosc' # combine_xosc(dynamic_path, static_path) # print("完成xosc的合并") # 生成osgb # osgbGenerator.formatTwo(filename) # 修改xosc路径 # formatThree(filename) # # # 生成每个场景的描述文件 json # # getLabel(output_path, scenario_series['场景编号'], scenario_series['场景名称']) # 拷贝到vtd路径下 # vtd_path = os.path.join('/home/lxj/VIRES/VTD.2021.3/Data/Projects/Current/Scenarios/myScenario', datetime.now().strftime("%Y%m%d%H%M%S")) # if not os.path.exists(vtd_path): # os.makedirs(vtd_path) # os.system('cp '+ files[0][0] + ' ' + vtd_path + '/') def multiRun(self, path, param): files = self.getFile(path, self.keyFileName) print('程序开始,%s 个数据包' % len(files)) t1 = time.time() # 无参数时,使用所有cpu核; 有参数时,使用CPU核数量为参数值 pool = multiprocessing.Pool(processes=10) pfunc = partial(self.generateScenarios_raw, param) pool.map(pfunc, files) # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 pool.close() pool.join() t2 = time.time() print("程序结束,并行执行时间:%s s" % int(t2 - t1)) def batchRun(self, path, param): files = self.getFile(path, self.keyFileName) print('程序开始,%s 个数据包' % len(files)) for di, absPath in enumerate(sorted(files)): print(absPath) self.generateScenarios_raw(absPath, param) # except: # print('Augmentation failed!!!!') # error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()} # with open('error.log','a+') as f: # json.dump(error, f, indent=4) # f.write('\n') # print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") return None def GenerateVideo(self, root_path): # 批量生成视频 imagelist = self.getFile(root_path, 'image') for item in imagelist: strs = item.split('/') respath = os.path.join(item, "video.mp4") # respath = os.path.join(res_path,strs[-3],strs[-2],"video.mp4") print('---------------------') # os.makedirs(os.path.join(res_path,strs[-3],strs[-2])) command = "ffmpeg -f image2 -r 10 -pattern_type glob -i '" + item + "/image/*.jpg" + "' -y '" + respath + "'" print(command) process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) process.wait() if __name__ == "__main__": # rootPath = "/media/hancheng/Simulation5/pujin/pujin_outdoor/11_22" # 跟车 # vehicle_type = "0" rootPath = sys.argv[1] vehicle_type = sys.argv[2] # 生成场景 a = Batchrun(rootPath, "ego_pji.csv") a.batchRun(rootPath, vehicle_type) # 0为占位参数 # a.multiRun(rootPath, 0) # 生成视频 # a.GenerateVideo(rootPath)