|
@@ -0,0 +1,1220 @@
|
|
|
|
+
|
|
|
|
+import math
|
|
|
|
+import numpy as np
|
|
|
|
+import pandas as pd
|
|
|
|
+from modules.lib import log_manager
|
|
|
|
+from modules.lib.score import Score
|
|
|
|
+
|
|
|
|
+OVERTAKE_INFO = [
|
|
|
|
+ "simTime",
|
|
|
|
+ "simFrame",
|
|
|
|
+ "playerId",
|
|
|
|
+ "speedX",
|
|
|
|
+ "speedY",
|
|
|
|
+ "posX",
|
|
|
|
+ "posY",
|
|
|
|
+ "posH",
|
|
|
|
+ "lane_id",
|
|
|
|
+ "lane_type",
|
|
|
|
+ "road_type",
|
|
|
|
+ "interid",
|
|
|
|
+ "crossid",
|
|
|
|
+]
|
|
|
|
+SLOWDOWN_INFO = [
|
|
|
|
+ "simTime",
|
|
|
|
+ "simFrame",
|
|
|
|
+ "playerId",
|
|
|
|
+ "speedX",
|
|
|
|
+ "speedY",
|
|
|
|
+ "posX",
|
|
|
|
+ "posY",
|
|
|
|
+ "crossid",
|
|
|
|
+ "lane_type",
|
|
|
|
+]
|
|
|
|
+TURNAROUND_INFO = [
|
|
|
|
+ "simTime",
|
|
|
|
+ "simFrame",
|
|
|
|
+ "playerId",
|
|
|
|
+ "speedX",
|
|
|
|
+ "speedY",
|
|
|
|
+ "posX",
|
|
|
|
+ "posY",
|
|
|
|
+ "sign_type1",
|
|
|
|
+ "lane_type",
|
|
|
|
+]
|
|
|
|
+
|
|
|
|
+TRFFICSIGN_INFO = [
|
|
|
|
+ "simTime",
|
|
|
|
+ "simFrame",
|
|
|
|
+ "playerId",
|
|
|
|
+ "speedX",
|
|
|
|
+ "speedY",
|
|
|
|
+ "v",
|
|
|
|
+ "posX",
|
|
|
|
+ "posY",
|
|
|
|
+ "sign_type1",
|
|
|
|
+ "sign_ref_link",
|
|
|
|
+ "sign_x",
|
|
|
|
+ "sign_y",
|
|
|
|
+]
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class OvertakingViolation(object):
|
|
|
|
+ """超车违规类"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ print("超车违规类初始化中...")
|
|
|
|
+ self.traffic_violations_type = "超车违规类"
|
|
|
|
+
|
|
|
|
+ # self.logger = log.get_logger() # 使用时再初始化
|
|
|
|
+
|
|
|
|
+ self.data = df_data.obj_data[1]
|
|
|
|
+ self.ego_data = (
|
|
|
|
+ self.data[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
+ ) # Copy to avoid modifying the original DataFrame
|
|
|
|
+ self.data_obj = df_data.obj_data[2]
|
|
|
|
+ self.obj_data = (
|
|
|
|
+ self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
+ ) # Copy to avoid modifying the original DataFrame
|
|
|
|
+ self.object_items = []
|
|
|
|
+ for i, item in df_data.obj_data.items():
|
|
|
|
+ self.object_items.append(i)
|
|
|
|
+ if 3 in self.object_items:
|
|
|
|
+ self.other_obj_data1 = df_data.obj_data[3]
|
|
|
|
+ self.other_obj_data = (
|
|
|
|
+ self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ self.overtake_on_right_count = 0
|
|
|
|
+ self.overtake_when_turn_around_count = 0
|
|
|
|
+ self.overtake_when_passing_car_count = 0
|
|
|
|
+ self.overtake_in_forbid_lane_count = 0
|
|
|
|
+ self.overtake_in_ramp_count = 0
|
|
|
|
+ self.overtake_in_tunnel_count = 0
|
|
|
|
+ self.overtake_on_accelerate_lane_count = 0
|
|
|
|
+ self.overtake_on_decelerate_lane_count = 0
|
|
|
|
+ self.overtake_in_different_senerios_count = 0
|
|
|
|
+
|
|
|
|
+ def different_road_area_simtime(self, df, threshold=0.5):
|
|
|
|
+ if not df:
|
|
|
|
+ return []
|
|
|
|
+ simtime_group = []
|
|
|
|
+ current_simtime_group = [df[0]]
|
|
|
|
+
|
|
|
|
+ for i in range(1, len(df)):
|
|
|
|
+ if abs(df[i] - df[i - 1]) <= threshold:
|
|
|
|
+ current_simtime_group.append(df[i])
|
|
|
|
+ else:
|
|
|
|
+ simtime_group.append(current_simtime_group)
|
|
|
|
+ current_simtime_group = [df[i]]
|
|
|
|
+
|
|
|
|
+ simtime_group.append(current_simtime_group)
|
|
|
|
+ return simtime_group
|
|
|
|
+
|
|
|
|
+ def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
|
|
|
|
+ lane_start = lane_id[0]
|
|
|
|
+ lane_end = lane_id[-1]
|
|
|
|
+ start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
|
|
|
|
+ end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
|
|
|
|
+
|
|
|
|
+ return lane_start == lane_end and start_condition and end_condition
|
|
|
|
+
|
|
|
|
+ def _is_dxy_of_car(self, ego_df, obj_df):
|
|
|
|
+ """
|
|
|
|
+ :param df: objstate.csv and so on
|
|
|
|
+ :param id: playerId
|
|
|
|
+ :param string_type: posX/Y or speedX/Y and so on
|
|
|
|
+ :return: dataframe of dx/y and so on
|
|
|
|
+ """
|
|
|
|
+ car_dx = obj_df["posX"].values - ego_df["posX"].values
|
|
|
|
+ car_dy = obj_df["posY"].values - ego_df["posY"].values
|
|
|
|
+
|
|
|
|
+ return car_dx, car_dy
|
|
|
|
+
|
|
|
|
+ # 在前车右侧超车、会车时超车、前车掉头时超车
|
|
|
|
+
|
|
|
|
+ def illegal_overtake_with_car(self, window_width=250):
|
|
|
|
+
|
|
|
|
+ # 获取csv文件中最短的帧数
|
|
|
|
+ frame_id_length = len(self.ego_data["simFrame"])
|
|
|
|
+ start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
|
|
|
|
+
|
|
|
|
+ while (start_frame_id + window_width) < frame_id_length:
|
|
|
|
+ # if start_frame_id == 828:
|
|
|
|
+ # print("end")
|
|
|
|
+ simframe_window1 = list(
|
|
|
|
+ np.arange(start_frame_id, start_frame_id + window_width)
|
|
|
|
+ )
|
|
|
|
+ simframe_window = list(map(int, simframe_window1))
|
|
|
|
+ # 读取滑动窗口的dataframe数据
|
|
|
|
+ ego_data_frames = self.ego_data[
|
|
|
|
+ self.ego_data["simFrame"].isin(simframe_window)
|
|
|
|
+ ]
|
|
|
|
+ obj_data_frames = self.obj_data[
|
|
|
|
+ self.obj_data["simFrame"].isin(simframe_window)
|
|
|
|
+ ]
|
|
|
|
+ other_data_frames = self.other_obj_data[
|
|
|
|
+ self.other_obj_data["simFrame"].isin(simframe_window)
|
|
|
|
+ ]
|
|
|
|
+ # 读取前后的laneId
|
|
|
|
+ lane_id = ego_data_frames["lane_id"].tolist()
|
|
|
|
+ # 读取前后方向盘转角steeringWheel,
|
|
|
|
+ driverctrl_start_state = ego_data_frames["posH"].iloc[0]
|
|
|
|
+ driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
|
|
|
|
+ # 读取车辆前后的位置信息
|
|
|
|
+ dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
|
|
|
|
+ ego_speedx = ego_data_frames["speedX"].tolist()
|
|
|
|
+ ego_speedy = ego_data_frames["speedY"].tolist()
|
|
|
|
+
|
|
|
|
+ obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][
|
|
|
|
+ "speedX"
|
|
|
|
+ ].tolist()
|
|
|
|
+ obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][
|
|
|
|
+ "speedY"
|
|
|
|
+ ].tolist()
|
|
|
|
+ if len(other_data_frames) > 0:
|
|
|
|
+ other_start_speedx = other_data_frames["speedX"].iloc[0]
|
|
|
|
+ other_start_speedy = other_data_frames["speedY"].iloc[0]
|
|
|
|
+ if (
|
|
|
|
+ ego_speedx[0] * other_start_speedx
|
|
|
|
+ + ego_speedy[0] * other_start_speedy
|
|
|
|
+ < 0
|
|
|
|
+ ):
|
|
|
|
+ self.overtake_when_passing_car_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ start_frame_id += window_width
|
|
|
|
+ """
|
|
|
|
+ 如果滑动窗口开始和最后的laneid一致;
|
|
|
|
+ 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
|
|
|
|
+ 自车和前车的位置发生的交换;
|
|
|
|
+ 则认为右超车
|
|
|
|
+ """
|
|
|
|
+ if driverctrl_start_state > 0 and driverctrl_end_state < 0:
|
|
|
|
+ self.overtake_on_right_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ start_frame_id += window_width
|
|
|
|
+ elif ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
|
|
|
|
+ self.overtake_when_turn_around_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ start_frame_id += window_width
|
|
|
|
+ else:
|
|
|
|
+ start_frame_id += 1
|
|
|
|
+ # print(
|
|
|
|
+ # f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")
|
|
|
|
+
|
|
|
|
+ # 借道超车场景
|
|
|
|
+ def overtake_in_forbid_lane(self):
|
|
|
|
+ simTime = self.obj_data["simTime"].tolist()
|
|
|
|
+ simtime_devide = self.different_road_area_simtime(simTime)
|
|
|
|
+ for simtime in simtime_devide:
|
|
|
|
+ lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
|
|
|
|
+ try:
|
|
|
|
+ lane_type = lane_overtake["lane_type"].tolist()
|
|
|
|
+ if (50002 in lane_type and len(set(lane_type)) > 2) or (
|
|
|
|
+ 50002 not in lane_type and len(set(lane_type)) > 1
|
|
|
|
+ ):
|
|
|
|
+ self.overtake_in_forbid_lane_count += 1
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print("数据缺少lane_type信息")
|
|
|
|
+ # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
|
|
|
|
+
|
|
|
|
+ # 在匝道超车
|
|
|
|
+ def overtake_in_ramp_area(self):
|
|
|
|
+ ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
|
|
|
|
+ for ramp_simtime in ramp_simTime_list:
|
|
|
|
+ lane_id = self.ego_data["lane_id"].tolist()
|
|
|
|
+ ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
|
|
|
|
+ objstate_in_ramp = self.obj_data[
|
|
|
|
+ self.obj_data["simTime"].isin(ramp_simtime)
|
|
|
|
+ ]
|
|
|
|
+ dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
|
|
|
|
+ ego_speedx = ego_in_ramp["speedX"].tolist()
|
|
|
|
+ ego_speedy = ego_in_ramp["speedY"].tolist()
|
|
|
|
+ if len(lane_id) > 0:
|
|
|
|
+ self.overtake_in_ramp_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ else:
|
|
|
|
+ continue
|
|
|
|
+ # print(f"在匝道超车{self.overtake_in_ramp_count}次")
|
|
|
|
+
|
|
|
|
+ def overtake_in_tunnel_area(self):
|
|
|
|
+ tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
|
|
|
|
+ for tunnel_simtime in tunnel_simTime_list:
|
|
|
|
+ lane_id = self.ego_data["lane_id"].tolist()
|
|
|
|
+ ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
|
|
|
|
+ objstate_in_tunnel = self.obj_data[
|
|
|
|
+ self.obj_data["simTime"].isin(tunnel_simtime)
|
|
|
|
+ ]
|
|
|
|
+ dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
|
|
|
|
+ ego_speedx = ego_in_tunnel["speedX"].tolist()
|
|
|
|
+ ego_speedy = ego_in_tunnel["speedY"].tolist()
|
|
|
|
+ if len(lane_id) > 0:
|
|
|
|
+ self.overtake_in_tunnel_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ else:
|
|
|
|
+ continue
|
|
|
|
+ # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
|
|
|
|
+
|
|
|
|
+ # 加速车道超车
|
|
|
|
+ def overtake_on_accelerate_lane(self):
|
|
|
|
+ accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ accelerate_simTime_list = self.different_road_area_simtime(
|
|
|
|
+ accelerate_simtime_list
|
|
|
|
+ )
|
|
|
|
+ for accelerate_simtime in accelerate_simTime_list:
|
|
|
|
+ lane_id = self.ego_data["lane_id"].tolist()
|
|
|
|
+ ego_in_accelerate = self.ego_data[
|
|
|
|
+ self.ego_data["simTime"].isin(accelerate_simtime)
|
|
|
|
+ ]
|
|
|
|
+ objstate_in_accelerate = self.obj_data[
|
|
|
|
+ self.obj_data["simTime"].isin(accelerate_simtime)
|
|
|
|
+ ]
|
|
|
|
+ dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
|
|
|
|
+ ego_speedx = ego_in_accelerate["speedX"].tolist()
|
|
|
|
+ ego_speedy = ego_in_accelerate["speedY"].tolist()
|
|
|
|
+
|
|
|
|
+ self.overtake_on_accelerate_lane_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
|
|
|
|
+
|
|
|
|
+ # 减速车道超车
|
|
|
|
+ def overtake_on_decelerate_lane(self):
|
|
|
|
+ decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ decelerate_simTime_list = self.different_road_area_simtime(
|
|
|
|
+ decelerate_simtime_list
|
|
|
|
+ )
|
|
|
|
+ for decelerate_simtime in decelerate_simTime_list:
|
|
|
|
+ lane_id = self.ego_data["id"].tolist()
|
|
|
|
+ ego_in_decelerate = self.ego_data[
|
|
|
|
+ self.ego_data["simTime"].isin(decelerate_simtime)
|
|
|
|
+ ]
|
|
|
|
+ objstate_in_decelerate = self.obj_data[
|
|
|
|
+ self.obj_data["simTime"].isin(decelerate_simtime)
|
|
|
|
+ ]
|
|
|
|
+ dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
|
|
|
|
+ ego_speedx = ego_in_decelerate["speedX"].tolist()
|
|
|
|
+ ego_speedy = ego_in_decelerate["speedY"].tolist()
|
|
|
|
+
|
|
|
|
+ self.overtake_on_decelerate_lane_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
|
|
|
|
+
|
|
|
|
+ # 在交叉路口
|
|
|
|
+ def overtake_in_different_senerios(self):
|
|
|
|
+ crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist() # 判断是路口或者隧道区域
|
|
|
|
+ # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
|
|
|
|
+ crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
|
|
|
|
+ crossroad_objstate = self.obj_data[
|
|
|
|
+ self.obj_data["simTime"].isin(crossroad_simTime)
|
|
|
|
+ ]
|
|
|
|
+ # crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]
|
|
|
|
+
|
|
|
|
+ # 读取前后的laneId
|
|
|
|
+ lane_id = crossroad_ego["lane_id"].tolist()
|
|
|
|
+
|
|
|
|
+ # 读取车辆前后的位置信息
|
|
|
|
+ dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
|
|
|
|
+ ego_speedx = crossroad_ego["speedX"].tolist()
|
|
|
|
+ ego_speedy = crossroad_ego["speedY"].tolist()
|
|
|
|
+ """
|
|
|
|
+ 如果滑动窗口开始和最后的laneid一致;
|
|
|
|
+ 自车和前车的位置发生的交换;
|
|
|
|
+ 则认为发生超车
|
|
|
|
+ """
|
|
|
|
+ if len(lane_id) > 0:
|
|
|
|
+ self.overtake_in_different_senerios_count += self._is_overtake(
|
|
|
|
+ lane_id, dx, dy, ego_speedx, ego_speedy
|
|
|
|
+ )
|
|
|
|
+ else:
|
|
|
|
+ pass
|
|
|
|
+ # print(f"在路口超车{self.overtake_in_different_senerios_count}次")
|
|
|
|
+
|
|
|
|
+ def statistic(self):
|
|
|
|
+ self.overtake_in_forbid_lane()
|
|
|
|
+ self.overtake_on_decelerate_lane()
|
|
|
|
+ self.overtake_on_accelerate_lane()
|
|
|
|
+ self.overtake_in_ramp_area()
|
|
|
|
+ self.overtake_in_tunnel_area()
|
|
|
|
+ self.overtake_in_different_senerios()
|
|
|
|
+ self.illegal_overtake_with_car()
|
|
|
|
+
|
|
|
|
+ self.calculated_value = {
|
|
|
|
+ "overtake_on_right": self.overtake_on_right_count,
|
|
|
|
+ "overtake_when_turn_around": self.overtake_when_turn_around_count,
|
|
|
|
+ "overtake_when_passing_car": self.overtake_when_passing_car_count,
|
|
|
|
+ "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
|
|
|
|
+ "overtake_in_ramp": self.overtake_in_ramp_count,
|
|
|
|
+ "overtake_in_tunnel": self.overtake_in_tunnel_count,
|
|
|
|
+ "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
|
|
|
|
+ "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
|
|
|
|
+ "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
|
|
|
|
+ }
|
|
|
|
+ # self.logger.info(f"超车类指标统计完成,统计结果:{self.calculated_value}")
|
|
|
|
+ return self.calculated_value
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class SlowdownViolation(object):
|
|
|
|
+ """减速让行违规类"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ print("减速让行违规类-------------------------")
|
|
|
|
+ self.traffic_violations_type = "减速让行违规类"
|
|
|
|
+ self.object_items = []
|
|
|
|
+ self.data = df_data.obj_data[1]
|
|
|
|
+ self.ego_data = (
|
|
|
|
+ self.data[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ ) # Copy to avoid modifying the original DataFrame
|
|
|
|
+ self.pedestrian_data = pd.DataFrame()
|
|
|
|
+
|
|
|
|
+ self.object_items = set(df_data.object_df.type.tolist())
|
|
|
|
+ if 13 in self.object_items: # 行人的type是13
|
|
|
|
+ self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
|
|
|
|
+ self.pedestrian_data = (
|
|
|
|
+ self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ self.slow_down_in_crosswalk_count = 0
|
|
|
|
+ self.avoid_pedestrian_in_crosswalk_count = 0
|
|
|
|
+ self.avoid_pedestrian_in_the_road_count = 0
|
|
|
|
+ self.aviod_pedestrian_when_turning_count = 0
|
|
|
|
+
|
|
|
|
+ def pedestrian_in_front_of_car(self):
|
|
|
|
+ if len(self.pedestrian_data) == 0:
|
|
|
|
+ return []
|
|
|
|
+ else:
|
|
|
|
+ self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
|
|
|
|
+ self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
|
|
|
|
+ self.ego_data["dist"] = np.sqrt(
|
|
|
|
+ self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ self.ego_data["rela_pos"] = (
|
|
|
|
+ self.ego_data["dx"] * self.ego_data["speedX"]
|
|
|
|
+ + self.ego_data["dy"] * self.ego_data["speedY"]
|
|
|
|
+ )
|
|
|
|
+ simtime = self.ego_data[
|
|
|
|
+ (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
|
|
|
|
+ ]["simTime"].tolist()
|
|
|
|
+ return simtime
|
|
|
|
+
|
|
|
|
+ def different_road_area_simtime(self, df, threshold=0.6):
|
|
|
|
+ if not df:
|
|
|
|
+ return []
|
|
|
|
+ simtime_group = []
|
|
|
|
+ current_simtime_group = [df[0]]
|
|
|
|
+
|
|
|
|
+ for i in range(1, len(df)):
|
|
|
|
+ if abs(df[i] - df[i - 1]) <= threshold:
|
|
|
|
+ current_simtime_group.append(df[i])
|
|
|
|
+ else:
|
|
|
|
+ simtime_group.append(current_simtime_group)
|
|
|
|
+ current_simtime_group = [df[i]]
|
|
|
|
+
|
|
|
|
+ simtime_group.append(current_simtime_group)
|
|
|
|
+ return simtime_group
|
|
|
|
+
|
|
|
|
+ def slow_down_in_crosswalk(self):
|
|
|
|
+ # 筛选出路口或隧道区域的时间点
|
|
|
|
+ crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
|
|
|
|
+
|
|
|
|
+ for crosswalk_simtime in crosswalk_simTime_divide:
|
|
|
|
+ # 筛选出当前时间段内的数据
|
|
|
|
+ # start_time, end_time = crosswalk_simtime
|
|
|
|
+ start_time = crosswalk_simtime[0]
|
|
|
|
+ end_time = crosswalk_simtime[-1]
|
|
|
|
+ print(f"当前时间段:{start_time} - {end_time}")
|
|
|
|
+ crosswalk_objstate = self.ego_data[
|
|
|
|
+ (self.ego_data["simTime"] >= start_time)
|
|
|
|
+ & (self.ego_data["simTime"] <= end_time)
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ # 计算车辆速度
|
|
|
|
+ ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
|
|
|
|
+ ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
|
|
|
|
+ ego_speed = np.sqrt(ego_speedx**2 + ego_speedy**2)
|
|
|
|
+
|
|
|
|
+ # 判断是否超速
|
|
|
|
+ if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
|
|
|
|
+ self.slow_down_in_crosswalk_count += 1
|
|
|
|
+
|
|
|
|
+ # 输出总次数
|
|
|
|
+ print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
|
|
|
|
+
|
|
|
|
+ def avoid_pedestrian_in_crosswalk(self):
|
|
|
|
+ crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
|
|
|
|
+ for crosswalk_simtime in crosswalk_simTime_devide:
|
|
|
|
+ if not self.pedestrian_data.empty:
|
|
|
|
+ crosswalk_objstate = self.pedestrian_data[
|
|
|
|
+ self.pedestrian_data["simTime"].isin(crosswalk_simtime)
|
|
|
|
+ ]
|
|
|
|
+ else:
|
|
|
|
+ crosswalk_objstate = pd.DataFrame()
|
|
|
|
+ if len(crosswalk_objstate) > 0:
|
|
|
|
+ pedestrian_simtime = crosswalk_objstate["simTime"]
|
|
|
|
+ pedestrian_objstate = crosswalk_objstate[
|
|
|
|
+ crosswalk_objstate["simTime"].isin(pedestrian_simtime)
|
|
|
|
+ ]
|
|
|
|
+ ego_speed = np.sqrt(
|
|
|
|
+ pedestrian_objstate["speedX"] ** 2
|
|
|
|
+ + pedestrian_objstate["speedY"] ** 2
|
|
|
|
+ )
|
|
|
|
+ if ego_speed.any() > 0:
|
|
|
|
+ self.avoid_pedestrian_in_crosswalk_count += 1
|
|
|
|
+
|
|
|
|
+ def avoid_pedestrian_in_the_road(self):
|
|
|
|
+ simtime = self.pedestrian_in_front_of_car()
|
|
|
|
+ if len(simtime) == 0:
|
|
|
|
+ self.avoid_pedestrian_in_the_road_count += 0
|
|
|
|
+ else:
|
|
|
|
+ pedestrian_on_the_road = self.pedestrian_data[
|
|
|
|
+ self.pedestrian_data["simTime"].isin(simtime)
|
|
|
|
+ ]
|
|
|
|
+ simTime = pedestrian_on_the_road["simTime"].tolist()
|
|
|
|
+ simTime_devide = self.different_road_area_simtime(simTime)
|
|
|
|
+ for simtime1 in simTime_devide:
|
|
|
|
+ sub_pedestrian_on_the_road = pedestrian_on_the_road[
|
|
|
|
+ pedestrian_on_the_road["simTime"].isin(simtime1)
|
|
|
|
+ ]
|
|
|
|
+ ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
|
|
|
|
+ dist = np.sqrt(
|
|
|
|
+ (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
|
|
|
|
+ ** 2
|
|
|
|
+ + (
|
|
|
|
+ ego_car["posY"].values
|
|
|
|
+ - sub_pedestrian_on_the_road["posY"].values
|
|
|
|
+ )
|
|
|
|
+ ** 2
|
|
|
|
+ )
|
|
|
|
+ speed = np.sqrt(
|
|
|
|
+ ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
|
|
|
|
+ )
|
|
|
|
+ data = {"dist": dist, "speed": speed}
|
|
|
|
+ new_ego_car = pd.DataFrame(data)
|
|
|
|
+ new_ego_car = new_ego_car.assign(
|
|
|
|
+ Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
|
|
|
|
+ )
|
|
|
|
+ if new_ego_car["Column3"].any():
|
|
|
|
+ self.avoid_pedestrian_in_the_road_count += 1
|
|
|
|
+
|
|
|
|
+ def aviod_pedestrian_when_turning(self):
|
|
|
|
+ pedestrian_simtime_list = self.pedestrian_in_front_of_car()
|
|
|
|
+ if len(pedestrian_simtime_list) > 0:
|
|
|
|
+ simtime_list = self.ego_data[
|
|
|
|
+ (self.ego_data["simTime"].isin(pedestrian_simtime_list))
|
|
|
|
+ & (self.ego_data["lane_type"] == 20)
|
|
|
|
+ ]["simTime"].tolist()
|
|
|
|
+ simTime_list = self.different_road_area_simtime(simtime_list)
|
|
|
|
+ pedestrian_on_the_road = self.pedestrian_data[
|
|
|
|
+ self.pedestrian_data["simTime"].isin(simtime_list)
|
|
|
|
+ ]
|
|
|
|
+ for simtime in simTime_list:
|
|
|
|
+ sub_pedestrian_on_the_road = pedestrian_on_the_road[
|
|
|
|
+ pedestrian_on_the_road["simTime"].isin(simtime)
|
|
|
|
+ ]
|
|
|
|
+ ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
|
|
|
|
+ ego_car["dist"] = np.sqrt(
|
|
|
|
+ (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
|
|
|
|
+ ** 2
|
|
|
|
+ + (
|
|
|
|
+ ego_car["posY"].values
|
|
|
|
+ - sub_pedestrian_on_the_road["posY"].values
|
|
|
|
+ )
|
|
|
|
+ ** 2
|
|
|
|
+ )
|
|
|
|
+ ego_car["speed"] = np.sqrt(
|
|
|
|
+ ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
|
|
|
|
+ )
|
|
|
|
+ if any(ego_car["speed"].tolist()) != 0:
|
|
|
|
+ self.aviod_pedestrian_when_turning_count += 1
|
|
|
|
+
|
|
|
|
+ def statistic(self):
|
|
|
|
+ self.slow_down_in_crosswalk()
|
|
|
|
+ self.avoid_pedestrian_in_crosswalk()
|
|
|
|
+ self.avoid_pedestrian_in_the_road()
|
|
|
|
+ self.aviod_pedestrian_when_turning()
|
|
|
|
+
|
|
|
|
+ self.calculated_value = {
|
|
|
|
+ "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
|
|
|
|
+ "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count,
|
|
|
|
+ "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count,
|
|
|
|
+ "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count,
|
|
|
|
+ }
|
|
|
|
+ # self.logger.info(f"减速让行类指标统计完成,统计结果:{self.calculated_value}")
|
|
|
|
+ return self.calculated_value
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class TurnaroundViolation(object):
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ print("掉头违规类初始化中...")
|
|
|
|
+ self.traffic_violations_type = "掉头违规类"
|
|
|
|
+
|
|
|
|
+ self.data = df_data.obj_data[1]
|
|
|
|
+ self.ego_data = (
|
|
|
|
+ self.data[TURNAROUND_INFO].copy().reset_index(drop=True)
|
|
|
|
+ ) # Copy to avoid modifying the original DataFrame
|
|
|
|
+ self.pedestrian_data = pd.DataFrame()
|
|
|
|
+
|
|
|
|
+ self.object_items = set(df_data.object_df.type.tolist())
|
|
|
|
+ if 13 in self.object_items: # 行人的type是13
|
|
|
|
+ self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
|
|
|
|
+ self.pedestrian_data = (
|
|
|
|
+ self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ self.turning_in_forbiden_turn_back_sign_count = 0
|
|
|
|
+ self.turning_in_forbiden_turn_left_sign_count = 0
|
|
|
|
+ self.avoid_pedestrian_when_turn_back_count = 0
|
|
|
|
+
|
|
|
|
+ def pedestrian_in_front_of_car(self):
|
|
|
|
+ if len(self.pedestrian_data) == 0:
|
|
|
|
+ return []
|
|
|
|
+ else:
|
|
|
|
+ self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
|
|
|
|
+ self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
|
|
|
|
+ self.ego_data["dist"] = np.sqrt(
|
|
|
|
+ self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ self.ego_data["rela_pos"] = (
|
|
|
|
+ self.ego_data["dx"] * self.ego_data["speedX"]
|
|
|
|
+ + self.ego_data["dy"] * self.ego_data["speedY"]
|
|
|
|
+ )
|
|
|
|
+ simtime = self.ego_data[
|
|
|
|
+ (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
|
|
|
|
+ ]["simTime"].tolist()
|
|
|
|
+ return simtime
|
|
|
|
+
|
|
|
|
+ def different_road_area_simtime(self, df, threshold=0.5):
|
|
|
|
+ if not df:
|
|
|
|
+ return []
|
|
|
|
+ simtime_group = []
|
|
|
|
+ current_simtime_group = [df[0]]
|
|
|
|
+
|
|
|
|
+ for i in range(1, len(df)):
|
|
|
|
+ if abs(df[i] - df[i - 1]) <= threshold:
|
|
|
|
+ current_simtime_group.append(df[i])
|
|
|
|
+ else:
|
|
|
|
+ simtime_group.append(current_simtime_group)
|
|
|
|
+ current_simtime_group = [df[i]]
|
|
|
|
+
|
|
|
|
+ simtime_group.append(current_simtime_group)
|
|
|
|
+ return simtime_group
|
|
|
|
+
|
|
|
|
+ def turn_back_in_forbiden_sign(self):
|
|
|
|
+ """
|
|
|
|
+ 禁止掉头type = 8
|
|
|
|
+ """
|
|
|
|
+ forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+ forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
|
|
|
|
+ "simTime"
|
|
|
|
+ ].tolist()
|
|
|
|
+
|
|
|
|
+ forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
|
|
|
|
+ forbiden_turn_back_simTime
|
|
|
|
+ )
|
|
|
|
+ forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
|
|
|
|
+ forbiden_turn_left_simTime
|
|
|
|
+ )
|
|
|
|
+ for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
|
|
|
|
+ ego_car1 = self.ego_data.loc[
|
|
|
|
+ (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
|
|
|
|
+ ]
|
|
|
|
+ ego_start_speedx1 = ego_car1["speedX"].iloc[0]
|
|
|
|
+ ego_start_speedy1 = ego_car1["speedY"].iloc[0]
|
|
|
|
+ ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
|
|
|
|
+ ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
|
|
|
|
+
|
|
|
|
+ if (
|
|
|
|
+ ego_end_speedx1 * ego_start_speedx1
|
|
|
|
+ + ego_end_speedy1 * ego_start_speedy1
|
|
|
|
+ < 0
|
|
|
|
+ ):
|
|
|
|
+ self.turning_in_forbiden_turn_back_sign_count += 1
|
|
|
|
+
|
|
|
|
+ for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
|
|
|
|
+ ego_car2 = self.ego_data.loc[
|
|
|
|
+ (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
|
|
|
|
+ ]
|
|
|
|
+ ego_start_speedx2 = ego_car2["speedX"].iloc[0]
|
|
|
|
+ ego_start_speedy2 = ego_car2["speedY"].iloc[0]
|
|
|
|
+ ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
|
|
|
|
+ ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
|
|
|
|
+
|
|
|
|
+ if (
|
|
|
|
+ ego_end_speedx2 * ego_start_speedx2
|
|
|
|
+ + ego_end_speedy2 * ego_start_speedy2
|
|
|
|
+ < 0
|
|
|
|
+ ):
|
|
|
|
+ self.turning_in_forbiden_turn_left_sign_count += 1
|
|
|
|
+
|
|
|
|
+ def avoid_pedestrian_when_turn_back(self):
|
|
|
|
+ sensor_on_intersection = self.pedestrian_in_front_of_car()
|
|
|
|
+ avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
|
|
|
|
+ self.ego_data["lane_type"] == 20
|
|
|
|
+ ]["simTime"].tolist()
|
|
|
|
+ avoid_pedestrian_when_turn_back_simTime_devide = (
|
|
|
|
+ self.different_road_area_simtime(
|
|
|
|
+ avoid_pedestrian_when_turn_back_simTime_list
|
|
|
|
+ )
|
|
|
|
+ )
|
|
|
|
+ if len(sensor_on_intersection) > 0:
|
|
|
|
+ for (
|
|
|
|
+ avoid_pedestrian_when_turn_back_simtime
|
|
|
|
+ ) in avoid_pedestrian_when_turn_back_simTime_devide:
|
|
|
|
+ pedestrian_in_intersection_simtime = self.pedestrian_data[
|
|
|
|
+ self.pedestrian_data["simTime"].isin(
|
|
|
|
+ avoid_pedestrian_when_turn_back_simtime
|
|
|
|
+ )
|
|
|
|
+ ].tolist()
|
|
|
|
+ ego_df = self.ego_data[
|
|
|
|
+ self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
|
|
|
|
+ ].reset_index(drop=True)
|
|
|
|
+ pedestrian_df = self.pedestrian_data[
|
|
|
|
+ self.pedestrian_data["simTime"].isin(
|
|
|
|
+ pedestrian_in_intersection_simtime
|
|
|
|
+ )
|
|
|
|
+ ].reset_index(drop=True)
|
|
|
|
+ ego_df["dist"] = np.sqrt(
|
|
|
|
+ (ego_df["posx"] - pedestrian_df["posx"]) ** 2
|
|
|
|
+ + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
|
|
|
|
+ )
|
|
|
|
+ ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
|
|
|
|
+ if any(ego_df["speed"].tolist()) != 0:
|
|
|
|
+ self.avoid_pedestrian_when_turn_back_count += 1
|
|
|
|
+
|
|
|
|
+ def statistic(self):
|
|
|
|
+ self.turn_back_in_forbiden_sign()
|
|
|
|
+ self.avoid_pedestrian_when_turn_back()
|
|
|
|
+
|
|
|
|
+ self.calculated_value = {
|
|
|
|
+ "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count,
|
|
|
|
+ "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count,
|
|
|
|
+ "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count,
|
|
|
|
+ }
|
|
|
|
+ # self.logger.info(f"掉头违规类指标统计完成,统计结果:{self.calculated_value}")
|
|
|
|
+ return self.calculated_value
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class WrongWayViolation:
|
|
|
|
+ """停车违规类"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ print("停车违规类初始化中...")
|
|
|
|
+ self.traffic_violations_type = "停车违规类"
|
|
|
|
+ self.data = df_data.obj_data[1]
|
|
|
|
+ # 初始化违规统计
|
|
|
|
+ self.violation_count = {
|
|
|
|
+ "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
|
|
|
|
+ "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
|
|
|
|
+ "urbanExpresswayEmergencyLaneDriving": 0,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ def process_violations(self):
|
|
|
|
+ """处理停车或者紧急车道行驶违规数据"""
|
|
|
|
+ # 提取有效道路类型
|
|
|
|
+ urban_expressway_or_highway = {1, 2}
|
|
|
|
+ driving_lane = {1, 4, 5, 6}
|
|
|
|
+ emergency_lane = {12}
|
|
|
|
+ self.data["v"] *= 3.6 # 转换速度
|
|
|
|
+
|
|
|
|
+ # 使用向量化和条件判断进行违规判定
|
|
|
|
+ conditions = [
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & self.data["lane_type"].isin(driving_lane)
|
|
|
|
+ & (self.data["v"] == 0)
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & self.data["lane_type"].isin(emergency_lane)
|
|
|
|
+ & (self.data["v"] == 0)
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & self.data["lane_type"].isin(emergency_lane)
|
|
|
|
+ & (self.data["v"] != 0)
|
|
|
|
+ ),
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ violation_types = [
|
|
|
|
+ "urbanExpresswayOrHighwayDrivingLaneStopped",
|
|
|
|
+ "urbanExpresswayOrHighwayEmergencyLaneStopped",
|
|
|
|
+ "urbanExpresswayEmergencyLaneDriving",
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ # 设置违规类型
|
|
|
|
+ self.data["violation_type"] = None
|
|
|
|
+ for condition, violation_type in zip(conditions, violation_types):
|
|
|
|
+ self.data.loc[condition, "violation_type"] = violation_type
|
|
|
|
+
|
|
|
|
+ # 统计违规情况
|
|
|
|
+ self.violation_count = (
|
|
|
|
+ self.data["violation_type"]
|
|
|
|
+ .value_counts()
|
|
|
|
+ .reindex(violation_types, fill_value=0)
|
|
|
|
+ .to_dict()
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ def statistic(self) -> str:
|
|
|
|
+
|
|
|
|
+ self.process_violations()
|
|
|
|
+ # self.logger.info(f"停车违规类指标统计完成,统计结果:{self.violation_count}")
|
|
|
|
+ return self.violation_count
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class SpeedingViolation(object):
|
|
|
|
+ """超速违规类"""
|
|
|
|
+
|
|
|
|
+ """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ print("超速违规类初始化中...")
|
|
|
|
+ self.traffic_violations_type = "超速违规类"
|
|
|
|
+ self.data = df_data.obj_data[
|
|
|
|
+ 1
|
|
|
|
+ ] # Copy to avoid modifying the original DataFrame
|
|
|
|
+ # 初始化违规统计
|
|
|
|
+ self.violation_counts = {
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
|
|
|
|
+ "generalRoadSpeedOverLimit50": 0,
|
|
|
|
+ "generalRoadSpeedOverLimit20to50": 0,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ def process_violations(self):
|
|
|
|
+ """处理数据帧,检查超速和其他违规行为"""
|
|
|
|
+ # 提取有效道路类型
|
|
|
|
+ urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
|
|
|
|
+ general_road = {3} # 直接创建包含一个元素的集合
|
|
|
|
+ self.data["v"] *= 3.6 # 转换速度
|
|
|
|
+
|
|
|
|
+ # 违规判定
|
|
|
|
+ conditions = [
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.5)
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.2)
|
|
|
|
+ & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"])
|
|
|
|
+ & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(urban_expressway_or_highway)
|
|
|
|
+ & (self.data["v"] < self.data["road_speed_min"])
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(general_road)
|
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.5)
|
|
|
|
+ ),
|
|
|
|
+ (
|
|
|
|
+ self.data["road_fc"].isin(general_road)
|
|
|
|
+ & (self.data["v"] > self.data["road_speed_max"] * 1.2)
|
|
|
|
+ & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
|
|
|
|
+ ),
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ violation_types = [
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit50",
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit20to50",
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedOverLimit0to20",
|
|
|
|
+ "urbanExpresswayOrHighwaySpeedUnderLimit",
|
|
|
|
+ "generalRoadSpeedOverLimit50",
|
|
|
|
+ "generalRoadSpeedOverLimit20to50",
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ # 设置违规类型
|
|
|
|
+ self.data["violation_type"] = None
|
|
|
|
+ for condition, violation_type in zip(conditions, violation_types):
|
|
|
|
+ self.data.loc[condition, "violation_type"] = violation_type
|
|
|
|
+
|
|
|
|
+ # 统计各类违规情况
|
|
|
|
+ self.violation_counts = self.data["violation_type"].value_counts().to_dict()
|
|
|
|
+
|
|
|
|
+ def statistic(self) -> str:
|
|
|
|
+ # 处理数据
|
|
|
|
+ self.process_violations()
|
|
|
|
+ # self.logger.info(f"超速违规类指标统计完成,统计结果:{self.violation_counts}")
|
|
|
|
+ return self.violation_counts
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class TrafficLightViolation(object):
|
|
|
|
+ """违反交通灯类"""
|
|
|
|
+
|
|
|
|
+ """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ """初始化方法"""
|
|
|
|
+ self.traffic_violations_type = "违反交通灯类"
|
|
|
|
+ print("违反交通灯类 类初始化中...")
|
|
|
|
+ self.config = df_data.vehicle_config
|
|
|
|
+
|
|
|
|
+ self.data_ego = df_data.ego_data # 获取数据
|
|
|
|
+ self.violation_counts = {
|
|
|
|
+ "trafficSignalViolation": 0,
|
|
|
|
+ "illegalDrivingOrParkingAtCrossroads": 0,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ # 处理数据并判定违规
|
|
|
|
+ self.process_violations()
|
|
|
|
+
|
|
|
|
+ def is_point_cross_line(self, point, stop_line_points):
|
|
|
|
+ """
|
|
|
|
+ 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
|
|
|
|
+ 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
|
|
|
|
+
|
|
|
|
+ :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
|
|
|
|
+ :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
|
|
|
|
+ :return: True 如果车辆跨越了停止线,否则 False
|
|
|
|
+ """
|
|
|
|
+ line_vector = np.array(
|
|
|
|
+ [
|
|
|
|
+ stop_line_points[1][0] - stop_line_points[0][0],
|
|
|
|
+ stop_line_points[1][1] - stop_line_points[0][1],
|
|
|
|
+ ]
|
|
|
|
+ )
|
|
|
|
+ point_vector = np.array(
|
|
|
|
+ [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ cross_product = np.cross(line_vector, point_vector)
|
|
|
|
+ if cross_product != 0:
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ mid_point = (
|
|
|
|
+ np.array([stop_line_points[0][0], stop_line_points[0][1]])
|
|
|
|
+ + 0.5 * line_vector
|
|
|
|
+ )
|
|
|
|
+ axletree_to_mid_vector = np.array(
|
|
|
|
+ [point[0] - mid_point[0], point[1] - mid_point[1]]
|
|
|
|
+ )
|
|
|
|
+ direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
|
|
|
|
+
|
|
|
|
+ norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
|
|
|
|
+ norm_direction = np.linalg.norm(direction_vector)
|
|
|
|
+
|
|
|
|
+ if norm_axletree_to_mid == 0 or norm_direction == 0:
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
|
|
|
|
+ norm_axletree_to_mid * norm_direction
|
|
|
|
+ )
|
|
|
|
+ angle_theta = math.degrees(math.acos(cos_theta))
|
|
|
|
+
|
|
|
|
+ return angle_theta <= 90
|
|
|
|
+
|
|
|
|
+ def _filter_data(self):
|
|
|
|
+ """过滤数据,筛选出需要分析的记录"""
|
|
|
|
+ return self.data_ego[
|
|
|
|
+ (self.data_ego["stopline_id"] != -1)
|
|
|
|
+ & (self.data_ego["stopline_type"] == 1)
|
|
|
|
+ & (self.data_ego["trafficlight_id"] != -1)
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ def _group_data(self, filtered_data):
|
|
|
|
+ """按时间差对数据进行分组"""
|
|
|
|
+ filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
|
|
|
|
+ threshold = 0.5
|
|
|
|
+ filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
|
|
|
|
+ return filtered_data.groupby("group")
|
|
|
|
+
|
|
|
|
+ def _analyze_group(self, group_data):
|
|
|
|
+ """分析单个分组的数据,判断是否闯红灯"""
|
|
|
|
+ photos = []
|
|
|
|
+ stop_in_intersection = False
|
|
|
|
+
|
|
|
|
+ for _, row in group_data.iterrows():
|
|
|
|
+ vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
|
|
|
|
+ stop_line_points = [
|
|
|
|
+ [row["stopline_x1"], row["stopline_y1"]],
|
|
|
|
+ [row["stopline_x2"], row["stopline_y2"]],
|
|
|
|
+ ]
|
|
|
|
+ traffic_light_status = row["traffic_light_status"]
|
|
|
|
+ heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
|
|
|
|
+ heading_vector = heading_vector / np.linalg.norm(heading_vector)
|
|
|
|
+
|
|
|
|
+ # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
|
|
|
|
+ # config = yaml.load(f, Loader=yaml.FullLoader)
|
|
|
|
+ front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
|
|
|
|
+ rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
|
|
|
|
+ dist = math.sqrt(
|
|
|
|
+ (row["posX"] - row["traffic_light_x"]) ** 2
|
|
|
|
+ + (row["posY"] - row["traffic_light_y"]) ** 2
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
|
|
|
|
+ has_crossed_line_front = (
|
|
|
|
+ self.is_point_cross_line(front_wheel_pos, stop_line_points)
|
|
|
|
+ and traffic_light_status == 1
|
|
|
|
+ )
|
|
|
|
+ has_crossed_line_rear = (
|
|
|
|
+ self.is_point_cross_line(rear_wheel_pos, stop_line_points)
|
|
|
|
+ and row["v"] > 0
|
|
|
|
+ and traffic_light_status == 1
|
|
|
|
+ )
|
|
|
|
+ has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
|
|
|
|
+ has_passed_intersection = has_crossed_line_front and dist < 1.0
|
|
|
|
+ # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
|
|
|
|
+
|
|
|
|
+ photos.extend(
|
|
|
|
+ [
|
|
|
|
+ has_crossed_line_front,
|
|
|
|
+ has_crossed_line_rear,
|
|
|
|
+ has_passed_intersection,
|
|
|
|
+ has_stop_in_intersection,
|
|
|
|
+ ]
|
|
|
|
+ )
|
|
|
|
+ stop_in_intersection = has_passed_intersection
|
|
|
|
+
|
|
|
|
+ return photos, stop_in_intersection
|
|
|
|
+
|
|
|
|
+ def is_vehicle_run_a_red_light(self):
|
|
|
|
+ """判断车辆是否闯红灯"""
|
|
|
|
+ filtered_data = self._filter_data()
|
|
|
|
+ grouped_data = self._group_data(filtered_data)
|
|
|
|
+ self.photos_group = []
|
|
|
|
+ self.stop_in_intersections = []
|
|
|
|
+
|
|
|
|
+ for _, group_data in grouped_data:
|
|
|
|
+ photos, stop_in_intersection = self._analyze_group(group_data)
|
|
|
|
+ self.photos_group.append(photos)
|
|
|
|
+ self.stop_in_intersections.append(stop_in_intersection)
|
|
|
|
+
|
|
|
|
+ def process_violations(self):
|
|
|
|
+ """处理数据并判定违规"""
|
|
|
|
+ self.is_vehicle_run_a_red_light()
|
|
|
|
+ count_1 = sum(all(photos) for photos in self.photos_group)
|
|
|
|
+ count_2 = sum(
|
|
|
|
+ stop_in_intersection for stop_in_intersection in self.stop_in_intersections
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ self.violation_counts["trafficSignalViolation"] = count_1
|
|
|
|
+ self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
|
|
|
|
+
|
|
|
|
+ def statistic(self):
|
|
|
|
+ """返回统计结果"""
|
|
|
|
+ return self.violation_counts
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class WarningViolation(object):
|
|
|
|
+ """警告性违规类"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ self.traffic_violations_type = "警告性违规类"
|
|
|
|
+ print("警告性违规类 类初始化中...")
|
|
|
|
+ self.config = df_data.vehicle_config
|
|
|
|
+ self.data_ego = df_data.obj_data[1]
|
|
|
|
+ self.data = self.data_ego.copy() # 避免修改原始 DataFrame
|
|
|
|
+ self.violation_counts = {
|
|
|
|
+ "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
|
|
|
|
+ "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ def process_violations(self):
|
|
|
|
+ general_road = {3} # 普通道路
|
|
|
|
+ lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道
|
|
|
|
+ # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
|
|
|
|
+ # config = yaml.load(f, Loader=yaml.FullLoader)
|
|
|
|
+ car_width = self.config["CAR_WIDTH"]
|
|
|
|
+ lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在
|
|
|
|
+
|
|
|
|
+ # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
|
|
|
|
+ # 使用布尔索引来筛选满足条件的行
|
|
|
|
+ condition = (self.data["road_fc"].isin(general_road)) & (
|
|
|
|
+ self.data["lane_type"].isin(lane_type)
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 创建一个新的列,并根据条件设置值
|
|
|
|
+ self.data["is_violation"] = condition
|
|
|
|
+
|
|
|
|
+ # 统计满足条件的连续时间段
|
|
|
|
+ violation_segments = self.count_continuous_violations(
|
|
|
|
+ self.data["is_violation"], self.data["simTime"]
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 更新骑行车道线违规计数
|
|
|
|
+ self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments)
|
|
|
|
+
|
|
|
|
+ # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
|
|
|
|
+
|
|
|
|
+ # 计算阈值
|
|
|
|
+ threshold = (lane_width - car_width) / 2
|
|
|
|
+
|
|
|
|
+ # 找到满足条件的行
|
|
|
|
+ self.data["is_violation"] = self.data["laneOffset"] > threshold
|
|
|
|
+
|
|
|
|
+ # 统计满足条件的连续时间段
|
|
|
|
+ violation_segments = self.count_continuous_violations(
|
|
|
|
+ self.data["is_violation"], self.data["simTime"]
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 更新骑行车道线违规计数
|
|
|
|
+ self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len(
|
|
|
|
+ violation_segments
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ def count_continuous_violations(self, violation_series, time_series):
|
|
|
|
+ """统计连续违规的时间段数量"""
|
|
|
|
+ continuous_segments = []
|
|
|
|
+ current_segment = []
|
|
|
|
+
|
|
|
|
+ for is_violation, time in zip(violation_series, time_series):
|
|
|
|
+ if is_violation:
|
|
|
|
+ if not current_segment: # 新的连续段开始
|
|
|
|
+ current_segment.append(time)
|
|
|
|
+ else:
|
|
|
|
+ if current_segment: # 连续段结束
|
|
|
|
+ continuous_segments.append(current_segment)
|
|
|
|
+ current_segment = []
|
|
|
|
+
|
|
|
|
+ # 检查是否有一个未结束的连续段在最后
|
|
|
|
+ if current_segment:
|
|
|
|
+ continuous_segments.append(current_segment)
|
|
|
|
+
|
|
|
|
+ return continuous_segments
|
|
|
|
+
|
|
|
|
+ def statistic(self):
|
|
|
|
+ # 处理数据
|
|
|
|
+ self.process_violations()
|
|
|
|
+ # self.logger.info(f"警告性违规类指标统计完成,统计结果:{self.violation_counts}")
|
|
|
|
+ return self.violation_counts
|
|
|
|
+
|
|
|
|
+class TrafficSignViolation(object):
|
|
|
|
+ """交通标志违规类"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, df_data):
|
|
|
|
+ self.traffic_violations_type = "交通标志违规类"
|
|
|
|
+ print("交通标志违规类 类初始化中...")
|
|
|
|
+ self.data_ego = df_data.obj_data[1]
|
|
|
|
+ self.ego_data = (
|
|
|
|
+ self.data_ego[TRFFICSIGN_INFO].copy().reset_index(drop=True)
|
|
|
|
+ )
|
|
|
|
+ self.data_ego = self.data_ego.copy() # 避免修改原始 DataFrame
|
|
|
|
+ self.violation_counts = {
|
|
|
|
+ "NoStraightThrough": 0, # 禁止直行标志地方直行
|
|
|
|
+ "SpeedLimitViolation": 0, # 违反限速规定
|
|
|
|
+ "MinimumSpeedLimitViolation": 0, # 违反最低限速规定
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ # def checkForProhibitionViolation(self):
|
|
|
|
+ # """禁令标志判断违规:7 禁止直行,12:限制速度"""
|
|
|
|
+ # # 筛选出sign_type1为7(禁止直行)
|
|
|
|
+ # violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7]
|
|
|
|
+ # violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
|
|
|
|
+
|
|
|
|
+ def checkForProhibitionViolation(self):
|
|
|
|
+ """禁令标志判断违规:7 禁止直行,12:限制速度"""
|
|
|
|
+ # 筛选出 sign_type1 为7(禁止直行)的数据
|
|
|
|
+ violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy()
|
|
|
|
+
|
|
|
|
+ # 判断车辆是否在禁止直行路段直行
|
|
|
|
+ if not violation_straight_df.empty:
|
|
|
|
+ # 按时间戳排序(假设数据按时间顺序处理)
|
|
|
|
+ violation_straight_df = violation_straight_df.sort_values('simTime')
|
|
|
|
+
|
|
|
|
+ # 计算航向角变化(前后时间点的差值绝对值)
|
|
|
|
+ violation_straight_df['posH_diff'] = violation_straight_df['posH'].diff().abs()
|
|
|
|
+
|
|
|
|
+ # 筛选条件:航向角变化小于阈值(例如5度)且速度不为0
|
|
|
|
+ threshold = 5 # 单位:度(根据场景调整)
|
|
|
|
+ mask = (violation_straight_df['posH_diff'] <= threshold) & (violation_straight_df['v'] > 0)
|
|
|
|
+ straight_violations = violation_straight_df[mask]
|
|
|
|
+
|
|
|
|
+ # 统计违规次数或记录违规数据
|
|
|
|
+ self.violation_counts["prohibition_straight"] = len(straight_violations)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ # 限制速度判断(原代码)
|
|
|
|
+ violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
|
|
|
|
+ if violation_speed_limit_df.empty:
|
|
|
|
+ mask = self.data_ego["v"] > self.data_ego["sign_speed"]
|
|
|
|
+ self.violation_counts["SpeedLimitViolation"] = len(self.data_ego[mask])
|
|
|
|
+
|
|
|
|
+ def checkForInstructionViolation(self):
|
|
|
|
+ """限速标志属于指示性标志:13:最低限速"""
|
|
|
|
+ violation_minimum_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 13]
|
|
|
|
+ if violation_minimum_speed_limit_df.empty:
|
|
|
|
+ mask = self.data_ego["v"] < self.data_ego["sign_speed"]
|
|
|
|
+ self.violation_counts["MinimumSpeedLimitViolation"] = len(self.data_ego[mask])
|
|
|
|
+ def statistic(self):
|
|
|
|
+ self.checkForProhibitionViolation()
|
|
|
|
+ self.checkForInstructionViolation()
|
|
|
|
+ # self.logger.info(f"交通标志违规类指标统计完成,统计结果:{self.violation_counts}")
|
|
|
|
+ return self.violation_counts
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class ViolationManager:
|
|
|
|
+ """违规管理类,用于管理所有违规行为"""
|
|
|
|
+
|
|
|
|
+ def __init__(self, data_processed):
|
|
|
|
+
|
|
|
|
+ self.violations = []
|
|
|
|
+ self.data = data_processed
|
|
|
|
+ self.config = data_processed.traffic_config
|
|
|
|
+
|
|
|
|
+ self.over_take_violation = OvertakingViolation(self.data)
|
|
|
|
+ self.slow_down_violation = SlowdownViolation(self.data)
|
|
|
|
+ self.wrong_way_violation = WrongWayViolation(self.data)
|
|
|
|
+ self.speeding_violation = SpeedingViolation(self.data)
|
|
|
|
+ self.traffic_light_violation = TrafficLightViolation(self.data)
|
|
|
|
+ self.warning_violation = WarningViolation(self.data)
|
|
|
|
+
|
|
|
|
+ # self.report_statistic()
|
|
|
|
+
|
|
|
|
+ def report_statistic(self):
|
|
|
|
+
|
|
|
|
+ traffic_result = self.over_take_violation.statistic()
|
|
|
|
+ traffic_result.update(self.slow_down_violation.statistic())
|
|
|
|
+ traffic_result.update(self.traffic_light_violation.statistic())
|
|
|
|
+ traffic_result.update(self.wrong_way_violation.statistic())
|
|
|
|
+ traffic_result.update(self.speeding_violation.statistic())
|
|
|
|
+ traffic_result.update(self.warning_violation.statistic())
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ # evaluator = Score(self.config)
|
|
|
|
+ # result = evaluator.evaluate(traffic_result)
|
|
|
|
+
|
|
|
|
+ # print("\n[交规类表现及得分情况]")
|
|
|
|
+ # # self.logger.info(f"Traffic Result:{traffic_result}")
|
|
|
|
+ # return result
|
|
|
|
+ return traffic_result
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+# 示例使用
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
+ pass
|