#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 14 15:44:01 2022
@author: lxj
"""
import xml.etree.ElementTree as ET
from scenariogeneration import xodr
from scenariogeneration import xosc
from scenariogeneration import ScenarioGenerator
from scenariogeneration.xodr import RoadSide, Object, ObjectType, Dynamic, Orientation
import math, os
from xodr_generator2 import road_detector_new
from datetime import datetime
import traceback
import json
import pandas as pd
from enum import Enum
import elevation_0323 as el
from xml.sax.saxutils import escape
import warnings
warnings.filterwarnings("ignore")
#
# #xodr_list = ['/home/hancheng/taiheqiao_map/thq_1116.xodr', '/home/hancheng/gongyuanbeihuan_map/OD/OpenDRIVE.xodr','/home/hancheng/glyt_map/OpenDRIVE/OpenDRIVE2.xodr']
# #osgb_list = ['/home/hancheng/taiheqiao_map/thq_1116.opt.osgb', '/home/hancheng/gongyuanbeihuan_map/osgb/GJBM_20km_0822.opt.osgb', '/home/hancheng/glyt_map/OSGB/jinlong_20231129_2_guazai.opt.osgb']
# xodr_list = ['/root/rosmerge/resource/thq.xodr', '/root/rosmerge/resource/OpenDRIVE.xodr','/root/rosmerge/resource/gelan.xodr']
# osgb_list = ['/root/rosmerge/resource/thq.opt.osgb', '/root/rosmerge/resource/GJBM_20km_0822.opt.osgb', '/root/rosmerge/resource/gelan.opt.osgb']
class Scenario(ScenarioGenerator):
def __init__(self, gps, obs, gps_time, ego_speed, work_mode, period, abspath, timeofday):
ScenarioGenerator.__init__(self)
self.gps = gps
self.obs = obs
self.gps_time = gps_time
self.ego_speed = ego_speed
self.work_mode = work_mode
self.period = period # 场景持续的时间
self.abspath = abspath
self.weather = xosc.PrecipitationType.dry # 默认是晴天
self.visual_fog_range = 20000 # 正常的能见度,没有雾
self.time = 43200 # 强制为白天
# self.map_id = map_id
# self.ObjectID = ObjectID
# self.Speed = Speed
def road(self, **kwargs):
positionEgo = self.gps
planview = xodr.PlanView()
nextx = positionEgo[0].x
nexty = positionEgo[0].y
h = float(positionEgo[0].h)
# for i in range(len(positionEgo) - 1):
# x = 0.000001 if abs(positionEgo[i].x) < 0.000001 else positionEgo[i].x
# y = 0.000001 if abs(positionEgo[i].y) < 0.000001 else positionEgo[i].y
# nextx = 0.000001 if abs(positionEgo[i + 1].x) < 0.000001 else positionEgo[i + 1].x
# nexty = 0.000001 if abs(positionEgo[i + 1].y) < 0.000001 else positionEgo[i + 1].y
# h = float(positionEgo[i].h)
#
# planview.add_fixed_geometry(xodr.Line(math.sqrt(math.pow(nextx - x, 2) + math.pow(nexty - y, 2))), x, y, h)
planview.add_fixed_geometry(xodr.Line(100), nextx, nexty, h)
# create two different roadmarkings
rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid, 0.2)
rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken, 0.2)
# create simple lanes
lanes = xodr.Lanes()
lanesection1 = xodr.LaneSection(0, xodr.standard_lane(offset=1.75, rm=rm_dashed))
lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_solid))
lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_solid))
lanes.add_lanesection(lanesection1)
lanes.add_laneoffset(xodr.LaneOffset(a=1.75))
road = xodr.Road(0, planview, lanes)
odr = xodr.OpenDrive('myroad')
# 静态元素
tree = ET.parse(os.getcwd() + '/models/static_object_models.xodr')
root = tree.getroot()
objects_dict = root.iter(tag='object')
road = create_static_object(road, objects_dict)
# road = create_static_object(road, object_id=0, object_type=ObjectType.railing, repeat_flag=1)
odr.add_road(road)
odr.adjust_roads_and_lanes()
return odr
def get_section(self, left_lanemark):
return None
def split_road_section(self, enu_file, lane_file):
lane_content = pd.read_csv(lane_file)
# 把车道分割开来,一个车道段,多种geometry
# 首先判断车道线数值是否有断裂,断裂处根据自车轨迹生成道路参数
left_lanemark = lane_content[lane_content['LaneID'] == 1][['Time','FrameID','LanePosition','LaneQuality']]
right_lanemark = lane_content[lane_content['LaneID'] == 2][['Time','FrameID','LanePosition','LaneQuality']]
left_break_section = self.get_section(left_lanemark, 1)
right_break_section = self.get_section(right_lanemark, 1)
break_section = []
normal_section = []
# 判断是否变道,然后判断是否弯道
# 最终目标是得到多个geometry参数,确定每个连接线处的s/t偏移量?,以及左右道路的个数和宽度
return None
def check_outliers_and_modify(self, datalist):
for index in datalist.index:
if index < 3:
datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 else 2 * datalist[index + 1] - datalist[index + 1:index + 3].mean()
elif 3 <= index < len(datalist) - 3:
datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 or abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else (datalist[index - 1] + datalist[index + 1]) / 2
else:
datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else 2 * datalist[index - 1] - datalist[index - 3:index].mean()
return datalist
def get_new_LaneID(self, change_flag, laneID, frameid, item):
'''
修改laneID,change_list包含了每个变道片段
'''
if item[0] <= frameid < item[1]:
return laneID + change_flag
else:
return laneID
def get_lane_width(self, data1, data2):
lane_union = pd.merge(data1, data2, on=['FrameID'])
lane_union['width'] = lane_union['LanePosition_y'] - lane_union['LanePosition_x']
a = lane_union["width"].quantile(0.75)
b = lane_union["width"].quantile(0.25)
lane_width = abs((a + b) / 2)
return lane_width
def road_pre_study(self, label_file, lane_file):
laneDF = pd.read_csv(lane_file)
road_label = ''
flag = 0 # flag = 0 can use road_ego to generate .xodr, flag = 1 better use road to generate .xodr
with open(label_file) as f:
label = json.load(f)
ego_actions = label['自车行为']
for item in ego_actions:
if ('变道' in item['type_']) or ('转成功' in item['type_']) or ('掉头' in item['type_']):
road_label = item['type_']
flag = 1
break
else:
road_label = item['type_']
if road_label == '向左变道成功':
# 查找变道标记点
change_flag = 1
lane_1 = laneDF[(laneDF['LaneID'] == 1) & (laneDF['LaneQuality'] > 1)]
lane_1 = lane_1.reset_index(drop=True)
leftlanepositon = lane_1['LanePosition']
lane_1['LanePosition'] = self.check_outliers_and_modify(leftlanepositon)
lane_1['last_Position'] = lane_1[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_1['last_FrameID'] = lane_1[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_1['change_flag'] = lane_1.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。
lane_1['split_flag'] = lane_1.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1)
lane_2 = laneDF[(laneDF['LaneID'] == 2) & (laneDF['LaneQuality'] > 1)]
lane_2 = lane_2.reset_index(drop=True)
rightlanepositon = lane_2['LanePosition']
lane_2['LanePosition'] = self.check_outliers_and_modify(rightlanepositon)
lane_2['last_Position'] = lane_2[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_2['last_FrameID'] = lane_2[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_2['change_flag'] = lane_2.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。
lane_2['split_flag'] = lane_2.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1)
lane_width = self.get_lane_width(lane_1[['FrameID','LanePosition']], lane_2[['FrameID','LanePosition']])
# 左变道用左车道线数据靠谱,不会有他车遮挡,找到变道数据段
change_lane_point = list(lane_1[lane_1['change_flag'] == change_flag]['change_flag'].index) + [list(lane_1.index)[-1]]
change_lane_section = [(lane_1.loc[change_lane_point[i], 'FrameID'], lane_1.loc[change_lane_point[i+1], 'FrameID']) for i in range(len(change_lane_point)-1)]
for item in change_lane_section:
# 变道后修改原始laneID
# laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if item[0] <= x['FrameID'] < item[1] else x['LaneID'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if item[0] <= x['FrameID'] < item[1] else x['LanePosition'], axis=1)
# laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if x['FrameID'] == max(laneDF['FrameID']) else x['LaneID'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if x['FrameID'] == max(laneDF['FrameID']) else x['LanePosition'], axis=1)
# 修正左左车道线和右右车道线数值
if change_flag == 1:
lane_0 = laneDF[(laneDF['LaneID'] == 0) & (laneDF['LaneQuality'] > 1) & (laneDF['FrameID'] < item[0])]
if not lane_0.empty:
left_lane_width = self.get_lane_width(lane_0[['FrameID','LanePosition']], lane_1[['FrameID','LanePosition']])
else:
left_lane_width = lane_width
print(left_lane_width)
# new_lane_1 = laneDF[laneDF['LaneID'] == 1][['FrameID','LanePosition']]
# new_lane_1.rename(columns={'LanePosition':'LanePosition2'},inplace=True)
# laneDF = pd.merge(laneDF, new_lane_1, on=['FrameID'])
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == -1) else x['LanePosition'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
# laneDF['LaneID'] = laneDF.apply(lambda x: 3 if x['LaneID'] == -1 else x['LaneID'], axis=1)
# 缺右车道线right_lane_width处理
print(laneDF[laneDF['FrameID']>1860][['FrameID', 'LaneID', 'LanePosition']].head(50))
print('road_label: ', road_label)
return laneDF
def road_new(self, **kwargs):
polyfit_fig_path = os.path.join(self.abspath, 'simulation','polyfit.jpg')
# run road detector
enu_file = os.path.join(self.abspath, 'pos.csv')
lane_file = os.path.join(self.abspath, 'lane.csv')
label_file = os.path.join(self.abspath, 'label.json')
# study road style: straight\curve\split-line; ego action: change-lane\no-change\cross-turn
lane_content = self.road_pre_study(label_file, lane_file)
# road split sections
# self.split_road_section(enu_file, lane_file)
try:
laneid_tuple = [0, 1, 2, 3]
uv_coord_info, road_info, left_lane_info, right_lane_info = road_detector_new(enu_file, lane_content, laneid_tuple, polyfit_fig_path)
except:
print('road detection failed!!!! Use ego trail road.')
error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()}
with open('error.log','a+') as f:
json.dump(error, f, indent=4)
f.write('\n')
return self.road_ego()
origin_x = uv_coord_info[0]
origin_y = uv_coord_info[1]
uv_coord_hdg = uv_coord_info[2]
reference_line_offset = road_info[0]
model_u = road_info[1] # in fomrat [c3, c2, c1, c0]
model_v = road_info[2] # in fomrat [c3, c2, c1, c0]
road_length = road_info[3]
print(uv_coord_info)
print(road_info)
# generate xodr file
planview = xodr.PlanView(origin_x, origin_y, uv_coord_hdg)
# Create geometry and add it to the planview
# parampoly3 = xodr.ParamPoly3(au=model_u[3], bu=model_u[2], cu=model_u[1], du=model_u[0], av=model_v[3], bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length)
parampoly3 = xodr.ParamPoly3(au=0, bu=model_u[2], cu=model_u[1], du=model_u[0], av=0, bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length)
planview.add_geometry(parampoly3)
# create two different roadmarkings
rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid,0.2)
rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken,0.2,laneChange=xodr.LaneChange.both)
##4. Create centerlane
centerlane = xodr.Lane(a=reference_line_offset)
centerlane.add_roadmark(rm_dashed)
##5. Create lane section form the centerlane
lanesec = xodr.LaneSection(0,centerlane)
##6. Create left and right lanes
lane_id = 0
for lane in left_lane_info:
lane_id += 1
lane_width = lane[0]
spd_limit = lane[1]
lane_left = xodr.Lane(a=lane_width)
lane_left.add_roadmark(rm_dashed)
# Add lanes to lane section
lanesec.add_left_lane(lane_left)
lane_id = 0
for lane in right_lane_info:
lane_id -= 1
lane_width = lane[0]
spd_limit = lane[1]
lane_right = xodr.Lane(a=lane_width)
lane_right.add_roadmark(rm_dashed)
# Add lanes to lane section
lanesec.add_right_lane(lane_right)
##8. Add lane section to Lanes
lanes = xodr.Lanes()
lanes.add_laneoffset(xodr.LaneOffset(s = 0, a=reference_line_offset))
lanes.add_lanesection(lanesec)
##9. Create Road from Planview and Lanes
road = xodr.Road(1,planview,lanes)
##10. Create the OpenDrive class (Master class)
odr = xodr.OpenDrive('myroad')
##11. Finally add roads to Opendrive
odr.add_road(road)
##12. Adjust initial positions of the roads looking at succ-pred logic
odr.adjust_roads_and_lanes()
# ##13. Print the .xodr file
# xodr.prettyprint(odr.get_element())
# ##14. Run the .xodr with esmini
# xodr.run_road(odr,os.path.join('..','pyoscx','esmini'))
return odr
def getXoscPosition(egodata, t, e, n, h, z, obj_type, offset_x, offset_y, offset_h, start_time):
'将dataframe的轨迹转换为场景文件的轨迹数据, Time, ego_e, ego_n, headinga'
position = []
time = []
# cur_time = 0.0
egodata = egodata[[t, e, n, h, z, obj_type]]
egodata = egodata.reset_index(drop=True)
# egodata[t] = egodata.index / 10
egodata[t] = (egodata[t] - start_time) / 1000
egodata[e] = egodata[e] + offset_x
egodata[n] = egodata[n] + offset_y
egodata[h] = egodata[h] + offset_h
egodata[h] = egodata.apply(lambda x: x[h] if x[h] < 360 else x[h] - 360, axis=1)
type_list = list(egodata[obj_type])
ego_type = int(max(type_list, key=type_list.count))
lasth = float(egodata.at[0, h])
init_flag = True
# two method to set z value
zflag = 0 # 1
if zflag:
# 根据地图的xy值计算z值
zfile = '/home/hancheng/weihao/gongyuanbei_elevation.csv'
# zfile = '/home/hancheng/simulation_dynamic/elevation_df_taiheqiao.csv'
zdata = pd.read_csv(zfile)
for row in egodata.iterrows():
hhh = math.radians(row[1][h])
xodr_z = el.cal_elevation(zdata, float(row[1][e]), float(row[1][n]))
if init_flag:
position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=xodr_z, h=hhh, p=0, r=0))
init_flag = False
else:
if float(row[1][h]) - lasth > 300:
hhh = math.radians(float(row[1][h]) - 360)
elif float(row[1][h]) - lasth < -300:
hhh = math.radians(float(row[1][h]) + 360)
position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=xodr_z, h=hhh, p=0, r=0))
lasth = float(row[1][h])
time.append(float(row[1][t]))
# time.append(cur_time)
# cur_time += 0.1
return position, time, ego_type
else:
# z = 0
for row in egodata.iterrows():
hhh = math.radians(row[1][h])
if init_flag:
position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0))
init_flag = False
else:
if float(row[1][h]) - lasth > 300:
hhh = math.radians(float(row[1][h]) - 360)
elif float(row[1][h]) - lasth < -300:
hhh = math.radians(float(row[1][h]) + 360)
position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0))
lasth = float(row[1][h])
time.append(float(row[1][t]))
# time.append(cur_time)
# cur_time += 0.1
return position, time, ego_type
def scenario(self, **kwargs):
road = xosc.RoadNetwork(roadfile='/root/rosmerge/resource/OpenDRIVE2_add_stop_line_marking_20231229.xodr', scenegraph='/root/rosmerge/resource/jinlong_all_addtrafficlight_20240226_5_guazai_2.opt.osgb')
catalog = xosc.Catalog()
catalog.add_catalog('VehicleCatalog', 'Distros/Current/Config/Players/Vehicles')
catalog.add_catalog('PedestrianCatalog', 'Distros/Current/Config/Players/Pedestrians')
catalog.add_catalog('MiscObjectCatalog', 'Distros/Current/Config/Players/Objects')
catalog.add_catalog('ControllerCatalog', 'Distros/Current/Config/Players/driverCfg.xml')
pbb = xosc.BoundingBox(0.5, 0.5, 1.8, 2.0, 0, 0.9)
mbb = xosc.BoundingBox(1, 2, 1.7, 1.5, 0, 0.9)
bb = xosc.BoundingBox(2.1, 4.5, 1.8, 1.5, 0, 0.9)
bus_bb = xosc.BoundingBox(2.55, 10.5, 3.1, 5.1, 0, 1.45)
bus_fa = xosc.Axle(0.48, 0.91, 2.5, 4.5, 1.5)
bus_ba = xosc.Axle(0, 0.91, 2.5, 4.5, 1.5)
truck_bb = xosc.BoundingBox(2.8, 8.744, 3.78, 4.372, 0, 1.89)
truck_fa = xosc.Axle(0.48, 1, 2.8, 4.372, 1.5)
truck_ba = xosc.Axle(0, 1, 2.8, 4.372, 1.5)
fa = xosc.Axle(0.5, 0.6, 1.8, 3.1, 0.3)
ba = xosc.Axle(0, 0.6, 1.8, 0, 0.3)
cone_bb = xosc.BoundingBox(0.5, 0.5, 1.0, 2.0, 0, 0.9)
red_veh = xosc.Vehicle('Audi_A3_2009_black', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
red_veh.add_property(name='control', value='external')
segway = xosc.Vehicle('Segway', xosc.VehicleCategory.car, pbb, fa, ba, 69.444, 200, 10)
segway.add_property(name='control', value='external')
white_veh = xosc.Vehicle('Audi_A3_2009_red', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
male_ped = xosc.Pedestrian('Christian', 'male_adult', 70, xosc.PedestrianCategory.pedestrian, pbb)
motorcycle = xosc.Vehicle('Kawasaki_ZX-9R_black', xosc.VehicleCategory.motorbike, mbb, fa, ba, 69.444, 200, 10)
motorcycle.add_property(name='control', value='external')
truck = xosc.Vehicle('MANTGS_11_Black', xosc.VehicleCategory.truck, truck_bb, truck_fa,
truck_ba, 26.4, 10, 10)
bus = xosc.Vehicle('MercedesTravego_10_ArcticWhite', xosc.VehicleCategory.bus, bus_bb, bus_fa, bus_ba, 29, 10,
10)
cone = xosc.MiscObject('RdMiscPylon03-32cm', 10, xosc.MiscObjectCategory.obstacle, cone_bb)
prop = xosc.Properties()
cnt = xosc.Controller('DefaultDriver', prop)
cnt2 = xosc.Controller('No Driver', prop)
# 添加自车实体
egoname = 'Ego'
entities = xosc.Entities()
entities.add_scenario_object(egoname, segway, cnt)
# 添加目标车实体
objname = 'Player'
ped_type, car_type, bicycle_motor_type, bus_type, truct_type, cone_type = get_obj_type(self.work_mode)
positionEgo = self.gps
positionObj = self.obs
for i, row in enumerate(positionObj):
if len(row[1]) < 1:
continue
obj_type = row[2] # 根据目标物类型添加对应的实体
if obj_type in ped_type:
entities.add_scenario_object(objname + str(i), male_ped)
elif obj_type in car_type:
entities.add_scenario_object(objname + str(i), white_veh, cnt2)
elif obj_type in bicycle_motor_type:
entities.add_scenario_object(objname + str(i), motorcycle, cnt2)
elif obj_type in bus_type:
entities.add_scenario_object(objname + str(i), bus, cnt2)
elif obj_type in truct_type:
entities.add_scenario_object(objname + str(i), truck, cnt2)
elif obj_type in cone_type:
entities.add_scenario_object(objname + str(i), cone)
# 初始化
init = xosc.Init()
step_time = xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 0)
egospeed = xosc.AbsoluteSpeedAction(self.ego_speed, step_time)
# 设置初始环境要素,例如天气时间等
default_precipitation = xosc.PrecipitationType.dry
default_cloudstate = xosc.CloudState.free
# json_path = os.path.join(self.abspath, 'label.json')
# with open(json_path, 'r', encoding='utf-8') as f:
# sc_json = json.load(f)
# weather = sc_json['天气情况'][0]['type_'] #['自然环境']
# if weather == '晴':
# f.close()
# elif weather == '多云':
# default_cloudstate = xosc.CloudState.cloudy
# elif weather == '阴':
# default_cloudstate = xosc.CloudState.overcast
# elif weather == '雨':
# default_cloudstate = xosc.CloudState.overcast
# default_precipitation = xosc.PrecipitationType.rain
# elif weather == '雪':
# default_cloudstate = xosc.CloudState.overcast
# default_precipitation = xosc.PrecipitationType.snow
# elif weather == '雾霾' or weather == '沙尘':
# self.visual_fog_range = 700
# default_cloudstate = xosc.CloudState.cloudy
init.add_global_action(
xosc.EnvironmentAction(name="InitEnvironment", environment=xosc.Environment(
xosc.TimeOfDay(True, 2019, 12, 19, 12, 0, 0),
xosc.Weather(
sun_intensity=2000, sun_azimuth=40, sun_elevation=20,
cloudstate=default_cloudstate,
precipitation=default_precipitation,
precipitation_intensity=1,
visual_fog_range=self.visual_fog_range),
xosc.RoadCondition(friction_scale_factor=0.7))))
# ego car init
ego_init_h = positionEgo[0].h
init.add_init_action(egoname, xosc.TeleportAction(
xosc.WorldPosition(x=positionEgo[0].x, y=positionEgo[0].y, z=positionEgo[0].z, h=ego_init_h, p=0, r=0)))
init.add_init_action(egoname, egospeed)
# ego car trail
trajectory = xosc.Trajectory('egoTrajectory', False)
polyline = xosc.Polyline(self.gps_time, positionEgo)
trajectory.add_shape(polyline)
speedaction = xosc.FollowTrajectoryAction(trajectory, xosc.FollowMode.position, xosc.ReferenceContext.absolute,
1, 0)
trigger = xosc.ValueTrigger('drive_start_trigger', 0, xosc.ConditionEdge.rising,
xosc.SimulationTimeCondition(0, xosc.Rule.greaterThan))
trigger_init = xosc.ValueTrigger(name='init_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising,
valuecondition=xosc.SimulationTimeCondition(value=0,
rule=xosc.Rule.greaterThan))
event_init = xosc.Event('Event_init', xosc.Priority.overwrite)
action_light = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0)
light_flag = 'auto'
if self.time == 64800:
light_flag = 'true'
text = f''
node = ET.Element("CustomCommandAction")
node.attrib = {'type': 'scp'}
node.text = f""
action_light.add_element(node)
event_init.add_action('headlight', action_light)
event_init.add_trigger(trigger_init)
event = xosc.Event('Event1', xosc.Priority.overwrite)
event.add_trigger(trigger)
event.add_action('newspeed', speedaction)
man = xosc.Maneuver('my maneuver')
man.add_event(event_init)
man.add_event(event)
mangr = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True)
mangr.add_actor('Ego')
mangr.add_maneuver(man)
trigger0 = xosc.Trigger('start')
act = xosc.Act('Act1', trigger0)
act.add_maneuver_group(mangr)
ego_story = xosc.Story('mystory_ego')
ego_story.add_act(act)
stop_trigger = xosc.ValueTrigger('stop_trigger', 0, xosc.ConditionEdge.none,
xosc.SimulationTimeCondition(self.period, xosc.Rule.greaterThan), 'stop')
sb = xosc.StoryBoard(init, stoptrigger=stop_trigger)
sb.add_story(ego_story)
# object car trail
if positionObj:
for i, row in enumerate(positionObj):
name = objname + str(i)
if len(row[1]) < 2:
continue
obj_type = row[2]
# 根据目标车需要动态创建实体
x = row[0][0].x
y = row[0][0].y
z = row[0][0].z
h = row[0][0].h
start_time = row[1][0]
init_position = xosc.WorldPosition(x=x, y=y, z=z, h=h, p=0, r=0)
newentity = xosc.AddEntityAction(name, init_position)
newentity_trigger = xosc.ValueTrigger(name='newentity_trigger', delay=0,
conditionedge=xosc.ConditionEdge.rising,
valuecondition=xosc.SimulationTimeCondition(value=start_time,
rule=xosc.Rule.greaterThan))
newentity_event = xosc.Event('Event_newentity', xosc.Priority.overwrite)
newentity_event.add_trigger(newentity_trigger)
newentity_event.add_action('newentity_action', newentity)
man_obj = xosc.Maneuver('my maneuver')
man_obj.add_event(newentity_event)
if obj_type != 103:
# 添加目标车轨迹
trajectoryM = xosc.Trajectory('objectTrajectory', False)
polylineM = xosc.Polyline(row[1], row[0])
trajectoryM.add_shape(polylineM)
obj_trail_action = xosc.FollowTrajectoryAction(trajectory=trajectoryM,
following_mode=xosc.FollowMode.position,
reference_domain=xosc.ReferenceContext.absolute,
scale=1, offset=0, initialDistanceOffset=0)
obj_trail_event = xosc.Event('Event1', xosc.Priority.overwrite)
# obj_start_trigger = xosc.EntityTrigger("obj-start-trigger", 0, xosc.ConditionEdge.rising, xosc.SpeedCondition(0, xosc.Rule.greaterThan),'Ego')
obj_start_trigger = xosc.ValueTrigger('obj_start_trigger', 0, xosc.ConditionEdge.rising,
xosc.SimulationTimeCondition(start_time, xosc.Rule.greaterThan))
obj_trail_event.add_trigger(obj_start_trigger)
obj_trail_event.add_action('trail_action', obj_trail_action)
# if obj_type in ped_type:
# walp_trigger = xosc.EntityTrigger('ped_walp_trigger', 0, xosc.ConditionEdge.rising,\
# xosc.TimeToCollisionCondition(self.intersectime, xosc.Rule.lessThan, entity=name), 'Ego')
# obj_trail_event.add_trigger(walp_trigger)
man_obj.add_event(obj_trail_event)
# 行人特有的事件
if obj_type in ped_type:
ped_event = xosc.Event('Event_ped', xosc.Priority.overwrite)
ped_event.add_trigger(obj_start_trigger)
ped_action = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0)
ped_action.add_element(self.createUDAction(name))
ped_event.add_action('ped_trail_action', ped_action)
man_obj.add_event(ped_event)
# finish_trigger = xosc.EntityTrigger('finish_trigger', 0, xosc.ConditionEdge.rising, xosc.ReachPositionCondition(position=row[1][0],tolerance=1),name)
# event4 = xosc.Event('Event_ped',xosc.Priority.overwrite)
# event4.add_trigger(finish_trigger)
# be_still_action = xosc.AbsoluteSpeedAction(0, xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 1))
# event4.add_action('ped_be_still_action',be_still_action)
# man_obj.add_event(event4)
# 轨迹最后删除实体
last_time = row[1][-1]
del_action = xosc.DeleteEntityAction(name)
del_trigger = xosc.ValueTrigger(name='entity_del_trigger', delay=0,
conditionedge=xosc.ConditionEdge.rising,
valuecondition=xosc.SimulationTimeCondition(value=last_time,
rule=xosc.Rule.greaterThan))
del_event = xosc.Event('Event_del', xosc.Priority.overwrite)
del_event.add_trigger(del_trigger)
del_event.add_action('del_action', del_action)
man_obj.add_event(del_event)
obj_mangroup = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True)
obj_mangroup.add_actor(name)
obj_mangroup.add_maneuver(man_obj)
obj_act = xosc.Act('Act2', trigger0)
obj_act.add_maneuver_group(obj_mangroup)
obj_story = xosc.Story('mystory_' + name)
obj_story.add_act(obj_act)
sb.add_story(obj_story)
# prettyprint(sb.get_element())
paramet = xosc.ParameterDeclarations()
sce = xosc.Scenario('my scenario', 'Maggie', paramet, entities, sb, road, catalog)
return sce
def createUDAction(self, name):
# tree = ET.parse('./models/ped_CDATA.xosc')
# root = tree.getroot()
# ele = root[5][2][1][0][1][1][0][0][0]
# newnode = ET.Element("CustomCommandAction")
# newnode.attrib = {'type': 'scp'}
# newnode.text = ''
speed = 3
motion = 'walk'
# text = f'\n \n'
text = '\n \n'.format(name, motion, speed)
# text = '<Traffic> <ActionMotion actor="Player0" move="walk" speed="3" force="false"rate="0" delayTime="0.0" activateOnExit="false"/></Traffic>'.format(name, motion, speed)
newnode = ET.Element("CustomCommandAction")
newnode.attrib = {'type': 'scp'}
newnode.text = "".format(text)
return newnode
class WorkMode(Enum):
cicv = 0 # CICV车端
roadside = 1 # 车网路端
car = 2 # 车网车端
merge = 3 # 车网车端路端融合
def get_obj_type(mode):
if mode == WorkMode.cicv.value:
# CICV车端
ped_type = [7]
car_type = [2]
bicycle_motor_type = [8]
bus_type = [3]
truct_type = [4]
cone_type = [103]
elif mode == WorkMode.roadside.value:
# 车网路端
ped_type = [0]
car_type = [2]
bicycle_motor_type = [1, 3]
bus_type = [5]
truct_type = [7]
cone_type = [103]
elif mode == WorkMode.car.value:
# 车网车端
ped_type = [1]
car_type = [6]
bicycle_motor_type = [2, 3, 4, 5]
bus_type = [7, 8]
truct_type = [9]
cone_type = [103]
else:
# 车网车端路端融合
ped_type = [1]
car_type = [6]
bicycle_motor_type = [2, 3, 4, 5]
bus_type = [7, 8]
truct_type = [9]
cone_type = [103]
return ped_type, car_type, bicycle_motor_type, bus_type, truct_type, cone_type
# 创建道路旁静止的场景
def create_static_object(road, object_dict):
for single_object in object_dict:
for k, v in ObjectType.__members__.items():
if k == single_object.attrib['type']:
single_object.attrib['type'] = v
break
road_object = Object(
s=single_object.attrib['s'],
t=single_object.attrib['t'],
Type=single_object.attrib['type'],
dynamic=Dynamic.no,
id=single_object.attrib['id'],
name=single_object.attrib['name'],
zOffset=single_object.attrib['zOffset'],
validLength=single_object.attrib['validLength'],
orientation=Orientation.none,
length=single_object.attrib['length'],
width=single_object.attrib['width'],
height=single_object.attrib['height'],
pitch=single_object.attrib['pitch'],
roll=single_object.attrib['roll']
)
# 判断此object是否是重复的元素
repeat = single_object.find('repeat')
if repeat is not None:
road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.left,
tOffset=1.75)
road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.right,
tOffset=-1.755)
else:
road.add_object(road_object)
return road
def change_CDATA(filepath):
'行人场景特例,对xosc文件内的特殊字符做转换'
f = open(filepath, "r", encoding="UTF-8")
txt = f.readline()
all_line = []
# txt是否为空可以作为判断文件是否到了末尾
while txt:
txt = txt.replace("<", "<").replace(">", ">").replace("&", "&").replace(""", '"').replace(
"'", "'")
all_line.append(txt)
# 读取文件的下一行
txt = f.readline()
f.close()
f1 = open(filepath, 'w', encoding="UTF-8")
for line in all_line:
f1.write(line)
f1.close()
def path_changer(xosc_path, xodr_path, osgb_path):
"""
provided by Dongpeng Ding
:param xosc_path:
:param xodr_path:
:param osgb_path:
:return:
"""
tree = ET.parse(xosc_path)
treeRoot = tree.getroot()
# for OpenScenario v0.9, v1.0
for RoadNetwork in treeRoot.findall('RoadNetwork'):
for Logics in RoadNetwork.findall('LogicFile'):
Logics.attrib['filepath'] = xodr_path
for SceneGraph in RoadNetwork.findall('SceneGraphFile'):
SceneGraph.attrib['filepath'] = osgb_path
for Logics in RoadNetwork.findall('Logics'):
Logics.attrib['filepath'] = xodr_path
for SceneGraph in RoadNetwork.findall('SceneGraph'):
SceneGraph.attrib['filepath'] = osgb_path
# for VTD xml
for Layout in treeRoot.findall('Layout'):
Layout.attrib['File'] = xodr_path
Layout.attrib['Database'] = osgb_path
tree.write(xosc_path, xml_declaration=True)
def readXML(xoscPath):
xodrFileName = ""
osgbFileName = ""
tree = ET.parse(xoscPath)
treeRoot = tree.getroot()
for RoadNetwork in treeRoot.findall('RoadNetwork'):
for Logics in RoadNetwork.findall('LogicFile'):
xodrFileName = Logics.attrib['filepath']
for SceneGraph in RoadNetwork.findall('SceneGraphFile'):
osgbFileName = SceneGraph.attrib['filepath']
for Logics in RoadNetwork.findall('Logics'):
xodrFileName = Logics.attrib['filepath']
for SceneGraph in RoadNetwork.findall('SceneGraph'):
osgbFileName = SceneGraph.attrib['filepath']
return xodrFileName[xodrFileName.rindex("/") + 1:], osgbFileName[osgbFileName.rindex("/") + 1:]
def formatThree(rootDirectory):
"""
xodr and osgb file path are fixed
:return:
"""
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if ".xosc" == file[-5:]:
# xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.xodr" # 泛化效果好的场景用的
# osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.osgb" # 泛化效果好的场景用的
# xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.xodr" # 泛化效果好的场景用的
# osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.osgb" # 泛化效果好的场景用的
xodrFilePath = "/home/hancheng/hc_project/2023-08-08-10-50-16_jsons1/2023-08-08-10-50-16_jsons_0/2023-08-08-10-50-16_jsons_0.xodr" # 直路,泛化好用
osgbFilePath = "/home/hancheng/VIRES/VTD.2022.1/Runtime/Tools/RodDistro_7001_Rod4.6.3/DefaultProject/Database/json0.opt.osgb" # 直路,泛化好用
# xodrFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.xodr" # 十字路口,泛化好用
# osgbFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.opt.osgb" # 十字路口,泛化好用
# xodrFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.xodr" # Sharing-van还原用的
# osgbFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.opt.osgb" # Sharing-van还原用的
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print("Change success: " + root + "/" + file)
def formatTwo(rootDirectory):
"""
data format:
simulation
file.xosc
file.xodr
file.osgb
:return:
"""
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if ".xosc" == file[-5:]:
xodrFilePath = ""
osgbFilePath = ""
for odrFile in os.listdir(root):
if ".xodr" == odrFile[-5:]:
xodrFilePath = root + "/" + odrFile
break
for osgbFile in os.listdir(root):
if ".osgb" == osgbFile[-5:]:
osgbFilePath = root + "/" + osgbFile
break
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print("Change success: " + root + "/" + file)
def formatOne(rootDirectory):
"""
data format:
openx
xosc
file.xosc
xodr
file.xodr
osgb
file.osgb
:return:
"""
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if "xosc" == file[-4:]:
xodrFilePath = ""
osgbFilePath = ""
for odrFile in os.listdir(root[:-4] + "xodr"):
if "xodr" == odrFile[-4:]:
xodrFilePath = root[:-4] + "xodr/" + odrFile
break
for osgbFile in os.listdir(root[:-4] + "osgb"):
if "osgb" == osgbFile[-4:]:
osgbFilePath = root[:-4] + "osgb/" + osgbFile
break
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print("Change success: " + root + "/" + file)
def chongQingFormat(rootDirectory):
"""
supporting file format: chong qing folder format
:return:
"""
counter = 1
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if "xosc" == file[-4:]:
if "ver1.0.xosc" == file[-11:]:
xodrFileName, osgbFileName = readXML(root + "/" + file)
xodrFilePath = "/Xodrs/" + xodrFileName
osgbFilePath = "/Databases/" + osgbFileName
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print(counter, "Change success: " + root + "/" + file)
else:
xodrFileName, osgbFileName = readXML(root + "/" + file)
xodrFilePath = "/Xodrs/" + xodrFileName
osgbFilePath = "/Databases/" + osgbFileName
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print(counter, "Change success: " + root + "/" + file)
counter += 1