123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Mar 14 15:44:01 2022
- @author: lxj
- """
- import xml.etree.ElementTree as ET
- from scenariogeneration import xodr
- from scenariogeneration import xosc
- from scenariogeneration import ScenarioGenerator
- from scenariogeneration.xodr import RoadSide, Object, ObjectType, Dynamic, Orientation
- import math, os
- from xodr_generator2 import road_detector_new
- from datetime import datetime
- import traceback
- import json
- import pandas as pd
- from enum import Enum
- import elevation_0323 as el
- import warnings
- warnings.filterwarnings("ignore")
- # xodr_list = ['/home/hancheng/taiheqiao_map/thq_1116.xodr', '/home/hancheng/gongyuanbeihuan_map/OD/OpenDRIVE.xodr','/home/hancheng/glyt_map/OpenDRIVE2.xodr']
- # osgb_list = ['/home/hancheng/taiheqiao_map/thq_1116.opt.osgb', '/home/hancheng/gongyuanbeihuan_map/osgb/GJBM_20km_0822.opt.osgb', '/home/hancheng/glyt_map/OSGB/jinlong_20231129_2_guazai.opt.osgb']
- xodr_list = ['/root/rosmerge/resource/thq.xodr', '/root/rosmerge/resource/OpenDRIVE.xodr',
- '/root/rosmerge/resource/gelan.xodr']
- osgb_list = ['/root/rosmerge/resource/thq.opt.osgb', '/root/rosmerge/resource/GJBM_20km_0822.opt.osgb',
- '/root/rosmerge/resource/gelan.opt.osgb']
- class Scenario(ScenarioGenerator):
- def __init__(self, gps, obs, gps_time, ego_speed, work_mode, period, abspath, timeofday, map_id):
- ScenarioGenerator.__init__(self)
- self.gps = gps
- self.obs = obs
- self.gps_time = gps_time
- self.ego_speed = ego_speed
- self.work_mode = work_mode
- self.period = period # 场景持续的时间
- self.abspath = abspath
- self.weather = xosc.PrecipitationType.dry # 默认是晴天
- self.visual_fog_range = 20000 # 正常的能见度,没有雾
- self.time = timeofday # 用于判断白天还是黑夜
- self.map_id = map_id
- # self.ObjectID = ObjectID
- # self.Speed = Speed
- def road(self, **kwargs):
- positionEgo = self.gps
- planview = xodr.PlanView()
- for i in range(len(positionEgo) - 1):
- x = 0.000001 if abs(positionEgo[i].x) < 0.000001 else positionEgo[i].x
- y = 0.000001 if abs(positionEgo[i].y) < 0.000001 else positionEgo[i].y
- nextx = 0.000001 if abs(positionEgo[i + 1].x) < 0.000001 else positionEgo[i + 1].x
- nexty = 0.000001 if abs(positionEgo[i + 1].y) < 0.000001 else positionEgo[i + 1].y
- h = float(positionEgo[i].h)
- planview.add_fixed_geometry(xodr.Line(math.sqrt(math.pow(nextx - x, 2) + math.pow(nexty - y, 2))), x, y, h)
- planview.add_fixed_geometry(xodr.Line(100), nextx, nexty, h)
- # create two different roadmarkings
- rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid, 0.2)
- rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken, 0.2)
- # create simple lanes
- lanes = xodr.Lanes()
- lanesection1 = xodr.LaneSection(0, xodr.standard_lane(offset=1.75, rm=rm_dashed))
- lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
- lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
- lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_solid))
- lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
- lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed))
- lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_solid))
- lanes.add_lanesection(lanesection1)
- lanes.add_laneoffset(xodr.LaneOffset(a=1.75))
- road = xodr.Road(0, planview, lanes)
- odr = xodr.OpenDrive('myroad')
- # 静态元素
- tree = ET.parse(os.getcwd() + '/models/static_object_models.xodr')
- root = tree.getroot()
- objects_dict = root.iter(tag='object')
- road = create_static_object(road, objects_dict)
- # road = create_static_object(road, object_id=0, object_type=ObjectType.railing, repeat_flag=1)
- odr.add_road(road)
- odr.adjust_roads_and_lanes()
- return odr
- def get_section(self, left_lanemark):
- return None
- def split_road_section(self, enu_file, lane_file):
- lane_content = pd.read_csv(lane_file)
- # 把车道分割开来,一个车道段,多种geometry
- # 首先判断车道线数值是否有断裂,断裂处根据自车轨迹生成道路参数
- left_lanemark = lane_content[lane_content['LaneID'] == 1][['Time', 'FrameID', 'LanePosition', 'LaneQuality']]
- right_lanemark = lane_content[lane_content['LaneID'] == 2][['Time', 'FrameID', 'LanePosition', 'LaneQuality']]
- left_break_section = self.get_section(left_lanemark, 1)
- right_break_section = self.get_section(right_lanemark, 1)
- break_section = []
- normal_section = []
- # 判断是否变道,然后判断是否弯道
- # 最终目标是得到多个geometry参数,确定每个连接线处的s/t偏移量?,以及左右道路的个数和宽度
- return None
- def check_outliers_and_modify(self, datalist):
- for index in datalist.index:
- if index < 3:
- datalist.loc[index] = datalist[index] if abs(
- datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 else 2 * datalist[
- index + 1] - datalist[index + 1:index + 3].mean()
- elif 3 <= index < len(datalist) - 3:
- datalist.loc[index] = datalist[index] if abs(
- datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 or abs(
- datalist[index] - datalist[index - 3:index].mean()) < 0.5 else (datalist[index - 1] + datalist[
- index + 1]) / 2
- else:
- datalist.loc[index] = datalist[index] if abs(
- datalist[index] - datalist[index - 3:index].mean()) < 0.5 else 2 * datalist[index - 1] - datalist[
- index - 3:index].mean()
- return datalist
- def get_new_LaneID(self, change_flag, laneID, frameid, item):
- '''
- 修改laneID,change_list包含了每个变道片段
- '''
- if item[0] <= frameid < item[1]:
- return laneID + change_flag
- else:
- return laneID
- def get_lane_width(self, data1, data2):
- lane_union = pd.merge(data1, data2, on=['FrameID'])
- lane_union['width'] = lane_union['LanePosition_y'] - lane_union['LanePosition_x']
- a = lane_union["width"].quantile(0.75)
- b = lane_union["width"].quantile(0.25)
- lane_width = abs((a + b) / 2)
- return lane_width
- def road_pre_study(self, label_file, lane_file):
- laneDF = pd.read_csv(lane_file)
- road_label = ''
- flag = 0 # flag = 0 can use road_ego to generate .xodr, flag = 1 better use road to generate .xodr
- with open(label_file) as f:
- label = json.load(f)
- ego_actions = label['自车行为']
- for item in ego_actions:
- if ('变道' in item['type_']) or ('转成功' in item['type_']) or ('掉头' in item['type_']):
- road_label = item['type_']
- flag = 1
- break
- else:
- road_label = item['type_']
- if road_label == '向左变道成功':
- # 查找变道标记点
- change_flag = 1
- lane_1 = laneDF[(laneDF['LaneID'] == 1) & (laneDF['LaneQuality'] > 1)]
- lane_1 = lane_1.reset_index(drop=True)
- leftlanepositon = lane_1['LanePosition']
- lane_1['LanePosition'] = self.check_outliers_and_modify(leftlanepositon)
- lane_1['last_Position'] = lane_1[['LanePosition']].shift(axis=0, periods=1) # axis指定移动的轴:0为行,1为列
- lane_1['last_FrameID'] = lane_1[['FrameID']].shift(axis=0, periods=1) # axis指定移动的轴:0为行,1为列
- lane_1['change_flag'] = lane_1.apply(
- lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (
- x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。
- lane_1['split_flag'] = lane_1.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1)
- lane_2 = laneDF[(laneDF['LaneID'] == 2) & (laneDF['LaneQuality'] > 1)]
- lane_2 = lane_2.reset_index(drop=True)
- rightlanepositon = lane_2['LanePosition']
- lane_2['LanePosition'] = self.check_outliers_and_modify(rightlanepositon)
- lane_2['last_Position'] = lane_2[['LanePosition']].shift(axis=0, periods=1) # axis指定移动的轴:0为行,1为列
- lane_2['last_FrameID'] = lane_2[['FrameID']].shift(axis=0, periods=1) # axis指定移动的轴:0为行,1为列
- lane_2['change_flag'] = lane_2.apply(
- lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (
- x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。
- lane_2['split_flag'] = lane_2.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1)
- lane_width = self.get_lane_width(lane_1[['FrameID', 'LanePosition']], lane_2[['FrameID', 'LanePosition']])
- # 左变道用左车道线数据靠谱,不会有他车遮挡,找到变道数据段
- change_lane_point = list(lane_1[lane_1['change_flag'] == change_flag]['change_flag'].index) + [
- list(lane_1.index)[-1]]
- change_lane_section = [
- (lane_1.loc[change_lane_point[i], 'FrameID'], lane_1.loc[change_lane_point[i + 1], 'FrameID']) for i in
- range(len(change_lane_point) - 1)]
- for item in change_lane_section:
- # 变道后修改原始laneID
- # laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if item[0] <= x['FrameID'] < item[1] else x['LaneID'], axis=1)
- # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if item[0] <= x['FrameID'] < item[1] else x['LanePosition'], axis=1)
- # laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if x['FrameID'] == max(laneDF['FrameID']) else x['LaneID'], axis=1)
- # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if x['FrameID'] == max(laneDF['FrameID']) else x['LanePosition'], axis=1)
- # 修正左左车道线和右右车道线数值
- if change_flag == 1:
- lane_0 = laneDF[
- (laneDF['LaneID'] == 0) & (laneDF['LaneQuality'] > 1) & (laneDF['FrameID'] < item[0])]
- if not lane_0.empty:
- left_lane_width = self.get_lane_width(lane_0[['FrameID', 'LanePosition']],
- lane_1[['FrameID', 'LanePosition']])
- else:
- left_lane_width = lane_width
- print(left_lane_width)
- # new_lane_1 = laneDF[laneDF['LaneID'] == 1][['FrameID','LanePosition']]
- # new_lane_1.rename(columns={'LanePosition':'LanePosition2'},inplace=True)
- # laneDF = pd.merge(laneDF, new_lane_1, on=['FrameID'])
- # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
- # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
- # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == -1) else x['LanePosition'], axis=1)
- # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1)
- # laneDF['LaneID'] = laneDF.apply(lambda x: 3 if x['LaneID'] == -1 else x['LaneID'], axis=1)
- # 缺右车道线right_lane_width处理
- print(laneDF[laneDF['FrameID'] > 1860][['FrameID', 'LaneID', 'LanePosition']].head(50))
- print('road_label: ', road_label)
- return laneDF
- def road_new(self, **kwargs):
- polyfit_fig_path = os.path.join(self.abspath, 'simulation', 'polyfit.jpg')
- # run road detector
- enu_file = os.path.join(self.abspath, 'pos.csv')
- lane_file = os.path.join(self.abspath, 'lane.csv')
- label_file = os.path.join(self.abspath, 'label.json')
- # study road style: straight\curve\split-line; ego action: change-lane\no-change\cross-turn
- lane_content = self.road_pre_study(label_file, lane_file)
- # road split sections
- # self.split_road_section(enu_file, lane_file)
- try:
- laneid_tuple = [0, 1, 2, 3]
- uv_coord_info, road_info, left_lane_info, right_lane_info = road_detector_new(enu_file, lane_content,
- laneid_tuple,
- polyfit_fig_path)
- except:
- print('road detection failed!!!! Use ego trail road.')
- error = {'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], 'traceback': traceback.format_exc()}
- with open('error.log', 'a+') as f:
- json.dump(error, f, indent=4)
- f.write('\n')
- return self.road_ego()
- origin_x = uv_coord_info[0]
- origin_y = uv_coord_info[1]
- uv_coord_hdg = uv_coord_info[2]
- reference_line_offset = road_info[0]
- model_u = road_info[1] # in fomrat [c3, c2, c1, c0]
- model_v = road_info[2] # in fomrat [c3, c2, c1, c0]
- road_length = road_info[3]
- print(uv_coord_info)
- print(road_info)
- # generate xodr file
- planview = xodr.PlanView(origin_x, origin_y, uv_coord_hdg)
- # Create geometry and add it to the planview
- # parampoly3 = xodr.ParamPoly3(au=model_u[3], bu=model_u[2], cu=model_u[1], du=model_u[0], av=model_v[3], bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length)
- parampoly3 = xodr.ParamPoly3(au=0, bu=model_u[2], cu=model_u[1], du=model_u[0], av=0, bv=model_v[2],
- cv=model_v[1], dv=model_v[0], prange='arcLength', length=road_length)
- planview.add_geometry(parampoly3)
- # create two different roadmarkings
- rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid, 0.2)
- rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken, 0.2, laneChange=xodr.LaneChange.both)
- ##4. Create centerlane
- centerlane = xodr.Lane(a=reference_line_offset)
- centerlane.add_roadmark(rm_dashed)
- ##5. Create lane section form the centerlane
- lanesec = xodr.LaneSection(0, centerlane)
- ##6. Create left and right lanes
- lane_id = 0
- for lane in left_lane_info:
- lane_id += 1
- lane_width = lane[0]
- spd_limit = lane[1]
- lane_left = xodr.Lane(a=lane_width)
- lane_left.add_roadmark(rm_dashed)
- # Add lanes to lane section
- lanesec.add_left_lane(lane_left)
- lane_id = 0
- for lane in right_lane_info:
- lane_id -= 1
- lane_width = lane[0]
- spd_limit = lane[1]
- lane_right = xodr.Lane(a=lane_width)
- lane_right.add_roadmark(rm_dashed)
- # Add lanes to lane section
- lanesec.add_right_lane(lane_right)
- ##8. Add lane section to Lanes
- lanes = xodr.Lanes()
- lanes.add_laneoffset(xodr.LaneOffset(s=0, a=reference_line_offset))
- lanes.add_lanesection(lanesec)
- ##9. Create Road from Planview and Lanes
- road = xodr.Road(1, planview, lanes)
- ##10. Create the OpenDrive class (Master class)
- odr = xodr.OpenDrive('myroad')
- ##11. Finally add roads to Opendrive
- odr.add_road(road)
- ##12. Adjust initial positions of the roads looking at succ-pred logic
- odr.adjust_roads_and_lanes()
- # ##13. Print the .xodr file
- # xodr.prettyprint(odr.get_element())
- # ##14. Run the .xodr with esmini
- # xodr.run_road(odr,os.path.join('..','pyoscx','esmini'))
- return odr
- def getXoscPosition(egodata, t, e, n, h, z, obj_type, offset_x, offset_y, offset_h, start_time):
- '将dataframe的轨迹转换为场景文件的轨迹数据, Time, ego_e, ego_n, headinga'
- position = []
- time = []
- # cur_time = 0.0
- egodata = egodata[[t, e, n, h, z, obj_type]]
- egodata = egodata.reset_index(drop=True)
- # egodata[t] = egodata.index / 10
- egodata[t] = (egodata[t] - start_time) / 1000
- egodata[e] = egodata[e] + offset_x
- egodata[n] = egodata[n] + offset_y
- egodata[h] = egodata[h] + offset_h
- egodata[h] = egodata.apply(lambda x: x[h] if x[h] < 360 else x[h] - 360, axis=1)
- type_list = list(egodata[obj_type])
- ego_type = int(max(type_list, key=type_list.count))
- lasth = float(egodata.at[0, h])
- init_flag = True
- # two method to set z value
- zflag = 0 # 1
- if zflag:
- # 根据地图的xy值计算z值
- zfile = '/home/hancheng/weihao/gongyuanbei_elevation.csv'
- # zfile = '/home/hancheng/simulation_dynamic/elevation_df_taiheqiao.csv'
- zdata = pd.read_csv(zfile)
- for row in egodata.iterrows():
- hhh = math.radians(row[1][h])
- xodr_z = el.cal_elevation(zdata, float(row[1][e]), float(row[1][n]))
- if init_flag:
- position.append(
- xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=xodr_z, h=hhh, p=0, r=0))
- init_flag = False
- else:
- if float(row[1][h]) - lasth > 300:
- hhh = math.radians(float(row[1][h]) - 360)
- elif float(row[1][h]) - lasth < -300:
- hhh = math.radians(float(row[1][h]) + 360)
- position.append(
- xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=xodr_z, h=hhh, p=0, r=0))
- lasth = hhh
- time.append(float(row[1][t]))
- # time.append(cur_time)
- # cur_time += 0.1
- return position, time, ego_type
- else:
- # z = 0
- for row in egodata.iterrows():
- hhh = math.radians(row[1][h])
- if init_flag:
- position.append(
- xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0))
- init_flag = False
- else:
- if float(row[1][h]) - lasth > 300:
- hhh = math.radians(float(row[1][h]) - 360)
- elif float(row[1][h]) - lasth < -300:
- hhh = math.radians(float(row[1][h]) + 360)
- position.append(
- xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0))
- lasth = hhh
- time.append(float(row[1][t]))
- # time.append(cur_time)
- # cur_time += 0.1
- return position, time, ego_type
- def scenario(self, **kwargs):
- road = xosc.RoadNetwork(roadfile=xodr_list[self.map_id], scenegraph=osgb_list[self.map_id])
- catalog = xosc.Catalog()
- catalog.add_catalog('VehicleCatalog', 'Distros/Current/Config/Players/Vehicles')
- catalog.add_catalog('PedestrianCatalog', 'Distros/Current/Config/Players/Pedestrians')
- catalog.add_catalog('ControllerCatalog', 'Distros/Current/Config/Players/driverCfg.xml')
- pbb = xosc.BoundingBox(0.5, 0.5, 1.8, 2.0, 0, 0.9)
- mbb = xosc.BoundingBox(1, 2, 1.7, 1.5, 0, 0.9)
- bb = xosc.BoundingBox(2.1, 4.5, 1.8, 1.5, 0, 0.9)
- bus_bb = xosc.BoundingBox(2.55, 10.5, 3.1, 5.1, 0, 1.45)
- bus_fa = xosc.Axle(0.48, 0.91, 2.5, 4.5, 1.5)
- bus_ba = xosc.Axle(0, 0.91, 2.5, 4.5, 1.5)
- truck_bb = xosc.BoundingBox(2.8, 8.744, 3.78, 4.372, 0, 1.89)
- truck_fa = xosc.Axle(0.48, 1, 2.8, 4.372, 1.5)
- truck_ba = xosc.Axle(0, 1, 2.8, 4.372, 1.5)
- fa = xosc.Axle(0.5, 0.6, 1.8, 3.1, 0.3)
- ba = xosc.Axle(0, 0.6, 1.8, 0, 0.3)
- red_veh = xosc.Vehicle('Audi_A3_2009_black', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
- red_veh.add_property(name='control', value='external')
- white_veh = xosc.Vehicle('Audi_A3_2009_red', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10)
- male_ped = xosc.Pedestrian('Christian', 'male adult', 70, xosc.PedestrianCategory.pedestrian, pbb)
- motorcycle = xosc.Vehicle('Kawasaki_ZX-9R_black', xosc.VehicleCategory.motorbike, mbb, fa, ba, 69.444, 200, 10)
- truck = xosc.Vehicle('MANTGS_11_Black', xosc.VehicleCategory.truck, truck_bb, truck_fa,
- truck_ba, 26.4, 10, 10)
- bus = xosc.Vehicle('MercedesTravego_10_ArcticWhite', xosc.VehicleCategory.bus, bus_bb, bus_fa, bus_ba, 29, 10,
- 10)
- prop = xosc.Properties()
- cnt = xosc.Controller('DefaultDriver', prop)
- cnt2 = xosc.Controller('No Driver', prop)
- # 添加自车实体
- egoname = 'Ego'
- entities = xosc.Entities()
- entities.add_scenario_object(egoname, red_veh, cnt)
- # 添加目标车实体
- objname = 'Player'
- ped_type, car_type, bicycle_motor_type, bus_type, truct_type = get_obj_type(self.work_mode)
- positionEgo = self.gps
- positionObj = self.obs
- for i, row in enumerate(positionObj):
- if len(row[1]) < 10:
- continue
- obj_type = row[2] # 根据目标物类型添加对应的实体
- if obj_type in ped_type:
- entities.add_scenario_object(objname + str(i), male_ped)
- elif obj_type in car_type:
- entities.add_scenario_object(objname + str(i), white_veh, cnt2)
- elif obj_type in bicycle_motor_type:
- entities.add_scenario_object(objname + str(i), motorcycle, cnt2)
- elif obj_type in bus_type:
- entities.add_scenario_object(objname + str(i), bus, cnt2)
- elif obj_type in truct_type:
- entities.add_scenario_object(objname + str(i), truck, cnt2)
- # 初始化
- init = xosc.Init()
- step_time = xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 0)
- egospeed = xosc.AbsoluteSpeedAction(self.ego_speed, step_time)
- # 设置初始环境要素,例如天气时间等
- default_precipitation = xosc.PrecipitationType.dry
- default_cloudstate = xosc.CloudState.free
- # json_path = os.path.join(self.abspath, 'label.json')
- # with open(json_path, 'r', encoding='utf-8') as f:
- # sc_json = json.load(f)
- # weather = sc_json['天气情况'][0]['type_'] #['自然环境']
- # if weather == '晴':
- # f.close()
- # elif weather == '多云':
- # default_cloudstate = xosc.CloudState.cloudy
- # elif weather == '阴':
- # default_cloudstate = xosc.CloudState.overcast
- # elif weather == '雨':
- # default_cloudstate = xosc.CloudState.overcast
- # default_precipitation = xosc.PrecipitationType.rain
- # elif weather == '雪':
- # default_cloudstate = xosc.CloudState.overcast
- # default_precipitation = xosc.PrecipitationType.snow
- # elif weather == '雾霾' or weather == '沙尘':
- # self.visual_fog_range = 700
- # default_cloudstate = xosc.CloudState.cloudy
- init.add_global_action(
- xosc.EnvironmentAction(name="InitEnvironment", environment=xosc.Environment(
- xosc.TimeOfDay(True, 2019, 12, 19, 12, 0, 0),
- xosc.Weather(
- sun_intensity=2000, sun_azimuth=40, sun_elevation=20,
- cloudstate=default_cloudstate,
- precipitation=default_precipitation,
- precipitation_intensity=1,
- visual_fog_range=self.visual_fog_range),
- xosc.RoadCondition(friction_scale_factor=0.7))))
- # ego car init
- ego_init_h = positionEgo[0].h
- init.add_init_action(egoname, xosc.TeleportAction(
- xosc.WorldPosition(x=positionEgo[0].x, y=positionEgo[0].y, z=positionEgo[0].z, h=ego_init_h, p=0, r=0)))
- init.add_init_action(egoname, egospeed)
- # ego car trail
- trajectory = xosc.Trajectory('egoTrajectory', False)
- polyline = xosc.Polyline(self.gps_time, positionEgo)
- trajectory.add_shape(polyline)
- speedaction = xosc.FollowTrajectoryAction(trajectory, xosc.FollowMode.position, xosc.ReferenceContext.absolute,
- 1, 0)
- trigger = xosc.ValueTrigger('drive_start_trigger', 0, xosc.ConditionEdge.rising,
- xosc.SimulationTimeCondition(0, xosc.Rule.greaterThan))
- trigger_init = xosc.ValueTrigger(name='init_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising,
- valuecondition=xosc.SimulationTimeCondition(value=0,
- rule=xosc.Rule.greaterThan))
- event_init = xosc.Event('Event_init', xosc.Priority.overwrite)
- action_light = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0)
- light_flag = 'auto'
- if self.time == 64800:
- light_flag = 'true'
- text = f'<Environment><TimeOfDay headlights="{light_flag}" timezone="8" value="{self.time}" /></Environment>'
- node = ET.Element("CustomCommandAction")
- node.attrib = {'type': 'scp'}
- node.text = f"<![CDATA[\n{text}]\n]>"
- action_light.add_element(node)
- event_init.add_action('headlight', action_light)
- event_init.add_trigger(trigger_init)
- event = xosc.Event('Event1', xosc.Priority.overwrite)
- event.add_trigger(trigger)
- event.add_action('newspeed', speedaction)
- man = xosc.Maneuver('my maneuver')
- man.add_event(event_init)
- man.add_event(event)
- mangr = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True)
- mangr.add_actor('Ego')
- mangr.add_maneuver(man)
- trigger0 = xosc.Trigger('start')
- act = xosc.Act('Act1', trigger0)
- act.add_maneuver_group(mangr)
- ego_story = xosc.Story('mystory_ego')
- ego_story.add_act(act)
- stop_trigger = xosc.ValueTrigger('stop_trigger', 0, xosc.ConditionEdge.none,
- xosc.SimulationTimeCondition(self.period, xosc.Rule.greaterThan), 'stop')
- sb = xosc.StoryBoard(init, stoptrigger=stop_trigger)
- sb.add_story(ego_story)
- # object car trail
- if positionObj:
- for i, row in enumerate(positionObj):
- name = objname + str(i)
- if len(row[1]) < 10:
- continue
- obj_type = row[2]
- # 根据目标车需要动态创建实体
- x = row[0][0].x
- y = row[0][0].y
- z = row[0][0].z
- h = row[0][0].h
- start_time = row[1][0]
- init_position = xosc.WorldPosition(x=x, y=y, z=z, h=h, p=0, r=0)
- newentity = xosc.AddEntityAction(name, init_position)
- newentity_trigger = xosc.ValueTrigger(name='newentity_trigger', delay=0,
- conditionedge=xosc.ConditionEdge.rising,
- valuecondition=xosc.SimulationTimeCondition(value=start_time,
- rule=xosc.Rule.equalTo))
- newentity_event = xosc.Event('Event_newentity', xosc.Priority.overwrite)
- newentity_event.add_trigger(newentity_trigger)
- newentity_event.add_action('newentity_action', newentity)
- man_obj = xosc.Maneuver('my maneuver')
- man_obj.add_event(newentity_event)
- # 添加目标车轨迹
- trajectoryM = xosc.Trajectory('objectTrajectory', False)
- polylineM = xosc.Polyline(row[1], row[0])
- trajectoryM.add_shape(polylineM)
- obj_trail_action = xosc.FollowTrajectoryAction(trajectory=trajectoryM,
- following_mode=xosc.FollowMode.position,
- reference_domain=xosc.ReferenceContext.absolute,
- scale=1, offset=0, initialDistanceOffset=0)
- obj_trail_event = xosc.Event('Event1', xosc.Priority.overwrite)
- # obj_start_trigger = xosc.EntityTrigger("obj-start-trigger", 0, xosc.ConditionEdge.rising, xosc.SpeedCondition(0, xosc.Rule.greaterThan),'Ego')
- obj_start_trigger = xosc.ValueTrigger('obj_start_trigger', 0, xosc.ConditionEdge.rising,
- xosc.SimulationTimeCondition(start_time, xosc.Rule.greaterThan))
- obj_trail_event.add_trigger(obj_start_trigger)
- obj_trail_event.add_action('trail_action', obj_trail_action)
- # if obj_type in ped_type:
- # walp_trigger = xosc.EntityTrigger('ped_walp_trigger', 0, xosc.ConditionEdge.rising,\
- # xosc.TimeToCollisionCondition(self.intersectime, xosc.Rule.lessThan, entity=name), 'Ego')
- # obj_trail_event.add_trigger(walp_trigger)
- man_obj.add_event(obj_trail_event)
- # 行人特有的事件
- if obj_type in ped_type:
- ped_event = xosc.Event('Event_ped', xosc.Priority.overwrite)
- ped_event.add_trigger(obj_start_trigger)
- ped_action = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0)
- ped_action.add_element(self.createUDAction(name))
- ped_event.add_action('ped_trail_action', ped_action)
- man_obj.add_event(ped_event)
- # finish_trigger = xosc.EntityTrigger('finish_trigger', 0, xosc.ConditionEdge.rising, xosc.ReachPositionCondition(position=row[1][0],tolerance=1),name)
- # event4 = xosc.Event('Event_ped',xosc.Priority.overwrite)
- # event4.add_trigger(finish_trigger)
- # be_still_action = xosc.AbsoluteSpeedAction(0, xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 1))
- # event4.add_action('ped_be_still_action',be_still_action)
- # man_obj.add_event(event4)
- # 轨迹最后删除实体
- last_time = row[1][-1]
- del_action = xosc.DeleteEntityAction(name)
- del_trigger = xosc.ValueTrigger(name='entity_del_trigger', delay=0,
- conditionedge=xosc.ConditionEdge.rising,
- valuecondition=xosc.SimulationTimeCondition(value=last_time,
- rule=xosc.Rule.greaterThan))
- del_event = xosc.Event('Event_del', xosc.Priority.overwrite)
- del_event.add_trigger(del_trigger)
- del_event.add_action('del_action', del_action)
- man_obj.add_event(del_event)
- obj_mangroup = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True)
- obj_mangroup.add_actor(name)
- obj_mangroup.add_maneuver(man_obj)
- obj_act = xosc.Act('Act2', trigger0)
- obj_act.add_maneuver_group(obj_mangroup)
- obj_story = xosc.Story('mystory_' + name)
- obj_story.add_act(obj_act)
- sb.add_story(obj_story)
- # prettyprint(sb.get_element())
- paramet = xosc.ParameterDeclarations()
- sce = xosc.Scenario('my scenario', 'Maggie', paramet, entities, sb, road, catalog)
- return sce
- def createUDAction(self, name):
- # tree = ET.parse('./models/ped_CDATA.xosc')
- # root = tree.getroot()
- # ele = root[5][2][1][0][1][1][0][0][0]
- # newnode = ET.Element("CustomCommandAction")
- # newnode.attrib = {'type': 'scp'}
- # newnode.text = '<![CDATA[\n' + ele.text + ']\n]>'
- speed = 3
- motion = 'walk'
- text = f'<Traffic>\n <ActionMotion actor="{name}" move="{motion}" speed="{speed}" force="false" ' \
- f'rate="0" delayTime="0.0" activateOnExit="false"/>\n</Traffic>'
- newnode = ET.Element("CustomCommandAction")
- newnode.attrib = {'type': 'scp'}
- newnode.text = f"<![CDATA[\n{text}]\n]>"
- return newnode
- class WorkMode(Enum):
- cicv = 0 # CICV车端
- roadside = 1 # 车网路端
- car = 2 # 车网车端
- merge = 3 # 车网车端路端融合
- def get_obj_type(mode):
- if mode == WorkMode.cicv.value:
- # CICV车端
- ped_type = [7]
- car_type = [2]
- bicycle_motor_type = [8]
- bus_type = [3]
- truct_type = [4]
- elif mode == WorkMode.roadside.value:
- # 车网路端
- ped_type = [0]
- car_type = [2]
- bicycle_motor_type = [1, 3]
- bus_type = [5]
- truct_type = [7]
- elif mode == WorkMode.car.value:
- # 车网车端
- ped_type = [1]
- car_type = [6]
- bicycle_motor_type = [2, 3, 4, 5]
- bus_type = [7, 8]
- truct_type = [9]
- else:
- # 车网车端路端融合
- ped_type = [1]
- car_type = [6]
- bicycle_motor_type = [2, 3, 4, 5]
- bus_type = [7, 8]
- truct_type = [9]
- return ped_type, car_type, bicycle_motor_type, bus_type, truct_type
- # 创建道路旁静止的场景
- def create_static_object(road, object_dict):
- for single_object in object_dict:
- for k, v in ObjectType.__members__.items():
- if k == single_object.attrib['type']:
- single_object.attrib['type'] = v
- break
- road_object = Object(
- s=single_object.attrib['s'],
- t=single_object.attrib['t'],
- Type=single_object.attrib['type'],
- dynamic=Dynamic.no,
- id=single_object.attrib['id'],
- name=single_object.attrib['name'],
- zOffset=single_object.attrib['zOffset'],
- validLength=single_object.attrib['validLength'],
- orientation=Orientation.none,
- length=single_object.attrib['length'],
- width=single_object.attrib['width'],
- height=single_object.attrib['height'],
- pitch=single_object.attrib['pitch'],
- roll=single_object.attrib['roll']
- )
- # 判断此object是否是重复的元素
- repeat = single_object.find('repeat')
- if repeat is not None:
- road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.left,
- tOffset=1.75)
- road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.right,
- tOffset=-1.755)
- else:
- road.add_object(road_object)
- return road
- def change_CDATA(filepath):
- '行人场景特例,对xosc文件内的特殊字符做转换'
- f = open(filepath, "r", encoding="UTF-8")
- txt = f.readline()
- all_line = []
- # txt是否为空可以作为判断文件是否到了末尾
- while txt:
- txt = txt.replace("<", "<").replace(">", ">").replace("&", "&").replace(""", '"').replace(
- "'", "'")
- all_line.append(txt)
- # 读取文件的下一行
- txt = f.readline()
- f.close()
- f1 = open(filepath, 'w', encoding="UTF-8")
- for line in all_line:
- f1.write(line)
- f1.close()
- def path_changer(xosc_path, xodr_path, osgb_path):
- """
- provided by Dongpeng Ding
- :param xosc_path:
- :param xodr_path:
- :param osgb_path:
- :return:
- """
- tree = ET.parse(xosc_path)
- treeRoot = tree.getroot()
- # for OpenScenario v0.9, v1.0
- for RoadNetwork in treeRoot.findall('RoadNetwork'):
- for Logics in RoadNetwork.findall('LogicFile'):
- Logics.attrib['filepath'] = xodr_path
- for SceneGraph in RoadNetwork.findall('SceneGraphFile'):
- SceneGraph.attrib['filepath'] = osgb_path
- for Logics in RoadNetwork.findall('Logics'):
- Logics.attrib['filepath'] = xodr_path
- for SceneGraph in RoadNetwork.findall('SceneGraph'):
- SceneGraph.attrib['filepath'] = osgb_path
- # for VTD xml
- for Layout in treeRoot.findall('Layout'):
- Layout.attrib['File'] = xodr_path
- Layout.attrib['Database'] = osgb_path
- tree.write(xosc_path, xml_declaration=True)
- def readXML(xoscPath):
- xodrFileName = ""
- osgbFileName = ""
- tree = ET.parse(xoscPath)
- treeRoot = tree.getroot()
- for RoadNetwork in treeRoot.findall('RoadNetwork'):
- for Logics in RoadNetwork.findall('LogicFile'):
- xodrFileName = Logics.attrib['filepath']
- for SceneGraph in RoadNetwork.findall('SceneGraphFile'):
- osgbFileName = SceneGraph.attrib['filepath']
- for Logics in RoadNetwork.findall('Logics'):
- xodrFileName = Logics.attrib['filepath']
- for SceneGraph in RoadNetwork.findall('SceneGraph'):
- osgbFileName = SceneGraph.attrib['filepath']
- return xodrFileName[xodrFileName.rindex("/") + 1:], osgbFileName[osgbFileName.rindex("/") + 1:]
- def formatThree(rootDirectory):
- """
- xodr and osgb file path are fixed
- :return:
- """
- for root, dirs, files in os.walk(rootDirectory):
- for file in files:
- if ".xosc" == file[-5:]:
- # xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.xodr" # 泛化效果好的场景用的
- # osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.osgb" # 泛化效果好的场景用的
- # xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.xodr" # 泛化效果好的场景用的
- # osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.osgb" # 泛化效果好的场景用的
- xodrFilePath = "/home/hancheng/hc_project/2023-08-08-10-50-16_jsons1/2023-08-08-10-50-16_jsons_0/2023-08-08-10-50-16_jsons_0.xodr" # 直路,泛化好用
- osgbFilePath = "/home/hancheng/VIRES/VTD.2022.1/Runtime/Tools/RodDistro_7001_Rod4.6.3/DefaultProject/Database/json0.opt.osgb" # 直路,泛化好用
- # xodrFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.xodr" # 十字路口,泛化好用
- # osgbFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.opt.osgb" # 十字路口,泛化好用
- # xodrFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.xodr" # Sharing-van还原用的
- # osgbFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.opt.osgb" # Sharing-van还原用的
- path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
- print("Change success: " + root + "/" + file)
- def formatTwo(rootDirectory):
- """
- data format:
- simulation
- file.xosc
- file.xodr
- file.osgb
- :return:
- """
- for root, dirs, files in os.walk(rootDirectory):
- for file in files:
- if ".xosc" == file[-5:]:
- xodrFilePath = ""
- osgbFilePath = ""
- for odrFile in os.listdir(root):
- if ".xodr" == odrFile[-5:]:
- xodrFilePath = root + "/" + odrFile
- break
- for osgbFile in os.listdir(root):
- if ".osgb" == osgbFile[-5:]:
- osgbFilePath = root + "/" + osgbFile
- break
- path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
- print("Change success: " + root + "/" + file)
- def formatOne(rootDirectory):
- """
- data format:
- openx
- xosc
- file.xosc
- xodr
- file.xodr
- osgb
- file.osgb
- :return:
- """
- for root, dirs, files in os.walk(rootDirectory):
- for file in files:
- if "xosc" == file[-4:]:
- xodrFilePath = ""
- osgbFilePath = ""
- for odrFile in os.listdir(root[:-4] + "xodr"):
- if "xodr" == odrFile[-4:]:
- xodrFilePath = root[:-4] + "xodr/" + odrFile
- break
- for osgbFile in os.listdir(root[:-4] + "osgb"):
- if "osgb" == osgbFile[-4:]:
- osgbFilePath = root[:-4] + "osgb/" + osgbFile
- break
- path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
- print("Change success: " + root + "/" + file)
- def chongQingFormat(rootDirectory):
- """
- supporting file format: chong qing folder format
- :return:
- """
- counter = 1
- for root, dirs, files in os.walk(rootDirectory):
- for file in files:
- if "xosc" == file[-4:]:
- if "ver1.0.xosc" == file[-11:]:
- xodrFileName, osgbFileName = readXML(root + "/" + file)
- xodrFilePath = "/Xodrs/" + xodrFileName
- osgbFilePath = "/Databases/" + osgbFileName
- path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
- print(counter, "Change success: " + root + "/" + file)
- else:
- xodrFileName, osgbFileName = readXML(root + "/" + file)
- xodrFilePath = "/Xodrs/" + xodrFileName
- osgbFilePath = "/Databases/" + osgbFileName
- path_changer(root + "/" + file, xodrFilePath, osgbFilePath)
- print(counter, "Change success: " + root + "/" + file)
- counter += 1
|