|
@@ -1,6 +1,7 @@
|
|
# ... 保留原有导入和常量定义 ...
|
|
# ... 保留原有导入和常量定义 ...
|
|
import math
|
|
import math
|
|
-import operator
|
|
|
|
|
|
+import operator
|
|
|
|
+import copy
|
|
import numpy as np
|
|
import numpy as np
|
|
import pandas as pd
|
|
import pandas as pd
|
|
from typing import Dict, Any, List, Optional
|
|
from typing import Dict, Any, List, Optional
|
|
@@ -9,6 +10,7 @@ from modules.lib.score import Score
|
|
from modules.lib.log_manager import LogManager
|
|
from modules.lib.log_manager import LogManager
|
|
from modules.lib import data_process
|
|
from modules.lib import data_process
|
|
|
|
|
|
|
|
+
|
|
OVERTAKE_INFO = [
|
|
OVERTAKE_INFO = [
|
|
"simTime",
|
|
"simTime",
|
|
"simFrame",
|
|
"simFrame",
|
|
@@ -49,7 +51,7 @@ TURNAROUND_INFO = [
|
|
|
|
|
|
TRFFICSIGN_INFO = [
|
|
TRFFICSIGN_INFO = [
|
|
"simTime",
|
|
"simTime",
|
|
- "simFrame",
|
|
|
|
|
|
+ "simFrame",
|
|
"playerId",
|
|
"playerId",
|
|
"speedX",
|
|
"speedX",
|
|
"speedY",
|
|
"speedY",
|
|
@@ -261,7 +263,7 @@ def calculate_generalroadirregularlaneuse(data_processed):
|
|
def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
|
|
def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
|
|
"""计算城市快速路或高速公路骑车道线行驶指标"""
|
|
"""计算城市快速路或高速公路骑车道线行驶指标"""
|
|
warningviolation = WarningViolation(data_processed)
|
|
warningviolation = WarningViolation(data_processed)
|
|
- urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider()
|
|
|
|
|
|
+ urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider_count()
|
|
return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
|
|
return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
|
|
|
|
|
|
def calculate_nostraightthrough(data_processed):
|
|
def calculate_nostraightthrough(data_processed):
|
|
@@ -283,7 +285,7 @@ def calculate_minimumspeedlimitviolation(data_processed):
|
|
trafficsignviolation = TrafficSignViolation(data_processed)
|
|
trafficsignviolation = TrafficSignViolation(data_processed)
|
|
calculate_MinimumSpeedLimitViolation_count = trafficsignviolation.calculate_MinimumSpeedLimitViolation_count()
|
|
calculate_MinimumSpeedLimitViolation_count = trafficsignviolation.calculate_MinimumSpeedLimitViolation_count()
|
|
return {"MinimumSpeedLimitViolation": calculate_MinimumSpeedLimitViolation_count}
|
|
return {"MinimumSpeedLimitViolation": calculate_MinimumSpeedLimitViolation_count}
|
|
-# ... 保留原有类定义 ...
|
|
|
|
|
|
+
|
|
|
|
|
|
# 修改 TrafficRegistry 类的 _build_registry 方法
|
|
# 修改 TrafficRegistry 类的 _build_registry 方法
|
|
class TrafficRegistry:
|
|
class TrafficRegistry:
|
|
@@ -349,7 +351,6 @@ class TrafficManager:
|
|
traffic_result = self.registry.batch_execute()
|
|
traffic_result = self.registry.batch_execute()
|
|
return traffic_result
|
|
return traffic_result
|
|
|
|
|
|
-# ... 保留原有类定义和实现 ...
|
|
|
|
|
|
|
|
class OvertakingViolation(object):
|
|
class OvertakingViolation(object):
|
|
"""超车违规类"""
|
|
"""超车违规类"""
|
|
@@ -358,36 +359,78 @@ class OvertakingViolation(object):
|
|
print("超车违规类初始化中...")
|
|
print("超车违规类初始化中...")
|
|
self.traffic_violations_type = "超车违规类"
|
|
self.traffic_violations_type = "超车违规类"
|
|
|
|
|
|
- # self.logger = log.get_logger() # 使用时再初始化
|
|
|
|
-
|
|
|
|
- self.data = df_data.obj_data[1]
|
|
|
|
- self.ego_data = (
|
|
|
|
- self.data[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
- ) # Copy to avoid modifying the original DataFrame
|
|
|
|
- header = self.ego_data.columns
|
|
|
|
|
|
+ # 存储原始数据引用,不进行拷贝
|
|
|
|
+ self._raw_data = df_data.obj_data[1] # 自车数据
|
|
|
|
+
|
|
|
|
+ # 安全获取其他车辆数据
|
|
|
|
+ self._data_obj = None
|
|
|
|
+ self._other_obj_data1 = None
|
|
|
|
+
|
|
|
|
+ # 检查是否存在ID为2的对象数据
|
|
if 2 in df_data.obj_id_list:
|
|
if 2 in df_data.obj_id_list:
|
|
- self.data_obj = df_data.obj_data[2]
|
|
|
|
- self.obj_data = (
|
|
|
|
- self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
- ) # Copy to avoid modifying the original DataFrame
|
|
|
|
- else:
|
|
|
|
- self.obj_data = pd.DataFrame(columns=header)
|
|
|
|
|
|
+ self._data_obj = df_data.obj_data[2]
|
|
|
|
+
|
|
|
|
+ # 检查是否存在ID为3的对象数据
|
|
if 3 in df_data.obj_id_list:
|
|
if 3 in df_data.obj_id_list:
|
|
- self.other_obj_data1 = df_data.obj_data[3]
|
|
|
|
- self.other_obj_data = (
|
|
|
|
- self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
- )
|
|
|
|
- else:
|
|
|
|
- self.other_obj_data = pd.DataFrame(columns=header)
|
|
|
|
- self.overtake_on_right_count = 0
|
|
|
|
- self.overtake_when_turn_around_count = 0
|
|
|
|
- self.overtake_when_passing_car_count = 0
|
|
|
|
- self.overtake_in_forbid_lane_count = 0
|
|
|
|
- self.overtake_in_ramp_count = 0
|
|
|
|
- self.overtake_in_tunnel_count = 0
|
|
|
|
- self.overtake_on_accelerate_lane_count = 0
|
|
|
|
- self.overtake_on_decelerate_lane_count = 0
|
|
|
|
- self.overtake_in_different_senerios_count = 0
|
|
|
|
|
|
+ self._other_obj_data1 = df_data.obj_data[3]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._ego_data = None
|
|
|
|
+ self._obj_data = None
|
|
|
|
+ self._other_obj_data = None
|
|
|
|
+
|
|
|
|
+ # 使用字典统一管理违规计数器
|
|
|
|
+ self.violation_counts = {
|
|
|
|
+ "overtake_on_right": 0,
|
|
|
|
+ "overtake_when_turn_around": 0,
|
|
|
|
+ "overtake_when_passing_car": 0,
|
|
|
|
+ "overtake_in_forbid_lane": 0,
|
|
|
|
+ "overtake_in_ramp": 0,
|
|
|
|
+ "overtake_in_tunnel": 0,
|
|
|
|
+ "overtake_on_accelerate_lane": 0,
|
|
|
|
+ "overtake_on_decelerate_lane": 0,
|
|
|
|
+ "overtake_in_different_senerios": 0
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ # 标记计算状态
|
|
|
|
+ self._calculated = {
|
|
|
|
+ "illegal_overtake": False,
|
|
|
|
+ "forbid_lane": False,
|
|
|
|
+ "ramp_area": False,
|
|
|
|
+ "tunnel_area": False,
|
|
|
|
+ "accelerate_lane": False,
|
|
|
|
+ "decelerate_lane": False,
|
|
|
|
+ "different_senerios": False
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def ego_data(self):
|
|
|
|
+ """懒加载方式获取ego数据,只在首次访问时创建副本"""
|
|
|
|
+ if self._ego_data is None:
|
|
|
|
+ self._ego_data = self._raw_data[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
+ return self._ego_data
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def obj_data(self):
|
|
|
|
+ """懒加载方式获取obj数据"""
|
|
|
|
+ if self._obj_data is None:
|
|
|
|
+ if self._data_obj is not None:
|
|
|
|
+ self._obj_data = self._data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
+ else:
|
|
|
|
+ # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
|
|
|
|
+ self._obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
|
|
|
|
+ return self._obj_data
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def other_obj_data(self):
|
|
|
|
+ """懒加载方式获取other_obj数据"""
|
|
|
|
+ if self._other_obj_data is None:
|
|
|
|
+ if self._other_obj_data1 is not None:
|
|
|
|
+ self._other_obj_data = self._other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
+ else:
|
|
|
|
+ # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
|
|
|
|
+ self._other_obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
|
|
|
|
+ return self._other_obj_data
|
|
|
|
|
|
def different_road_area_simtime(self, df, threshold=0.5):
|
|
def different_road_area_simtime(self, df, threshold=0.5):
|
|
if not df:
|
|
if not df:
|
|
@@ -425,9 +468,17 @@ class OvertakingViolation(object):
|
|
|
|
|
|
return car_dx, car_dy
|
|
return car_dx, car_dy
|
|
|
|
|
|
- # 在前车右侧超车、会车时超车、前车掉头时超车
|
|
|
|
-
|
|
|
|
def illegal_overtake_with_car_detector(self, window_width=250):
|
|
def illegal_overtake_with_car_detector(self, window_width=250):
|
|
|
|
+ """检测超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["illegal_overtake"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测超车违规,默认为0")
|
|
|
|
+ self._calculated["illegal_overtake"] = True
|
|
|
|
+ return
|
|
|
|
|
|
# 获取csv文件中最短的帧数
|
|
# 获取csv文件中最短的帧数
|
|
frame_id_length = len(self.ego_data["simFrame"])
|
|
frame_id_length = len(self.ego_data["simFrame"])
|
|
@@ -442,28 +493,47 @@ class OvertakingViolation(object):
|
|
ego_data_frames = self.ego_data[
|
|
ego_data_frames = self.ego_data[
|
|
self.ego_data["simFrame"].isin(simframe_window)
|
|
self.ego_data["simFrame"].isin(simframe_window)
|
|
]
|
|
]
|
|
|
|
+
|
|
|
|
+ # 确保有足够的数据进行处理
|
|
|
|
+ if len(ego_data_frames) == 0:
|
|
|
|
+ start_frame_id += 1
|
|
|
|
+ continue
|
|
|
|
+
|
|
obj_data_frames = self.obj_data[
|
|
obj_data_frames = self.obj_data[
|
|
self.obj_data["simFrame"].isin(simframe_window)
|
|
self.obj_data["simFrame"].isin(simframe_window)
|
|
]
|
|
]
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,跳过当前窗口
|
|
|
|
+ if len(obj_data_frames) == 0:
|
|
|
|
+ start_frame_id += 1
|
|
|
|
+ continue
|
|
|
|
+
|
|
other_data_frames = self.other_obj_data[
|
|
other_data_frames = self.other_obj_data[
|
|
self.other_obj_data["simFrame"].isin(simframe_window)
|
|
self.other_obj_data["simFrame"].isin(simframe_window)
|
|
]
|
|
]
|
|
|
|
+
|
|
# 读取前后的laneId
|
|
# 读取前后的laneId
|
|
lane_id = ego_data_frames["lane_id"].tolist()
|
|
lane_id = ego_data_frames["lane_id"].tolist()
|
|
- # 读取前后方向盘转角steeringWheel,
|
|
|
|
|
|
+
|
|
|
|
+ # 读取前后方向盘转角steeringWheel
|
|
driverctrl_start_state = ego_data_frames["posH"].iloc[0]
|
|
driverctrl_start_state = ego_data_frames["posH"].iloc[0]
|
|
driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
|
|
driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
|
|
|
|
+
|
|
# 读取车辆前后的位置信息
|
|
# 读取车辆前后的位置信息
|
|
dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
|
|
dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
|
|
ego_speedx = ego_data_frames["speedX"].tolist()
|
|
ego_speedx = ego_data_frames["speedX"].tolist()
|
|
ego_speedy = ego_data_frames["speedY"].tolist()
|
|
ego_speedy = ego_data_frames["speedY"].tolist()
|
|
|
|
|
|
- obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][
|
|
|
|
- "speedX"
|
|
|
|
- ].tolist()
|
|
|
|
- obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][
|
|
|
|
- "speedY"
|
|
|
|
- ].tolist()
|
|
|
|
|
|
+ # 安全获取obj_speedx和obj_speedy
|
|
|
|
+ obj_with_id_2 = obj_data_frames[obj_data_frames["playerId"] == 2]
|
|
|
|
+ if not obj_with_id_2.empty:
|
|
|
|
+ obj_speedx = obj_with_id_2["speedX"].tolist()
|
|
|
|
+ obj_speedy = obj_with_id_2["speedY"].tolist()
|
|
|
|
+ else:
|
|
|
|
+ obj_speedx = []
|
|
|
|
+ obj_speedy = []
|
|
|
|
+
|
|
|
|
+ # 检查会车时超车
|
|
if len(other_data_frames) > 0:
|
|
if len(other_data_frames) > 0:
|
|
other_start_speedx = other_data_frames["speedX"].iloc[0]
|
|
other_start_speedx = other_data_frames["speedX"].iloc[0]
|
|
other_start_speedy = other_data_frames["speedY"].iloc[0]
|
|
other_start_speedy = other_data_frames["speedY"].iloc[0]
|
|
@@ -472,31 +542,47 @@ class OvertakingViolation(object):
|
|
+ ego_speedy[0] * other_start_speedy
|
|
+ ego_speedy[0] * other_start_speedy
|
|
< 0
|
|
< 0
|
|
):
|
|
):
|
|
- self.overtake_when_passing_car_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_when_passing_car"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
start_frame_id += window_width
|
|
start_frame_id += window_width
|
|
- """
|
|
|
|
- 如果滑动窗口开始和最后的laneid一致;
|
|
|
|
- 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
|
|
|
|
- 自车和前车的位置发生的交换;
|
|
|
|
- 则认为右超车
|
|
|
|
- """
|
|
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ # 检查右侧超车
|
|
if driverctrl_start_state > 0 and driverctrl_end_state < 0:
|
|
if driverctrl_start_state > 0 and driverctrl_end_state < 0:
|
|
- self.overtake_on_right_count += self._is_overtake(
|
|
|
|
- lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
- )
|
|
|
|
- start_frame_id += window_width
|
|
|
|
- elif ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
|
|
|
|
- self.overtake_when_turn_around_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_on_right"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
start_frame_id += window_width
|
|
start_frame_id += window_width
|
|
- else:
|
|
|
|
- start_frame_id += 1
|
|
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ # 检查掉头时超车
|
|
|
|
+ if obj_speedx and obj_speedy: # 确保列表不为空
|
|
|
|
+ if ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
|
|
|
|
+ self.violation_counts["overtake_when_turn_around"] += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ start_frame_id += window_width
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ # 如果没有检测到任何违规,移动窗口
|
|
|
|
+ start_frame_id += 1
|
|
|
|
+
|
|
|
|
+ self._calculated["illegal_overtake"] = True
|
|
|
|
|
|
# 借道超车场景
|
|
# 借道超车场景
|
|
def overtake_in_forbid_lane_detector(self):
|
|
def overtake_in_forbid_lane_detector(self):
|
|
|
|
+ """检测借道超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["forbid_lane"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测借道超车违规,默认为0")
|
|
|
|
+ self._calculated["forbid_lane"] = True
|
|
|
|
+ return
|
|
|
|
+
|
|
simTime = self.obj_data["simTime"].tolist()
|
|
simTime = self.obj_data["simTime"].tolist()
|
|
simtime_devide = self.different_road_area_simtime(simTime)
|
|
simtime_devide = self.different_road_area_simtime(simTime)
|
|
for simtime in simtime_devide:
|
|
for simtime in simtime_devide:
|
|
@@ -506,13 +592,25 @@ class OvertakingViolation(object):
|
|
if (50002 in lane_type and len(set(lane_type)) > 2) or (
|
|
if (50002 in lane_type and len(set(lane_type)) > 2) or (
|
|
50002 not in lane_type and len(set(lane_type)) > 1
|
|
50002 not in lane_type and len(set(lane_type)) > 1
|
|
):
|
|
):
|
|
- self.overtake_in_forbid_lane_count += 1
|
|
|
|
|
|
+ self.violation_counts["overtake_in_forbid_lane"] += 1
|
|
except Exception as e:
|
|
except Exception as e:
|
|
print("数据缺少lane_type信息")
|
|
print("数据缺少lane_type信息")
|
|
- # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
|
|
|
|
|
|
+
|
|
|
|
+ self._calculated["forbid_lane"] = True
|
|
|
|
|
|
# 在匝道超车
|
|
# 在匝道超车
|
|
def overtake_in_ramp_area_detector(self):
|
|
def overtake_in_ramp_area_detector(self):
|
|
|
|
+ """检测匝道超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["ramp_area"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测匝道超车违规,默认为0")
|
|
|
|
+ self._calculated["ramp_area"] = True
|
|
|
|
+ return
|
|
|
|
+
|
|
ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
|
|
ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
|
|
"simTime"
|
|
"simTime"
|
|
].tolist()
|
|
].tolist()
|
|
@@ -527,14 +625,26 @@ class OvertakingViolation(object):
|
|
ego_speedx = ego_in_ramp["speedX"].tolist()
|
|
ego_speedx = ego_in_ramp["speedX"].tolist()
|
|
ego_speedy = ego_in_ramp["speedY"].tolist()
|
|
ego_speedy = ego_in_ramp["speedY"].tolist()
|
|
if len(lane_id) > 0:
|
|
if len(lane_id) > 0:
|
|
- self.overtake_in_ramp_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_in_ramp"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
else:
|
|
else:
|
|
continue
|
|
continue
|
|
- # print(f"在匝道超车{self.overtake_in_ramp_count}次")
|
|
|
|
|
|
+
|
|
|
|
+ self._calculated["ramp_area"] = True
|
|
|
|
|
|
def overtake_in_tunnel_area_detector(self):
|
|
def overtake_in_tunnel_area_detector(self):
|
|
|
|
+ """检测隧道超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["tunnel_area"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测隧道超车违规,默认为0")
|
|
|
|
+ self._calculated["tunnel_area"] = True
|
|
|
|
+ return
|
|
|
|
+
|
|
tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
|
|
tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
|
|
"simTime"
|
|
"simTime"
|
|
].tolist()
|
|
].tolist()
|
|
@@ -549,15 +659,27 @@ class OvertakingViolation(object):
|
|
ego_speedx = ego_in_tunnel["speedX"].tolist()
|
|
ego_speedx = ego_in_tunnel["speedX"].tolist()
|
|
ego_speedy = ego_in_tunnel["speedY"].tolist()
|
|
ego_speedy = ego_in_tunnel["speedY"].tolist()
|
|
if len(lane_id) > 0:
|
|
if len(lane_id) > 0:
|
|
- self.overtake_in_tunnel_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_in_tunnel"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
else:
|
|
else:
|
|
continue
|
|
continue
|
|
- # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
|
|
|
|
|
|
+
|
|
|
|
+ self._calculated["tunnel_area"] = True
|
|
|
|
|
|
# 加速车道超车
|
|
# 加速车道超车
|
|
def overtake_on_accelerate_lane_detector(self):
|
|
def overtake_on_accelerate_lane_detector(self):
|
|
|
|
+ """检测加速车道超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["accelerate_lane"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测加速车道超车违规,默认为0")
|
|
|
|
+ self._calculated["accelerate_lane"] = True
|
|
|
|
+ return
|
|
|
|
+
|
|
accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
|
|
accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
|
|
"simTime"
|
|
"simTime"
|
|
].tolist()
|
|
].tolist()
|
|
@@ -576,13 +698,25 @@ class OvertakingViolation(object):
|
|
ego_speedx = ego_in_accelerate["speedX"].tolist()
|
|
ego_speedx = ego_in_accelerate["speedX"].tolist()
|
|
ego_speedy = ego_in_accelerate["speedY"].tolist()
|
|
ego_speedy = ego_in_accelerate["speedY"].tolist()
|
|
|
|
|
|
- self.overtake_on_accelerate_lane_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_on_accelerate_lane"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
- # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
|
|
|
|
|
|
+
|
|
|
|
+ self._calculated["accelerate_lane"] = True
|
|
|
|
|
|
# 减速车道超车
|
|
# 减速车道超车
|
|
def overtake_on_decelerate_lane_detector(self):
|
|
def overtake_on_decelerate_lane_detector(self):
|
|
|
|
+ """检测减速车道超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["decelerate_lane"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测减速车道超车违规,默认为0")
|
|
|
|
+ self._calculated["decelerate_lane"] = True
|
|
|
|
+ return
|
|
|
|
+
|
|
decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
|
|
decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
|
|
"simTime"
|
|
"simTime"
|
|
].tolist()
|
|
].tolist()
|
|
@@ -601,13 +735,25 @@ class OvertakingViolation(object):
|
|
ego_speedx = ego_in_decelerate["speedX"].tolist()
|
|
ego_speedx = ego_in_decelerate["speedX"].tolist()
|
|
ego_speedy = ego_in_decelerate["speedY"].tolist()
|
|
ego_speedy = ego_in_decelerate["speedY"].tolist()
|
|
|
|
|
|
- self.overtake_on_decelerate_lane_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_on_decelerate_lane"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
- # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
|
|
|
|
|
|
+
|
|
|
|
+ self._calculated["decelerate_lane"] = True
|
|
|
|
|
|
# 在交叉路口
|
|
# 在交叉路口
|
|
def overtake_in_different_senerios_detector(self):
|
|
def overtake_in_different_senerios_detector(self):
|
|
|
|
+ """检测不同场景超车违规"""
|
|
|
|
+ # 如果已经计算过,直接返回
|
|
|
|
+ if self._calculated["different_senerios"]:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # 如果没有其他车辆数据,直接返回,保持默认值0
|
|
|
|
+ if self.obj_data.empty:
|
|
|
|
+ print("没有其他车辆数据,无法检测不同场景超车违规,默认为0")
|
|
|
|
+ self._calculated["different_senerios"] = True
|
|
|
|
+ return
|
|
|
|
+
|
|
crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
|
|
crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
|
|
"simTime"
|
|
"simTime"
|
|
].tolist() # 判断是路口或者隧道区域
|
|
].tolist() # 判断是路口或者隧道区域
|
|
@@ -629,47 +775,56 @@ class OvertakingViolation(object):
|
|
则认为发生超车
|
|
则认为发生超车
|
|
"""
|
|
"""
|
|
if len(lane_id) > 0:
|
|
if len(lane_id) > 0:
|
|
- self.overtake_in_different_senerios_count += self._is_overtake(
|
|
|
|
|
|
+ self.violation_counts["overtake_in_different_senerios"] += self._is_overtake(
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
)
|
|
)
|
|
- else:
|
|
|
|
- pass
|
|
|
|
|
|
+
|
|
|
|
+ self._calculated["different_senerios"] = True
|
|
|
|
+
|
|
def calculate_overtake_when_passing_car_count(self):
|
|
def calculate_overtake_when_passing_car_count(self):
|
|
|
|
+ """计算会车时超车违规次数"""
|
|
self.illegal_overtake_with_car_detector()
|
|
self.illegal_overtake_with_car_detector()
|
|
- return self.overtake_when_passing_car_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_when_passing_car"]
|
|
|
|
|
|
def calculate_overtake_on_right_count(self):
|
|
def calculate_overtake_on_right_count(self):
|
|
|
|
+ """计算右侧超车违规次数"""
|
|
self.illegal_overtake_with_car_detector()
|
|
self.illegal_overtake_with_car_detector()
|
|
- return self.overtake_on_right_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_on_right"]
|
|
|
|
|
|
def calculate_overtake_when_turn_around_count(self):
|
|
def calculate_overtake_when_turn_around_count(self):
|
|
|
|
+ """计算掉头时超车违规次数"""
|
|
self.illegal_overtake_with_car_detector()
|
|
self.illegal_overtake_with_car_detector()
|
|
- return self.overtake_when_turn_around_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_when_turn_around"]
|
|
|
|
|
|
def calculate_overtake_in_forbid_lane_count(self):
|
|
def calculate_overtake_in_forbid_lane_count(self):
|
|
|
|
+ """计算借道超车违规次数"""
|
|
self.overtake_in_forbid_lane_detector()
|
|
self.overtake_in_forbid_lane_detector()
|
|
- return self.overtake_in_forbid_lane_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_in_forbid_lane"]
|
|
|
|
|
|
def calculate_overtake_in_ramp_area_count(self):
|
|
def calculate_overtake_in_ramp_area_count(self):
|
|
|
|
+ """计算匝道超车违规次数"""
|
|
self.overtake_in_ramp_area_detector()
|
|
self.overtake_in_ramp_area_detector()
|
|
- return self.overtake_in_ramp_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_in_ramp"]
|
|
|
|
|
|
def calculate_overtake_in_tunnel_area_count(self):
|
|
def calculate_overtake_in_tunnel_area_count(self):
|
|
|
|
+ """计算隧道超车违规次数"""
|
|
self.overtake_in_tunnel_area_detector()
|
|
self.overtake_in_tunnel_area_detector()
|
|
- return self.overtake_in_tunnel_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_in_tunnel"]
|
|
|
|
|
|
def calculate_overtake_on_accelerate_lane_count(self):
|
|
def calculate_overtake_on_accelerate_lane_count(self):
|
|
|
|
+ """计算加速车道超车违规次数"""
|
|
self.overtake_on_accelerate_lane_detector()
|
|
self.overtake_on_accelerate_lane_detector()
|
|
- return self.overtake_on_accelerate_lane_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_on_accelerate_lane"]
|
|
|
|
|
|
def calculate_overtake_on_decelerate_lane_count(self):
|
|
def calculate_overtake_on_decelerate_lane_count(self):
|
|
|
|
+ """计算减速车道超车违规次数"""
|
|
self.overtake_on_decelerate_lane_detector()
|
|
self.overtake_on_decelerate_lane_detector()
|
|
- return self.overtake_on_decelerate_lane_count
|
|
|
|
|
|
+ return self.violation_counts["overtake_on_decelerate_lane"]
|
|
|
|
|
|
def calculate_overtake_in_different_senerios_count(self):
|
|
def calculate_overtake_in_different_senerios_count(self):
|
|
|
|
+ """计算不同场景超车违规次数"""
|
|
self.overtake_in_different_senerios_detector()
|
|
self.overtake_in_different_senerios_detector()
|
|
- return self.overtake_in_different_senerios_count
|
|
|
|
-
|
|
|
|
|
|
+ return self.violation_counts["overtake_in_different_senerios"]
|
|
|
|
|
|
|
|
|
|
class SlowdownViolation(object):
|
|
class SlowdownViolation(object):
|
|
@@ -678,24 +833,43 @@ class SlowdownViolation(object):
|
|
def __init__(self, df_data):
|
|
def __init__(self, df_data):
|
|
print("减速让行违规类-------------------------")
|
|
print("减速让行违规类-------------------------")
|
|
self.traffic_violations_type = "减速让行违规类"
|
|
self.traffic_violations_type = "减速让行违规类"
|
|
- self.object_items = []
|
|
|
|
- self.data = df_data.obj_data[1]
|
|
|
|
- self.ego_data = (
|
|
|
|
- self.data[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
- ) # Copy to avoid modifying the original DataFrame
|
|
|
|
- self.pedestrian_data = pd.DataFrame()
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ # 存储原始数据引用
|
|
|
|
+ self._raw_data = df_data.obj_data[1]
|
|
self.object_items = set(df_data.object_df.type.tolist())
|
|
self.object_items = set(df_data.object_df.type.tolist())
|
|
|
|
+
|
|
|
|
+ # 存储行人数据引用
|
|
|
|
+ self._pedestrian_df = None
|
|
if 13 in self.object_items: # 行人的type是13
|
|
if 13 in self.object_items: # 行人的type是13
|
|
- self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
|
|
|
|
- self.pedestrian_data = (
|
|
|
|
- self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
|
|
+ self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._ego_data = None
|
|
|
|
+ self._pedestrian_data = None
|
|
|
|
+
|
|
|
|
+ # 初始化计数器
|
|
self.slow_down_in_crosswalk_count = 0
|
|
self.slow_down_in_crosswalk_count = 0
|
|
self.avoid_pedestrian_in_crosswalk_count = 0
|
|
self.avoid_pedestrian_in_crosswalk_count = 0
|
|
self.avoid_pedestrian_in_the_road_count = 0
|
|
self.avoid_pedestrian_in_the_road_count = 0
|
|
self.aviod_pedestrian_when_turning_count = 0
|
|
self.aviod_pedestrian_when_turning_count = 0
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def ego_data(self):
|
|
|
|
+ """懒加载方式获取ego数据"""
|
|
|
|
+ if self._ego_data is None:
|
|
|
|
+ self._ego_data = self._raw_data[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ return self._ego_data
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def pedestrian_data(self):
|
|
|
|
+ """懒加载方式获取行人数据"""
|
|
|
|
+ if self._pedestrian_data is None:
|
|
|
|
+ if self._pedestrian_df is not None:
|
|
|
|
+ # 使用浅拷贝代替深拷贝
|
|
|
|
+ self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ else:
|
|
|
|
+ self._pedestrian_data = pd.DataFrame()
|
|
|
|
+ return self._pedestrian_data
|
|
|
|
|
|
def pedestrian_in_front_of_car(self):
|
|
def pedestrian_in_front_of_car(self):
|
|
if len(self.pedestrian_data) == 0:
|
|
if len(self.pedestrian_data) == 0:
|
|
@@ -875,22 +1049,40 @@ class TurnaroundViolation(object):
|
|
print("掉头违规类初始化中...")
|
|
print("掉头违规类初始化中...")
|
|
self.traffic_violations_type = "掉头违规类"
|
|
self.traffic_violations_type = "掉头违规类"
|
|
|
|
|
|
- self.data = df_data.obj_data[1]
|
|
|
|
- self.ego_data = (
|
|
|
|
- self.data[TURNAROUND_INFO].copy().reset_index(drop=True)
|
|
|
|
- ) # Copy to avoid modifying the original DataFrame
|
|
|
|
- self.pedestrian_data = pd.DataFrame()
|
|
|
|
-
|
|
|
|
|
|
+ # 存储原始数据引用
|
|
|
|
+ self._raw_data = df_data.obj_data[1]
|
|
self.object_items = set(df_data.object_df.type.tolist())
|
|
self.object_items = set(df_data.object_df.type.tolist())
|
|
|
|
+
|
|
|
|
+ # 存储行人数据引用
|
|
|
|
+ self._pedestrian_df = None
|
|
if 13 in self.object_items: # 行人的type是13
|
|
if 13 in self.object_items: # 行人的type是13
|
|
- self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
|
|
|
|
- self.pedestrian_data = (
|
|
|
|
- self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
|
|
+ self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._ego_data = None
|
|
|
|
+ self._pedestrian_data = None
|
|
|
|
+
|
|
|
|
+ # 初始化计数器
|
|
self.turning_in_forbiden_turn_back_sign_count = 0
|
|
self.turning_in_forbiden_turn_back_sign_count = 0
|
|
self.turning_in_forbiden_turn_left_sign_count = 0
|
|
self.turning_in_forbiden_turn_left_sign_count = 0
|
|
self.avoid_pedestrian_when_turn_back_count = 0
|
|
self.avoid_pedestrian_when_turn_back_count = 0
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def ego_data(self):
|
|
|
|
+ """懒加载方式获取ego数据"""
|
|
|
|
+ if self._ego_data is None:
|
|
|
|
+ self._ego_data = self._raw_data[TURNAROUND_INFO].copy().reset_index(drop=True)
|
|
|
|
+ return self._ego_data
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def pedestrian_data(self):
|
|
|
|
+ """懒加载方式获取行人数据"""
|
|
|
|
+ if self._pedestrian_data is None:
|
|
|
|
+ if self._pedestrian_df is not None:
|
|
|
|
+ self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ else:
|
|
|
|
+ self._pedestrian_data = pd.DataFrame()
|
|
|
|
+ return self._pedestrian_data
|
|
|
|
|
|
def pedestrian_in_front_of_car(self):
|
|
def pedestrian_in_front_of_car(self):
|
|
if len(self.pedestrian_data) == 0:
|
|
if len(self.pedestrian_data) == 0:
|
|
@@ -1021,20 +1213,33 @@ class TurnaroundViolation(object):
|
|
self.avoid_pedestrian_when_turn_back_detector()
|
|
self.avoid_pedestrian_when_turn_back_detector()
|
|
return self.avoid_pedestrian_when_turn_back_count
|
|
return self.avoid_pedestrian_when_turn_back_count
|
|
|
|
|
|
-
|
|
|
|
-class WrongWayViolation:
|
|
|
|
|
|
+class WrongWayViolation(object):
|
|
"""停车违规类"""
|
|
"""停车违规类"""
|
|
|
|
|
|
def __init__(self, df_data):
|
|
def __init__(self, df_data):
|
|
print("停车违规类初始化中...")
|
|
print("停车违规类初始化中...")
|
|
self.traffic_violations_type = "停车违规类"
|
|
self.traffic_violations_type = "停车违规类"
|
|
- self.data = df_data.obj_data[1]
|
|
|
|
|
|
+
|
|
|
|
+ # 存储原始数据引用
|
|
|
|
+ self._raw_data = df_data.obj_data[1]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._data = None
|
|
|
|
+
|
|
# 初始化违规统计
|
|
# 初始化违规统计
|
|
self.violation_count = {
|
|
self.violation_count = {
|
|
"urbanExpresswayOrHighwayDrivingLaneStopped": 0,
|
|
"urbanExpresswayOrHighwayDrivingLaneStopped": 0,
|
|
"urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
|
|
"urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
|
|
"urbanExpresswayEmergencyLaneDriving": 0,
|
|
"urbanExpresswayEmergencyLaneDriving": 0,
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def data(self):
|
|
|
|
+ """懒加载方式获取数据"""
|
|
|
|
+ if self._data is None:
|
|
|
|
+ # 使用浅拷贝代替深拷贝
|
|
|
|
+ self._data = self._raw_data.copy()
|
|
|
|
+ return self._data
|
|
|
|
|
|
def process_violations(self):
|
|
def process_violations(self):
|
|
"""处理停车或者紧急车道行驶违规数据"""
|
|
"""处理停车或者紧急车道行驶违规数据"""
|
|
@@ -1102,9 +1307,11 @@ class SpeedingViolation(object):
|
|
def __init__(self, df_data):
|
|
def __init__(self, df_data):
|
|
print("超速违规类初始化中...")
|
|
print("超速违规类初始化中...")
|
|
self.traffic_violations_type = "超速违规类"
|
|
self.traffic_violations_type = "超速违规类"
|
|
- self.data = df_data.obj_data[
|
|
|
|
- 1
|
|
|
|
- ] # Copy to avoid modifying the original DataFrame
|
|
|
|
|
|
+ # 存储原始数据引用
|
|
|
|
+ self._raw_data = df_data.obj_data[1]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._data = None
|
|
# 初始化违规统计
|
|
# 初始化违规统计
|
|
self.violation_counts = {
|
|
self.violation_counts = {
|
|
"urbanExpresswayOrHighwaySpeedOverLimit50": 0,
|
|
"urbanExpresswayOrHighwaySpeedOverLimit50": 0,
|
|
@@ -1115,6 +1322,16 @@ class SpeedingViolation(object):
|
|
"generalRoadSpeedOverLimit20to50": 0,
|
|
"generalRoadSpeedOverLimit20to50": 0,
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @property
|
|
|
|
+ def data(self):
|
|
|
|
+ """懒加载方式获取数据"""
|
|
|
|
+ if self._data is None:
|
|
|
|
+ # 使用浅拷贝代替深拷贝
|
|
|
|
+ self._data = self._raw_data.copy()
|
|
|
|
+ # 预处理数据 - 转换速度单位
|
|
|
|
+ self._data["v"] *= 3.6 # 转换为 km/h
|
|
|
|
+ return self._data
|
|
|
|
+
|
|
def process_violations(self):
|
|
def process_violations(self):
|
|
"""处理数据帧,检查超速和其他违规行为"""
|
|
"""处理数据帧,检查超速和其他违规行为"""
|
|
# 提取有效道路类型
|
|
# 提取有效道路类型
|
|
@@ -1365,61 +1582,92 @@ class WarningViolation(object):
|
|
"""警告性违规类"""
|
|
"""警告性违规类"""
|
|
|
|
|
|
def __init__(self, df_data):
|
|
def __init__(self, df_data):
|
|
|
|
+ print("警告性违规类初始化中...")
|
|
self.traffic_violations_type = "警告性违规类"
|
|
self.traffic_violations_type = "警告性违规类"
|
|
- print("警告性违规类 类初始化中...")
|
|
|
|
|
|
+
|
|
|
|
+ # 存储原始数据引用
|
|
self.config = df_data.vehicle_config
|
|
self.config = df_data.vehicle_config
|
|
- self.data_ego = df_data.obj_data[1]
|
|
|
|
- self.data = self.data_ego.copy() # 避免修改原始 DataFrame
|
|
|
|
|
|
+ self._raw_data = df_data.obj_data[1]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._data = None
|
|
|
|
+
|
|
|
|
+ # 初始化违规计数器
|
|
self.violation_counts = {
|
|
self.violation_counts = {
|
|
"generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
|
|
"generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
|
|
"urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
|
|
"urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def data(self):
|
|
|
|
+ """懒加载方式获取数据"""
|
|
|
|
+ if self._data is None:
|
|
|
|
+ # 使用浅拷贝代替深拷贝
|
|
|
|
+ self._data = self._raw_data.copy()
|
|
|
|
+ return self._data
|
|
|
|
|
|
def process_violations(self):
|
|
def process_violations(self):
|
|
- general_road = {3} # 普通道路
|
|
|
|
- lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道
|
|
|
|
- # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
|
|
|
|
- # config = yaml.load(f, Loader=yaml.FullLoader)
|
|
|
|
- car_width = self.config["CAR_WIDTH"]
|
|
|
|
- lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在
|
|
|
|
|
|
+ """处理所有违规类型"""
|
|
|
|
+ # 处理普通道路不按规定车道行驶违规
|
|
|
|
+ self._process_irregular_lane_use()
|
|
|
|
+
|
|
|
|
+ # 处理骑、轧车行道分界线违规
|
|
|
|
+ self._process_lane_divider_violation()
|
|
|
|
|
|
- # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
|
|
|
|
|
|
+ def _process_irregular_lane_use(self):
|
|
|
|
+ """处理普通道路不按规定车道行驶违规"""
|
|
|
|
+ # 定义道路和车道类型
|
|
|
|
+ general_road = {3} # 普通道路
|
|
|
|
+ lane_type = {11} # 非机动车道
|
|
|
|
+
|
|
# 使用布尔索引来筛选满足条件的行
|
|
# 使用布尔索引来筛选满足条件的行
|
|
condition = (self.data["road_fc"].isin(general_road)) & (
|
|
condition = (self.data["road_fc"].isin(general_road)) & (
|
|
self.data["lane_type"].isin(lane_type)
|
|
self.data["lane_type"].isin(lane_type)
|
|
)
|
|
)
|
|
-
|
|
|
|
|
|
+
|
|
# 创建一个新的列,并根据条件设置值
|
|
# 创建一个新的列,并根据条件设置值
|
|
self.data["is_violation"] = condition
|
|
self.data["is_violation"] = condition
|
|
-
|
|
|
|
|
|
+
|
|
# 统计满足条件的连续时间段
|
|
# 统计满足条件的连续时间段
|
|
violation_segments = self.count_continuous_violations(
|
|
violation_segments = self.count_continuous_violations(
|
|
self.data["is_violation"], self.data["simTime"]
|
|
self.data["is_violation"], self.data["simTime"]
|
|
)
|
|
)
|
|
|
|
+
|
|
|
|
+ # 更新违规计数
|
|
|
|
+ self.violation_counts["generalRoadIrregularLaneUse"] = len(violation_segments)
|
|
|
|
|
|
- # 更新骑行车道线违规计数
|
|
|
|
- self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments)
|
|
|
|
-
|
|
|
|
- # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
|
|
|
|
-
|
|
|
|
|
|
+ def _process_lane_divider_violation(self):
|
|
|
|
+ """处理骑、轧车行道分界线违规"""
|
|
|
|
+ # 获取车辆和车道宽度
|
|
|
|
+ car_width = self.config["CAR_WIDTH"]
|
|
|
|
+ lane_width = self.data["lane_width"]
|
|
|
|
+
|
|
# 计算阈值
|
|
# 计算阈值
|
|
threshold = (lane_width - car_width) / 2
|
|
threshold = (lane_width - car_width) / 2
|
|
-
|
|
|
|
|
|
+
|
|
# 找到满足条件的行
|
|
# 找到满足条件的行
|
|
self.data["is_violation"] = self.data["laneOffset"] > threshold
|
|
self.data["is_violation"] = self.data["laneOffset"] > threshold
|
|
-
|
|
|
|
|
|
+
|
|
# 统计满足条件的连续时间段
|
|
# 统计满足条件的连续时间段
|
|
violation_segments = self.count_continuous_violations(
|
|
violation_segments = self.count_continuous_violations(
|
|
self.data["is_violation"], self.data["simTime"]
|
|
self.data["is_violation"], self.data["simTime"]
|
|
)
|
|
)
|
|
-
|
|
|
|
- # 更新骑行车道线违规计数
|
|
|
|
- self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len(
|
|
|
|
|
|
+
|
|
|
|
+ # 更新违规计数
|
|
|
|
+ self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(
|
|
violation_segments
|
|
violation_segments
|
|
)
|
|
)
|
|
|
|
|
|
def count_continuous_violations(self, violation_series, time_series):
|
|
def count_continuous_violations(self, violation_series, time_series):
|
|
- """统计连续违规的时间段数量"""
|
|
|
|
|
|
+ """统计连续违规的时间段数量
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ violation_series: 表示是否违规的布尔序列
|
|
|
|
+ time_series: 对应的时间序列
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ list: 连续违规时间段列表
|
|
|
|
+ """
|
|
continuous_segments = []
|
|
continuous_segments = []
|
|
current_segment = []
|
|
current_segment = []
|
|
|
|
|
|
@@ -1429,25 +1677,29 @@ class WarningViolation(object):
|
|
current_segment.append(time)
|
|
current_segment.append(time)
|
|
else:
|
|
else:
|
|
if current_segment: # 连续段结束
|
|
if current_segment: # 连续段结束
|
|
|
|
+ current_segment.append(time) # 添加结束时间
|
|
continuous_segments.append(current_segment)
|
|
continuous_segments.append(current_segment)
|
|
current_segment = []
|
|
current_segment = []
|
|
|
|
|
|
# 检查是否有一个未结束的连续段在最后
|
|
# 检查是否有一个未结束的连续段在最后
|
|
if current_segment:
|
|
if current_segment:
|
|
|
|
+ current_segment.append(time_series.iloc[-1]) # 使用最后的时间作为结束时间
|
|
continuous_segments.append(current_segment)
|
|
continuous_segments.append(current_segment)
|
|
|
|
|
|
return continuous_segments
|
|
return continuous_segments
|
|
|
|
|
|
-
|
|
|
|
def calculate_generalRoadIrregularLaneUse_count(self):
|
|
def calculate_generalRoadIrregularLaneUse_count(self):
|
|
- self.process_violations()
|
|
|
|
|
|
+ """计算普通道路不按规定车道行驶违规次数"""
|
|
|
|
+ # 只处理普通道路不按规定车道行驶违规
|
|
|
|
+ self._process_irregular_lane_use()
|
|
return self.violation_counts["generalRoadIrregularLaneUse"]
|
|
return self.violation_counts["generalRoadIrregularLaneUse"]
|
|
|
|
|
|
- def calculate_urbanExpresswayOrHighwayRideLaneDivider(self):
|
|
|
|
- self.process_violations()
|
|
|
|
|
|
+ def calculate_urbanExpresswayOrHighwayRideLaneDivider_count(self):
|
|
|
|
+ """计算骑、轧车行道分界线违规次数"""
|
|
|
|
+ # 只处理骑、轧车行道分界线违规
|
|
|
|
+ self._process_lane_divider_violation()
|
|
return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
|
|
return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
|
|
|
|
|
|
-
|
|
|
|
class TrafficSignViolation:
|
|
class TrafficSignViolation:
|
|
"""交通标志违规类"""
|
|
"""交通标志违规类"""
|
|
|
|
|
|
@@ -1457,12 +1709,14 @@ class TrafficSignViolation:
|
|
SIGN_TYPE_MIN_SPEED_LIMIT = 13
|
|
SIGN_TYPE_MIN_SPEED_LIMIT = 13
|
|
|
|
|
|
def __init__(self, df_data):
|
|
def __init__(self, df_data):
|
|
|
|
+ print("交通标志违规类初始化中...")
|
|
self.traffic_violations_type = "交通标志违规类"
|
|
self.traffic_violations_type = "交通标志违规类"
|
|
- print("交通标志违规类 初始化中...")
|
|
|
|
|
|
|
|
- # 数据预处理
|
|
|
|
- self._raw_data = df_data.obj_data[1].copy()
|
|
|
|
- self.data_ego = self._raw_data.sort_values('simTime').reset_index(drop=True)
|
|
|
|
|
|
+ # 存储原始数据引用
|
|
|
|
+ self._raw_data = df_data.obj_data[1]
|
|
|
|
+
|
|
|
|
+ # 初始化属性,但不立即创建数据副本
|
|
|
|
+ self._data = None
|
|
|
|
|
|
# 延迟计算标志
|
|
# 延迟计算标志
|
|
self._calculated = False
|
|
self._calculated = False
|
|
@@ -1471,6 +1725,16 @@ class TrafficSignViolation:
|
|
"SpeedLimitViolation": 0,
|
|
"SpeedLimitViolation": 0,
|
|
"MinimumSpeedLimitViolation": 0
|
|
"MinimumSpeedLimitViolation": 0
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def data(self):
|
|
|
|
+ """懒加载方式获取数据"""
|
|
|
|
+ if self._data is None:
|
|
|
|
+ # 使用浅拷贝代替深拷贝
|
|
|
|
+ self._data = self._raw_data.copy()
|
|
|
|
+ # 预处理数据 - 按时间排序
|
|
|
|
+ self._data = self._data.sort_values('simTime').reset_index(drop=True)
|
|
|
|
+ return self._data
|
|
|
|
|
|
def _ensure_calculated(self):
|
|
def _ensure_calculated(self):
|
|
"""保证计算只执行一次"""
|
|
"""保证计算只执行一次"""
|
|
@@ -1513,7 +1777,7 @@ class TrafficSignViolation:
|
|
|
|
|
|
def _check_straight_violation(self):
|
|
def _check_straight_violation(self):
|
|
"""检查禁止直行违规"""
|
|
"""检查禁止直行违规"""
|
|
- straight_df = self.data_ego[self.data_ego["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED]
|
|
|
|
|
|
+ straight_df = self.data[self.data["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED]
|
|
|
|
|
|
if not straight_df.empty:
|
|
if not straight_df.empty:
|
|
# 计算航向角变化并填充缺失值
|
|
# 计算航向角变化并填充缺失值
|
|
@@ -1526,16 +1790,21 @@ class TrafficSignViolation:
|
|
(straight_df['v'] > 0)
|
|
(straight_df['v'] > 0)
|
|
)
|
|
)
|
|
|
|
|
|
- self.violation_counts["NoStraightThrough"] = mask.sum()
|
|
|
|
|
|
+ self._violation_counts["NoStraightThrough"] = mask.sum()
|
|
|
|
|
|
def _check_speed_violation(self, sign_type, compare_op, count_key):
|
|
def _check_speed_violation(self, sign_type, compare_op, count_key):
|
|
- """通用速度违规检查方法"""
|
|
|
|
- violation_df = self.data_ego[self.data_ego["sign_type1"] == sign_type]
|
|
|
|
|
|
+ """通用速度违规检查方法
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ sign_type: 标志类型
|
|
|
|
+ compare_op: 比较操作符
|
|
|
|
+ count_key: 违规计数键名
|
|
|
|
+ """
|
|
|
|
+ violation_df = self.data[self.data["sign_type1"] == sign_type]
|
|
|
|
|
|
if not violation_df.empty:
|
|
if not violation_df.empty:
|
|
mask = compare_op(violation_df['v'], violation_df['sign_speed'])
|
|
mask = compare_op(violation_df['v'], violation_df['sign_speed'])
|
|
- self.violation_counts[count_key] = mask.sum()
|
|
|
|
-
|
|
|
|
|
|
+ self._violation_counts[count_key] = mask.sum()
|
|
|
|
|
|
|
|
|
|
|
|
|