#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 14 15:44:01 2022
@author: lxj
"""
import xml.etree.ElementTree as ET
from scenariogeneration import xodr
from scenariogeneration import xosc
from scenariogeneration import ScenarioGenerator
from scenariogeneration.xodr import RoadSide, Object, ObjectType, Dynamic, Orientation
import math, os
# from xodr_generator2 import road_detector_new
from datetime import datetime
import traceback
import json
import pandas as pd
from enum import Enum
from xml.sax.saxutils import escape
import warnings
warnings.filterwarnings("ignore")
xodr_list = ['/mnt/disk001/simulation_outdoor/thq_1116.xodr', '/mnt/disk001/simulation_outdoor/anqing.xodr']
osgb_list = ['/mnt/disk001/simulation_outdoor/thq_1116.opt.osgb', '/mnt/disk001/simulation_outdoor/anqing.osgb']
class Scenario(ScenarioGenerator):
def __init__(self, gps, obs, gps_time, ego_speed, work_mode, period, abspath, timeofday, vehicle_type, map_id):
ScenarioGenerator.__init__(self)
self.gps = gps
self.obs = obs
self.gps_time = gps_time
self.ego_speed = ego_speed
self.work_mode = work_mode
self.period = period # 场景持续的时间
self.abspath = abspath
self.weather = xosc.PrecipitationType.dry # 默认是晴天
self.visual_fog_range = 20000 # 正常的能见度,没有雾
self.time = 43200 # 强制为白天
self.vehicle_type = vehicle_type
self.map_id = map_id
# self.map_id = map_id
# self.ObjectID = ObjectID
# self.Speed = Speed
def road(self, **kwargs):
positionEgo = self.gps
planview = xodr.PlanView()
nextx = positionEgo[0].x
nexty = positionEgo[0].y
h = float(positionEgo[0].h)
# for i in range(len(positionEgo) - 1):
# x = 0.000001 if abs(positionEgo[i].x) < 0.000001 else positionEgo[i].x
# y = 0.000001 if abs(positionEgo[i].y) < 0.000001 else positionEgo[i].y
# nextx = 0.000001 if abs(positionEgo[i + 1].x) < 0.000001 else positionEgo[i + 1].x
# nexty = 0.000001 if abs(positionEgo[i + 1].y) < 0.000001 else positionEgo[i + 1].y
# h = float(positionEgo[i].h)
#
# planview.add_fixed_geometry(xodr.Line(math.sqrt(math.pow(nextx - x, 2) + math.pow(nexty - y, 2))), x, y, h)
planview.add_fixed_geometry(xodr.Line(100), nextx, nexty, h)
# create two different roadmarkings
rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid, 0.2)
rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken, 0.2)
# create simple lanes
lanes = xodr.Lanes()
lanesection1 = xodr.LaneSection(0, xodr.standard_lane(offset=1.75, rm=rm_dashed))
lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_solid))
lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_solid))
lanes.add_lanesection(lanesection1)
lanes.add_laneoffset(xodr.LaneOffset(a=1.75))
road = xodr.Road(0, planview, lanes)
odr = xodr.OpenDrive('myroad')
# 静态元素
tree = ET.parse(os.getcwd() + '/models/static_object_models.xodr')
root = tree.getroot()
objects_dict = root.iter(tag='object')
road = create_static_object(road, objects_dict)
# road = create_static_object(road, object_id=0, object_type=ObjectType.railing, repeat_flag=1)
odr.add_road(road)
odr.adjust_roads_and_lanes()
return odr
def get_section(self, left_lanemark):
return None
def split_road_section(self, enu_file, lane_file):
lane_content = pd.read_csv(lane_file)
# 把车道分割开来,一个车道段,多种geometry
# 首先判断车道线数值是否有断裂,断裂处根据自车轨迹生成道路参数
left_lanemark = lane_content[lane_content['LaneID'] == 1][['Time','FrameID','LanePosition','LaneQuality']]
right_lanemark = lane_content[lane_content['LaneID'] == 2][['Time','FrameID','LanePosition','LaneQuality']]
left_break_section = self.get_section(left_lanemark, 1)
right_break_section = self.get_section(right_lanemark, 1)
break_section = []
normal_section = []
# 判断是否变道,然后判断是否弯道
# 最终目标是得到多个geometry参数,确定每个连接线处的s/t偏移量?,以及左右道路的个数和宽度
return None
def check_outliers_and_modify(self, datalist):
for index in datalist.index:
if index < 3:
datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 else 2 * datalist[index + 1] - datalist[index + 1:index + 3].mean()
elif 3 <= index < len(datalist) - 3:
datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 or abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else (datalist[index - 1] + datalist[index + 1]) / 2
else:
datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else 2 * datalist[index - 1] - datalist[index - 3:index].mean()
return datalist
def get_new_LaneID(self, change_flag, laneID, frameid, item):
'''
修改laneID,change_list包含了每个变道片段
'''
if item[0] <= frameid < item[1]:
return laneID + change_flag
else:
return laneID
def get_lane_width(self, data1, data2):
lane_union = pd.merge(data1, data2, on=['FrameID'])
lane_union['width'] = lane_union['LanePosition_y'] - lane_union['LanePosition_x']
a = lane_union["width"].quantile(0.75)
b = lane_union["width"].quantile(0.25)
lane_width = abs((a + b) / 2)
return lane_width
def road_pre_study(self, label_file, lane_file):
laneDF = pd.read_csv(lane_file)
road_label = ''
flag = 0 # flag = 0 can use road_ego to generate .xodr, flag = 1 better use road to generate .xodr
with open(label_file) as f:
label = json.load(f)
ego_actions = label['自车行为']
for item in ego_actions:
if ('变道' in item['type_']) or ('转成功' in item['type_']) or ('掉头' in item['type_']):
road_label = item['type_']
flag = 1
break
else:
road_label = item['type_']
if road_label == '向左变道成功':
# 查找变道标记点
change_flag = 1
lane_1 = laneDF[(laneDF['LaneID'] == 1) & (laneDF['LaneQuality'] > 1)]
lane_1 = lane_1.reset_index(drop=True)
leftlanepositon = lane_1['LanePosition']
lane_1['LanePosition'] = self.check_outliers_and_modify(leftlanepositon)
lane_1['last_Position'] = lane_1[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_1['last_FrameID'] = lane_1[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_1['change_flag'] = lane_1.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。
lane_1['split_flag'] = lane_1.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1)
lane_2 = laneDF[(laneDF['LaneID'] == 2) & (laneDF['LaneQuality'] > 1)]
lane_2 = lane_2.reset_index(drop=True)
rightlanepositon = lane_2['LanePosition']
lane_2['LanePosition'] = self.check_outliers_and_modify(rightlanepositon)
lane_2['last_Position'] = lane_2[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_2['last_FrameID'] = lane_2[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列
lane_2['change_flag'] = lane_2.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。
lane_2['split_flag'] = lane_2.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1)
lane_width = self.get_lane_width(lane_1[['FrameID','LanePosition']], lane_2[['FrameID','LanePosition']])
# 左变道用左车道线数据靠谱,不会有他车遮挡,找到变道数据段
change_lane_point = list(lane_1[lane_1['change_flag'] == change_flag]['change_flag'].index) + [list(lane_1.index)[-1]]
change_lane_section = [(lane_1.loc[change_lane_point[i], 'FrameID'], lane_1.loc[change_lane_point[i+1], 'FrameID']) for i in range(len(change_lane_point)-1)]
for item in change_lane_section:
# 变道后修改原始laneID
# laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if item[0] <= x['FrameID'] < item[1] else x['LaneID'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if item[0] <= x['FrameID'] < item[1] else x['LanePosition'], axis=1)
# laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if x['FrameID'] == max(laneDF['FrameID']) else x['LaneID'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if x['FrameID'] == max(laneDF['FrameID']) else x['LanePosition'], axis=1)
# 修正左左车道线和右右车道线数值
if change_flag == 1:
lane_0 = laneDF[(laneDF['LaneID'] == 0) & (laneDF['LaneQuality'] > 1) & (laneDF['FrameID'] < item[0])]
if not lane_0.empty:
left_lane_width = self.get_lane_width(lane_0[['FrameID','LanePosition']], lane_1[['FrameID','LanePosition']])
else:
left_lane_width = lane_width
print(left_lane_width)
# new_lane_1 = laneDF[laneDF['LaneID'] == 1][['FrameID','LanePosition']]
# new_lane_1.rename(columns={'LanePosition':'LanePosition2'},inplace=True)
# laneDF = pd.merge(laneDF, new_lane_1, on=['FrameID'])
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == -1) else x['LanePosition'], axis=1)
# laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
# laneDF['LaneID'] = laneDF.apply(lambda x: 3 if x['LaneID'] == -1 else x['LaneID'], axis=1)
# 缺右车道线right_lane_width处理
print(laneDF[laneDF['FrameID']>1860][['FrameID', 'LaneID', 'LanePosition']].head(50))
print('road_label: ', road_label)
return laneDF
# def road_new(self, **kwargs):
# polyfit_fig_path = os.path.join(self.abspath, 'simulation','polyfit.jpg')
#
# # run road detector
# enu_file = os.path.join(self.abspath, 'pos.csv')
# lane_file = os.path.join(self.abspath, 'lane.csv')
# label_file = os.path.join(self.abspath, 'label.json')
#
# # study road style: straight\curve\split-line; ego action: change-lane\no-change\cross-turn
# lane_content = self.road_pre_study(label_file, lane_file)
#
# # road split sections
# # self.split_road_section(enu_file, lane_file)
#
# try:
# laneid_tuple = [0, 1, 2, 3]
# uv_coord_info, road_info, left_lane_info, right_lane_info = road_detector_new(enu_file, lane_content, laneid_tuple, polyfit_fig_path)
# except:
# print('road detection failed!!!! Use ego trail road.')
# error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()}
# with open('error.log','a+') as f:
# json.dump(error, f, indent=4)
# f.write('\n')
# return self.road_ego()
#
# origin_x = uv_coord_info[0]
# origin_y = uv_coord_info[1]
# uv_coord_hdg = uv_coord_info[2]
#
# reference_line_offset = road_info[0]
# model_u = road_info[1] # in fomrat [c3, c2, c1, c0]
# model_v = road_info[2] # in fomrat [c3, c2, c1, c0]
# road_length = road_info[3]
# print(uv_coord_info)
# print(road_info)
# # generate xodr file
# planview = xodr.PlanView(origin_x, origin_y, uv_coord_hdg)
#
# # Create geometry and add it to the planview
# # parampoly3 = xodr.ParamPoly3(au=model_u[3], bu=model_u[2], cu=model_u[1], du=model_u[0], av=model_v[3], bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length)
# parampoly3 = xodr.ParamPoly3(au=0, bu=model_u[2], cu=model_u[1], du=model_u[0], av=0, bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length)
# planview.add_geometry(parampoly3)
#
# # create two different roadmarkings
# rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid,0.2)
# rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken,0.2,laneChange=xodr.LaneChange.both)
#
# ##4. Create centerlane
# centerlane = xodr.Lane(a=reference_line_offset)
# centerlane.add_roadmark(rm_dashed)
#
# ##5. Create lane section form the centerlane
# lanesec = xodr.LaneSection(0,centerlane)
#
# ##6. Create left and right lanes
# lane_id = 0
# for lane in left_lane_info:
# lane_id += 1
# lane_width = lane[0]
# spd_limit = lane[1]
# lane_left = xodr.Lane(a=lane_width)
# lane_left.add_roadmark(rm_dashed)
# # Add lanes to lane section
# lanesec.add_left_lane(lane_left)
#
# lane_id = 0
# for lane in right_lane_info:
# lane_id -= 1
# lane_width = lane[0]
# spd_limit = lane[1]
# lane_right = xodr.Lane(a=lane_width)
# lane_right.add_roadmark(rm_dashed)
# # Add lanes to lane section
# lanesec.add_right_lane(lane_right)
#
# ##8. Add lane section to Lanes
# lanes = xodr.Lanes()
# lanes.add_laneoffset(xodr.LaneOffset(s = 0, a=reference_line_offset))
# lanes.add_lanesection(lanesec)
#
#
# ##9. Create Road from Planview and Lanes
# road = xodr.Road(1,planview,lanes)
#
# ##10. Create the OpenDrive class (Master class)
# odr = xodr.OpenDrive('myroad')
#
# ##11. Finally add roads to Opendrive
# odr.add_road(road)
#
# ##12. Adjust initial positions of the roads looking at succ-pred logic
# odr.adjust_roads_and_lanes()
#
# # ##13. Print the .xodr file
# # xodr.prettyprint(odr.get_element())
#
# # ##14. Run the .xodr with esmini
# # xodr.run_road(odr,os.path.join('..','pyoscx','esmini'))
#
# return odr
def getXoscPosition(egodata, t, e, n, h, z, obj_type, offset_x, offset_y, offset_h, start_time):
'将dataframe的轨迹转换为场景文件的轨迹数据, Time, ego_e, ego_n, headinga'
position = []
time = []
# cur_time = 0.0
egodata = egodata[[t, e, n, h, z, obj_type]]
egodata = egodata.reset_index(drop=True)
#egodata[t] = egodata.index / 10
#egodata[t] = (egodata[t] - start_time) / 1000
egodata[t] = egodata[t]
egodata[e] = egodata[e] + offset_x
egodata[n] = egodata[n] + offset_y
egodata[h] = egodata[h] + offset_h
egodata[h] = egodata.apply(lambda x: x[h] if x[h] < 360 else x[h] - 360, axis=1)
type_list = list(egodata[obj_type])
ego_type = int(max(type_list, key=type_list.count))
lasth = float(egodata.at[0, h])
init_flag = True
for row in egodata.iterrows():
hhh = math.radians(row[1][h])
if init_flag:
position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0))
init_flag = False
else:
if float(row[1][h]) - lasth > 300:
hhh = math.radians(float(row[1][h]) - 360)
elif float(row[1][h]) - lasth < -300:
hhh = math.radians(float(row[1][h]) + 360)
position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0))
lasth = float(row[1][h])
time.append(float(row[1][t]))
# time.append(cur_time)
# cur_time += 0.1
return position, time, ego_type
def scenario(self, **kwargs):
road = xosc.RoadNetwork(roadfile=xodr_list[self.map_id],
scenegraph=osgb_list[self.map_id])
catalog = xosc.Catalog()
catalog.add_catalog('VehicleCatalog', 'Distros/Current/Config/Players/Vehicles')
catalog.add_catalog('PedestrianCatalog', 'Distros/Current/Config/Players/Pedestrians')
catalog.add_catalog('MiscObjectCatalog', 'Distros/Current/Config/Players/Objects')
catalog.add_catalog('ControllerCatalog', 'Distros/Current/Config/Players/driverCfg.xml')
pbb = xosc.BoundingBox(0.5, 0.5, 1.8, 2.0, 0, 0.9)
mbb = xosc.BoundingBox(1, 2, 1.7, 1.5, 0, 0.9)
bb = xosc.BoundingBox(2.1, 4.5, 1.8, 1.5, 0, 0.9)
bus_bb = xosc.BoundingBox(2.55, 10.5, 3.1, 5.1, 0, 1.45)
bus_fa = xosc.Axle(0.48, 0.91, 2.5, 4.5, 1.5)
bus_ba = xosc.Axle(0, 0.91, 2.5, 4.5, 1.5)
truck_bb = xosc.BoundingBox(2.8, 8.744, 3.78, 4.372, 0, 1.89)
truck_fa = xosc.Axle(0.48, 1, 2.8, 4.372, 1.5)
truck_ba = xosc.Axle(0, 1, 2.8, 4.372, 1.5)
fa = xosc.Axle(0.5, 0.6, 1.8, 3.1, 0.3)
ba = xosc.Axle(0, 0.6, 1.8, 0, 0.3)
cone_bb = xosc.BoundingBox(0.5, 0.5, 1.0, 2.0, 0, 0.9)
red_veh = xosc.Vehicle('Audi_A3_2009_black', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
red_veh.add_property(name='control', value='external')
segway = xosc.Vehicle('Segway', xosc.VehicleCategory.car, pbb, fa, ba, 69.444, 200, 10)
PuJin_patrol_robot = xosc.Vehicle('PuJin_patrol_robot', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
PuJin_distribution = xosc.Vehicle('PuJin_distribution', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
white_veh = xosc.Vehicle('Audi_A3_2009_red', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
male_ped = xosc.Pedestrian('Christian', 'male_adult', 70, xosc.PedestrianCategory.pedestrian, pbb)
motorcycle = xosc.Vehicle('Kawasaki_ZX-9R_black', xosc.VehicleCategory.motorbike, mbb, fa, ba, 69.444, 200, 10)
motorcycle.add_property(name='control', value='external')
truck = xosc.Vehicle('MANTGS_11_Black', xosc.VehicleCategory.truck, truck_bb, truck_fa,
truck_ba, 26.4, 10, 10)
bus = xosc.Vehicle('MercedesTravego_10_ArcticWhite', xosc.VehicleCategory.bus, bus_bb, bus_fa, bus_ba, 29, 10,
10)
cone = xosc.MiscObject('RdMiscPylon03-32cm', 10, xosc.MiscObjectCategory.obstacle, cone_bb)
prop = xosc.Properties()
cnt = xosc.Controller('DefaultDriver', prop)
cnt2 = xosc.Controller('No Driver', prop)
# 添加自车实体
egoname = 'Ego'
entities = xosc.Entities()
if self.vehicle_type == "0":
entities.add_scenario_object(egoname, PuJin_distribution, cnt)
else:
entities.add_scenario_object(egoname, PuJin_patrol_robot, cnt)
# 添加目标车实体
objname = 'Player'
ped_type, car_type, bicycle_motor_type, bus_type, truck_type, cone_type = get_obj_type(self.work_mode)
positionEgo = self.gps
positionObj = self.obs
for i, row in enumerate(positionObj):
if len(row[1]) < 1:
continue
obj_type = row[2] # 根据目标物类型添加对应的实体
if obj_type in ped_type:
entities.add_scenario_object(objname + str(i), segway)
elif obj_type in car_type:
entities.add_scenario_object(objname + str(i), white_veh, cnt2)
elif obj_type in bicycle_motor_type:
entities.add_scenario_object(objname + str(i), motorcycle, cnt2)
elif obj_type in bus_type:
entities.add_scenario_object(objname + str(i), bus, cnt2)
elif obj_type in truck_type:
entities.add_scenario_object(objname + str(i), truck, cnt2)
elif obj_type in cone_type:
entities.add_scenario_object(objname + str(i), cone)
# 初始化
init = xosc.Init()
step_time = xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 0)
egospeed = xosc.AbsoluteSpeedAction(self.ego_speed, step_time)
# 设置初始环境要素,例如天气时间等
default_precipitation = xosc.PrecipitationType.dry
default_cloudstate = xosc.CloudState.free
# json_path = os.path.join(self.abspath, 'label.json')
# with open(json_path, 'r', encoding='utf-8') as f:
# sc_json = json.load(f)
# weather = sc_json['天气情况'][0]['type_'] #['自然环境']
# if weather == '晴':
# f.close()
# elif weather == '多云':
# default_cloudstate = xosc.CloudState.cloudy
# elif weather == '阴':
# default_cloudstate = xosc.CloudState.overcast
# elif weather == '雨':
# default_cloudstate = xosc.CloudState.overcast
# default_precipitation = xosc.PrecipitationType.rain
# elif weather == '雪':
# default_cloudstate = xosc.CloudState.overcast
# default_precipitation = xosc.PrecipitationType.snow
# elif weather == '雾霾' or weather == '沙尘':
# self.visual_fog_range = 700
# default_cloudstate = xosc.CloudState.cloudy
init.add_global_action(
xosc.EnvironmentAction(name="InitEnvironment", environment=xosc.Environment(
xosc.TimeOfDay(True, 2019, 12, 19, 12, 0, 0),
xosc.Weather(
sun_intensity=2000, sun_azimuth=40, sun_elevation=20,
cloudstate=default_cloudstate,
precipitation=default_precipitation,
precipitation_intensity=1,
visual_fog_range=self.visual_fog_range),
xosc.RoadCondition(friction_scale_factor=0.7))))
# ego car init
ego_init_h = positionEgo[0].h
init.add_init_action(egoname, xosc.TeleportAction(
xosc.WorldPosition(x=positionEgo[0].x, y=positionEgo[0].y, z=positionEgo[0].z, h=ego_init_h, p=0, r=0)))
init.add_init_action(egoname, egospeed)
# ego car trail
trajectory = xosc.Trajectory('egoTrajectory', False)
polyline = xosc.Polyline(self.gps_time, positionEgo)
trajectory.add_shape(polyline)
speedaction = xosc.FollowTrajectoryAction(trajectory, xosc.FollowMode.position, xosc.ReferenceContext.absolute,
1, 0)
trigger = xosc.ValueTrigger('drive_start_trigger', 0, xosc.ConditionEdge.rising,
xosc.SimulationTimeCondition(0, xosc.Rule.greaterThan))
trigger_init = xosc.ValueTrigger(name='init_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising,
valuecondition=xosc.SimulationTimeCondition(value=0,
rule=xosc.Rule.greaterThan))
event_init = xosc.Event('Event_init', xosc.Priority.overwrite)
action_light = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0)
light_flag = 'auto'
if self.time == 64800:
light_flag = 'true'
text = f''
node = ET.Element("CustomCommandAction")
node.attrib = {'type': 'scp'}
node.text = f""
action_light.add_element(node)
event_init.add_action('headlight', action_light)
event_init.add_trigger(trigger_init)
event = xosc.Event('Event1', xosc.Priority.overwrite)
event.add_trigger(trigger)
event.add_action('newspeed', speedaction)
man = xosc.Maneuver('my maneuver')
man.add_event(event_init)
man.add_event(event)
mangr = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True)
mangr.add_actor('Ego')
mangr.add_maneuver(man)
trigger0 = xosc.Trigger('start')
act = xosc.Act('Act1', trigger0)
act.add_maneuver_group(mangr)
ego_story = xosc.Story('mystory_ego')
ego_story.add_act(act)
stop_trigger = xosc.ValueTrigger('stop_trigger', 0, xosc.ConditionEdge.none,
xosc.SimulationTimeCondition(self.period, xosc.Rule.greaterThan), 'stop')
sb = xosc.StoryBoard(init, stoptrigger=stop_trigger)
sb.add_story(ego_story)
# object car trail
if positionObj:
for i, row in enumerate(positionObj):
name = objname + str(i)
if len(row[1]) < 2:
continue
obj_type = row[2]
# 根据目标车需要动态创建实体
x = row[0][0].x
y = row[0][0].y
z = row[0][0].z
h = row[0][0].h
start_time = row[1][0]
init_position = xosc.WorldPosition(x=x, y=y, z=z, h=h, p=0, r=0)
newentity = xosc.AddEntityAction(name, init_position)
newentity_trigger = xosc.ValueTrigger(name='newentity_trigger', delay=0,
conditionedge=xosc.ConditionEdge.rising,
valuecondition=xosc.SimulationTimeCondition(value=start_time,
rule=xosc.Rule.greaterThan))
newentity_event = xosc.Event('Event_newentity', xosc.Priority.overwrite)
newentity_event.add_trigger(newentity_trigger)
newentity_event.add_action('newentity_action', newentity)
man_obj = xosc.Maneuver('my maneuver')
man_obj.add_event(newentity_event)
if obj_type != 10:
# 添加目标车轨迹
trajectoryM = xosc.Trajectory('objectTrajectory', False)
polylineM = xosc.Polyline(row[1], row[0])
trajectoryM.add_shape(polylineM)
obj_trail_action = xosc.FollowTrajectoryAction(trajectory=trajectoryM,
following_mode=xosc.FollowMode.position,
reference_domain=xosc.ReferenceContext.absolute,
scale=1, offset=0, initialDistanceOffset=0)
obj_trail_event = xosc.Event('Event1', xosc.Priority.overwrite)
# obj_start_trigger = xosc.EntityTrigger("obj-start-trigger", 0, xosc.ConditionEdge.rising, xosc.SpeedCondition(0, xosc.Rule.greaterThan),'Ego')
obj_start_trigger = xosc.ValueTrigger('obj_start_trigger', 0, xosc.ConditionEdge.rising,
xosc.SimulationTimeCondition(start_time, xosc.Rule.greaterThan))
obj_trail_event.add_trigger(obj_start_trigger)
obj_trail_event.add_action('trail_action', obj_trail_action)
# if obj_type in ped_type:
# walp_trigger = xosc.EntityTrigger('ped_walp_trigger', 0, xosc.ConditionEdge.rising,\
# xosc.TimeToCollisionCondition(self.intersectime, xosc.Rule.lessThan, entity=name), 'Ego')
# obj_trail_event.add_trigger(walp_trigger)
man_obj.add_event(obj_trail_event)
# 行人特有的事件
if obj_type in ped_type:
ped_event = xosc.Event('Event_ped', xosc.Priority.overwrite)
ped_event.add_trigger(obj_start_trigger)
ped_action = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0)
ped_action.add_element(self.createUDAction(name))
ped_event.add_action('ped_trail_action', ped_action)
man_obj.add_event(ped_event)
# finish_trigger = xosc.EntityTrigger('finish_trigger', 0, xosc.ConditionEdge.rising, xosc.ReachPositionCondition(position=row[1][0],tolerance=1),name)
# event4 = xosc.Event('Event_ped',xosc.Priority.overwrite)
# event4.add_trigger(finish_trigger)
# be_still_action = xosc.AbsoluteSpeedAction(0, xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 1))
# event4.add_action('ped_be_still_action',be_still_action)
# man_obj.add_event(event4)
# 轨迹最后删除实体
last_time = row[1][-1]
del_action = xosc.DeleteEntityAction(name)
del_trigger = xosc.ValueTrigger(name='entity_del_trigger', delay=0,
conditionedge=xosc.ConditionEdge.rising,
valuecondition=xosc.SimulationTimeCondition(value=last_time,
rule=xosc.Rule.greaterThan))
del_event = xosc.Event('Event_del', xosc.Priority.overwrite)
del_event.add_trigger(del_trigger)
del_event.add_action('del_action', del_action)
man_obj.add_event(del_event)
obj_mangroup = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True)
obj_mangroup.add_actor(name)
obj_mangroup.add_maneuver(man_obj)
obj_act = xosc.Act('Act2', trigger0)
obj_act.add_maneuver_group(obj_mangroup)
obj_story = xosc.Story('mystory_' + name)
obj_story.add_act(obj_act)
sb.add_story(obj_story)
# prettyprint(sb.get_element())
paramet = xosc.ParameterDeclarations()
sce = xosc.Scenario('my scenario', 'Maggie', paramet, entities, sb, road, catalog)
return sce
def createUDAction(self, name):
# tree = ET.parse('./models/ped_CDATA.xosc')
# root = tree.getroot()
# ele = root[5][2][1][0][1][1][0][0][0]
# newnode = ET.Element("CustomCommandAction")
# newnode.attrib = {'type': 'scp'}
# newnode.text = ''
speed = 3
motion = 'walk'
# text = f'\n \n'
text = '\n \n'.format(name, motion, speed)
# text = '<Traffic> <ActionMotion actor="Player0" move="walk" speed="3" force="false"rate="0" delayTime="0.0" activateOnExit="false"/></Traffic>'.format(name, motion, speed)
newnode = ET.Element("CustomCommandAction")
newnode.attrib = {'type': 'scp'}
newnode.text = "".format(text)
return newnode
class WorkMode(Enum):
cicv = 0 # CICV车端
roadside = 1 # 车网路端
car = 2 # 车网车端
merge = 3 # 车网车端路端融合
def get_obj_type(mode):
if mode == WorkMode.cicv.value:
# CICV车端
ped_type = [7]
car_type = [2]
bicycle_motor_type = [8]
bus_type = [3]
truck_type = [4]
cone_type = [103]
elif mode == WorkMode.roadside.value:
# 车网路端
ped_type = [0]
car_type = [2]
bicycle_motor_type = [1, 3]
bus_type = [5]
truck_type = [7]
cone_type = [103]
elif mode == WorkMode.car.value:
# 车网车端
ped_type = [1]
car_type = [6]
bicycle_motor_type = [2, 3, 4, 5]
bus_type = [7, 8]
truck_type = [9]
cone_type = [103]
else:
# 车网车端路端融合
ped_type = [1]
car_type = [6]
bicycle_motor_type = [2, 3, 4, 5]
bus_type = [7, 8]
truck_type = [9]
cone_type = [103]
return ped_type, car_type, bicycle_motor_type, bus_type, truck_type, cone_type
# 创建道路旁静止的场景
def create_static_object(road, object_dict):
for single_object in object_dict:
for k, v in ObjectType.__members__.items():
if k == single_object.attrib['type']:
single_object.attrib['type'] = v
break
road_object = Object(
s=single_object.attrib['s'],
t=single_object.attrib['t'],
Type=single_object.attrib['type'],
dynamic=Dynamic.no,
id=single_object.attrib['id'],
name=single_object.attrib['name'],
zOffset=single_object.attrib['zOffset'],
validLength=single_object.attrib['validLength'],
orientation=Orientation.none,
length=single_object.attrib['length'],
width=single_object.attrib['width'],
height=single_object.attrib['height'],
pitch=single_object.attrib['pitch'],
roll=single_object.attrib['roll']
)
# 判断此object是否是重复的元素
repeat = single_object.find('repeat')
if repeat is not None:
road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.left,
tOffset=1.75)
road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.right,
tOffset=-1.755)
else:
road.add_object(road_object)
return road
def change_CDATA(filepath):
'行人场景特例,对xosc文件内的特殊字符做转换'
f = open(filepath, "r", encoding="UTF-8")
txt = f.readline()
all_line = []
# txt是否为空可以作为判断文件是否到了末尾
while txt:
txt = txt.replace("<", "<").replace(">", ">").replace("&", "&").replace(""", '"').replace(
"'", "'")
all_line.append(txt)
# 读取文件的下一行
txt = f.readline()
f.close()
f1 = open(filepath, 'w', encoding="UTF-8")
for line in all_line:
f1.write(line)
f1.close()
def path_changer(xosc_path, xodr_path, osgb_path):
"""
provided by Dongpeng Ding
:param xosc_path:
:param xodr_path:
:param osgb_path:
:return:
"""
tree = ET.parse(xosc_path)
treeRoot = tree.getroot()
# for OpenScenario v0.9, v1.0
for RoadNetwork in treeRoot.findall('RoadNetwork'):
for Logics in RoadNetwork.findall('LogicFile'):
Logics.attrib['filepath'] = xodr_path
for SceneGraph in RoadNetwork.findall('SceneGraphFile'):
SceneGraph.attrib['filepath'] = osgb_path
for Logics in RoadNetwork.findall('Logics'):
Logics.attrib['filepath'] = xodr_path
for SceneGraph in RoadNetwork.findall('SceneGraph'):
SceneGraph.attrib['filepath'] = osgb_path
# for VTD xml
for Layout in treeRoot.findall('Layout'):
Layout.attrib['File'] = xodr_path
Layout.attrib['Database'] = osgb_path
tree.write(xosc_path, xml_declaration=True)
def readXML(xoscPath):
xodrFileName = ""
osgbFileName = ""
tree = ET.parse(xoscPath)
treeRoot = tree.getroot()
for RoadNetwork in treeRoot.findall('RoadNetwork'):
for Logics in RoadNetwork.findall('LogicFile'):
xodrFileName = Logics.attrib['filepath']
for SceneGraph in RoadNetwork.findall('SceneGraphFile'):
osgbFileName = SceneGraph.attrib['filepath']
for Logics in RoadNetwork.findall('Logics'):
xodrFileName = Logics.attrib['filepath']
for SceneGraph in RoadNetwork.findall('SceneGraph'):
osgbFileName = SceneGraph.attrib['filepath']
return xodrFileName[xodrFileName.rindex("/") + 1:], osgbFileName[osgbFileName.rindex("/") + 1:]
def formatThree(rootDirectory):
"""
xodr and osgb file path are fixed
:return:
"""
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if ".xosc" == file[-5:]:
# xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.xodr" # 泛化效果好的场景用的
# osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.osgb" # 泛化效果好的场景用的
# xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.xodr" # 泛化效果好的场景用的
# osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.osgb" # 泛化效果好的场景用的
xodrFilePath = "/home/hancheng/hc_project/2023-08-08-10-50-16_jsons1/2023-08-08-10-50-16_jsons_0/2023-08-08-10-50-16_jsons_0.xodr" # 直路,泛化好用
osgbFilePath = "/home/hancheng/VIRES/VTD.2022.1/Runtime/Tools/RodDistro_7001_Rod4.6.3/DefaultProject/Database/json0.opt.osgb" # 直路,泛化好用
# xodrFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.xodr" # 十字路口,泛化好用
# osgbFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.opt.osgb" # 十字路口,泛化好用
# xodrFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.xodr" # Sharing-van还原用的
# osgbFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.opt.osgb" # Sharing-van还原用的
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print("Change success: " + root + "/" + file)
def formatTwo(rootDirectory):
"""
data format:
simulation
file.xosc
file.xodr
file.osgb
:return:
"""
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if ".xosc" == file[-5:]:
xodrFilePath = ""
osgbFilePath = ""
for odrFile in os.listdir(root):
if ".xodr" == odrFile[-5:]:
xodrFilePath = root + "/" + odrFile
break
for osgbFile in os.listdir(root):
if ".osgb" == osgbFile[-5:]:
osgbFilePath = root + "/" + osgbFile
break
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print("Change success: " + root + "/" + file)
def formatOne(rootDirectory):
"""
data format:
openx
xosc
file.xosc
xodr
file.xodr
osgb
file.osgb
:return:
"""
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if "xosc" == file[-4:]:
xodrFilePath = ""
osgbFilePath = ""
for odrFile in os.listdir(root[:-4] + "xodr"):
if "xodr" == odrFile[-4:]:
xodrFilePath = root[:-4] + "xodr/" + odrFile
break
for osgbFile in os.listdir(root[:-4] + "osgb"):
if "osgb" == osgbFile[-4:]:
osgbFilePath = root[:-4] + "osgb/" + osgbFile
break
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print("Change success: " + root + "/" + file)
def chongQingFormat(rootDirectory):
"""
supporting file format: chong qing folder format
:return:
"""
counter = 1
for root, dirs, files in os.walk(rootDirectory):
for file in files:
if "xosc" == file[-4:]:
if "ver1.0.xosc" == file[-11:]:
xodrFileName, osgbFileName = readXML(root + "/" + file)
xodrFilePath = "/Xodrs/" + xodrFileName
osgbFilePath = "/Databases/" + osgbFileName
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print(counter, "Change success: " + root + "/" + file)
else:
xodrFileName, osgbFileName = readXML(root + "/" + file)
xodrFilePath = "/Xodrs/" + xodrFileName
osgbFilePath = "/Databases/" + osgbFileName
path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
print(counter, "Change success: " + root + "/" + file)
counter += 1