"""Traffic-violation statistics computed from simulation DataFrames.

The classes below read per-frame tables (object state, lane info, road info,
driver control, intersection info, crosswalk info) and count occurrences of
overtaking, slow-down/yield and speeding violations.
"""
import sys
import os
import math
import numpy as np
import pandas as pd
from pathlib import Path

# The project root must be on sys.path before the project-local imports below.
root_path = Path(__file__).resolve().parent.parent
sys.path.append(str(root_path))

from models.common.score import Score
from config import config
from models.common import log  # Make sure this import path is correct, or adjust it.

log_path = config.LOG_PATH
logger = log.get_logger(log_path)

# playerId roles as used by the code and its original comments.
# NOTE(review): inferred from usage (1 is always named "ego", 2 "front car",
# 3 the oncoming car, 5 the pedestrian) -- confirm against the scenario format.
_EGO_ID = 1
_LEAD_CAR_ID = 2
_ONCOMING_CAR_ID = 3
_PEDESTRIAN_ID = 5


def _group_contiguous_simtime(sim_times, threshold=0.5):
    """Split a list of timestamps into runs of near-consecutive values.

    :param sim_times: list of numeric timestamps, in recording order
    :param threshold: maximum gap between neighbours inside one run
    :return: list of lists; an empty input yields an empty list
    """
    if not sim_times:
        return []
    groups = [[sim_times[0]]]
    for previous, current in zip(sim_times, sim_times[1:]):
        if abs(current - previous) <= threshold:
            groups[-1].append(current)
        else:
            groups.append([current])
    return groups


class TrafficViolation:
    """Base class for traffic violations tied to a single vehicle."""

    def __init__(self, vehicle_id: str):
        self.vehicle_id = vehicle_id
        self.violation_count = 0  # number of violations recorded

    def get_violation_info(self):
        """Return a one-line description of the vehicle and its violation count."""
        return f"车辆ID: {self.vehicle_id} - 违规次数: {self.violation_count}"


class OvertakingViolation(object):
    """Counts illegal overtaking manoeuvres in a processed scenario."""

    def __init__(self, df_data):
        """Store references to the scenario tables and zero all counters.

        :param df_data: processed-data object exposing ``obj_data``,
            ``lane_info_new_df``, ``road_info_df``, ``driver_ctrl_df``,
            ``inter_info_df``, ``cross_walk_df`` and ``object_df``
        """
        print("OvertakingViolation-------------------------")
        self.traffic_violations_type = "超车违规类"

        # Plain references: none of these tables is copied or modified here.
        self.ego_data = df_data.obj_data[1]
        self.laneinfo_new_data = df_data.lane_info_new_df
        self.roadinfo_data = df_data.road_info_df
        self.drivercrtrl_data = df_data.driver_ctrl_df
        self.interinfo_data = df_data.inter_info_df
        self.crosswalkinfo_data = df_data.cross_walk_df
        self.object_data = df_data.object_df

        self.overtake_on_right_count = 0
        self.overtake_when_turn_around_count = 0
        self.overtake_when_passing_car_count = 0
        self.overtake_in_forbid_lane_count = 0
        self.overtake_in_ramp_count = 0
        self.overtake_in_tunnel_count = 0
        self.overtake_on_accelerate_lane_count = 0
        self.overtake_on_decelerate_lane_count = 0
        self.overtake_in_different_senerios_count = 0

    def different_road_area_simtime(self, df, threshold=0.5):
        """Group a list of simTime values into contiguous segments.

        :param df: list of simTime values (a plain list, not a DataFrame)
        :param threshold: maximum gap between neighbours inside one segment
        :return: list of segments, each a list of simTime values
        """
        return _group_contiguous_simtime(df, threshold)

    def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
        """Decide whether the ego vehicle passed the other car within a span.

        An overtake is reported when the lane id is the same at the start and
        the end, the other car is ahead of the ego at the start (non-negative
        projection of the offset on the ego velocity) and behind it at the end
        (negative projection).

        :param lane_id: lane ids over the span (list, array or Series)
        :param dx: x offsets of the other car relative to the ego
        :param dy: y offsets of the other car relative to the ego
        :param ego_speedx: ego x velocity over the span
        :param ego_speedy: ego y velocity over the span
        :return: True when an overtake is detected, False otherwise
            (including when any input is empty)
        """
        # Positional access on arrays: Series[-1] would be a label lookup.
        lane_id = np.asarray(lane_id)
        dx = np.asarray(dx)
        dy = np.asarray(dy)
        ego_speedx = np.asarray(ego_speedx)
        ego_speedy = np.asarray(ego_speedy)

        if min(len(lane_id), len(dx), len(dy), len(ego_speedx), len(ego_speedy)) == 0:
            return False

        same_lane = lane_id[0] == lane_id[-1]
        ahead_at_start = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
        behind_at_end = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
        return bool(same_lane and ahead_at_start and behind_at_end)

    def _is_dxy_of_car(self, df, id):
        """Return the position offset of a car relative to the ego vehicle.

        :param df: object-state rows containing ``playerId``, ``posX``, ``posY``
        :param id: playerId of the other car
        :return: tuple ``(dx, dy)`` of numpy arrays (other minus ego)
        """
        other = df[df['playerId'] == id]
        ego = df[df['playerId'] == _EGO_ID]

        if len(other) != len(ego):
            # The two players are not present in the same rows; pair them on a
            # shared frame/time column instead of subtracting misaligned arrays.
            key = next((col for col in ('simFrame', 'simTime') if col in df.columns), None)
            if key is not None:
                paired = other[[key, 'posX', 'posY']].merge(
                    ego[[key, 'posX', 'posY']], on=key, suffixes=('_obj', '_ego'))
                car_dx = (paired['posX_obj'] - paired['posX_ego']).values
                car_dy = (paired['posY_obj'] - paired['posY_ego']).values
                return car_dx, car_dy

        car_dx = other['posX'].values - ego['posX'].values
        car_dy = other['posY'].values - ego['posY'].values
        return car_dx, car_dy

    def _count_segment_overtakes(self, simtime_list):
        """Count overtakes of the front car, one check per contiguous segment.

        :param simtime_list: list of simTime values belonging to a road area
        :return: number of segments in which ``_is_overtake`` reports True
        """
        count = 0
        for segment in self.different_road_area_simtime(simtime_list):
            # Lane ids and object state are both restricted to this segment.
            lane_rows = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(segment)]
            obj_rows = self.object_data[self.object_data['simTime'].isin(segment)]
            dx, dy = self._is_dxy_of_car(obj_rows, _LEAD_CAR_ID)
            ego_rows = obj_rows[obj_rows['playerId'] == _EGO_ID]
            count += self._is_overtake(
                lane_rows['lane_id'], dx, dy, ego_rows['speedX'], ego_rows['speedY'])
        return count

    def _classify_overtake_window(self, start_frame_id, window_width):
        """Inspect one sliding window and update the car-related counters.

        :param start_frame_id: first simFrame of the window
        :param window_width: number of frames in the window
        :return: True when one of the violation conditions matched, so the
            caller should jump past this window; False otherwise
        """
        simframe_window = np.arange(start_frame_id, start_frame_id + window_width)

        obj_rows = self.object_data[self.object_data['simFrame'].isin(simframe_window)]
        lane_rows = self.laneinfo_new_data[self.laneinfo_new_data['simFrame'].isin(simframe_window)]
        driver_rows = self.drivercrtrl_data[self.drivercrtrl_data['simFrame'].isin(simframe_window)]

        ego_rows = obj_rows[obj_rows['playerId'] == _EGO_ID]
        lead_rows = obj_rows[obj_rows['playerId'] == _LEAD_CAR_ID]
        oncoming_rows = obj_rows[obj_rows['playerId'] == _ONCOMING_CAR_ID]

        # Without ego, front car, lane or steering data nothing can be decided.
        if ego_rows.empty or lead_rows.empty or lane_rows.empty or driver_rows.empty:
            return False

        dx, dy = self._is_dxy_of_car(obj_rows, _LEAD_CAR_ID)
        overtook = self._is_overtake(
            lane_rows['lane_id'], dx, dy, ego_rows['speedX'], ego_rows['speedY'])

        ego_vx0 = ego_rows['speedX'].iloc[0]
        ego_vy0 = ego_rows['speedY'].iloc[0]
        matched = False

        # Overtaking while meeting an oncoming car: the third car's velocity
        # points against the ego velocity at the start of the window.
        if not oncoming_rows.empty:
            oncoming_vx0 = oncoming_rows['speedX'].iloc[0]
            oncoming_vy0 = oncoming_rows['speedY'].iloc[0]
            if ego_vx0 * oncoming_vx0 + ego_vy0 * oncoming_vy0 < 0:
                self.overtake_when_passing_car_count += overtook
                matched = True

        # Right-side overtake, per the original rule: same lane id at start and
        # end, steering angle changes sign (positive first, negative later --
        # described in the original comment as right then left), and the ego
        # and front car swap positions.
        steering_start = driver_rows['steeringWheel'].iloc[0]
        steering_end = driver_rows['steeringWheel'].iloc[-1]
        if steering_start > 0 and steering_end < 0:
            self.overtake_on_right_count += overtook
            matched = True
        # Overtaking while the front car turns around: its velocity points
        # against the ego velocity at the start of the window.
        elif ego_vx0 * lead_rows['speedX'].iloc[0] + ego_vy0 * lead_rows['speedY'].iloc[0] < 0:
            self.overtake_when_turn_around_count += overtook
            matched = True

        return matched

    # Overtaking on the right, while meeting a car, or while the front car turns around.
    def illegal_overtake_with_car(self, window_width=250):
        """Slide a frame window over the recording and count car-related overtakes.

        :param window_width: number of frames per window
        """
        frame_columns = (
            self.object_data['simFrame'],
            self.laneinfo_new_data['simFrame'],
            self.drivercrtrl_data['simFrame'],
        )
        if all(len(column) > 0 for column in frame_columns):
            # Bound the scan by the last frame id shared by all three tables
            # (frame ids, not row counts: the object table has several rows
            # per frame and frame ids need not start at zero).
            last_frame_id = min(column.max() for column in frame_columns)
            start_frame_id = self.object_data['simFrame'].iloc[0]

            while (start_frame_id + window_width) <= last_frame_id:
                matched = self._classify_overtake_window(start_frame_id, window_width)
                # Advance exactly once per iteration.
                start_frame_id += window_width if matched else 1

        print(
            f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")

    # Overtaking by borrowing a lane that must not be used.
    def overtake_in_forbid_lane(self):
        """Count segments with the front car present where lane_type 2 occurs."""
        simTime = self.object_data[self.object_data['playerId'] == _LEAD_CAR_ID]['simTime'].tolist()
        for simtime in self.different_road_area_simtime(simTime):
            lane_overtake = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(simtime)]
            try:
                lane_type = lane_overtake['lane_type'].tolist()
            except KeyError:
                # The lane table has no lane_type column.
                print("数据缺少lane_type信息")
                continue
            if 2 in lane_type:
                self.overtake_in_forbid_lane_count += 1
        print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")

    # Overtaking on a ramp.
    def overtake_in_ramp_area(self):
        """Count overtakes in segments where road_type is 19 (ramp, per the original comment)."""
        ramp_simtime_list = self.roadinfo_data[self.roadinfo_data['road_type'] == 19]['simTime'].tolist()
        self.overtake_in_ramp_count += self._count_segment_overtakes(ramp_simtime_list)
        print(f"在匝道超车{self.overtake_in_ramp_count}次")

    def overtake_in_tunnel_area(self):
        """Count overtakes in segments where road_type is 15 (tunnel, per the printed message)."""
        tunnel_simtime_list = self.roadinfo_data[self.roadinfo_data['road_type'] == 15]['simTime'].tolist()
        self.overtake_in_tunnel_count += self._count_segment_overtakes(tunnel_simtime_list)
        print(f"在suidao超车{self.overtake_in_tunnel_count}次")

    # Overtaking on an acceleration lane.
    def overtake_on_accelerate_lane(self):
        """Count overtakes in segments where lane_type is 2."""
        accelerate_simtime_list = \
            self.laneinfo_new_data[self.laneinfo_new_data['lane_type'] == 2]['simTime'].tolist()
        self.overtake_on_accelerate_lane_count += self._count_segment_overtakes(accelerate_simtime_list)
        print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")

    # Overtaking on a deceleration lane.
    def overtake_on_decelerate_lane(self):
        """Count overtakes in segments where lane_type is 3."""
        decelerate_simtime_list = \
            self.laneinfo_new_data[self.laneinfo_new_data['lane_type'] == 3]['simTime'].tolist()
        # NOTE(review): the lane id is read from the 'lane_id' column like every
        # other method in this class -- confirm the lane table has no separate
        # 'playerId'/'id' columns that were meant to be used here.
        self.overtake_on_decelerate_lane_count += self._count_segment_overtakes(decelerate_simtime_list)
        print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")

    # Overtaking at an intersection.
    def overtake_in_different_senerios(self):
        """Check for an overtake across all rows whose interid differs from 10000."""
        # NOTE(review): presumably interid 10000 means "not at an intersection".
        crossroad_simTime = self.interinfo_data[self.interinfo_data['interid'] != 10000][
            'simTime'].tolist()

        # Object-state and lane rows recorded inside the intersection area.
        crossroad_objstate = self.object_data[self.object_data['simTime'].isin(crossroad_simTime)]
        crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]

        dx, dy = self._is_dxy_of_car(crossroad_objstate, _LEAD_CAR_ID)
        ego_rows = crossroad_objstate[crossroad_objstate['playerId'] == _EGO_ID]

        # Same lane id at start and end plus a position swap with the front
        # car counts as an overtake; empty data counts as none.
        self.overtake_in_different_senerios_count += self._is_overtake(
            crossroad_laneinfo['lane_id'], dx, dy, ego_rows['speedX'], ego_rows['speedY'])
        print(f"在路口超车{self.overtake_in_different_senerios_count}次")

    def overtake_statistic(self):
        """Run every overtaking check and return the counters as a dict."""
        self.overtake_in_forbid_lane()
        self.overtake_on_decelerate_lane()
        self.overtake_on_accelerate_lane()
        self.overtake_in_ramp_area()
        self.overtake_in_tunnel_area()
        self.overtake_in_different_senerios()
        self.illegal_overtake_with_car()

        self.calculated_value = {
            "overtake_on_right": self.overtake_on_right_count,
            "overtake_when_turn_around": self.overtake_when_turn_around_count,
            "overtake_when_passing_car": self.overtake_when_passing_car_count,
            "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
            "overtake_in_ramp": self.overtake_in_ramp_count,
            "overtake_in_tunnel": self.overtake_in_tunnel_count,
            "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
            "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
            "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
        }
        return self.calculated_value


class SlowdownViolation(object):
    """Counts failures to slow down or yield at crosswalks."""

    def __init__(self, df_data):
        """Store references to the scenario tables and zero all counters.

        :param df_data: processed-data object exposing the same tables as
            the one given to ``OvertakingViolation``
        """
        print("SlowdownViolation-------------------------")
        self.traffic_violations_type = "减速让行违规类"

        # Plain references: none of these tables is copied or modified here.
        self.ego_data = df_data.obj_data[1]
        self.laneinfo_new_data = df_data.lane_info_new_df
        self.roadinfo_data = df_data.road_info_df
        self.drivercrtrl_data = df_data.driver_ctrl_df
        self.interinfo_data = df_data.inter_info_df
        self.crosswalkinfo_data = df_data.cross_walk_df
        self.object_data = df_data.object_df

        self.slow_down_in_crosswalk_count = 0
        self.pedestrian_in_crosswalk_count = 0

    def different_road_area_simtime(self, df, threshold=0.5):
        """Group a list of simTime values into contiguous segments.

        :param df: list of simTime values (a plain list, not a DataFrame)
        :param threshold: maximum gap between neighbours inside one segment
        :return: list of segments, each a list of simTime values
        """
        return _group_contiguous_simtime(df, threshold)

    def _crosswalk_segments(self):
        """Return the contiguous simTime segments whose crossid differs from 20000."""
        # NOTE(review): presumably crossid 20000 means "not on a crosswalk".
        crosswalk_simTime = self.crosswalkinfo_data[self.crosswalkinfo_data['crossid'] != 20000][
            'simTime'].tolist()
        return self.different_road_area_simtime(crosswalk_simTime)

    def slow_down_in_crosswalk(self, speed_threshold_kmh=15):
        """Count crosswalk segments where the ego's peak speed exceeds the threshold.

        :param speed_threshold_kmh: limit compared against ``speed * 3.6``
            (assumes speedX/speedY are in m/s -- TODO confirm)
        """
        for crosswalk_simtime in self._crosswalk_segments():
            crosswalk_objstate = self.object_data[self.object_data['simTime'].isin(crosswalk_simtime)]
            ego_rows = crosswalk_objstate[crosswalk_objstate['playerId'] == _EGO_ID]
            if ego_rows.empty:
                continue
            ego_speed = np.hypot(ego_rows['speedX'].values, ego_rows['speedY'].values)
            if ego_speed.max() * 3.6 > speed_threshold_kmh:
                self.slow_down_in_crosswalk_count += 1
        # NOTE(review): the message below says "overtaking" although the
        # counter tracks failure to slow down; kept unchanged for callers
        # that may read this output.
        print(f"在人行横道超车{self.slow_down_in_crosswalk_count}次")

    def pedestrian_in_crosswalk(self):
        """Count crosswalk segments where the ego keeps moving while a pedestrian is present."""
        for crosswalk_simtime in self._crosswalk_segments():
            crosswalk_objstate = self.object_data[self.object_data['simTime'].isin(crosswalk_simtime)]
            pedestrian_rows = crosswalk_objstate[crosswalk_objstate['playerId'] == _PEDESTRIAN_ID]
            if pedestrian_rows.empty:
                continue
            # Only the ego's speed matters, and only while the pedestrian is there.
            ego_rows = crosswalk_objstate[
                (crosswalk_objstate['playerId'] == _EGO_ID)
                & crosswalk_objstate['simTime'].isin(pedestrian_rows['simTime'])]
            ego_speed = np.hypot(ego_rows['speedX'].values, ego_rows['speedY'].values)
            if (ego_speed > 0).any():
                self.pedestrian_in_crosswalk_count += 1

    def slowdown_statistic(self):
        """Run both crosswalk checks and return the counters as a dict."""
        self.slow_down_in_crosswalk()
        self.pedestrian_in_crosswalk()

        self.calculated_value = {
            "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
            "pedestrian_in_crosswalk": self.pedestrian_in_crosswalk_count,
        }
        return self.calculated_value


class WrongWayViolation(TrafficViolation):
    """Wrong-way driving violation."""

    def __init__(self, vehicle_id: str, wrong_way_location: str):
        super().__init__(vehicle_id)
        self.wrong_way_location = wrong_way_location

    def get_violation_info(self):
        """Return the base description extended with the wrong-way location."""
        return f"{super().get_violation_info()} - 逆行发生在 {self.wrong_way_location}"


class SpeedingViolation(object):
    """Classifies each row of the ego data into a speeding category and counts them."""

    def __init__(self, df_data):
        """Copy the ego table, zero the counters and classify immediately.

        :param df_data: processed-data object exposing ``obj_data``
        """
        print("SpeedingViolation-------------------------")
        self.traffic_violations_type = "超速违规类"

        # Real copy: process_violations() adds columns to this table.
        self.data = df_data.obj_data[1].copy()

        # Every category starts at zero so that all keys are always present.
        self.violation_counts = {
            "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
            "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
            "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
            "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
            "generalRoadSpeedOverLimit50": 0,
            "generalRoadSpeedOverLimit20to50": 0,
        }

        self.process_violations()

    def process_violations(self):
        """Label each row with a violation type and refresh ``violation_counts``.

        Reads the ``road_speed_max``, ``road_speed_min``, ``road_fc`` and ``v``
        columns; writes ``speed_limit_max``, ``speed_limit_min`` and
        ``violation_type``.
        """
        self.data["speed_limit_max"] = self.data["road_speed_max"]
        self.data["speed_limit_min"] = self.data["road_speed_min"]

        # road_fc values treated as expressway/highway vs. general road.
        urban_expressway_or_highway = {1, 2}
        general_road = {3}

        speed = self.data["v"]
        limit_max = self.data["speed_limit_max"]
        limit_min = self.data["speed_limit_min"]
        on_fast_road = self.data["road_fc"].isin(urban_expressway_or_highway)
        on_general_road = self.data["road_fc"].isin(general_road)

        over_50 = speed > limit_max * 1.5
        over_20_to_50 = (speed > limit_max * 1.2) & (speed <= limit_max * 1.5)
        over_0_to_20 = (speed > limit_max) & (speed <= limit_max * 1.2)

        # The rules are mutually exclusive, so assignment order does not matter.
        rules = [
            ("urbanExpresswayOrHighwaySpeedOverLimit50", on_fast_road & over_50),
            ("urbanExpresswayOrHighwaySpeedOverLimit20to50", on_fast_road & over_20_to_50),
            ("urbanExpresswayOrHighwaySpeedOverLimit0to20", on_fast_road & over_0_to_20),
            ("urbanExpresswayOrHighwaySpeedUnderLimit", on_fast_road & (speed < limit_min)),
            ("generalRoadSpeedOverLimit50", on_general_road & over_50),
            ("generalRoadSpeedOverLimit20to50", on_general_road & over_20_to_50),
        ]

        self.data["violation_type"] = None
        for violation_type, condition in rules:
            self.data.loc[condition, "violation_type"] = violation_type

        # Keep zero entries for categories that never occurred.
        observed = self.data["violation_type"].value_counts().to_dict()
        self.violation_counts = {
            violation_type: int(observed.get(violation_type, 0))
            for violation_type, _ in rules
        }

    def report_statistic(self) -> str:
        """Return a multi-line summary of the count in each speeding category."""
        return (
            f"高速或者城市快速路超过50%: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedOverLimit50', 0)},\n"
            f"高速或者城市快速路超过20%-50%: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedOverLimit20to50', 0)},\n"
            f"高速或者城市快速路超过20%以内: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedOverLimit0to20', 0)},\n"
            f"高速或者城市快速路低于最低时速: {self.violation_counts.get('urbanExpresswayOrHighwaySpeedUnderLimit', 0)},\n"
            f"非高速公路超速50%以上: {self.violation_counts.get('generalRoadSpeedOverLimit50', 0)},\n"
            f"非高速公路超速20%-50%: {self.violation_counts.get('generalRoadSpeedOverLimit20to50', 0)}"
        )


class ParkingViolation(TrafficViolation):
    """Illegal-parking violation with one counter per road/lane category."""

    def __init__(self, vehicle_id: str, parking_location: str):
        super().__init__(vehicle_id)
        self.traffic_violations_type = " 违规停车类 "
        self.parking_location = parking_location
        self.city_road_violation_count = 0
        self.city_emergency_violation_count = 0
        self.highway_road_violation_count = 0
        self.highway_emergency_violation_count = 0

    def get_violation_info(self):
        """Return the base description extended with the parking location."""
        return (
            f"{super().get_violation_info()} - 违规停车发生在 {self.parking_location}"
        )

    def city_road_violation(self):
        """Record illegal parking in a travel lane of an urban expressway.

        :return: tuple ``(updated count, True)``
        """
        self.city_road_violation_count += 1
        return self.city_road_violation_count, True

    def city_emergency_violation(self):
        """Record non-emergency parking in an urban-expressway emergency lane.

        :return: tuple ``(updated count, True)``
        """
        self.city_emergency_violation_count += 1
        return self.city_emergency_violation_count, True

    def highway_road_violation(self):
        """Record illegal parking in a travel lane of a highway.

        :return: tuple ``(updated count, True)``
        """
        self.highway_road_violation_count += 1
        return self.highway_road_violation_count, True

    def highway_emergency_violation(self):
        """Record non-emergency parking in a highway emergency lane.

        :return: tuple ``(updated count, True)``
        """
        self.highway_emergency_violation_count += 1
        return self.highway_emergency_violation_count, True


class TrafficLightViolation(TrafficViolation):
    """Traffic-light violation."""

    def __init__(self, vehicle_id: str, violation_type: str, traffic_location: str):
        super().__init__(vehicle_id)
        self.traffic_violations_type = " 违反交通灯类 "
        self.violation_type = violation_type  # e.g. running a red light
        self.traffic_location = traffic_location

    def get_violation_info(self):
        """Return the base description extended with the violation type and location."""
        return f"{super().get_violation_info()} - {self.violation_type} 发生在 {self.traffic_location}"


class ViolationManager:
    """Collects violation objects and prints their descriptions."""

    def __init__(self, data_processed=None):
        """Create an empty manager.

        :param data_processed: optional processed-data object exposing
            ``obj_data``; when omitted, ``data`` and ``ego_df`` are None
        """
        self.violations = []
        self.data = None if data_processed is None else data_processed.obj_data[1]
        self.ego_df = None if self.data is None else self.data[config.TRIFFIC_INFO].copy()

        # Class references kept as attributes for callers that look them up here.
        self.SpeedingViolation = SpeedingViolation
        self.ParkingViolation = ParkingViolation
        self.TrafficLightViolation = TrafficLightViolation

    def add_violation(self, violation: TrafficViolation):
        """Append a violation to the managed list."""
        self.violations.append(violation)

    def report_violations(self):
        """Print the description of every managed violation."""
        for violation in self.violations:
            print(violation.get_violation_info())


def _demo():
    """Example usage with the violation types that need no scenario data."""
    manager = ViolationManager()

    # OvertakingViolation, SlowdownViolation and SpeedingViolation take a
    # processed-data object rather than a vehicle id, so they are not part
    # of this standalone example.
    manager.add_violation(WrongWayViolation("XYZ789", "2号公路"))
    manager.add_violation(ParkingViolation("DEF012", "商场停车场"))
    manager.add_violation(TrafficLightViolation("GHI345", "闯红灯", "第三街口"))

    manager.report_violations()


if __name__ == "__main__":
    _demo()