import sys
import os
import math
import numpy as np
import pandas as pd
from pathlib import Path

# Put the directory two levels above this file on sys.path so that the
# project-local imports below can be resolved.
root_path = Path(__file__).resolve().parent.parent
sys.path.append(str(root_path))

from models.common.score import Score
from config import config
from models.common import log  # make sure this path is correct, or adjust it

# Module-level logger writing to the path configured in config.LOG_PATH.
log_path = config.LOG_PATH
logger = log.get_logger(log_path)
class TrafficViolation:
    """Base class for a traffic violation attributed to a single vehicle."""

    def __init__(self, vehicle_id: str):
        # Number of violations recorded so far; starts at zero.
        self.violation_count = 0
        self.vehicle_id = vehicle_id

    def get_violation_info(self):
        """Return the vehicle id and the violation count as one display string."""
        summary = f"车辆ID: {self.vehicle_id} - 违规次数: {self.violation_count}"
        return summary
class OvertakingViolation(object):
    """Count overtaking violations of the ego vehicle (playerId 1).

    Each detector compares the object with playerId 2 against the ego vehicle
    and adds to one of the ``overtake_*_count`` attributes.
    ``overtake_statistic`` runs every detector and returns all counters.
    """

    def __init__(self, df_data):
        """Store the data tables read from *df_data* and reset all counters.

        :param df_data: object exposing ``obj_data``, ``lane_info_new_df``,
            ``road_info_df``, ``driver_ctrl_df``, ``inter_info_df``,
            ``cross_walk_df`` and ``object_df``.
        """
        print("OvertakingViolation-------------------------")
        self.traffic_violations_type = "超车违规类"
        # Reference to the entry stored under key 1; no copy is made here.
        self.ego_data = df_data.obj_data[1]
        self.laneinfo_new_data = df_data.lane_info_new_df
        self.roadinfo_data = df_data.road_info_df
        self.drivercrtrl_data = df_data.driver_ctrl_df
        self.interinfo_data = df_data.inter_info_df
        self.crosswalkinfo_data = df_data.cross_walk_df
        self.object_data = df_data.object_df
        self.overtake_on_right_count = 0
        self.overtake_when_turn_around_count = 0
        self.overtake_when_passing_car_count = 0
        self.overtake_in_forbid_lane_count = 0
        self.overtake_in_ramp_count = 0
        self.overtake_in_tunnel_count = 0
        self.overtake_on_accelerate_lane_count = 0
        self.overtake_on_decelerate_lane_count = 0
        self.overtake_in_different_senerios_count = 0

    def different_road_area_simtime(self, df, threshold=0.5):
        """Split a list of time stamps into runs of consecutive values.

        :param df: list of numeric time stamps (despite the name, not a DataFrame)
        :param threshold: largest gap between neighbours that keeps them in one run
        :return: list of lists; empty when *df* is empty
        """
        if not df:
            return []
        groups = [[df[0]]]
        for previous, current in zip(df, df[1:]):
            if abs(current - previous) <= threshold:
                groups[-1].append(current)
            else:
                groups.append([current])
        return groups

    def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
        """Tell whether the ego vehicle passed the other object within a span.

        True when the first and last lane ids are equal, the offset (dx, dy)
        has a non-negative dot product with the ego velocity at the first
        sample, and a negative one at the last sample.

        Inputs may be lists, arrays or pandas Series. They are read by position,
        so Series that keep the index of a filtered DataFrame are handled.
        Any empty input yields False.
        """
        columns = [np.asarray(values) for values in (lane_id, dx, dy, ego_speedx, ego_speedy)]
        if any(column.size == 0 for column in columns):
            return False
        lanes, off_x, off_y, vel_x, vel_y = columns
        same_lane = lanes[0] == lanes[-1]
        ahead_at_start = off_x[0] * vel_x[0] + off_y[0] * vel_y[0] >= 0
        behind_at_end = off_x[-1] * vel_x[-1] + off_y[-1] * vel_y[-1] < 0
        return bool(same_lane and ahead_at_start and behind_at_end)

    def _is_dxy_of_car(self, df, id):
        """Return the position offsets of object *id* relative to playerId 1.

        :param df: object-state rows with ``playerId``, ``posX`` and ``posY``
        :param id: playerId of the other object
        :return: tuple ``(dx, dy)`` of numpy arrays

        When both objects have the same number of rows the offsets are taken
        row by row. Otherwise rows are paired on ``simFrame`` (or ``simTime``)
        so that only samples present for both objects are compared; if neither
        column exists two empty arrays are returned.
        """
        other = df[df['playerId'] == id]
        ego = df[df['playerId'] == 1]
        if len(other) == len(ego):
            car_dx = other['posX'].values - ego['posX'].values
            car_dy = other['posY'].values - ego['posY'].values
            return car_dx, car_dy
        key = next((name for name in ('simFrame', 'simTime') if name in df.columns), None)
        if key is None:
            return np.array([], dtype=float), np.array([], dtype=float)
        paired = other.merge(ego, on=key, suffixes=('_obj', '_ego'))
        car_dx = paired['posX_obj'].values - paired['posX_ego'].values
        car_dy = paired['posY_obj'].values - paired['posY_ego'].values
        return car_dx, car_dy

    def _count_overtakes_in_segments(self, simtime_list):
        """Count overtakes of playerId 2, one check per contiguous time segment."""
        total = 0
        for segment in self.different_road_area_simtime(simtime_list):
            lane_rows = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(segment)]
            obj_rows = self.object_data[self.object_data['simTime'].isin(segment)]
            ego_rows = obj_rows[obj_rows['playerId'] == 1]
            dx, dy = self._is_dxy_of_car(obj_rows, 2)
            total += self._is_overtake(lane_rows['lane_id'], dx, dy,
                                       ego_rows['speedX'], ego_rows['speedY'])
        return total

    # Overtaking on the right, while meeting an oncoming car, or while the lead car turns around
    def illegal_overtake_with_car(self, window_width=250):
        """Slide a frame window over the recording and count three violations.

        :param window_width: number of consecutive frame ids per window

        The window advances by *window_width* when any of the three conditions
        was evaluated in it, otherwise by one frame. Windows lacking
        driver-control rows, ego rows or playerId 2 rows are skipped.
        """
        summary = (
            lambda: f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")
        object_frames = self.object_data['simFrame']
        lane_frames = self.laneinfo_new_data['simFrame']
        driver_frames = self.drivercrtrl_data['simFrame']
        if object_frames.empty or lane_frames.empty or driver_frames.empty:
            print(summary())
            return
        # Last frame id present in all three tables (a frame id, not a row count).
        last_frame_id = min(object_frames.max(), lane_frames.max(), driver_frames.max())
        start_frame_id = object_frames.iloc[0]
        while start_frame_id + window_width - 1 <= last_frame_id:
            simframe_window = np.arange(start_frame_id, start_frame_id + window_width)
            obj_data_frames = self.object_data[object_frames.isin(simframe_window)]
            lane_data_frames = self.laneinfo_new_data[lane_frames.isin(simframe_window)]
            driver_data_frames = self.drivercrtrl_data[driver_frames.isin(simframe_window)]
            ego_rows = obj_data_frames[obj_data_frames['playerId'] == 1]
            lead_rows = obj_data_frames[obj_data_frames['playerId'] == 2]
            if driver_data_frames.empty or ego_rows.empty or lead_rows.empty:
                start_frame_id += 1
                continue
            lane_id = lane_data_frames['lane_id']
            # Steering-wheel value at the first and last sample of the window.
            driverctrl_start_state = driver_data_frames['steeringWheel'].iloc[0]
            driverctrl_end_state = driver_data_frames['steeringWheel'].iloc[-1]
            dx, dy = self._is_dxy_of_car(obj_data_frames, 2)
            ego_speedx = ego_rows['speedX']
            ego_speedy = ego_rows['speedY']
            obj_speedx = lead_rows['speedX']
            obj_speedy = lead_rows['speedY']
            # The window start is moved exactly once, at the end of the iteration.
            step = 1
            oncoming_rows = obj_data_frames[obj_data_frames['playerId'] == 3]
            if len(oncoming_rows) > 0:
                obj1_start_speedx = oncoming_rows['speedX'].iloc[0]
                obj1_start_speedy = oncoming_rows['speedY'].iloc[0]
                # Negative dot product: playerId 3 moves against the ego direction.
                if ego_speedx.iloc[0] * obj1_start_speedx + ego_speedy.iloc[0] * obj1_start_speedy < 0:
                    self.overtake_when_passing_car_count += self._is_overtake(lane_id, dx, dy, ego_speedx,
                                                                               ego_speedy)
                    step = window_width
            # Right-side overtake: same lane id at both ends of the window, the
            # steering value changes from positive to negative, and the ego
            # vehicle and the lead car swap positions.
            # NOTE(review): the original comment reads positive as steering right - confirm.
            if driverctrl_start_state > 0 and driverctrl_end_state < 0:
                self.overtake_on_right_count += self._is_overtake(lane_id, dx, dy, ego_speedx, ego_speedy)
                step = window_width
            elif ego_speedx.iloc[0] * obj_speedx.iloc[0] + ego_speedy.iloc[0] * obj_speedy.iloc[0] < 0:
                self.overtake_when_turn_around_count += self._is_overtake(lane_id, dx, dy, ego_speedx, ego_speedy)
                step = window_width
            start_frame_id += step
        print(summary())

    # Overtaking by borrowing a lane
    def overtake_in_forbid_lane(self):
        """Count time segments with playerId 2 present in which lane_type 2 occurs."""
        simTime = self.object_data[self.object_data['playerId'] == 2]['simTime'].tolist()
        for simtime in self.different_road_area_simtime(simTime):
            lane_overtake = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(simtime)]
            try:
                lane_type = lane_overtake['lane_type'].tolist()
            except KeyError:
                print("数据缺少lane_type信息")
                continue
            # NOTE(review): lane_type 2 is also what overtake_on_accelerate_lane selects - confirm the code.
            if 2 in lane_type:
                self.overtake_in_forbid_lane_count += 1
        print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")

    # Overtaking on a ramp
    def overtake_in_ramp_area(self):
        """Count overtakes inside each contiguous segment with road_type 19."""
        ramp_simtime_list = self.roadinfo_data[self.roadinfo_data['road_type'] == 19]['simTime'].tolist()
        self.overtake_in_ramp_count += self._count_overtakes_in_segments(ramp_simtime_list)
        print(f"在匝道超车{self.overtake_in_ramp_count}次")

    def overtake_in_tunnel_area(self):
        """Count overtakes inside each contiguous segment with road_type 15."""
        tunnel_simtime_list = self.roadinfo_data[self.roadinfo_data['road_type'] == 15]['simTime'].tolist()
        self.overtake_in_tunnel_count += self._count_overtakes_in_segments(tunnel_simtime_list)
        print(f"在suidao超车{self.overtake_in_tunnel_count}次")

    # Overtaking on an acceleration lane
    def overtake_on_accelerate_lane(self):
        """Count overtakes inside each contiguous segment with lane_type 2."""
        accelerate_simtime_list = \
            self.laneinfo_new_data[self.laneinfo_new_data['lane_type'] == 2]['simTime'].tolist()
        self.overtake_on_accelerate_lane_count += self._count_overtakes_in_segments(accelerate_simtime_list)
        print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")

    # Overtaking on a deceleration lane
    def overtake_on_decelerate_lane(self):
        """Count overtakes inside each contiguous segment with lane_type 3."""
        decelerate_simtime_list = \
            self.laneinfo_new_data[self.laneinfo_new_data['lane_type'] == 3]['simTime'].tolist()
        self.overtake_on_decelerate_lane_count += self._count_overtakes_in_segments(decelerate_simtime_list)
        print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")

    # At intersections
    def overtake_in_different_senerios(self):
        """Check once for an overtake across all samples whose interid is not 10000.

        NOTE(review): unlike the other detectors the samples are not split into
        contiguous segments, so at most one overtake is counted per call.
        """
        crossroad_simTime = self.interinfo_data[self.interinfo_data['interid'] != 10000]['simTime'].tolist()
        crossroad_objstate = self.object_data[self.object_data['simTime'].isin(crossroad_simTime)]
        crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]
        ego_rows = crossroad_objstate[crossroad_objstate['playerId'] == 1]
        dx, dy = self._is_dxy_of_car(crossroad_objstate, 2)
        # Same lane id at both ends plus a position swap counts as an overtake;
        # an empty selection simply yields False.
        self.overtake_in_different_senerios_count += self._is_overtake(
            crossroad_laneinfo['lane_id'], dx, dy, ego_rows['speedX'], ego_rows['speedY'])
        print(f"在路口超车{self.overtake_in_different_senerios_count}次")

    def overtake_statistic(self):
        """Run every detector and return the counters as a dict."""
        self.overtake_in_forbid_lane()
        self.overtake_on_decelerate_lane()
        self.overtake_on_accelerate_lane()
        self.overtake_in_ramp_area()
        self.overtake_in_tunnel_area()
        self.overtake_in_different_senerios()
        self.illegal_overtake_with_car()
        self.calculated_value = {
            "overtake_on_right": self.overtake_on_right_count,
            "overtake_when_turn_around": self.overtake_when_turn_around_count,
            "overtake_when_passing_car": self.overtake_when_passing_car_count,
            "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
            "overtake_in_ramp": self.overtake_in_ramp_count,
            "overtake_in_tunnel": self.overtake_in_tunnel_count,
            "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
            "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
            "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
        }
        return self.calculated_value
class SlowdownViolation(object):
    """Count failures of the ego vehicle (playerId 1) to slow down or yield at crosswalks."""

    # Speed above which a crosswalk pass is counted; compared after the x3.6
    # conversion (presumably m/s to km/h - confirm the unit of speedX/speedY).
    CROSSWALK_SPEED_LIMIT = 15
    # crossid value that marks samples outside any crosswalk.
    NO_CROSSWALK_ID = 20000
    # playerId checked for as the pedestrian (hard-coded in the original).
    PEDESTRIAN_PLAYER_ID = 5

    def __init__(self, df_data):
        """Store the data tables read from *df_data* and reset both counters."""
        print("SlowdownViolation-------------------------")
        self.traffic_violations_type = "减速让行违规类"
        # Reference to the entry stored under key 1; no copy is made here.
        self.ego_data = df_data.obj_data[1]
        self.laneinfo_new_data = df_data.lane_info_new_df
        self.roadinfo_data = df_data.road_info_df
        self.drivercrtrl_data = df_data.driver_ctrl_df
        self.interinfo_data = df_data.inter_info_df
        self.crosswalkinfo_data = df_data.cross_walk_df
        self.object_data = df_data.object_df
        self.slow_down_in_crosswalk_count = 0
        self.pedestrian_in_crosswalk_count = 0

    def different_road_area_simtime(self, df, threshold=0.5):
        """Split a list of time stamps into runs of consecutive values.

        :param df: list of numeric time stamps (despite the name, not a DataFrame)
        :param threshold: largest gap between neighbours that keeps them in one run
        :return: list of lists; empty when *df* is empty
        """
        if not df:
            return []
        groups = [[df[0]]]
        for previous, current in zip(df, df[1:]):
            if abs(current - previous) <= threshold:
                groups[-1].append(current)
            else:
                groups.append([current])
        return groups

    def _crosswalk_segments(self):
        """Return the contiguous time segments whose crossid marks a crosswalk."""
        in_crosswalk = self.crosswalkinfo_data['crossid'] != self.NO_CROSSWALK_ID
        crosswalk_simTime = self.crosswalkinfo_data[in_crosswalk]['simTime'].tolist()
        return self.different_road_area_simtime(crosswalk_simTime)

    def slow_down_in_crosswalk(self):
        """Count crosswalk segments in which the ego peak speed exceeds the limit."""
        for crosswalk_simtime in self._crosswalk_segments():
            crosswalk_objstate = self.object_data[self.object_data['simTime'].isin(crosswalk_simtime)]
            ego_rows = crosswalk_objstate[crosswalk_objstate['playerId'] == 1]
            if ego_rows.empty:
                # No ego samples in this segment, so there is no speed to judge.
                continue
            # Arrays, not lists: squaring a Python list raises TypeError.
            ego_speedx = ego_rows['speedX'].to_numpy(dtype=float)
            ego_speedy = ego_rows['speedY'].to_numpy(dtype=float)
            ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
            if ego_speed.max() * 3.6 > self.CROSSWALK_SPEED_LIMIT:
                self.slow_down_in_crosswalk_count += 1
        print(f"在人行横道超车{self.slow_down_in_crosswalk_count}次")

    def pedestrian_in_crosswalk(self):
        """Count crosswalk segments in which the ego vehicle moves while a pedestrian is present.

        Only ego samples at time stamps where the pedestrian object also has a
        sample are considered.
        """
        for crosswalk_simtime in self._crosswalk_segments():
            crosswalk_objstate = self.object_data[self.object_data['simTime'].isin(crosswalk_simtime)]
            pedestrian_rows = crosswalk_objstate[crosswalk_objstate['playerId'] == self.PEDESTRIAN_PLAYER_ID]
            if pedestrian_rows.empty:
                continue
            ego_rows = crosswalk_objstate[
                (crosswalk_objstate['playerId'] == 1)
                & crosswalk_objstate['simTime'].isin(pedestrian_rows['simTime'])
            ]
            ego_speed = np.sqrt(ego_rows['speedX'] ** 2 + ego_rows['speedY'] ** 2)
            if (ego_speed > 0).any():
                self.pedestrian_in_crosswalk_count += 1

    def slowdown_statistic(self):
        """Run both checks and return the counters as a dict."""
        self.slow_down_in_crosswalk()
        self.pedestrian_in_crosswalk()
        self.calculated_value = {
            "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
            "pedestrian_in_crosswalk": self.pedestrian_in_crosswalk_count,
        }
        return self.calculated_value
class WrongWayViolation(TrafficViolation):
    """Wrong-way driving violation that also records where it happened."""

    def __init__(self, vehicle_id: str, wrong_way_location: str):
        super().__init__(vehicle_id)
        self.wrong_way_location = wrong_way_location

    def get_violation_info(self):
        """Return the base description followed by the wrong-way location."""
        base_info = super().get_violation_info()
        return f"{base_info} - 逆行发生在 {self.wrong_way_location}"
class SpeedingViolation(object):
    """Classify every ego sample into a speeding category and count the categories."""

    def __init__(self, df_data):
        """Copy the table stored under ``df_data.obj_data[1]`` and classify it at once.

        The table is copied because ``process_violations`` adds columns to it.
        """
        print("SpeedingViolation-------------------------")
        self.traffic_violations_type = "超速违规类"
        self.data = df_data.obj_data[1].copy()
        # Every category starts at zero; process_violations fills in the counts.
        self.violation_counts = {
            "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
            "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
            "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
            "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
            "generalRoadSpeedOverLimit50": 0,
            "generalRoadSpeedOverLimit20to50": 0,
        }
        self.process_violations()

    def process_violations(self):
        """Label each row of ``self.data`` with a violation type and count the labels.

        Reads the columns ``road_speed_max``, ``road_speed_min``, ``road_fc``
        and ``v``; writes ``speed_limit_max``, ``speed_limit_min`` and
        ``violation_type``. ``self.violation_counts`` ends up with one entry
        per category, zero included.
        """
        data = self.data
        data["speed_limit_max"] = data["road_speed_max"]
        data["speed_limit_min"] = data["road_speed_min"]
        speed = data["v"]
        upper = data["speed_limit_max"]
        lower = data["speed_limit_min"]
        # road_fc 1 and 2 are handled as urban expressway / highway, 3 as general
        # road (grouping taken from the original variable names).
        on_fast_road = data["road_fc"].isin({1, 2})
        on_general_road = data["road_fc"].isin({3})
        over_50 = speed > upper * 1.5
        over_20_to_50 = (speed > upper * 1.2) & (speed <= upper * 1.5)
        over_0_to_20 = (speed > upper) & (speed <= upper * 1.2)
        rules = [
            ("urbanExpresswayOrHighwaySpeedOverLimit50", on_fast_road & over_50),
            ("urbanExpresswayOrHighwaySpeedOverLimit20to50", on_fast_road & over_20_to_50),
            ("urbanExpresswayOrHighwaySpeedOverLimit0to20", on_fast_road & over_0_to_20),
            ("urbanExpresswayOrHighwaySpeedUnderLimit", on_fast_road & (speed < lower)),
            ("generalRoadSpeedOverLimit50", on_general_road & over_50),
            ("generalRoadSpeedOverLimit20to50", on_general_road & over_20_to_50),
        ]
        data["violation_type"] = None
        for violation_type, condition in rules:
            # A later rule overwrites an earlier one where both match.
            data.loc[condition, "violation_type"] = violation_type
        observed = data["violation_type"].value_counts().to_dict()
        # Keep every category, so callers can index the dict without .get().
        self.violation_counts = {
            violation_type: int(observed.get(violation_type, 0)) for violation_type, _ in rules
        }

    def report_statistic(self) -> str:
        """Return a multi-line text with the count of every category."""
        return (
            f"高速或者城市快速路超过50%: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedOverLimit50', 0)},\n"
            f"高速或者城市快速路超过20%-50%: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedOverLimit20to50', 0)},\n"
            f"高速或者城市快速路超过20%以内: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedOverLimit0to20', 0)},\n"
            f"高速或者城市快速路低于最低时速: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedUnderLimit', 0)},\n"
            f"非高速公路超速50%以上: {self.violation_counts.get('generalRoadSpeedOverLimit50', 0)},\n"
            f"非高速公路超速20%-50%: {self.violation_counts.get('generalRoadSpeedOverLimit20to50', 0)}"
        )
class ParkingViolation(TrafficViolation):
    """Illegal-parking violation with one counter per road/lane kind."""

    def __init__(self, vehicle_id: str, parking_location: str):
        super().__init__(vehicle_id)
        self.traffic_violations_type = " 违规停车类 "
        self.parking_location = parking_location
        self.city_road_violation_count = 0
        self.city_emergency_violation_count = 0
        self.highway_road_violation_count = 0
        self.highway_emergency_violation_count = 0

    def get_violation_info(self):
        """Return the base description followed by the parking location."""
        base_info = super().get_violation_info()
        return f"{base_info} - 违规停车发生在 {self.parking_location}"

    def _record(self, counter_name):
        """Add one to the named counter and return ``(new_count, True)``."""
        updated = getattr(self, counter_name) + 1
        setattr(self, counter_name, updated)
        return updated, True

    def city_road_violation(self):
        """Record illegal parking in a driving lane of an urban expressway."""
        return self._record("city_road_violation_count")

    def city_emergency_violation(self):
        """Record non-emergency parking in the emergency lane of an urban expressway."""
        return self._record("city_emergency_violation_count")

    def highway_road_violation(self):
        """Record illegal parking in a driving lane of a highway."""
        return self._record("highway_road_violation_count")

    def highway_emergency_violation(self):
        """Record non-emergency parking in the emergency lane of a highway."""
        return self._record("highway_emergency_violation_count")
class TrafficLightViolation(TrafficViolation):
    """Traffic-light violation with its kind and location."""

    def __init__(self, vehicle_id: str, violation_type: str, traffic_location: str):
        super().__init__(vehicle_id)
        self.traffic_violations_type = " 违反交通灯类 "
        # Free-text kind of the violation, e.g. '闯红灯'.
        self.violation_type = violation_type
        self.traffic_location = traffic_location

    def get_violation_info(self):
        """Return the base description followed by the kind and the location."""
        base_info = super().get_violation_info()
        return f"{base_info} - {self.violation_type} 发生在 {self.traffic_location}"
class ViolationManager:
    """Collects violation objects and prints their descriptions."""

    def __init__(self, data_processed):
        """Keep the table under ``obj_data[1]`` and a copy of its config.TRIFFIC_INFO columns."""
        self.violations = []
        ego_table = data_processed.obj_data[1]
        self.data = ego_table
        self.ego_df = ego_table[config.TRIFFIC_INFO].copy()
        # The violation classes are exposed as attributes of the manager.
        self.SpeedingViolation = SpeedingViolation
        self.ParkingViolation = ParkingViolation
        self.TrafficLightViolation = TrafficLightViolation

    def add_violation(self, violation: TrafficViolation):
        """Append *violation* to the managed list."""
        self.violations.append(violation)

    def report_violations(self):
        """Print ``get_violation_info()`` of every stored violation, in insertion order."""
        for entry in self.violations:
            description = entry.get_violation_info()
            print(description)
# Example usage
if __name__ == "__main__":
    # ViolationManager, OvertakingViolation and SpeedingViolation are built from
    # a processed-data object that this module cannot create by itself, so the
    # demo only covers the violations constructed from plain strings.
    demo_violations = [
        # Wrong-way driving
        WrongWayViolation("XYZ789", "2号公路"),
        # Illegal parking
        ParkingViolation("DEF012", "商场停车场"),
        # Traffic-light violation
        TrafficLightViolation("GHI345", "闯红灯", "第三街口"),
    ]
    # Report every violation
    for demo_violation in demo_violations:
        print(demo_violation.get_violation_info())