import math import numpy as np import pandas as pd from modules.lib import log_manager from modules.lib.score import Score from modules.config import config class OvertakingViolation(object): """超车违规类""" def __init__(self, df_data): print("超车违规类初始化中...") self.traffic_violations_type = "超车违规类" # self.logger = log.get_logger() # 使用时再初始化 self.data = df_data self.data_ego = df_data.ego_data self.ego_data = ( self.data_ego[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) # Copy to avoid modifying the original DataFrame header = self.ego_data.columns self.overtake_on_right_count = 0 self.overtake_when_turn_around_count = 0 self.overtake_when_passing_car_count = 0 self.overtake_in_forbid_lane_count = 0 self.overtake_in_ramp_count = 0 self.overtake_in_tunnel_count = 0 self.overtake_on_accelerate_lane_count = 0 self.overtake_on_decelerate_lane_count = 0 self.overtake_in_different_senerios_count = 0 def different_road_area_simtime(self, df, threshold=0.5): if not df: return [] simtime_group = [] current_simtime_group = [df[0]] for i in range(1, len(df)): if abs(df[i] - df[i - 1]) <= threshold: current_simtime_group.append(df[i]) else: simtime_group.append(current_simtime_group) current_simtime_group = [df[i]] simtime_group.append(current_simtime_group) return simtime_group def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy): lane_start = lane_id[0] lane_end = lane_id[-1] start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0 end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0 return lane_start == lane_end and start_condition and end_condition def _is_dxy_of_car(self, ego_df, obj_df): """ :param df: objstate.csv and so on :param string_type: posX/Y or speedX/Y and so on :return: dataframe of dx/y and so on """ car_dx = obj_df["posX"].values - ego_df["posX"].values car_dy = obj_df["posY"].values - ego_df["posY"].values return car_dx, car_dy # 在前车右侧超车、会车时超车、前车掉头时超车 def _is_objectcar_of_overtake(self, obj_df): if ((self.ego_data['posX'].values - obj_df['posX'].values).any() <= self.ego_data['lane_width'].values.any()) and ( (self.ego_data['posY'].values - obj_df['posY'].values).any() <= 50) and ((self.ego_data['speedX' ].values *obj_df['speedX'].values).any() >= 0): return obj_df else: return None def _is_passingcar(self, obj_df): if ((self.ego_data['posX'].values - obj_df['posX'].values).any() <= self.ego_data['lane_width'].values.any()) and ( (self.ego_data['posY'].values - obj_df['posY'].values).any() <= 50) and ((self.ego_data['speedX'].values * obj_df['speedX'].values).any() < 0): return obj_df else: return None def illegal_overtake_with_car(self, window_width=250): # 获取csv文件中最短的帧数 frame_id_length = len(self.ego_data["simFrame"]) start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数 for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) while (start_frame_id + window_width) < frame_id_length: simframe_window1 = list( np.arange(start_frame_id, start_frame_id + window_width) ) simframe_window = list(map(int, simframe_window1)) # 读取滑动窗口的dataframe数据 ego_data_frames = self.ego_data[ self.ego_data["simFrame"].isin(simframe_window) ] obj_data_frames = obj_data[ obj_data["simFrame"].isin(simframe_window) ] # 读取前后的laneId lane_id = ego_data_frames["lane_id"].tolist() # 读取前后方向盘转角steeringWheel, driverctrl_start_state = ego_data_frames["posH"].iloc[0] driverctrl_end_state = ego_data_frames["posH"].iloc[-1] # 读取车辆前后的位置信息 dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames) ego_speedx = ego_data_frames["speedX"].tolist() ego_speedy = ego_data_frames["speedY"].tolist() obj_speedx = obj_data_frames[obj_data_frames["playerId"] == i][ "speedX" ].tolist() obj_speedy = obj_data_frames[obj_data_frames["playerId"] == i][ "speedY" ].tolist() for j in self.data.obj_id_list: data_pass_obj = self.data.obj_data[j] obj_pass_datas = self._is_passingcar(data_pass_obj) if obj_pass_datas: other_data_frames = obj_pass_datas[ obj_pass_datas["simFrame"].isin(simframe_window) ] if len(other_data_frames) > 0: other_start_speedx = other_data_frames["speedX"].iloc[0] other_start_speedy = other_data_frames["speedY"].iloc[0] if ( ego_speedx[0] * other_start_speedx + ego_speedy[0] * other_start_speedy < 0 ): self.overtake_when_passing_car_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) start_frame_id += window_width """ 如果滑动窗口开始和最后的laneid一致; 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左); 自车和前车的位置发生的交换; 则认为右超车 """ if driverctrl_start_state > 0 and driverctrl_end_state < 0: self.overtake_on_right_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) start_frame_id += window_width elif (len(ego_speedx ) *len(obj_speedx) > 0) and \ (ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0): self.overtake_when_turn_around_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) start_frame_id += window_width else: start_frame_id += 1 # print( # f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次") # 借道超车场景 def overtake_in_forbid_lane(self): for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) simTime = obj_data["simTime"].tolist() simtime_devide = self.different_road_area_simtime(simTime) if len(simtime_devide) == 0: self.overtake_in_forbid_lane_count += 0 return else: for simtime in simtime_devide: lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)] try: lane_type = lane_overtake["lane_type"].tolist() if (50002 in lane_type and len(set(lane_type)) > 2) or ( 50002 not in lane_type and len(set(lane_type)) > 1 ): self.overtake_in_forbid_lane_count += 1 except Exception as e: print("数据缺少lane_type信息") # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次") # 在匝道超车 def overtake_in_ramp_area(self): ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][ "simTime" ].tolist() if len(ramp_simtime_list) == 0: self.overtake_in_ramp_count += 0 return else: ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list) for ramp_simtime in ramp_simTime_list: for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) lane_id = self.ego_data["lane_id"].tolist() ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)] objstate_in_ramp = obj_data[ obj_data["simTime"].isin(ramp_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp) ego_speedx = ego_in_ramp["speedX"].tolist() ego_speedy = ego_in_ramp["speedY"].tolist() if len(lane_id) > 0: self.overtake_in_ramp_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) else: continue # print(f"在匝道超车{self.overtake_in_ramp_count}次") def overtake_in_tunnel_area(self): tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][ "simTime" ].tolist() if len(tunnel_simtime_list) == 0: self.overtake_in_tunnel_count += 0 return else: tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list) for tunnel_simtime in tunnel_simTime_list: lane_id = self.ego_data["lane_id"].tolist() ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)] for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) objstate_in_tunnel = obj_data[ obj_data["simTime"].isin(tunnel_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel) ego_speedx = ego_in_tunnel["speedX"].tolist() ego_speedy = ego_in_tunnel["speedY"].tolist() if len(lane_id) > 0: self.overtake_in_tunnel_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) else: continue # print(f"在隧道超车{self.overtake_in_tunnel_count}次") # 加速车道超车 def overtake_on_accelerate_lane(self): accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][ "simTime" ].tolist() if len(accelerate_simtime_list) == 0: self.overtake_on_accelerate_lane_count += 0 return else: accelerate_simTime_list = self.different_road_area_simtime( accelerate_simtime_list ) for accelerate_simtime in accelerate_simTime_list: lane_id = self.ego_data["lane_id"].tolist() ego_in_accelerate = self.ego_data[ self.ego_data["simTime"].isin(accelerate_simtime) ] for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) objstate_in_accelerate = obj_data[ obj_data["simTime"].isin(accelerate_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate) ego_speedx = ego_in_accelerate["speedX"].tolist() ego_speedy = ego_in_accelerate["speedY"].tolist() self.overtake_on_accelerate_lane_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次") # 减速车道超车 def overtake_on_decelerate_lane(self): decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][ "simTime" ].tolist() if len(decelerate_simtime_list) == 0: self.overtake_on_decelerate_lane_count += 0 return else: decelerate_simTime_list = self.different_road_area_simtime( decelerate_simtime_list ) for decelerate_simtime in decelerate_simTime_list: lane_id = self.ego_data["id"].tolist() ego_in_decelerate = self.ego_data[ self.ego_data["simTime"].isin(decelerate_simtime) ] for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) objstate_in_decelerate = obj_data[ obj_data["simTime"].isin(decelerate_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate) ego_speedx = ego_in_decelerate["speedX"].tolist() ego_speedy = ego_in_decelerate["speedY"].tolist() self.overtake_on_decelerate_lane_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次") # 在交叉路口 def overtake_in_different_senerios(self): crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][ "simTime" ].tolist() # 判断是路口或者隧道区域 if len(crossroad_simTime) == 0: self.overtake_in_different_senerios_count += 0 return else: # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据 crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)] for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: obj_data = ( obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True) ) crossroad_objstate = obj_data[ obj_data["simTime"].isin(crossroad_simTime) ] # crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)] # 读取前后的laneId lane_id = crossroad_ego["lane_id"].tolist() # 读取车辆前后的位置信息 dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate) ego_speedx = crossroad_ego["speedX"].tolist() ego_speedy = crossroad_ego["speedY"].tolist() """ 如果滑动窗口开始和最后的laneid一致; 自车和前车的位置发生的交换; 则认为发生超车 """ if len(lane_id) > 0: self.overtake_in_different_senerios_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) else: pass # print(f"在路口超车{self.overtake_in_different_senerios_count}次") def statistic(self): for i in self.data.obj_id_list: if i != 1: data_obj = self.data.obj_data[i] else: continue obj_datas = self._is_objectcar_of_overtake(data_obj) if obj_datas is not None: self.overtake_in_forbid_lane() self.overtake_on_decelerate_lane() self.overtake_on_accelerate_lane() self.overtake_in_ramp_area() self.overtake_in_tunnel_area() self.overtake_in_different_senerios() self.illegal_overtake_with_car() else: pass self.calculated_value = { "overtake_on_right": self.overtake_on_right_count, "overtake_when_turn_around": self.overtake_when_turn_around_count, "overtake_when_passing_car": self.overtake_when_passing_car_count, "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count, "overtake_in_ramp": self.overtake_in_ramp_count, "overtake_in_tunnel": self.overtake_in_tunnel_count, "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count, "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count, "overtake_in_different_senerios": self.overtake_in_different_senerios_count, } # self.logger.info(f"超车类指标统计完成,统计结果:{self.calculated_value}") return self.calculated_value class SlowdownViolation(object): """减速让行违规类""" def __init__(self, df_data): print("减速让行违规类-------------------------") self.traffic_violations_type = "减速让行违规类" self.object_items = [] self.data = df_data.ego_data self.ego_data = ( self.data[config.SLOWDOWN_INFO].copy().reset_index(drop=True) ) # Copy to avoid modifying the original DataFrame self.pedestrian_data = pd.DataFrame() self.object_items = set(df_data.object_df.type.tolist()) if 13 in self.object_items: # 行人的type是13 self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13] self.pedestrian_data = ( self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True) ) self.slow_down_in_crosswalk_count = 0 self.avoid_pedestrian_in_crosswalk_count = 0 self.avoid_pedestrian_in_the_road_count = 0 self.aviod_pedestrian_when_turning_count = 0 def pedestrian_in_front_of_car(self): if len(self.pedestrian_data) == 0: return [] else: self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"] self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"] self.ego_data["dist"] = np.sqrt( self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2 ) self.ego_data["rela_pos"] = ( self.ego_data["dx"] * self.ego_data["speedX"] + self.ego_data["dy"] * self.ego_data["speedY"] ) simtime = self.ego_data[ (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50) ]["simTime"].tolist() return simtime def different_road_area_simtime(self, df, threshold=0.6): if not df: return [] simtime_group = [] current_simtime_group = [df[0]] for i in range(1, len(df)): if abs(df[i] - df[i - 1]) <= threshold: current_simtime_group.append(df[i]) else: simtime_group.append(current_simtime_group) current_simtime_group = [df[i]] simtime_group.append(current_simtime_group) return simtime_group def slow_down_in_crosswalk(self): # 筛选出路口或隧道区域的时间点 crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][ "simTime" ].tolist() if len(crosswalk_simTime) == 0: self.slow_down_in_crosswalk_count += 0 return else: crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime) for crosswalk_simtime in crosswalk_simTime_divide: # 筛选出当前时间段内的数据 # start_time, end_time = crosswalk_simtime start_time = crosswalk_simtime[0] end_time = crosswalk_simtime[-1] print(f"当前时间段:{start_time} - {end_time}") crosswalk_objstate = self.ego_data[ (self.ego_data["simTime"] >= start_time) & (self.ego_data["simTime"] <= end_time) ] # 计算车辆速度 ego_speedx = np.array(crosswalk_objstate["speedX"].tolist()) ego_speedy = np.array(crosswalk_objstate["speedY"].tolist()) ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2) # 判断是否超速 if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s self.slow_down_in_crosswalk_count += 1 # 输出总次数 print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次") def avoid_pedestrian_in_crosswalk(self): crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][ "simTime" ].tolist() if len(crosswalk_simTime) == 0: self.avoid_pedestrian_in_crosswalk_count += 0 return else: crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime) for crosswalk_simtime in crosswalk_simTime_devide: if not self.pedestrian_data.empty: crosswalk_objstate = self.pedestrian_data[ self.pedestrian_data["simTime"].isin(crosswalk_simtime) ] else: crosswalk_objstate = pd.DataFrame() if len(crosswalk_objstate) > 0: pedestrian_simtime = crosswalk_objstate["simTime"] pedestrian_objstate = crosswalk_objstate[ crosswalk_objstate["simTime"].isin(pedestrian_simtime) ] ego_speed = np.sqrt( pedestrian_objstate["speedX"] ** 2 + pedestrian_objstate["speedY"] ** 2 ) if ego_speed.any() > 0: self.avoid_pedestrian_in_crosswalk_count += 1 def avoid_pedestrian_in_the_road(self): simtime = self.pedestrian_in_front_of_car() if len(simtime) == 0: self.avoid_pedestrian_in_the_road_count += 0 return else: pedestrian_on_the_road = self.pedestrian_data[ self.pedestrian_data["simTime"].isin(simtime) ] simTime = pedestrian_on_the_road["simTime"].tolist() simTime_devide = self.different_road_area_simtime(simTime) if len(simTime_devide) == 0: pass else: for simtime1 in simTime_devide: sub_pedestrian_on_the_road = pedestrian_on_the_road[ pedestrian_on_the_road["simTime"].isin(simtime1) ] ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))] dist = np.sqrt( (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values) ** 2 + ( ego_car["posY"].values - sub_pedestrian_on_the_road["posY"].values ) ** 2 ) speed = np.sqrt( ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2 ) data = {"dist": dist, "speed": speed} new_ego_car = pd.DataFrame(data) new_ego_car = new_ego_car.assign( Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0) ) if new_ego_car["Column3"].any(): self.avoid_pedestrian_in_the_road_count += 1 def aviod_pedestrian_when_turning(self): pedestrian_simtime_list = self.pedestrian_in_front_of_car() if len(pedestrian_simtime_list) > 0: simtime_list = self.ego_data[ (self.ego_data["simTime"].isin(pedestrian_simtime_list)) & (self.ego_data["lane_type"] == 20) ]["simTime"].tolist() simTime_list = self.different_road_area_simtime(simtime_list) pedestrian_on_the_road = self.pedestrian_data[ self.pedestrian_data["simTime"].isin(simtime_list) ] if len(simTime_list) == 0: pass else: for simtime in simTime_list: sub_pedestrian_on_the_road = pedestrian_on_the_road[ pedestrian_on_the_road["simTime"].isin(simtime) ] if len(sub_pedestrian_on_the_road) > 0: ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))] ego_car["dist"] = np.sqrt( (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values) ** 2 + ( ego_car["posY"].values - sub_pedestrian_on_the_road["posY"].values ) ** 2 ) ego_car["speed"] = np.sqrt( ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2 ) if any(ego_car["speed"].tolist()) != 0: self.aviod_pedestrian_when_turning_count += 1 def statistic(self): self.slow_down_in_crosswalk() self.avoid_pedestrian_in_crosswalk() self.avoid_pedestrian_in_the_road() self.aviod_pedestrian_when_turning() self.calculated_value = { "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count, "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count, "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count, "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count, } # self.logger.info(f"减速让行类指标统计完成,统计结果:{self.calculated_value}") return self.calculated_value class TurnaroundViolation(object): def __init__(self, df_data): print("掉头违规类初始化中...") self.traffic_violations_type = "掉头违规类" self.data = df_data.obj_data[1] self.ego_data = ( self.data[config.TURNAROUND_INFO].copy().reset_index(drop=True) ) # Copy to avoid modifying the original DataFrame self.pedestrian_data = pd.DataFrame() self.object_items = set(df_data.object_df.type.tolist()) if 13 in self.object_items: # 行人的type是13 self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13] self.pedestrian_data = ( self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True) ) self.turning_in_forbiden_turn_back_sign_count = 0 self.turning_in_forbiden_turn_left_sign_count = 0 self.avoid_pedestrian_when_turn_back_count = 0 def pedestrian_in_front_of_car(self): if len(self.pedestrian_data) == 0: return [] else: self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"] self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"] self.ego_data["dist"] = np.sqrt( self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2 ) self.ego_data["rela_pos"] = ( self.ego_data["dx"] * self.ego_data["speedX"] + self.ego_data["dy"] * self.ego_data["speedY"] ) simtime = self.ego_data[ (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50) ]["simTime"].tolist() return simtime def different_road_area_simtime(self, df, threshold=0.5): if not df: return [] simtime_group = [] current_simtime_group = [df[0]] for i in range(1, len(df)): if abs(df[i] - df[i - 1]) <= threshold: current_simtime_group.append(df[i]) else: simtime_group.append(current_simtime_group) current_simtime_group = [df[i]] simtime_group.append(current_simtime_group) return simtime_group def turn_back_in_forbiden_sign(self): """ 禁止掉头type = 8 """ forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][ "simTime" ].tolist() forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][ "simTime" ].tolist() forbiden_turn_back_simtime_devide = self.different_road_area_simtime( forbiden_turn_back_simTime ) forbiden_turn_left_simtime_devide = self.different_road_area_simtime( forbiden_turn_left_simTime ) if len(forbiden_turn_back_simtime_devide) == 0: pass else: for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide: ego_car1 = self.ego_data.loc[ (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime)) ] ego_start_speedx1 = ego_car1["speedX"].iloc[0] ego_start_speedy1 = ego_car1["speedY"].iloc[0] ego_end_speedx1 = ego_car1["speedX"].iloc[-1] ego_end_speedy1 = ego_car1["speedY"].iloc[-1] if ( ego_end_speedx1 * ego_start_speedx1 + ego_end_speedy1 * ego_start_speedy1 < 0 ): self.turning_in_forbiden_turn_back_sign_count += 1 if len(forbiden_turn_left_simtime_devide) == 0: pass else: for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide: ego_car2 = self.ego_data.loc[ (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime)) ] ego_start_speedx2 = ego_car2["speedX"].iloc[0] ego_start_speedy2 = ego_car2["speedY"].iloc[0] ego_end_speedx2 = ego_car2["speedX"].iloc[-1] ego_end_speedy2 = ego_car2["speedY"].iloc[-1] if ( ego_end_speedx2 * ego_start_speedx2 + ego_end_speedy2 * ego_start_speedy2 < 0 ): self.turning_in_forbiden_turn_left_sign_count += 1 def avoid_pedestrian_when_turn_back(self): sensor_on_intersection = self.pedestrian_in_front_of_car() avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[ self.ego_data["lane_type"] == 20 ]["simTime"].tolist() avoid_pedestrian_when_turn_back_simTime_devide = ( self.different_road_area_simtime( avoid_pedestrian_when_turn_back_simTime_list ) ) if (len(sensor_on_intersection) > 0) and (len(avoid_pedestrian_when_turn_back_simTime_devide) > 0): for ( avoid_pedestrian_when_turn_back_simtime ) in avoid_pedestrian_when_turn_back_simTime_devide: pedestrian_in_intersection_simtime = self.pedestrian_data[ self.pedestrian_data["simTime"].isin( avoid_pedestrian_when_turn_back_simtime ) ].tolist() ego_df = self.ego_data[ self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime) ].reset_index(drop=True) pedestrian_df = self.pedestrian_data[ self.pedestrian_data["simTime"].isin( pedestrian_in_intersection_simtime ) ].reset_index(drop=True) ego_df["dist"] = np.sqrt( (ego_df["posx"] - pedestrian_df["posx"]) ** 2 + (ego_df["posy"] - pedestrian_df["posy"]) ** 2 ) ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2) if (any(ego_df["speed"].tolist()) != 0) or (any(ego_df["dist"].tolist()) == 0): self.avoid_pedestrian_when_turn_back_count += 1 def statistic(self): self.turn_back_in_forbiden_sign() self.avoid_pedestrian_when_turn_back() self.calculated_value = { "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count, "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count, "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count, } # self.logger.info(f"掉头违规类指标统计完成,统计结果:{self.calculated_value}") return self.calculated_value class WrongWayViolation: """停车违规类""" def __init__(self, df_data): print("停车违规类初始化中...") self.traffic_violations_type = "停车违规类" self.data = df_data.obj_data[1].copy() # 初始化违规统计 self.violation_count = { "urbanExpresswayOrHighwayDrivingLaneStopped": 0, "urbanExpresswayOrHighwayEmergencyLaneStopped": 0, "urbanExpresswayEmergencyLaneDriving": 0, } def process_violations(self): """处理停车或者紧急车道行驶违规数据""" # 提取有效道路类型 urban_expressway_or_highway = {1, 2} driving_lane = {1, 4, 5, 6} emergency_lane = {12} self.data["v"] *= 3.6 # 转换速度 # 使用向量化和条件判断进行违规判定 conditions = [ ( self.data["road_fc"].isin(urban_expressway_or_highway) & self.data["lane_type"].isin(driving_lane) & (self.data["v"] == 0) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & self.data["lane_type"].isin(emergency_lane) & (self.data["v"] == 0) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & self.data["lane_type"].isin(emergency_lane) & (self.data["v"] != 0) ), ] violation_types = [ "urbanExpresswayOrHighwayDrivingLaneStopped", "urbanExpresswayOrHighwayEmergencyLaneStopped", "urbanExpresswayEmergencyLaneDriving", ] # 设置违规类型 self.data["violation_type"] = None for condition, violation_type in zip(conditions, violation_types): self.data.loc[condition, "violation_type"] = violation_type # 统计违规情况 self.violation_count = ( self.data["violation_type"] .value_counts() .reindex(violation_types, fill_value=0) .to_dict() ) def statistic(self) -> str: self.process_violations() # self.logger.info(f"停车违规类指标统计完成,统计结果:{self.violation_count}") return self.violation_count class SpeedingViolation(object): """超速违规类""" """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息""" def __init__(self, df_data): print("超速违规类初始化中...") self.traffic_violations_type = "超速违规类" self.data = df_data.obj_data[1].copy() # 初始化违规统计 self.violation_counts = { "urbanExpresswayOrHighwaySpeedOverLimit50": 0, "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0, "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0, "urbanExpresswayOrHighwaySpeedUnderLimit": 0, "generalRoadSpeedOverLimit50": 0, "generalRoadSpeedOverLimit20to50": 0, } def process_violations(self): """处理数据帧,检查超速和其他违规行为""" # 提取有效道路类型 urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合 general_road = {3} # 直接创建包含一个元素的集合 # 转换速度 self.data["v"] *= 3.6 # 转换速度 conditions = [ ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] > self.data["road_speed_max"] * 1.5) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] > self.data["road_speed_max"] * 1.2) & (self.data["v"] <= self.data["road_speed_max"] * 1.5) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] > self.data["road_speed_max"]) & (self.data["v"] <= self.data["road_speed_max"] * 1.2) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] < self.data["road_speed_min"]) ), ( self.data["road_fc"].isin(general_road) & (self.data["v"] > self.data["road_speed_max"] * 1.5) ), ( self.data["road_fc"].isin(general_road) & (self.data["v"] > self.data["road_speed_max"] * 1.2) & (self.data["v"] <= self.data["road_speed_max"] * 1.5) ), ] violation_types = [ "urbanExpresswayOrHighwaySpeedOverLimit50", "urbanExpresswayOrHighwaySpeedOverLimit20to50", "urbanExpresswayOrHighwaySpeedOverLimit0to20", "urbanExpresswayOrHighwaySpeedUnderLimit", "generalRoadSpeedOverLimit50", "generalRoadSpeedOverLimit20to50", ] # 设置违规类型 self.data["violation_type"] = None for condition, violation_type in zip(conditions, violation_types): self.data.loc[condition, "violation_type"] = violation_type # 统计各类违规情况 self.violation_counts = self.data["violation_type"].value_counts().to_dict() # 添加statistic方法 def statistic(self): """返回统计结果""" # 处理数据 self.process_violations() print(f"超速违规类指标统计完成,统计结果:{self.violation_counts}") return self.violation_counts class TrafficLightViolation(object): """违反交通灯类""" """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯""" def __init__(self, df_data): """初始化方法""" self.traffic_violations_type = "违反交通灯类" print("违反交通灯类 类初始化中...") self.config = df_data.vehicle_config self.data_ego = df_data.ego_data # 获取数据 self.violation_counts = { "trafficSignalViolation": 0, "illegalDrivingOrParkingAtCrossroads": 0, } # 处理数据并判定违规 self.process_violations() def is_point_cross_line(self, point, stop_line_points): """ 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。 :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制) :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]] :return: True 如果车辆跨越了停止线,否则 False """ line_vector = np.array( [ stop_line_points[1][0] - stop_line_points[0][0], stop_line_points[1][1] - stop_line_points[0][1], ] ) point_vector = np.array( [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]] ) cross_product = np.cross(line_vector, point_vector) if cross_product != 0: return False mid_point = ( np.array([stop_line_points[0][0], stop_line_points[0][1]]) + 0.5 * line_vector ) axletree_to_mid_vector = np.array( [point[0] - mid_point[0], point[1] - mid_point[1]] ) direction_vector = np.array([math.cos(point[2]), math.sin(point[2])]) norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector) norm_direction = np.linalg.norm(direction_vector) if norm_axletree_to_mid == 0 or norm_direction == 0: return False cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / ( norm_axletree_to_mid * norm_direction ) angle_theta = math.degrees(math.acos(cos_theta)) return angle_theta <= 90 def _filter_data(self): """过滤数据,筛选出需要分析的记录""" return self.data_ego[ (self.data_ego["stopline_id"] != -1) & (self.data_ego["stopline_type"] == 1) & (self.data_ego["trafficlight_id"] != -1) ] def _group_data(self, filtered_data): """按时间差对数据进行分组""" filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0) threshold = 0.5 filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum() return filtered_data.groupby("group") def _analyze_group(self, group_data): """分析单个分组的数据,判断是否闯红灯""" photos = [] stop_in_intersection = False for _, row in group_data.iterrows(): vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]]) stop_line_points = [ [row["stopline_x1"], row["stopline_y1"]], [row["stopline_x2"], row["stopline_y2"]], ] stateMask = row["stateMask"] heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])]) heading_vector = heading_vector / np.linalg.norm(heading_vector) # with open(self.config_path / "vehicle_config.yaml", 'r') as f: # config = yaml.load(f, Loader=yaml.FullLoader) front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector dist = math.sqrt( (row["posX"] - row["traffic_light_x"]) ** 2 + (row["posY"] - row["traffic_light_y"]) ** 2 ) if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01: has_crossed_line_front = ( self.is_point_cross_line(front_wheel_pos, stop_line_points) and stateMask == 1 ) has_crossed_line_rear = ( self.is_point_cross_line(rear_wheel_pos, stop_line_points) and row["v"] > 0 and stateMask == 1 ) has_stop_in_intersection = has_crossed_line_front and row["v"] == 0 has_passed_intersection = has_crossed_line_front and dist < 1.0 # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}') photos.extend( [ has_crossed_line_front, has_crossed_line_rear, has_passed_intersection, has_stop_in_intersection, ] ) stop_in_intersection = has_passed_intersection return photos, stop_in_intersection def is_vehicle_run_a_red_light(self): """判断车辆是否闯红灯""" filtered_data = self._filter_data() grouped_data = self._group_data(filtered_data) self.photos_group = [] self.stop_in_intersections = [] for _, group_data in grouped_data: photos, stop_in_intersection = self._analyze_group(group_data) self.photos_group.append(photos) self.stop_in_intersections.append(stop_in_intersection) def process_violations(self): """处理数据并判定违规""" self.is_vehicle_run_a_red_light() count_1 = sum(all(photos) for photos in self.photos_group) count_2 = sum( stop_in_intersection for stop_in_intersection in self.stop_in_intersections ) self.violation_counts["trafficSignalViolation"] = count_1 self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2 def statistic(self): """返回统计结果""" return self.violation_counts class WarningViolation(object): """警告性违规类""" def __init__(self, df_data): self.traffic_violations_type = "警告性违规类" print("警告性违规类 类初始化中...") self.config = df_data.vehicle_config self.data_ego = df_data.obj_data[1] self.data = self.data_ego.copy() # 避免修改原始 DataFrame self.violation_counts = { "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶 "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线 } def process_violations(self): general_road = {3} # 普通道路 lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道 # with open(self.config_path / "vehicle_config.yaml", 'r') as f: # config = yaml.load(f, Loader=yaml.FullLoader) car_width = self.config["CAR_WIDTH"] lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在 # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶 # 使用布尔索引来筛选满足条件的行 condition = (self.data["road_fc"].isin(general_road)) & ( self.data["lane_type"].isin(lane_type) ) # 创建一个新的列,并根据条件设置值 self.data["is_violation"] = condition # 统计满足条件的连续时间段 violation_segments = self.count_continuous_violations( self.data["is_violation"], self.data["simTime"] ) # 更新骑行车道线违规计数 self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments) # 机动车在高速公路或者城市快速路上骑、轧车行道分界线 # 计算阈值 threshold = (lane_width - car_width) / 2 # 找到满足条件的行 self.data["is_violation"] = self.data["laneOffset"] > threshold # 统计满足条件的连续时间段 violation_segments = self.count_continuous_violations( self.data["is_violation"], self.data["simTime"] ) # 更新骑行车道线违规计数 self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len( violation_segments ) def count_continuous_violations(self, violation_series, time_series): """统计连续违规的时间段数量""" continuous_segments = [] current_segment = [] for is_violation, time in zip(violation_series, time_series): if is_violation: if not current_segment: # 新的连续段开始 current_segment.append(time) else: if current_segment: # 连续段结束 continuous_segments.append(current_segment) current_segment = [] # 检查是否有一个未结束的连续段在最后 if current_segment: continuous_segments.append(current_segment) return continuous_segments def statistic(self): # 处理数据 self.process_violations() # self.logger.info(f"警告性违规类指标统计完成,统计结果:{self.violation_counts}") return self.violation_counts class TrafficSignViolation(object): """交通标志违规类""" def __init__(self, df_data): self.traffic_violations_type = "交通标志违规类" print("交通标志违规类 类初始化中...") self.data_ego = df_data.obj_data[1] self.ego_data = ( self.data_ego[config.TRFFICSIGN_INFO].copy().reset_index(drop=True) ) self.data_ego = self.data_ego.copy() # 避免修改原始 DataFrame self.violation_counts = { "NoStraightThrough": 0, # 禁止直行标志地方直行 "SpeedLimitViolation": 0, # 违反限速规定 "MinimumSpeedLimitViolation": 0, # 违反最低限速规定 } # def checkForProhibitionViolation(self): # """禁令标志判断违规:7 禁止直行,12:限制速度""" # # 筛选出sign_type1为7(禁止直行) # violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7] # violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12] def checkForProhibitionViolation(self): """禁令标志判断违规:7 禁止直行,12:限制速度""" # 筛选出 sign_type1 为7(禁止直行)的数据 violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy() # 判断车辆是否在禁止直行路段直行 if not violation_straight_df.empty: # 按时间戳排序(假设数据按时间顺序处理) violation_straight_df = violation_straight_df.sort_values('simTime') # 计算航向角变化(前后时间点的差值绝对值) violation_straight_df['posH_diff'] = violation_straight_df['posH'].diff().abs() # 筛选条件:航向角变化小于阈值(例如5度)且速度不为0 threshold = 5 # 单位:度(根据场景调整) mask = (violation_straight_df['posH_diff'] <= threshold) & (violation_straight_df['v'] > 0) straight_violations = violation_straight_df[mask] # 统计违规次数或记录违规数据 self.violation_counts["prohibition_straight"] = len(straight_violations) # 限制速度判断(原代码) violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12] if violation_speed_limit_df.empty: mask = self.data_ego["v"] > self.data_ego["sign_speed"] self.violation_counts["SpeedLimitViolation"] = len(self.data_ego[mask]) def checkForInstructionViolation(self): """限速标志属于指示性标志:13:最低限速""" violation_minimum_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 13] if violation_minimum_speed_limit_df.empty: mask = self.data_ego["v"] < self.data_ego["sign_speed"] self.violation_counts["MinimumSpeedLimitViolation"] = len(self.data_ego[mask]) def statistic(self): self.checkForProhibitionViolation() self.checkForInstructionViolation() # self.logger.info(f"交通标志违规类指标统计完成,统计结果:{self.violation_counts}") return self.violation_counts class ViolationManager: """违规管理类,用于管理所有违规行为""" def __init__(self, data_processed): self.violations = [] self.data = data_processed self.config = data_processed.traffic_config self.over_take_violation = OvertakingViolation(self.data) self.slow_down_violation = SlowdownViolation(self.data) self.wrong_way_violation = WrongWayViolation(self.data) self.speeding_violation = SpeedingViolation(self.data) self.traffic_light_violation = TrafficLightViolation(self.data) self.warning_violation = WarningViolation(self.data) # self.report_statistic() def report_statistic(self): traffic_result = self.over_take_violation.statistic() traffic_result.update(self.slow_down_violation.statistic()) traffic_result.update(self.traffic_light_violation.statistic()) traffic_result.update(self.wrong_way_violation.statistic()) traffic_result.update(self.speeding_violation.statistic()) traffic_result.update(self.warning_violation.statistic()) evaluator = Score(self.config) result = evaluator.evaluate(traffic_result) print("\n[交规类表现及得分情况]") # self.logger.info(f"Traffic Result:{traffic_result}") return result # 示例使用 if __name__ == "__main__": pass