|
- import os
- import math
- import numpy as np
- import pandas as pd
- import subprocess
- import time
- import multiprocessing
- from pyproj import CRS, Transformer
- from scipy.spatial.transform import Rotation as R
- from scipy.signal import savgol_filter
- from datetime import datetime
- import xml.etree.ElementTree as ET
- import sys
- import re
- import json
- # 导入进程模块
- from functools import partial # 导入偏函数
- from openx_outdoor import Scenario, formatThree, formatTwo, change_CDATA
- import warnings
- warnings.filterwarnings("ignore")
- xodr_list = ['/mnt/disk001/simulation_outdoor/thq_1116.xodr', '/mnt/disk001/simulation_outdoor/anqing.xodr']
- map_engine = '/mnt/disk001/dcl_data_process/src/python3/pjibot_outdoor/a.out'
- class Batchrun:
- def __init__(self, path, keyFileName):
- """"初始化方法"""
- self.path = path
- self.keyFileName = keyFileName
- def getFile(self, path, keyFileName):
- '''
- 获得可以进行轨迹提取的所有片段文件夹路径
- Parameters
- ----------
- path : TYPE
- DESCRIPTION.
- Returns
- -------
- FileList : TYPE
- 可以进行轨迹提取的所有片段文件夹路径.
- '''
- files = os.listdir(path) # 得到文件夹下的所有文件,包含文件夹名称
- FileList = []
- if keyFileName not in files:
- for name in files:
- if os.path.isdir(path + '/' + name):
- FileList.extend(self.getFile(path + '/' + name, keyFileName)) # 回调函数,对所有子文件夹进行搜索
- else:
- FileList.append(path)
- FileList = list(set(FileList))
- return FileList
- # 自定义一个函数来过滤行
- def generateScenarios_raw(self, absPath, vehicle_type):
- '''
- 原始自然驾驶场景还原
- '''
- # 读取自车数据
- posObs = os.path.join(absPath, 'objects_pji.csv') # 用来生成场景中障碍物的路径
- posEgo = os.path.join(absPath, "ego_pji.csv")
- posdata_obs = pd.read_csv(posObs)
- posdata_ego = pd.read_csv(posEgo)
- timestamp_s = posdata_ego['Time'].iloc[0] / 1000
- dt_object = datetime.fromtimestamp(timestamp_s)
- hour = dt_object.hour - 8
- posdata_obs = posdata_obs.drop('Time', axis=1)
- posdata_ego = posdata_ego.drop('Time', axis=1)
- posdata_obs.rename(columns={'posX': 'pos_x'}, inplace=True)
- posdata_obs.rename(columns={'posY': 'pos_y'}, inplace=True)
- posdata_obs.rename(columns={'posH': 'HeadingAngle'}, inplace=True)
- posdata_obs.rename(columns={'speedX': 'AbsVel'}, inplace=True)
- posdata_obs.rename(columns={'playerId': 'ID'}, inplace=True)
- posdata_ego.rename(columns={'posX': 'pos_x'}, inplace=True)
- posdata_ego.rename(columns={'posY': 'pos_y'}, inplace=True)
- posdata_ego.rename(columns={'posH': 'HeadingAngle'}, inplace=True)
- def get_road_info(command):
- # 使用 subprocess.run 运行命令
- result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
- # 获取命令的输出
- output = result.stdout if result.stdout else result.stderr
- if output == "not found\n":
- return "不在道路范围"
- # 使用正则表达式提取数字
- track_match = re.search(r'track\s*=\s*(-?\d+)', output)
- lane_match = re.search(r'lane\s*=\s*(-?\d+)', output)
- offset_match = re.search(r'offset\s*=\s*(-?\d+\.?\d*)', output)
- h_match = re.search(r'h\s*=\s*(-?\d+\.?\d*)', output)
- # 如果匹配成功,提取对应的值
- track_id = int(track_match.group(1)) if track_match else None
- lane_id = int(lane_match.group(1)) if lane_match else None
- offset = float(offset_match.group(1)) if offset_match else None
- h_value = float(h_match.group(1)) if h_match else None
- return track_id, lane_id, offset, h_value
- def get_map_id(df):
- # 太和桥园区经纬度
- taiheqiao_latitude = 39.7286896
- taiheqiao_longitude = 116.4885025
- # 安庆经纬度
- anqing_latitude = 30.5585070
- anqing_longitude = 117.1850840
- points = [(taiheqiao_latitude, taiheqiao_longitude), (anqing_latitude, anqing_longitude)]
- if len(df.loc[(df['latitude'] != 0) & (df['longitude'] != 0), 'latitude']) == 0:
- nearest_index = 0
- else:
- cur_latitude = df.loc[(df['latitude'] != 0) & (df['longitude'] != 0), 'latitude'].iloc[0]
- cur_longitude = df.loc[(df['latitude'] != 0) & (df['longitude'] != 0), 'longitude'].iloc[0]
- # Calculate distances to each point
- distances = []
- for lat, lon in points:
- distance = np.sqrt((lon - cur_longitude) ** 2 + (lat - cur_latitude) ** 2)
- distances.append(distance)
- # Find the index of the nearest point
- nearest_index = np.argmin(distances)
- return nearest_index
- def trans_pos(posdata, map_id):
- theta_dict = {0:1.4, 1: 1.055}
- trans_input = {0: (39.7286896, 116.4885025), 1: (30.5585070, 117.1850840)}
- trans_utm = {0: (456256.260152, 4397809.886833), 1: (517696.8132810, 3380770.294203)}
- transformer = Transformer.from_crs("epsg:4326", "epsg:32650")
- utm50n_x1, utm50n_y1 = transformer.transform(trans_input[map_id][0], trans_input[map_id][1])
- translation = np.array([utm50n_x1 - trans_utm[map_id][0], utm50n_y1 - trans_utm[map_id][1]])
- rotation_matrix = R.from_euler('z', theta_dict[map_id]).as_matrix()
- # 提取原始坐标
- original_positions = posdata[['pos_x', 'pos_y']].values
- # 应用旋转和平移
- rotated_positions =original_positions @ rotation_matrix[:2,:2].T # 只应用 2D 旋转
- rotated_positions += translation
- # 更新 posdata
- posdata['pos_x'] = rotated_positions[:, 0]
- posdata['pos_y'] = rotated_positions[:, 1]
- posdata['HeadingAngle'] += theta_dict[map_id]
- return posdata
- map_id = get_map_id(posdata_ego)
- # map_id = 0
- posdata_ego = trans_pos(posdata_ego, map_id)
- ego_init_x = posdata_ego['pos_x'].iloc[0]
- ego_init_y = posdata_ego['pos_y'].iloc[0]
- # 检查自车起始位置是否在地图外,如果在地图外抛出错误信息
- command = [
- map_engine,
- xodr_list[map_id],
- f'{ego_init_x}',
- f'{ego_init_y}'
- ]
- if get_road_info(command) == "不在道路范围":
- # 定义日志文件路径
- log_file_path = os.path.join(absPath, "output.json") # 日志文件存储在 rootPath 下
- # 写入错误信息数组
- error_message = ["不在道路范围"]
- with open(log_file_path, "w") as log_file:
- json.dump(error_message, log_file, ensure_ascii=False) # 使用 json.dump 写入数组格式
- print(f"自车位置不在道路范围,已记录到 {log_file_path}")
- # 终止程序运行
- sys.exit()
- posdata_obs['simTime'] = np.round(posdata_obs['simTime'], decimals=1)
- posdata_ego['simTime'] = np.round(posdata_ego['simTime'], decimals=1)
- # 对障碍物数据进行处理
- def filter_rows(group):
- # 首先对x进行排序
- group = group.sort_values(by='pos_x')
- # 算出差值
- diffs = group['pos_x'].diff().fillna(1) # 我们填充1(或任何大于0.2的值),确保第一行不被标记为删除
- # 标记差值小于0.2的行为True
- group = group[diffs >= 0.2]
- return group
- def interpolate_missing_values(df):
- # 1. 生成一个完整的以0.1秒为间隔的simTime
- new_simTime = np.arange(df['simTime'].min(), df['simTime'].max() + 0.1, 0.1)
- # 2. 创建新的DataFrame,并与原始DataFrame对齐
- new_df = pd.DataFrame(new_simTime, columns=['simTime'])
- new_df['simTime'] = np.round(new_df['simTime'], decimals=1)
- # 3. 将原始DataFrame与新simTime进行合并
- new_df = pd.merge(new_df, df, on='simTime', how='left')
- # 4. 对列进行插值(线性插值)
- new_df = new_df.interpolate(method='linear')
- return new_df
- posdata_obs = trans_pos(posdata_obs, map_id)
- posdata_obs = posdata_obs.groupby(['simTime', 'pos_y'], group_keys=False).apply(filter_rows)
- posdata_obs.reset_index(drop=True, inplace=True)
- posdata_obs['Type'] = 10
- # start_time_ego = posdata_ego['Time'].iloc[0]
- # posdata_ego['Time'] = posdata_ego['Time'] - start_time_ego
- # posdata_ego['Time'] = (posdata_ego['Time'] - (posdata_ego['Time'] % 100)) / 1000
- posdata_ego = posdata_ego.drop_duplicates(subset='simTime', keep='first')
- posdata_ego = interpolate_missing_values(posdata_ego)
- posdata_ego['Type'] = 2
- posdata_ego['ID'] = -1
- # posdata_ego['pos_x'] = posdata_ego['pos_x'] - 88.96626
- # posdata_ego['pos_y'] = posdata_ego['pos_y'] - 40.55367
- # posdata_ego['HeadingAngle'] -= 90
- posdata_ego['HeadingAngle'] = np.degrees(posdata_ego['HeadingAngle'])
- # 将负角度值转换为正值
- while (posdata_ego['HeadingAngle'] < 0).any():
- # 将负角度值转换为正值
- posdata_ego['HeadingAngle'] = posdata_ego['HeadingAngle'].apply(lambda x: x + 360 if x < 0 else x)
- # posdata_obs['ID'] = range(len(posdata_obs))
- posdata_obs = posdata_obs[['simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'ID', 'Type', 'AbsVel']]
- # posdata_obs['Time'] = posdata_obs['Time'].rank(method='dense') - 1
- # posdata_obs['Time'] = posdata_obs['Time'] * 1
- # 获取唯一时间点
- # unique_times = posdata_obs['simTime'].unique()
- #
- # new_rows = [] # 用于存储新行的列表
- #
- # # 遍历唯一时间点的索引,留一个空位以检查“下一组”
- # for i in range(len(unique_times) - 1):
- # current_time = unique_times[i]
- # next_time = unique_times[i + 1]
- # time_diff = next_time - current_time # 计算时间差
- #
- # # 获取当前时间点的所有行
- # current_group = posdata_obs[posdata_obs['simTime'] == current_time]
- #
- # # 为当前组基于时间差生成新行
- # # 时间差一半的新行
- # half_time_diff_rows = current_group.copy()
- # half_time_diff_rows['simTime'] = current_time + time_diff / 2
- #
- # # 与下一组时间相同的新行(保留其他数据不变)
- # same_time_diff_rows = current_group.copy()
- # same_time_diff_rows['simTime'] = next_time
- #
- # # 收集新行
- # new_rows.append(half_time_diff_rows)
- # new_rows.append(same_time_diff_rows)
- #
- # # 将新行添加到原始DataFrame
- # new_rows_df = pd.concat(new_rows, ignore_index=True)
- # posdata_obs = pd.concat([posdata_obs, new_rows_df], ignore_index=True).sort_values(by='simTime').reset_index(
- # drop=True)
- posdata = pd.concat([posdata_obs, posdata_ego], ignore_index=True)
- posdata = posdata.sort_values(by='simTime')
- altitude_dict = {0: 19.27, 1: 8.48332}
- posdata['altitude'] = altitude_dict[map_id]
- posdata['AbsVel'] = 0
- pos_ego = posdata.loc[posdata['ID'] == -1, ['simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'altitude', 'Type']]
- pos_ego = pos_ego.reset_index(drop=True)
- # offset_x = -float(pos_ego.at[0, 'East']) # 初始East 设为0
- # offset_y = -float(pos_ego.at[0, 'North']) # 初始North 设为0
- start_time = pos_ego.at[0, 'simTime']
- ego_points, gps_time, ego_type = Scenario.getXoscPosition(pos_ego, 'simTime', 'pos_x', 'pos_y', 'HeadingAngle',
- 'altitude', 'Type', 0, 0, 0, start_time)
- # 读取目标数据
- pos_obs = posdata[posdata['Type'] != 100] # 排除非目标物的物体
- # pos_obs = posdata[posdata['Type'] != 10] # 排除非目标物的物体
- pos_obs['Type'].replace([103, 10], 7, inplace=True)
- pos_obs = pos_obs.loc[
- pos_obs['ID'] != -1, ['simTime', 'ID', 'pos_x', 'pos_y', 'HeadingAngle', 'AbsVel', 'altitude', 'Type']]
- pos_obs = pos_obs.reset_index(drop=True)
- # def calculate_heading_angle(group):
- # start_idx = 0
- # length = len(group)
- # prev_heading_angle = None # 用于存储上一个计算的航向角
- #
- # while start_idx < length:
- # start_pos_x, start_pos_y = group.iloc[start_idx]['pos_x'], group.iloc[start_idx]['pos_y']
- #
- # # 找到与当前帧距离超过1米的帧
- # for i in range(start_idx + 1, length):
- # current_pos_x, current_pos_y = group.iloc[i]['pos_x'], group.iloc[i]['pos_y']
- # distance = np.sqrt((current_pos_x - start_pos_x) ** 2 + (current_pos_y - start_pos_y) ** 2)
- #
- # if distance >= 1:
- # # 计算航向角
- # heading_angle = np.degrees(np.arctan2(current_pos_y - start_pos_y, current_pos_x - start_pos_x))
- #
- # # 检查航向角与前一个航向角的差值
- # if prev_heading_angle is not None:
- # angle_diff = np.abs(heading_angle - prev_heading_angle)
- # if angle_diff > 180:
- # angle_diff = 360 - angle_diff
- #
- # if angle_diff > 20:
- # # 如果航向角差值大于25度,跳过当前点
- # continue
- #
- # # 将 start_idx 到 i 之间的航向角设置为 heading_angle
- # group.iloc[start_idx:i, group.columns.get_loc('HeadingAngle')] = heading_angle
- # prev_heading_angle = heading_angle
- # start_idx = i
- # break
- # else:
- # # 如果没有找到超过1米的距离,使用最后一帧的数据计算航向角
- # last_pos_x, last_pos_y = group.iloc[-1]['pos_x'], group.iloc[-1]['pos_y']
- # heading_angle = np.degrees(np.arctan2(last_pos_y - start_pos_y, last_pos_x - start_pos_x))
- #
- # # 检查航向角与前一个航向角的差值
- # if prev_heading_angle is not None:
- # angle_diff = np.abs(heading_angle - prev_heading_angle)
- # if angle_diff > 180:
- # angle_diff = 360 - angle_diff
- #
- # if angle_diff <= 20:
- # group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = heading_angle
- # else:
- # # 如果差值大于25度,跳过最后的点
- # group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = prev_heading_angle
- # else:
- # group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = heading_angle
- #
- # break
- #
- # # 确保最后一行的航向角等于倒数第二行
- # if length > 1:
- # group.iloc[-1, group.columns.get_loc('HeadingAngle')] = group.iloc[-2]['HeadingAngle']
- #
- # return group
- def calculate_heading_angle(group):
- start_idx = 0
- length = len(group)
- prev_heading_angle = None # 用于存储上一个计算的航向角
- while start_idx < length:
- start_pos_x, start_pos_y = group.iloc[start_idx]['pos_x'], group.iloc[start_idx]['pos_y']
- # 找到与当前帧距离超过1米的帧
- for i in range(start_idx + 1, length):
- current_pos_x, current_pos_y = group.iloc[i]['pos_x'], group.iloc[i]['pos_y']
- distance = np.sqrt((current_pos_x - start_pos_x) ** 2 + (current_pos_y - start_pos_y) ** 2)
- if distance >= 0.5:
- # 计算航向角
- heading_angle = np.degrees(np.arctan2(current_pos_y - start_pos_y, current_pos_x - start_pos_x))
- # 检查航向角与前一个航向角的差值
- if prev_heading_angle is not None:
- angle_diff = np.abs(heading_angle - prev_heading_angle)
- if angle_diff > 180:
- if heading_angle < 0:
- heading_angle += 360
- else:
- heading_angle -= 360
- angle_diff = 360 - angle_diff
- if angle_diff > 30:
- # 如果航向角差值大于30度,跳过当前点
- continue
- # 将 start_idx 到 i 之间的航向角设置为 heading_angle
- group.iloc[start_idx:i, group.columns.get_loc('HeadingAngle')] = heading_angle
- prev_heading_angle = heading_angle
- start_idx = i
- break
- else:
- # 如果没有找到超过1米的距离,使用最后一帧的数据计算航向角
- last_pos_x, last_pos_y = group.iloc[-1]['pos_x'], group.iloc[-1]['pos_y']
- heading_angle = np.degrees(np.arctan2(last_pos_y - start_pos_y, last_pos_x - start_pos_x))
- # 检查航向角与前一个航向角的差值
- if prev_heading_angle is not None:
- angle_diff = np.abs(heading_angle - prev_heading_angle)
- if angle_diff > 180:
- if heading_angle < 0:
- heading_angle += angle_diff
- else:
- heading_angle -= angle_diff
- angle_diff = 360 - angle_diff
- if angle_diff <= 30:
- group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = heading_angle
- else:
- # 如果差值大于20度,跳过最后的点
- group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = prev_heading_angle
- else:
- group.iloc[start_idx:length - 1, group.columns.get_loc('HeadingAngle')] = group.iloc[0]['HeadingAngle']
- break
- # 确保最后一行的航向角等于倒数第二行
- if length > 1:
- group.iloc[-1, group.columns.get_loc('HeadingAngle')] = group.iloc[-2]['HeadingAngle']
- return group
- def apply_savgol(row, window_size, poly_order):
- if len(row) < window_size:
- return row
- else:
- return savgol_filter(row, window_size, poly_order)
- # 应用到分组数据
- pos_obs = pos_obs.groupby('ID').apply(calculate_heading_angle).reset_index(drop=True)
- # while (pos_obs['HeadingAngle'] < 0).any():
- # # 将负角度值转换为正值
- # pos_obs['HeadingAngle'] = pos_obs['HeadingAngle'].apply(lambda x: x + 360 if x < 0 else x)
- pos_obs['HeadingAngle'] = pos_obs.groupby('ID')['HeadingAngle'].transform(lambda x: apply_savgol(x, 45, 2))
- groups = pos_obs.groupby('ID')
- object_points = []
- for key, value in groups:
- if len(value) < 5:
- continue
- # value = smooth_1(value)
- object_points.append(
- Scenario.getXoscPosition(value, 'simTime', 'pos_x', 'pos_y', 'HeadingAngle', 'altitude', 'Type', 0, 0, 0,
- start_time))
- # object_points.append(Scenario.getXoscPosition(value, 'Time', 'position_x', 'position_y', 'HeadingAngle', 'type', offset_x, offset_y, offset_h,start_time))
- ego_speed = 5
- period = math.ceil(gps_time[-1] - gps_time[0])
- work_mode = 0 # 0为CICV车端数据
- if hour + 8 >= 24:
- hour = hour - 16
- else:
- hour = hour + 8
- time_of_day = hour * 3600
- # 没有路灯防止天过暗
- if time_of_day >= 64800:
- time_of_day = 64800
- s = Scenario(ego_points, object_points, gps_time, ego_speed, work_mode, period, absPath, 0, vehicle_type, map_id)
- filename = absPath + '/simulation'
- print(filename)
- files = s.generate(filename)
- change_CDATA(files[0][0])
- print(files)
- # dynamic_path = filename + '/simulation.xosc'
- # combine_xosc(dynamic_path, static_path)
- # print("完成xosc的合并")
- # 生成osgb
- # osgbGenerator.formatTwo(filename)
- # 修改xosc路径
- # formatThree(filename)
- # # # 生成每个场景的描述文件 json
- # # getLabel(output_path, scenario_series['场景编号'], scenario_series['场景名称'])
- # 拷贝到vtd路径下
- # vtd_path = os.path.join('/home/lxj/VIRES/VTD.2021.3/Data/Projects/Current/Scenarios/myScenario', datetime.now().strftime("%Y%m%d%H%M%S"))
- # if not os.path.exists(vtd_path):
- # os.makedirs(vtd_path)
- # os.system('cp '+ files[0][0] + ' ' + vtd_path + '/')
- def multiRun(self, path, param):
- files = self.getFile(path, self.keyFileName)
- print('程序开始,%s 个数据包' % len(files))
- t1 = time.time()
- # 无参数时,使用所有cpu核; 有参数时,使用CPU核数量为参数值
- pool = multiprocessing.Pool(processes=10)
- pfunc = partial(self.generateScenarios_raw, param)
- pool.map(pfunc, files)
- # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
- pool.close()
- pool.join()
- t2 = time.time()
- print("程序结束,并行执行时间:%s s" % int(t2 - t1))
- def batchRun(self, path, param):
- files = self.getFile(path, self.keyFileName)
- print('程序开始,%s 个数据包' % len(files))
- for di, absPath in enumerate(sorted(files)):
- print(absPath)
- self.generateScenarios_raw(absPath, param)
- # except:
- # print('Augmentation failed!!!!')
- # error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()}
- # with open('error.log','a+') as f:
- # json.dump(error, f, indent=4)
- # f.write('\n')
- # print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
- return None
- def GenerateVideo(self, root_path):
- # 批量生成视频
- imagelist = self.getFile(root_path, 'image')
- for item in imagelist:
- strs = item.split('/')
- respath = os.path.join(item, "video.mp4")
- # respath = os.path.join(res_path,strs[-3],strs[-2],"video.mp4")
- print('---------------------')
- # os.makedirs(os.path.join(res_path,strs[-3],strs[-2]))
- command = "ffmpeg -f image2 -r 10 -pattern_type glob -i '" + item + "/image/*.jpg" + "' -y '" + respath + "'"
- print(command)
- process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- process.wait()
- if __name__ == "__main__":
- # rootPath = "/media/hancheng/Simulation5/pujin/pujin_outdoor/11_22" # 跟车
- # vehicle_type = "0"
- rootPath = sys.argv[1]
- vehicle_type = sys.argv[2]
- # 生成场景
- a = Batchrun(rootPath, "ego_pji.csv")
- a.batchRun(rootPath, vehicle_type) # 0为占位参数
- # a.multiRun(rootPath, 0)
- # 生成视频
- # a.GenerateVideo(rootPath)
|