#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Mar 14 15:44:01 2022 @author: lxj """ import xml.etree.ElementTree as ET from scenariogeneration import xodr from scenariogeneration import xosc from scenariogeneration import ScenarioGenerator from scenariogeneration.xodr import RoadSide, Object, ObjectType, Dynamic, Orientation import math, os # from xodr_generator2 import road_detector_new from datetime import datetime import traceback import json import pandas as pd from enum import Enum from xml.sax.saxutils import escape import warnings warnings.filterwarnings("ignore") xodr_list = ['/mnt/disk001/simulation_outdoor/thq_1116.xodr', '/mnt/disk001/simulation_outdoor/anqing.xodr'] osgb_list = ['/mnt/disk001/simulation_outdoor/thq_1116.opt.osgb', '/mnt/disk001/simulation_outdoor/anqing.osgb'] class Scenario(ScenarioGenerator): def __init__(self, gps, obs, gps_time, ego_speed, work_mode, period, abspath, timeofday, vehicle_type, map_id): ScenarioGenerator.__init__(self) self.gps = gps self.obs = obs self.gps_time = gps_time self.ego_speed = ego_speed self.work_mode = work_mode self.period = period # 场景持续的时间 self.abspath = abspath self.weather = xosc.PrecipitationType.dry # 默认是晴天 self.visual_fog_range = 20000 # 正常的能见度,没有雾 self.time = 43200 # 强制为白天 self.vehicle_type = vehicle_type self.map_id = map_id # self.map_id = map_id # self.ObjectID = ObjectID # self.Speed = Speed def road(self, **kwargs): positionEgo = self.gps planview = xodr.PlanView() nextx = positionEgo[0].x nexty = positionEgo[0].y h = float(positionEgo[0].h) # for i in range(len(positionEgo) - 1): # x = 0.000001 if abs(positionEgo[i].x) < 0.000001 else positionEgo[i].x # y = 0.000001 if abs(positionEgo[i].y) < 0.000001 else positionEgo[i].y # nextx = 0.000001 if abs(positionEgo[i + 1].x) < 0.000001 else positionEgo[i + 1].x # nexty = 0.000001 if abs(positionEgo[i + 1].y) < 0.000001 else positionEgo[i + 1].y # h = float(positionEgo[i].h) # # planview.add_fixed_geometry(xodr.Line(math.sqrt(math.pow(nextx - x, 2) + math.pow(nexty - y, 2))), x, y, h) planview.add_fixed_geometry(xodr.Line(100), nextx, nexty, h) # create two different roadmarkings rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid, 0.2) rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken, 0.2) # create simple lanes lanes = xodr.Lanes() lanesection1 = xodr.LaneSection(0, xodr.standard_lane(offset=1.75, rm=rm_dashed)) lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_left_lane(xodr.standard_lane(offset=3.5, rm=rm_solid)) lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_dashed)) lanesection1.add_right_lane(xodr.standard_lane(offset=3.5, rm=rm_solid)) lanes.add_lanesection(lanesection1) lanes.add_laneoffset(xodr.LaneOffset(a=1.75)) road = xodr.Road(0, planview, lanes) odr = xodr.OpenDrive('myroad') # 静态元素 tree = ET.parse(os.getcwd() + '/models/static_object_models.xodr') root = tree.getroot() objects_dict = root.iter(tag='object') road = create_static_object(road, objects_dict) # road = create_static_object(road, object_id=0, object_type=ObjectType.railing, repeat_flag=1) odr.add_road(road) odr.adjust_roads_and_lanes() return odr def get_section(self, left_lanemark): return None def split_road_section(self, enu_file, lane_file): lane_content = pd.read_csv(lane_file) # 把车道分割开来,一个车道段,多种geometry # 首先判断车道线数值是否有断裂,断裂处根据自车轨迹生成道路参数 left_lanemark = lane_content[lane_content['LaneID'] == 1][['Time','FrameID','LanePosition','LaneQuality']] right_lanemark = lane_content[lane_content['LaneID'] == 2][['Time','FrameID','LanePosition','LaneQuality']] left_break_section = self.get_section(left_lanemark, 1) right_break_section = self.get_section(right_lanemark, 1) break_section = [] normal_section = [] # 判断是否变道,然后判断是否弯道 # 最终目标是得到多个geometry参数,确定每个连接线处的s/t偏移量?,以及左右道路的个数和宽度 return None def check_outliers_and_modify(self, datalist): for index in datalist.index: if index < 3: datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 else 2 * datalist[index + 1] - datalist[index + 1:index + 3].mean() elif 3 <= index < len(datalist) - 3: datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index + 1:index + 3].mean()) < 0.5 or abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else (datalist[index - 1] + datalist[index + 1]) / 2 else: datalist.loc[index] = datalist[index] if abs(datalist[index] - datalist[index - 3:index].mean()) < 0.5 else 2 * datalist[index - 1] - datalist[index - 3:index].mean() return datalist def get_new_LaneID(self, change_flag, laneID, frameid, item): ''' 修改laneID,change_list包含了每个变道片段 ''' if item[0] <= frameid < item[1]: return laneID + change_flag else: return laneID def get_lane_width(self, data1, data2): lane_union = pd.merge(data1, data2, on=['FrameID']) lane_union['width'] = lane_union['LanePosition_y'] - lane_union['LanePosition_x'] a = lane_union["width"].quantile(0.75) b = lane_union["width"].quantile(0.25) lane_width = abs((a + b) / 2) return lane_width def road_pre_study(self, label_file, lane_file): laneDF = pd.read_csv(lane_file) road_label = '' flag = 0 # flag = 0 can use road_ego to generate .xodr, flag = 1 better use road to generate .xodr with open(label_file) as f: label = json.load(f) ego_actions = label['自车行为'] for item in ego_actions: if ('变道' in item['type_']) or ('转成功' in item['type_']) or ('掉头' in item['type_']): road_label = item['type_'] flag = 1 break else: road_label = item['type_'] if road_label == '向左变道成功': # 查找变道标记点 change_flag = 1 lane_1 = laneDF[(laneDF['LaneID'] == 1) & (laneDF['LaneQuality'] > 1)] lane_1 = lane_1.reset_index(drop=True) leftlanepositon = lane_1['LanePosition'] lane_1['LanePosition'] = self.check_outliers_and_modify(leftlanepositon) lane_1['last_Position'] = lane_1[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_1['last_FrameID'] = lane_1[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_1['change_flag'] = lane_1.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。 lane_1['split_flag'] = lane_1.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1) lane_2 = laneDF[(laneDF['LaneID'] == 2) & (laneDF['LaneQuality'] > 1)] lane_2 = lane_2.reset_index(drop=True) rightlanepositon = lane_2['LanePosition'] lane_2['LanePosition'] = self.check_outliers_and_modify(rightlanepositon) lane_2['last_Position'] = lane_2[['LanePosition']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_2['last_FrameID'] = lane_2[['FrameID']].shift(axis = 0, periods = 1) #axis指定移动的轴:0为行,1为列 lane_2['change_flag'] = lane_2.apply(lambda x: change_flag if (x['LanePosition'] - x['last_Position'] <= -3) and (x['FrameID'] - x['last_FrameID'] == 1) else 0, axis=1) # 默认情况下 axis=0 表示按列,axis=1 表示按行。 lane_2['split_flag'] = lane_2.apply(lambda x: 99 if x['FrameID'] - x['last_FrameID'] > 1 else 0, axis=1) lane_width = self.get_lane_width(lane_1[['FrameID','LanePosition']], lane_2[['FrameID','LanePosition']]) # 左变道用左车道线数据靠谱,不会有他车遮挡,找到变道数据段 change_lane_point = list(lane_1[lane_1['change_flag'] == change_flag]['change_flag'].index) + [list(lane_1.index)[-1]] change_lane_section = [(lane_1.loc[change_lane_point[i], 'FrameID'], lane_1.loc[change_lane_point[i+1], 'FrameID']) for i in range(len(change_lane_point)-1)] for item in change_lane_section: # 变道后修改原始laneID # laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if item[0] <= x['FrameID'] < item[1] else x['LaneID'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if item[0] <= x['FrameID'] < item[1] else x['LanePosition'], axis=1) # laneDF['LaneID'] = laneDF.apply(lambda x: x['LaneID'] - change_flag if x['FrameID'] == max(laneDF['FrameID']) else x['LaneID'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition'] - change_flag * lane_width if x['FrameID'] == max(laneDF['FrameID']) else x['LanePosition'], axis=1) # 修正左左车道线和右右车道线数值 if change_flag == 1: lane_0 = laneDF[(laneDF['LaneID'] == 0) & (laneDF['LaneQuality'] > 1) & (laneDF['FrameID'] < item[0])] if not lane_0.empty: left_lane_width = self.get_lane_width(lane_0[['FrameID','LanePosition']], lane_1[['FrameID','LanePosition']]) else: left_lane_width = lane_width print(left_lane_width) # new_lane_1 = laneDF[laneDF['LaneID'] == 1][['FrameID','LanePosition']] # new_lane_1.rename(columns={'LanePosition':'LanePosition2'},inplace=True) # laneDF = pd.merge(laneDF, new_lane_1, on=['FrameID']) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == 0) else x['LanePosition'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if (item[0] <= x['FrameID'] < item[1]) and (x['LaneID'] == -1) else x['LanePosition'], axis=1) # laneDF['LanePosition'] = laneDF.apply(lambda x: x['LanePosition2'] - left_lane_width if x['FrameID'] == max(laneDF['FrameID']) and (x['LaneID'] == 0) else x['LanePosition'], axis=1) # laneDF['LaneID'] = laneDF.apply(lambda x: 3 if x['LaneID'] == -1 else x['LaneID'], axis=1) # 缺右车道线right_lane_width处理 print(laneDF[laneDF['FrameID']>1860][['FrameID', 'LaneID', 'LanePosition']].head(50)) print('road_label: ', road_label) return laneDF # def road_new(self, **kwargs): # polyfit_fig_path = os.path.join(self.abspath, 'simulation','polyfit.jpg') # # # run road detector # enu_file = os.path.join(self.abspath, 'pos.csv') # lane_file = os.path.join(self.abspath, 'lane.csv') # label_file = os.path.join(self.abspath, 'label.json') # # # study road style: straight\curve\split-line; ego action: change-lane\no-change\cross-turn # lane_content = self.road_pre_study(label_file, lane_file) # # # road split sections # # self.split_road_section(enu_file, lane_file) # # try: # laneid_tuple = [0, 1, 2, 3] # uv_coord_info, road_info, left_lane_info, right_lane_info = road_detector_new(enu_file, lane_content, laneid_tuple, polyfit_fig_path) # except: # print('road detection failed!!!! Use ego trail road.') # error = {'time':datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],'traceback':traceback.format_exc()} # with open('error.log','a+') as f: # json.dump(error, f, indent=4) # f.write('\n') # return self.road_ego() # # origin_x = uv_coord_info[0] # origin_y = uv_coord_info[1] # uv_coord_hdg = uv_coord_info[2] # # reference_line_offset = road_info[0] # model_u = road_info[1] # in fomrat [c3, c2, c1, c0] # model_v = road_info[2] # in fomrat [c3, c2, c1, c0] # road_length = road_info[3] # print(uv_coord_info) # print(road_info) # # generate xodr file # planview = xodr.PlanView(origin_x, origin_y, uv_coord_hdg) # # # Create geometry and add it to the planview # # parampoly3 = xodr.ParamPoly3(au=model_u[3], bu=model_u[2], cu=model_u[1], du=model_u[0], av=model_v[3], bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length) # parampoly3 = xodr.ParamPoly3(au=0, bu=model_u[2], cu=model_u[1], du=model_u[0], av=0, bv=model_v[2], cv=model_v[1], dv=model_v[0], prange='arcLength',length=road_length) # planview.add_geometry(parampoly3) # # # create two different roadmarkings # rm_solid = xodr.RoadMark(xodr.RoadMarkType.solid,0.2) # rm_dashed = xodr.RoadMark(xodr.RoadMarkType.broken,0.2,laneChange=xodr.LaneChange.both) # # ##4. Create centerlane # centerlane = xodr.Lane(a=reference_line_offset) # centerlane.add_roadmark(rm_dashed) # # ##5. Create lane section form the centerlane # lanesec = xodr.LaneSection(0,centerlane) # # ##6. Create left and right lanes # lane_id = 0 # for lane in left_lane_info: # lane_id += 1 # lane_width = lane[0] # spd_limit = lane[1] # lane_left = xodr.Lane(a=lane_width) # lane_left.add_roadmark(rm_dashed) # # Add lanes to lane section # lanesec.add_left_lane(lane_left) # # lane_id = 0 # for lane in right_lane_info: # lane_id -= 1 # lane_width = lane[0] # spd_limit = lane[1] # lane_right = xodr.Lane(a=lane_width) # lane_right.add_roadmark(rm_dashed) # # Add lanes to lane section # lanesec.add_right_lane(lane_right) # # ##8. Add lane section to Lanes # lanes = xodr.Lanes() # lanes.add_laneoffset(xodr.LaneOffset(s = 0, a=reference_line_offset)) # lanes.add_lanesection(lanesec) # # # ##9. Create Road from Planview and Lanes # road = xodr.Road(1,planview,lanes) # # ##10. Create the OpenDrive class (Master class) # odr = xodr.OpenDrive('myroad') # # ##11. Finally add roads to Opendrive # odr.add_road(road) # # ##12. Adjust initial positions of the roads looking at succ-pred logic # odr.adjust_roads_and_lanes() # # # ##13. Print the .xodr file # # xodr.prettyprint(odr.get_element()) # # # ##14. Run the .xodr with esmini # # xodr.run_road(odr,os.path.join('..','pyoscx','esmini')) # # return odr def getXoscPosition(egodata, t, e, n, h, z, obj_type, offset_x, offset_y, offset_h, start_time): '将dataframe的轨迹转换为场景文件的轨迹数据, Time, ego_e, ego_n, headinga' position = [] time = [] # cur_time = 0.0 egodata = egodata[[t, e, n, h, z, obj_type]] egodata = egodata.reset_index(drop=True) #egodata[t] = egodata.index / 10 #egodata[t] = (egodata[t] - start_time) / 1000 egodata[t] = egodata[t] egodata[e] = egodata[e] + offset_x egodata[n] = egodata[n] + offset_y egodata[h] = egodata[h] + offset_h egodata[h] = egodata.apply(lambda x: x[h] if x[h] < 360 else x[h] - 360, axis=1) type_list = list(egodata[obj_type]) ego_type = int(max(type_list, key=type_list.count)) lasth = float(egodata.at[0, h]) init_flag = True for row in egodata.iterrows(): hhh = math.radians(row[1][h]) if init_flag: position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0)) init_flag = False else: if float(row[1][h]) - lasth > 300: hhh = math.radians(float(row[1][h]) - 360) elif float(row[1][h]) - lasth < -300: hhh = math.radians(float(row[1][h]) + 360) position.append(xosc.WorldPosition(x=float(row[1][e]), y=float(row[1][n]), z=float(row[1][z]), h=hhh, p=0, r=0)) lasth = float(row[1][h]) time.append(float(row[1][t])) # time.append(cur_time) # cur_time += 0.1 return position, time, ego_type def scenario(self, **kwargs): road = xosc.RoadNetwork(roadfile=xodr_list[self.map_id], scenegraph=osgb_list[self.map_id]) catalog = xosc.Catalog() catalog.add_catalog('VehicleCatalog', 'Distros/Current/Config/Players/Vehicles') catalog.add_catalog('PedestrianCatalog', 'Distros/Current/Config/Players/Pedestrians') catalog.add_catalog('MiscObjectCatalog', 'Distros/Current/Config/Players/Objects') catalog.add_catalog('ControllerCatalog', 'Distros/Current/Config/Players/driverCfg.xml') pbb = xosc.BoundingBox(0.5, 0.5, 1.8, 2.0, 0, 0.9) mbb = xosc.BoundingBox(1, 2, 1.7, 1.5, 0, 0.9) bb = xosc.BoundingBox(2.1, 4.5, 1.8, 1.5, 0, 0.9) bus_bb = xosc.BoundingBox(2.55, 10.5, 3.1, 5.1, 0, 1.45) bus_fa = xosc.Axle(0.48, 0.91, 2.5, 4.5, 1.5) bus_ba = xosc.Axle(0, 0.91, 2.5, 4.5, 1.5) truck_bb = xosc.BoundingBox(2.8, 8.744, 3.78, 4.372, 0, 1.89) truck_fa = xosc.Axle(0.48, 1, 2.8, 4.372, 1.5) truck_ba = xosc.Axle(0, 1, 2.8, 4.372, 1.5) fa = xosc.Axle(0.5, 0.6, 1.8, 3.1, 0.3) ba = xosc.Axle(0, 0.6, 1.8, 0, 0.3) cone_bb = xosc.BoundingBox(0.5, 0.5, 1.0, 2.0, 0, 0.9) red_veh = xosc.Vehicle('Audi_A3_2009_black', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10) red_veh.add_property(name='control', value='external') segway = xosc.Vehicle('Segway', xosc.VehicleCategory.car, pbb, fa, ba, 69.444, 200, 10) PuJin_patrol_robot = xosc.Vehicle('PuJin_patrol_robot', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10) PuJin_distribution = xosc.Vehicle('PuJin_distribution', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10) white_veh = xosc.Vehicle('Audi_A3_2009_red', xosc.VehicleCategory.car, bb, fa, ba, 69.444, 200, 10) male_ped = xosc.Pedestrian('Christian', 'male_adult', 70, xosc.PedestrianCategory.pedestrian, pbb) motorcycle = xosc.Vehicle('Kawasaki_ZX-9R_black', xosc.VehicleCategory.motorbike, mbb, fa, ba, 69.444, 200, 10) motorcycle.add_property(name='control', value='external') truck = xosc.Vehicle('MANTGS_11_Black', xosc.VehicleCategory.truck, truck_bb, truck_fa, truck_ba, 26.4, 10, 10) bus = xosc.Vehicle('MercedesTravego_10_ArcticWhite', xosc.VehicleCategory.bus, bus_bb, bus_fa, bus_ba, 29, 10, 10) cone = xosc.MiscObject('RdMiscPylon03-32cm', 10, xosc.MiscObjectCategory.obstacle, cone_bb) prop = xosc.Properties() cnt = xosc.Controller('DefaultDriver', prop) cnt2 = xosc.Controller('No Driver', prop) # 添加自车实体 egoname = 'Ego' entities = xosc.Entities() if self.vehicle_type == "0": entities.add_scenario_object(egoname, PuJin_distribution, cnt) else: entities.add_scenario_object(egoname, PuJin_patrol_robot, cnt) # 添加目标车实体 objname = 'Player' ped_type, car_type, bicycle_motor_type, bus_type, truck_type, cone_type = get_obj_type(self.work_mode) positionEgo = self.gps positionObj = self.obs for i, row in enumerate(positionObj): if len(row[1]) < 1: continue obj_type = row[2] # 根据目标物类型添加对应的实体 if obj_type in ped_type: entities.add_scenario_object(objname + str(i), segway) elif obj_type in car_type: entities.add_scenario_object(objname + str(i), white_veh, cnt2) elif obj_type in bicycle_motor_type: entities.add_scenario_object(objname + str(i), motorcycle, cnt2) elif obj_type in bus_type: entities.add_scenario_object(objname + str(i), bus, cnt2) elif obj_type in truck_type: entities.add_scenario_object(objname + str(i), truck, cnt2) elif obj_type in cone_type: entities.add_scenario_object(objname + str(i), cone) # 初始化 init = xosc.Init() step_time = xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 0) egospeed = xosc.AbsoluteSpeedAction(self.ego_speed, step_time) # 设置初始环境要素,例如天气时间等 default_precipitation = xosc.PrecipitationType.dry default_cloudstate = xosc.CloudState.free # json_path = os.path.join(self.abspath, 'label.json') # with open(json_path, 'r', encoding='utf-8') as f: # sc_json = json.load(f) # weather = sc_json['天气情况'][0]['type_'] #['自然环境'] # if weather == '晴': # f.close() # elif weather == '多云': # default_cloudstate = xosc.CloudState.cloudy # elif weather == '阴': # default_cloudstate = xosc.CloudState.overcast # elif weather == '雨': # default_cloudstate = xosc.CloudState.overcast # default_precipitation = xosc.PrecipitationType.rain # elif weather == '雪': # default_cloudstate = xosc.CloudState.overcast # default_precipitation = xosc.PrecipitationType.snow # elif weather == '雾霾' or weather == '沙尘': # self.visual_fog_range = 700 # default_cloudstate = xosc.CloudState.cloudy init.add_global_action( xosc.EnvironmentAction(name="InitEnvironment", environment=xosc.Environment( xosc.TimeOfDay(True, 2019, 12, 19, 12, 0, 0), xosc.Weather( sun_intensity=2000, sun_azimuth=40, sun_elevation=20, cloudstate=default_cloudstate, precipitation=default_precipitation, precipitation_intensity=1, visual_fog_range=self.visual_fog_range), xosc.RoadCondition(friction_scale_factor=0.7)))) # ego car init ego_init_h = positionEgo[0].h init.add_init_action(egoname, xosc.TeleportAction( xosc.WorldPosition(x=positionEgo[0].x, y=positionEgo[0].y, z=positionEgo[0].z, h=ego_init_h, p=0, r=0))) init.add_init_action(egoname, egospeed) # ego car trail trajectory = xosc.Trajectory('egoTrajectory', False) polyline = xosc.Polyline(self.gps_time, positionEgo) trajectory.add_shape(polyline) speedaction = xosc.FollowTrajectoryAction(trajectory, xosc.FollowMode.position, xosc.ReferenceContext.absolute, 1, 0) trigger = xosc.ValueTrigger('drive_start_trigger', 0, xosc.ConditionEdge.rising, xosc.SimulationTimeCondition(0, xosc.Rule.greaterThan)) trigger_init = xosc.ValueTrigger(name='init_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising, valuecondition=xosc.SimulationTimeCondition(value=0, rule=xosc.Rule.greaterThan)) event_init = xosc.Event('Event_init', xosc.Priority.overwrite) action_light = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0) light_flag = 'auto' if self.time == 64800: light_flag = 'true' text = f'' node = ET.Element("CustomCommandAction") node.attrib = {'type': 'scp'} node.text = f"" action_light.add_element(node) event_init.add_action('headlight', action_light) event_init.add_trigger(trigger_init) event = xosc.Event('Event1', xosc.Priority.overwrite) event.add_trigger(trigger) event.add_action('newspeed', speedaction) man = xosc.Maneuver('my maneuver') man.add_event(event_init) man.add_event(event) mangr = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True) mangr.add_actor('Ego') mangr.add_maneuver(man) trigger0 = xosc.Trigger('start') act = xosc.Act('Act1', trigger0) act.add_maneuver_group(mangr) ego_story = xosc.Story('mystory_ego') ego_story.add_act(act) stop_trigger = xosc.ValueTrigger('stop_trigger', 0, xosc.ConditionEdge.none, xosc.SimulationTimeCondition(self.period, xosc.Rule.greaterThan), 'stop') sb = xosc.StoryBoard(init, stoptrigger=stop_trigger) sb.add_story(ego_story) # object car trail if positionObj: for i, row in enumerate(positionObj): name = objname + str(i) if len(row[1]) < 2: continue obj_type = row[2] # 根据目标车需要动态创建实体 x = row[0][0].x y = row[0][0].y z = row[0][0].z h = row[0][0].h start_time = row[1][0] init_position = xosc.WorldPosition(x=x, y=y, z=z, h=h, p=0, r=0) newentity = xosc.AddEntityAction(name, init_position) newentity_trigger = xosc.ValueTrigger(name='newentity_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising, valuecondition=xosc.SimulationTimeCondition(value=start_time, rule=xosc.Rule.greaterThan)) newentity_event = xosc.Event('Event_newentity', xosc.Priority.overwrite) newentity_event.add_trigger(newentity_trigger) newentity_event.add_action('newentity_action', newentity) man_obj = xosc.Maneuver('my maneuver') man_obj.add_event(newentity_event) if obj_type != 10: # 添加目标车轨迹 trajectoryM = xosc.Trajectory('objectTrajectory', False) polylineM = xosc.Polyline(row[1], row[0]) trajectoryM.add_shape(polylineM) obj_trail_action = xosc.FollowTrajectoryAction(trajectory=trajectoryM, following_mode=xosc.FollowMode.position, reference_domain=xosc.ReferenceContext.absolute, scale=1, offset=0, initialDistanceOffset=0) obj_trail_event = xosc.Event('Event1', xosc.Priority.overwrite) # obj_start_trigger = xosc.EntityTrigger("obj-start-trigger", 0, xosc.ConditionEdge.rising, xosc.SpeedCondition(0, xosc.Rule.greaterThan),'Ego') obj_start_trigger = xosc.ValueTrigger('obj_start_trigger', 0, xosc.ConditionEdge.rising, xosc.SimulationTimeCondition(start_time, xosc.Rule.greaterThan)) obj_trail_event.add_trigger(obj_start_trigger) obj_trail_event.add_action('trail_action', obj_trail_action) # if obj_type in ped_type: # walp_trigger = xosc.EntityTrigger('ped_walp_trigger', 0, xosc.ConditionEdge.rising,\ # xosc.TimeToCollisionCondition(self.intersectime, xosc.Rule.lessThan, entity=name), 'Ego') # obj_trail_event.add_trigger(walp_trigger) man_obj.add_event(obj_trail_event) # 行人特有的事件 if obj_type in ped_type: ped_event = xosc.Event('Event_ped', xosc.Priority.overwrite) ped_event.add_trigger(obj_start_trigger) ped_action = xosc.CustomCommandAction(0, 0, 0, 0, 1, 0, 0) ped_action.add_element(self.createUDAction(name)) ped_event.add_action('ped_trail_action', ped_action) man_obj.add_event(ped_event) # finish_trigger = xosc.EntityTrigger('finish_trigger', 0, xosc.ConditionEdge.rising, xosc.ReachPositionCondition(position=row[1][0],tolerance=1),name) # event4 = xosc.Event('Event_ped',xosc.Priority.overwrite) # event4.add_trigger(finish_trigger) # be_still_action = xosc.AbsoluteSpeedAction(0, xosc.TransitionDynamics(xosc.DynamicsShapes.step, xosc.DynamicsDimension.time, 1)) # event4.add_action('ped_be_still_action',be_still_action) # man_obj.add_event(event4) # 轨迹最后删除实体 last_time = row[1][-1] del_action = xosc.DeleteEntityAction(name) del_trigger = xosc.ValueTrigger(name='entity_del_trigger', delay=0, conditionedge=xosc.ConditionEdge.rising, valuecondition=xosc.SimulationTimeCondition(value=last_time, rule=xosc.Rule.greaterThan)) del_event = xosc.Event('Event_del', xosc.Priority.overwrite) del_event.add_trigger(del_trigger) del_event.add_action('del_action', del_action) man_obj.add_event(del_event) obj_mangroup = xosc.ManeuverGroup('mangroup', selecttriggeringentities=True) obj_mangroup.add_actor(name) obj_mangroup.add_maneuver(man_obj) obj_act = xosc.Act('Act2', trigger0) obj_act.add_maneuver_group(obj_mangroup) obj_story = xosc.Story('mystory_' + name) obj_story.add_act(obj_act) sb.add_story(obj_story) # prettyprint(sb.get_element()) paramet = xosc.ParameterDeclarations() sce = xosc.Scenario('my scenario', 'Maggie', paramet, entities, sb, road, catalog) return sce def createUDAction(self, name): # tree = ET.parse('./models/ped_CDATA.xosc') # root = tree.getroot() # ele = root[5][2][1][0][1][1][0][0][0] # newnode = ET.Element("CustomCommandAction") # newnode.attrib = {'type': 'scp'} # newnode.text = '' speed = 3 motion = 'walk' # text = f'\n \n' text = '\n \n'.format(name, motion, speed) # text = '<Traffic> <ActionMotion actor="Player0" move="walk" speed="3" force="false"rate="0" delayTime="0.0" activateOnExit="false"/></Traffic>'.format(name, motion, speed) newnode = ET.Element("CustomCommandAction") newnode.attrib = {'type': 'scp'} newnode.text = "".format(text) return newnode class WorkMode(Enum): cicv = 0 # CICV车端 roadside = 1 # 车网路端 car = 2 # 车网车端 merge = 3 # 车网车端路端融合 def get_obj_type(mode): if mode == WorkMode.cicv.value: # CICV车端 ped_type = [7] car_type = [2] bicycle_motor_type = [8] bus_type = [3] truck_type = [4] cone_type = [103] elif mode == WorkMode.roadside.value: # 车网路端 ped_type = [0] car_type = [2] bicycle_motor_type = [1, 3] bus_type = [5] truck_type = [7] cone_type = [103] elif mode == WorkMode.car.value: # 车网车端 ped_type = [1] car_type = [6] bicycle_motor_type = [2, 3, 4, 5] bus_type = [7, 8] truck_type = [9] cone_type = [103] else: # 车网车端路端融合 ped_type = [1] car_type = [6] bicycle_motor_type = [2, 3, 4, 5] bus_type = [7, 8] truck_type = [9] cone_type = [103] return ped_type, car_type, bicycle_motor_type, bus_type, truck_type, cone_type # 创建道路旁静止的场景 def create_static_object(road, object_dict): for single_object in object_dict: for k, v in ObjectType.__members__.items(): if k == single_object.attrib['type']: single_object.attrib['type'] = v break road_object = Object( s=single_object.attrib['s'], t=single_object.attrib['t'], Type=single_object.attrib['type'], dynamic=Dynamic.no, id=single_object.attrib['id'], name=single_object.attrib['name'], zOffset=single_object.attrib['zOffset'], validLength=single_object.attrib['validLength'], orientation=Orientation.none, length=single_object.attrib['length'], width=single_object.attrib['width'], height=single_object.attrib['height'], pitch=single_object.attrib['pitch'], roll=single_object.attrib['roll'] ) # 判断此object是否是重复的元素 repeat = single_object.find('repeat') if repeat is not None: road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.left, tOffset=1.75) road.add_object_roadside(road_object_prototype=road_object, repeatDistance=0, side=RoadSide.right, tOffset=-1.755) else: road.add_object(road_object) return road def change_CDATA(filepath): '行人场景特例,对xosc文件内的特殊字符做转换' f = open(filepath, "r", encoding="UTF-8") txt = f.readline() all_line = [] # txt是否为空可以作为判断文件是否到了末尾 while txt: txt = txt.replace("<", "<").replace(">", ">").replace("&", "&").replace(""", '"').replace( "'", "'") all_line.append(txt) # 读取文件的下一行 txt = f.readline() f.close() f1 = open(filepath, 'w', encoding="UTF-8") for line in all_line: f1.write(line) f1.close() def path_changer(xosc_path, xodr_path, osgb_path): """ provided by Dongpeng Ding :param xosc_path: :param xodr_path: :param osgb_path: :return: """ tree = ET.parse(xosc_path) treeRoot = tree.getroot() # for OpenScenario v0.9, v1.0 for RoadNetwork in treeRoot.findall('RoadNetwork'): for Logics in RoadNetwork.findall('LogicFile'): Logics.attrib['filepath'] = xodr_path for SceneGraph in RoadNetwork.findall('SceneGraphFile'): SceneGraph.attrib['filepath'] = osgb_path for Logics in RoadNetwork.findall('Logics'): Logics.attrib['filepath'] = xodr_path for SceneGraph in RoadNetwork.findall('SceneGraph'): SceneGraph.attrib['filepath'] = osgb_path # for VTD xml for Layout in treeRoot.findall('Layout'): Layout.attrib['File'] = xodr_path Layout.attrib['Database'] = osgb_path tree.write(xosc_path, xml_declaration=True) def readXML(xoscPath): xodrFileName = "" osgbFileName = "" tree = ET.parse(xoscPath) treeRoot = tree.getroot() for RoadNetwork in treeRoot.findall('RoadNetwork'): for Logics in RoadNetwork.findall('LogicFile'): xodrFileName = Logics.attrib['filepath'] for SceneGraph in RoadNetwork.findall('SceneGraphFile'): osgbFileName = SceneGraph.attrib['filepath'] for Logics in RoadNetwork.findall('Logics'): xodrFileName = Logics.attrib['filepath'] for SceneGraph in RoadNetwork.findall('SceneGraph'): osgbFileName = SceneGraph.attrib['filepath'] return xodrFileName[xodrFileName.rindex("/") + 1:], osgbFileName[osgbFileName.rindex("/") + 1:] def formatThree(rootDirectory): """ xodr and osgb file path are fixed :return: """ for root, dirs, files in os.walk(rootDirectory): for file in files: if ".xosc" == file[-5:]: # xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.xodr" # 泛化效果好的场景用的 # osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Rd_001.osgb" # 泛化效果好的场景用的 # xodrFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.xodr" # 泛化效果好的场景用的 # osgbFilePath = "/volume4T/goodscenarios/generalization/toyiqi/model/Cross.osgb" # 泛化效果好的场景用的 xodrFilePath = "/home/hancheng/hc_project/2023-08-08-10-50-16_jsons1/2023-08-08-10-50-16_jsons_0/2023-08-08-10-50-16_jsons_0.xodr" # 直路,泛化好用 osgbFilePath = "/home/hancheng/VIRES/VTD.2022.1/Runtime/Tools/RodDistro_7001_Rod4.6.3/DefaultProject/Database/json0.opt.osgb" # 直路,泛化好用 # xodrFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.xodr" # 十字路口,泛化好用 # osgbFilePath = "/home/lxj/wendang_lxj/L4/L4_scenarios/piliang_model/China_Crossing_002.opt.osgb" # 十字路口,泛化好用 # xodrFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.xodr" # Sharing-van还原用的 # osgbFilePath = "/home/lxj/wendang_lxj/Sharing_VAN/homework/test/DF_yuexiang_1224.opt.osgb" # Sharing-van还原用的 path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print("Change success: " + root + "/" + file) def formatTwo(rootDirectory): """ data format: simulation file.xosc file.xodr file.osgb :return: """ for root, dirs, files in os.walk(rootDirectory): for file in files: if ".xosc" == file[-5:]: xodrFilePath = "" osgbFilePath = "" for odrFile in os.listdir(root): if ".xodr" == odrFile[-5:]: xodrFilePath = root + "/" + odrFile break for osgbFile in os.listdir(root): if ".osgb" == osgbFile[-5:]: osgbFilePath = root + "/" + osgbFile break path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print("Change success: " + root + "/" + file) def formatOne(rootDirectory): """ data format: openx xosc file.xosc xodr file.xodr osgb file.osgb :return: """ for root, dirs, files in os.walk(rootDirectory): for file in files: if "xosc" == file[-4:]: xodrFilePath = "" osgbFilePath = "" for odrFile in os.listdir(root[:-4] + "xodr"): if "xodr" == odrFile[-4:]: xodrFilePath = root[:-4] + "xodr/" + odrFile break for osgbFile in os.listdir(root[:-4] + "osgb"): if "osgb" == osgbFile[-4:]: osgbFilePath = root[:-4] + "osgb/" + osgbFile break path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print("Change success: " + root + "/" + file) def chongQingFormat(rootDirectory): """ supporting file format: chong qing folder format :return: """ counter = 1 for root, dirs, files in os.walk(rootDirectory): for file in files: if "xosc" == file[-4:]: if "ver1.0.xosc" == file[-11:]: xodrFileName, osgbFileName = readXML(root + "/" + file) xodrFilePath = "/Xodrs/" + xodrFileName osgbFilePath = "/Databases/" + osgbFileName path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print(counter, "Change success: " + root + "/" + file) else: xodrFileName, osgbFileName = readXML(root + "/" + file) xodrFilePath = "/Xodrs/" + xodrFileName osgbFilePath = "/Databases/" + osgbFileName path_changer(root + "/" + file, xodrFilePath, osgbFilePath) print(counter, "Change success: " + root + "/" + file) counter += 1