12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301 |
- import math
- import numpy as np
- import pandas as pd
- from modules.lib import log_manager
- from modules.lib.score import Score
- from modules.config import config
class OvertakingViolation(object):
    """Count illegal overtaking manoeuvres performed by the ego vehicle.

    Each public ``overtake_*`` method (and ``illegal_overtake_with_car``) adds
    to one of the ``*_count`` attributes.  ``statistic`` resets the counters,
    runs every check once and returns the totals as a dict.

    An "overtake" is detected purely from window end points, see
    ``_is_overtake``.
    """

    # Id under which the ego vehicle is stored in ``obj_id_list``/``obj_data``.
    EGO_ID = 1
    # Largest |posY| gap at which another vehicle is still considered relevant
    # (presumably metres -- TODO confirm against the data source).
    LONGITUDINAL_RANGE = 50

    _COUNTER_NAMES = (
        "overtake_on_right_count",
        "overtake_when_turn_around_count",
        "overtake_when_passing_car_count",
        "overtake_in_forbid_lane_count",
        "overtake_in_ramp_count",
        "overtake_in_tunnel_count",
        "overtake_on_accelerate_lane_count",
        "overtake_on_decelerate_lane_count",
        "overtake_in_different_senerios_count",
    )

    def __init__(self, df_data):
        """Store the scenario data and zero all counters.

        :param df_data: object exposing ``ego_data`` (DataFrame),
            ``obj_id_list`` (iterable of ids) and ``obj_data`` (mapping from
            id to that object's DataFrame).
        """
        print("超车违规类初始化中...")
        self.traffic_violations_type = "超车违规类"
        self.data = df_data
        self.data_ego = df_data.ego_data
        # Copy so the caller's DataFrame is never modified.
        self.ego_data = (
            self.data_ego[config.OVERTAKE_INFO].copy().reset_index(drop=True)
        )
        self._reset_counters()

    # ------------------------------------------------------------------ #
    # generic helpers
    # ------------------------------------------------------------------ #
    def _reset_counters(self):
        """Set every violation counter back to zero."""
        for name in self._COUNTER_NAMES:
            setattr(self, name, 0)

    def different_road_area_simtime(self, df, threshold=0.5):
        """Split a sequence of timestamps into runs of consecutive samples.

        :param df: sequence of timestamps (despite the name, not a DataFrame).
        :param threshold: largest gap between neighbours inside one run.
        :return: list of runs, each a list of timestamps; ``[]`` for no input.
        """
        times = [] if df is None else list(df)
        if not times:
            return []
        groups = [[times[0]]]
        for previous, current in zip(times, times[1:]):
            if abs(current - previous) <= threshold:
                groups[-1].append(current)
            else:
                groups.append([current])
        return groups

    def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
        """Tell whether the window describes a completed overtake.

        True when the ego is in the same lane at both ends of the window, the
        other vehicle is ahead at the start (offset . velocity >= 0) and
        behind at the end (offset . velocity < 0).  Empty inputs yield False
        instead of raising ``IndexError``.

        :return: plain ``bool`` so it can be added to an integer counter.
        """
        series = (lane_id, dx, dy, ego_speedx, ego_speedy)
        if any(len(values) == 0 for values in series):
            return False
        ahead_at_start = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
        behind_at_end = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
        same_lane = lane_id[0] == lane_id[-1]
        return bool(same_lane and ahead_at_start and behind_at_end)

    @staticmethod
    def _align_frames(ego_df, obj_df, key):
        """Restrict both frames to the ``key`` values they share.

        The returned frames have one row per shared key value, sorted by
        ``key``, so row *k* of one corresponds to row *k* of the other.
        """
        ego_unique = ego_df.drop_duplicates(subset=key)
        obj_unique = obj_df.drop_duplicates(subset=key)
        ego_aligned = ego_unique[ego_unique[key].isin(obj_unique[key])]
        obj_aligned = obj_unique[obj_unique[key].isin(ego_unique[key])]
        return ego_aligned.sort_values(key), obj_aligned.sort_values(key)

    def _is_dxy_of_car(self, ego_df, obj_df):
        """Return the per-row offset of the object relative to the ego.

        :param ego_df: ego rows with ``posX``/``posY``.
        :param obj_df: object rows with ``posX``/``posY``; must be row-aligned
            with ``ego_df`` (see ``_align_frames``).
        :return: tuple ``(dx, dy)`` of numpy arrays, object minus ego.
        """
        car_dx = obj_df["posX"].values - ego_df["posX"].values
        car_dy = obj_df["posY"].values - ego_df["posY"].values
        return car_dx, car_dy

    def _proximity(self, obj_df):
        """Compute per-frame proximity and direction agreement with the ego.

        :return: ``(nearby, speed_product)`` where ``nearby`` marks frames
            with |posX gap| <= lane width and |posY gap| <= LONGITUDINAL_RANGE
            and ``speed_product`` is ego ``speedX`` times object ``speedX``.
        """
        ego, obj = self._align_frames(self.ego_data, obj_df, "simTime")
        lateral_gap = np.abs(ego["posX"].values - obj["posX"].values)
        longitudinal_gap = np.abs(ego["posY"].values - obj["posY"].values)
        nearby = (lateral_gap <= ego["lane_width"].values) & (
            longitudinal_gap <= self.LONGITUDINAL_RANGE
        )
        speed_product = ego["speedX"].values * obj["speedX"].values
        return nearby, speed_product

    # Overtaking on the right / while meeting a car / while the lead car turns
    def _is_objectcar_of_overtake(self, obj_df):
        """Return ``obj_df`` if it may be overtaken by the ego, else ``None``.

        A vehicle qualifies when, in at least one shared frame, it is nearby
        (see ``_proximity``) and travels in the same ``speedX`` direction.
        The conditions are evaluated element-wise; comparing ``.any()``
        results with numbers would only compare booleans.
        """
        nearby, speed_product = self._proximity(obj_df)
        if (nearby & (speed_product >= 0)).any():
            return obj_df
        return None

    def _is_passingcar(self, obj_df):
        """Return ``obj_df`` if it is oncoming traffic near the ego, else ``None``.

        Same proximity rule as ``_is_objectcar_of_overtake`` but the vehicle
        must travel in the opposite ``speedX`` direction.
        """
        nearby, speed_product = self._proximity(obj_df)
        if (nearby & (speed_product < 0)).any():
            return obj_df
        return None

    def _candidate_objects(self):
        """List ``(obj_id, frame)`` for every non-ego vehicle that may be overtaken.

        ``frame`` is a private copy restricted to ``config.OVERTAKE_INFO``.
        """
        candidates = []
        for obj_id in self.data.obj_id_list:
            if obj_id == self.EGO_ID:
                continue
            raw = self.data.obj_data[obj_id]
            if self._is_objectcar_of_overtake(raw) is None:
                continue
            selected = raw[config.OVERTAKE_INFO].copy().reset_index(drop=True)
            candidates.append((obj_id, selected))
        return candidates

    def _ego_times_where(self, column, value):
        """Return the ego ``simTime`` values at which ``column == value``."""
        return self.ego_data.loc[self.ego_data[column] == value, "simTime"].tolist()

    def _count_area_overtakes(self, time_groups):
        """Count overtakes of any candidate vehicle inside each time group.

        Lane ids, offsets and speeds are all taken from the frames that ego
        and object share inside the group, so the end points passed to
        ``_is_overtake`` describe the same instants.
        """
        if not time_groups:
            return 0
        candidates = self._candidate_objects()
        total = 0
        for times in time_groups:
            ego_rows = self.ego_data[self.ego_data["simTime"].isin(times)]
            for _, obj_data in candidates:
                obj_rows = obj_data[obj_data["simTime"].isin(times)]
                ego_win, obj_win = self._align_frames(ego_rows, obj_rows, "simTime")
                if ego_win.empty:
                    continue
                dx, dy = self._is_dxy_of_car(ego_win, obj_win)
                total += self._is_overtake(
                    ego_win["lane_id"].tolist(),
                    dx,
                    dy,
                    ego_win["speedX"].tolist(),
                    ego_win["speedY"].tolist(),
                )
        return total

    # ------------------------------------------------------------------ #
    # individual checks
    # ------------------------------------------------------------------ #
    @staticmethod
    def _faces_oncoming(oncoming, start, stop, ego_vx, ego_vy):
        """True if any oncoming vehicle seen in [start, stop] opposes the ego velocity."""
        for other in oncoming:
            rows = other[other["simFrame"].between(start, stop)]
            if len(rows) == 0:
                continue
            dot = ego_vx * rows["speedX"].iloc[0] + ego_vy * rows["speedY"].iloc[0]
            if dot < 0:
                return True
        return False

    def illegal_overtake_with_car(self, window_width=250):
        """Scan sliding frame windows for three kinds of illegal overtake.

        For every candidate vehicle the window ``[start, start + window_width)``
        is moved over the ego frames.  Within a window:

        * an oncoming vehicle is present -> ``overtake_when_passing_car_count``
        * ego ``posH`` goes from positive to negative -> ``overtake_on_right_count``
          (NOTE(review): the original comment talks about the steering wheel
          angle but the code reads ``posH`` -- confirm which is intended)
        * otherwise, ego and the candidate start the window with opposing
          velocities -> ``overtake_when_turn_around_count``

        Each counter only increases when ``_is_overtake`` is also true.  The
        window jumps by ``window_width`` when any of the three conditions
        matched and by one frame otherwise, exactly once per iteration.  An
        overtake is counted at most once per window for the oncoming case,
        no matter how many oncoming vehicles are around.

        :param window_width: window length in frames, must be >= 1.
        :raises ValueError: if ``window_width`` is smaller than 1.
        """
        if window_width < 1:
            raise ValueError("window_width must be a positive number of frames")
        if self.ego_data.empty:
            return

        frames = self.ego_data["simFrame"]
        first_frame, last_frame = int(frames.min()), int(frames.max())

        # Whether a vehicle is oncoming does not depend on the window: hoisted.
        oncoming = []
        for obj_id in self.data.obj_id_list:
            if obj_id == self.EGO_ID:
                continue
            raw = self.data.obj_data[obj_id]
            if self._is_passingcar(raw) is not None:
                oncoming.append(raw)

        for _, obj_data in self._candidate_objects():
            start = first_frame  # every vehicle is scanned from the beginning
            while start + window_width <= last_frame:
                stop = start + window_width - 1
                ego_win, obj_win = self._align_frames(
                    self.ego_data[frames.between(start, stop)],
                    obj_data[obj_data["simFrame"].between(start, stop)],
                    "simFrame",
                )
                if ego_win.empty:  # vehicle not present in this window
                    start += 1
                    continue

                dx, dy = self._is_dxy_of_car(ego_win, obj_win)
                ego_vx = ego_win["speedX"].tolist()
                ego_vy = ego_win["speedY"].tolist()
                overtook = self._is_overtake(
                    ego_win["lane_id"].tolist(), dx, dy, ego_vx, ego_vy
                )

                meets_oncoming = self._faces_oncoming(
                    oncoming, start, stop, ego_vx[0], ego_vy[0]
                )
                heading_flipped = (
                    ego_win["posH"].iloc[0] > 0 and ego_win["posH"].iloc[-1] < 0
                )
                lead_opposes = (
                    ego_vx[0] * obj_win["speedX"].iloc[0]
                    + ego_vy[0] * obj_win["speedY"].iloc[0]
                ) < 0

                if meets_oncoming:
                    self.overtake_when_passing_car_count += overtook
                if heading_flipped:
                    self.overtake_on_right_count += overtook
                elif lead_opposes:
                    self.overtake_when_turn_around_count += overtook

                matched = meets_oncoming or heading_flipped or lead_opposes
                start += window_width if matched else 1

    # Overtaking by borrowing a lane
    def overtake_in_forbid_lane(self):
        """Count time runs in which the ego used more lane types than allowed.

        For each run of consecutive samples of a candidate vehicle, the set of
        ego ``lane_type`` values is inspected: more than one distinct type is a
        violation, or more than two when type ``50002`` is among them
        (NOTE(review): meaning of 50002 is not visible here -- confirm).
        """
        if "lane_type" not in self.ego_data.columns:
            print("数据缺少lane_type信息")
            return
        for _, obj_data in self._candidate_objects():
            # A vehicle without samples simply contributes nothing; the scan
            # continues with the next vehicle.
            for times in self.different_road_area_simtime(obj_data["simTime"].tolist()):
                in_run = self.ego_data["simTime"].isin(times)
                lane_types = set(self.ego_data.loc[in_run, "lane_type"].tolist())
                allowed = 2 if 50002 in lane_types else 1
                if len(lane_types) > allowed:
                    self.overtake_in_forbid_lane_count += 1

    # Overtaking on a ramp
    def overtake_in_ramp_area(self):
        """Count overtakes while the ego ``road_type`` equals 19."""
        groups = self.different_road_area_simtime(self._ego_times_where("road_type", 19))
        self.overtake_in_ramp_count += self._count_area_overtakes(groups)

    def overtake_in_tunnel_area(self):
        """Count overtakes while the ego ``road_type`` equals 15."""
        groups = self.different_road_area_simtime(self._ego_times_where("road_type", 15))
        self.overtake_in_tunnel_count += self._count_area_overtakes(groups)

    # Overtaking on an acceleration lane
    def overtake_on_accelerate_lane(self):
        """Count overtakes while the ego ``lane_type`` equals 2."""
        groups = self.different_road_area_simtime(self._ego_times_where("lane_type", 2))
        self.overtake_on_accelerate_lane_count += self._count_area_overtakes(groups)

    # Overtaking on a deceleration lane
    def overtake_on_decelerate_lane(self):
        """Count overtakes while the ego ``lane_type`` equals 3."""
        groups = self.different_road_area_simtime(self._ego_times_where("lane_type", 3))
        self.overtake_on_decelerate_lane_count += self._count_area_overtakes(groups)

    # At an intersection
    def overtake_in_different_senerios(self):
        """Count overtakes while the ego ``interid`` differs from 10000.

        All such samples are treated as ONE window, as before.
        NOTE(review): the sibling checks split their samples into consecutive
        runs first; confirm whether separate junction visits should be too.
        """
        times = self.ego_data.loc[self.ego_data["interid"] != 10000, "simTime"].tolist()
        if not times:
            return
        self.overtake_in_different_senerios_count += self._count_area_overtakes([times])

    def statistic(self):
        """Run every check once and return the counters as a dict.

        Counters are reset first, so repeated calls return the same result.
        The checks iterate over all candidate vehicles themselves; they are
        therefore invoked a single time, and only if a candidate exists.
        """
        self._reset_counters()
        if self._candidate_objects():
            self.overtake_in_forbid_lane()
            self.overtake_on_decelerate_lane()
            self.overtake_on_accelerate_lane()
            self.overtake_in_ramp_area()
            self.overtake_in_tunnel_area()
            self.overtake_in_different_senerios()
            self.illegal_overtake_with_car()

        self.calculated_value = {
            "overtake_on_right": self.overtake_on_right_count,
            "overtake_when_turn_around": self.overtake_when_turn_around_count,
            "overtake_when_passing_car": self.overtake_when_passing_car_count,
            "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
            "overtake_in_ramp": self.overtake_in_ramp_count,
            "overtake_in_tunnel": self.overtake_in_tunnel_count,
            "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
            "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
            "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
        }
        return self.calculated_value
class SlowdownViolation(object):
    """Count violations of the slow-down-and-yield rules by the ego vehicle."""

    _COUNTER_NAMES = (
        "slow_down_in_crosswalk_count",
        "avoid_pedestrian_in_crosswalk_count",
        "avoid_pedestrian_in_the_road_count",
        "aviod_pedestrian_when_turning_count",
    )

    def __init__(self, df_data):
        """Store ego and pedestrian data and zero all counters.

        :param df_data: object exposing ``ego_data`` and ``object_df``
            DataFrames; rows of ``object_df`` with ``type == 13`` are
            pedestrians.
        """
        print("减速让行违规类-------------------------")
        self.traffic_violations_type = "减速让行违规类"
        self.data = df_data.ego_data
        # Copy so the caller's DataFrame is never modified.
        self.ego_data = (
            self.data[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
        )
        self.pedestrian_data = pd.DataFrame()
        self.object_items = set(df_data.object_df["type"].tolist())
        if 13 in self.object_items:  # pedestrians have type 13
            self.pedestrian_df = df_data.object_df[df_data.object_df["type"] == 13]
            self.pedestrian_data = (
                self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
            )
        self._reset_counters()

    def _reset_counters(self):
        """Set every violation counter back to zero."""
        for name in self._COUNTER_NAMES:
            setattr(self, name, 0)

    def _pair_with_pedestrians(self, ego_rows=None):
        """Join ego rows with the pedestrian rows recorded at the same ``simTime``.

        :param ego_rows: subset of ``self.ego_data``; the whole frame if None.
        :return: DataFrame with the ego columns plus ``posX_ped``/``posY_ped``;
            one row per (ego sample, pedestrian) pair.
        """
        ego = self.ego_data if ego_rows is None else ego_rows
        pedestrians = self.pedestrian_data[["simTime", "posX", "posY"]]
        return ego.merge(pedestrians, on="simTime", suffixes=("", "_ped"))

    def pedestrian_in_front_of_car(self):
        """Return the timestamps at which a pedestrian is ahead of the ego.

        A pedestrian counts as ahead when the vector from the ego to the
        pedestrian has a positive dot product with the ego velocity and the
        distance is below 50.  Ego and pedestrian samples are matched by
        ``simTime`` rather than by row position.

        :return: sorted list of unique ``simTime`` values, ``[]`` if there are
            no pedestrians.
        """
        if len(self.pedestrian_data) == 0:
            return []
        paired = self._pair_with_pedestrians()
        to_ped_x = paired["posX_ped"] - paired["posX"]
        to_ped_y = paired["posY_ped"] - paired["posY"]
        distance = np.hypot(to_ped_x, to_ped_y)
        ahead = to_ped_x * paired["speedX"] + to_ped_y * paired["speedY"] > 0
        times = paired.loc[ahead & (distance < 50), "simTime"]
        return sorted(set(times.tolist()))

    def different_road_area_simtime(self, df, threshold=0.6):
        """Split a sequence of timestamps into runs of consecutive samples.

        :param df: sequence of timestamps (despite the name, not a DataFrame).
        :param threshold: largest gap between neighbours inside one run.
        :return: list of runs, each a list of timestamps; ``[]`` for no input.
        """
        times = [] if df is None else list(df)
        if not times:
            return []
        groups = [[times[0]]]
        for previous, current in zip(times, times[1:]):
            if abs(current - previous) <= threshold:
                groups[-1].append(current)
            else:
                groups.append([current])
        return groups

    def _crosswalk_times(self):
        """Ego timestamps with ``crossid != 20000``.

        NOTE(review): 20000 appears to be the "no crosswalk" sentinel -- confirm.
        """
        return self.ego_data.loc[self.ego_data["crossid"] != 20000, "simTime"].tolist()

    def slow_down_in_crosswalk(self, speed_limit_kmh=15):
        """Count crosswalk passages during which the ego exceeded the speed limit.

        :param speed_limit_kmh: limit in km/h; it is divided by 3.6 before
            being compared with the speed derived from ``speedX``/``speedY``.
        """
        crosswalk_times = self._crosswalk_times()
        if not crosswalk_times:
            return
        limit = speed_limit_kmh / 3.6
        for segment in self.different_road_area_simtime(crosswalk_times):
            start_time, end_time = segment[0], segment[-1]
            print(f"当前时间段:{start_time} - {end_time}")
            rows = self.ego_data[self.ego_data["simTime"].between(start_time, end_time)]
            speed = np.hypot(rows["speedX"].to_numpy(), rows["speedY"].to_numpy())
            if speed.max() > limit:
                self.slow_down_in_crosswalk_count += 1
        print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")

    def avoid_pedestrian_in_crosswalk(self):
        """Count crosswalk passages during which a pedestrian was moving.

        NOTE(review): the speed checked here is the PEDESTRIAN's, exactly as
        before (the old local was called ``ego_speed``).  Confirm whether the
        ego speed was meant instead.
        """
        crosswalk_times = self._crosswalk_times()
        if not crosswalk_times or self.pedestrian_data.empty:
            return
        for segment in self.different_road_area_simtime(crosswalk_times):
            present = self.pedestrian_data[self.pedestrian_data["simTime"].isin(segment)]
            if present.empty:
                continue
            pedestrian_speed = np.hypot(present["speedX"], present["speedY"])
            if (pedestrian_speed > 0).any():
                self.avoid_pedestrian_in_crosswalk_count += 1

    def avoid_pedestrian_in_the_road(self):
        """Count runs in which the ego stands still less than 1 away from a pedestrian.

        NOTE(review): the runs come from ``pedestrian_in_front_of_car``, which
        requires a non-zero ego velocity, so ``speed == 0`` can hardly be met
        at those timestamps.  The condition is kept as written -- confirm the
        intended rule.
        """
        times_ahead = self.pedestrian_in_front_of_car()
        if not times_ahead:
            return
        for segment in self.different_road_area_simtime(times_ahead):
            ego_rows = self.ego_data[self.ego_data["simTime"].isin(segment)]
            paired = self._pair_with_pedestrians(ego_rows)
            distance = np.hypot(
                paired["posX"] - paired["posX_ped"], paired["posY"] - paired["posY_ped"]
            )
            speed = np.hypot(paired["speedX"], paired["speedY"])
            if ((distance < 1) & (speed == 0)).any():
                self.avoid_pedestrian_in_the_road_count += 1

    def aviod_pedestrian_when_turning(self):
        """Count runs in which the ego keeps moving on ``lane_type`` 20 with a pedestrian ahead.

        NOTE(review): lane type 20 presumably marks a turning area -- confirm.
        """
        times_ahead = self.pedestrian_in_front_of_car()
        if not times_ahead:
            return
        turning = self.ego_data[
            self.ego_data["simTime"].isin(times_ahead)
            & (self.ego_data["lane_type"] == 20)
        ]
        for segment in self.different_road_area_simtime(turning["simTime"].tolist()):
            ego_rows = turning[turning["simTime"].isin(segment)]
            speed = np.hypot(ego_rows["speedX"].to_numpy(), ego_rows["speedY"].to_numpy())
            if (speed > 0).any():
                self.aviod_pedestrian_when_turning_count += 1

    def statistic(self):
        """Run every check once and return the counters as a dict.

        Counters are reset first, so repeated calls return the same result.
        """
        self._reset_counters()
        self.slow_down_in_crosswalk()
        self.avoid_pedestrian_in_crosswalk()
        self.avoid_pedestrian_in_the_road()
        self.aviod_pedestrian_when_turning()

        self.calculated_value = {
            "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
            "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count,
            "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count,
            "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count,
        }
        return self.calculated_value
class TurnaroundViolation(object):
    """Count U-turn related violations of the ego vehicle."""

    _COUNTER_NAMES = (
        "turning_in_forbiden_turn_back_sign_count",
        "turning_in_forbiden_turn_left_sign_count",
        "avoid_pedestrian_when_turn_back_count",
    )

    def __init__(self, df_data):
        """Store ego and pedestrian data and zero all counters.

        :param df_data: object exposing ``obj_data`` (the ego frame is stored
            under key 1) and ``object_df``; rows of ``object_df`` with
            ``type == 13`` are pedestrians.
        """
        print("掉头违规类初始化中...")
        self.traffic_violations_type = "掉头违规类"
        self.data = df_data.obj_data[1]
        # Copy so the caller's DataFrame is never modified.
        self.ego_data = (
            self.data[config.TURNAROUND_INFO].copy().reset_index(drop=True)
        )
        self.pedestrian_data = pd.DataFrame()
        self.object_items = set(df_data.object_df["type"].tolist())
        if 13 in self.object_items:  # pedestrians have type 13
            self.pedestrian_df = df_data.object_df[df_data.object_df["type"] == 13]
            # NOTE(review): pedestrian columns are selected with SLOWDOWN_INFO,
            # not TURNAROUND_INFO -- kept as is, confirm it is intentional.
            self.pedestrian_data = (
                self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
            )
        self._reset_counters()

    def _reset_counters(self):
        """Set every violation counter back to zero."""
        for name in self._COUNTER_NAMES:
            setattr(self, name, 0)

    def _pair_with_pedestrians(self, ego_rows=None):
        """Join ego rows with the pedestrian rows recorded at the same ``simTime``.

        :param ego_rows: subset of ``self.ego_data``; the whole frame if None.
        :return: DataFrame with the ego columns plus ``posX_ped``/``posY_ped``;
            one row per (ego sample, pedestrian) pair.
        """
        ego = self.ego_data if ego_rows is None else ego_rows
        pedestrians = self.pedestrian_data[["simTime", "posX", "posY"]]
        return ego.merge(pedestrians, on="simTime", suffixes=("", "_ped"))

    def pedestrian_in_front_of_car(self):
        """Return the timestamps at which a pedestrian is ahead of the ego.

        A pedestrian counts as ahead when the vector from the ego to the
        pedestrian has a positive dot product with the ego velocity and the
        distance is below 50.  Ego and pedestrian samples are matched by
        ``simTime`` rather than by row position.

        :return: sorted list of unique ``simTime`` values, ``[]`` if there are
            no pedestrians.
        """
        if len(self.pedestrian_data) == 0:
            return []
        paired = self._pair_with_pedestrians()
        to_ped_x = paired["posX_ped"] - paired["posX"]
        to_ped_y = paired["posY_ped"] - paired["posY"]
        distance = np.hypot(to_ped_x, to_ped_y)
        ahead = to_ped_x * paired["speedX"] + to_ped_y * paired["speedY"] > 0
        times = paired.loc[ahead & (distance < 50), "simTime"]
        return sorted(set(times.tolist()))

    def different_road_area_simtime(self, df, threshold=0.5):
        """Split a sequence of timestamps into runs of consecutive samples.

        :param df: sequence of timestamps (despite the name, not a DataFrame).
        :param threshold: largest gap between neighbours inside one run.
        :return: list of runs, each a list of timestamps; ``[]`` for no input.
        """
        times = [] if df is None else list(df)
        if not times:
            return []
        groups = [[times[0]]]
        for previous, current in zip(times, times[1:]):
            if abs(current - previous) <= threshold:
                groups[-1].append(current)
            else:
                groups.append([current])
        return groups

    def _count_reversals_under_sign(self, sign_type):
        """Count runs under ``sign_type1 == sign_type`` in which the ego reversed direction.

        A run is a reversal when the ego velocity at its first and last sample
        have a negative dot product.  Runs are built from ``simTime`` and the
        rows are selected by ``simTime`` as well.
        """
        times = self.ego_data.loc[
            self.ego_data["sign_type1"] == sign_type, "simTime"
        ].tolist()
        reversals = 0
        for segment in self.different_road_area_simtime(times):
            rows = self.ego_data[self.ego_data["simTime"].isin(segment)]
            if rows.empty:
                continue
            first, last = rows.iloc[0], rows.iloc[-1]
            dot = last["speedX"] * first["speedX"] + last["speedY"] * first["speedY"]
            if dot < 0:
                reversals += 1
        return reversals

    def turn_back_in_forbiden_sign(self):
        """Count direction reversals under a no-U-turn (8) or no-left-turn (9) sign.

        NOTE(review): the no-left-turn case uses the same velocity-reversal
        test as the U-turn case, as before; a plain left turn would not
        necessarily trigger it -- confirm the intended rule.
        """
        self.turning_in_forbiden_turn_back_sign_count += self._count_reversals_under_sign(8)
        self.turning_in_forbiden_turn_left_sign_count += self._count_reversals_under_sign(9)

    def avoid_pedestrian_when_turn_back(self):
        """Count turning runs in which the ego did not stop for a pedestrian.

        Only evaluated when a pedestrian is ahead of the ego at some point of
        the recording.  For every run of ``lane_type == 20`` samples that has
        pedestrian samples at the same timestamps, a violation is counted if
        the ego moves at any of them or touches a pedestrian (distance 0).
        Runs without pedestrians are skipped.
        """
        if not self.pedestrian_in_front_of_car():
            return
        turn_times = self.ego_data.loc[
            self.ego_data["lane_type"] == 20, "simTime"
        ].tolist()
        for segment in self.different_road_area_simtime(turn_times):
            ego_rows = self.ego_data[self.ego_data["simTime"].isin(segment)]
            paired = self._pair_with_pedestrians(ego_rows)
            if paired.empty:
                continue
            distance = np.hypot(
                paired["posX"] - paired["posX_ped"], paired["posY"] - paired["posY_ped"]
            )
            speed = np.hypot(paired["speedX"], paired["speedY"])
            if (speed != 0).any() or (distance == 0).any():
                self.avoid_pedestrian_when_turn_back_count += 1

    def statistic(self):
        """Run every check once and return the counters as a dict.

        Counters are reset first, so repeated calls return the same result.
        """
        self._reset_counters()
        self.turn_back_in_forbiden_sign()
        self.avoid_pedestrian_when_turn_back()

        self.calculated_value = {
            "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count,
            "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count,
            "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count,
        }
        return self.calculated_value
class WrongWayViolation:
    """Count stopping / emergency-lane violations of the ego vehicle.

    Despite the class name, the checks are about stopping on, or driving in,
    lanes of urban expressways and highways.  Counts are numbers of samples
    (rows), not numbers of separate events.
    """

    _VIOLATION_TYPES = (
        "urbanExpresswayOrHighwayDrivingLaneStopped",
        "urbanExpresswayOrHighwayEmergencyLaneStopped",
        "urbanExpresswayEmergencyLaneDriving",
    )

    def __init__(self, df_data):
        """Copy the ego frame (``df_data.obj_data[1]``) and zero the counts."""
        print("停车违规类初始化中...")
        self.traffic_violations_type = "停车违规类"
        self.data = df_data.obj_data[1].copy()
        # ``v`` is rescaled in place exactly once, see ``process_violations``.
        self._speed_converted = False
        self.violation_count = dict.fromkeys(self._VIOLATION_TYPES, 0)

    def process_violations(self):
        """Label every sample with its violation type and update ``violation_count``.

        Side effects on ``self.data``: column ``v`` is multiplied by 3.6 (only
        on the first call, so repeated calls do not rescale it again) and a
        ``violation_type`` column is (re)written.
        """
        if not getattr(self, "_speed_converted", False):
            self.data["v"] *= 3.6  # speed conversion
            self._speed_converted = True

        # road_fc 1/2: urban expressway or highway.
        on_fast_road = self.data["road_fc"].isin({1, 2})
        in_driving_lane = self.data["lane_type"].isin({1, 4, 5, 6})
        in_emergency_lane = self.data["lane_type"].isin({12})
        stopped = self.data["v"] == 0

        masks = (
            on_fast_road & in_driving_lane & stopped,
            on_fast_road & in_emergency_lane & stopped,
            on_fast_road & in_emergency_lane & ~stopped,
        )

        self.data["violation_type"] = None
        for mask, violation_type in zip(masks, self._VIOLATION_TYPES):
            self.data.loc[mask, "violation_type"] = violation_type

        observed = self.data["violation_type"].value_counts()
        # Always report every category, as plain ints.
        self.violation_count = {
            name: int(observed.get(name, 0)) for name in self._VIOLATION_TYPES
        }

    def statistic(self) -> dict:
        """Evaluate the data and return ``{violation type: sample count}``."""
        self.process_violations()
        return self.violation_count
class SpeedingViolation(object):
    """Speeding violations on expressways/highways and on general roads.

    Speed limits taken from road signs are not evaluated here because the shp
    map does not carry that information; only the ``road_speed_max`` and
    ``road_speed_min`` columns of the data are used.
    """

    # Violation keys in reporting order.
    _VIOLATION_TYPES = (
        "urbanExpresswayOrHighwaySpeedOverLimit50",
        "urbanExpresswayOrHighwaySpeedOverLimit20to50",
        "urbanExpresswayOrHighwaySpeedOverLimit0to20",
        "urbanExpresswayOrHighwaySpeedUnderLimit",
        "generalRoadSpeedOverLimit50",
        "generalRoadSpeedOverLimit20to50",
    )

    def __init__(self, df_data):
        """Copy the frame to analyse and zero all counters.

        :param df_data: processed-data bundle; ``df_data.obj_data[1]`` is
            copied so the caller's frame is never modified.
        """
        print("超速违规类初始化中...")
        self.traffic_violations_type = "超速违规类"
        self.data = df_data.obj_data[1].copy()
        self.violation_counts = dict.fromkeys(self._VIOLATION_TYPES, 0)

    def process_violations(self):
        """Classify every row by speeding band and tally the results.

        Adds a ``violation_type`` column to ``self.data`` and stores in
        ``self.violation_counts`` one count per violation key; keys without a
        matching row are reported as 0.
        """
        # road_fc 1/2 are treated as urban expressway / highway, 3 as general road.
        on_fast_road = self.data["road_fc"].isin({1, 2})
        on_general_road = self.data["road_fc"].isin({3})

        # Scale into a local series (factor 3.6, presumably m/s -> km/h).
        # Rescaling self.data["v"] in place would multiply the stored speed
        # again on every call and change the result of repeated statistic().
        speed = self.data["v"] * 3.6
        limit_max = self.data["road_speed_max"]
        limit_min = self.data["road_speed_min"]

        over_50 = speed > limit_max * 1.5
        over_20_to_50 = (speed > limit_max * 1.2) & (speed <= limit_max * 1.5)
        over_0_to_20 = (speed > limit_max) & (speed <= limit_max * 1.2)
        under_min = speed < limit_min

        masks = (
            on_fast_road & over_50,
            on_fast_road & over_20_to_50,
            on_fast_road & over_0_to_20,
            on_fast_road & under_min,
            on_general_road & over_50,
            on_general_road & over_20_to_50,
        )

        self.data["violation_type"] = None
        for mask, violation_type in zip(masks, self._VIOLATION_TYPES):
            self.data.loc[mask, "violation_type"] = violation_type

        # Reindex so that every key is always present, even with zero hits.
        self.violation_counts = (
            self.data["violation_type"]
            .value_counts()
            .reindex(list(self._VIOLATION_TYPES), fill_value=0)
            .to_dict()
        )

    def statistic(self):
        """Run the analysis and return the violation-count dict."""
        self.process_violations()
        print(f"超速违规类指标统计完成,统计结果:{self.violation_counts}")
        return self.violation_counts
class TrafficLightViolation(object):
    """Traffic-light violations (running a red light, stopping in a crossing).

    TODO: still needs to distinguish left-turn / straight / right-turn
    manoeuvres and directional versus pass-through traffic lights.
    """

    def __init__(self, df_data):
        """Store the inputs and immediately evaluate the violations.

        :param df_data: processed-data bundle providing ``vehicle_config``
            (a mapping containing ``"EGO_WHEELBASS"``) and ``ego_data``
            (the frame to analyse).
        """
        self.traffic_violations_type = "违反交通灯类"
        print("违反交通灯类 类初始化中...")
        self.config = df_data.vehicle_config
        self.data_ego = df_data.ego_data
        self.violation_counts = {
            "trafficSignalViolation": 0,
            "illegalDrivingOrParkingAtCrossroads": 0,
        }
        self.process_violations()

    def is_point_cross_line(self, point, stop_line_points):
        """Tell whether a vehicle point counts as having crossed a stop line.

        The point must be collinear with the stop line (2-D cross product
        equal to zero); it then counts as crossed when the angle between the
        heading and the vector from the line's midpoint to the point is at
        most 90 degrees.

        :param point: ``(x, y, heading)`` with the heading in radians.
        :param stop_line_points: the two end points ``[[x1, y1], [x2, y2]]``.
        :return: True if the point is considered to have crossed the line.
        """
        start_x, start_y = stop_line_points[0][0], stop_line_points[0][1]
        line_vector = np.array(
            [stop_line_points[1][0] - start_x, stop_line_points[1][1] - start_y]
        )
        rel_x = point[0] - start_x
        rel_y = point[1] - start_y

        # Scalar 2-D cross product written out by hand: np.cross on
        # 2-element vectors is deprecated since NumPy 2.0.
        cross_product = line_vector[0] * rel_y - line_vector[1] * rel_x
        # NOTE(review): an exact-zero test on float coordinates is extremely
        # strict; confirm whether a tolerance was intended.
        if cross_product != 0:
            return False

        mid_point = np.array([start_x, start_y]) + 0.5 * line_vector
        mid_to_point = np.array([point[0] - mid_point[0], point[1] - mid_point[1]])
        direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])

        norm_mid_to_point = np.linalg.norm(mid_to_point)
        norm_direction = np.linalg.norm(direction_vector)
        if norm_mid_to_point == 0 or norm_direction == 0:
            return False

        cos_theta = np.dot(mid_to_point, direction_vector) / (
            norm_mid_to_point * norm_direction
        )
        # Clamp so rounding overshoot cannot push acos outside its domain.
        cos_theta = max(-1.0, min(1.0, cos_theta))
        angle_theta = math.degrees(math.acos(cos_theta))
        return angle_theta <= 90

    def _filter_data(self):
        """Return a copy of the rows that reference a stop line and a light.

        A copy is returned so that helper columns can be added later without
        writing into a slice of ``self.data_ego``.
        """
        mask = (
            (self.data_ego["stopline_id"] != -1)
            & (self.data_ego["stopline_type"] == 1)
            & (self.data_ego["trafficlight_id"] != -1)
        )
        return self.data_ego[mask].copy()

    def _group_data(self, filtered_data):
        """Split the rows into groups wherever ``simTime`` jumps by > 0.5.

        Adds the helper columns ``time_diff`` and ``group`` to
        ``filtered_data`` and returns the resulting group-by object.
        """
        threshold = 0.5
        filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
        filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
        return filtered_data.groupby("group")

    def _analyze_group(self, group_data):
        """Collect the per-frame evidence flags for one group of rows.

        :return: ``(photos, stop_in_intersection)`` where ``photos`` is the
            flat list of the four flags of every analysed frame and
            ``stop_in_intersection`` is the "passed intersection" flag of the
            last analysed frame.
        """
        photos = []
        stop_in_intersection = False
        wheel_offset = self.config["EGO_WHEELBASS"]

        for _, row in group_data.iterrows():
            heading = row["posH"]
            position = np.array([row["posX"], row["posY"]])
            stop_line_points = [
                [row["stopline_x1"], row["stopline_y1"]],
                [row["stopline_x2"], row["stopline_y2"]],
            ]
            state_mask = row["stateMask"]

            heading_vector = np.array([np.cos(heading), np.sin(heading)])
            heading_vector = heading_vector / np.linalg.norm(heading_vector)

            # is_point_cross_line reads point[2] as the heading, so the wheel
            # points must carry it; passing bare (x, y) raised IndexError as
            # soon as a wheel was exactly on the stop line.
            front_xy = position + wheel_offset * heading_vector
            rear_xy = position - wheel_offset * heading_vector
            front_wheel_pos = np.array([front_xy[0], front_xy[1], heading])
            rear_wheel_pos = np.array([rear_xy[0], rear_xy[1], heading])

            dist = math.sqrt(
                (row["posX"] - row["traffic_light_x"]) ** 2
                + (row["posY"] - row["traffic_light_y"]) ** 2
            )

            # NOTE(review): this check only skips frames whose |speedH| is
            # exactly 0.01 (or NaN); kept as-is, confirm the intended filter.
            if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
                has_crossed_line_front = (
                    self.is_point_cross_line(front_wheel_pos, stop_line_points)
                    and state_mask == 1
                )
                has_crossed_line_rear = (
                    self.is_point_cross_line(rear_wheel_pos, stop_line_points)
                    and row["v"] > 0
                    and state_mask == 1
                )
                has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
                has_passed_intersection = has_crossed_line_front and dist < 1.0

                photos.extend(
                    [
                        has_crossed_line_front,
                        has_crossed_line_rear,
                        has_passed_intersection,
                        has_stop_in_intersection,
                    ]
                )
                stop_in_intersection = has_passed_intersection

        return photos, stop_in_intersection

    def is_vehicle_run_a_red_light(self):
        """Analyse every time-contiguous group and store its evidence.

        Fills ``self.photos_group`` and ``self.stop_in_intersections`` with
        one entry per group.
        """
        grouped_data = self._group_data(self._filter_data())
        self.photos_group = []
        self.stop_in_intersections = []
        for _, group_data in grouped_data:
            photos, stop_in_intersection = self._analyze_group(group_data)
            self.photos_group.append(photos)
            self.stop_in_intersections.append(stop_in_intersection)

    def process_violations(self):
        """Evaluate the data and update ``self.violation_counts``."""
        self.is_vehicle_run_a_red_light()
        # A group without any collected evidence must not count as a
        # violation (all([]) is True).
        # NOTE(review): every frame contributes both a flag requiring v > 0
        # and one requiring v == 0, so all(photos) cannot be True for a
        # non-empty group; the intended red-light rule needs confirmation.
        count_1 = sum(bool(photos) and all(photos) for photos in self.photos_group)
        count_2 = sum(self.stop_in_intersections)
        self.violation_counts["trafficSignalViolation"] = count_1
        self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2

    def statistic(self):
        """Return the violation-count dict computed at construction time."""
        return self.violation_counts
class WarningViolation(object):
    """Warning-level violations: wrong lane use and riding the lane divider."""

    def __init__(self, df_data):
        """Store the configuration and a private copy of the data.

        :param df_data: processed-data bundle providing ``vehicle_config``
            (a mapping containing ``"CAR_WIDTH"``) and ``obj_data[1]``
            (the frame to analyse).
        """
        self.traffic_violations_type = "警告性违规类"
        print("警告性违规类 类初始化中...")
        self.config = df_data.vehicle_config
        self.data_ego = df_data.obj_data[1]
        self.data = self.data_ego.copy()  # never modify the caller's frame
        self.violation_counts = {
            # Driving outside the prescribed lane on roads other than
            # highways / urban expressways.
            "generalRoadIrregularLaneUse": 0,
            # Riding or pressing the lane divider on a highway or urban
            # expressway.
            "urbanExpresswayOrHighwayRideLaneDivider": 0,
        }

    def process_violations(self):
        """Count the continuous episodes of each warning-level violation.

        Both counters are assigned (not accumulated), so calling this method
        repeatedly yields the same result.
        """
        general_road = {3}
        urban_expressway_or_highway = {1, 2}
        # Lane type 11 is treated as a non-motorised lane.
        non_motor_lane = {11}
        car_width = self.config["CAR_WIDTH"]
        lane_width = self.data["lane_width"]

        # Rule 1: motor vehicle using a non-motorised lane on a general road.
        self.data["is_violation"] = self.data["road_fc"].isin(general_road) & (
            self.data["lane_type"].isin(non_motor_lane)
        )
        segments = self.count_continuous_violations(
            self.data["is_violation"], self.data["simTime"]
        )
        self.violation_counts["generalRoadIrregularLaneUse"] = len(segments)

        # Rule 2: lane offset exceeds the free space beside the car, limited
        # to expressway / highway rows as the counter name requires.
        # NOTE(review): only positive offsets are tested; confirm the sign
        # convention of laneOffset.
        threshold = (lane_width - car_width) / 2
        self.data["is_violation"] = self.data["road_fc"].isin(
            urban_expressway_or_highway
        ) & (self.data["laneOffset"] > threshold)
        segments = self.count_continuous_violations(
            self.data["is_violation"], self.data["simTime"]
        )
        self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(
            segments
        )

    def count_continuous_violations(self, violation_series, time_series):
        """Return one entry per uninterrupted run of violating samples.

        :param violation_series: iterable of truthy/falsy flags.
        :param time_series: iterable of time stamps aligned with the flags.
        :return: list of one-element lists, each holding the time stamp at
            which a run started.
        """
        continuous_segments = []
        in_segment = False
        for is_violation, time in zip(violation_series, time_series):
            if is_violation and not in_segment:
                # A new run starts: remember its first time stamp.
                continuous_segments.append([time])
            in_segment = bool(is_violation)
        return continuous_segments

    def statistic(self):
        """Run the analysis and return the violation-count dict."""
        self.process_violations()
        return self.violation_counts
class TrafficSignViolation(object):
    """Violations of traffic signs (no-straight, speed limit, minimum speed)."""

    def __init__(self, df_data):
        """Store private copies of the data and zero all counters.

        :param df_data: processed-data bundle; ``df_data.obj_data[1]`` is the
            frame to analyse.
        """
        self.traffic_violations_type = "交通标志违规类"
        print("交通标志违规类 类初始化中...")
        self.data_ego = df_data.obj_data[1]
        self.ego_data = (
            self.data_ego[config.TRFFICSIGN_INFO].copy().reset_index(drop=True)
        )
        self.data_ego = self.data_ego.copy()  # never modify the caller's frame
        self.violation_counts = {
            "NoStraightThrough": 0,  # driving straight where it is prohibited
            "SpeedLimitViolation": 0,  # exceeding the posted speed limit
            "MinimumSpeedLimitViolation": 0,  # below the posted minimum speed
        }

    def checkForProhibitionViolation(self):
        """Evaluate prohibition signs: 7 = no straight through, 12 = speed limit."""
        no_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy()
        straight_count = 0
        if not no_straight_df.empty:
            no_straight_df = no_straight_df.sort_values("simTime")
            # Heading change between consecutive samples; the first sample
            # has no predecessor (NaN) and is therefore never counted.
            heading_change = no_straight_df["posH"].diff().abs()
            # NOTE(review): the threshold was written as "5 degrees"; confirm
            # the unit of posH (a radian heading would make it far too loose).
            threshold = 5
            going_straight = (heading_change <= threshold) & (no_straight_df["v"] > 0)
            straight_count = int(going_straight.sum())
        # Report under the declared key, not an undeclared extra key.
        self.violation_counts["NoStraightThrough"] = straight_count

        # Speed limit: only rows that actually carry a speed-limit sign.
        # NOTE(review): confirm that v and sign_speed share the same unit.
        speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
        too_fast = speed_limit_df["v"] > speed_limit_df["sign_speed"]
        self.violation_counts["SpeedLimitViolation"] = int(too_fast.sum())

    def checkForInstructionViolation(self):
        """Evaluate instruction signs: 13 = minimum speed limit."""
        # Only rows that actually carry a minimum-speed sign are compared.
        minimum_speed_df = self.data_ego[self.data_ego["sign_type1"] == 13]
        too_slow = minimum_speed_df["v"] < minimum_speed_df["sign_speed"]
        self.violation_counts["MinimumSpeedLimitViolation"] = int(too_slow.sum())

    def statistic(self):
        """Run all sign checks and return the violation-count dict."""
        self.checkForProhibitionViolation()
        self.checkForInstructionViolation()
        return self.violation_counts
class ViolationManager:
    """Owns the individual violation checkers and scores their results."""

    def __init__(self, data_processed):
        """Create one checker per violation family.

        :param data_processed: processed-data bundle; it is handed to every
            checker and its ``traffic_config`` is kept for scoring.
        """
        self.violations = []
        self.data = data_processed
        self.config = data_processed.traffic_config
        self.over_take_violation = OvertakingViolation(self.data)
        self.slow_down_violation = SlowdownViolation(self.data)
        self.wrong_way_violation = WrongWayViolation(self.data)
        self.speeding_violation = SpeedingViolation(self.data)
        self.traffic_light_violation = TrafficLightViolation(self.data)
        self.warning_violation = WarningViolation(self.data)

    def report_statistic(self):
        """Merge the counts of all checkers and return the evaluated score.

        :return: whatever ``Score(self.config).evaluate`` returns for the
            merged violation counts.
        """
        # Merge into a fresh dict: updating the dict returned by the first
        # checker in place would leak every other checker's keys into that
        # checker's own counters.
        traffic_result = {}
        checkers = (
            self.over_take_violation,
            self.slow_down_violation,
            self.traffic_light_violation,
            self.wrong_way_violation,
            self.speeding_violation,
            self.warning_violation,
        )
        for checker in checkers:
            traffic_result.update(checker.statistic())

        evaluator = Score(self.config)
        result = evaluator.evaluate(traffic_result)
        print("\n[交规类表现及得分情况]")
        return result
# Example usage (intentionally empty placeholder).
if __name__ == "__main__":
    pass
|