#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Mar 14 15:44:01 2022 @author: lxj """ import xml.etree.ElementTree as ET from scenariogeneration import xodr from scenariogeneration import xosc from scenariogeneration import ScenarioGenerator from scenariogeneration.xodr import RoadSide, Object, ObjectType, Dynamic, Orientation import math, os from xodr_generator2 import road_detector_new from datetime import datetime import traceback import json import pandas as pd from enum import Enum import elevation_0323 as el from xml.sax.saxutils import escape import warnings warnings.filterwarnings("ignore") # # #xodr_list = ['/home/hancheng/taiheqiao_map/thq_1116.xodr', '/home/hancheng/gongyuanbeihuan_map/OD/OpenDRIVE.xodr','/home/hancheng/glyt_map/OpenDRIVE/OpenDRIVE2.xodr'] # #osgb_list = ['/home/hancheng/taiheqiao_map/thq_1116.opt.osgb', '/home/hancheng/gongyuanbeihuan_map/osgb/GJBM_20km_0822.opt.osgb', '/home/hancheng/glyt_map/OSGB/jinlong_20231129_2_guazai.opt.osgb'] # xodr_list = ['/root/rosmerge/resource/thq.xodr', '/root/rosmerge/resource/OpenDRIVE.xodr','/root/rosmerge/resource/gelan.xodr'] # osgb_list = ['/root/rosmerge/resource/thq.opt.osgb', '/root/rosmerge/resource/GJBM_20km_0822.opt.osgb', '/root/rosmerge/resource/gelan.opt.osgb'] class Scenario(ScenarioGenerator): def __init__(self, gps, obs, gps_time, ego_speed, work_mode, period, abspath, timeofday): ScenarioGenerator.__init__(self) self.gps = gps self.obs = obs self.gps_time = gps_time self.ego_speed = ego_speed self.work_mode = work_mode self.period = period # 场景持续的时间 self.abspath = abspath self.weather = xosc.PrecipitationType.dry # 默认是晴天 self.visual_fog_range = 20000 # 正常的能见度,没有雾 self.time = 43200 # 强制为白天 # self.map_id = map_id # self.ObjectID = ObjectID # self.Speed = Speed def road(self, **kwargs): positionEgo = self.gps planview = xodr.PlanView() nextx = positionEgo[0].x nexty = positionEgo[0].y h = float(positionEgo[0].h) # for i in range(len(positionEgo) - 1): # x = 0.000001 if abs(positionEgo[i].x) < 0.000001 else positionEgo[i].x # y = 0.000001 if abs(positionEgo[i].y) < 0.000001 else positionEgo[i].y # nextx = 0.000001 if abs(positionEgo[i + 1].x) < 0.000001 else positionEgo[i + 1].x # nexty = 0.000001 if abs(positionEgo[i + 1].y) < 0.000001 else positionEgo[i + 1].y # h = float(positionEgo[i].h) # # planview.add_fixed_geometry(xodr.Line(math.sqrt(math.pow(nextx - x, 2) + math.pow(nexty - y, 2))), x, y, h) planview.add_fixed_geometry(xodr.Line(100), nextx, nexty, h) # create two different roadmarkings rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid, 0.2) rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken, 0.2) # create simple lanes lanes = xodr.Lanes() lanesection1 = xodr.LaneSection(0, xodr.standard_lane(offset=1.75, rm=rm_dashed)) lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_solid)) lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_solid)) lanes.add_lanesection(lanesection1) lanes.add_laneoffset(xodr.LaneOffset(a=1.75)) road = xodr.Road(0, planview, lanes) odr = xodr.OpenDrive('myroad') # 静态元素 tree = ET.parse(os.getcwd() + '/models/static_object_models.xodr') root = tree.getroot() objects_dict = root.iter(tag='object') road = create_static_object(road, objects_dict) # road = create_static_object(road, object_id=0, object_type=ObjectType.railing, repeat_flag=1) odr.add_road(road) odr.adjust_roads_and_lanes() return odr def get_section(self, left_lanemark): return None def split_road_section(self, enu_file, lane_file): lane_content = pd.read_csv(lane_file) # 把车道分割开来,一个车道段,多种geometry # 首先判断车道线数值是否有断裂,断裂处根据自车轨迹生成道路参数 left_lanemark = lane_content[lane_content['LaneID'] == 1][['Time','FrameID','LanePosition','LaneQuality']] right_lanemark = lane_content[lane_content['LaneID'] == 2][['Time','FrameID','LanePosition','LaneQuality']] left_break_section = self.get_section(left_lanemark, 1) right_break_section = self.get_section(right_lanemark, 1) break_section = [] normal_section = [] # 判断是否变道,然后判断是否弯道 # 最终目标是得到多个geometry参数,确定每个连接线处的s/t偏移量?,以及左右道路的个数和宽度 return None def check_outliers_and_modify(self, datalist): for index in datalist.index: if index < 3: datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 else 2 * datalist[index + 1] - datalist[index + 1:index + 3].mean() elif 3 <= index < len(datalist) - 3: datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 or abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else (datalist[index - 1] + datalist[index + 1]) / 2 else: datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else 2 * datalist[index - 1] - datalist[index - 3:index].mean() return datalist def get_new_LaneID(self, change_flag, laneID, frameid, item): ''' 修改laneID,change_list包含了每个变道片段 ''' if item[0] <= frameid < item[1]: return laneID + change_flag else: return laneID def get_lane_width(self, data1, data2): lane_union = pd.merge(data1, data2, on=['FrameID']) lane_union['width'] = lane_union['LanePosition_y'] - lane_union['LanePosition_x'] a = lane_union["width"].quantile(0.75) b = lane_union["width"].quantile(0.25) lane_width = abs((a + b) / 2) return lane_width def road_pre_study(self, label_file, lane_file): laneDF = pd.read_csv(lane_file) road_label = '' flag = 0 # flag = 0 can use road_ego to generate .xodr, flag = 1 better use road to generate .xodr with open(label_file) as f: label = json.load(f) ego_actions = label['自车行为'] for item in ego_actions: if ('变道' in item['type_']) or ('转成功' in item['type_']) or ('掉头' in item['type_']): road_label = item['type_'] flag = 1 break else: road_label = item['type_'] if road_label == '向左变道成功': # 查找变道标记点 change_flag = 1 lane_1 = laneDF[(laneDF['LaneID'] == 1) & (laneDF['LaneQuality'] > 1)] lane_1 = lane_1.reset_index(drop=True) leftlanepositon = lane_1['LanePosition'] lane_1['LanePosition'] = self.check_outliers_and_modify(leftlanepositon) lane_1['last_Position'] = lane_1[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_1['last_FrameID'] = lane_1[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_1['change_flag'] = lane_1.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。 lane_1['split_flag'] = lane_1.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1) lane_2 = laneDF[(laneDF['LaneID'] == 2) & (laneDF['LaneQuality'] > 1)] lane_2 = lane_2.reset_index(drop=True) rightlanepositon = lane_2['LanePosition'] lane_2['LanePosition'] = self.check_outliers_and_modify(rightlanepositon) lane_2['last_Position'] = lane_2[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_2['last_FrameID'] = lane_2[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_2['change_flag'] = lane_2.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。 lane_2['split_flag'] = lane_2.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1) lane_width = self.get_lane_width(lane_1[['FrameID','LanePosition']], lane_2[['FrameID','LanePosition']]) # 左变道用左车道线数据靠谱,不会有他车遮挡,找到变道数据段 change_lane_point = list(lane_1[lane_1['change_flag'] == change_flag]['change_flag'].index) + [list(lane_1.index)[-1]] change_lane_section = [(lane_1.loc[change_lane_point[i], 'FrameID'], lane_1.loc[change_lane_point[i+1], 'FrameID']) for i in range(len(change_lane_point)-1)] for item in change_lane_section: # 变道后修改原始laneID # laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if item[0] <= x['FrameID'] < item[1] else x['LaneID'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if item[0] <= x['FrameID'] < item[1] else x['LanePosition'], axis=1) # laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if x['FrameID'] == max(laneDF['FrameID']) else x['LaneID'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if x['FrameID'] == max(laneDF['FrameID']) else x['LanePosition'], axis=1) # 修正左左车道线和右右车道线数值 if change_flag == 1: lane_0 = laneDF[(laneDF['LaneID'] == 0) & (laneDF['LaneQuality'] > 1) & (laneDF['FrameID'] < item[0])] if not lane_0.empty: left_lane_width = self.get_lane_width(lane_0[['FrameID','LanePosition']], lane_1[['FrameID','LanePosition']]) else: left_lane_width = lane_width print(left_lane_width) # new_lane_1 = laneDF[laneDF['LaneID'] == 1][['FrameID','LanePosition']] # new_lane_1.rename(columns={'LanePosition':'LanePosition2'},inplace=True) # laneDF = pd.merge(laneDF, new_lane_1, on=['FrameID']) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == 0) else x['LanePosition'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == -1) else x['LanePosition'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1) # laneDF['LaneID'] = laneDF.apply(lambda x: 3 if x['LaneID'] == -1 else x['LaneID'], axis=1) # 缺右车道线right_lane_width处理 print(laneDF[laneDF['FrameID']>1860][['FrameID', 'LaneID', 'LanePosition']].head(50)) print('road_label: ', road_label) return laneDF def road_new(self, **kwargs): polyfit_fig_path = os.path.join(self.abspath, 'simulation','polyfit.jpg') # run road detector enu_file = os.path.join(self.abspath, 'pos.csv') lane_file = os.path.join(self.abspath, 'lane.csv') label_file = os.path.join(self.abspath, 'label.json') # study road style: straight\curve\split-line; ego action: change-lane\no-change\cross-turn lane_content = self.road_pre_study(label_file, lane_file) # road split sections # self.split_road_section(enu_file, lane_file) try: laneid_tuple = [0, 1, 2, 3] uv_coord_info, road_info, left_lane_info, right_lane_info = road_detector_new(enu_file, lane_content, laneid_tuple, polyfit_fig_path) except: print('road detection failed!!!! Use ego trail road.') error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()} with open('error.log','a+') as f: json.dump(error, f, indent=4) f.write('\n') return self.road_ego() origin_x = uv_coord_info[0] origin_y = uv_coord_info[1] uv_coord_hdg = uv_coord_info[2] reference_line_offset = road_info[0] model_u = road_info[1] # in fomrat [c3, c2, c1, c0] model_v = road_info[2] # in fomrat [c3, c2, c1, c0] road_length = road_info[3] print(uv_coord_info) print(road_info) # generate xodr file planview = xodr.PlanView(origin_x, origin_y, uv_coord_hdg) # Create geometry and add it to the planview # parampoly3 = xodr.ParamPoly3(au=model_u[3], bu=model_u[2], cu=model_u[1], du=model_u[0], av=model_v[3], bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length) parampoly3 = xodr.ParamPoly3(au=0, bu=model_u[2], cu=model_u[1], du=model_u[0], av=0, bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length) planview.add_geometry(parampoly3) # create two different roadmarkings rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid,0.2) rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken,0.2,laneChange=xodr.LaneChange.both) ##4. Create centerlane centerlane = xodr.Lane(a=reference_line_offset) centerlane.add_roadmark(rm_dashed) ##5. Create lane section form the centerlane lanesec = xodr.LaneSection(0,centerlane) ##6. Create left and right lanes lane_id = 0 for lane in left_lane_info: lane_id += 1 lane_width = lane[0] spd_limit = lane[1] lane_left = xodr.Lane(a=lane_width) lane_left.add_roadmark(rm_dashed) # Add lanes to lane section lanesec.add_left_lane(lane_left) lane_id = 0 for lane in right_lane_info: lane_id -= 1 lane_width = lane[0] spd_limit = lane[1] lane_right = xodr.Lane(a=lane_width) lane_right.add_roadmark(rm_dashed) # Add lanes to lane section lanesec.add_right_lane(lane_right) ##8. Add lane section to Lanes lanes = xodr.Lanes() lanes.add_laneoffset(xodr.LaneOffset(s = 0, a=reference_line_offset)) lanes.add_lanesection(lanesec) ##9. Create Road from Planview and Lanes road = xodr.Road(1,planview,lanes) ##10. Create the OpenDrive class (Master class) odr = xodr.OpenDrive('myroad') ##11. Finally add roads to Opendrive odr.add_road(road) ##12. Adjust initial positions of the roads looking at succ-pred logic odr.adjust_roads_and_lanes() # ##13. Print the .xodr file # xodr.prettyprint(odr.get_element()) # ##14. Run the .xodr with esmini # xodr.run_road(odr,os.path.join('..','pyoscx','esmini')) return odr def getXoscPosition(egodata, t, e, n, h, z, obj_type, offset_x, offset_y, offset_h, start_time): '将dataframe的轨迹转换为场景文件的轨迹数据, Time, ego_e, ego_n, headinga' position = [] time = [] # cur_time = 0.0 egodata = egodata[[t, e, n, h, z, obj_type]] egodata = egodata.reset_index(drop=True) # egodata[t] = egodata.index / 10 egodata[t] = (egodata[t] - start_time) / 1000 egodata[e] = egodata[e] + offset_x egodata[n] = egodata[n] + offset_y egodata[h] = egodata[h] + offset_h egodata[h] = egodata.apply(lambda x: x[h] if x[h] < 360 else x[h] - 360, axis=1) type_list = list(egodata[obj_type]) ego_type = int(max(type_list, key=type_list.count)) lasth = float(egodata.at[0, h]) init_flag = True # two method to set z value zflag = 0 # 1 if zflag: # 根据地图的xy值计算z值 zfile = '/home/hancheng/weihao/gongyuanbei_elevation.csv' # zfile = '/home/hancheng/simulation_dynamic/elevation_df_taiheqiao.csv' zdata = pd.read_csv(zfile) for row in egodata.iterrows(): hhh = math.radians(row[1][h]) xodr_z = el.cal_elevation(zdata, float(row[1][e]), float(row[1][n])) if init_flag: position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=xodr_z, h=hhh, p=0, r=0)) init_flag = False else: if float(row[1][h]) - lasth > 300: hhh = math.radians(float(row[1][h]) - 360) elif float(row[1][h]) - lasth < -300: hhh = math.radians(float(row[1][h]) + 360) position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=xodr_z, h=hhh, p=0, r=0)) lasth = float(row[1][h]) time.append(float(row[1][t])) # time.append(cur_time) # cur_time += 0.1 return position, time, ego_type else: # z = 0 for row in egodata.iterrows(): hhh = math.radians(row[1][h]) if init_flag: position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0)) init_flag = False else: if float(row[1][h]) - lasth > 300: hhh = math.radians(float(row[1][h]) - 360) elif float(row[1][h]) - lasth < -300: hhh = math.radians(float(row[1][h]) + 360) position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0)) lasth = float(row[1][h]) time.append(float(row[1][t])) # time.append(cur_time) # cur_time += 0.1 return position, time, ego_type def scenario(self, **kwargs): road = xosc.RoadNetwork(roadfile='/root/rosmerge/resource/OpenDRIVE2_add_stop_line_marking_20231229.xodr', scenegraph='/root/rosmerge/resource/jinlong_all_addtrafficlight_20240226_5_guazai_2.opt.osgb') catalog = xosc.Catalog() catalog.add_catalog('VehicleCatalog', 'Distros/Current/Config/Players/Vehicles') catalog.add_catalog('PedestrianCatalog', 'Distros/Current/Config/Players/Pedestrians') catalog.add_catalog('MiscObjectCatalog', 'Distros/Current/Config/Players/Objects') catalog.add_catalog('ControllerCatalog', 'Distros/Current/Config/Players/driverCfg.xml') pbb = xosc.BoundingBox(0.5, 0.5, 1.8, 2.0, 0, 0.9) mbb = xosc.BoundingBox(1, 2, 1.7, 1.5, 0, 0.9) bb = xosc.BoundingBox(2.1, 4.5, 1.8, 1.5, 0, 0.9) bus_bb = xosc.BoundingBox(2.55, 10.5, 3.1, 5.1, 0, 1.45) bus_fa = xosc.Axle(0.48, 0.91, 2.5, 4.5, 1.5) bus_ba = xosc.Axle(0, 0.91, 2.5, 4.5, 1.5) truck_bb = xosc.BoundingBox(2.8, 8.744, 3.78, 4.372, 0, 1.89) truck_fa = xosc.Axle(0.48, 1, 2.8, 4.372, 1.5) truck_ba = xosc.Axle(0, 1, 2.8, 4.372, 1.5) fa = xosc.Axle(0.5, 0.6, 1.8, 3.1, 0.3) ba = xosc.Axle(0, 0.6, 1.8, 0, 0.3) cone_bb = xosc.BoundingBox(0.5, 0.5, 1.0, 2.0, 0, 0.9) red_veh = xosc.Vehicle('Audi_A3_2009_black', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10) red_veh.add_property(name='control', value='external') segway = xosc.Vehicle('Segway', xosc.VehicleCategory.car, pbb, fa, ba, 69.444, 200, 10) segway.add_property(name='control', value='external') white_veh = xosc.Vehicle('Audi_A3_2009_red', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10) male_ped = xosc.Pedestrian('Christian', 'male_adult', 70, xosc.PedestrianCategory.pedestrian, pbb) motorcycle = xosc.Vehicle('Kawasaki_ZX-9R_black', xosc.VehicleCategory.motorbike, mbb, fa, ba, 69.444, 200, 10) motorcycle.add_property(name='control', value='external') truck = xosc.Vehicle('MANTGS_11_Black', xosc.VehicleCategory.truck, truck_bb, truck_fa, truck_ba, 26.4, 10, 10) bus = xosc.Vehicle('MercedesTravego_10_ArcticWhite', xosc.VehicleCategory.bus, bus_bb, bus_fa, bus_ba, 29, 10, 10) cone = xosc.MiscObject('RdMiscPylon03-32cm', 10, xosc.MiscObjectCategory.obstacle, cone_bb) prop = xosc.Properties() cnt = xosc.Controller('DefaultDriver', prop) cnt2 = xosc.Controller('No Driver', prop) # 添加自车实体 egoname = 'Ego' entities = xosc.Entities() entities.add_scenario_object(egoname, segway, cnt) # 添加目标车实体 objname = 'Player' ped_type, car_type, bicycle_motor_type, bus_type, truct_type, cone_type = get_obj_type(self.work_mode) positionEgo = self.gps positionObj = self.obs for i, row in enumerate(positionObj): if len(row[1]) < 1: continue obj_type = row[2] # 根据目标物类型添加对应的实体 if obj_type in ped_type: entities.add_scenario_object(objname + str(i), male_ped) elif obj_type in car_type: entities.add_scenario_object(objname + str(i), white_veh, cnt2) elif obj_type in bicycle_motor_type: entities.add_scenario_object(objname + str(i), motorcycle, cnt2) elif obj_type in bus_type: entities.add_scenario_object(objname + str(i), bus, cnt2) elif obj_type in truct_type: entities.add_scenario_object(objname + str(i), truck, cnt2) elif obj_type in cone_type: entities.add_scenario_object(objname + str(i), cone) # 初始化 init = xosc.Init() step_time = xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 0) egospeed = xosc.AbsoluteSpeedAction(self.ego_speed, step_time) # 设置初始环境要素,例如天气时间等 default_precipitation = xosc.PrecipitationType.dry default_cloudstate = xosc.CloudState.free # json_path = os.path.join(self.abspath, 'label.json') # with open(json_path, 'r', encoding='utf-8') as f: # sc_json = json.load(f) # weather = sc_json['天气情况'][0]['type_'] #['自然环境'] # if weather == '晴': # f.close() # elif weather == '多云': # default_cloudstate = xosc.CloudState.cloudy # elif weather == '阴': # default_cloudstate = xosc.CloudState.overcast # elif weather == '雨': # default_cloudstate = xosc.CloudState.overcast # default_precipitation = xosc.PrecipitationType.rain # elif weather == '雪': # default_cloudstate = xosc.CloudState.overcast # default_precipitation = xosc.PrecipitationType.snow # elif weather == '雾霾' or weather == '沙尘': # self.visual_fog_range = 700 # default_cloudstate = xosc.CloudState.cloudy init.add_global_action( xosc.EnvironmentAction(name="InitEnvironment", environment=xosc.Environment( xosc.TimeOfDay(True, 2019, 12, 19, 12, 0, 0), xosc.Weather( sun_intensity=2000, sun_azimuth=40, sun_elevation=20, cloudstate=default_cloudstate, precipitation=default_precipitation, precipitation_intensity=1, visual_fog_range=self.visual_fog_range), xosc.RoadCondition(friction_scale_factor=0.7)))) # ego car init ego_init_h = positionEgo[0].h init.add_init_action(egoname, xosc.TeleportAction( xosc.WorldPosition(x=positionEgo[0].x, y=positionEgo[0].y, z=positionEgo[0].z, h=ego_init_h, p=0, r=0))) init.add_init_action(egoname, egospeed) # ego car trail trajectory = xosc.Trajectory('egoTrajectory', False) polyline = xosc.Polyline(self.gps_time, positionEgo) trajectory.add_shape(polyline) speedaction = xosc.FollowTrajectoryAction(trajectory, xosc.FollowMode.position, xosc.ReferenceContext.absolute, 1, 0) trigger = xosc.ValueTrigger('drive_start_trigger', 0, xosc.ConditionEdge.rising, xosc.SimulationTimeCondition(0, xosc.Rule.greaterThan)) trigger_init = xosc.ValueTrigger(name='init_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising, valuecondition=xosc.SimulationTimeCondition(value=0, rule=xosc.Rule.greaterThan)) event_init = xosc.Event('Event_init', xosc.Priority.overwrite) action_light = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0) light_flag = 'auto' if self.time == 64800: light_flag = 'true' text = f'' node = ET.Element("CustomCommandAction") node.attrib = {'type': 'scp'} node.text = f"" action_light.add_element(node) event_init.add_action('headlight', action_light) event_init.add_trigger(trigger_init) event = xosc.Event('Event1', xosc.Priority.overwrite) event.add_trigger(trigger) event.add_action('newspeed', speedaction) man = xosc.Maneuver('my maneuver') man.add_event(event_init) man.add_event(event) mangr = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True) mangr.add_actor('Ego') mangr.add_maneuver(man) trigger0 = xosc.Trigger('start') act = xosc.Act('Act1', trigger0) act.add_maneuver_group(mangr) ego_story = xosc.Story('mystory_ego') ego_story.add_act(act) stop_trigger = xosc.ValueTrigger('stop_trigger', 0, xosc.ConditionEdge.none, xosc.SimulationTimeCondition(self.period, xosc.Rule.greaterThan), 'stop') sb = xosc.StoryBoard(init, stoptrigger=stop_trigger) sb.add_story(ego_story) # object car trail if positionObj: for i, row in enumerate(positionObj): name = objname + str(i) if len(row[1]) < 2: continue obj_type = row[2] # 根据目标车需要动态创建实体 x = row[0][0].x y = row[0][0].y z = row[0][0].z h = row[0][0].h start_time = row[1][0] init_position = xosc.WorldPosition(x=x, y=y, z=z, h=h, p=0, r=0) newentity = xosc.AddEntityAction(name, init_position) newentity_trigger = xosc.ValueTrigger(name='newentity_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising, valuecondition=xosc.SimulationTimeCondition(value=start_time, rule=xosc.Rule.greaterThan)) newentity_event = xosc.Event('Event_newentity', xosc.Priority.overwrite) newentity_event.add_trigger(newentity_trigger) newentity_event.add_action('newentity_action', newentity) man_obj = xosc.Maneuver('my maneuver') man_obj.add_event(newentity_event) if obj_type != 103: # 添加目标车轨迹 trajectoryM = xosc.Trajectory('objectTrajectory', False) polylineM = xosc.Polyline(row[1], row[0]) trajectoryM.add_shape(polylineM) obj_trail_action = xosc.FollowTrajectoryAction(trajectory=trajectoryM, following_mode=xosc.FollowMode.position, reference_domain=xosc.ReferenceContext.absolute, scale=1, offset=0, initialDistanceOffset=0) obj_trail_event = xosc.Event('Event1', xosc.Priority.overwrite) # obj_start_trigger = xosc.EntityTrigger("obj-start-trigger", 0, xosc.ConditionEdge.rising, xosc.SpeedCondition(0, xosc.Rule.greaterThan),'Ego') obj_start_trigger = xosc.ValueTrigger('obj_start_trigger', 0, xosc.ConditionEdge.rising, xosc.SimulationTimeCondition(start_time, xosc.Rule.greaterThan)) obj_trail_event.add_trigger(obj_start_trigger) obj_trail_event.add_action('trail_action', obj_trail_action) # if obj_type in ped_type: # walp_trigger = xosc.EntityTrigger('ped_walp_trigger', 0, xosc.ConditionEdge.rising,\ # xosc.TimeToCollisionCondition(self.intersectime, xosc.Rule.lessThan, entity=name), 'Ego') # obj_trail_event.add_trigger(walp_trigger) man_obj.add_event(obj_trail_event) # 行人特有的事件 if obj_type in ped_type: ped_event = xosc.Event('Event_ped', xosc.Priority.overwrite) ped_event.add_trigger(obj_start_trigger) ped_action = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0) ped_action.add_element(self.createUDAction(name)) ped_event.add_action('ped_trail_action', ped_action) man_obj.add_event(ped_event) # finish_trigger = xosc.EntityTrigger('finish_trigger', 0, xosc.ConditionEdge.rising, xosc.ReachPositionCondition(position=row[1][0],tolerance=1),name) # event4 = xosc.Event('Event_ped',xosc.Priority.overwrite) # event4.add_trigger(finish_trigger) # be_still_action = xosc.AbsoluteSpeedAction(0, xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 1)) # event4.add_action('ped_be_still_action',be_still_action) # man_obj.add_event(event4) # 轨迹最后删除实体 last_time = row[1][-1] del_action = xosc.DeleteEntityAction(name) del_trigger = xosc.ValueTrigger(name='entity_del_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising, valuecondition=xosc.SimulationTimeCondition(value=last_time, rule=xosc.Rule.greaterThan)) del_event = xosc.Event('Event_del', xosc.Priority.overwrite) del_event.add_trigger(del_trigger) del_event.add_action('del_action', del_action) man_obj.add_event(del_event) obj_mangroup = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True) obj_mangroup.add_actor(name) obj_mangroup.add_maneuver(man_obj) obj_act = xosc.Act('Act2', trigger0) obj_act.add_maneuver_group(obj_mangroup) obj_story = xosc.Story('mystory_' + name) obj_story.add_act(obj_act) sb.add_story(obj_story) # prettyprint(sb.get_element()) paramet = xosc.ParameterDeclarations() sce = xosc.Scenario('my scenario', 'Maggie', paramet, entities, sb, road, catalog) return sce def createUDAction(self, name): # tree = ET.parse('./models/ped_CDATA.xosc') # root = tree.getroot() # ele = root[5][2][1][0][1][1][0][0][0] # newnode = ET.Element("CustomCommandAction") # newnode.attrib = {'type': 'scp'} # newnode.text = '' speed = 3 motion = 'walk' # text = f'\n \n' text = '\n \n'.format(name, motion, speed) # text = '<Traffic> <ActionMotion actor="Player0" move="walk" speed="3" force="false"rate="0" delayTime="0.0" activateOnExit="false"/></Traffic>'.format(name, motion, speed) newnode = ET.Element("CustomCommandAction") newnode.attrib = {'type': 'scp'} newnode.text = "".format(text) return newnode class WorkMode(Enum): cicv = 0 # CICV车端 roadside = 1 # 车网路端 car = 2 # 车网车端 merge = 3 # 车网车端路端融合 def get_obj_type(mode): if mode == WorkMode.cicv.value: # CICV车端 ped_type = [7] car_type = [2] bicycle_motor_type = [8] bus_type = [3] truct_type = [4] cone_type = [103] elif mode == WorkMode.roadside.value: # 车网路端 ped_type = [0] car_type = [2] bicycle_motor_type = [1, 3] bus_type = [5] truct_type = [7] cone_type = [103] elif mode == WorkMode.car.value: # 车网车端 ped_type = [1] car_type = [6] bicycle_motor_type = [2, 3, 4, 5] bus_type = [7, 8] truct_type = [9] cone_type = [103] else: # 车网车端路端融合 ped_type = [1] car_type = [6] bicycle_motor_type = [2, 3, 4, 5] bus_type = [7, 8] truct_type = [9] cone_type = [103] return ped_type, car_type, bicycle_motor_type, bus_type, truct_type, cone_type # 创建道路旁静止的场景 def create_static_object(road, object_dict): for single_object in object_dict: for k, v in ObjectType.__members__.items(): if k == single_object.attrib['type']: single_object.attrib['type'] = v break road_object = Object( s=single_object.attrib['s'], t=single_object.attrib['t'], Type=single_object.attrib['type'], dynamic=Dynamic.no, id=single_object.attrib['id'], name=single_object.attrib['name'], zOffset=single_object.attrib['zOffset'], validLength=single_object.attrib['validLength'], orientation=Orientation.none, length=single_object.attrib['length'], width=single_object.attrib['width'], height=single_object.attrib['height'], pitch=single_object.attrib['pitch'], roll=single_object.attrib['roll'] ) # 判断此object是否是重复的元素 repeat = single_object.find('repeat') if repeat is not None: road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.left, tOffset=1.75) road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.right, tOffset=-1.755) else: road.add_object(road_object) return road def change_CDATA(filepath): '行人场景特例,对xosc文件内的特殊字符做转换' f = open(filepath, "r", encoding="UTF-8") txt = f.readline() all_line = [] # txt是否为空可以作为判断文件是否到了末尾 while txt: txt = txt.replace("<", "<").replace(">", ">").replace("&", "&").replace(""", '"').replace( "'", "'") all_line.append(txt) # 读取文件的下一行 txt = f.readline() f.close() f1 = open(filepath, 'w', encoding="UTF-8") for line in all_line: f1.write(line) f1.close() def path_changer(xosc_path, xodr_path, osgb_path): """ provided by Dongpeng Ding :param xosc_path: :param xodr_path: :param osgb_path: :return: """ tree = ET.parse(xosc_path) treeRoot = tree.getroot() # for OpenScenario v0.9, v1.0 for RoadNetwork in treeRoot.findall('RoadNetwork'): for Logics in RoadNetwork.findall('LogicFile'): Logics.attrib['filepath'] = xodr_path for SceneGraph in RoadNetwork.findall('SceneGraphFile'): SceneGraph.attrib['filepath'] = osgb_path for Logics in RoadNetwork.findall('Logics'): Logics.attrib['filepath'] = xodr_path for SceneGraph in RoadNetwork.findall('SceneGraph'): SceneGraph.attrib['filepath'] = osgb_path # for VTD xml for Layout in treeRoot.findall('Layout'): Layout.attrib['File'] = xodr_path Layout.attrib['Database'] = osgb_path tree.write(xosc_path, xml_declaration=True) def readXML(xoscPath): xodrFileName = "" osgbFileName = "" tree = ET.parse(xoscPath) treeRoot = tree.getroot() for RoadNetwork in treeRoot.findall('RoadNetwork'): for Logics in RoadNetwork.findall('LogicFile'): xodrFileName = Logics.attrib['filepath'] for SceneGraph in RoadNetwork.findall('SceneGraphFile'): osgbFileName = SceneGraph.attrib['filepath'] for Logics in RoadNetwork.findall('Logics'): xodrFileName = Logics.attrib['filepath'] for SceneGraph in RoadNetwork.findall('SceneGraph'): osgbFileName = SceneGraph.attrib['filepath'] return xodrFileName[xodrFileName.rindex("/") + 1:], osgbFileName[osgbFileName.rindex("/") + 1:] def formatThree(rootDirectory): """ xodr and osgb file path are fixed :return: """ for root, dirs, files in os.walk(rootDirectory): for file in files: if ".xosc" == file[-5:]: # xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.xodr" # 泛化效果好的场景用的 # osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.osgb" # 泛化效果好的场景用的 # xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.xodr" # 泛化效果好的场景用的 # osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.osgb" # 泛化效果好的场景用的 xodrFilePath = "/home/hancheng/hc_project/2023-08-08-10-50-16_jsons1/2023-08-08-10-50-16_jsons_0/2023-08-08-10-50-16_jsons_0.xodr" # 直路,泛化好用 osgbFilePath = "/home/hancheng/VIRES/VTD.2022.1/Runtime/Tools/RodDistro_7001_Rod4.6.3/DefaultProject/Database/json0.opt.osgb" # 直路,泛化好用 # xodrFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.xodr" # 十字路口,泛化好用 # osgbFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.opt.osgb" # 十字路口,泛化好用 # xodrFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.xodr" # Sharing-van还原用的 # osgbFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.opt.osgb" # Sharing-van还原用的 path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print("Change success: " + root + "/" + file) def formatTwo(rootDirectory): """ data format: simulation file.xosc file.xodr file.osgb :return: """ for root, dirs, files in os.walk(rootDirectory): for file in files: if ".xosc" == file[-5:]: xodrFilePath = "" osgbFilePath = "" for odrFile in os.listdir(root): if ".xodr" == odrFile[-5:]: xodrFilePath = root + "/" + odrFile break for osgbFile in os.listdir(root): if ".osgb" == osgbFile[-5:]: osgbFilePath = root + "/" + osgbFile break path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print("Change success: " + root + "/" + file) def formatOne(rootDirectory): """ data format: openx xosc file.xosc xodr file.xodr osgb file.osgb :return: """ for root, dirs, files in os.walk(rootDirectory): for file in files: if "xosc" == file[-4:]: xodrFilePath = "" osgbFilePath = "" for odrFile in os.listdir(root[:-4] + "xodr"): if "xodr" == odrFile[-4:]: xodrFilePath = root[:-4] + "xodr/" + odrFile break for osgbFile in os.listdir(root[:-4] + "osgb"): if "osgb" == osgbFile[-4:]: osgbFilePath = root[:-4] + "osgb/" + osgbFile break path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print("Change success: " + root + "/" + file) def chongQingFormat(rootDirectory): """ supporting file format: chong qing folder format :return: """ counter = 1 for root, dirs, files in os.walk(rootDirectory): for file in files: if "xosc" == file[-4:]: if "ver1.0.xosc" == file[-11:]: xodrFileName, osgbFileName = readXML(root + "/" + file) xodrFilePath = "/Xodrs/" + xodrFileName osgbFilePath = "/Databases/" + osgbFileName path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print(counter, "Change success: " + root + "/" + file) else: xodrFileName, osgbFileName = readXML(root + "/" + file) xodrFilePath = "/Xodrs/" + xodrFileName osgbFilePath = "/Databases/" + osgbFileName path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print(counter, "Change success: " + root + "/" + file) counter += 1