|
@@ -1,9 +1,10 @@
|
|
|
-
|
|
|
import math
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from modules.lib import log_manager
|
|
|
from modules.lib.score import Score
|
|
|
+from modules.lib.log_manager import LogManager
|
|
|
+from modules.lib import data_process
|
|
|
|
|
|
OVERTAKE_INFO = [
|
|
|
"simTime",
|
|
@@ -59,6 +60,60 @@ TRFFICSIGN_INFO = [
|
|
|
]
|
|
|
|
|
|
|
|
|
+def overtake_when_passing_car(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_when_passing_car_count = overtakingviolation.calculate_overtake_when_passing_car_count()
|
|
|
+ return {"overtake_when_passing_car": overtake_when_passing_car_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_on_right(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_on_right_count = overtakingviolation.calculate_overtake_on_right_count()
|
|
|
+ return {"overtake_on_right": overtake_on_right_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_when_turn_around(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_when_turn_around_count = overtakingviolation.calculate_overtake_when_turn_around_count()
|
|
|
+ return {"overtake_when_turn_around": overtake_when_turn_around_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_in_forbid_lane(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_in_forbid_lane_count = overtakingviolation.calculate_overtake_in_forbid_lane_count()
|
|
|
+ return {"overtake_in_forbid_lane": overtake_in_forbid_lane_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_in_ramp(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_in_ramp_area_count = overtakingviolation.calculate_overtake_in_ramp_area_count()
|
|
|
+ return {"overtake_in_ramp": overtake_in_ramp_area_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_in_tunnel(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_in_tunnel_area_count = overtakingviolation.calculate_overtake_in_tunnel_area_count()
|
|
|
+ return {"overtake_in_tunnel": overtake_in_tunnel_area_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_on_accelerate_lane(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_on_accelerate_lane_count = overtakingviolation.calculate_overtake_on_accelerate_lane_count()
|
|
|
+ return {"overtake_on_accelerate_lane": overtake_on_accelerate_lane_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_on_decelerate_lane(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_on_decelerate_lane_count = overtakingviolation.calculate_overtake_on_decelerate_lane_count()
|
|
|
+ return {"overtake_on_decelerate_lane": overtake_on_decelerate_lane_count}
|
|
|
+
|
|
|
+
|
|
|
+def overtake_in_different_senerios(data_processed):
|
|
|
+ overtakingviolation = OvertakingViolation(data_processed)
|
|
|
+ overtake_in_different_senerios_count = overtakingviolation.calculate_overtake_in_different_senerios_count()
|
|
|
+ return {"overtake_in_different_senerios": overtake_in_different_senerios_count}
|
|
|
+
|
|
|
+
|
|
|
class OvertakingViolation(object):
|
|
|
"""超车违规类"""
|
|
|
|
|
@@ -72,19 +127,21 @@ class OvertakingViolation(object):
|
|
|
self.ego_data = (
|
|
|
self.data[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
) # Copy to avoid modifying the original DataFrame
|
|
|
- self.data_obj = df_data.obj_data[2]
|
|
|
- self.obj_data = (
|
|
|
- self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
- ) # Copy to avoid modifying the original DataFrame
|
|
|
- self.object_items = []
|
|
|
- for i, item in df_data.obj_data.items():
|
|
|
- self.object_items.append(i)
|
|
|
- if 3 in self.object_items:
|
|
|
+ header = self.ego_data.columns
|
|
|
+ if 2 in df_data.obj_id_list:
|
|
|
+ self.data_obj = df_data.obj_data[2]
|
|
|
+ self.obj_data = (
|
|
|
+ self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
+ ) # Copy to avoid modifying the original DataFrame
|
|
|
+ else:
|
|
|
+ self.obj_data = pd.DataFrame(columns=header)
|
|
|
+ if 3 in df_data.obj_id_list:
|
|
|
self.other_obj_data1 = df_data.obj_data[3]
|
|
|
self.other_obj_data = (
|
|
|
self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
)
|
|
|
-
|
|
|
+ else:
|
|
|
+ self.other_obj_data = pd.DataFrame(columns=header)
|
|
|
self.overtake_on_right_count = 0
|
|
|
self.overtake_when_turn_around_count = 0
|
|
|
self.overtake_when_passing_car_count = 0
|
|
@@ -133,15 +190,13 @@ class OvertakingViolation(object):
|
|
|
|
|
|
# 在前车右侧超车、会车时超车、前车掉头时超车
|
|
|
|
|
|
- def illegal_overtake_with_car(self, window_width=250):
|
|
|
+ def illegal_overtake_with_car_detector(self, window_width=250):
|
|
|
|
|
|
# 获取csv文件中最短的帧数
|
|
|
frame_id_length = len(self.ego_data["simFrame"])
|
|
|
start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
|
|
|
|
|
|
while (start_frame_id + window_width) < frame_id_length:
|
|
|
- # if start_frame_id == 828:
|
|
|
- # print("end")
|
|
|
simframe_window1 = list(
|
|
|
np.arange(start_frame_id, start_frame_id + window_width)
|
|
|
)
|
|
@@ -176,9 +231,9 @@ class OvertakingViolation(object):
|
|
|
other_start_speedx = other_data_frames["speedX"].iloc[0]
|
|
|
other_start_speedy = other_data_frames["speedY"].iloc[0]
|
|
|
if (
|
|
|
- ego_speedx[0] * other_start_speedx
|
|
|
- + ego_speedy[0] * other_start_speedy
|
|
|
- < 0
|
|
|
+ ego_speedx[0] * other_start_speedx
|
|
|
+ + ego_speedy[0] * other_start_speedy
|
|
|
+ < 0
|
|
|
):
|
|
|
self.overtake_when_passing_car_count += self._is_overtake(
|
|
|
lane_id, dx, dy, ego_speedx, ego_speedy
|
|
@@ -202,11 +257,9 @@ class OvertakingViolation(object):
|
|
|
start_frame_id += window_width
|
|
|
else:
|
|
|
start_frame_id += 1
|
|
|
- # print(
|
|
|
- # f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")
|
|
|
|
|
|
# 借道超车场景
|
|
|
- def overtake_in_forbid_lane(self):
|
|
|
+ def overtake_in_forbid_lane_detector(self):
|
|
|
simTime = self.obj_data["simTime"].tolist()
|
|
|
simtime_devide = self.different_road_area_simtime(simTime)
|
|
|
for simtime in simtime_devide:
|
|
@@ -214,7 +267,7 @@ class OvertakingViolation(object):
|
|
|
try:
|
|
|
lane_type = lane_overtake["lane_type"].tolist()
|
|
|
if (50002 in lane_type and len(set(lane_type)) > 2) or (
|
|
|
- 50002 not in lane_type and len(set(lane_type)) > 1
|
|
|
+ 50002 not in lane_type and len(set(lane_type)) > 1
|
|
|
):
|
|
|
self.overtake_in_forbid_lane_count += 1
|
|
|
except Exception as e:
|
|
@@ -222,7 +275,7 @@ class OvertakingViolation(object):
|
|
|
# print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
|
|
|
|
|
|
# 在匝道超车
|
|
|
- def overtake_in_ramp_area(self):
|
|
|
+ def overtake_in_ramp_area_detector(self):
|
|
|
ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
|
|
|
"simTime"
|
|
|
].tolist()
|
|
@@ -244,7 +297,7 @@ class OvertakingViolation(object):
|
|
|
continue
|
|
|
# print(f"在匝道超车{self.overtake_in_ramp_count}次")
|
|
|
|
|
|
- def overtake_in_tunnel_area(self):
|
|
|
+ def overtake_in_tunnel_area_detector(self):
|
|
|
tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
|
|
|
"simTime"
|
|
|
].tolist()
|
|
@@ -267,7 +320,7 @@ class OvertakingViolation(object):
|
|
|
# print(f"在隧道超车{self.overtake_in_tunnel_count}次")
|
|
|
|
|
|
# 加速车道超车
|
|
|
- def overtake_on_accelerate_lane(self):
|
|
|
+ def overtake_on_accelerate_lane_detector(self):
|
|
|
accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
|
|
|
"simTime"
|
|
|
].tolist()
|
|
@@ -292,7 +345,7 @@ class OvertakingViolation(object):
|
|
|
# print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
|
|
|
|
|
|
# 减速车道超车
|
|
|
- def overtake_on_decelerate_lane(self):
|
|
|
+ def overtake_on_decelerate_lane_detector(self):
|
|
|
decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
|
|
|
"simTime"
|
|
|
].tolist()
|
|
@@ -317,7 +370,7 @@ class OvertakingViolation(object):
|
|
|
# print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
|
|
|
|
|
|
# 在交叉路口
|
|
|
- def overtake_in_different_senerios(self):
|
|
|
+ def overtake_in_different_senerios_detector(self):
|
|
|
crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
|
|
|
"simTime"
|
|
|
].tolist() # 判断是路口或者隧道区域
|
|
@@ -326,8 +379,6 @@ class OvertakingViolation(object):
|
|
|
crossroad_objstate = self.obj_data[
|
|
|
self.obj_data["simTime"].isin(crossroad_simTime)
|
|
|
]
|
|
|
- # crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]
|
|
|
-
|
|
|
# 读取前后的laneId
|
|
|
lane_id = crossroad_ego["lane_id"].tolist()
|
|
|
|
|
@@ -346,30 +397,66 @@ class OvertakingViolation(object):
|
|
|
)
|
|
|
else:
|
|
|
pass
|
|
|
- # print(f"在路口超车{self.overtake_in_different_senerios_count}次")
|
|
|
|
|
|
- def statistic(self):
|
|
|
- self.overtake_in_forbid_lane()
|
|
|
- self.overtake_on_decelerate_lane()
|
|
|
- self.overtake_on_accelerate_lane()
|
|
|
- self.overtake_in_ramp_area()
|
|
|
- self.overtake_in_tunnel_area()
|
|
|
- self.overtake_in_different_senerios()
|
|
|
- self.illegal_overtake_with_car()
|
|
|
-
|
|
|
- self.calculated_value = {
|
|
|
- "overtake_on_right": self.overtake_on_right_count,
|
|
|
- "overtake_when_turn_around": self.overtake_when_turn_around_count,
|
|
|
- "overtake_when_passing_car": self.overtake_when_passing_car_count,
|
|
|
- "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
|
|
|
- "overtake_in_ramp": self.overtake_in_ramp_count,
|
|
|
- "overtake_in_tunnel": self.overtake_in_tunnel_count,
|
|
|
- "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
|
|
|
- "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
|
|
|
- "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
|
|
|
- }
|
|
|
- # self.logger.info(f"超车类指标统计完成,统计结果:{self.calculated_value}")
|
|
|
- return self.calculated_value
|
|
|
+ def calculate_overtake_when_passing_car_count(self):
|
|
|
+ self.illegal_overtake_with_car_detector()
|
|
|
+ return self.overtake_when_passing_car_count
|
|
|
+
|
|
|
+ def calculate_overtake_on_right_count(self):
|
|
|
+ self.illegal_overtake_with_car_detector()
|
|
|
+ return self.overtake_on_right_count
|
|
|
+
|
|
|
+ def calculate_overtake_when_turn_around_count(self):
|
|
|
+ self.illegal_overtake_with_car_detector()
|
|
|
+ return self.overtake_when_turn_around_count
|
|
|
+
|
|
|
+ def calculate_overtake_in_forbid_lane_count(self):
|
|
|
+ self.overtake_in_forbid_lane_detector()
|
|
|
+ return self.overtake_in_forbid_lane_count
|
|
|
+
|
|
|
+ def calculate_overtake_in_ramp_area_count(self):
|
|
|
+ self.overtake_in_ramp_area_detector()
|
|
|
+ return self.overtake_in_ramp_count
|
|
|
+
|
|
|
+ def calculate_overtake_in_tunnel_area_count(self):
|
|
|
+ self.overtake_in_tunnel_area_detector()
|
|
|
+ return self.overtake_in_tunnel_count
|
|
|
+
|
|
|
+ def calculate_overtake_on_accelerate_lane_count(self):
|
|
|
+ self.overtake_on_accelerate_lane_detector()
|
|
|
+ return self.overtake_on_accelerate_lane_count
|
|
|
+
|
|
|
+ def calculate_overtake_on_decelerate_lane_count(self):
|
|
|
+ self.overtake_on_decelerate_lane_detector()
|
|
|
+ return self.overtake_on_decelerate_lane_count
|
|
|
+
|
|
|
+ def calculate_overtake_in_different_senerios_count(self):
|
|
|
+ self.overtake_in_different_senerios_detector()
|
|
|
+ return self.overtake_in_different_senerios_count
|
|
|
+
|
|
|
+
|
|
|
+def slow_down_in_crosswalk(data_processed):
|
|
|
+ slowdownviolation = SlowdownViolation(data_processed)
|
|
|
+ slow_down_in_crosswalk_count = slowdownviolation.calculate_slow_down_in_crosswalk_count()
|
|
|
+ return {"slowdown_down_in_crosswalk": slow_down_in_crosswalk_count}
|
|
|
+
|
|
|
+
|
|
|
+def avoid_pedestrian_in_crosswalk(data_processed):
|
|
|
+ avoidpedestrianincrosswalk = SlowdownViolation(data_processed)
|
|
|
+ avoid_pedestrian_in_crosswalk_count = avoidpedestrianincrosswalk.calculate_avoid_pedestrian_in_the_crosswalk_count()
|
|
|
+ return {"avoid_pedestrian_in_crosswalk": avoid_pedestrian_in_crosswalk_count}
|
|
|
+
|
|
|
+
|
|
|
+def avoid_pedestrian_in_the_road(data_processed):
|
|
|
+ avoidpedestrianintheroad = SlowdownViolation(data_processed)
|
|
|
+ avoid_pedestrian_in_the_road_count = avoidpedestrianintheroad.calculate_avoid_pedestrian_in_the_road_count()
|
|
|
+ return {"avoid_pedestrian_in_the_road": avoid_pedestrian_in_the_road_count}
|
|
|
+
|
|
|
+
|
|
|
+def aviod_pedestrian_when_turning(data_processed):
|
|
|
+ avoidpedestrianwhenturning = SlowdownViolation(data_processed)
|
|
|
+ avoid_pedestrian_when_turning_count = avoidpedestrianwhenturning.calculate_avoid_pedestrian_when_turning_count()
|
|
|
+ return {"avoid_pedestrian_when_turning_count": avoid_pedestrian_when_turning_count}
|
|
|
|
|
|
|
|
|
class SlowdownViolation(object):
|
|
@@ -408,12 +495,12 @@ class SlowdownViolation(object):
|
|
|
)
|
|
|
|
|
|
self.ego_data["rela_pos"] = (
|
|
|
- self.ego_data["dx"] * self.ego_data["speedX"]
|
|
|
- + self.ego_data["dy"] * self.ego_data["speedY"]
|
|
|
+ self.ego_data["dx"] * self.ego_data["speedX"]
|
|
|
+ + self.ego_data["dy"] * self.ego_data["speedY"]
|
|
|
)
|
|
|
simtime = self.ego_data[
|
|
|
(self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
|
|
|
- ]["simTime"].tolist()
|
|
|
+ ]["simTime"].tolist()
|
|
|
return simtime
|
|
|
|
|
|
def different_road_area_simtime(self, df, threshold=0.6):
|
|
@@ -432,7 +519,7 @@ class SlowdownViolation(object):
|
|
|
simtime_group.append(current_simtime_group)
|
|
|
return simtime_group
|
|
|
|
|
|
- def slow_down_in_crosswalk(self):
|
|
|
+ def slow_down_in_crosswalk_detector(self):
|
|
|
# 筛选出路口或隧道区域的时间点
|
|
|
crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
|
|
|
"simTime"
|
|
@@ -448,12 +535,12 @@ class SlowdownViolation(object):
|
|
|
crosswalk_objstate = self.ego_data[
|
|
|
(self.ego_data["simTime"] >= start_time)
|
|
|
& (self.ego_data["simTime"] <= end_time)
|
|
|
- ]
|
|
|
+ ]
|
|
|
|
|
|
# 计算车辆速度
|
|
|
ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
|
|
|
ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
|
|
|
- ego_speed = np.sqrt(ego_speedx**2 + ego_speedy**2)
|
|
|
+ ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
|
|
|
|
|
|
# 判断是否超速
|
|
|
if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
|
|
@@ -462,7 +549,7 @@ class SlowdownViolation(object):
|
|
|
# 输出总次数
|
|
|
print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
|
|
|
|
|
|
- def avoid_pedestrian_in_crosswalk(self):
|
|
|
+ def avoid_pedestrian_in_crosswalk_detector(self):
|
|
|
crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
|
|
|
"simTime"
|
|
|
].tolist()
|
|
@@ -486,7 +573,7 @@ class SlowdownViolation(object):
|
|
|
if ego_speed.any() > 0:
|
|
|
self.avoid_pedestrian_in_crosswalk_count += 1
|
|
|
|
|
|
- def avoid_pedestrian_in_the_road(self):
|
|
|
+ def avoid_pedestrian_in_the_road_detector(self):
|
|
|
simtime = self.pedestrian_in_front_of_car()
|
|
|
if len(simtime) == 0:
|
|
|
self.avoid_pedestrian_in_the_road_count += 0
|
|
@@ -505,8 +592,8 @@ class SlowdownViolation(object):
|
|
|
(ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
|
|
|
** 2
|
|
|
+ (
|
|
|
- ego_car["posY"].values
|
|
|
- - sub_pedestrian_on_the_road["posY"].values
|
|
|
+ ego_car["posY"].values
|
|
|
+ - sub_pedestrian_on_the_road["posY"].values
|
|
|
)
|
|
|
** 2
|
|
|
)
|
|
@@ -521,13 +608,13 @@ class SlowdownViolation(object):
|
|
|
if new_ego_car["Column3"].any():
|
|
|
self.avoid_pedestrian_in_the_road_count += 1
|
|
|
|
|
|
- def aviod_pedestrian_when_turning(self):
|
|
|
+ def aviod_pedestrian_when_turning_detector(self):
|
|
|
pedestrian_simtime_list = self.pedestrian_in_front_of_car()
|
|
|
if len(pedestrian_simtime_list) > 0:
|
|
|
simtime_list = self.ego_data[
|
|
|
(self.ego_data["simTime"].isin(pedestrian_simtime_list))
|
|
|
& (self.ego_data["lane_type"] == 20)
|
|
|
- ]["simTime"].tolist()
|
|
|
+ ]["simTime"].tolist()
|
|
|
simTime_list = self.different_road_area_simtime(simtime_list)
|
|
|
pedestrian_on_the_road = self.pedestrian_data[
|
|
|
self.pedestrian_data["simTime"].isin(simtime_list)
|
|
@@ -541,8 +628,8 @@ class SlowdownViolation(object):
|
|
|
(ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
|
|
|
** 2
|
|
|
+ (
|
|
|
- ego_car["posY"].values
|
|
|
- - sub_pedestrian_on_the_road["posY"].values
|
|
|
+ ego_car["posY"].values
|
|
|
+ - sub_pedestrian_on_the_road["posY"].values
|
|
|
)
|
|
|
** 2
|
|
|
)
|
|
@@ -552,20 +639,39 @@ class SlowdownViolation(object):
|
|
|
if any(ego_car["speed"].tolist()) != 0:
|
|
|
self.aviod_pedestrian_when_turning_count += 1
|
|
|
|
|
|
- def statistic(self):
|
|
|
- self.slow_down_in_crosswalk()
|
|
|
- self.avoid_pedestrian_in_crosswalk()
|
|
|
- self.avoid_pedestrian_in_the_road()
|
|
|
- self.aviod_pedestrian_when_turning()
|
|
|
-
|
|
|
- self.calculated_value = {
|
|
|
- "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
|
|
|
- "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count,
|
|
|
- "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count,
|
|
|
- "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count,
|
|
|
- }
|
|
|
- # self.logger.info(f"减速让行类指标统计完成,统计结果:{self.calculated_value}")
|
|
|
- return self.calculated_value
|
|
|
+ def calculate_slow_down_in_crosswalk_count(self):
|
|
|
+ self.slow_down_in_crosswalk_detector()
|
|
|
+ return self.slow_down_in_crosswalk_count
|
|
|
+
|
|
|
+ def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
|
|
|
+ self.avoid_pedestrian_in_crosswalk_detector()
|
|
|
+ return self.avoid_pedestrian_in_crosswalk_count
|
|
|
+
|
|
|
+ def calculate_avoid_pedestrian_in_the_road_count(self):
|
|
|
+ self.avoid_pedestrian_in_the_road_detector()
|
|
|
+ return self.avoid_pedestrian_in_the_road_count
|
|
|
+
|
|
|
+ def calculate_avoid_pedestrian_when_turning_count(self):
|
|
|
+ self.aviod_pedestrian_when_turning_detector()
|
|
|
+ return self.aviod_pedestrian_when_turning_count
|
|
|
+
|
|
|
+
|
|
|
+def turn_in_forbiden_turn_left_sign(data_processed):
|
|
|
+ turnaroundviolation = TurnaroundViolation(data_processed)
|
|
|
+ turn_in_forbiden_turn_left_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_left_sign_count()
|
|
|
+ return turn_in_forbiden_turn_left_sign_count
|
|
|
+
|
|
|
+
|
|
|
+def turn_in_forbiden_turn_back_sign(data_processed):
|
|
|
+ turnaroundviolation = TurnaroundViolation(data_processed)
|
|
|
+ turn_in_forbiden_turn_back_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_back_sign_count()
|
|
|
+ return turn_in_forbiden_turn_back_sign_count
|
|
|
+
|
|
|
+
|
|
|
+def avoid_pedestrian_when_turn_back(data_processed):
|
|
|
+ turnaroundviolation = TurnaroundViolation(data_processed)
|
|
|
+ avoid_pedestrian_when_turn_back_count = turnaroundviolation.calaulate_avoid_pedestrian_when_turn_back_count()
|
|
|
+ return avoid_pedestrian_when_turn_back_count
|
|
|
|
|
|
|
|
|
class TurnaroundViolation(object):
|
|
@@ -602,12 +708,12 @@ class TurnaroundViolation(object):
|
|
|
)
|
|
|
|
|
|
self.ego_data["rela_pos"] = (
|
|
|
- self.ego_data["dx"] * self.ego_data["speedX"]
|
|
|
- + self.ego_data["dy"] * self.ego_data["speedY"]
|
|
|
+ self.ego_data["dx"] * self.ego_data["speedX"]
|
|
|
+ + self.ego_data["dy"] * self.ego_data["speedY"]
|
|
|
)
|
|
|
simtime = self.ego_data[
|
|
|
(self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
|
|
|
- ]["simTime"].tolist()
|
|
|
+ ]["simTime"].tolist()
|
|
|
return simtime
|
|
|
|
|
|
def different_road_area_simtime(self, df, threshold=0.5):
|
|
@@ -626,7 +732,7 @@ class TurnaroundViolation(object):
|
|
|
simtime_group.append(current_simtime_group)
|
|
|
return simtime_group
|
|
|
|
|
|
- def turn_back_in_forbiden_sign(self):
|
|
|
+ def turn_back_in_forbiden_sign_detector(self):
|
|
|
"""
|
|
|
禁止掉头type = 8
|
|
|
"""
|
|
@@ -653,9 +759,9 @@ class TurnaroundViolation(object):
|
|
|
ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
|
|
|
|
|
|
if (
|
|
|
- ego_end_speedx1 * ego_start_speedx1
|
|
|
- + ego_end_speedy1 * ego_start_speedy1
|
|
|
- < 0
|
|
|
+ ego_end_speedx1 * ego_start_speedx1
|
|
|
+ + ego_end_speedy1 * ego_start_speedy1
|
|
|
+ < 0
|
|
|
):
|
|
|
self.turning_in_forbiden_turn_back_sign_count += 1
|
|
|
|
|
@@ -669,26 +775,24 @@ class TurnaroundViolation(object):
|
|
|
ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
|
|
|
|
|
|
if (
|
|
|
- ego_end_speedx2 * ego_start_speedx2
|
|
|
- + ego_end_speedy2 * ego_start_speedy2
|
|
|
- < 0
|
|
|
+ ego_end_speedx2 * ego_start_speedx2
|
|
|
+ + ego_end_speedy2 * ego_start_speedy2
|
|
|
+ < 0
|
|
|
):
|
|
|
self.turning_in_forbiden_turn_left_sign_count += 1
|
|
|
|
|
|
- def avoid_pedestrian_when_turn_back(self):
|
|
|
+ def avoid_pedestrian_when_turn_back_detector(self):
|
|
|
sensor_on_intersection = self.pedestrian_in_front_of_car()
|
|
|
avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
|
|
|
self.ego_data["lane_type"] == 20
|
|
|
- ]["simTime"].tolist()
|
|
|
+ ]["simTime"].tolist()
|
|
|
avoid_pedestrian_when_turn_back_simTime_devide = (
|
|
|
self.different_road_area_simtime(
|
|
|
avoid_pedestrian_when_turn_back_simTime_list
|
|
|
)
|
|
|
)
|
|
|
if len(sensor_on_intersection) > 0:
|
|
|
- for (
|
|
|
- avoid_pedestrian_when_turn_back_simtime
|
|
|
- ) in avoid_pedestrian_when_turn_back_simTime_devide:
|
|
|
+ for avoid_pedestrian_when_turn_back_simtime in avoid_pedestrian_when_turn_back_simTime_devide:
|
|
|
pedestrian_in_intersection_simtime = self.pedestrian_data[
|
|
|
self.pedestrian_data["simTime"].isin(
|
|
|
avoid_pedestrian_when_turn_back_simtime
|
|
@@ -710,17 +814,35 @@ class TurnaroundViolation(object):
|
|
|
if any(ego_df["speed"].tolist()) != 0:
|
|
|
self.avoid_pedestrian_when_turn_back_count += 1
|
|
|
|
|
|
- def statistic(self):
|
|
|
- self.turn_back_in_forbiden_sign()
|
|
|
- self.avoid_pedestrian_when_turn_back()
|
|
|
+ def calculate_turn_in_forbiden_turn_left_sign_count(self):
|
|
|
+ self.turn_back_in_forbiden_sign_detector()
|
|
|
+ return self.turning_in_forbiden_turn_left_sign_count
|
|
|
|
|
|
- self.calculated_value = {
|
|
|
- "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count,
|
|
|
- "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count,
|
|
|
- "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count,
|
|
|
- }
|
|
|
- # self.logger.info(f"掉头违规类指标统计完成,统计结果:{self.calculated_value}")
|
|
|
- return self.calculated_value
|
|
|
+ def calculate_turn_in_forbiden_turn_back_sign_count(self):
|
|
|
+ self.turn_back_in_forbiden_sign_detector()
|
|
|
+ return self.turning_in_forbiden_turn_back_sign_count
|
|
|
+
|
|
|
+ def calaulate_avoid_pedestrian_when_turn_back_count(self):
|
|
|
+ self.avoid_pedestrian_when_turn_back_detector()
|
|
|
+ return self.avoid_pedestrian_when_turn_back_count
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwayDrivingLaneStopped(data_processed):
|
|
|
+ wrongwayviolation = WrongWayViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwayDrivingLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
|
|
|
+ return {"urbanExpresswayOrHighwayDrivingLaneStopped": urbanExpresswayOrHighwayDrivingLaneStopped_count}
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwayEmergencyLaneStopped(data_processed):
|
|
|
+ wrongwayviolation = WrongWayViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwayEmergencyLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
|
|
|
+ return {"urbanExpresswayOrHighwayEmergencyLaneStopped": urbanExpresswayOrHighwayEmergencyLaneStopped_count}
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayEmergencyLaneDriving(data_processed):
|
|
|
+ wrongwayviolation = WrongWayViolation(data_processed)
|
|
|
+ urbanExpresswayEmergencyLaneDriving_count = wrongwayviolation.calculate_urbanExpresswayEmergencyLaneDriving()
|
|
|
+ return {"urbanExpresswayEmergencyLaneDriving": urbanExpresswayEmergencyLaneDriving_count}
|
|
|
|
|
|
|
|
|
class WrongWayViolation:
|
|
@@ -748,19 +870,19 @@ class WrongWayViolation:
|
|
|
# 使用向量化和条件判断进行违规判定
|
|
|
conditions = [
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & self.data["lane_type"].isin(driving_lane)
|
|
|
- & (self.data["v"] == 0)
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & self.data["lane_type"].isin(driving_lane)
|
|
|
+ & (self.data["v"] == 0)
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & self.data["lane_type"].isin(emergency_lane)
|
|
|
- & (self.data["v"] == 0)
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & self.data["lane_type"].isin(emergency_lane)
|
|
|
+ & (self.data["v"] == 0)
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & self.data["lane_type"].isin(emergency_lane)
|
|
|
- & (self.data["v"] != 0)
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & self.data["lane_type"].isin(emergency_lane)
|
|
|
+ & (self.data["v"] != 0)
|
|
|
),
|
|
|
]
|
|
|
|
|
@@ -783,11 +905,53 @@ class WrongWayViolation:
|
|
|
.to_dict()
|
|
|
)
|
|
|
|
|
|
- def statistic(self) -> str:
|
|
|
+ def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]
|
|
|
+
|
|
|
+ def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
|
|
|
|
|
|
+ def calculate_urbanExpresswayEmergencyLaneDriving(self):
|
|
|
self.process_violations()
|
|
|
- # self.logger.info(f"停车违规类指标统计完成,统计结果:{self.violation_count}")
|
|
|
- return self.violation_count
|
|
|
+ return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwaySpeedOverLimit50(data_processed):
|
|
|
+ speedingviolation = SpeedingViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwaySpeedOverLimit50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
|
|
|
+ return {"urbanExpresswayOrHighwaySpeedOverLimit50": urbanExpresswayOrHighwaySpeedOverLimit50_count}
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwaySpeedOverLimit20to50(data_processed):
|
|
|
+ speedingviolation = SpeedingViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwaySpeedOverLimit20to50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
|
|
|
+ return {"urbanExpresswayOrHighwaySpeedOverLimit20to50": urbanExpresswayOrHighwaySpeedOverLimit20to50_count}
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwaySpeedOverLimit0to20(data_processed):
|
|
|
+ speedingviolation = SpeedingViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwaySpeedOverLimit0to20_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
|
|
|
+ return {"urbanExpresswayOrHighwaySpeedOverLimit0to20": urbanExpresswayOrHighwaySpeedOverLimit0to20_count}
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwaySpeedUnderLimit(data_processed):
|
|
|
+ speedingviolation = SpeedingViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwaySpeedUnderLimit_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
|
|
|
+ return {"urbanExpresswayOrHighwaySpeedUnderLimit": urbanExpresswayOrHighwaySpeedUnderLimit_count}
|
|
|
+
|
|
|
+
|
|
|
+def generalRoadSpeedOverLimit50(data_processed):
|
|
|
+ speedingviolation = SpeedingViolation(data_processed)
|
|
|
+ generalRoadSpeedOverLimit50_count = speedingviolation.calculate_generalRoadSpeedOverLimit50()
|
|
|
+ return {"generalRoadSpeedOverLimit50": generalRoadSpeedOverLimit50_count}
|
|
|
+
|
|
|
+
|
|
|
+def generalRoadSpeedOverLimit20to50(data_processed):
|
|
|
+ speedingviolation = SpeedingViolation(data_processed)
|
|
|
+ generalRoadSpeedOverLimit20to50_count = speedingviolation.calculate_generalRoadSpeedOverLimit20to50_count()
|
|
|
+ return {"generalRoadSpeedOverLimit20to50": generalRoadSpeedOverLimit20to50_count}
|
|
|
|
|
|
|
|
|
class SpeedingViolation(object):
|
|
@@ -821,31 +985,31 @@ class SpeedingViolation(object):
|
|
|
# 违规判定
|
|
|
conditions = [
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & (self.data["v"] > self.data["road_speed_max"] * 1.5)
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.5)
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & (self.data["v"] > self.data["road_speed_max"] * 1.2)
|
|
|
- & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.2)
|
|
|
+ & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & (self.data["v"] > self.data["road_speed_max"])
|
|
|
- & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"])
|
|
|
+ & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
- & (self.data["v"] < self.data["road_speed_min"])
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
+ & (self.data["v"] < self.data["road_speed_min"])
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(general_road)
|
|
|
- & (self.data["v"] > self.data["road_speed_max"] * 1.5)
|
|
|
+ self.data["road_fc"].isin(general_road)
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.5)
|
|
|
),
|
|
|
(
|
|
|
- self.data["road_fc"].isin(general_road)
|
|
|
- & (self.data["v"] > self.data["road_speed_max"] * 1.2)
|
|
|
- & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
|
|
|
+ self.data["road_fc"].isin(general_road)
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.2)
|
|
|
+ & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
|
|
|
),
|
|
|
]
|
|
|
|
|
@@ -866,11 +1030,47 @@ class SpeedingViolation(object):
|
|
|
# 统计各类违规情况
|
|
|
self.violation_counts = self.data["violation_type"].value_counts().to_dict()
|
|
|
|
|
|
- def statistic(self) -> str:
|
|
|
- # 处理数据
|
|
|
+ def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
|
|
|
self.process_violations()
|
|
|
- # self.logger.info(f"超速违规类指标统计完成,统计结果:{self.violation_counts}")
|
|
|
- return self.violation_counts
|
|
|
+ return self.violation_counts.get("urbanExpresswayOrHighwaySpeedOverLimit50") if self.violation_counts.get(
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit50") else 0
|
|
|
+
|
|
|
+ def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"] if self.violation_counts.get(
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit20to50") else 0
|
|
|
+
|
|
|
+ def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"] if self.violation_counts.get(
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit0to20") else 0
|
|
|
+
|
|
|
+ def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"] if self.violation_counts.get(
|
|
|
+ "urbanExpresswayOrHighwaySpeedUnderLimit") else 0
|
|
|
+
|
|
|
+ def calculate_generalRoadSpeedOverLimit50(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["generalRoadSpeedOverLimit50"] if self.violation_counts.get(
|
|
|
+ "generalRoadSpeedOverLimit50") else 0
|
|
|
+
|
|
|
+ def calculate_generalRoadSpeedOverLimit20to50_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["generalRoadSpeedOverLimit20to50"] if self.violation_counts.get(
|
|
|
+ "generalRoadSpeedOverLimit20to50") else 0
|
|
|
+
|
|
|
+
|
|
|
+def trafficSignalViolation(data_processed):
|
|
|
+ trafficlightviolation = TrafficLightViolation(data_processed)
|
|
|
+ trafficSignalViolation_count = trafficlightviolation.calculate_trafficSignalViolation_count()
|
|
|
+ return {"trafficSignalViolation": trafficSignalViolation_count}
|
|
|
+
|
|
|
+
|
|
|
+def illegalDrivingOrParkingAtCrossroads(data_processed):
|
|
|
+ trafficlightviolation = TrafficLightViolation(data_processed)
|
|
|
+ illegalDrivingOrParkingAtCrossroads_count = trafficlightviolation.calculate_illegalDrivingOrParkingAtCrossroads()
|
|
|
+ return {"illegalDrivingOrParkingAtCrossroads": illegalDrivingOrParkingAtCrossroads_count}
|
|
|
|
|
|
|
|
|
class TrafficLightViolation(object):
|
|
@@ -917,8 +1117,8 @@ class TrafficLightViolation(object):
|
|
|
return False
|
|
|
|
|
|
mid_point = (
|
|
|
- np.array([stop_line_points[0][0], stop_line_points[0][1]])
|
|
|
- + 0.5 * line_vector
|
|
|
+ np.array([stop_line_points[0][0], stop_line_points[0][1]])
|
|
|
+ + 0.5 * line_vector
|
|
|
)
|
|
|
axletree_to_mid_vector = np.array(
|
|
|
[point[0] - mid_point[0], point[1] - mid_point[1]]
|
|
@@ -932,7 +1132,7 @@ class TrafficLightViolation(object):
|
|
|
return False
|
|
|
|
|
|
cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
|
|
|
- norm_axletree_to_mid * norm_direction
|
|
|
+ norm_axletree_to_mid * norm_direction
|
|
|
)
|
|
|
angle_theta = math.degrees(math.acos(cos_theta))
|
|
|
|
|
@@ -944,7 +1144,7 @@ class TrafficLightViolation(object):
|
|
|
(self.data_ego["stopline_id"] != -1)
|
|
|
& (self.data_ego["stopline_type"] == 1)
|
|
|
& (self.data_ego["trafficlight_id"] != -1)
|
|
|
- ]
|
|
|
+ ]
|
|
|
|
|
|
def _group_data(self, filtered_data):
|
|
|
"""按时间差对数据进行分组"""
|
|
@@ -979,13 +1179,13 @@ class TrafficLightViolation(object):
|
|
|
|
|
|
if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
|
|
|
has_crossed_line_front = (
|
|
|
- self.is_point_cross_line(front_wheel_pos, stop_line_points)
|
|
|
- and traffic_light_status == 1
|
|
|
+ self.is_point_cross_line(front_wheel_pos, stop_line_points)
|
|
|
+ and traffic_light_status == 1
|
|
|
)
|
|
|
has_crossed_line_rear = (
|
|
|
- self.is_point_cross_line(rear_wheel_pos, stop_line_points)
|
|
|
- and row["v"] > 0
|
|
|
- and traffic_light_status == 1
|
|
|
+ self.is_point_cross_line(rear_wheel_pos, stop_line_points)
|
|
|
+ and row["v"] > 0
|
|
|
+ and traffic_light_status == 1
|
|
|
)
|
|
|
has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
|
|
|
has_passed_intersection = has_crossed_line_front and dist < 1.0
|
|
@@ -1026,9 +1226,25 @@ class TrafficLightViolation(object):
|
|
|
self.violation_counts["trafficSignalViolation"] = count_1
|
|
|
self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
|
|
|
|
|
|
- def statistic(self):
|
|
|
- """返回统计结果"""
|
|
|
- return self.violation_counts
|
|
|
+ def calculate_trafficSignalViolation_count(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["trafficSignalViolation"]
|
|
|
+
|
|
|
+ def calculate_illegalDrivingOrParkingAtCrossroads(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]
|
|
|
+
|
|
|
+
|
|
|
+def generalRoadIrregularLaneUse(data_processed):
|
|
|
+ warningviolation = WarningViolation(data_processed)
|
|
|
+ generalRoadIrregularLaneUse_count = warningviolation.calculate_generalRoadIrregularLaneUse_count()
|
|
|
+ return {"generalRoadIrregularLaneUse": generalRoadIrregularLaneUse_count}
|
|
|
+
|
|
|
+
|
|
|
+def urbanExpresswayOrHighwayRideLaneDivider(data_processed):
|
|
|
+ warningviolation = WarningViolation(data_processed)
|
|
|
+ urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider()
|
|
|
+ return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
|
|
|
|
|
|
|
|
|
class WarningViolation(object):
|
|
@@ -1108,11 +1324,15 @@ class WarningViolation(object):
|
|
|
|
|
|
return continuous_segments
|
|
|
|
|
|
- def statistic(self):
|
|
|
- # 处理数据
|
|
|
+
|
|
|
+ def calculate_generalRoadIrregularLaneUse_count(self):
|
|
|
self.process_violations()
|
|
|
- # self.logger.info(f"警告性违规类指标统计完成,统计结果:{self.violation_counts}")
|
|
|
- return self.violation_counts
|
|
|
+ return self.violation_counts["generalRoadIrregularLaneUse"]
|
|
|
+
|
|
|
+ def calculate_urbanExpresswayOrHighwayRideLaneDivider(self):
|
|
|
+ self.process_violations()
|
|
|
+ return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
|
|
|
+
|
|
|
|
|
|
class TrafficSignViolation(object):
|
|
|
"""交通标志违规类"""
|
|
@@ -1129,9 +1349,10 @@ class TrafficSignViolation(object):
|
|
|
"NoStraightThrough": 0, # 禁止直行标志地方直行
|
|
|
"SpeedLimitViolation": 0, # 违反限速规定
|
|
|
"MinimumSpeedLimitViolation": 0, # 违反最低限速规定
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
+ # def checkForProhibitionViolation(self):
|
|
|
|
|
|
- # def checkForProhibitionViolation(self):
|
|
|
# """禁令标志判断违规:7 禁止直行,12:限制速度"""
|
|
|
# # 筛选出sign_type1为7(禁止直行)
|
|
|
# violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7]
|
|
@@ -1141,24 +1362,23 @@ class TrafficSignViolation(object):
|
|
|
"""禁令标志判断违规:7 禁止直行,12:限制速度"""
|
|
|
# 筛选出 sign_type1 为7(禁止直行)的数据
|
|
|
violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy()
|
|
|
-
|
|
|
+
|
|
|
# 判断车辆是否在禁止直行路段直行
|
|
|
if not violation_straight_df.empty:
|
|
|
# 按时间戳排序(假设数据按时间顺序处理)
|
|
|
violation_straight_df = violation_straight_df.sort_values('simTime')
|
|
|
-
|
|
|
+
|
|
|
# 计算航向角变化(前后时间点的差值绝对值)
|
|
|
violation_straight_df['posH_diff'] = violation_straight_df['posH'].diff().abs()
|
|
|
-
|
|
|
+
|
|
|
# 筛选条件:航向角变化小于阈值(例如5度)且速度不为0
|
|
|
threshold = 5 # 单位:度(根据场景调整)
|
|
|
mask = (violation_straight_df['posH_diff'] <= threshold) & (violation_straight_df['v'] > 0)
|
|
|
straight_violations = violation_straight_df[mask]
|
|
|
-
|
|
|
+
|
|
|
# 统计违规次数或记录违规数据
|
|
|
self.violation_counts["prohibition_straight"] = len(straight_violations)
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
# 限制速度判断(原代码)
|
|
|
violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
|
|
|
if violation_speed_limit_df.empty:
|
|
@@ -1171,6 +1391,7 @@ class TrafficSignViolation(object):
|
|
|
if violation_minimum_speed_limit_df.empty:
|
|
|
mask = self.data_ego["v"] < self.data_ego["sign_speed"]
|
|
|
self.violation_counts["MinimumSpeedLimitViolation"] = len(self.data_ego[mask])
|
|
|
+
|
|
|
def statistic(self):
|
|
|
self.checkForProhibitionViolation()
|
|
|
self.checkForInstructionViolation()
|
|
@@ -1178,6 +1399,70 @@ class TrafficSignViolation(object):
|
|
|
return self.violation_counts
|
|
|
|
|
|
|
|
|
+class TrafficRegistry:
|
|
|
+ """舒适性指标注册器"""
|
|
|
+
|
|
|
+ def __init__(self, data_processed):
|
|
|
+ self.logger = LogManager().get_logger() # 获取全局日志实例
|
|
|
+ self.data = data_processed
|
|
|
+ self.traffic_config = data_processed.traffic_config["traffic"]
|
|
|
+ self.metrics = self._extract_metrics(self.traffic_config)
|
|
|
+ self._registry = self._build_registry()
|
|
|
+
|
|
|
+ def _extract_metrics(self, config_node: dict) -> list:
|
|
|
+ """DFS遍历提取指标"""
|
|
|
+ metrics = []
|
|
|
+
|
|
|
+ def _recurse(node):
|
|
|
+ if isinstance(node, dict):
|
|
|
+ if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
|
|
|
+ metrics.append(node['name'])
|
|
|
+ for v in node.values():
|
|
|
+ _recurse(v)
|
|
|
+
|
|
|
+ _recurse(config_node)
|
|
|
+ self.logger.info(f'评比的合规性指标列表:{metrics}')
|
|
|
+ return metrics
|
|
|
+
|
|
|
+ def _build_registry(self) -> dict:
|
|
|
+ """自动注册指标函数"""
|
|
|
+ registry = {}
|
|
|
+ for metric_name in self.metrics:
|
|
|
+ try:
|
|
|
+ registry[metric_name] = globals()[metric_name]
|
|
|
+ except KeyError:
|
|
|
+ self.logger.error(f"未实现指标函数: {metric_name}")
|
|
|
+ return registry
|
|
|
+
|
|
|
+ def batch_execute(self) -> dict:
|
|
|
+ """批量执行指标计算"""
|
|
|
+ results = {}
|
|
|
+ for name, func in self._registry.items():
|
|
|
+ try:
|
|
|
+ result = func(self.data)
|
|
|
+ results.update(result)
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
|
|
|
+ results[name] = None
|
|
|
+ self.logger.info(f'合规性指标计算结果:{results}')
|
|
|
+ return results
|
|
|
+
|
|
|
+
|
|
|
+class TrafficManager:
|
|
|
+ """合规性指标计算主类"""
|
|
|
+
|
|
|
+ def __init__(self, data_processed):
|
|
|
+ self.data = data_processed
|
|
|
+ self.logger = LogManager().get_logger()
|
|
|
+ self.registry = TrafficRegistry(self.data)
|
|
|
+
|
|
|
+ def report_statistic(self):
|
|
|
+ """生成合规性评分报告"""
|
|
|
+ traffic_result = self.registry.batch_execute()
|
|
|
+ return traffic_result
|
|
|
+
|
|
|
+
|
|
|
+'''
|
|
|
class ViolationManager:
|
|
|
"""违规管理类,用于管理所有违规行为"""
|
|
|
|
|
@@ -1213,8 +1498,19 @@ class ViolationManager:
|
|
|
# # self.logger.info(f"Traffic Result:{traffic_result}")
|
|
|
# return result
|
|
|
return traffic_result
|
|
|
-
|
|
|
+'''
|
|
|
|
|
|
# 示例使用
|
|
|
if __name__ == "__main__":
|
|
|
- pass
|
|
|
+ case_name = 'D:\Cicv\招远\V2V_CSAE53-2020_ForwardCollision_LST_02-03'
|
|
|
+ mode_label = 'D:\Cicv\招远\zhaoyuan0410\config\metrics_config.yaml'
|
|
|
+
|
|
|
+ data = data_process.DataPreprocessing(case_name, mode_label)
|
|
|
+ traffic_instance = TrafficManager(data)
|
|
|
+
|
|
|
+ try:
|
|
|
+ traffic_result = traffic_instance.report_statistic()
|
|
|
+ result = {'traffic': traffic_result}
|
|
|
+ print(result)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"An error occurred in Traffict.report_statistic: {e}")
|