# ... keep the original imports and constant definitions ...
import math
import operator
import copy
import numpy as np
import pandas as pd
from typing import Dict, Any, List, Optional
from modules.lib import log_manager
from modules.lib.score import Score
from modules.lib.log_manager import LogManager
from modules.lib import data_process

# Columns each violation family selects from the per-object frames.
OVERTAKE_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "posX",
    "posY",
    "posH",
    "lane_id",
    "lane_type",
    "road_type",
    "interid",
    "crossid",
]
SLOWDOWN_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "posX",
    "posY",
    "crossid",
    "lane_type",
]
TURNAROUND_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "posX",
    "posY",
    "sign_type1",
    "lane_type",
]
TRFFICSIGN_INFO = [
    "simTime",
    "simFrame",
    "playerId",
    "speedX",
    "speedY",
    "v",
    "posX",
    "posY",
    "sign_type1",
    "sign_ref_link",
    "sign_x",
    "sign_y",
]


def _split_by_time_gap(values, threshold):
    """Split an ordered list of timestamps into runs of neighbouring samples.

    A new run starts whenever two adjacent values differ by more than
    ``threshold``. An empty input yields ``[]``.
    """
    if not values:
        return []
    runs = [[values[0]]]
    for previous, current in zip(values, values[1:]):
        if abs(current - previous) <= threshold:
            runs[-1].append(current)
        else:
            runs.append([current])
    return runs


# Metric entry points are named ``calculate_<metric name in lower case>`` so
# that TrafficRegistry can resolve them by name. Each one builds a fresh
# violation object and returns ``{result_key: count}``.
def calculate_overtake_when_passing_car(data_processed):
    """Compute the "overtake while meeting another car" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_when_passing_car": violation.calculate_overtake_when_passing_car_count()}


def calculate_overtake_on_right(data_processed):
    """Compute the "overtake on the right" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_on_right": violation.calculate_overtake_on_right_count()}


def calculate_overtake_when_turn_around(data_processed):
    """Compute the "overtake while turning around" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_when_turn_around": violation.calculate_overtake_when_turn_around_count()}


def calculate_overtake_in_forbid_lane(data_processed):
    """Compute the "overtake in a forbidden lane" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_forbid_lane": violation.calculate_overtake_in_forbid_lane_count()}


def calculate_overtake_in_ramp(data_processed):
    """Compute the "overtake on a ramp" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_ramp": violation.calculate_overtake_in_ramp_area_count()}


def calculate_overtake_in_tunnel(data_processed):
    """Compute the "overtake in a tunnel" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_tunnel": violation.calculate_overtake_in_tunnel_area_count()}


def calculate_overtake_on_accelerate_lane(data_processed):
    """Compute the "overtake on an acceleration lane" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_on_accelerate_lane": violation.calculate_overtake_on_accelerate_lane_count()}


def calculate_overtake_on_decelerate_lane(data_processed):
    """Compute the "overtake on a deceleration lane" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_on_decelerate_lane": violation.calculate_overtake_on_decelerate_lane_count()}


def calculate_overtake_in_different_senerios(data_processed):
    """Compute the "overtake in different scenarios" metric."""
    violation = OvertakingViolation(data_processed)
    return {"overtake_in_different_senerios": violation.calculate_overtake_in_different_senerios_count()}


def calculate_slow_down_in_crosswalk(data_processed):
    """Compute the "slow down at crosswalk" metric."""
    violation = SlowdownViolation(data_processed)
    # NOTE(review): the result key differs from the metric name
    # ("slow_down_in_crosswalk"); kept as-is because consumers of the result
    # dict are not visible here - confirm before renaming.
    return {"slowdown_down_in_crosswalk": violation.calculate_slow_down_in_crosswalk_count()}


def calculate_avoid_pedestrian_in_crosswalk(data_processed):
    """Compute the "yield to pedestrians at crosswalk" metric."""
    violation = SlowdownViolation(data_processed)
    return {"avoid_pedestrian_in_crosswalk": violation.calculate_avoid_pedestrian_in_the_crosswalk_count()}


def calculate_avoid_pedestrian_in_the_road(data_processed):
    """Compute the "yield to pedestrians on the road" metric."""
    violation = SlowdownViolation(data_processed)
    return {"avoid_pedestrian_in_the_road": violation.calculate_avoid_pedestrian_in_the_road_count()}


def calculate_avoid_pedestrian_when_turning(data_processed):
    """Compute the "yield to pedestrians while turning" metric."""
    violation = SlowdownViolation(data_processed)
    # NOTE(review): the result key carries a "_count" suffix that the metric
    # name does not have; kept as-is - confirm against the consumers.
    return {"avoid_pedestrian_when_turning_count": violation.calculate_avoid_pedestrian_when_turning_count()}


def calculate_turn_in_forbiden_turn_left_sign(data_processed):
    """Compute the "left turn at a no-left-turn sign" metric."""
    violation = TurnaroundViolation(data_processed)
    return {"turn_in_forbiden_turn_left_sign": violation.calculate_turn_in_forbiden_turn_left_sign_count()}


def calculate_turn_in_forbiden_turn_back_sign(data_processed):
    """Compute the "U-turn at a no-U-turn sign" metric."""
    violation = TurnaroundViolation(data_processed)
    return {"turn_in_forbiden_turn_back_sign": violation.calculate_turn_in_forbiden_turn_back_sign_count()}


def calculate_avoid_pedestrian_when_turn_back(data_processed):
    """Compute the "yield to pedestrians while making a U-turn" metric."""
    violation = TurnaroundViolation(data_processed)
    return {"avoid_pedestrian_when_turn_back": violation.calaulate_avoid_pedestrian_when_turn_back_count()}


def calculate_urbanexpresswayorhighwaydrivinglanestopped(data_processed):
    """Compute the "stopped in a driving lane of an expressway/highway" metric."""
    violation = WrongWayViolation(data_processed)
    return {
        "urbanExpresswayOrHighwayDrivingLaneStopped":
            violation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
    }


def calculate_urbanexpresswayorhighwayemergencylanestopped(data_processed):
    """Compute the "stopped in the emergency lane of an expressway/highway" metric."""
    violation = WrongWayViolation(data_processed)
    # NOTE(review): this calls the *driving lane* counter, which looks like a
    # copy-paste slip. The emergency-lane counter is not visible from here, so
    # the call is left unchanged - confirm and switch if it exists.
    return {
        "urbanExpresswayOrHighwayEmergencyLaneStopped":
            violation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
    }


def calculate_urbanexpresswayemergencylanedriving(data_processed):
    """Compute the "driving in the emergency lane of an expressway" metric."""
    violation = WrongWayViolation(data_processed)
    return {"urbanExpresswayEmergencyLaneDriving": violation.calculate_urbanExpresswayEmergencyLaneDriving()}


def calculate_urbanexpresswayorhighwayspeedoverlimit50(data_processed):
    """Compute the "expressway/highway speeding by more than 50%" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedOverLimit50":
            violation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
    }


def calculate_urbanexpresswayorhighwayspeedoverlimit20to50(data_processed):
    """Compute the "expressway/highway speeding by 20%-50%" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedOverLimit20to50":
            violation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
    }


def calculate_urbanexpresswayorhighwayspeedoverlimit0to20(data_processed):
    """Compute the "expressway/highway speeding by 0-20%" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedOverLimit0to20":
            violation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
    }


def calculate_urbanexpresswayorhighwayspeedunderlimit(data_processed):
    """Compute the "expressway/highway below minimum speed" metric."""
    violation = SpeedingViolation(data_processed)
    return {
        "urbanExpresswayOrHighwaySpeedUnderLimit":
            violation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
    }


def calculate_generalroadspeedoverlimit50(data_processed):
    """Compute the "general road speeding by more than 50%" metric."""
    violation = SpeedingViolation(data_processed)
    return {"generalRoadSpeedOverLimit50": violation.calculate_generalRoadSpeedOverLimit50()}


def calculate_generalroadspeedoverlimit20to50(data_processed):
    """Compute the "general road speeding by 20%-50%" metric."""
    violation = SpeedingViolation(data_processed)
    return {"generalRoadSpeedOverLimit20to50": violation.calculate_generalRoadSpeedOverLimit20to50_count()}


def calculate_trafficsignalviolation(data_processed):
    """Compute the "traffic signal violation" metric."""
    violation = TrafficLightViolation(data_processed)
    return {"trafficSignalViolation": violation.calculate_trafficSignalViolation_count()}


def calculate_illegaldrivingorparkingatcrossroads(data_processed):
    """Compute the "illegal driving or parking at crossroads" metric."""
    violation = TrafficLightViolation(data_processed)
    return {"illegalDrivingOrParkingAtCrossroads": violation.calculate_illegalDrivingOrParkingAtCrossroads()}


def calculate_generalroadirregularlaneuse(data_processed):
    """Compute the "irregular lane use on a general road" metric."""
    violation = WarningViolation(data_processed)
    return {"generalRoadIrregularLaneUse": violation.calculate_generalRoadIrregularLaneUse_count()}


def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
    """Compute the "riding the lane divider on an expressway/highway" metric."""
    violation = WarningViolation(data_processed)
    return {
        "urbanExpresswayOrHighwayRideLaneDivider":
            violation.calculate_urbanExpresswayOrHighwayRideLaneDivider_count()
    }


def calculate_nostraightthrough(data_processed):
    """Compute the "going straight at a no-straight-through sign" metric."""
    violation = TrafficSignViolation(data_processed)
    return {"NoStraightThrough": violation.calculate_NoStraightThrough_count()}


def calculate_speedlimitviolation(data_processed):
    """Compute the "speed limit violation" metric."""
    violation = TrafficSignViolation(data_processed)
    return {"SpeedLimitViolation": violation.calculate_SpeedLimitViolation_count()}


def calculate_minimumspeedlimitviolation(data_processed):
    """Compute the "minimum speed limit violation" metric."""
    violation = TrafficSignViolation(data_processed)
    return {"MinimumSpeedLimitViolation": violation.calculate_MinimumSpeedLimitViolation_count()}


class TrafficRegistry:
    """Registry that maps configured traffic metrics to their functions."""

    def __init__(self, data_processed):
        self.logger = LogManager().get_logger()
        self.data = data_processed
        self.traffic_config = data_processed.traffic_config["traffic"]
        self.metrics = self._extract_metrics(self.traffic_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Collect the ``name`` of every leaf dict in the configuration tree.

        A dict counts as a leaf when it has a ``name`` key and none of its
        values is itself a dict.
        """
        metrics = []

        def _recurse(node):
            if isinstance(node, dict):
                if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
                    metrics.append(node['name'])
                for v in node.values():
                    _recurse(v)

        _recurse(config_node)
        self.logger.info(f'评比的交通违规指标列表:{metrics}')
        return metrics

    def _build_registry(self) -> dict:
        """Resolve ``calculate_<metric.lower()>`` in module globals per metric.

        Metrics without a matching function are logged and left out.
        """
        registry = {}
        for metric_name in self.metrics:
            func_name = f"calculate_{metric_name.lower()}"
            try:
                registry[metric_name] = globals()[func_name]
            except KeyError:
                self.logger.error(f"未实现交通违规指标函数: {func_name}")
        return registry

    def batch_execute(self) -> dict:
        """Run every registered metric and merge the returned dicts.

        A metric that raises is logged and reported as ``None`` under its
        metric name, so one failure does not abort the batch.
        """
        results = {}
        for name, func in self._registry.items():
            try:
                result = func(self.data)
                results.update(result)
                # Log each metric result individually.
                self.logger.info(f'交通违规指标[{name}]计算结果: {result}')
            except Exception as e:
                self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
                results[name] = None
        self.logger.info(f'交通违规指标计算结果:{results}')
        return results


class TrafficManager:
    """Facade that owns a TrafficRegistry for the processed data."""

    def __init__(self, data_processed):
        self.data = data_processed
        self.logger = LogManager().get_logger()
        self.registry = TrafficRegistry(self.data)

    def report_statistic(self):
        """Compute and return all configured traffic violation metrics."""
        traffic_result = self.registry.batch_execute()
        return traffic_result


class OvertakingViolation(object):
    """Overtaking violation checks."""

    def __init__(self, df_data):
        print("超车违规类初始化中...")
        self.traffic_violations_type = "超车违规类"

        # Keep references to the raw frames; copies are made lazily.
        self._raw_data = df_data.obj_data[1]  # ego vehicle
        self._data_obj = None
        self._other_obj_data1 = None
        # Objects 2 and 3 are optional.
        if 2 in df_data.obj_id_list:
            self._data_obj = df_data.obj_data[2]
        if 3 in df_data.obj_id_list:
            self._other_obj_data1 = df_data.obj_data[3]

        self._ego_data = None
        self._obj_data = None
        self._other_obj_data = None

        # One counter per reported metric.
        self.violation_counts = {
            "overtake_on_right": 0,
            "overtake_when_turn_around": 0,
            "overtake_when_passing_car": 0,
            "overtake_in_forbid_lane": 0,
            "overtake_in_ramp": 0,
            "overtake_in_tunnel": 0,
            "overtake_on_accelerate_lane": 0,
            "overtake_on_decelerate_lane": 0,
            "overtake_in_different_senerios": 0
        }

        # Detectors run at most once per instance.
        self._calculated = {
            "illegal_overtake": False,
            "forbid_lane": False,
            "ramp_area": False,
            "tunnel_area": False,
            "accelerate_lane": False,
            "decelerate_lane": False,
            "different_senerios": False
        }

    @property
    def ego_data(self):
        """Ego frame restricted to OVERTAKE_INFO, copied on first access."""
        if self._ego_data is None:
            self._ego_data = self._raw_data[OVERTAKE_INFO].copy().reset_index(drop=True)
        return self._ego_data

    @property
    def obj_data(self):
        """Object-2 frame restricted to OVERTAKE_INFO, or an empty frame."""
        if self._obj_data is None:
            if self._data_obj is not None:
                self._obj_data = self._data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
            else:
                self._obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
        return self._obj_data

    @property
    def other_obj_data(self):
        """Object-3 frame restricted to OVERTAKE_INFO, or an empty frame."""
        if self._other_obj_data is None:
            if self._other_obj_data1 is not None:
                self._other_obj_data = self._other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
            else:
                self._other_obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
        return self._other_obj_data

    def different_road_area_simtime(self, df, threshold=0.5):
        """Group a list of simTime values into runs separated by > threshold."""
        return _split_by_time_gap(df, threshold)

    def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
        """Return True when the ego passed the other car within the samples.

        The other car must be ahead of (or level with) the ego at the first
        sample and behind it at the last one, judged by the sign of the
        relative-position / ego-velocity dot product, and the ego must be in
        the same lane at both ends. Empty inputs yield False.
        """
        if min(len(lane_id), len(dx), len(dy), len(ego_speedx), len(ego_speedy)) == 0:
            return False
        same_lane = lane_id[0] == lane_id[-1]
        ahead_at_start = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
        behind_at_end = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
        return bool(same_lane and ahead_at_start and behind_at_end)

    def _is_dxy_of_car(self, ego_df, obj_df):
        """Return ``(dx, dy)`` arrays of object position minus ego position.

        Frames of equal length are subtracted row by row. Frames of different
        length are first aligned on their shared ``simFrame`` values so the
        subtraction cannot fail on mismatched shapes.
        """
        if len(ego_df) == len(obj_df):
            car_dx = obj_df["posX"].values - ego_df["posX"].values
            car_dy = obj_df["posY"].values - ego_df["posY"].values
            return car_dx, car_dy
        _, ego_idx, obj_idx = np.intersect1d(
            ego_df["simFrame"].values, obj_df["simFrame"].values, return_indices=True
        )
        car_dx = obj_df["posX"].values[obj_idx] - ego_df["posX"].values[ego_idx]
        car_dy = obj_df["posY"].values[obj_idx] - ego_df["posY"].values[ego_idx]
        return car_dx, car_dy

    def _should_skip(self, flag, label):
        """Return True if detector ``flag`` already ran or has no object data.

        When there is no object data the detector is marked as done and the
        counters keep their default of 0.
        """
        if self._calculated[flag]:
            return True
        if self.obj_data.empty:
            print(f"没有其他车辆数据,无法检测{label}违规,默认为0")
            self._calculated[flag] = True
            return True
        return False

    def _count_segment_overtakes(self, simtime_segments, counter_key):
        """Add one to ``counter_key`` for every segment containing an overtake.

        Lane ids are taken from the segment itself so that "same lane at start
        and end" refers to the segment, not to the whole drive.
        """
        for segment in simtime_segments:
            ego_seg = self.ego_data[self.ego_data["simTime"].isin(segment)]
            obj_seg = self.obj_data[self.obj_data["simTime"].isin(segment)]
            if ego_seg.empty or obj_seg.empty:
                continue
            dx, dy = self._is_dxy_of_car(ego_seg, obj_seg)
            self.violation_counts[counter_key] += int(
                self._is_overtake(
                    ego_seg["lane_id"].tolist(),
                    dx,
                    dy,
                    ego_seg["speedX"].tolist(),
                    ego_seg["speedY"].tolist(),
                )
            )

    def illegal_overtake_with_car_detector(self, window_width=250):
        """Slide a frame window over the drive and classify overtakes.

        Per window, the first matching rule wins and the window then jumps
        ahead by ``window_width``; otherwise it advances by one frame.
        """
        if self._should_skip("illegal_overtake", "超车"):
            return
        ego = self.ego_data
        if ego.empty:
            self._calculated["illegal_overtake"] = True
            return

        start_frame_id = ego["simFrame"].iloc[0]
        # Bound the loop by the last frame id rather than the row count, so
        # recordings whose frame ids do not start at 0 are still scanned.
        last_frame_id = ego["simFrame"].iloc[-1]

        while start_frame_id + window_width <= last_frame_id:
            window = np.arange(start_frame_id, start_frame_id + window_width).astype(int)

            ego_win = ego[ego["simFrame"].isin(window)]
            obj_win = self.obj_data[self.obj_data["simFrame"].isin(window)]
            if ego_win.empty or obj_win.empty:
                start_frame_id += 1
                continue
            other_win = self.other_obj_data[self.other_obj_data["simFrame"].isin(window)]

            lane_id = ego_win["lane_id"].tolist()
            # posH at the window ends is used as the heading indicator.
            heading_start = ego_win["posH"].iloc[0]
            heading_end = ego_win["posH"].iloc[-1]
            dx, dy = self._is_dxy_of_car(ego_win, obj_win)
            ego_speedx = ego_win["speedX"].tolist()
            ego_speedy = ego_win["speedY"].tolist()

            tracked = obj_win[obj_win["playerId"] == 2]
            obj_speedx = tracked["speedX"].tolist()
            obj_speedy = tracked["speedY"].tolist()

            # Overtake while meeting a car travelling in the opposite direction.
            if len(other_win) > 0:
                other_start_speedx = other_win["speedX"].iloc[0]
                other_start_speedy = other_win["speedY"].iloc[0]
                if ego_speedx[0] * other_start_speedx + ego_speedy[0] * other_start_speedy < 0:
                    self.violation_counts["overtake_when_passing_car"] += int(
                        self._is_overtake(lane_id, dx, dy, ego_speedx, ego_speedy)
                    )
                    start_frame_id += window_width
                    continue

            # Overtake on the right.
            if heading_start > 0 and heading_end < 0:
                self.violation_counts["overtake_on_right"] += int(
                    self._is_overtake(lane_id, dx, dy, ego_speedx, ego_speedy)
                )
                start_frame_id += window_width
                continue

            # Overtake while the tracked car moves against the ego direction.
            if obj_speedx and obj_speedy:
                if ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
                    self.violation_counts["overtake_when_turn_around"] += int(
                        self._is_overtake(lane_id, dx, dy, ego_speedx, ego_speedy)
                    )
                    start_frame_id += window_width
                    continue

            # Nothing detected: advance by one frame.
            start_frame_id += 1

        self._calculated["illegal_overtake"] = True

    def overtake_in_forbid_lane_detector(self):
        """Count object-time segments in which the ego used a forbidden lane."""
        if self._should_skip("forbid_lane", "借道超车"):
            return

        obj_times = self.obj_data["simTime"].tolist()
        for segment in self.different_road_area_simtime(obj_times):
            lane_overtake = self.ego_data[self.ego_data["simTime"].isin(segment)]
            try:
                lane_types = set(lane_overtake["lane_type"].tolist())
            except KeyError:
                print("数据缺少lane_type信息")
                continue
            # Lane type 50002 is tolerated as one extra lane type.
            allowed_kinds = 2 if 50002 in lane_types else 1
            if len(lane_types) > allowed_kinds:
                self.violation_counts["overtake_in_forbid_lane"] += 1

        self._calculated["forbid_lane"] = True

    def overtake_in_ramp_area_detector(self):
        """Count overtakes in segments where road_type == 19 (ramp)."""
        if self._should_skip("ramp_area", "匝道超车"):
            return
        ramp_times = self.ego_data[self.ego_data["road_type"] == 19]["simTime"].tolist()
        self._count_segment_overtakes(
            self.different_road_area_simtime(ramp_times), "overtake_in_ramp"
        )
        self._calculated["ramp_area"] = True

    def overtake_in_tunnel_area_detector(self):
        """Count overtakes in segments where road_type == 15 (tunnel)."""
        if self._should_skip("tunnel_area", "隧道超车"):
            return
        tunnel_times = self.ego_data[self.ego_data["road_type"] == 15]["simTime"].tolist()
        self._count_segment_overtakes(
            self.different_road_area_simtime(tunnel_times), "overtake_in_tunnel"
        )
        self._calculated["tunnel_area"] = True

    def overtake_on_accelerate_lane_detector(self):
        """Count overtakes in segments where lane_type == 2 (acceleration lane)."""
        if self._should_skip("accelerate_lane", "加速车道超车"):
            return
        accelerate_times = self.ego_data[self.ego_data["lane_type"] == 2]["simTime"].tolist()
        self._count_segment_overtakes(
            self.different_road_area_simtime(accelerate_times), "overtake_on_accelerate_lane"
        )
        self._calculated["accelerate_lane"] = True

    def overtake_on_decelerate_lane_detector(self):
        """Count overtakes in segments where lane_type == 3 (deceleration lane)."""
        if self._should_skip("decelerate_lane", "减速车道超车"):
            return
        decelerate_times = self.ego_data[self.ego_data["lane_type"] == 3]["simTime"].tolist()
        self._count_segment_overtakes(
            self.different_road_area_simtime(decelerate_times), "overtake_on_decelerate_lane"
        )
        self._calculated["decelerate_lane"] = True

    def overtake_in_different_senerios_detector(self):
        """Count an overtake across all samples where interid != 10000.

        All such samples are treated as one span: same lane at its start and
        end plus a position swap with the other car counts as one overtake.
        """
        if self._should_skip("different_senerios", "不同场景超车"):
            return
        crossroad_times = self.ego_data[self.ego_data["interid"] != 10000]["simTime"].tolist()
        self._count_segment_overtakes([crossroad_times], "overtake_in_different_senerios")
        self._calculated["different_senerios"] = True

    def calculate_overtake_when_passing_car_count(self):
        """Return the number of overtakes while meeting another car."""
        self.illegal_overtake_with_car_detector()
        return self.violation_counts["overtake_when_passing_car"]

    def calculate_overtake_on_right_count(self):
        """Return the number of overtakes on the right."""
        self.illegal_overtake_with_car_detector()
        return self.violation_counts["overtake_on_right"]

    def calculate_overtake_when_turn_around_count(self):
        """Return the number of overtakes while turning around."""
        self.illegal_overtake_with_car_detector()
        return self.violation_counts["overtake_when_turn_around"]

    def calculate_overtake_in_forbid_lane_count(self):
        """Return the number of forbidden-lane overtakes."""
        self.overtake_in_forbid_lane_detector()
        return self.violation_counts["overtake_in_forbid_lane"]

    def calculate_overtake_in_ramp_area_count(self):
        """Return the number of overtakes on ramps."""
        self.overtake_in_ramp_area_detector()
        return self.violation_counts["overtake_in_ramp"]

    def calculate_overtake_in_tunnel_area_count(self):
        """Return the number of overtakes in tunnels."""
        self.overtake_in_tunnel_area_detector()
        return self.violation_counts["overtake_in_tunnel"]

    def calculate_overtake_on_accelerate_lane_count(self):
        """Return the number of overtakes on acceleration lanes."""
        self.overtake_on_accelerate_lane_detector()
        return self.violation_counts["overtake_on_accelerate_lane"]

    def calculate_overtake_on_decelerate_lane_count(self):
        """Return the number of overtakes on deceleration lanes."""
        self.overtake_on_decelerate_lane_detector()
        return self.violation_counts["overtake_on_decelerate_lane"]

    def calculate_overtake_in_different_senerios_count(self):
        """Return the number of overtakes in the "different scenarios" check."""
        self.overtake_in_different_senerios_detector()
        return self.violation_counts["overtake_in_different_senerios"]


class SlowdownViolation(object):
    """Slow-down / yield violation checks."""

    def __init__(self, df_data):
        print("减速让行违规类-------------------------")
        self.traffic_violations_type = "减速让行违规类"

        # Keep references to the raw frames; copies are made lazily.
        self._raw_data = df_data.obj_data[1]
        self.object_items = set(df_data.object_df.type.tolist())

        self._pedestrian_df = None
        if 13 in self.object_items:  # object type 13 is a pedestrian
            self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]

        self._ego_data = None
        self._pedestrian_data = None

        self.slow_down_in_crosswalk_count = 0
        self.avoid_pedestrian_in_crosswalk_count = 0
        self.avoid_pedestrian_in_the_road_count = 0
        self.aviod_pedestrian_when_turning_count = 0

    @property
    def ego_data(self):
        """Ego frame restricted to SLOWDOWN_INFO, copied on first access."""
        if self._ego_data is None:
            self._ego_data = self._raw_data[SLOWDOWN_INFO].copy().reset_index(drop=True)
        return self._ego_data

    @property
    def pedestrian_data(self):
        """Pedestrian frame restricted to SLOWDOWN_INFO, or an empty frame."""
        if self._pedestrian_data is None:
            if self._pedestrian_df is not None:
                self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
            else:
                self._pedestrian_data = pd.DataFrame()
        return self._pedestrian_data

    def pedestrian_in_front_of_car(self):
        """Return ego simTime values with a pedestrian within 50 of the ego.

        Only samples with a positive relative-position / ego-velocity dot
        product are kept. Adds ``dx``, ``dy``, ``dist`` and ``rela_pos``
        columns to the cached ego frame. Ego and pedestrian rows are paired by
        index - presumably both are sampled on the same frames; verify.
        """
        if len(self.pedestrian_data) == 0:
            return []
        ego = self.ego_data
        ego["dx"] = ego["posX"] - self.pedestrian_data["posX"]
        ego["dy"] = ego["posY"] - self.pedestrian_data["posY"]
        ego["dist"] = np.sqrt(ego["dx"] ** 2 + ego["dy"] ** 2)
        ego["rela_pos"] = ego["dx"] * ego["speedX"] + ego["dy"] * ego["speedY"]
        in_front = (ego["rela_pos"] > 0) & (ego["dist"] < 50)
        return ego[in_front]["simTime"].tolist()

    def different_road_area_simtime(self, df, threshold=0.6):
        """Group a list of simTime values into runs separated by > threshold."""
        return _split_by_time_gap(df, threshold)

    def slow_down_in_crosswalk_detector(self):
        """Count crosswalk passages where the ego exceeded 15 km/h.

        A passage is a run of consecutive samples with crossid != 20000.
        """
        crosswalk_times = self.ego_data[self.ego_data["crossid"] != 20000]["simTime"].tolist()
        for segment in self.different_road_area_simtime(crosswalk_times):
            start_time = segment[0]
            end_time = segment[-1]
            print(f"当前时间段:{start_time} - {end_time}")
            in_segment = self.ego_data[
                (self.ego_data["simTime"] >= start_time)
                & (self.ego_data["simTime"] <= end_time)
            ]
            ego_speedx = np.array(in_segment["speedX"].tolist())
            ego_speedy = np.array(in_segment["speedY"].tolist())
            ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
            if max(ego_speed) > 15 / 3.6:  # 15 km/h expressed in m/s
                self.slow_down_in_crosswalk_count += 1

        print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")

    def avoid_pedestrian_in_crosswalk_detector(self):
        """Count crosswalk passages during which a pedestrian sample is moving.

        NOTE(review): the speed tested here is taken from the pedestrian rows,
        not from the ego vehicle - confirm this is intended.
        """
        if self.pedestrian_data.empty:
            return
        crosswalk_times = self.ego_data[self.ego_data["crossid"] != 20000]["simTime"].tolist()
        for segment in self.different_road_area_simtime(crosswalk_times):
            in_segment = self.pedestrian_data[self.pedestrian_data["simTime"].isin(segment)]
            if len(in_segment) == 0:
                continue
            speed = np.sqrt(in_segment["speedX"] ** 2 + in_segment["speedY"] ** 2)
            if speed.any():
                self.avoid_pedestrian_in_crosswalk_count += 1

    def avoid_pedestrian_in_the_road_detector(self):
        """Count pedestrian encounters where the ego stood within 1 of them."""
        front_times = self.pedestrian_in_front_of_car()
        if len(front_times) == 0:
            return
        pedestrian_on_the_road = self.pedestrian_data[
            self.pedestrian_data["simTime"].isin(front_times)
        ]
        encounter_times = pedestrian_on_the_road["simTime"].tolist()
        for segment in self.different_road_area_simtime(encounter_times):
            pedestrian_rows = pedestrian_on_the_road[
                pedestrian_on_the_road["simTime"].isin(segment)
            ]
            ego_rows = self.ego_data.loc[self.ego_data["simTime"].isin(segment)]
            dist = np.sqrt(
                (ego_rows["posX"].values - pedestrian_rows["posX"].values) ** 2
                + (ego_rows["posY"].values - pedestrian_rows["posY"].values) ** 2
            )
            speed = np.sqrt(ego_rows["speedX"].values ** 2 + ego_rows["speedY"].values ** 2)
            if ((dist < 1) & (speed == 0)).any():
                self.avoid_pedestrian_in_the_road_count += 1

    def aviod_pedestrian_when_turning_detector(self):
        """Count turning segments where the ego moved with a pedestrian ahead.

        Turning samples are those with lane_type == 20. The distance to the
        pedestrian is not part of the decision, so it is not computed.
        """
        front_times = self.pedestrian_in_front_of_car()
        if len(front_times) == 0:
            return
        turning_times = self.ego_data[
            (self.ego_data["simTime"].isin(front_times))
            & (self.ego_data["lane_type"] == 20)
        ]["simTime"].tolist()
        for segment in self.different_road_area_simtime(turning_times):
            ego_rows = self.ego_data.loc[self.ego_data["simTime"].isin(segment)]
            speed = np.sqrt(ego_rows["speedX"].values ** 2 + ego_rows["speedY"].values ** 2)
            if any(speed.tolist()):
                self.aviod_pedestrian_when_turning_count += 1

    def calculate_slow_down_in_crosswalk_count(self):
        """Run the crosswalk speed detector and return its counter."""
        self.slow_down_in_crosswalk_detector()
        return self.slow_down_in_crosswalk_count

    def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
        """Run the crosswalk pedestrian detector and return its counter."""
        self.avoid_pedestrian_in_crosswalk_detector()
        return self.avoid_pedestrian_in_crosswalk_count

    def calculate_avoid_pedestrian_in_the_road_count(self):
        """Run the on-road pedestrian detector and return its counter."""
        self.avoid_pedestrian_in_the_road_detector()
        return self.avoid_pedestrian_in_the_road_count

    def calculate_avoid_pedestrian_when_turning_count(self):
        """Run the turning pedestrian detector and return its counter."""
        self.aviod_pedestrian_when_turning_detector()
        return self.aviod_pedestrian_when_turning_count


class TurnaroundViolation(object):
    """Turn / U-turn violation checks."""

    def __init__(self, df_data):
        print("掉头违规类初始化中...")
        self.traffic_violations_type = "掉头违规类"

        # Keep references to the raw frames; copies are made lazily.
        self._raw_data = df_data.obj_data[1]
        self.object_items = set(df_data.object_df.type.tolist())

        self._pedestrian_df = None
        if 13 in self.object_items:  # object type 13 is a pedestrian
            self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]

        self._ego_data = None
        self._pedestrian_data = None

        self.turning_in_forbiden_turn_back_sign_count = 0
        self.turning_in_forbiden_turn_left_sign_count = 0
        self.avoid_pedestrian_when_turn_back_count = 0
        # Both sign counters come from one detector; run it only once.
        self._sign_detector_done = False

    @property
    def ego_data(self):
        """Ego frame restricted to TURNAROUND_INFO, copied on first access."""
        if self._ego_data is None:
            self._ego_data = self._raw_data[TURNAROUND_INFO].copy().reset_index(drop=True)
        return self._ego_data

    @property
    def pedestrian_data(self):
        """Pedestrian frame restricted to SLOWDOWN_INFO, or an empty frame."""
        if self._pedestrian_data is None:
            if self._pedestrian_df is not None:
                self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
            else:
                self._pedestrian_data = pd.DataFrame()
        return self._pedestrian_data

    def pedestrian_in_front_of_car(self):
        """Return ego simTime values with a pedestrian within 50 of the ego.

        Only samples with a positive relative-position / ego-velocity dot
        product are kept. Adds ``dx``, ``dy``, ``dist`` and ``rela_pos``
        columns to the cached ego frame; rows are paired by index.
        """
        if len(self.pedestrian_data) == 0:
            return []
        ego = self.ego_data
        ego["dx"] = ego["posX"] - self.pedestrian_data["posX"]
        ego["dy"] = ego["posY"] - self.pedestrian_data["posY"]
        ego["dist"] = np.sqrt(ego["dx"] ** 2 + ego["dy"] ** 2)
        ego["rela_pos"] = ego["dx"] * ego["speedX"] + ego["dy"] * ego["speedY"]
        in_front = (ego["rela_pos"] > 0) & (ego["dist"] < 50)
        return ego[in_front]["simTime"].tolist()

    def different_road_area_simtime(self, df, threshold=0.5):
        """Group a list of simTime values into runs separated by > threshold."""
        return _split_by_time_gap(df, threshold)

    def _count_direction_reversals(self, sign_type):
        """Count segments under ``sign_type`` where the ego reversed direction.

        A reversal is a negative dot product between the ego velocity at the
        first and at the last sample of the segment.
        """
        sign_times = self.ego_data[self.ego_data["sign_type1"] == sign_type]["simTime"].tolist()
        reversals = 0
        for segment in self.different_road_area_simtime(sign_times):
            # Segments hold simTime values, so select rows on simTime.
            rows = self.ego_data.loc[self.ego_data["simTime"].isin(segment)]
            if rows.empty:
                continue
            first, last = rows.iloc[0], rows.iloc[-1]
            if last["speedX"] * first["speedX"] + last["speedY"] * first["speedY"] < 0:
                reversals += 1
        return reversals

    def turn_back_in_forbiden_sign_detector(self):
        """Update the no-U-turn (sign type 8) and no-left-turn (9) counters.

        Runs once per instance so that reading both counters does not count
        the same segments twice.
        """
        if getattr(self, "_sign_detector_done", False):
            return
        self.turning_in_forbiden_turn_back_sign_count += self._count_direction_reversals(8)
        self.turning_in_forbiden_turn_left_sign_count += self._count_direction_reversals(9)
        self._sign_detector_done = True

    def avoid_pedestrian_when_turn_back_detector(self):
        """Count turning segments where the ego moved while a pedestrian is present.

        Turning samples are those with lane_type == 20. Nothing is counted
        unless a pedestrian was in front of the ego at some point.
        """
        front_times = self.pedestrian_in_front_of_car()
        turning_times = self.ego_data[self.ego_data["lane_type"] == 20]["simTime"].tolist()
        segments = self.different_road_area_simtime(turning_times)
        if len(front_times) == 0:
            return
        for segment in segments:
            pedestrian_times = self.pedestrian_data[
                self.pedestrian_data["simTime"].isin(segment)
            ]["simTime"].tolist()
            ego_df = self.ego_data[
                self.ego_data["simTime"].isin(pedestrian_times)
            ].reset_index(drop=True)
            speed = np.sqrt(ego_df["speedX"] ** 2 + ego_df["speedY"] ** 2)
            if any(speed.tolist()):
                self.avoid_pedestrian_when_turn_back_count += 1

    def calculate_turn_in_forbiden_turn_left_sign_count(self):
        """Return the number of turns made at a no-left-turn sign."""
        self.turn_back_in_forbiden_sign_detector()
        return self.turning_in_forbiden_turn_left_sign_count

    def calculate_turn_in_forbiden_turn_back_sign_count(self):
        """Return the number of U-turns made at a no-U-turn sign."""
        self.turn_back_in_forbiden_sign_detector()
        return self.turning_in_forbiden_turn_back_sign_count

    def calaulate_avoid_pedestrian_when_turn_back_count(self):
        """Run the U-turn pedestrian detector and return its counter."""
        self.avoid_pedestrian_when_turn_back_detector()
        return self.avoid_pedestrian_when_turn_back_count

    # Correctly spelled alias; the misspelled name stays for existing callers.
    calculate_avoid_pedestrian_when_turn_back_count = calaulate_avoid_pedestrian_when_turn_back_count


class WrongWayViolation(object):
    """Parking violation checks."""

    def __init__(self, df_data):
        print("停车违规类初始化中...")
        self.traffic_violations_type = "停车违规类"
        # Keep a reference to the raw frame.
        self._raw_data = df_data.obj_data[1]
        # Attributes are initialised here; copies are made lazily.
        self._data = None
        #
初始化违规统计 self.violation_count = { "urbanExpresswayOrHighwayDrivingLaneStopped": 0, "urbanExpresswayOrHighwayEmergencyLaneStopped": 0, "urbanExpresswayEmergencyLaneDriving": 0, } @property def data(self): """懒加载方式获取数据""" if self._data is None: # 使用浅拷贝代替深拷贝 self._data = self._raw_data.copy() return self._data def process_violations(self): """处理停车或者紧急车道行驶违规数据""" # 提取有效道路类型 urban_expressway_or_highway = {1, 2} driving_lane = {1, 4, 5, 6} emergency_lane = {12} self.data["v"] *= 3.6 # 转换速度 # 使用向量化和条件判断进行违规判定 conditions = [ ( self.data["road_fc"].isin(urban_expressway_or_highway) & self.data["lane_type"].isin(driving_lane) & (self.data["v"] == 0) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & self.data["lane_type"].isin(emergency_lane) & (self.data["v"] == 0) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & self.data["lane_type"].isin(emergency_lane) & (self.data["v"] != 0) ), ] violation_types = [ "urbanExpresswayOrHighwayDrivingLaneStopped", "urbanExpresswayOrHighwayEmergencyLaneStopped", "urbanExpresswayEmergencyLaneDriving", ] # 设置违规类型 self.data["violation_type"] = None for condition, violation_type in zip(conditions, violation_types): self.data.loc[condition, "violation_type"] = violation_type # 统计违规情况 self.violation_count = ( self.data["violation_type"] .value_counts() .reindex(violation_types, fill_value=0) .to_dict() ) def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self): self.process_violations() return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"] def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self): self.process_violations() return self.violation_count["urbanExpresswayEmergencyLaneDriving"] def calculate_urbanExpresswayEmergencyLaneDriving(self): self.process_violations() return self.violation_count["urbanExpresswayEmergencyLaneDriving"] class SpeedingViolation(object): """超速违规类""" """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息""" def __init__(self, df_data): print("超速违规类初始化中...") 
self.traffic_violations_type = "超速违规类" # 存储原始数据引用 self._raw_data = df_data.obj_data[1] # 初始化属性,但不立即创建数据副本 self._data = None # 初始化违规统计 self.violation_counts = { "urbanExpresswayOrHighwaySpeedOverLimit50": 0, "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0, "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0, "urbanExpresswayOrHighwaySpeedUnderLimit": 0, "generalRoadSpeedOverLimit50": 0, "generalRoadSpeedOverLimit20to50": 0, } @property def data(self): """懒加载方式获取数据""" if self._data is None: # 使用浅拷贝代替深拷贝 self._data = self._raw_data.copy() # 预处理数据 - 转换速度单位 self._data["v"] *= 3.6 # 转换为 km/h return self._data def process_violations(self): """处理数据帧,检查超速和其他违规行为""" # 提取有效道路类型 urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合 general_road = {3} # 直接创建包含一个元素的集合 self.data["v"] *= 3.6 # 转换速度 # 违规判定 conditions = [ ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] > self.data["road_speed_max"] * 1.5) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] > self.data["road_speed_max"] * 1.2) & (self.data["v"] <= self.data["road_speed_max"] * 1.5) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] > self.data["road_speed_max"]) & (self.data["v"] <= self.data["road_speed_max"] * 1.2) ), ( self.data["road_fc"].isin(urban_expressway_or_highway) & (self.data["v"] < self.data["road_speed_min"]) ), ( self.data["road_fc"].isin(general_road) & (self.data["v"] > self.data["road_speed_max"] * 1.5) ), ( self.data["road_fc"].isin(general_road) & (self.data["v"] > self.data["road_speed_max"] * 1.2) & (self.data["v"] <= self.data["road_speed_max"] * 1.5) ), ] violation_types = [ "urbanExpresswayOrHighwaySpeedOverLimit50", "urbanExpresswayOrHighwaySpeedOverLimit20to50", "urbanExpresswayOrHighwaySpeedOverLimit0to20", "urbanExpresswayOrHighwaySpeedUnderLimit", "generalRoadSpeedOverLimit50", "generalRoadSpeedOverLimit20to50", ] # 设置违规类型 self.data["violation_type"] = None for condition, violation_type in zip(conditions, 
violation_types): self.data.loc[condition, "violation_type"] = violation_type # 统计各类违规情况 self.violation_counts = self.data["violation_type"].value_counts().to_dict() def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self): self.process_violations() return self.violation_counts.get("urbanExpresswayOrHighwaySpeedOverLimit50") if self.violation_counts.get( "urbanExpresswayOrHighwaySpeedOverLimit50") else 0 def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self): self.process_violations() return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"] if self.violation_counts.get( "urbanExpresswayOrHighwaySpeedOverLimit20to50") else 0 def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self): self.process_violations() return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"] if self.violation_counts.get( "urbanExpresswayOrHighwaySpeedOverLimit0to20") else 0 def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self): self.process_violations() return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"] if self.violation_counts.get( "urbanExpresswayOrHighwaySpeedUnderLimit") else 0 def calculate_generalRoadSpeedOverLimit50(self): self.process_violations() return self.violation_counts["generalRoadSpeedOverLimit50"] if self.violation_counts.get( "generalRoadSpeedOverLimit50") else 0 def calculate_generalRoadSpeedOverLimit20to50_count(self): self.process_violations() return self.violation_counts["generalRoadSpeedOverLimit20to50"] if self.violation_counts.get( "generalRoadSpeedOverLimit20to50") else 0 class TrafficLightViolation(object): """违反交通灯类""" """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯""" def __init__(self, df_data): """初始化方法""" self.traffic_violations_type = "违反交通灯类" print("违反交通灯类 类初始化中...") self.config = df_data.vehicle_config self.data_ego = df_data.ego_data # 获取数据 self.violation_counts = { "trafficSignalViolation": 0, "illegalDrivingOrParkingAtCrossroads": 0, } # 处理数据并判定违规 
self.process_violations() def is_point_cross_line(self, point, stop_line_points): """ 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。 :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制) :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]] :return: True 如果车辆跨越了停止线,否则 False """ line_vector = np.array( [ stop_line_points[1][0] - stop_line_points[0][0], stop_line_points[1][1] - stop_line_points[0][1], ] ) point_vector = np.array( [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]] ) cross_product = np.cross(line_vector, point_vector) if cross_product != 0: return False mid_point = ( np.array([stop_line_points[0][0], stop_line_points[0][1]]) + 0.5 * line_vector ) axletree_to_mid_vector = np.array( [point[0] - mid_point[0], point[1] - mid_point[1]] ) direction_vector = np.array([math.cos(point[2]), math.sin(point[2])]) norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector) norm_direction = np.linalg.norm(direction_vector) if norm_axletree_to_mid == 0 or norm_direction == 0: return False cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / ( norm_axletree_to_mid * norm_direction ) angle_theta = math.degrees(math.acos(cos_theta)) return angle_theta <= 90 def _filter_data(self): """过滤数据,筛选出需要分析的记录""" return self.data_ego[ (self.data_ego["stopline_id"] != -1) & (self.data_ego["stopline_type"] == 1) & (self.data_ego["trafficlight_id"] != -1) ] def _group_data(self, filtered_data): """按时间差对数据进行分组""" filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0) threshold = 0.5 filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum() return filtered_data.groupby("group") def _analyze_group(self, group_data): """分析单个分组的数据,判断是否闯红灯""" photos = [] stop_in_intersection = False for _, row in group_data.iterrows(): vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]]) stop_line_points = [ [row["stopline_x1"], row["stopline_y1"]], [row["stopline_x2"], row["stopline_y2"]], ] 
traffic_light_status = row["traffic_light_status"] heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])]) heading_vector = heading_vector / np.linalg.norm(heading_vector) # with open(self.config_path / "vehicle_config.yaml", 'r') as f: # config = yaml.load(f, Loader=yaml.FullLoader) front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector dist = math.sqrt( (row["posX"] - row["traffic_light_x"]) ** 2 + (row["posY"] - row["traffic_light_y"]) ** 2 ) if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01: has_crossed_line_front = ( self.is_point_cross_line(front_wheel_pos, stop_line_points) and traffic_light_status == 1 ) has_crossed_line_rear = ( self.is_point_cross_line(rear_wheel_pos, stop_line_points) and row["v"] > 0 and traffic_light_status == 1 ) has_stop_in_intersection = has_crossed_line_front and row["v"] == 0 has_passed_intersection = has_crossed_line_front and dist < 1.0 # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}') photos.extend( [ has_crossed_line_front, has_crossed_line_rear, has_passed_intersection, has_stop_in_intersection, ] ) stop_in_intersection = has_passed_intersection return photos, stop_in_intersection def is_vehicle_run_a_red_light(self): """判断车辆是否闯红灯""" filtered_data = self._filter_data() grouped_data = self._group_data(filtered_data) self.photos_group = [] self.stop_in_intersections = [] for _, group_data in grouped_data: photos, stop_in_intersection = self._analyze_group(group_data) self.photos_group.append(photos) self.stop_in_intersections.append(stop_in_intersection) def process_violations(self): """处理数据并判定违规""" self.is_vehicle_run_a_red_light() count_1 = sum(all(photos) for photos in self.photos_group) count_2 = sum( stop_in_intersection for 
stop_in_intersection in self.stop_in_intersections ) self.violation_counts["trafficSignalViolation"] = count_1 self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2 def calculate_trafficSignalViolation_count(self): self.process_violations() return self.violation_counts["trafficSignalViolation"] def calculate_illegalDrivingOrParkingAtCrossroads(self): self.process_violations() return self.violation_counts["illegalDrivingOrParkingAtCrossroads"] class WarningViolation(object): """警告性违规类""" def __init__(self, df_data): print("警告性违规类初始化中...") self.traffic_violations_type = "警告性违规类" # 存储原始数据引用 self.config = df_data.vehicle_config self._raw_data = df_data.obj_data[1] # 初始化属性,但不立即创建数据副本 self._data = None # 初始化违规计数器 self.violation_counts = { "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶 "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线 } @property def data(self): """懒加载方式获取数据""" if self._data is None: # 使用浅拷贝代替深拷贝 self._data = self._raw_data.copy() return self._data def process_violations(self): """处理所有违规类型""" # 处理普通道路不按规定车道行驶违规 self._process_irregular_lane_use() # 处理骑、轧车行道分界线违规 self._process_lane_divider_violation() def _process_irregular_lane_use(self): """处理普通道路不按规定车道行驶违规""" # 定义道路和车道类型 general_road = {3} # 普通道路 lane_type = {11} # 非机动车道 # 使用布尔索引来筛选满足条件的行 condition = (self.data["road_fc"].isin(general_road)) & ( self.data["lane_type"].isin(lane_type) ) # 创建一个新的列,并根据条件设置值 self.data["is_violation"] = condition # 统计满足条件的连续时间段 violation_segments = self.count_continuous_violations( self.data["is_violation"], self.data["simTime"] ) # 更新违规计数 self.violation_counts["generalRoadIrregularLaneUse"] = len(violation_segments) def _process_lane_divider_violation(self): """处理骑、轧车行道分界线违规""" # 获取车辆和车道宽度 car_width = self.config["CAR_WIDTH"] lane_width = self.data["lane_width"] # 计算阈值 threshold = (lane_width - car_width) / 2 # 找到满足条件的行 self.data["is_violation"] = self.data["laneOffset"] > threshold # 统计满足条件的连续时间段 
violation_segments = self.count_continuous_violations( self.data["is_violation"], self.data["simTime"] ) # 更新违规计数 self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len( violation_segments ) def count_continuous_violations(self, violation_series, time_series): """统计连续违规的时间段数量 Args: violation_series: 表示是否违规的布尔序列 time_series: 对应的时间序列 Returns: list: 连续违规时间段列表 """ continuous_segments = [] current_segment = [] for is_violation, time in zip(violation_series, time_series): if is_violation: if not current_segment: # 新的连续段开始 current_segment.append(time) else: if current_segment: # 连续段结束 current_segment.append(time) # 添加结束时间 continuous_segments.append(current_segment) current_segment = [] # 检查是否有一个未结束的连续段在最后 if current_segment: current_segment.append(time_series.iloc[-1]) # 使用最后的时间作为结束时间 continuous_segments.append(current_segment) return continuous_segments def calculate_generalRoadIrregularLaneUse_count(self): """计算普通道路不按规定车道行驶违规次数""" # 只处理普通道路不按规定车道行驶违规 self._process_irregular_lane_use() return self.violation_counts["generalRoadIrregularLaneUse"] def calculate_urbanExpresswayOrHighwayRideLaneDivider_count(self): """计算骑、轧车行道分界线违规次数""" # 只处理骑、轧车行道分界线违规 self._process_lane_divider_violation() return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] class TrafficSignViolation: """交通标志违规类""" PROHIBITED_STRAIGHT_THRESHOLD = 5 SIGN_TYPE_STRAIGHT_PROHIBITED = 7 SIGN_TYPE_SPEED_LIMIT = 12 SIGN_TYPE_MIN_SPEED_LIMIT = 13 def __init__(self, df_data): print("交通标志违规类初始化中...") self.traffic_violations_type = "交通标志违规类" # 存储原始数据引用 self._raw_data = df_data.obj_data[1] # 初始化属性,但不立即创建数据副本 self._data = None # 延迟计算标志 self._calculated = False self._violation_counts = { "NoStraightThrough": 0, "SpeedLimitViolation": 0, "MinimumSpeedLimitViolation": 0 } @property def data(self): """懒加载方式获取数据""" if self._data is None: # 使用浅拷贝代替深拷贝 self._data = self._raw_data.copy() # 预处理数据 - 按时间排序 self._data = self._data.sort_values('simTime').reset_index(drop=True) return self._data 
def _ensure_calculated(self): """保证计算只执行一次""" if not self._calculated: self._check_prohibition_violations() self._check_instruction_violations() self._calculated = True def calculate_NoStraightThrough_count(self): """计算禁止直行违规次数""" self._ensure_calculated() return self._violation_counts["NoStraightThrough"] def calculate_SpeedLimitViolation_count(self): """计算超速违规次数""" self._ensure_calculated() return self._violation_counts["SpeedLimitViolation"] def calculate_MinimumSpeedLimitViolation_count(self): """计算最低限速违规次数""" self._ensure_calculated() return self._violation_counts["MinimumSpeedLimitViolation"] def _check_prohibition_violations(self): """处理禁令标志违规(禁止直行和限速)""" self._check_straight_violation() self._check_speed_violation( self.SIGN_TYPE_SPEED_LIMIT, operator.gt, "SpeedLimitViolation" ) def _check_instruction_violations(self): """处理指示标志违规(最低限速)""" self._check_speed_violation( self.SIGN_TYPE_MIN_SPEED_LIMIT, operator.lt, "MinimumSpeedLimitViolation" ) def _check_straight_violation(self): """检查禁止直行违规""" straight_df = self.data[self.data["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED] if not straight_df.empty: # 计算航向角变化并填充缺失值 straight_df = straight_df.copy() straight_df['posH_diff'] = straight_df['posH'].diff().abs().fillna(0) # 创建筛选条件 mask = ( (straight_df['posH_diff'] <= self.PROHIBITED_STRAIGHT_THRESHOLD) & (straight_df['v'] > 0) ) self._violation_counts["NoStraightThrough"] = mask.sum() def _check_speed_violation(self, sign_type, compare_op, count_key): """通用速度违规检查方法 Args: sign_type: 标志类型 compare_op: 比较操作符 count_key: 违规计数键名 """ violation_df = self.data[self.data["sign_type1"] == sign_type] if not violation_df.empty: mask = compare_op(violation_df['v'], violation_df['sign_speed']) self._violation_counts[count_key] = mask.sum()