traffic.py 62 KB


  1. import math
  2. import numpy as np
  3. import pandas as pd
  4. from modules.lib import log_manager
  5. from modules.lib.score import Score
  6. from modules.lib.log_manager import LogManager
  7. from modules.lib import data_process
  8. OVERTAKE_INFO = [
  9. "simTime",
  10. "simFrame",
  11. "playerId",
  12. "speedX",
  13. "speedY",
  14. "posX",
  15. "posY",
  16. "posH",
  17. "lane_id",
  18. "lane_type",
  19. "road_type",
  20. "interid",
  21. "crossid",
  22. ]
  23. SLOWDOWN_INFO = [
  24. "simTime",
  25. "simFrame",
  26. "playerId",
  27. "speedX",
  28. "speedY",
  29. "posX",
  30. "posY",
  31. "crossid",
  32. "lane_type",
  33. ]
  34. TURNAROUND_INFO = [
  35. "simTime",
  36. "simFrame",
  37. "playerId",
  38. "speedX",
  39. "speedY",
  40. "posX",
  41. "posY",
  42. "sign_type1",
  43. "lane_type",
  44. ]
  45. TRFFICSIGN_INFO = [
  46. "simTime",
  47. "simFrame",
  48. "playerId",
  49. "speedX",
  50. "speedY",
  51. "v",
  52. "posX",
  53. "posY",
  54. "sign_type1",
  55. "sign_ref_link",
  56. "sign_x",
  57. "sign_y",
  58. ]
  59. def overtake_when_passing_car(data_processed):
  60. overtakingviolation = OvertakingViolation(data_processed)
  61. overtake_when_passing_car_count = overtakingviolation.calculate_overtake_when_passing_car_count()
  62. return {"overtake_when_passing_car": overtake_when_passing_car_count}
  63. def overtake_on_right(data_processed):
  64. overtakingviolation = OvertakingViolation(data_processed)
  65. overtake_on_right_count = overtakingviolation.calculate_overtake_on_right_count()
  66. return {"overtake_on_right": overtake_on_right_count}
  67. def overtake_when_turn_around(data_processed):
  68. overtakingviolation = OvertakingViolation(data_processed)
  69. overtake_when_turn_around_count = overtakingviolation.calculate_overtake_when_turn_around_count()
  70. return {"overtake_when_turn_around": overtake_when_turn_around_count}
  71. def overtake_in_forbid_lane(data_processed):
  72. overtakingviolation = OvertakingViolation(data_processed)
  73. overtake_in_forbid_lane_count = overtakingviolation.calculate_overtake_in_forbid_lane_count()
  74. return {"overtake_in_forbid_lane": overtake_in_forbid_lane_count}
  75. def overtake_in_ramp(data_processed):
  76. overtakingviolation = OvertakingViolation(data_processed)
  77. overtake_in_ramp_area_count = overtakingviolation.calculate_overtake_in_ramp_area_count()
  78. return {"overtake_in_ramp": overtake_in_ramp_area_count}
  79. def overtake_in_tunnel(data_processed):
  80. overtakingviolation = OvertakingViolation(data_processed)
  81. overtake_in_tunnel_area_count = overtakingviolation.calculate_overtake_in_tunnel_area_count()
  82. return {"overtake_in_tunnel": overtake_in_tunnel_area_count}
  83. def overtake_on_accelerate_lane(data_processed):
  84. overtakingviolation = OvertakingViolation(data_processed)
  85. overtake_on_accelerate_lane_count = overtakingviolation.calculate_overtake_on_accelerate_lane_count()
  86. return {"overtake_on_accelerate_lane": overtake_on_accelerate_lane_count}
  87. def overtake_on_decelerate_lane(data_processed):
  88. overtakingviolation = OvertakingViolation(data_processed)
  89. overtake_on_decelerate_lane_count = overtakingviolation.calculate_overtake_on_decelerate_lane_count()
  90. return {"overtake_on_decelerate_lane": overtake_on_decelerate_lane_count}
  91. def overtake_in_different_senerios(data_processed):
  92. overtakingviolation = OvertakingViolation(data_processed)
  93. overtake_in_different_senerios_count = overtakingviolation.calculate_overtake_in_different_senerios_count()
  94. return {"overtake_in_different_senerios": overtake_in_different_senerios_count}
  95. class OvertakingViolation(object):
  96. """超车违规类"""
  97. def __init__(self, df_data):
  98. print("超车违规类初始化中...")
  99. self.traffic_violations_type = "超车违规类"
  100. # self.logger = log.get_logger() # 使用时再初始化
  101. self.data = df_data.obj_data[1]
  102. self.ego_data = (
  103. self.data[OVERTAKE_INFO].copy().reset_index(drop=True)
  104. ) # Copy to avoid modifying the original DataFrame
  105. header = self.ego_data.columns
  106. if 2 in df_data.obj_id_list:
  107. self.data_obj = df_data.obj_data[2]
  108. self.obj_data = (
  109. self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
  110. ) # Copy to avoid modifying the original DataFrame
  111. else:
  112. self.obj_data = pd.DataFrame(columns=header)
  113. if 3 in df_data.obj_id_list:
  114. self.other_obj_data1 = df_data.obj_data[3]
  115. self.other_obj_data = (
  116. self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
  117. )
  118. else:
  119. self.other_obj_data = pd.DataFrame(columns=header)
  120. self.overtake_on_right_count = 0
  121. self.overtake_when_turn_around_count = 0
  122. self.overtake_when_passing_car_count = 0
  123. self.overtake_in_forbid_lane_count = 0
  124. self.overtake_in_ramp_count = 0
  125. self.overtake_in_tunnel_count = 0
  126. self.overtake_on_accelerate_lane_count = 0
  127. self.overtake_on_decelerate_lane_count = 0
  128. self.overtake_in_different_senerios_count = 0
  129. def different_road_area_simtime(self, df, threshold=0.5):
  130. if not df:
  131. return []
  132. simtime_group = []
  133. current_simtime_group = [df[0]]
  134. for i in range(1, len(df)):
  135. if abs(df[i] - df[i - 1]) <= threshold:
  136. current_simtime_group.append(df[i])
  137. else:
  138. simtime_group.append(current_simtime_group)
  139. current_simtime_group = [df[i]]
  140. simtime_group.append(current_simtime_group)
  141. return simtime_group
  142. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  143. lane_start = lane_id[0]
  144. lane_end = lane_id[-1]
  145. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  146. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  147. return lane_start == lane_end and start_condition and end_condition
  148. def _is_dxy_of_car(self, ego_df, obj_df):
  149. """
  150. :param df: objstate.csv and so on
  151. :param id: playerId
  152. :param string_type: posX/Y or speedX/Y and so on
  153. :return: dataframe of dx/y and so on
  154. """
  155. car_dx = obj_df["posX"].values - ego_df["posX"].values
  156. car_dy = obj_df["posY"].values - ego_df["posY"].values
  157. return car_dx, car_dy
  158. # 在前车右侧超车、会车时超车、前车掉头时超车
  159. def illegal_overtake_with_car_detector(self, window_width=250):
  160. # 获取csv文件中最短的帧数
  161. frame_id_length = len(self.ego_data["simFrame"])
  162. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  163. while (start_frame_id + window_width) < frame_id_length:
  164. simframe_window1 = list(
  165. np.arange(start_frame_id, start_frame_id + window_width)
  166. )
  167. simframe_window = list(map(int, simframe_window1))
  168. # 读取滑动窗口的dataframe数据
  169. ego_data_frames = self.ego_data[
  170. self.ego_data["simFrame"].isin(simframe_window)
  171. ]
  172. obj_data_frames = self.obj_data[
  173. self.obj_data["simFrame"].isin(simframe_window)
  174. ]
  175. other_data_frames = self.other_obj_data[
  176. self.other_obj_data["simFrame"].isin(simframe_window)
  177. ]
  178. # 读取前后的laneId
  179. lane_id = ego_data_frames["lane_id"].tolist()
  180. # 读取前后方向盘转角steeringWheel,
  181. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  182. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  183. # 读取车辆前后的位置信息
  184. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  185. ego_speedx = ego_data_frames["speedX"].tolist()
  186. ego_speedy = ego_data_frames["speedY"].tolist()
  187. obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][
  188. "speedX"
  189. ].tolist()
  190. obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][
  191. "speedY"
  192. ].tolist()
  193. if len(other_data_frames) > 0:
  194. other_start_speedx = other_data_frames["speedX"].iloc[0]
  195. other_start_speedy = other_data_frames["speedY"].iloc[0]
  196. if (
  197. ego_speedx[0] * other_start_speedx
  198. + ego_speedy[0] * other_start_speedy
  199. < 0
  200. ):
  201. self.overtake_when_passing_car_count += self._is_overtake(
  202. lane_id, dx, dy, ego_speedx, ego_speedy
  203. )
  204. start_frame_id += window_width
  205. """
  206. 如果滑动窗口开始和最后的laneid一致;
  207. 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
  208. 自车和前车的位置发生的交换;
  209. 则认为右超车
  210. """
  211. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  212. self.overtake_on_right_count += self._is_overtake(
  213. lane_id, dx, dy, ego_speedx, ego_speedy
  214. )
  215. start_frame_id += window_width
  216. elif ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
  217. self.overtake_when_turn_around_count += self._is_overtake(
  218. lane_id, dx, dy, ego_speedx, ego_speedy
  219. )
  220. start_frame_id += window_width
  221. else:
  222. start_frame_id += 1
  223. # 借道超车场景
  224. def overtake_in_forbid_lane_detector(self):
  225. simTime = self.obj_data["simTime"].tolist()
  226. simtime_devide = self.different_road_area_simtime(simTime)
  227. for simtime in simtime_devide:
  228. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  229. try:
  230. lane_type = lane_overtake["lane_type"].tolist()
  231. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  232. 50002 not in lane_type and len(set(lane_type)) > 1
  233. ):
  234. self.overtake_in_forbid_lane_count += 1
  235. except Exception as e:
  236. print("数据缺少lane_type信息")
  237. # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
  238. # 在匝道超车
  239. def overtake_in_ramp_area_detector(self):
  240. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  241. "simTime"
  242. ].tolist()
  243. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  244. for ramp_simtime in ramp_simTime_list:
  245. lane_id = self.ego_data["lane_id"].tolist()
  246. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  247. objstate_in_ramp = self.obj_data[
  248. self.obj_data["simTime"].isin(ramp_simtime)
  249. ]
  250. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  251. ego_speedx = ego_in_ramp["speedX"].tolist()
  252. ego_speedy = ego_in_ramp["speedY"].tolist()
  253. if len(lane_id) > 0:
  254. self.overtake_in_ramp_count += self._is_overtake(
  255. lane_id, dx, dy, ego_speedx, ego_speedy
  256. )
  257. else:
  258. continue
  259. # print(f"在匝道超车{self.overtake_in_ramp_count}次")
  260. def overtake_in_tunnel_area_detector(self):
  261. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  262. "simTime"
  263. ].tolist()
  264. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  265. for tunnel_simtime in tunnel_simTime_list:
  266. lane_id = self.ego_data["lane_id"].tolist()
  267. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  268. objstate_in_tunnel = self.obj_data[
  269. self.obj_data["simTime"].isin(tunnel_simtime)
  270. ]
  271. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  272. ego_speedx = ego_in_tunnel["speedX"].tolist()
  273. ego_speedy = ego_in_tunnel["speedY"].tolist()
  274. if len(lane_id) > 0:
  275. self.overtake_in_tunnel_count += self._is_overtake(
  276. lane_id, dx, dy, ego_speedx, ego_speedy
  277. )
  278. else:
  279. continue
  280. # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
  281. # 加速车道超车
  282. def overtake_on_accelerate_lane_detector(self):
  283. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  284. "simTime"
  285. ].tolist()
  286. accelerate_simTime_list = self.different_road_area_simtime(
  287. accelerate_simtime_list
  288. )
  289. for accelerate_simtime in accelerate_simTime_list:
  290. lane_id = self.ego_data["lane_id"].tolist()
  291. ego_in_accelerate = self.ego_data[
  292. self.ego_data["simTime"].isin(accelerate_simtime)
  293. ]
  294. objstate_in_accelerate = self.obj_data[
  295. self.obj_data["simTime"].isin(accelerate_simtime)
  296. ]
  297. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  298. ego_speedx = ego_in_accelerate["speedX"].tolist()
  299. ego_speedy = ego_in_accelerate["speedY"].tolist()
  300. self.overtake_on_accelerate_lane_count += self._is_overtake(
  301. lane_id, dx, dy, ego_speedx, ego_speedy
  302. )
  303. # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
  304. # 减速车道超车
  305. def overtake_on_decelerate_lane_detector(self):
  306. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  307. "simTime"
  308. ].tolist()
  309. decelerate_simTime_list = self.different_road_area_simtime(
  310. decelerate_simtime_list
  311. )
  312. for decelerate_simtime in decelerate_simTime_list:
  313. lane_id = self.ego_data["id"].tolist()
  314. ego_in_decelerate = self.ego_data[
  315. self.ego_data["simTime"].isin(decelerate_simtime)
  316. ]
  317. objstate_in_decelerate = self.obj_data[
  318. self.obj_data["simTime"].isin(decelerate_simtime)
  319. ]
  320. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  321. ego_speedx = ego_in_decelerate["speedX"].tolist()
  322. ego_speedy = ego_in_decelerate["speedY"].tolist()
  323. self.overtake_on_decelerate_lane_count += self._is_overtake(
  324. lane_id, dx, dy, ego_speedx, ego_speedy
  325. )
  326. # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
  327. # 在交叉路口
  328. def overtake_in_different_senerios_detector(self):
  329. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  330. "simTime"
  331. ].tolist() # 判断是路口或者隧道区域
  332. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  333. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  334. crossroad_objstate = self.obj_data[
  335. self.obj_data["simTime"].isin(crossroad_simTime)
  336. ]
  337. # 读取前后的laneId
  338. lane_id = crossroad_ego["lane_id"].tolist()
  339. # 读取车辆前后的位置信息
  340. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  341. ego_speedx = crossroad_ego["speedX"].tolist()
  342. ego_speedy = crossroad_ego["speedY"].tolist()
  343. """
  344. 如果滑动窗口开始和最后的laneid一致;
  345. 自车和前车的位置发生的交换;
  346. 则认为发生超车
  347. """
  348. if len(lane_id) > 0:
  349. self.overtake_in_different_senerios_count += self._is_overtake(
  350. lane_id, dx, dy, ego_speedx, ego_speedy
  351. )
  352. else:
  353. pass
  354. def calculate_overtake_when_passing_car_count(self):
  355. self.illegal_overtake_with_car_detector()
  356. return self.overtake_when_passing_car_count
  357. def calculate_overtake_on_right_count(self):
  358. self.illegal_overtake_with_car_detector()
  359. return self.overtake_on_right_count
  360. def calculate_overtake_when_turn_around_count(self):
  361. self.illegal_overtake_with_car_detector()
  362. return self.overtake_when_turn_around_count
  363. def calculate_overtake_in_forbid_lane_count(self):
  364. self.overtake_in_forbid_lane_detector()
  365. return self.overtake_in_forbid_lane_count
  366. def calculate_overtake_in_ramp_area_count(self):
  367. self.overtake_in_ramp_area_detector()
  368. return self.overtake_in_ramp_count
  369. def calculate_overtake_in_tunnel_area_count(self):
  370. self.overtake_in_tunnel_area_detector()
  371. return self.overtake_in_tunnel_count
  372. def calculate_overtake_on_accelerate_lane_count(self):
  373. self.overtake_on_accelerate_lane_detector()
  374. return self.overtake_on_accelerate_lane_count
  375. def calculate_overtake_on_decelerate_lane_count(self):
  376. self.overtake_on_decelerate_lane_detector()
  377. return self.overtake_on_decelerate_lane_count
  378. def calculate_overtake_in_different_senerios_count(self):
  379. self.overtake_in_different_senerios_detector()
  380. return self.overtake_in_different_senerios_count
  381. def slow_down_in_crosswalk(data_processed):
  382. slowdownviolation = SlowdownViolation(data_processed)
  383. slow_down_in_crosswalk_count = slowdownviolation.calculate_slow_down_in_crosswalk_count()
  384. return {"slowdown_down_in_crosswalk": slow_down_in_crosswalk_count}
  385. def avoid_pedestrian_in_crosswalk(data_processed):
  386. avoidpedestrianincrosswalk = SlowdownViolation(data_processed)
  387. avoid_pedestrian_in_crosswalk_count = avoidpedestrianincrosswalk.calculate_avoid_pedestrian_in_the_crosswalk_count()
  388. return {"avoid_pedestrian_in_crosswalk": avoid_pedestrian_in_crosswalk_count}
  389. def avoid_pedestrian_in_the_road(data_processed):
  390. avoidpedestrianintheroad = SlowdownViolation(data_processed)
  391. avoid_pedestrian_in_the_road_count = avoidpedestrianintheroad.calculate_avoid_pedestrian_in_the_road_count()
  392. return {"avoid_pedestrian_in_the_road": avoid_pedestrian_in_the_road_count}
  393. def aviod_pedestrian_when_turning(data_processed):
  394. avoidpedestrianwhenturning = SlowdownViolation(data_processed)
  395. avoid_pedestrian_when_turning_count = avoidpedestrianwhenturning.calculate_avoid_pedestrian_when_turning_count()
  396. return {"avoid_pedestrian_when_turning_count": avoid_pedestrian_when_turning_count}
  397. class SlowdownViolation(object):
  398. """减速让行违规类"""
  399. def __init__(self, df_data):
  400. print("减速让行违规类-------------------------")
  401. self.traffic_violations_type = "减速让行违规类"
  402. self.object_items = []
  403. self.data = df_data.obj_data[1]
  404. self.ego_data = (
  405. self.data[SLOWDOWN_INFO].copy().reset_index(drop=True)
  406. ) # Copy to avoid modifying the original DataFrame
  407. self.pedestrian_data = pd.DataFrame()
  408. self.object_items = set(df_data.object_df.type.tolist())
  409. if 13 in self.object_items: # 行人的type是13
  410. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  411. self.pedestrian_data = (
  412. self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  413. )
  414. self.slow_down_in_crosswalk_count = 0
  415. self.avoid_pedestrian_in_crosswalk_count = 0
  416. self.avoid_pedestrian_in_the_road_count = 0
  417. self.aviod_pedestrian_when_turning_count = 0
  418. def pedestrian_in_front_of_car(self):
  419. if len(self.pedestrian_data) == 0:
  420. return []
  421. else:
  422. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  423. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  424. self.ego_data["dist"] = np.sqrt(
  425. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  426. )
  427. self.ego_data["rela_pos"] = (
  428. self.ego_data["dx"] * self.ego_data["speedX"]
  429. + self.ego_data["dy"] * self.ego_data["speedY"]
  430. )
  431. simtime = self.ego_data[
  432. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  433. ]["simTime"].tolist()
  434. return simtime
  435. def different_road_area_simtime(self, df, threshold=0.6):
  436. if not df:
  437. return []
  438. simtime_group = []
  439. current_simtime_group = [df[0]]
  440. for i in range(1, len(df)):
  441. if abs(df[i] - df[i - 1]) <= threshold:
  442. current_simtime_group.append(df[i])
  443. else:
  444. simtime_group.append(current_simtime_group)
  445. current_simtime_group = [df[i]]
  446. simtime_group.append(current_simtime_group)
  447. return simtime_group
  448. def slow_down_in_crosswalk_detector(self):
  449. # 筛选出路口或隧道区域的时间点
  450. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  451. "simTime"
  452. ].tolist()
  453. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  454. for crosswalk_simtime in crosswalk_simTime_divide:
  455. # 筛选出当前时间段内的数据
  456. # start_time, end_time = crosswalk_simtime
  457. start_time = crosswalk_simtime[0]
  458. end_time = crosswalk_simtime[-1]
  459. print(f"当前时间段:{start_time} - {end_time}")
  460. crosswalk_objstate = self.ego_data[
  461. (self.ego_data["simTime"] >= start_time)
  462. & (self.ego_data["simTime"] <= end_time)
  463. ]
  464. # 计算车辆速度
  465. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  466. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  467. ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
  468. # 判断是否超速
  469. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  470. self.slow_down_in_crosswalk_count += 1
  471. # 输出总次数
  472. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  473. def avoid_pedestrian_in_crosswalk_detector(self):
  474. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  475. "simTime"
  476. ].tolist()
  477. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  478. for crosswalk_simtime in crosswalk_simTime_devide:
  479. if not self.pedestrian_data.empty:
  480. crosswalk_objstate = self.pedestrian_data[
  481. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  482. ]
  483. else:
  484. crosswalk_objstate = pd.DataFrame()
  485. if len(crosswalk_objstate) > 0:
  486. pedestrian_simtime = crosswalk_objstate["simTime"]
  487. pedestrian_objstate = crosswalk_objstate[
  488. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  489. ]
  490. ego_speed = np.sqrt(
  491. pedestrian_objstate["speedX"] ** 2
  492. + pedestrian_objstate["speedY"] ** 2
  493. )
  494. if ego_speed.any() > 0:
  495. self.avoid_pedestrian_in_crosswalk_count += 1
  496. def avoid_pedestrian_in_the_road_detector(self):
  497. simtime = self.pedestrian_in_front_of_car()
  498. if len(simtime) == 0:
  499. self.avoid_pedestrian_in_the_road_count += 0
  500. else:
  501. pedestrian_on_the_road = self.pedestrian_data[
  502. self.pedestrian_data["simTime"].isin(simtime)
  503. ]
  504. simTime = pedestrian_on_the_road["simTime"].tolist()
  505. simTime_devide = self.different_road_area_simtime(simTime)
  506. for simtime1 in simTime_devide:
  507. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  508. pedestrian_on_the_road["simTime"].isin(simtime1)
  509. ]
  510. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  511. dist = np.sqrt(
  512. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  513. ** 2
  514. + (
  515. ego_car["posY"].values
  516. - sub_pedestrian_on_the_road["posY"].values
  517. )
  518. ** 2
  519. )
  520. speed = np.sqrt(
  521. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  522. )
  523. data = {"dist": dist, "speed": speed}
  524. new_ego_car = pd.DataFrame(data)
  525. new_ego_car = new_ego_car.assign(
  526. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  527. )
  528. if new_ego_car["Column3"].any():
  529. self.avoid_pedestrian_in_the_road_count += 1
  530. def aviod_pedestrian_when_turning_detector(self):
  531. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  532. if len(pedestrian_simtime_list) > 0:
  533. simtime_list = self.ego_data[
  534. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  535. & (self.ego_data["lane_type"] == 20)
  536. ]["simTime"].tolist()
  537. simTime_list = self.different_road_area_simtime(simtime_list)
  538. pedestrian_on_the_road = self.pedestrian_data[
  539. self.pedestrian_data["simTime"].isin(simtime_list)
  540. ]
  541. for simtime in simTime_list:
  542. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  543. pedestrian_on_the_road["simTime"].isin(simtime)
  544. ]
  545. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  546. ego_car["dist"] = np.sqrt(
  547. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  548. ** 2
  549. + (
  550. ego_car["posY"].values
  551. - sub_pedestrian_on_the_road["posY"].values
  552. )
  553. ** 2
  554. )
  555. ego_car["speed"] = np.sqrt(
  556. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  557. )
  558. if any(ego_car["speed"].tolist()) != 0:
  559. self.aviod_pedestrian_when_turning_count += 1
  560. def calculate_slow_down_in_crosswalk_count(self):
  561. self.slow_down_in_crosswalk_detector()
  562. return self.slow_down_in_crosswalk_count
  563. def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
  564. self.avoid_pedestrian_in_crosswalk_detector()
  565. return self.avoid_pedestrian_in_crosswalk_count
  566. def calculate_avoid_pedestrian_in_the_road_count(self):
  567. self.avoid_pedestrian_in_the_road_detector()
  568. return self.avoid_pedestrian_in_the_road_count
  569. def calculate_avoid_pedestrian_when_turning_count(self):
  570. self.aviod_pedestrian_when_turning_detector()
  571. return self.aviod_pedestrian_when_turning_count
  572. def turn_in_forbiden_turn_left_sign(data_processed):
  573. turnaroundviolation = TurnaroundViolation(data_processed)
  574. turn_in_forbiden_turn_left_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_left_sign_count()
  575. return turn_in_forbiden_turn_left_sign_count
  576. def turn_in_forbiden_turn_back_sign(data_processed):
  577. turnaroundviolation = TurnaroundViolation(data_processed)
  578. turn_in_forbiden_turn_back_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_back_sign_count()
  579. return turn_in_forbiden_turn_back_sign_count
  580. def avoid_pedestrian_when_turn_back(data_processed):
  581. turnaroundviolation = TurnaroundViolation(data_processed)
  582. avoid_pedestrian_when_turn_back_count = turnaroundviolation.calaulate_avoid_pedestrian_when_turn_back_count()
  583. return avoid_pedestrian_when_turn_back_count
  584. class TurnaroundViolation(object):
  585. def __init__(self, df_data):
  586. print("掉头违规类初始化中...")
  587. self.traffic_violations_type = "掉头违规类"
  588. self.data = df_data.obj_data[1]
  589. self.ego_data = (
  590. self.data[TURNAROUND_INFO].copy().reset_index(drop=True)
  591. ) # Copy to avoid modifying the original DataFrame
  592. self.pedestrian_data = pd.DataFrame()
  593. self.object_items = set(df_data.object_df.type.tolist())
  594. if 13 in self.object_items: # 行人的type是13
  595. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  596. self.pedestrian_data = (
  597. self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  598. )
  599. self.turning_in_forbiden_turn_back_sign_count = 0
  600. self.turning_in_forbiden_turn_left_sign_count = 0
  601. self.avoid_pedestrian_when_turn_back_count = 0
  602. def pedestrian_in_front_of_car(self):
  603. if len(self.pedestrian_data) == 0:
  604. return []
  605. else:
  606. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  607. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  608. self.ego_data["dist"] = np.sqrt(
  609. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  610. )
  611. self.ego_data["rela_pos"] = (
  612. self.ego_data["dx"] * self.ego_data["speedX"]
  613. + self.ego_data["dy"] * self.ego_data["speedY"]
  614. )
  615. simtime = self.ego_data[
  616. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  617. ]["simTime"].tolist()
  618. return simtime
  619. def different_road_area_simtime(self, df, threshold=0.5):
  620. if not df:
  621. return []
  622. simtime_group = []
  623. current_simtime_group = [df[0]]
  624. for i in range(1, len(df)):
  625. if abs(df[i] - df[i - 1]) <= threshold:
  626. current_simtime_group.append(df[i])
  627. else:
  628. simtime_group.append(current_simtime_group)
  629. current_simtime_group = [df[i]]
  630. simtime_group.append(current_simtime_group)
  631. return simtime_group
  632. def turn_back_in_forbiden_sign_detector(self):
  633. """
  634. 禁止掉头type = 8
  635. """
  636. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  637. "simTime"
  638. ].tolist()
  639. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  640. "simTime"
  641. ].tolist()
  642. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  643. forbiden_turn_back_simTime
  644. )
  645. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  646. forbiden_turn_left_simTime
  647. )
  648. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  649. ego_car1 = self.ego_data.loc[
  650. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  651. ]
  652. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  653. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  654. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  655. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  656. if (
  657. ego_end_speedx1 * ego_start_speedx1
  658. + ego_end_speedy1 * ego_start_speedy1
  659. < 0
  660. ):
  661. self.turning_in_forbiden_turn_back_sign_count += 1
  662. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  663. ego_car2 = self.ego_data.loc[
  664. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  665. ]
  666. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  667. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  668. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  669. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  670. if (
  671. ego_end_speedx2 * ego_start_speedx2
  672. + ego_end_speedy2 * ego_start_speedy2
  673. < 0
  674. ):
  675. self.turning_in_forbiden_turn_left_sign_count += 1
  676. def avoid_pedestrian_when_turn_back_detector(self):
  677. sensor_on_intersection = self.pedestrian_in_front_of_car()
  678. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  679. self.ego_data["lane_type"] == 20
  680. ]["simTime"].tolist()
  681. avoid_pedestrian_when_turn_back_simTime_devide = (
  682. self.different_road_area_simtime(
  683. avoid_pedestrian_when_turn_back_simTime_list
  684. )
  685. )
  686. if len(sensor_on_intersection) > 0:
  687. for avoid_pedestrian_when_turn_back_simtime in avoid_pedestrian_when_turn_back_simTime_devide:
  688. pedestrian_in_intersection_simtime = self.pedestrian_data[
  689. self.pedestrian_data["simTime"].isin(
  690. avoid_pedestrian_when_turn_back_simtime
  691. )
  692. ].tolist()
  693. ego_df = self.ego_data[
  694. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  695. ].reset_index(drop=True)
  696. pedestrian_df = self.pedestrian_data[
  697. self.pedestrian_data["simTime"].isin(
  698. pedestrian_in_intersection_simtime
  699. )
  700. ].reset_index(drop=True)
  701. ego_df["dist"] = np.sqrt(
  702. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  703. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  704. )
  705. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  706. if any(ego_df["speed"].tolist()) != 0:
  707. self.avoid_pedestrian_when_turn_back_count += 1
  708. def calculate_turn_in_forbiden_turn_left_sign_count(self):
  709. self.turn_back_in_forbiden_sign_detector()
  710. return self.turning_in_forbiden_turn_left_sign_count
  711. def calculate_turn_in_forbiden_turn_back_sign_count(self):
  712. self.turn_back_in_forbiden_sign_detector()
  713. return self.turning_in_forbiden_turn_back_sign_count
  714. def calaulate_avoid_pedestrian_when_turn_back_count(self):
  715. self.avoid_pedestrian_when_turn_back_detector()
  716. return self.avoid_pedestrian_when_turn_back_count
  717. def urbanExpresswayOrHighwayDrivingLaneStopped(data_processed):
  718. wrongwayviolation = WrongWayViolation(data_processed)
  719. urbanExpresswayOrHighwayDrivingLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  720. return {"urbanExpresswayOrHighwayDrivingLaneStopped": urbanExpresswayOrHighwayDrivingLaneStopped_count}
  721. def urbanExpresswayOrHighwayEmergencyLaneStopped(data_processed):
  722. wrongwayviolation = WrongWayViolation(data_processed)
  723. urbanExpresswayOrHighwayEmergencyLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  724. return {"urbanExpresswayOrHighwayEmergencyLaneStopped": urbanExpresswayOrHighwayEmergencyLaneStopped_count}
  725. def urbanExpresswayEmergencyLaneDriving(data_processed):
  726. wrongwayviolation = WrongWayViolation(data_processed)
  727. urbanExpresswayEmergencyLaneDriving_count = wrongwayviolation.calculate_urbanExpresswayEmergencyLaneDriving()
  728. return {"urbanExpresswayEmergencyLaneDriving": urbanExpresswayEmergencyLaneDriving_count}
  729. class WrongWayViolation:
  730. """停车违规类"""
  731. def __init__(self, df_data):
  732. print("停车违规类初始化中...")
  733. self.traffic_violations_type = "停车违规类"
  734. self.data = df_data.obj_data[1]
  735. # 初始化违规统计
  736. self.violation_count = {
  737. "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
  738. "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
  739. "urbanExpresswayEmergencyLaneDriving": 0,
  740. }
  741. def process_violations(self):
  742. """处理停车或者紧急车道行驶违规数据"""
  743. # 提取有效道路类型
  744. urban_expressway_or_highway = {1, 2}
  745. driving_lane = {1, 4, 5, 6}
  746. emergency_lane = {12}
  747. self.data["v"] *= 3.6 # 转换速度
  748. # 使用向量化和条件判断进行违规判定
  749. conditions = [
  750. (
  751. self.data["road_fc"].isin(urban_expressway_or_highway)
  752. & self.data["lane_type"].isin(driving_lane)
  753. & (self.data["v"] == 0)
  754. ),
  755. (
  756. self.data["road_fc"].isin(urban_expressway_or_highway)
  757. & self.data["lane_type"].isin(emergency_lane)
  758. & (self.data["v"] == 0)
  759. ),
  760. (
  761. self.data["road_fc"].isin(urban_expressway_or_highway)
  762. & self.data["lane_type"].isin(emergency_lane)
  763. & (self.data["v"] != 0)
  764. ),
  765. ]
  766. violation_types = [
  767. "urbanExpresswayOrHighwayDrivingLaneStopped",
  768. "urbanExpresswayOrHighwayEmergencyLaneStopped",
  769. "urbanExpresswayEmergencyLaneDriving",
  770. ]
  771. # 设置违规类型
  772. self.data["violation_type"] = None
  773. for condition, violation_type in zip(conditions, violation_types):
  774. self.data.loc[condition, "violation_type"] = violation_type
  775. # 统计违规情况
  776. self.violation_count = (
  777. self.data["violation_type"]
  778. .value_counts()
  779. .reindex(violation_types, fill_value=0)
  780. .to_dict()
  781. )
  782. def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
  783. self.process_violations()
  784. return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]
  785. def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
  786. self.process_violations()
  787. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  788. def calculate_urbanExpresswayEmergencyLaneDriving(self):
  789. self.process_violations()
  790. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  791. def urbanExpresswayOrHighwaySpeedOverLimit50(data_processed):
  792. speedingviolation = SpeedingViolation(data_processed)
  793. urbanExpresswayOrHighwaySpeedOverLimit50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
  794. return {"urbanExpresswayOrHighwaySpeedOverLimit50": urbanExpresswayOrHighwaySpeedOverLimit50_count}
  795. def urbanExpresswayOrHighwaySpeedOverLimit20to50(data_processed):
  796. speedingviolation = SpeedingViolation(data_processed)
  797. urbanExpresswayOrHighwaySpeedOverLimit20to50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
  798. return {"urbanExpresswayOrHighwaySpeedOverLimit20to50": urbanExpresswayOrHighwaySpeedOverLimit20to50_count}
  799. def urbanExpresswayOrHighwaySpeedOverLimit0to20(data_processed):
  800. speedingviolation = SpeedingViolation(data_processed)
  801. urbanExpresswayOrHighwaySpeedOverLimit0to20_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
  802. return {"urbanExpresswayOrHighwaySpeedOverLimit0to20": urbanExpresswayOrHighwaySpeedOverLimit0to20_count}
  803. def urbanExpresswayOrHighwaySpeedUnderLimit(data_processed):
  804. speedingviolation = SpeedingViolation(data_processed)
  805. urbanExpresswayOrHighwaySpeedUnderLimit_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
  806. return {"urbanExpresswayOrHighwaySpeedUnderLimit": urbanExpresswayOrHighwaySpeedUnderLimit_count}
  807. def generalRoadSpeedOverLimit50(data_processed):
  808. speedingviolation = SpeedingViolation(data_processed)
  809. generalRoadSpeedOverLimit50_count = speedingviolation.calculate_generalRoadSpeedOverLimit50()
  810. return {"generalRoadSpeedOverLimit50": generalRoadSpeedOverLimit50_count}
  811. def generalRoadSpeedOverLimit20to50(data_processed):
  812. speedingviolation = SpeedingViolation(data_processed)
  813. generalRoadSpeedOverLimit20to50_count = speedingviolation.calculate_generalRoadSpeedOverLimit20to50_count()
  814. return {"generalRoadSpeedOverLimit20to50": generalRoadSpeedOverLimit20to50_count}
  815. class SpeedingViolation(object):
  816. """超速违规类"""
  817. """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
  818. def __init__(self, df_data):
  819. print("超速违规类初始化中...")
  820. self.traffic_violations_type = "超速违规类"
  821. self.data = df_data.obj_data[
  822. 1
  823. ] # Copy to avoid modifying the original DataFrame
  824. # 初始化违规统计
  825. self.violation_counts = {
  826. "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
  827. "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
  828. "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
  829. "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
  830. "generalRoadSpeedOverLimit50": 0,
  831. "generalRoadSpeedOverLimit20to50": 0,
  832. }
  833. def process_violations(self):
  834. """处理数据帧,检查超速和其他违规行为"""
  835. # 提取有效道路类型
  836. urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
  837. general_road = {3} # 直接创建包含一个元素的集合
  838. self.data["v"] *= 3.6 # 转换速度
  839. # 违规判定
  840. conditions = [
  841. (
  842. self.data["road_fc"].isin(urban_expressway_or_highway)
  843. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  844. ),
  845. (
  846. self.data["road_fc"].isin(urban_expressway_or_highway)
  847. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  848. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  849. ),
  850. (
  851. self.data["road_fc"].isin(urban_expressway_or_highway)
  852. & (self.data["v"] > self.data["road_speed_max"])
  853. & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
  854. ),
  855. (
  856. self.data["road_fc"].isin(urban_expressway_or_highway)
  857. & (self.data["v"] < self.data["road_speed_min"])
  858. ),
  859. (
  860. self.data["road_fc"].isin(general_road)
  861. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  862. ),
  863. (
  864. self.data["road_fc"].isin(general_road)
  865. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  866. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  867. ),
  868. ]
  869. violation_types = [
  870. "urbanExpresswayOrHighwaySpeedOverLimit50",
  871. "urbanExpresswayOrHighwaySpeedOverLimit20to50",
  872. "urbanExpresswayOrHighwaySpeedOverLimit0to20",
  873. "urbanExpresswayOrHighwaySpeedUnderLimit",
  874. "generalRoadSpeedOverLimit50",
  875. "generalRoadSpeedOverLimit20to50",
  876. ]
  877. # 设置违规类型
  878. self.data["violation_type"] = None
  879. for condition, violation_type in zip(conditions, violation_types):
  880. self.data.loc[condition, "violation_type"] = violation_type
  881. # 统计各类违规情况
  882. self.violation_counts = self.data["violation_type"].value_counts().to_dict()
  883. def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
  884. self.process_violations()
  885. return self.violation_counts.get("urbanExpresswayOrHighwaySpeedOverLimit50") if self.violation_counts.get(
  886. "urbanExpresswayOrHighwaySpeedOverLimit50") else 0
  887. def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
  888. self.process_violations()
  889. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"] if self.violation_counts.get(
  890. "urbanExpresswayOrHighwaySpeedOverLimit20to50") else 0
  891. def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
  892. self.process_violations()
  893. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"] if self.violation_counts.get(
  894. "urbanExpresswayOrHighwaySpeedOverLimit0to20") else 0
  895. def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
  896. self.process_violations()
  897. return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"] if self.violation_counts.get(
  898. "urbanExpresswayOrHighwaySpeedUnderLimit") else 0
  899. def calculate_generalRoadSpeedOverLimit50(self):
  900. self.process_violations()
  901. return self.violation_counts["generalRoadSpeedOverLimit50"] if self.violation_counts.get(
  902. "generalRoadSpeedOverLimit50") else 0
  903. def calculate_generalRoadSpeedOverLimit20to50_count(self):
  904. self.process_violations()
  905. return self.violation_counts["generalRoadSpeedOverLimit20to50"] if self.violation_counts.get(
  906. "generalRoadSpeedOverLimit20to50") else 0
  907. def trafficSignalViolation(data_processed):
  908. trafficlightviolation = TrafficLightViolation(data_processed)
  909. trafficSignalViolation_count = trafficlightviolation.calculate_trafficSignalViolation_count()
  910. return {"trafficSignalViolation": trafficSignalViolation_count}
  911. def illegalDrivingOrParkingAtCrossroads(data_processed):
  912. trafficlightviolation = TrafficLightViolation(data_processed)
  913. illegalDrivingOrParkingAtCrossroads_count = trafficlightviolation.calculate_illegalDrivingOrParkingAtCrossroads()
  914. return {"illegalDrivingOrParkingAtCrossroads": illegalDrivingOrParkingAtCrossroads_count}
  915. class TrafficLightViolation(object):
  916. """违反交通灯类"""
  917. """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
  918. def __init__(self, df_data):
  919. """初始化方法"""
  920. self.traffic_violations_type = "违反交通灯类"
  921. print("违反交通灯类 类初始化中...")
  922. self.config = df_data.vehicle_config
  923. self.data_ego = df_data.ego_data # 获取数据
  924. self.violation_counts = {
  925. "trafficSignalViolation": 0,
  926. "illegalDrivingOrParkingAtCrossroads": 0,
  927. }
  928. # 处理数据并判定违规
  929. self.process_violations()
  930. def is_point_cross_line(self, point, stop_line_points):
  931. """
  932. 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
  933. 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
  934. :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
  935. :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
  936. :return: True 如果车辆跨越了停止线,否则 False
  937. """
  938. line_vector = np.array(
  939. [
  940. stop_line_points[1][0] - stop_line_points[0][0],
  941. stop_line_points[1][1] - stop_line_points[0][1],
  942. ]
  943. )
  944. point_vector = np.array(
  945. [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
  946. )
  947. cross_product = np.cross(line_vector, point_vector)
  948. if cross_product != 0:
  949. return False
  950. mid_point = (
  951. np.array([stop_line_points[0][0], stop_line_points[0][1]])
  952. + 0.5 * line_vector
  953. )
  954. axletree_to_mid_vector = np.array(
  955. [point[0] - mid_point[0], point[1] - mid_point[1]]
  956. )
  957. direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
  958. norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
  959. norm_direction = np.linalg.norm(direction_vector)
  960. if norm_axletree_to_mid == 0 or norm_direction == 0:
  961. return False
  962. cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
  963. norm_axletree_to_mid * norm_direction
  964. )
  965. angle_theta = math.degrees(math.acos(cos_theta))
  966. return angle_theta <= 90
  967. def _filter_data(self):
  968. """过滤数据,筛选出需要分析的记录"""
  969. return self.data_ego[
  970. (self.data_ego["stopline_id"] != -1)
  971. & (self.data_ego["stopline_type"] == 1)
  972. & (self.data_ego["trafficlight_id"] != -1)
  973. ]
  974. def _group_data(self, filtered_data):
  975. """按时间差对数据进行分组"""
  976. filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
  977. threshold = 0.5
  978. filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
  979. return filtered_data.groupby("group")
  980. def _analyze_group(self, group_data):
  981. """分析单个分组的数据,判断是否闯红灯"""
  982. photos = []
  983. stop_in_intersection = False
  984. for _, row in group_data.iterrows():
  985. vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
  986. stop_line_points = [
  987. [row["stopline_x1"], row["stopline_y1"]],
  988. [row["stopline_x2"], row["stopline_y2"]],
  989. ]
  990. traffic_light_status = row["traffic_light_status"]
  991. heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
  992. heading_vector = heading_vector / np.linalg.norm(heading_vector)
  993. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  994. # config = yaml.load(f, Loader=yaml.FullLoader)
  995. front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
  996. rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
  997. dist = math.sqrt(
  998. (row["posX"] - row["traffic_light_x"]) ** 2
  999. + (row["posY"] - row["traffic_light_y"]) ** 2
  1000. )
  1001. if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
  1002. has_crossed_line_front = (
  1003. self.is_point_cross_line(front_wheel_pos, stop_line_points)
  1004. and traffic_light_status == 1
  1005. )
  1006. has_crossed_line_rear = (
  1007. self.is_point_cross_line(rear_wheel_pos, stop_line_points)
  1008. and row["v"] > 0
  1009. and traffic_light_status == 1
  1010. )
  1011. has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
  1012. has_passed_intersection = has_crossed_line_front and dist < 1.0
  1013. # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
  1014. photos.extend(
  1015. [
  1016. has_crossed_line_front,
  1017. has_crossed_line_rear,
  1018. has_passed_intersection,
  1019. has_stop_in_intersection,
  1020. ]
  1021. )
  1022. stop_in_intersection = has_passed_intersection
  1023. return photos, stop_in_intersection
  1024. def is_vehicle_run_a_red_light(self):
  1025. """判断车辆是否闯红灯"""
  1026. filtered_data = self._filter_data()
  1027. grouped_data = self._group_data(filtered_data)
  1028. self.photos_group = []
  1029. self.stop_in_intersections = []
  1030. for _, group_data in grouped_data:
  1031. photos, stop_in_intersection = self._analyze_group(group_data)
  1032. self.photos_group.append(photos)
  1033. self.stop_in_intersections.append(stop_in_intersection)
  1034. def process_violations(self):
  1035. """处理数据并判定违规"""
  1036. self.is_vehicle_run_a_red_light()
  1037. count_1 = sum(all(photos) for photos in self.photos_group)
  1038. count_2 = sum(
  1039. stop_in_intersection for stop_in_intersection in self.stop_in_intersections
  1040. )
  1041. self.violation_counts["trafficSignalViolation"] = count_1
  1042. self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
  1043. def calculate_trafficSignalViolation_count(self):
  1044. self.process_violations()
  1045. return self.violation_counts["trafficSignalViolation"]
  1046. def calculate_illegalDrivingOrParkingAtCrossroads(self):
  1047. self.process_violations()
  1048. return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]
  1049. def generalRoadIrregularLaneUse(data_processed):
  1050. warningviolation = WarningViolation(data_processed)
  1051. generalRoadIrregularLaneUse_count = warningviolation.calculate_generalRoadIrregularLaneUse_count()
  1052. return {"generalRoadIrregularLaneUse": generalRoadIrregularLaneUse_count}
  1053. def urbanExpresswayOrHighwayRideLaneDivider(data_processed):
  1054. warningviolation = WarningViolation(data_processed)
  1055. urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider()
  1056. return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
  1057. class WarningViolation(object):
  1058. """警告性违规类"""
  1059. def __init__(self, df_data):
  1060. self.traffic_violations_type = "警告性违规类"
  1061. print("警告性违规类 类初始化中...")
  1062. self.config = df_data.vehicle_config
  1063. self.data_ego = df_data.obj_data[1]
  1064. self.data = self.data_ego.copy() # 避免修改原始 DataFrame
  1065. self.violation_counts = {
  1066. "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1067. "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1068. }
  1069. def process_violations(self):
  1070. general_road = {3} # 普通道路
  1071. lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道
  1072. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  1073. # config = yaml.load(f, Loader=yaml.FullLoader)
  1074. car_width = self.config["CAR_WIDTH"]
  1075. lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在
  1076. # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1077. # 使用布尔索引来筛选满足条件的行
  1078. condition = (self.data["road_fc"].isin(general_road)) & (
  1079. self.data["lane_type"].isin(lane_type)
  1080. )
  1081. # 创建一个新的列,并根据条件设置值
  1082. self.data["is_violation"] = condition
  1083. # 统计满足条件的连续时间段
  1084. violation_segments = self.count_continuous_violations(
  1085. self.data["is_violation"], self.data["simTime"]
  1086. )
  1087. # 更新骑行车道线违规计数
  1088. self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments)
  1089. # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1090. # 计算阈值
  1091. threshold = (lane_width - car_width) / 2
  1092. # 找到满足条件的行
  1093. self.data["is_violation"] = self.data["laneOffset"] > threshold
  1094. # 统计满足条件的连续时间段
  1095. violation_segments = self.count_continuous_violations(
  1096. self.data["is_violation"], self.data["simTime"]
  1097. )
  1098. # 更新骑行车道线违规计数
  1099. self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len(
  1100. violation_segments
  1101. )
  1102. def count_continuous_violations(self, violation_series, time_series):
  1103. """统计连续违规的时间段数量"""
  1104. continuous_segments = []
  1105. current_segment = []
  1106. for is_violation, time in zip(violation_series, time_series):
  1107. if is_violation:
  1108. if not current_segment: # 新的连续段开始
  1109. current_segment.append(time)
  1110. else:
  1111. if current_segment: # 连续段结束
  1112. continuous_segments.append(current_segment)
  1113. current_segment = []
  1114. # 检查是否有一个未结束的连续段在最后
  1115. if current_segment:
  1116. continuous_segments.append(current_segment)
  1117. return continuous_segments
  1118. def calculate_generalRoadIrregularLaneUse_count(self):
  1119. self.process_violations()
  1120. return self.violation_counts["generalRoadIrregularLaneUse"]
  1121. def calculate_urbanExpresswayOrHighwayRideLaneDivider(self):
  1122. self.process_violations()
  1123. return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
  1124. class TrafficSignViolation(object):
  1125. """交通标志违规类"""
  1126. def __init__(self, df_data):
  1127. self.traffic_violations_type = "交通标志违规类"
  1128. print("交通标志违规类 类初始化中...")
  1129. self.data_ego = df_data.obj_data[1]
  1130. self.ego_data = (
  1131. self.data_ego[TRFFICSIGN_INFO].copy().reset_index(drop=True)
  1132. )
  1133. self.data_ego = self.data_ego.copy() # 避免修改原始 DataFrame
  1134. self.violation_counts = {
  1135. "NoStraightThrough": 0, # 禁止直行标志地方直行
  1136. "SpeedLimitViolation": 0, # 违反限速规定
  1137. "MinimumSpeedLimitViolation": 0, # 违反最低限速规定
  1138. }
  1139. # def checkForProhibitionViolation(self):
  1140. # """禁令标志判断违规:7 禁止直行,12:限制速度"""
  1141. # # 筛选出sign_type1为7(禁止直行)
  1142. # violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7]
  1143. # violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
  1144. def checkForProhibitionViolation(self):
  1145. """禁令标志判断违规:7 禁止直行,12:限制速度"""
  1146. # 筛选出 sign_type1 为7(禁止直行)的数据
  1147. violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy()
  1148. # 判断车辆是否在禁止直行路段直行
  1149. if not violation_straight_df.empty:
  1150. # 按时间戳排序(假设数据按时间顺序处理)
  1151. violation_straight_df = violation_straight_df.sort_values('simTime')
  1152. # 计算航向角变化(前后时间点的差值绝对值)
  1153. violation_straight_df['posH_diff'] = violation_straight_df['posH'].diff().abs()
  1154. # 筛选条件:航向角变化小于阈值(例如5度)且速度不为0
  1155. threshold = 5 # 单位:度(根据场景调整)
  1156. mask = (violation_straight_df['posH_diff'] <= threshold) & (violation_straight_df['v'] > 0)
  1157. straight_violations = violation_straight_df[mask]
  1158. # 统计违规次数或记录违规数据
  1159. self.violation_counts["prohibition_straight"] = len(straight_violations)
  1160. # 限制速度判断(原代码)
  1161. violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
  1162. if violation_speed_limit_df.empty:
  1163. mask = self.data_ego["v"] > self.data_ego["sign_speed"]
  1164. self.violation_counts["SpeedLimitViolation"] = len(self.data_ego[mask])
  1165. def checkForInstructionViolation(self):
  1166. """限速标志属于指示性标志:13:最低限速"""
  1167. violation_minimum_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 13]
  1168. if violation_minimum_speed_limit_df.empty:
  1169. mask = self.data_ego["v"] < self.data_ego["sign_speed"]
  1170. self.violation_counts["MinimumSpeedLimitViolation"] = len(self.data_ego[mask])
  1171. def statistic(self):
  1172. self.checkForProhibitionViolation()
  1173. self.checkForInstructionViolation()
  1174. # self.logger.info(f"交通标志违规类指标统计完成,统计结果:{self.violation_counts}")
  1175. return self.violation_counts
  1176. class TrafficRegistry:
  1177. """舒适性指标注册器"""
  1178. def __init__(self, data_processed):
  1179. self.logger = LogManager().get_logger() # 获取全局日志实例
  1180. self.data = data_processed
  1181. self.traffic_config = data_processed.traffic_config["traffic"]
  1182. self.metrics = self._extract_metrics(self.traffic_config)
  1183. self._registry = self._build_registry()
  1184. def _extract_metrics(self, config_node: dict) -> list:
  1185. """DFS遍历提取指标"""
  1186. metrics = []
  1187. def _recurse(node):
  1188. if isinstance(node, dict):
  1189. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  1190. metrics.append(node['name'])
  1191. for v in node.values():
  1192. _recurse(v)
  1193. _recurse(config_node)
  1194. self.logger.info(f'评比的合规性指标列表:{metrics}')
  1195. return metrics
  1196. def _build_registry(self) -> dict:
  1197. """自动注册指标函数"""
  1198. registry = {}
  1199. for metric_name in self.metrics:
  1200. try:
  1201. registry[metric_name] = globals()[metric_name]
  1202. except KeyError:
  1203. self.logger.error(f"未实现指标函数: {metric_name}")
  1204. return registry
  1205. def batch_execute(self) -> dict:
  1206. """批量执行指标计算"""
  1207. results = {}
  1208. for name, func in self._registry.items():
  1209. try:
  1210. result = func(self.data)
  1211. results.update(result)
  1212. except Exception as e:
  1213. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  1214. results[name] = None
  1215. self.logger.info(f'合规性指标计算结果:{results}')
  1216. return results
  1217. class TrafficManager:
  1218. """合规性指标计算主类"""
  1219. def __init__(self, data_processed):
  1220. self.data = data_processed
  1221. self.logger = LogManager().get_logger()
  1222. self.registry = TrafficRegistry(self.data)
  1223. def report_statistic(self):
  1224. """生成合规性评分报告"""
  1225. traffic_result = self.registry.batch_execute()
  1226. return traffic_result
  1227. '''
  1228. class ViolationManager:
  1229. """违规管理类,用于管理所有违规行为"""
  1230. def __init__(self, data_processed):
  1231. self.violations = []
  1232. self.data = data_processed
  1233. self.config = data_processed.traffic_config
  1234. self.over_take_violation = OvertakingViolation(self.data)
  1235. self.slow_down_violation = SlowdownViolation(self.data)
  1236. self.wrong_way_violation = WrongWayViolation(self.data)
  1237. self.speeding_violation = SpeedingViolation(self.data)
  1238. self.traffic_light_violation = TrafficLightViolation(self.data)
  1239. self.warning_violation = WarningViolation(self.data)
  1240. # self.report_statistic()
  1241. def report_statistic(self):
  1242. traffic_result = self.over_take_violation.statistic()
  1243. traffic_result.update(self.slow_down_violation.statistic())
  1244. traffic_result.update(self.traffic_light_violation.statistic())
  1245. traffic_result.update(self.wrong_way_violation.statistic())
  1246. traffic_result.update(self.speeding_violation.statistic())
  1247. traffic_result.update(self.warning_violation.statistic())
  1248. # evaluator = Score(self.config)
  1249. # result = evaluator.evaluate(traffic_result)
  1250. # print("\n[交规类表现及得分情况]")
  1251. # # self.logger.info(f"Traffic Result:{traffic_result}")
  1252. # return result
  1253. return traffic_result
  1254. '''
  1255. # 示例使用
  1256. if __name__ == "__main__":
  1257. case_name = 'D:\Cicv\招远\V2V_CSAE53-2020_ForwardCollision_LST_02-03'
  1258. mode_label = 'D:\Cicv\招远\zhaoyuan0410\config\metrics_config.yaml'
  1259. data = data_process.DataPreprocessing(case_name, mode_label)
  1260. traffic_instance = TrafficManager(data)
  1261. try:
  1262. traffic_result = traffic_instance.report_statistic()
  1263. result = {'traffic': traffic_result}
  1264. print(result)
  1265. except Exception as e:
  1266. print(f"An error occurred in Traffict.report_statistic: {e}")