# ... keep the original imports and constant definitions ...
import math
import operator
from typing import Dict, Any, List, Optional

import numpy as np
import pandas as pd

from modules.lib import log_manager
from modules.lib.score import Score
from modules.lib.log_manager import LogManager
from modules.lib import data_process

# Columns selected from the ego / target vehicle frames by OvertakingViolation.
OVERTAKE_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "posX",
    "posY",
    "posH",
    "lane_id",
    "lane_type",
    "road_type",
    "interid",
    "crossid",
]

# Columns selected by SlowdownViolation (also used for pedestrian frames).
SLOWDOWN_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "posX",
    "posY",
    "crossid",
    "lane_type",
]

# Columns selected from the ego frame by TurnaroundViolation.
TURNAROUND_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "posX",
    "posY",
    "sign_type1",
    "lane_type",
]

# Column list for traffic-sign checks.
TRFFICSIGN_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "v",
    "posX",
    "posY",
    "sign_type1",
    "sign_ref_link",
    "sign_x",
    "sign_y",
]


# Metric entry points follow the naming scheme ``calculate_<metric name>``.
# Every entry point takes the processed data object, builds a fresh violation
# detector from it and returns a one-item dict ``{result key: count}``.


def calculate_overtake_when_passing_car(data_processed):
    """Compute the "overtaking while meeting an oncoming car" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_when_passing_car": violation.calculate_overtake_when_passing_car_count()}


def calculate_overtake_on_right(data_processed):
    """Compute the "overtaking on the right" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_on_right": violation.calculate_overtake_on_right_count()}


def calculate_overtake_when_turn_around(data_processed):
    """Compute the "overtaking during a U-turn" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_when_turn_around": violation.calculate_overtake_when_turn_around_count()}


def calculate_overtake_in_forbid_lane(data_processed):
    """Compute the "overtaking in a forbidden lane" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_forbid_lane": violation.calculate_overtake_in_forbid_lane_count()}


def calculate_overtake_in_ramp(data_processed):
    """Compute the "overtaking on a ramp" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_ramp": violation.calculate_overtake_in_ramp_area_count()}


def calculate_overtake_in_tunnel(data_processed):
    """Compute the "overtaking in a tunnel" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_tunnel": violation.calculate_overtake_in_tunnel_area_count()}


def calculate_overtake_on_accelerate_lane(data_processed):
    """Compute the "overtaking on an acceleration lane" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_on_accelerate_lane": violation.calculate_overtake_on_accelerate_lane_count()}


def calculate_overtake_on_decelerate_lane(data_processed):
    """Compute the "overtaking on a deceleration lane" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_on_decelerate_lane": violation.calculate_overtake_on_decelerate_lane_count()}


def calculate_overtake_in_different_senerios(data_processed):
    """Compute the "overtaking in different scenarios" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_different_senerios": violation.calculate_overtake_in_different_senerios_count()}


def calculate_slow_down_in_crosswalk(data_processed):
    """Compute the "slowing down at a crosswalk" metric."""
    violation = SlowdownViolation(data_processed)
    # NOTE(review): the result key "slowdown_down_in_crosswalk" does not match
    # the metric name in this function's name; confirm which spelling the
    # consumers of the result dict expect before changing it.
    return {"slowdown_down_in_crosswalk": violation.calculate_slow_down_in_crosswalk_count()}


def calculate_avoid_pedestrian_in_crosswalk(data_processed):
    """Compute the "yielding to pedestrians at a crosswalk" metric."""
    violation = SlowdownViolation(data_processed)
    return {"avoid_pedestrian_in_crosswalk": violation.calculate_avoid_pedestrian_in_the_crosswalk_count()}


def calculate_avoid_pedestrian_in_the_road(data_processed):
    """Compute the "yielding to pedestrians on the road" metric."""
    violation = SlowdownViolation(data_processed)
    return {"avoid_pedestrian_in_the_road": violation.calculate_avoid_pedestrian_in_the_road_count()}


def calculate_avoid_pedestrian_when_turning(data_processed):
    """Compute the "yielding to pedestrians while turning" metric."""
    violation = SlowdownViolation(data_processed)
    # NOTE(review): the result key carries a "_count" suffix that the metric
    # name in this function's name does not have; confirm the expected key.
    return {"avoid_pedestrian_when_turning_count": violation.calculate_avoid_pedestrian_when_turning_count()}


def calculate_turn_in_forbiden_turn_left_sign(data_processed):
    """Compute the "turning left at a no-left-turn sign" metric."""
    violation = TurnaroundViolation(data_processed)
    return {"turn_in_forbiden_turn_left_sign": violation.calculate_turn_in_forbiden_turn_left_sign_count()}


def calculate_turn_in_forbiden_turn_back_sign(data_processed):
    """Compute the "U-turn at a no-U-turn sign" metric."""
    violation = TurnaroundViolation(data_processed)
    return {"turn_in_forbiden_turn_back_sign": violation.calculate_turn_in_forbiden_turn_back_sign_count()}


def calculate_avoid_pedestrian_when_turn_back(data_processed):
    """Compute the "yielding to pedestrians during a U-turn" metric."""
    violation = TurnaroundViolation(data_processed)
    # The method name below is spelled "calaulate" in TurnaroundViolation.
    return {"avoid_pedestrian_when_turn_back": violation.calaulate_avoid_pedestrian_when_turn_back_count()}


def calculate_urbanexpresswayorhighwaydrivinglanestopped(data_processed):
    """Compute the "stopped in a driving lane of an urban expressway or highway" metric."""
    violation = WrongWayViolation(data_processed)
    return {
        "urbanExpresswayOrHighwayDrivingLaneStopped":
            violation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
    }


def calculate_urbanexpresswayorhighwayemergencylanestopped(data_processed):
    """Compute the "stopped in an emergency lane of an urban expressway or highway" metric."""
    violation = WrongWayViolation(data_processed)
    # Use the emergency-lane counter; the driving-lane counter belongs to the
    # metric above and must not be reported under this key.
    return {
        "urbanExpresswayOrHighwayEmergencyLaneStopped":
            violation.calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count()
    }


def calculate_urbanexpresswayemergencylanedriving(data_processed):
    """Compute the "driving in an urban expressway emergency lane" metric."""
    violation = WrongWayViolation(data_processed)
    return {"urbanExpresswayEmergencyLaneDriving": violation.calculate_urbanExpresswayEmergencyLaneDriving()}


def calculate_urbanexpresswayorhighwayspeedoverlimit50(data_processed):
    """Compute the "urban expressway or highway, more than 50% over the limit" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedOverLimit50":
            violation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
    }


def calculate_urbanexpresswayorhighwayspeedoverlimit20to50(data_processed):
    """Compute the "urban expressway or highway, 20%-50% over the limit" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedOverLimit20to50":
            violation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
    }


def calculate_urbanexpresswayorhighwayspeedoverlimit0to20(data_processed):
    """Compute the "urban expressway or highway, 0-20% over the limit" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedOverLimit0to20":
            violation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
    }


def calculate_urbanexpresswayorhighwayspeedunderlimit(data_processed):
    """Compute the "urban expressway or highway, below the minimum speed limit" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedUnderLimit":
            violation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
    }


def calculate_generalroadspeedoverlimit50(data_processed):
    """Compute the "general road, more than 50% over the limit" metric."""
    violation = SpeedingViolation(data_processed)
    return {"generalRoadSpeedOverLimit50": violation.calculate_generalRoadSpeedOverLimit50()}


def calculate_generalroadspeedoverlimit20to50(data_processed):
    """Compute the "general road, 20%-50% over the limit" metric."""
    violation = SpeedingViolation(data_processed)
    return {"generalRoadSpeedOverLimit20to50": violation.calculate_generalRoadSpeedOverLimit20to50_count()}


def calculate_trafficsignalviolation(data_processed):
    """Compute the "traffic signal violation" metric."""
    violation = TrafficLightViolation(data_processed)
    return {"trafficSignalViolation": violation.calculate_trafficSignalViolation_count()}


def calculate_illegaldrivingorparkingatcrossroads(data_processed):
    """Compute the "illegal driving or parking at crossroads" metric."""
    violation = TrafficLightViolation(data_processed)
    return {"illegalDrivingOrParkingAtCrossroads": violation.calculate_illegalDrivingOrParkingAtCrossroads()}


def calculate_generalroadirregularlaneuse(data_processed):
    """Compute the "general road, not driving in the prescribed lane" metric."""
    violation = WarningViolation(data_processed)
    return {"generalRoadIrregularLaneUse": violation.calculate_generalRoadIrregularLaneUse_count()}


def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
    """Compute the "urban expressway or highway, straddling the lane divider" metric."""
    violation = WarningViolation(data_processed)
    return {
        "urbanExpresswayOrHighwayRideLaneDivider":
            violation.calculate_urbanExpresswayOrHighwayRideLaneDivider()
    }


def calculate_nostraightthrough(data_processed):
    """Compute the "going straight at a no-straight-through sign" metric."""
    violation = TrafficSignViolation(data_processed)
    return {"NoStraightThrough": violation.calculate_NoStraightThrough_count()}


def calculate_speedlimitviolation(data_processed):
    """Compute the "speed limit violation" metric."""
    violation = TrafficSignViolation(data_processed)
    return {"SpeedLimitViolation": violation.calculate_SpeedLimitViolation_count()}


def calculate_minimumspeedlimitviolation(data_processed):
    """Compute the "minimum speed limit violation" metric."""
    violation = TrafficSignViolation(data_processed)
    return {"MinimumSpeedLimitViolation": violation.calculate_MinimumSpeedLimitViolation_count()}


# ... keep the original class definitions ...
# Revised _build_registry method of the TrafficRegistry class
class TrafficRegistry:
    """Registry that maps configured traffic-violation metrics to functions.

    Metric names are read from ``data_processed.traffic_config["traffic"]``
    and each one is resolved to a module-level function called
    ``calculate_<metric name in lower case>``.
    """

    def __init__(self, data_processed):
        self.logger = LogManager().get_logger()
        self.data = data_processed
        self.traffic_config = data_processed.traffic_config["traffic"]
        self.metrics = self._extract_metrics(self.traffic_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Collect the ``name`` of every leaf dict in the config tree.

        A leaf is a dict that has a ``'name'`` key and no dict among its
        values. Only dict values are descended into; the names are returned
        in depth-first, insertion order.
        """
        found = []
        pending = [config_node]
        while pending:
            node = pending.pop()
            if not isinstance(node, dict):
                continue
            children = [child for child in node.values() if isinstance(child, dict)]
            if 'name' in node and not children:
                found.append(node['name'])
            # Reversed so the first child is the next node popped (pre-order).
            pending.extend(reversed(children))
        self.logger.info(f'评比的交通违规指标列表:{found}')
        return found

    def _build_registry(self) -> dict:
        """Resolve each metric name to its module-level function.

        Metrics without a matching function are logged as errors and left
        out of the returned mapping.
        """
        namespace = globals()
        resolved = {}
        for metric_name in self.metrics:
            func_name = f"calculate_{metric_name.lower()}"
            if func_name in namespace:
                resolved[metric_name] = namespace[func_name]
            else:
                self.logger.error(f"未实现交通违规指标函数: {func_name}")
        return resolved

    def batch_execute(self) -> dict:
        """Run every registered metric function and merge the results.

        Each function is called with ``self.data`` and its returned dict is
        merged into the result. If a metric raises, the error is logged and
        ``None`` is stored under the metric's name instead.
        """
        merged = {}
        for name, func in self._registry.items():
            try:
                outcome = func(self.data)
                merged.update(outcome)
                # Log each metric's individual result as well.
                self.logger.info(f'交通违规指标[{name}]计算结果: {outcome}')
            except Exception as e:
                self.logger.error(f"{name} 执行失败: {e!s}", exc_info=True)
                merged[name] = None
        self.logger.info(f'交通违规指标计算结果:{merged}')
        return merged


class TrafficManager:
    """Entry point that computes all traffic-violation metrics."""

    def __init__(self, data_processed):
        self.data = data_processed
        self.logger = LogManager().get_logger()
        self.registry = TrafficRegistry(self.data)

    def report_statistic(self):
        """Execute all registered metrics and return the merged result dict."""
        return self.registry.batch_execute()


# ... keep the original class definitions and implementations ...
class OvertakingViolation(object): """超车违规类""" def __init__(self, df_data): print("超车违规类初始化中...") self.traffic_violations_type = "超车违规类" # self.logger = log.get_logger() # 使用时再初始化 self.data = df_data.obj_data[1] self.ego_data = ( self.data[OVERTAKE_INFO].copy().reset_index(drop=True) ) # Copy to avoid modifying the original DataFrame header = self.ego_data.columns if 2 in df_data.obj_id_list: self.data_obj = df_data.obj_data[2] self.obj_data = ( self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True) ) # Copy to avoid modifying the original DataFrame else: self.obj_data = pd.DataFrame(columns=header) if 3 in df_data.obj_id_list: self.other_obj_data1 = df_data.obj_data[3] self.other_obj_data = ( self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True) ) else: self.other_obj_data = pd.DataFrame(columns=header) self.overtake_on_right_count = 0 self.overtake_when_turn_around_count = 0 self.overtake_when_passing_car_count = 0 self.overtake_in_forbid_lane_count = 0 self.overtake_in_ramp_count = 0 self.overtake_in_tunnel_count = 0 self.overtake_on_accelerate_lane_count = 0 self.overtake_on_decelerate_lane_count = 0 self.overtake_in_different_senerios_count = 0 def different_road_area_simtime(self, df, threshold=0.5): if not df: return [] simtime_group = [] current_simtime_group = [df[0]] for i in range(1, len(df)): if abs(df[i] - df[i - 1]) <= threshold: current_simtime_group.append(df[i]) else: simtime_group.append(current_simtime_group) current_simtime_group = [df[i]] simtime_group.append(current_simtime_group) return simtime_group def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy): lane_start = lane_id[0] lane_end = lane_id[-1] start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0 end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0 return lane_start == lane_end and start_condition and end_condition def _is_dxy_of_car(self, ego_df, obj_df): """ :param df: objstate.csv and so on :param id: playerId :param 
string_type: posX/Y or speedX/Y and so on :return: dataframe of dx/y and so on """ car_dx = obj_df["posX"].values - ego_df["posX"].values car_dy = obj_df["posY"].values - ego_df["posY"].values return car_dx, car_dy # 在前车右侧超车、会车时超车、前车掉头时超车 def illegal_overtake_with_car_detector(self, window_width=250): # 获取csv文件中最短的帧数 frame_id_length = len(self.ego_data["simFrame"]) start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数 while (start_frame_id + window_width) < frame_id_length: simframe_window1 = list( np.arange(start_frame_id, start_frame_id + window_width) ) simframe_window = list(map(int, simframe_window1)) # 读取滑动窗口的dataframe数据 ego_data_frames = self.ego_data[ self.ego_data["simFrame"].isin(simframe_window) ] obj_data_frames = self.obj_data[ self.obj_data["simFrame"].isin(simframe_window) ] other_data_frames = self.other_obj_data[ self.other_obj_data["simFrame"].isin(simframe_window) ] # 读取前后的laneId lane_id = ego_data_frames["lane_id"].tolist() # 读取前后方向盘转角steeringWheel, driverctrl_start_state = ego_data_frames["posH"].iloc[0] driverctrl_end_state = ego_data_frames["posH"].iloc[-1] # 读取车辆前后的位置信息 dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames) ego_speedx = ego_data_frames["speedX"].tolist() ego_speedy = ego_data_frames["speedY"].tolist() obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][ "speedX" ].tolist() obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][ "speedY" ].tolist() if len(other_data_frames) > 0: other_start_speedx = other_data_frames["speedX"].iloc[0] other_start_speedy = other_data_frames["speedY"].iloc[0] if ( ego_speedx[0] * other_start_speedx + ego_speedy[0] * other_start_speedy < 0 ): self.overtake_when_passing_car_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) start_frame_id += window_width """ 如果滑动窗口开始和最后的laneid一致; 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左); 自车和前车的位置发生的交换; 则认为右超车 """ if driverctrl_start_state > 0 and driverctrl_end_state < 0: self.overtake_on_right_count += 
self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) start_frame_id += window_width elif ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0: self.overtake_when_turn_around_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) start_frame_id += window_width else: start_frame_id += 1 # 借道超车场景 def overtake_in_forbid_lane_detector(self): simTime = self.obj_data["simTime"].tolist() simtime_devide = self.different_road_area_simtime(simTime) for simtime in simtime_devide: lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)] try: lane_type = lane_overtake["lane_type"].tolist() if (50002 in lane_type and len(set(lane_type)) > 2) or ( 50002 not in lane_type and len(set(lane_type)) > 1 ): self.overtake_in_forbid_lane_count += 1 except Exception as e: print("数据缺少lane_type信息") # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次") # 在匝道超车 def overtake_in_ramp_area_detector(self): ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][ "simTime" ].tolist() ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list) for ramp_simtime in ramp_simTime_list: lane_id = self.ego_data["lane_id"].tolist() ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)] objstate_in_ramp = self.obj_data[ self.obj_data["simTime"].isin(ramp_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp) ego_speedx = ego_in_ramp["speedX"].tolist() ego_speedy = ego_in_ramp["speedY"].tolist() if len(lane_id) > 0: self.overtake_in_ramp_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) else: continue # print(f"在匝道超车{self.overtake_in_ramp_count}次") def overtake_in_tunnel_area_detector(self): tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][ "simTime" ].tolist() tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list) for tunnel_simtime in tunnel_simTime_list: lane_id = self.ego_data["lane_id"].tolist() ego_in_tunnel = 
self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)] objstate_in_tunnel = self.obj_data[ self.obj_data["simTime"].isin(tunnel_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel) ego_speedx = ego_in_tunnel["speedX"].tolist() ego_speedy = ego_in_tunnel["speedY"].tolist() if len(lane_id) > 0: self.overtake_in_tunnel_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) else: continue # print(f"在隧道超车{self.overtake_in_tunnel_count}次") # 加速车道超车 def overtake_on_accelerate_lane_detector(self): accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][ "simTime" ].tolist() accelerate_simTime_list = self.different_road_area_simtime( accelerate_simtime_list ) for accelerate_simtime in accelerate_simTime_list: lane_id = self.ego_data["lane_id"].tolist() ego_in_accelerate = self.ego_data[ self.ego_data["simTime"].isin(accelerate_simtime) ] objstate_in_accelerate = self.obj_data[ self.obj_data["simTime"].isin(accelerate_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate) ego_speedx = ego_in_accelerate["speedX"].tolist() ego_speedy = ego_in_accelerate["speedY"].tolist() self.overtake_on_accelerate_lane_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次") # 减速车道超车 def overtake_on_decelerate_lane_detector(self): decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][ "simTime" ].tolist() decelerate_simTime_list = self.different_road_area_simtime( decelerate_simtime_list ) for decelerate_simtime in decelerate_simTime_list: lane_id = self.ego_data["id"].tolist() ego_in_decelerate = self.ego_data[ self.ego_data["simTime"].isin(decelerate_simtime) ] objstate_in_decelerate = self.obj_data[ self.obj_data["simTime"].isin(decelerate_simtime) ] dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate) ego_speedx = ego_in_decelerate["speedX"].tolist() ego_speedy = 
ego_in_decelerate["speedY"].tolist() self.overtake_on_decelerate_lane_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次") # 在交叉路口 def overtake_in_different_senerios_detector(self): crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][ "simTime" ].tolist() # 判断是路口或者隧道区域 # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据 crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)] crossroad_objstate = self.obj_data[ self.obj_data["simTime"].isin(crossroad_simTime) ] # 读取前后的laneId lane_id = crossroad_ego["lane_id"].tolist() # 读取车辆前后的位置信息 dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate) ego_speedx = crossroad_ego["speedX"].tolist() ego_speedy = crossroad_ego["speedY"].tolist() """ 如果滑动窗口开始和最后的laneid一致; 自车和前车的位置发生的交换; 则认为发生超车 """ if len(lane_id) > 0: self.overtake_in_different_senerios_count += self._is_overtake( lane_id, dx, dy, ego_speedx, ego_speedy ) else: pass def calculate_overtake_when_passing_car_count(self): self.illegal_overtake_with_car_detector() return self.overtake_when_passing_car_count def calculate_overtake_on_right_count(self): self.illegal_overtake_with_car_detector() return self.overtake_on_right_count def calculate_overtake_when_turn_around_count(self): self.illegal_overtake_with_car_detector() return self.overtake_when_turn_around_count def calculate_overtake_in_forbid_lane_count(self): self.overtake_in_forbid_lane_detector() return self.overtake_in_forbid_lane_count def calculate_overtake_in_ramp_area_count(self): self.overtake_in_ramp_area_detector() return self.overtake_in_ramp_count def calculate_overtake_in_tunnel_area_count(self): self.overtake_in_tunnel_area_detector() return self.overtake_in_tunnel_count def calculate_overtake_on_accelerate_lane_count(self): self.overtake_on_accelerate_lane_detector() return self.overtake_on_accelerate_lane_count def calculate_overtake_on_decelerate_lane_count(self): 
self.overtake_on_decelerate_lane_detector() return self.overtake_on_decelerate_lane_count def calculate_overtake_in_different_senerios_count(self): self.overtake_in_different_senerios_detector() return self.overtake_in_different_senerios_count class SlowdownViolation(object): """减速让行违规类""" def __init__(self, df_data): print("减速让行违规类-------------------------") self.traffic_violations_type = "减速让行违规类" self.object_items = [] self.data = df_data.obj_data[1] self.ego_data = ( self.data[SLOWDOWN_INFO].copy().reset_index(drop=True) ) # Copy to avoid modifying the original DataFrame self.pedestrian_data = pd.DataFrame() self.object_items = set(df_data.object_df.type.tolist()) if 13 in self.object_items: # 行人的type是13 self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13] self.pedestrian_data = ( self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True) ) self.slow_down_in_crosswalk_count = 0 self.avoid_pedestrian_in_crosswalk_count = 0 self.avoid_pedestrian_in_the_road_count = 0 self.aviod_pedestrian_when_turning_count = 0 def pedestrian_in_front_of_car(self): if len(self.pedestrian_data) == 0: return [] else: self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"] self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"] self.ego_data["dist"] = np.sqrt( self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2 ) self.ego_data["rela_pos"] = ( self.ego_data["dx"] * self.ego_data["speedX"] + self.ego_data["dy"] * self.ego_data["speedY"] ) simtime = self.ego_data[ (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50) ]["simTime"].tolist() return simtime def different_road_area_simtime(self, df, threshold=0.6): if not df: return [] simtime_group = [] current_simtime_group = [df[0]] for i in range(1, len(df)): if abs(df[i] - df[i - 1]) <= threshold: current_simtime_group.append(df[i]) else: simtime_group.append(current_simtime_group) current_simtime_group = [df[i]] simtime_group.append(current_simtime_group) return 
simtime_group def slow_down_in_crosswalk_detector(self): # 筛选出路口或隧道区域的时间点 crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][ "simTime" ].tolist() crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime) for crosswalk_simtime in crosswalk_simTime_divide: # 筛选出当前时间段内的数据 # start_time, end_time = crosswalk_simtime start_time = crosswalk_simtime[0] end_time = crosswalk_simtime[-1] print(f"当前时间段:{start_time} - {end_time}") crosswalk_objstate = self.ego_data[ (self.ego_data["simTime"] >= start_time) & (self.ego_data["simTime"] <= end_time) ] # 计算车辆速度 ego_speedx = np.array(crosswalk_objstate["speedX"].tolist()) ego_speedy = np.array(crosswalk_objstate["speedY"].tolist()) ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2) # 判断是否超速 if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s self.slow_down_in_crosswalk_count += 1 # 输出总次数 print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次") def avoid_pedestrian_in_crosswalk_detector(self): crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][ "simTime" ].tolist() crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime) for crosswalk_simtime in crosswalk_simTime_devide: if not self.pedestrian_data.empty: crosswalk_objstate = self.pedestrian_data[ self.pedestrian_data["simTime"].isin(crosswalk_simtime) ] else: crosswalk_objstate = pd.DataFrame() if len(crosswalk_objstate) > 0: pedestrian_simtime = crosswalk_objstate["simTime"] pedestrian_objstate = crosswalk_objstate[ crosswalk_objstate["simTime"].isin(pedestrian_simtime) ] ego_speed = np.sqrt( pedestrian_objstate["speedX"] ** 2 + pedestrian_objstate["speedY"] ** 2 ) if ego_speed.any() > 0: self.avoid_pedestrian_in_crosswalk_count += 1 def avoid_pedestrian_in_the_road_detector(self): simtime = self.pedestrian_in_front_of_car() if len(simtime) == 0: self.avoid_pedestrian_in_the_road_count += 0 else: pedestrian_on_the_road = self.pedestrian_data[ self.pedestrian_data["simTime"].isin(simtime) ] simTime = 
pedestrian_on_the_road["simTime"].tolist() simTime_devide = self.different_road_area_simtime(simTime) for simtime1 in simTime_devide: sub_pedestrian_on_the_road = pedestrian_on_the_road[ pedestrian_on_the_road["simTime"].isin(simtime1) ] ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))] dist = np.sqrt( (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values) ** 2 + ( ego_car["posY"].values - sub_pedestrian_on_the_road["posY"].values ) ** 2 ) speed = np.sqrt( ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2 ) data = {"dist": dist, "speed": speed} new_ego_car = pd.DataFrame(data) new_ego_car = new_ego_car.assign( Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0) ) if new_ego_car["Column3"].any(): self.avoid_pedestrian_in_the_road_count += 1 def aviod_pedestrian_when_turning_detector(self): pedestrian_simtime_list = self.pedestrian_in_front_of_car() if len(pedestrian_simtime_list) > 0: simtime_list = self.ego_data[ (self.ego_data["simTime"].isin(pedestrian_simtime_list)) & (self.ego_data["lane_type"] == 20) ]["simTime"].tolist() simTime_list = self.different_road_area_simtime(simtime_list) pedestrian_on_the_road = self.pedestrian_data[ self.pedestrian_data["simTime"].isin(simtime_list) ] for simtime in simTime_list: sub_pedestrian_on_the_road = pedestrian_on_the_road[ pedestrian_on_the_road["simTime"].isin(simtime) ] ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))] ego_car["dist"] = np.sqrt( (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values) ** 2 + ( ego_car["posY"].values - sub_pedestrian_on_the_road["posY"].values ) ** 2 ) ego_car["speed"] = np.sqrt( ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2 ) if any(ego_car["speed"].tolist()) != 0: self.aviod_pedestrian_when_turning_count += 1 def calculate_slow_down_in_crosswalk_count(self): self.slow_down_in_crosswalk_detector() return self.slow_down_in_crosswalk_count def 
# NOTE(review): the three delegating methods below are the tail of the class
# that precedes this chunk.  Its `class` header and the `def` keyword of the
# first method lie before the visible source, so they are carried over
# verbatim (as a comment) and must be re-attached to that class when the
# preceding chunk is reformatted:
#
#     def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
#         self.avoid_pedestrian_in_crosswalk_detector()
#         return self.avoid_pedestrian_in_crosswalk_count
#
#     def calculate_avoid_pedestrian_in_the_road_count(self):
#         self.avoid_pedestrian_in_the_road_detector()
#         return self.avoid_pedestrian_in_the_road_count
#
#     def calculate_avoid_pedestrian_when_turning_count(self):
#         self.aviod_pedestrian_when_turning_detector()
#         return self.aviod_pedestrian_when_turning_count


class TurnaroundViolation(object):
    """Count U-turn related violations of the ego vehicle.

    Three counters are maintained:

    * ``turning_in_forbiden_turn_back_sign_count`` - direction reversals while
      a sign of type 8 (no U-turn, per the original comment) is active;
    * ``turning_in_forbiden_turn_left_sign_count`` - the same test while a
      sign of type 9 is active;
    * ``avoid_pedestrian_when_turn_back_count`` - segments on lane type 20 in
      which the ego vehicle moved while pedestrian samples exist.

    Every detector resets its counters before counting, so calling the
    ``calculate_*`` accessors repeatedly always returns the same value.
    """

    # Sign types read from the ``sign_type1`` column.
    _SIGN_NO_TURN_BACK = 8
    _SIGN_NO_TURN_LEFT = 9
    # NOTE(review): presumably the U-turn lane type - confirm against the map spec.
    _TURN_BACK_LANE_TYPE = 20
    # Pedestrians farther away than this are ignored (position units of the data).
    _PEDESTRIAN_RANGE = 50

    def __init__(self, df_data):
        """Extract ego and pedestrian frames from ``df_data``.

        :param df_data: project data container exposing ``obj_data`` (ego
            frame under key ``1``) and ``object_df`` (all objects, with a
            ``type`` column where 13 marks pedestrians).
        """
        print("掉头违规类初始化中...")
        self.traffic_violations_type = "掉头违规类"
        self.data = df_data.obj_data[1]
        # Copy so helper columns added later never touch the shared frame.
        self.ego_data = self.data[TURNAROUND_INFO].copy().reset_index(drop=True)
        self.pedestrian_data = pd.DataFrame()
        self.object_items = set(df_data.object_df.type.tolist())
        if 13 in self.object_items:  # object type 13 is a pedestrian
            self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
            self.pedestrian_data = (
                self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
            )
        self.turning_in_forbiden_turn_back_sign_count = 0
        self.turning_in_forbiden_turn_left_sign_count = 0
        self.avoid_pedestrian_when_turn_back_count = 0

    def pedestrian_in_front_of_car(self):
        """Return ego ``simTime`` values with a pedestrian nearby.

        Ego and pedestrian rows are paired by row position.  The helper
        columns ``dx``, ``dy``, ``dist`` and ``rela_pos`` are stored on
        ``self.ego_data`` as in the original implementation.

        :return: list of ``simTime`` values, empty when there is no
            pedestrian data.
        """
        if self.pedestrian_data.empty:
            return []

        ego = self.ego_data
        ego["dx"] = ego["posX"] - self.pedestrian_data["posX"]
        ego["dy"] = ego["posY"] - self.pedestrian_data["posY"]
        ego["dist"] = np.sqrt(ego["dx"] ** 2 + ego["dy"] ** 2)
        # NOTE(review): (ego - pedestrian) . velocity > 0 looks like it selects
        # pedestrians *behind* the car; kept unchanged - confirm the sign.
        ego["rela_pos"] = ego["dx"] * ego["speedX"] + ego["dy"] * ego["speedY"]
        nearby = (ego["rela_pos"] > 0) & (ego["dist"] < self._PEDESTRIAN_RANGE)
        return ego.loc[nearby, "simTime"].tolist()

    def different_road_area_simtime(self, df, threshold=0.5):
        """Split a list of time stamps into runs of consecutive samples.

        :param df: list of time stamps (despite the name, not a DataFrame).
        :param threshold: maximum gap between neighbours of the same run.
        :return: list of runs, each a list of time stamps; ``[]`` for empty input.
        """
        if not df:
            return []
        runs = [[df[0]]]
        for previous, current in zip(df, df[1:]):
            if abs(current - previous) <= threshold:
                runs[-1].append(current)
            else:
                runs.append([current])
        return runs

    def _count_direction_reversals(self, sign_type):
        """Count sign-active segments whose start and end velocity oppose each other."""
        ego = self.ego_data
        active_times = ego.loc[ego["sign_type1"] == sign_type, "simTime"].tolist()
        reversals = 0
        for segment in self.different_road_area_simtime(active_times):
            # The segments hold simTime values, so the rows must be selected
            # on ``simTime`` (the original matched them against ``simFrame``).
            rows = ego[ego["simTime"].isin(segment)]
            if rows.empty:
                continue
            start, end = rows.iloc[0], rows.iloc[-1]
            heading_dot = (
                end["speedX"] * start["speedX"] + end["speedY"] * start["speedY"]
            )
            if heading_dot < 0:
                reversals += 1
        return reversals

    def turn_back_in_forbiden_sign_detector(self):
        """Recount direction reversals under sign types 8 and 9.

        A segment counts when the dot product of its first and last velocity
        vectors is negative.
        """
        self.turning_in_forbiden_turn_back_sign_count = self._count_direction_reversals(
            self._SIGN_NO_TURN_BACK
        )
        self.turning_in_forbiden_turn_left_sign_count = self._count_direction_reversals(
            self._SIGN_NO_TURN_LEFT
        )

    def avoid_pedestrian_when_turn_back_detector(self):
        """Recount lane-type-20 segments where the ego moved with pedestrians present.

        Nothing is counted unless :meth:`pedestrian_in_front_of_car` reports at
        least one sample.
        """
        self.avoid_pedestrian_when_turn_back_count = 0
        if not self.pedestrian_in_front_of_car():
            return

        ego = self.ego_data
        lane_times = ego.loc[
            ego["lane_type"] == self._TURN_BACK_LANE_TYPE, "simTime"
        ].tolist()
        for segment in self.different_road_area_simtime(lane_times):
            pedestrian_times = self.pedestrian_data.loc[
                self.pedestrian_data["simTime"].isin(segment), "simTime"
            ].tolist()
            ego_rows = ego[ego["simTime"].isin(pedestrian_times)]
            speed = np.sqrt(ego_rows["speedX"] ** 2 + ego_rows["speedY"] ** 2)
            # Same test as the original ``any(speeds) != 0``: any non-zero speed.
            if (speed != 0).any():
                self.avoid_pedestrian_when_turn_back_count += 1

    def calculate_turn_in_forbiden_turn_left_sign_count(self):
        """Run the sign detector and return the no-left-turn counter."""
        self.turn_back_in_forbiden_sign_detector()
        return self.turning_in_forbiden_turn_left_sign_count

    def calculate_turn_in_forbiden_turn_back_sign_count(self):
        """Run the sign detector and return the no-U-turn counter."""
        self.turn_back_in_forbiden_sign_detector()
        return self.turning_in_forbiden_turn_back_sign_count

    def calaulate_avoid_pedestrian_when_turn_back_count(self):
        """Run the pedestrian detector and return its counter.

        The misspelled name is kept because callers may use it.
        """
        self.avoid_pedestrian_when_turn_back_detector()
        return self.avoid_pedestrian_when_turn_back_count

    # Correctly spelled alias of the method above.
    calculate_avoid_pedestrian_when_turn_back_count = (
        calaulate_avoid_pedestrian_when_turn_back_count
    )


class WrongWayViolation:
    """Count stopping and emergency-lane violations on expressways/highways.

    The class name is historical; the runtime label and the counters all
    describe stopping or driving in the wrong lane type.
    """

    _VIOLATION_TYPES = (
        "urbanExpresswayOrHighwayDrivingLaneStopped",
        "urbanExpresswayOrHighwayEmergencyLaneStopped",
        "urbanExpresswayEmergencyLaneDriving",
    )

    def __init__(self, df_data):
        """Keep a reference to the ego frame ``df_data.obj_data[1]``."""
        print("停车违规类初始化中...")
        self.traffic_violations_type = "停车违规类"
        self.data = df_data.obj_data[1]
        # One counter per violation type, all starting at zero.
        self.violation_count = dict.fromkeys(self._VIOLATION_TYPES, 0)

    def process_violations(self):
        """Label every ego row and recount the three violation types.

        The ``violation_type`` column is (re)written on ``self.data``.  The
        speed column is only compared with zero, so it is no longer rescaled
        in place - the original multiplied the shared ``v`` column by 3.6 on
        every call.
        """
        urban_expressway_or_highway = {1, 2}
        driving_lane = {1, 4, 5, 6}
        emergency_lane = {12}

        data = self.data
        on_fast_road = data["road_fc"].isin(urban_expressway_or_highway)
        in_driving_lane = data["lane_type"].isin(driving_lane)
        in_emergency_lane = data["lane_type"].isin(emergency_lane)
        stopped = data["v"] == 0
        moving = data["v"] != 0

        conditions = [
            on_fast_road & in_driving_lane & stopped,
            on_fast_road & in_emergency_lane & stopped,
            on_fast_road & in_emergency_lane & moving,
        ]

        data["violation_type"] = None
        for condition, violation_type in zip(conditions, self._VIOLATION_TYPES):
            data.loc[condition, "violation_type"] = violation_type

        counts = (
            data["violation_type"]
            .value_counts()
            .reindex(list(self._VIOLATION_TYPES), fill_value=0)
        )
        self.violation_count = {name: int(total) for name, total in counts.items()}

    def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
        """Return the number of rows stopped in a driving lane."""
        self.process_violations()
        return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]

    def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
        """Return the number of rows stopped in the emergency lane."""
        self.process_violations()
        # The original returned the "...EmergencyLaneDriving" counter here.
        return self.violation_count["urbanExpresswayOrHighwayEmergencyLaneStopped"]

    def calculate_urbanExpresswayEmergencyLaneDriving(self):
        """Return the number of rows moving in the emergency lane."""
        self.process_violations()
        return self.violation_count["urbanExpresswayEmergencyLaneDriving"]


class SpeedingViolation(object):
    """Count speeding / under-speed rows per road class.

    Road-sign speed limits are not evaluated here because, per the original
    note, the shp map does not provide them.
    """

    _VIOLATION_TYPES = (
        "urbanExpresswayOrHighwaySpeedOverLimit50",
        "urbanExpresswayOrHighwaySpeedOverLimit20to50",
        "urbanExpresswayOrHighwaySpeedOverLimit0to20",
        "urbanExpresswayOrHighwaySpeedUnderLimit",
        "generalRoadSpeedOverLimit50",
        "generalRoadSpeedOverLimit20to50",
    )

    def __init__(self, df_data):
        """Keep a reference to the ego frame ``df_data.obj_data[1]`` (not a copy)."""
        print("超速违规类初始化中...")
        self.traffic_violations_type = "超速违规类"
        self.data = df_data.obj_data[1]
        self.violation_counts = dict.fromkeys(self._VIOLATION_TYPES, 0)

    def process_violations(self):
        """Label every ego row with its speeding category and recount.

        ``v`` is scaled by 3.6 into a local series before it is compared with
        ``road_speed_max`` / ``road_speed_min``.  The original rescaled the
        shared column in place, so each further call (or another class using
        the same frame) multiplied the speed again.
        """
        urban_expressway_or_highway = {1, 2}
        general_road = {3}

        data = self.data
        speed = data["v"] * 3.6
        limit = data["road_speed_max"]
        on_fast_road = data["road_fc"].isin(urban_expressway_or_highway)
        on_general_road = data["road_fc"].isin(general_road)

        conditions = [
            on_fast_road & (speed > limit * 1.5),
            on_fast_road & (speed > limit * 1.2) & (speed <= limit * 1.5),
            on_fast_road & (speed > limit) & (speed <= limit * 1.2),
            on_fast_road & (speed < data["road_speed_min"]),
            on_general_road & (speed > limit * 1.5),
            on_general_road & (speed > limit * 1.2) & (speed <= limit * 1.5),
        ]

        data["violation_type"] = None
        for condition, violation_type in zip(conditions, self._VIOLATION_TYPES):
            data.loc[condition, "violation_type"] = violation_type

        # Reindex so that every category is present even when it never occurs.
        counts = (
            data["violation_type"]
            .value_counts()
            .reindex(list(self._VIOLATION_TYPES), fill_value=0)
        )
        self.violation_counts = {name: int(total) for name, total in counts.items()}

    def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
        """Rows on road classes 1/2 faster than 150 % of the limit."""
        self.process_violations()
        return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit50"]

    def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
        """Rows on road classes 1/2 between 120 % and 150 % of the limit."""
        self.process_violations()
        return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"]

    def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
        """Rows on road classes 1/2 between 100 % and 120 % of the limit."""
        self.process_violations()
        return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"]

    def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
        """Rows on road classes 1/2 slower than ``road_speed_min``."""
        self.process_violations()
        return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"]

    def calculate_generalRoadSpeedOverLimit50(self):
        """Rows on road class 3 faster than 150 % of the limit."""
        self.process_violations()
        return self.violation_counts["generalRoadSpeedOverLimit50"]

    def calculate_generalRoadSpeedOverLimit20to50_count(self):
        """Rows on road class 3 between 120 % and 150 % of the limit."""
        self.process_violations()
        return self.violation_counts["generalRoadSpeedOverLimit20to50"]


class TrafficLightViolation(object):
    """Count traffic-light violations of the ego vehicle.

    Original TODO kept: the manoeuvre (left / straight / right) and the kind
    of signal (directional vs. pass-through) still need to be distinguished.
    """

    def __init__(self, df_data):
        """Read the vehicle config and ego frame, then evaluate once."""
        self.traffic_violations_type = "违反交通灯类"
        print("违反交通灯类 类初始化中...")
        self.config = df_data.vehicle_config
        self.data_ego = df_data.ego_data
        self.violation_counts = {
            "trafficSignalViolation": 0,
            "illegalDrivingOrParkingAtCrossroads": 0,
        }
        self.process_violations()

    def is_point_cross_line(self, point, stop_line_points, tol=0.0):
        """Tell whether a point sits on the stop line, ahead of its midpoint.

        The point must lie on the infinite line through the two stop-line end
        points (2-D cross product test).  It then counts as crossed when the
        vector from the line midpoint to the point is within 90 degrees of
        the heading.

        :param point: ``(x, y, heading)`` with the heading in radians.
        :param stop_line_points: ``[[x1, y1], [x2, y2]]``.
        :param tol: allowed distance from the line.  The default ``0.0``
            reproduces the original exact test, which sampled float positions
            rarely satisfy - pass a small positive value to relax it.
        :return: ``True`` when the point is on the line and not behind it.
        """
        start = np.array(
            [stop_line_points[0][0], stop_line_points[0][1]], dtype=float
        )
        line_vector = np.array(
            [
                stop_line_points[1][0] - stop_line_points[0][0],
                stop_line_points[1][1] - stop_line_points[0][1],
            ],
            dtype=float,
        )
        point_vector = np.array(
            [point[0] - start[0], point[1] - start[1]], dtype=float
        )
        # Explicit 2-D cross product; np.cross on 2-vectors is deprecated.
        cross_product = (
            line_vector[0] * point_vector[1] - line_vector[1] * point_vector[0]
        )
        # Written as "not <=" so that NaN input is rejected like before.
        if not abs(cross_product) <= tol * np.linalg.norm(line_vector):
            return False

        mid_point = start + 0.5 * line_vector
        to_point = np.array([point[0] - mid_point[0], point[1] - mid_point[1]])
        direction = np.array([math.cos(point[2]), math.sin(point[2])])
        if np.linalg.norm(to_point) == 0 or np.linalg.norm(direction) == 0:
            return False
        # angle <= 90 degrees  <=>  dot product >= 0; this avoids math.acos,
        # which raises when rounding pushes the cosine outside [-1, 1].
        return bool(np.dot(to_point, direction) >= 0)

    def _filter_data(self):
        """Return the ego rows that reference a stop line of type 1 and a traffic light."""
        ego = self.data_ego
        return ego[
            (ego["stopline_id"] != -1)
            & (ego["stopline_type"] == 1)
            & (ego["trafficlight_id"] != -1)
        ]

    def _group_data(self, filtered_data):
        """Group rows into runs whose ``simTime`` gaps do not exceed 0.5.

        A copy is used so the helper columns are not written onto a slice of
        the ego frame.
        """
        threshold = 0.5
        grouped = filtered_data.copy()
        grouped["time_diff"] = grouped["simTime"].diff().fillna(0)
        grouped["group"] = (grouped["time_diff"] > threshold).cumsum()
        return grouped.groupby("group")

    def _analyze_group(self, group_data):
        """Collect per-row evidence flags for one stop-line approach.

        :return: ``(photos, stop_in_intersection)`` - the flat list of the
            four flags of every evaluated row, and the "passed intersection"
            flag of the last evaluated row.
        """
        photos = []
        stop_in_intersection = False
        wheelbase = self.config["EGO_WHEELBASS"]

        for _, row in group_data.iterrows():
            position = np.array([row["posX"], row["posY"]])
            heading = row["posH"]
            stop_line_points = [
                [row["stopline_x1"], row["stopline_y1"]],
                [row["stopline_x2"], row["stopline_y2"]],
            ]
            red_light = row["traffic_light_status"] == 1

            heading_vector = np.array([np.cos(heading), np.sin(heading)])
            heading_vector = heading_vector / np.linalg.norm(heading_vector)
            # is_point_cross_line reads point[2] as the heading; the original
            # passed bare (x, y) pairs and crashed whenever a wheel was on the line.
            front_wheel = np.append(position + wheelbase * heading_vector, heading)
            rear_wheel = np.append(position - wheelbase * heading_vector, heading)

            dist = math.sqrt(
                (row["posX"] - row["traffic_light_x"]) ** 2
                + (row["posY"] - row["traffic_light_y"]) ** 2
            )

            # NOTE(review): this test is true for every value except exactly
            # +/-0.01 (and NaN); it looks like a placeholder for the missing
            # turning / straight distinction.  Kept unchanged.
            if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
                has_crossed_line_front = (
                    self.is_point_cross_line(front_wheel, stop_line_points)
                    and red_light
                )
                has_crossed_line_rear = (
                    self.is_point_cross_line(rear_wheel, stop_line_points)
                    and row["v"] > 0
                    and red_light
                )
                has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
                has_passed_intersection = has_crossed_line_front and dist < 1.0
                photos.extend(
                    [
                        has_crossed_line_front,
                        has_crossed_line_rear,
                        has_passed_intersection,
                        has_stop_in_intersection,
                    ]
                )
                stop_in_intersection = has_passed_intersection
        return photos, stop_in_intersection

    def is_vehicle_run_a_red_light(self):
        """Evaluate every approach; fill ``photos_group`` and ``stop_in_intersections``."""
        grouped_data = self._group_data(self._filter_data())
        self.photos_group = []
        self.stop_in_intersections = []
        for _, group_data in grouped_data:
            photos, stop_in_intersection = self._analyze_group(group_data)
            self.photos_group.append(photos)
            self.stop_in_intersections.append(stop_in_intersection)

    def process_violations(self):
        """Recompute both counters from scratch."""
        self.is_vehicle_run_a_red_light()
        # An approach without any evaluated row carries no evidence, so it
        # must not count (``all([])`` alone would be True).
        # NOTE(review): the rear-wheel flag needs v > 0 and the stop flag
        # needs v == 0, so requiring all flags of a row can never succeed -
        # the intended evidence rule needs confirming.
        count_1 = sum(
            1 for photos in self.photos_group if photos and all(photos)
        )
        count_2 = sum(1 for stopped in self.stop_in_intersections if stopped)
        self.violation_counts["trafficSignalViolation"] = count_1
        self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2

    def calculate_trafficSignalViolation_count(self):
        """Recompute and return the red-light counter."""
        self.process_violations()
        return self.violation_counts["trafficSignalViolation"]

    def calculate_illegalDrivingOrParkingAtCrossroads(self):
        """Recompute and return the crossroads driving/parking counter."""
        self.process_violations()
        return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]


class WarningViolation(object):
    """Count warning-level violations (wrong lane use, riding the lane divider)."""

    def __init__(self, df_data):
        """Copy the ego frame so helper columns never reach the shared data."""
        self.traffic_violations_type = "警告性违规类"
        print("警告性违规类 类初始化中...")
        self.config = df_data.vehicle_config
        self.data_ego = df_data.obj_data[1]
        self.data = self.data_ego.copy()
        self.violation_counts = {
            # Driving outside the prescribed lane on roads other than
            # highways / urban expressways.
            "generalRoadIrregularLaneUse": 0,
            # Riding or pressing the lane divider on a highway / expressway.
            "urbanExpresswayOrHighwayRideLaneDivider": 0,
        }

    def process_violations(self):
        """Recount both violations as numbers of continuous violating periods.

        The counters are assigned rather than incremented, so repeated calls
        return the same numbers.
        """
        general_road = {3}
        urban_expressway_or_highway = {1, 2}
        # Original note: lane type 10 is a motor lane, 11 a non-motor lane.
        lane_type = {11}
        car_width = self.config["CAR_WIDTH"]
        lane_width = self.data["lane_width"]

        # Ego vehicle on a non-motor lane of a general road.
        self.data["is_violation"] = self.data["road_fc"].isin(
            general_road
        ) & self.data["lane_type"].isin(lane_type)
        self.violation_counts["generalRoadIrregularLaneUse"] = len(
            self.count_continuous_violations(
                self.data["is_violation"], self.data["simTime"]
            )
        )

        # Lateral offset beyond the free margin between car and lane edge,
        # restricted to the road classes this metric is named after.
        # NOTE(review): only positive offsets are tested; if laneOffset is
        # signed, abs() may be needed - confirm the convention.
        threshold = (lane_width - car_width) / 2
        self.data["is_violation"] = self.data["road_fc"].isin(
            urban_expressway_or_highway
        ) & (self.data["laneOffset"] > threshold)
        self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(
            self.count_continuous_violations(
                self.data["is_violation"], self.data["simTime"]
            )
        )

    def count_continuous_violations(self, violation_series, time_series):
        """Return one entry per run of consecutive truthy flags.

        :param violation_series: iterable of flags.
        :param time_series: iterable of time stamps, parallel to the flags.
        :return: list of one-element lists holding each run's start time.
        """
        segments = []
        in_segment = False
        for is_violation, time in zip(violation_series, time_series):
            if is_violation and not in_segment:
                segments.append([time])
            in_segment = bool(is_violation)
        return segments

    def calculate_generalRoadIrregularLaneUse_count(self):
        """Recompute and return the irregular-lane-use counter."""
        self.process_violations()
        return self.violation_counts["generalRoadIrregularLaneUse"]

    def calculate_urbanExpresswayOrHighwayRideLaneDivider(self):
        """Recompute and return the lane-divider counter."""
        self.process_violations()
        return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]


class TrafficSignViolation:
    """Count traffic-sign violations (no-straight, speed limit, minimum speed)."""

    # NOTE(review): posH is fed to cos/sin elsewhere in this file, so it looks
    # like radians; a threshold of 5 would then always pass - confirm the unit.
    PROHIBITED_STRAIGHT_THRESHOLD = 5
    SIGN_TYPE_STRAIGHT_PROHIBITED = 7
    SIGN_TYPE_SPEED_LIMIT = 12
    SIGN_TYPE_MIN_SPEED_LIMIT = 13

    def __init__(self, df_data):
        """Copy and time-sort the ego frame; the counting itself is lazy."""
        self.traffic_violations_type = "交通标志违规类"
        print("交通标志违规类 初始化中...")
        self._raw_data = df_data.obj_data[1].copy()
        self.data_ego = self._raw_data.sort_values('simTime').reset_index(drop=True)
        self._calculated = False
        self._violation_counts = {
            "NoStraightThrough": 0,
            "SpeedLimitViolation": 0,
            "MinimumSpeedLimitViolation": 0,
        }

    def _ensure_calculated(self):
        """Run all checks the first time a counter is requested."""
        if not self._calculated:
            self._check_prohibition_violations()
            self._check_instruction_violations()
            self._calculated = True

    def calculate_NoStraightThrough_count(self):
        """Return the number of rows violating a no-straight-through sign."""
        self._ensure_calculated()
        return self._violation_counts["NoStraightThrough"]

    def calculate_SpeedLimitViolation_count(self):
        """Return the number of rows above a speed-limit sign."""
        self._ensure_calculated()
        return self._violation_counts["SpeedLimitViolation"]

    def calculate_MinimumSpeedLimitViolation_count(self):
        """Return the number of rows below a minimum-speed sign."""
        self._ensure_calculated()
        return self._violation_counts["MinimumSpeedLimitViolation"]

    def _check_prohibition_violations(self):
        """Evaluate prohibition signs: no-straight-through and speed limit."""
        self._check_straight_violation()
        self._check_speed_violation(
            self.SIGN_TYPE_SPEED_LIMIT, operator.gt, "SpeedLimitViolation"
        )

    def _check_instruction_violations(self):
        """Evaluate instruction signs: minimum speed."""
        self._check_speed_violation(
            self.SIGN_TYPE_MIN_SPEED_LIMIT, operator.lt, "MinimumSpeedLimitViolation"
        )

    def _check_straight_violation(self):
        """Count moving rows under sign type 7 whose heading barely changes."""
        straight_df = self.data_ego[
            self.data_ego["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED
        ]
        if straight_df.empty:
            return
        heading_change = straight_df['posH'].diff().abs().fillna(0)
        mask = (heading_change <= self.PROHIBITED_STRAIGHT_THRESHOLD) & (
            straight_df['v'] > 0
        )
        # The counters live in ``_violation_counts``; the original wrote to a
        # non-existent ``violation_counts`` attribute and raised.
        self._violation_counts["NoStraightThrough"] = int(mask.sum())

    def _check_speed_violation(self, sign_type, compare_op, count_key):
        """Count rows under ``sign_type`` where ``compare_op(v, sign_speed)`` holds.

        NOTE(review): ``v`` is compared with ``sign_speed`` as is; confirm
        that both columns use the same unit.

        :param sign_type: value of ``sign_type1`` selecting the rows.
        :param compare_op: binary comparison such as ``operator.gt``.
        :param count_key: key of ``_violation_counts`` to update.
        """
        violation_df = self.data_ego[self.data_ego["sign_type1"] == sign_type]
        if violation_df.empty:
            return
        mask = compare_op(violation_df['v'], violation_df['sign_speed'])
        self._violation_counts[count_key] = int(mask.sum())