traffic.py 56 KB


  1. import math
  2. import numpy as np
  3. import pandas as pd
  4. from modules.lib import log_manager
  5. from modules.lib.score import Score
  6. from modules.config import config
  7. class OvertakingViolation(object):
  8. """超车违规类"""
  9. def __init__(self, df_data):
  10. print("超车违规类初始化中...")
  11. self.traffic_violations_type = "超车违规类"
  12. # self.logger = log.get_logger() # 使用时再初始化
  13. self.data = df_data
  14. self.data_ego = df_data.ego_data
  15. self.ego_data = (
  16. self.data_ego[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  17. ) # Copy to avoid modifying the original DataFrame
  18. header = self.ego_data.columns
  19. self.overtake_on_right_count = 0
  20. self.overtake_when_turn_around_count = 0
  21. self.overtake_when_passing_car_count = 0
  22. self.overtake_in_forbid_lane_count = 0
  23. self.overtake_in_ramp_count = 0
  24. self.overtake_in_tunnel_count = 0
  25. self.overtake_on_accelerate_lane_count = 0
  26. self.overtake_on_decelerate_lane_count = 0
  27. self.overtake_in_different_senerios_count = 0
  28. def different_road_area_simtime(self, df, threshold=0.5):
  29. if not df:
  30. return []
  31. simtime_group = []
  32. current_simtime_group = [df[0]]
  33. for i in range(1, len(df)):
  34. if abs(df[i] - df[i - 1]) <= threshold:
  35. current_simtime_group.append(df[i])
  36. else:
  37. simtime_group.append(current_simtime_group)
  38. current_simtime_group = [df[i]]
  39. simtime_group.append(current_simtime_group)
  40. return simtime_group
  41. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  42. lane_start = lane_id[0]
  43. lane_end = lane_id[-1]
  44. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  45. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  46. return lane_start == lane_end and start_condition and end_condition
  47. def _is_dxy_of_car(self, ego_df, obj_df):
  48. """
  49. :param df: objstate.csv and so on
  50. :param string_type: posX/Y or speedX/Y and so on
  51. :return: dataframe of dx/y and so on
  52. """
  53. car_dx = obj_df["posX"].values - ego_df["posX"].values
  54. car_dy = obj_df["posY"].values - ego_df["posY"].values
  55. return car_dx, car_dy
  56. # 在前车右侧超车、会车时超车、前车掉头时超车
  57. def _is_objectcar_of_overtake(self, obj_df):
  58. if ((self.ego_data['posX'].values - obj_df['posX'].values).any() <= self.ego_data['lane_width'].values.any()) and (
  59. (self.ego_data['posY'].values - obj_df['posY'].values).any() <= 50) and ((self.ego_data['speedX' ].values *obj_df['speedX'].values).any() >= 0):
  60. return obj_df
  61. else:
  62. return None
  63. def _is_passingcar(self, obj_df):
  64. if ((self.ego_data['posX'].values - obj_df['posX'].values).any() <= self.ego_data['lane_width'].values.any()) and (
  65. (self.ego_data['posY'].values - obj_df['posY'].values).any() <= 50) and ((self.ego_data['speedX'].values * obj_df['speedX'].values).any() < 0):
  66. return obj_df
  67. else:
  68. return None
  69. def illegal_overtake_with_car(self, window_width=250):
  70. # 获取csv文件中最短的帧数
  71. frame_id_length = len(self.ego_data["simFrame"])
  72. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  73. for i in self.data.obj_id_list:
  74. if i != 1:
  75. data_obj = self.data.obj_data[i]
  76. else:
  77. continue
  78. obj_datas = self._is_objectcar_of_overtake(data_obj)
  79. if obj_datas is not None:
  80. obj_data = (
  81. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  82. )
  83. while (start_frame_id + window_width) < frame_id_length:
  84. simframe_window1 = list(
  85. np.arange(start_frame_id, start_frame_id + window_width)
  86. )
  87. simframe_window = list(map(int, simframe_window1))
  88. # 读取滑动窗口的dataframe数据
  89. ego_data_frames = self.ego_data[
  90. self.ego_data["simFrame"].isin(simframe_window)
  91. ]
  92. obj_data_frames = obj_data[
  93. obj_data["simFrame"].isin(simframe_window)
  94. ]
  95. # 读取前后的laneId
  96. lane_id = ego_data_frames["lane_id"].tolist()
  97. # 读取前后方向盘转角steeringWheel,
  98. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  99. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  100. # 读取车辆前后的位置信息
  101. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  102. ego_speedx = ego_data_frames["speedX"].tolist()
  103. ego_speedy = ego_data_frames["speedY"].tolist()
  104. obj_speedx = obj_data_frames[obj_data_frames["playerId"] == i][
  105. "speedX"
  106. ].tolist()
  107. obj_speedy = obj_data_frames[obj_data_frames["playerId"] == i][
  108. "speedY"
  109. ].tolist()
  110. for j in self.data.obj_id_list:
  111. data_pass_obj = self.data.obj_data[j]
  112. obj_pass_datas = self._is_passingcar(data_pass_obj)
  113. if obj_pass_datas:
  114. other_data_frames = obj_pass_datas[
  115. obj_pass_datas["simFrame"].isin(simframe_window)
  116. ]
  117. if len(other_data_frames) > 0:
  118. other_start_speedx = other_data_frames["speedX"].iloc[0]
  119. other_start_speedy = other_data_frames["speedY"].iloc[0]
  120. if (
  121. ego_speedx[0] * other_start_speedx
  122. + ego_speedy[0] * other_start_speedy
  123. < 0
  124. ):
  125. self.overtake_when_passing_car_count += self._is_overtake(
  126. lane_id, dx, dy, ego_speedx, ego_speedy
  127. )
  128. start_frame_id += window_width
  129. """
  130. 如果滑动窗口开始和最后的laneid一致;
  131. 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
  132. 自车和前车的位置发生的交换;
  133. 则认为右超车
  134. """
  135. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  136. self.overtake_on_right_count += self._is_overtake(
  137. lane_id, dx, dy, ego_speedx, ego_speedy
  138. )
  139. start_frame_id += window_width
  140. elif (len(ego_speedx ) *len(obj_speedx) > 0) and \
  141. (ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0):
  142. self.overtake_when_turn_around_count += self._is_overtake(
  143. lane_id, dx, dy, ego_speedx, ego_speedy
  144. )
  145. start_frame_id += window_width
  146. else:
  147. start_frame_id += 1
  148. # print(
  149. # f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")
  150. # 借道超车场景
  151. def overtake_in_forbid_lane(self):
  152. for i in self.data.obj_id_list:
  153. if i != 1:
  154. data_obj = self.data.obj_data[i]
  155. else:
  156. continue
  157. obj_datas = self._is_objectcar_of_overtake(data_obj)
  158. if obj_datas is not None:
  159. obj_data = (
  160. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  161. )
  162. simTime = obj_data["simTime"].tolist()
  163. simtime_devide = self.different_road_area_simtime(simTime)
  164. if len(simtime_devide) == 0:
  165. self.overtake_in_forbid_lane_count += 0
  166. return
  167. else:
  168. for simtime in simtime_devide:
  169. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  170. try:
  171. lane_type = lane_overtake["lane_type"].tolist()
  172. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  173. 50002 not in lane_type and len(set(lane_type)) > 1
  174. ):
  175. self.overtake_in_forbid_lane_count += 1
  176. except Exception as e:
  177. print("数据缺少lane_type信息")
  178. # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
  179. # 在匝道超车
  180. def overtake_in_ramp_area(self):
  181. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  182. "simTime"
  183. ].tolist()
  184. if len(ramp_simtime_list) == 0:
  185. self.overtake_in_ramp_count += 0
  186. return
  187. else:
  188. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  189. for ramp_simtime in ramp_simTime_list:
  190. for i in self.data.obj_id_list:
  191. if i != 1:
  192. data_obj = self.data.obj_data[i]
  193. else:
  194. continue
  195. obj_datas = self._is_objectcar_of_overtake(data_obj)
  196. if obj_datas is not None:
  197. obj_data = (
  198. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  199. )
  200. lane_id = self.ego_data["lane_id"].tolist()
  201. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  202. objstate_in_ramp = obj_data[
  203. obj_data["simTime"].isin(ramp_simtime)
  204. ]
  205. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  206. ego_speedx = ego_in_ramp["speedX"].tolist()
  207. ego_speedy = ego_in_ramp["speedY"].tolist()
  208. if len(lane_id) > 0:
  209. self.overtake_in_ramp_count += self._is_overtake(
  210. lane_id, dx, dy, ego_speedx, ego_speedy
  211. )
  212. else:
  213. continue
  214. # print(f"在匝道超车{self.overtake_in_ramp_count}次")
  215. def overtake_in_tunnel_area(self):
  216. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  217. "simTime"
  218. ].tolist()
  219. if len(tunnel_simtime_list) == 0:
  220. self.overtake_in_tunnel_count += 0
  221. return
  222. else:
  223. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  224. for tunnel_simtime in tunnel_simTime_list:
  225. lane_id = self.ego_data["lane_id"].tolist()
  226. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  227. for i in self.data.obj_id_list:
  228. if i != 1:
  229. data_obj = self.data.obj_data[i]
  230. else:
  231. continue
  232. obj_datas = self._is_objectcar_of_overtake(data_obj)
  233. if obj_datas is not None:
  234. obj_data = (
  235. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  236. )
  237. objstate_in_tunnel = obj_data[
  238. obj_data["simTime"].isin(tunnel_simtime)
  239. ]
  240. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  241. ego_speedx = ego_in_tunnel["speedX"].tolist()
  242. ego_speedy = ego_in_tunnel["speedY"].tolist()
  243. if len(lane_id) > 0:
  244. self.overtake_in_tunnel_count += self._is_overtake(
  245. lane_id, dx, dy, ego_speedx, ego_speedy
  246. )
  247. else:
  248. continue
  249. # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
  250. # 加速车道超车
  251. def overtake_on_accelerate_lane(self):
  252. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  253. "simTime"
  254. ].tolist()
  255. if len(accelerate_simtime_list) == 0:
  256. self.overtake_on_accelerate_lane_count += 0
  257. return
  258. else:
  259. accelerate_simTime_list = self.different_road_area_simtime(
  260. accelerate_simtime_list
  261. )
  262. for accelerate_simtime in accelerate_simTime_list:
  263. lane_id = self.ego_data["lane_id"].tolist()
  264. ego_in_accelerate = self.ego_data[
  265. self.ego_data["simTime"].isin(accelerate_simtime)
  266. ]
  267. for i in self.data.obj_id_list:
  268. if i != 1:
  269. data_obj = self.data.obj_data[i]
  270. else:
  271. continue
  272. obj_datas = self._is_objectcar_of_overtake(data_obj)
  273. if obj_datas is not None:
  274. obj_data = (
  275. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  276. )
  277. objstate_in_accelerate = obj_data[
  278. obj_data["simTime"].isin(accelerate_simtime)
  279. ]
  280. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  281. ego_speedx = ego_in_accelerate["speedX"].tolist()
  282. ego_speedy = ego_in_accelerate["speedY"].tolist()
  283. self.overtake_on_accelerate_lane_count += self._is_overtake(
  284. lane_id, dx, dy, ego_speedx, ego_speedy
  285. )
  286. # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
  287. # 减速车道超车
  288. def overtake_on_decelerate_lane(self):
  289. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  290. "simTime"
  291. ].tolist()
  292. if len(decelerate_simtime_list) == 0:
  293. self.overtake_on_decelerate_lane_count += 0
  294. return
  295. else:
  296. decelerate_simTime_list = self.different_road_area_simtime(
  297. decelerate_simtime_list
  298. )
  299. for decelerate_simtime in decelerate_simTime_list:
  300. lane_id = self.ego_data["id"].tolist()
  301. ego_in_decelerate = self.ego_data[
  302. self.ego_data["simTime"].isin(decelerate_simtime)
  303. ]
  304. for i in self.data.obj_id_list:
  305. if i != 1:
  306. data_obj = self.data.obj_data[i]
  307. else:
  308. continue
  309. obj_datas = self._is_objectcar_of_overtake(data_obj)
  310. if obj_datas is not None:
  311. obj_data = (
  312. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  313. )
  314. objstate_in_decelerate = obj_data[
  315. obj_data["simTime"].isin(decelerate_simtime)
  316. ]
  317. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  318. ego_speedx = ego_in_decelerate["speedX"].tolist()
  319. ego_speedy = ego_in_decelerate["speedY"].tolist()
  320. self.overtake_on_decelerate_lane_count += self._is_overtake(
  321. lane_id, dx, dy, ego_speedx, ego_speedy
  322. )
  323. # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
  324. # 在交叉路口
  325. def overtake_in_different_senerios(self):
  326. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  327. "simTime"
  328. ].tolist() # 判断是路口或者隧道区域
  329. if len(crossroad_simTime) == 0:
  330. self.overtake_in_different_senerios_count += 0
  331. return
  332. else:
  333. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  334. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  335. for i in self.data.obj_id_list:
  336. if i != 1:
  337. data_obj = self.data.obj_data[i]
  338. else:
  339. continue
  340. obj_datas = self._is_objectcar_of_overtake(data_obj)
  341. if obj_datas is not None:
  342. obj_data = (
  343. obj_datas[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  344. )
  345. crossroad_objstate = obj_data[
  346. obj_data["simTime"].isin(crossroad_simTime)
  347. ]
  348. # crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]
  349. # 读取前后的laneId
  350. lane_id = crossroad_ego["lane_id"].tolist()
  351. # 读取车辆前后的位置信息
  352. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  353. ego_speedx = crossroad_ego["speedX"].tolist()
  354. ego_speedy = crossroad_ego["speedY"].tolist()
  355. """
  356. 如果滑动窗口开始和最后的laneid一致;
  357. 自车和前车的位置发生的交换;
  358. 则认为发生超车
  359. """
  360. if len(lane_id) > 0:
  361. self.overtake_in_different_senerios_count += self._is_overtake(
  362. lane_id, dx, dy, ego_speedx, ego_speedy
  363. )
  364. else:
  365. pass
  366. # print(f"在路口超车{self.overtake_in_different_senerios_count}次")
  367. def statistic(self):
  368. for i in self.data.obj_id_list:
  369. if i != 1:
  370. data_obj = self.data.obj_data[i]
  371. else:
  372. continue
  373. obj_datas = self._is_objectcar_of_overtake(data_obj)
  374. if obj_datas is not None:
  375. self.overtake_in_forbid_lane()
  376. self.overtake_on_decelerate_lane()
  377. self.overtake_on_accelerate_lane()
  378. self.overtake_in_ramp_area()
  379. self.overtake_in_tunnel_area()
  380. self.overtake_in_different_senerios()
  381. self.illegal_overtake_with_car()
  382. else:
  383. pass
  384. self.calculated_value = {
  385. "overtake_on_right": self.overtake_on_right_count,
  386. "overtake_when_turn_around": self.overtake_when_turn_around_count,
  387. "overtake_when_passing_car": self.overtake_when_passing_car_count,
  388. "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
  389. "overtake_in_ramp": self.overtake_in_ramp_count,
  390. "overtake_in_tunnel": self.overtake_in_tunnel_count,
  391. "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
  392. "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
  393. "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
  394. }
  395. # self.logger.info(f"超车类指标统计完成,统计结果:{self.calculated_value}")
  396. return self.calculated_value
  397. class SlowdownViolation(object):
  398. """减速让行违规类"""
  399. def __init__(self, df_data):
  400. print("减速让行违规类-------------------------")
  401. self.traffic_violations_type = "减速让行违规类"
  402. self.object_items = []
  403. self.data = df_data.ego_data
  404. self.ego_data = (
  405. self.data[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
  406. ) # Copy to avoid modifying the original DataFrame
  407. self.pedestrian_data = pd.DataFrame()
  408. self.object_items = set(df_data.object_df.type.tolist())
  409. if 13 in self.object_items: # 行人的type是13
  410. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  411. self.pedestrian_data = (
  412. self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
  413. )
  414. self.slow_down_in_crosswalk_count = 0
  415. self.avoid_pedestrian_in_crosswalk_count = 0
  416. self.avoid_pedestrian_in_the_road_count = 0
  417. self.aviod_pedestrian_when_turning_count = 0
  418. def pedestrian_in_front_of_car(self):
  419. if len(self.pedestrian_data) == 0:
  420. return []
  421. else:
  422. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  423. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  424. self.ego_data["dist"] = np.sqrt(
  425. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  426. )
  427. self.ego_data["rela_pos"] = (
  428. self.ego_data["dx"] * self.ego_data["speedX"]
  429. + self.ego_data["dy"] * self.ego_data["speedY"]
  430. )
  431. simtime = self.ego_data[
  432. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  433. ]["simTime"].tolist()
  434. return simtime
  435. def different_road_area_simtime(self, df, threshold=0.6):
  436. if not df:
  437. return []
  438. simtime_group = []
  439. current_simtime_group = [df[0]]
  440. for i in range(1, len(df)):
  441. if abs(df[i] - df[i - 1]) <= threshold:
  442. current_simtime_group.append(df[i])
  443. else:
  444. simtime_group.append(current_simtime_group)
  445. current_simtime_group = [df[i]]
  446. simtime_group.append(current_simtime_group)
  447. return simtime_group
  448. def slow_down_in_crosswalk(self):
  449. # 筛选出路口或隧道区域的时间点
  450. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  451. "simTime"
  452. ].tolist()
  453. if len(crosswalk_simTime) == 0:
  454. self.slow_down_in_crosswalk_count += 0
  455. return
  456. else:
  457. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  458. for crosswalk_simtime in crosswalk_simTime_divide:
  459. # 筛选出当前时间段内的数据
  460. # start_time, end_time = crosswalk_simtime
  461. start_time = crosswalk_simtime[0]
  462. end_time = crosswalk_simtime[-1]
  463. print(f"当前时间段:{start_time} - {end_time}")
  464. crosswalk_objstate = self.ego_data[
  465. (self.ego_data["simTime"] >= start_time)
  466. & (self.ego_data["simTime"] <= end_time)
  467. ]
  468. # 计算车辆速度
  469. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  470. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  471. ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
  472. # 判断是否超速
  473. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  474. self.slow_down_in_crosswalk_count += 1
  475. # 输出总次数
  476. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  477. def avoid_pedestrian_in_crosswalk(self):
  478. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  479. "simTime"
  480. ].tolist()
  481. if len(crosswalk_simTime) == 0:
  482. self.avoid_pedestrian_in_crosswalk_count += 0
  483. return
  484. else:
  485. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  486. for crosswalk_simtime in crosswalk_simTime_devide:
  487. if not self.pedestrian_data.empty:
  488. crosswalk_objstate = self.pedestrian_data[
  489. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  490. ]
  491. else:
  492. crosswalk_objstate = pd.DataFrame()
  493. if len(crosswalk_objstate) > 0:
  494. pedestrian_simtime = crosswalk_objstate["simTime"]
  495. pedestrian_objstate = crosswalk_objstate[
  496. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  497. ]
  498. ego_speed = np.sqrt(
  499. pedestrian_objstate["speedX"] ** 2
  500. + pedestrian_objstate["speedY"] ** 2
  501. )
  502. if ego_speed.any() > 0:
  503. self.avoid_pedestrian_in_crosswalk_count += 1
  504. def avoid_pedestrian_in_the_road(self):
  505. simtime = self.pedestrian_in_front_of_car()
  506. if len(simtime) == 0:
  507. self.avoid_pedestrian_in_the_road_count += 0
  508. return
  509. else:
  510. pedestrian_on_the_road = self.pedestrian_data[
  511. self.pedestrian_data["simTime"].isin(simtime)
  512. ]
  513. simTime = pedestrian_on_the_road["simTime"].tolist()
  514. simTime_devide = self.different_road_area_simtime(simTime)
  515. if len(simTime_devide) == 0:
  516. pass
  517. else:
  518. for simtime1 in simTime_devide:
  519. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  520. pedestrian_on_the_road["simTime"].isin(simtime1)
  521. ]
  522. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  523. dist = np.sqrt(
  524. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  525. ** 2
  526. + (
  527. ego_car["posY"].values
  528. - sub_pedestrian_on_the_road["posY"].values
  529. )
  530. ** 2
  531. )
  532. speed = np.sqrt(
  533. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  534. )
  535. data = {"dist": dist, "speed": speed}
  536. new_ego_car = pd.DataFrame(data)
  537. new_ego_car = new_ego_car.assign(
  538. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  539. )
  540. if new_ego_car["Column3"].any():
  541. self.avoid_pedestrian_in_the_road_count += 1
  542. def aviod_pedestrian_when_turning(self):
  543. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  544. if len(pedestrian_simtime_list) > 0:
  545. simtime_list = self.ego_data[
  546. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  547. & (self.ego_data["lane_type"] == 20)
  548. ]["simTime"].tolist()
  549. simTime_list = self.different_road_area_simtime(simtime_list)
  550. pedestrian_on_the_road = self.pedestrian_data[
  551. self.pedestrian_data["simTime"].isin(simtime_list)
  552. ]
  553. if len(simTime_list) == 0:
  554. pass
  555. else:
  556. for simtime in simTime_list:
  557. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  558. pedestrian_on_the_road["simTime"].isin(simtime)
  559. ]
  560. if len(sub_pedestrian_on_the_road) > 0:
  561. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  562. ego_car["dist"] = np.sqrt(
  563. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  564. ** 2
  565. + (
  566. ego_car["posY"].values
  567. - sub_pedestrian_on_the_road["posY"].values
  568. )
  569. ** 2
  570. )
  571. ego_car["speed"] = np.sqrt(
  572. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  573. )
  574. if any(ego_car["speed"].tolist()) != 0:
  575. self.aviod_pedestrian_when_turning_count += 1
  576. def statistic(self):
  577. self.slow_down_in_crosswalk()
  578. self.avoid_pedestrian_in_crosswalk()
  579. self.avoid_pedestrian_in_the_road()
  580. self.aviod_pedestrian_when_turning()
  581. self.calculated_value = {
  582. "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
  583. "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count,
  584. "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count,
  585. "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count,
  586. }
  587. # self.logger.info(f"减速让行类指标统计完成,统计结果:{self.calculated_value}")
  588. return self.calculated_value
  589. class TurnaroundViolation(object):
  590. def __init__(self, df_data):
  591. print("掉头违规类初始化中...")
  592. self.traffic_violations_type = "掉头违规类"
  593. self.data = df_data.obj_data[1]
  594. self.ego_data = (
  595. self.data[config.TURNAROUND_INFO].copy().reset_index(drop=True)
  596. ) # Copy to avoid modifying the original DataFrame
  597. self.pedestrian_data = pd.DataFrame()
  598. self.object_items = set(df_data.object_df.type.tolist())
  599. if 13 in self.object_items: # 行人的type是13
  600. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  601. self.pedestrian_data = (
  602. self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
  603. )
  604. self.turning_in_forbiden_turn_back_sign_count = 0
  605. self.turning_in_forbiden_turn_left_sign_count = 0
  606. self.avoid_pedestrian_when_turn_back_count = 0
  607. def pedestrian_in_front_of_car(self):
  608. if len(self.pedestrian_data) == 0:
  609. return []
  610. else:
  611. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  612. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  613. self.ego_data["dist"] = np.sqrt(
  614. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  615. )
  616. self.ego_data["rela_pos"] = (
  617. self.ego_data["dx"] * self.ego_data["speedX"]
  618. + self.ego_data["dy"] * self.ego_data["speedY"]
  619. )
  620. simtime = self.ego_data[
  621. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  622. ]["simTime"].tolist()
  623. return simtime
  624. def different_road_area_simtime(self, df, threshold=0.5):
  625. if not df:
  626. return []
  627. simtime_group = []
  628. current_simtime_group = [df[0]]
  629. for i in range(1, len(df)):
  630. if abs(df[i] - df[i - 1]) <= threshold:
  631. current_simtime_group.append(df[i])
  632. else:
  633. simtime_group.append(current_simtime_group)
  634. current_simtime_group = [df[i]]
  635. simtime_group.append(current_simtime_group)
  636. return simtime_group
  637. def turn_back_in_forbiden_sign(self):
  638. """
  639. 禁止掉头type = 8
  640. """
  641. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  642. "simTime"
  643. ].tolist()
  644. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  645. "simTime"
  646. ].tolist()
  647. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  648. forbiden_turn_back_simTime
  649. )
  650. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  651. forbiden_turn_left_simTime
  652. )
  653. if len(forbiden_turn_back_simtime_devide) == 0:
  654. pass
  655. else:
  656. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  657. ego_car1 = self.ego_data.loc[
  658. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  659. ]
  660. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  661. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  662. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  663. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  664. if (
  665. ego_end_speedx1 * ego_start_speedx1
  666. + ego_end_speedy1 * ego_start_speedy1
  667. < 0
  668. ):
  669. self.turning_in_forbiden_turn_back_sign_count += 1
  670. if len(forbiden_turn_left_simtime_devide) == 0:
  671. pass
  672. else:
  673. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  674. ego_car2 = self.ego_data.loc[
  675. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  676. ]
  677. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  678. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  679. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  680. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  681. if (
  682. ego_end_speedx2 * ego_start_speedx2
  683. + ego_end_speedy2 * ego_start_speedy2
  684. < 0
  685. ):
  686. self.turning_in_forbiden_turn_left_sign_count += 1
  687. def avoid_pedestrian_when_turn_back(self):
  688. sensor_on_intersection = self.pedestrian_in_front_of_car()
  689. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  690. self.ego_data["lane_type"] == 20
  691. ]["simTime"].tolist()
  692. avoid_pedestrian_when_turn_back_simTime_devide = (
  693. self.different_road_area_simtime(
  694. avoid_pedestrian_when_turn_back_simTime_list
  695. )
  696. )
  697. if (len(sensor_on_intersection) > 0) and (len(avoid_pedestrian_when_turn_back_simTime_devide) > 0):
  698. for (
  699. avoid_pedestrian_when_turn_back_simtime
  700. ) in avoid_pedestrian_when_turn_back_simTime_devide:
  701. pedestrian_in_intersection_simtime = self.pedestrian_data[
  702. self.pedestrian_data["simTime"].isin(
  703. avoid_pedestrian_when_turn_back_simtime
  704. )
  705. ].tolist()
  706. ego_df = self.ego_data[
  707. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  708. ].reset_index(drop=True)
  709. pedestrian_df = self.pedestrian_data[
  710. self.pedestrian_data["simTime"].isin(
  711. pedestrian_in_intersection_simtime
  712. )
  713. ].reset_index(drop=True)
  714. ego_df["dist"] = np.sqrt(
  715. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  716. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  717. )
  718. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  719. if (any(ego_df["speed"].tolist()) != 0) or (any(ego_df["dist"].tolist()) == 0):
  720. self.avoid_pedestrian_when_turn_back_count += 1
  721. def statistic(self):
  722. self.turn_back_in_forbiden_sign()
  723. self.avoid_pedestrian_when_turn_back()
  724. self.calculated_value = {
  725. "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count,
  726. "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count,
  727. "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count,
  728. }
  729. # self.logger.info(f"掉头违规类指标统计完成,统计结果:{self.calculated_value}")
  730. return self.calculated_value
  731. class WrongWayViolation:
  732. """停车违规类"""
  733. def __init__(self, df_data):
  734. print("停车违规类初始化中...")
  735. self.traffic_violations_type = "停车违规类"
  736. self.data = df_data.obj_data[1].copy()
  737. # 初始化违规统计
  738. self.violation_count = {
  739. "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
  740. "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
  741. "urbanExpresswayEmergencyLaneDriving": 0,
  742. }
  743. def process_violations(self):
  744. """处理停车或者紧急车道行驶违规数据"""
  745. # 提取有效道路类型
  746. urban_expressway_or_highway = {1, 2}
  747. driving_lane = {1, 4, 5, 6}
  748. emergency_lane = {12}
  749. self.data["v"] *= 3.6 # 转换速度
  750. # 使用向量化和条件判断进行违规判定
  751. conditions = [
  752. (
  753. self.data["road_fc"].isin(urban_expressway_or_highway)
  754. & self.data["lane_type"].isin(driving_lane)
  755. & (self.data["v"] == 0)
  756. ),
  757. (
  758. self.data["road_fc"].isin(urban_expressway_or_highway)
  759. & self.data["lane_type"].isin(emergency_lane)
  760. & (self.data["v"] == 0)
  761. ),
  762. (
  763. self.data["road_fc"].isin(urban_expressway_or_highway)
  764. & self.data["lane_type"].isin(emergency_lane)
  765. & (self.data["v"] != 0)
  766. ),
  767. ]
  768. violation_types = [
  769. "urbanExpresswayOrHighwayDrivingLaneStopped",
  770. "urbanExpresswayOrHighwayEmergencyLaneStopped",
  771. "urbanExpresswayEmergencyLaneDriving",
  772. ]
  773. # 设置违规类型
  774. self.data["violation_type"] = None
  775. for condition, violation_type in zip(conditions, violation_types):
  776. self.data.loc[condition, "violation_type"] = violation_type
  777. # 统计违规情况
  778. self.violation_count = (
  779. self.data["violation_type"]
  780. .value_counts()
  781. .reindex(violation_types, fill_value=0)
  782. .to_dict()
  783. )
  784. def statistic(self) -> str:
  785. self.process_violations()
  786. # self.logger.info(f"停车违规类指标统计完成,统计结果:{self.violation_count}")
  787. return self.violation_count
  788. class SpeedingViolation(object):
  789. """超速违规类"""
  790. """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
  791. def __init__(self, df_data):
  792. print("超速违规类初始化中...")
  793. self.traffic_violations_type = "超速违规类"
  794. self.data = df_data.obj_data[1].copy()
  795. # 初始化违规统计
  796. self.violation_counts = {
  797. "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
  798. "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
  799. "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
  800. "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
  801. "generalRoadSpeedOverLimit50": 0,
  802. "generalRoadSpeedOverLimit20to50": 0,
  803. }
  804. def process_violations(self):
  805. """处理数据帧,检查超速和其他违规行为"""
  806. # 提取有效道路类型
  807. urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
  808. general_road = {3} # 直接创建包含一个元素的集合
  809. # 转换速度
  810. self.data["v"] *= 3.6 # 转换速度
  811. conditions = [
  812. (
  813. self.data["road_fc"].isin(urban_expressway_or_highway)
  814. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  815. ),
  816. (
  817. self.data["road_fc"].isin(urban_expressway_or_highway)
  818. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  819. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  820. ),
  821. (
  822. self.data["road_fc"].isin(urban_expressway_or_highway)
  823. & (self.data["v"] > self.data["road_speed_max"])
  824. & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
  825. ),
  826. (
  827. self.data["road_fc"].isin(urban_expressway_or_highway)
  828. & (self.data["v"] < self.data["road_speed_min"])
  829. ),
  830. (
  831. self.data["road_fc"].isin(general_road)
  832. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  833. ),
  834. (
  835. self.data["road_fc"].isin(general_road)
  836. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  837. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  838. ),
  839. ]
  840. violation_types = [
  841. "urbanExpresswayOrHighwaySpeedOverLimit50",
  842. "urbanExpresswayOrHighwaySpeedOverLimit20to50",
  843. "urbanExpresswayOrHighwaySpeedOverLimit0to20",
  844. "urbanExpresswayOrHighwaySpeedUnderLimit",
  845. "generalRoadSpeedOverLimit50",
  846. "generalRoadSpeedOverLimit20to50",
  847. ]
  848. # 设置违规类型
  849. self.data["violation_type"] = None
  850. for condition, violation_type in zip(conditions, violation_types):
  851. self.data.loc[condition, "violation_type"] = violation_type
  852. # 统计各类违规情况
  853. self.violation_counts = self.data["violation_type"].value_counts().to_dict()
  854. # 添加statistic方法
  855. def statistic(self):
  856. """返回统计结果"""
  857. # 处理数据
  858. self.process_violations()
  859. print(f"超速违规类指标统计完成,统计结果:{self.violation_counts}")
  860. return self.violation_counts
  861. class TrafficLightViolation(object):
  862. """违反交通灯类"""
  863. """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
  864. def __init__(self, df_data):
  865. """初始化方法"""
  866. self.traffic_violations_type = "违反交通灯类"
  867. print("违反交通灯类 类初始化中...")
  868. self.config = df_data.vehicle_config
  869. self.data_ego = df_data.ego_data # 获取数据
  870. self.violation_counts = {
  871. "trafficSignalViolation": 0,
  872. "illegalDrivingOrParkingAtCrossroads": 0,
  873. }
  874. # 处理数据并判定违规
  875. self.process_violations()
  876. def is_point_cross_line(self, point, stop_line_points):
  877. """
  878. 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
  879. 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
  880. :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
  881. :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
  882. :return: True 如果车辆跨越了停止线,否则 False
  883. """
  884. line_vector = np.array(
  885. [
  886. stop_line_points[1][0] - stop_line_points[0][0],
  887. stop_line_points[1][1] - stop_line_points[0][1],
  888. ]
  889. )
  890. point_vector = np.array(
  891. [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
  892. )
  893. cross_product = np.cross(line_vector, point_vector)
  894. if cross_product != 0:
  895. return False
  896. mid_point = (
  897. np.array([stop_line_points[0][0], stop_line_points[0][1]])
  898. + 0.5 * line_vector
  899. )
  900. axletree_to_mid_vector = np.array(
  901. [point[0] - mid_point[0], point[1] - mid_point[1]]
  902. )
  903. direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
  904. norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
  905. norm_direction = np.linalg.norm(direction_vector)
  906. if norm_axletree_to_mid == 0 or norm_direction == 0:
  907. return False
  908. cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
  909. norm_axletree_to_mid * norm_direction
  910. )
  911. angle_theta = math.degrees(math.acos(cos_theta))
  912. return angle_theta <= 90
  913. def _filter_data(self):
  914. """过滤数据,筛选出需要分析的记录"""
  915. return self.data_ego[
  916. (self.data_ego["stopline_id"] != -1)
  917. & (self.data_ego["stopline_type"] == 1)
  918. & (self.data_ego["trafficlight_id"] != -1)
  919. ]
  920. def _group_data(self, filtered_data):
  921. """按时间差对数据进行分组"""
  922. filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
  923. threshold = 0.5
  924. filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
  925. return filtered_data.groupby("group")
  926. def _analyze_group(self, group_data):
  927. """分析单个分组的数据,判断是否闯红灯"""
  928. photos = []
  929. stop_in_intersection = False
  930. for _, row in group_data.iterrows():
  931. vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
  932. stop_line_points = [
  933. [row["stopline_x1"], row["stopline_y1"]],
  934. [row["stopline_x2"], row["stopline_y2"]],
  935. ]
  936. stateMask = row["stateMask"]
  937. heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
  938. heading_vector = heading_vector / np.linalg.norm(heading_vector)
  939. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  940. # config = yaml.load(f, Loader=yaml.FullLoader)
  941. front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
  942. rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
  943. dist = math.sqrt(
  944. (row["posX"] - row["traffic_light_x"]) ** 2
  945. + (row["posY"] - row["traffic_light_y"]) ** 2
  946. )
  947. if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
  948. has_crossed_line_front = (
  949. self.is_point_cross_line(front_wheel_pos, stop_line_points)
  950. and stateMask == 1
  951. )
  952. has_crossed_line_rear = (
  953. self.is_point_cross_line(rear_wheel_pos, stop_line_points)
  954. and row["v"] > 0
  955. and stateMask == 1
  956. )
  957. has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
  958. has_passed_intersection = has_crossed_line_front and dist < 1.0
  959. # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
  960. photos.extend(
  961. [
  962. has_crossed_line_front,
  963. has_crossed_line_rear,
  964. has_passed_intersection,
  965. has_stop_in_intersection,
  966. ]
  967. )
  968. stop_in_intersection = has_passed_intersection
  969. return photos, stop_in_intersection
  970. def is_vehicle_run_a_red_light(self):
  971. """判断车辆是否闯红灯"""
  972. filtered_data = self._filter_data()
  973. grouped_data = self._group_data(filtered_data)
  974. self.photos_group = []
  975. self.stop_in_intersections = []
  976. for _, group_data in grouped_data:
  977. photos, stop_in_intersection = self._analyze_group(group_data)
  978. self.photos_group.append(photos)
  979. self.stop_in_intersections.append(stop_in_intersection)
  980. def process_violations(self):
  981. """处理数据并判定违规"""
  982. self.is_vehicle_run_a_red_light()
  983. count_1 = sum(all(photos) for photos in self.photos_group)
  984. count_2 = sum(
  985. stop_in_intersection for stop_in_intersection in self.stop_in_intersections
  986. )
  987. self.violation_counts["trafficSignalViolation"] = count_1
  988. self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
  989. def statistic(self):
  990. """返回统计结果"""
  991. return self.violation_counts
  992. class WarningViolation(object):
  993. """警告性违规类"""
  994. def __init__(self, df_data):
  995. self.traffic_violations_type = "警告性违规类"
  996. print("警告性违规类 类初始化中...")
  997. self.config = df_data.vehicle_config
  998. self.data_ego = df_data.obj_data[1]
  999. self.data = self.data_ego.copy() # 避免修改原始 DataFrame
  1000. self.violation_counts = {
  1001. "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1002. "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1003. }
  1004. def process_violations(self):
  1005. general_road = {3} # 普通道路
  1006. lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道
  1007. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  1008. # config = yaml.load(f, Loader=yaml.FullLoader)
  1009. car_width = self.config["CAR_WIDTH"]
  1010. lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在
  1011. # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1012. # 使用布尔索引来筛选满足条件的行
  1013. condition = (self.data["road_fc"].isin(general_road)) & (
  1014. self.data["lane_type"].isin(lane_type)
  1015. )
  1016. # 创建一个新的列,并根据条件设置值
  1017. self.data["is_violation"] = condition
  1018. # 统计满足条件的连续时间段
  1019. violation_segments = self.count_continuous_violations(
  1020. self.data["is_violation"], self.data["simTime"]
  1021. )
  1022. # 更新骑行车道线违规计数
  1023. self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments)
  1024. # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1025. # 计算阈值
  1026. threshold = (lane_width - car_width) / 2
  1027. # 找到满足条件的行
  1028. self.data["is_violation"] = self.data["laneOffset"] > threshold
  1029. # 统计满足条件的连续时间段
  1030. violation_segments = self.count_continuous_violations(
  1031. self.data["is_violation"], self.data["simTime"]
  1032. )
  1033. # 更新骑行车道线违规计数
  1034. self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len(
  1035. violation_segments
  1036. )
  1037. def count_continuous_violations(self, violation_series, time_series):
  1038. """统计连续违规的时间段数量"""
  1039. continuous_segments = []
  1040. current_segment = []
  1041. for is_violation, time in zip(violation_series, time_series):
  1042. if is_violation:
  1043. if not current_segment: # 新的连续段开始
  1044. current_segment.append(time)
  1045. else:
  1046. if current_segment: # 连续段结束
  1047. continuous_segments.append(current_segment)
  1048. current_segment = []
  1049. # 检查是否有一个未结束的连续段在最后
  1050. if current_segment:
  1051. continuous_segments.append(current_segment)
  1052. return continuous_segments
  1053. def statistic(self):
  1054. # 处理数据
  1055. self.process_violations()
  1056. # self.logger.info(f"警告性违规类指标统计完成,统计结果:{self.violation_counts}")
  1057. return self.violation_counts
  1058. class TrafficSignViolation(object):
  1059. """交通标志违规类"""
  1060. def __init__(self, df_data):
  1061. self.traffic_violations_type = "交通标志违规类"
  1062. print("交通标志违规类 类初始化中...")
  1063. self.data_ego = df_data.obj_data[1]
  1064. self.ego_data = (
  1065. self.data_ego[config.TRFFICSIGN_INFO].copy().reset_index(drop=True)
  1066. )
  1067. self.data_ego = self.data_ego.copy() # 避免修改原始 DataFrame
  1068. self.violation_counts = {
  1069. "NoStraightThrough": 0, # 禁止直行标志地方直行
  1070. "SpeedLimitViolation": 0, # 违反限速规定
  1071. "MinimumSpeedLimitViolation": 0, # 违反最低限速规定
  1072. }
  1073. # def checkForProhibitionViolation(self):
  1074. # """禁令标志判断违规:7 禁止直行,12:限制速度"""
  1075. # # 筛选出sign_type1为7(禁止直行)
  1076. # violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7]
  1077. # violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
  1078. def checkForProhibitionViolation(self):
  1079. """禁令标志判断违规:7 禁止直行,12:限制速度"""
  1080. # 筛选出 sign_type1 为7(禁止直行)的数据
  1081. violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy()
  1082. # 判断车辆是否在禁止直行路段直行
  1083. if not violation_straight_df.empty:
  1084. # 按时间戳排序(假设数据按时间顺序处理)
  1085. violation_straight_df = violation_straight_df.sort_values('simTime')
  1086. # 计算航向角变化(前后时间点的差值绝对值)
  1087. violation_straight_df['posH_diff'] = violation_straight_df['posH'].diff().abs()
  1088. # 筛选条件:航向角变化小于阈值(例如5度)且速度不为0
  1089. threshold = 5 # 单位:度(根据场景调整)
  1090. mask = (violation_straight_df['posH_diff'] <= threshold) & (violation_straight_df['v'] > 0)
  1091. straight_violations = violation_straight_df[mask]
  1092. # 统计违规次数或记录违规数据
  1093. self.violation_counts["prohibition_straight"] = len(straight_violations)
  1094. # 限制速度判断(原代码)
  1095. violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
  1096. if violation_speed_limit_df.empty:
  1097. mask = self.data_ego["v"] > self.data_ego["sign_speed"]
  1098. self.violation_counts["SpeedLimitViolation"] = len(self.data_ego[mask])
  1099. def checkForInstructionViolation(self):
  1100. """限速标志属于指示性标志:13:最低限速"""
  1101. violation_minimum_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 13]
  1102. if violation_minimum_speed_limit_df.empty:
  1103. mask = self.data_ego["v"] < self.data_ego["sign_speed"]
  1104. self.violation_counts["MinimumSpeedLimitViolation"] = len(self.data_ego[mask])
  1105. def statistic(self):
  1106. self.checkForProhibitionViolation()
  1107. self.checkForInstructionViolation()
  1108. # self.logger.info(f"交通标志违规类指标统计完成,统计结果:{self.violation_counts}")
  1109. return self.violation_counts
  1110. class ViolationManager:
  1111. """违规管理类,用于管理所有违规行为"""
  1112. def __init__(self, data_processed):
  1113. self.violations = []
  1114. self.data = data_processed
  1115. self.config = data_processed.traffic_config
  1116. self.over_take_violation = OvertakingViolation(self.data)
  1117. self.slow_down_violation = SlowdownViolation(self.data)
  1118. self.wrong_way_violation = WrongWayViolation(self.data)
  1119. self.speeding_violation = SpeedingViolation(self.data)
  1120. self.traffic_light_violation = TrafficLightViolation(self.data)
  1121. self.warning_violation = WarningViolation(self.data)
  1122. # self.report_statistic()
  1123. def report_statistic(self):
  1124. traffic_result = self.over_take_violation.statistic()
  1125. traffic_result.update(self.slow_down_violation.statistic())
  1126. traffic_result.update(self.traffic_light_violation.statistic())
  1127. traffic_result.update(self.wrong_way_violation.statistic())
  1128. traffic_result.update(self.speeding_violation.statistic())
  1129. traffic_result.update(self.warning_violation.statistic())
  1130. evaluator = Score(self.config)
  1131. result = evaluator.evaluate(traffic_result)
  1132. print("\n[交规类表现及得分情况]")
  1133. # self.logger.info(f"Traffic Result:{traffic_result}")
  1134. return result
  1135. # 示例使用
  1136. if __name__ == "__main__":
  1137. pass