traffic.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221
  1. import math
  2. import numpy as np
  3. import pandas as pd
  4. from modules.lib import log_manager
  5. from modules.lib.score import Score
  6. from modules.config import config
  7. class OvertakingViolation(object):
  8. """超车违规类"""
  9. def __init__(self, df_data):
  10. print("超车违规类初始化中...")
  11. self.traffic_violations_type = "超车违规类"
  12. # self.logger = log.get_logger() # 使用时再初始化
  13. self.data = df_data.ego_data
  14. self.ego_data = (
  15. self.data[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  16. ) # Copy to avoid modifying the original DataFrame
  17. header = self.ego_data.columns
  18. if 2 in df_data.obj_id_list:
  19. self.data_obj = df_data.obj_data[2]
  20. self.obj_data = (
  21. self.data_obj[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  22. ) # Copy to avoid modifying the original DataFrame
  23. else:
  24. self.obj_data = pd.DataFrame(columns=header)
  25. if 3 in df_data.obj_id_list:
  26. self.other_obj_data1 = df_data.obj_data[3]
  27. self.other_obj_data = (
  28. self.other_obj_data1[config.OVERTAKE_INFO].copy().reset_index(drop=True)
  29. )
  30. else:
  31. self.other_obj_data = pd.DataFrame(columns=header)
  32. self.overtake_on_right_count = 0
  33. self.overtake_when_turn_around_count = 0
  34. self.overtake_when_passing_car_count = 0
  35. self.overtake_in_forbid_lane_count = 0
  36. self.overtake_in_ramp_count = 0
  37. self.overtake_in_tunnel_count = 0
  38. self.overtake_on_accelerate_lane_count = 0
  39. self.overtake_on_decelerate_lane_count = 0
  40. self.overtake_in_different_senerios_count = 0
  41. def different_road_area_simtime(self, df, threshold=0.5):
  42. if not df:
  43. return []
  44. simtime_group = []
  45. current_simtime_group = [df[0]]
  46. for i in range(1, len(df)):
  47. if abs(df[i] - df[i - 1]) <= threshold:
  48. current_simtime_group.append(df[i])
  49. else:
  50. simtime_group.append(current_simtime_group)
  51. current_simtime_group = [df[i]]
  52. simtime_group.append(current_simtime_group)
  53. return simtime_group
  54. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  55. lane_start = lane_id[0]
  56. lane_end = lane_id[-1]
  57. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  58. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  59. return lane_start == lane_end and start_condition and end_condition
  60. def _is_dxy_of_car(self, ego_df, obj_df):
  61. """
  62. :param df: objstate.csv and so on
  63. :param string_type: posX/Y or speedX/Y and so on
  64. :return: dataframe of dx/y and so on
  65. """
  66. car_dx = obj_df["posX"].values - ego_df["posX"].values
  67. car_dy = obj_df["posY"].values - ego_df["posY"].values
  68. return car_dx, car_dy
  69. # 在前车右侧超车、会车时超车、前车掉头时超车
  70. def illegal_overtake_with_car(self, window_width=250):
  71. # 获取csv文件中最短的帧数
  72. frame_id_length = len(self.ego_data["simFrame"])
  73. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  74. while (start_frame_id + window_width) < frame_id_length:
  75. # if start_frame_id == 828:
  76. # print("end")
  77. simframe_window1 = list(
  78. np.arange(start_frame_id, start_frame_id + window_width)
  79. )
  80. simframe_window = list(map(int, simframe_window1))
  81. # 读取滑动窗口的dataframe数据
  82. ego_data_frames = self.ego_data[
  83. self.ego_data["simFrame"].isin(simframe_window)
  84. ]
  85. obj_data_frames = self.obj_data[
  86. self.obj_data["simFrame"].isin(simframe_window)
  87. ]
  88. other_data_frames = self.other_obj_data[
  89. self.other_obj_data["simFrame"].isin(simframe_window)
  90. ]
  91. # 读取前后的laneId
  92. lane_id = ego_data_frames["lane_id"].tolist()
  93. # 读取前后方向盘转角steeringWheel,
  94. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  95. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  96. # 读取车辆前后的位置信息
  97. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  98. ego_speedx = ego_data_frames["speedX"].tolist()
  99. ego_speedy = ego_data_frames["speedY"].tolist()
  100. obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][
  101. "speedX"
  102. ].tolist()
  103. obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][
  104. "speedY"
  105. ].tolist()
  106. if len(other_data_frames) > 0:
  107. other_start_speedx = other_data_frames["speedX"].iloc[0]
  108. other_start_speedy = other_data_frames["speedY"].iloc[0]
  109. if (
  110. ego_speedx[0] * other_start_speedx
  111. + ego_speedy[0] * other_start_speedy
  112. < 0
  113. ):
  114. self.overtake_when_passing_car_count += self._is_overtake(
  115. lane_id, dx, dy, ego_speedx, ego_speedy
  116. )
  117. start_frame_id += window_width
  118. """
  119. 如果滑动窗口开始和最后的laneid一致;
  120. 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
  121. 自车和前车的位置发生的交换;
  122. 则认为右超车
  123. """
  124. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  125. self.overtake_on_right_count += self._is_overtake(
  126. lane_id, dx, dy, ego_speedx, ego_speedy
  127. )
  128. start_frame_id += window_width
  129. elif (len(ego_speedx)*len(obj_speedx) > 0) and (ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0):
  130. self.overtake_when_turn_around_count += self._is_overtake(
  131. lane_id, dx, dy, ego_speedx, ego_speedy
  132. )
  133. start_frame_id += window_width
  134. else:
  135. start_frame_id += 1
  136. # print(
  137. # f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")
  138. # 借道超车场景
  139. def overtake_in_forbid_lane(self):
  140. simTime = self.obj_data["simTime"].tolist()
  141. simtime_devide = self.different_road_area_simtime(simTime)
  142. if len(simtime_devide) == 0:
  143. self.overtake_in_forbid_lane_count += 0
  144. return
  145. else:
  146. for simtime in simtime_devide:
  147. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  148. try:
  149. lane_type = lane_overtake["lane_type"].tolist()
  150. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  151. 50002 not in lane_type and len(set(lane_type)) > 1
  152. ):
  153. self.overtake_in_forbid_lane_count += 1
  154. except Exception as e:
  155. print("数据缺少lane_type信息")
  156. # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
  157. # 在匝道超车
  158. def overtake_in_ramp_area(self):
  159. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  160. "simTime"
  161. ].tolist()
  162. if len(ramp_simtime_list) == 0:
  163. self.overtake_in_ramp_count += 0
  164. return
  165. else:
  166. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  167. for ramp_simtime in ramp_simTime_list:
  168. lane_id = self.ego_data["lane_id"].tolist()
  169. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  170. objstate_in_ramp = self.obj_data[
  171. self.obj_data["simTime"].isin(ramp_simtime)
  172. ]
  173. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  174. ego_speedx = ego_in_ramp["speedX"].tolist()
  175. ego_speedy = ego_in_ramp["speedY"].tolist()
  176. if len(lane_id) > 0:
  177. self.overtake_in_ramp_count += self._is_overtake(
  178. lane_id, dx, dy, ego_speedx, ego_speedy
  179. )
  180. else:
  181. continue
  182. # print(f"在匝道超车{self.overtake_in_ramp_count}次")
  183. def overtake_in_tunnel_area(self):
  184. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  185. "simTime"
  186. ].tolist()
  187. if len(tunnel_simtime_list) == 0:
  188. self.overtake_in_tunnel_count += 0
  189. return
  190. else:
  191. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  192. for tunnel_simtime in tunnel_simTime_list:
  193. lane_id = self.ego_data["lane_id"].tolist()
  194. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  195. objstate_in_tunnel = self.obj_data[
  196. self.obj_data["simTime"].isin(tunnel_simtime)
  197. ]
  198. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  199. ego_speedx = ego_in_tunnel["speedX"].tolist()
  200. ego_speedy = ego_in_tunnel["speedY"].tolist()
  201. if len(lane_id) > 0:
  202. self.overtake_in_tunnel_count += self._is_overtake(
  203. lane_id, dx, dy, ego_speedx, ego_speedy
  204. )
  205. else:
  206. continue
  207. # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
  208. # 加速车道超车
  209. def overtake_on_accelerate_lane(self):
  210. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  211. "simTime"
  212. ].tolist()
  213. if len(accelerate_simtime_list) == 0:
  214. self.overtake_on_accelerate_lane_count += 0
  215. return
  216. else:
  217. accelerate_simTime_list = self.different_road_area_simtime(
  218. accelerate_simtime_list
  219. )
  220. for accelerate_simtime in accelerate_simTime_list:
  221. lane_id = self.ego_data["lane_id"].tolist()
  222. ego_in_accelerate = self.ego_data[
  223. self.ego_data["simTime"].isin(accelerate_simtime)
  224. ]
  225. objstate_in_accelerate = self.obj_data[
  226. self.obj_data["simTime"].isin(accelerate_simtime)
  227. ]
  228. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  229. ego_speedx = ego_in_accelerate["speedX"].tolist()
  230. ego_speedy = ego_in_accelerate["speedY"].tolist()
  231. self.overtake_on_accelerate_lane_count += self._is_overtake(
  232. lane_id, dx, dy, ego_speedx, ego_speedy
  233. )
  234. # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
  235. # 减速车道超车
  236. def overtake_on_decelerate_lane(self):
  237. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  238. "simTime"
  239. ].tolist()
  240. if len(decelerate_simtime_list) == 0:
  241. self.overtake_on_decelerate_lane_count += 0
  242. return
  243. else:
  244. decelerate_simTime_list = self.different_road_area_simtime(
  245. decelerate_simtime_list
  246. )
  247. for decelerate_simtime in decelerate_simTime_list:
  248. lane_id = self.ego_data["id"].tolist()
  249. ego_in_decelerate = self.ego_data[
  250. self.ego_data["simTime"].isin(decelerate_simtime)
  251. ]
  252. objstate_in_decelerate = self.obj_data[
  253. self.obj_data["simTime"].isin(decelerate_simtime)
  254. ]
  255. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  256. ego_speedx = ego_in_decelerate["speedX"].tolist()
  257. ego_speedy = ego_in_decelerate["speedY"].tolist()
  258. self.overtake_on_decelerate_lane_count += self._is_overtake(
  259. lane_id, dx, dy, ego_speedx, ego_speedy
  260. )
  261. # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
  262. # 在交叉路口
  263. def overtake_in_different_senerios(self):
  264. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  265. "simTime"
  266. ].tolist() # 判断是路口或者隧道区域
  267. if len(crossroad_simTime) == 0:
  268. self.overtake_in_different_senerios_count += 0
  269. return
  270. else:
  271. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  272. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  273. crossroad_objstate = self.obj_data[
  274. self.obj_data["simTime"].isin(crossroad_simTime)
  275. ]
  276. # crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]
  277. # 读取前后的laneId
  278. lane_id = crossroad_ego["lane_id"].tolist()
  279. # 读取车辆前后的位置信息
  280. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  281. ego_speedx = crossroad_ego["speedX"].tolist()
  282. ego_speedy = crossroad_ego["speedY"].tolist()
  283. """
  284. 如果滑动窗口开始和最后的laneid一致;
  285. 自车和前车的位置发生的交换;
  286. 则认为发生超车
  287. """
  288. if len(lane_id) > 0:
  289. self.overtake_in_different_senerios_count += self._is_overtake(
  290. lane_id, dx, dy, ego_speedx, ego_speedy
  291. )
  292. else:
  293. pass
  294. # print(f"在路口超车{self.overtake_in_different_senerios_count}次")
  295. def statistic(self):
  296. if len(self.obj_data) == 0:
  297. pass
  298. else:
  299. self.overtake_in_forbid_lane()
  300. self.overtake_on_decelerate_lane()
  301. self.overtake_on_accelerate_lane()
  302. self.overtake_in_ramp_area()
  303. self.overtake_in_tunnel_area()
  304. self.overtake_in_different_senerios()
  305. self.illegal_overtake_with_car()
  306. self.calculated_value = {
  307. "overtake_on_right": self.overtake_on_right_count,
  308. "overtake_when_turn_around": self.overtake_when_turn_around_count,
  309. "overtake_when_passing_car": self.overtake_when_passing_car_count,
  310. "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
  311. "overtake_in_ramp": self.overtake_in_ramp_count,
  312. "overtake_in_tunnel": self.overtake_in_tunnel_count,
  313. "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
  314. "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
  315. "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
  316. }
  317. # self.logger.info(f"超车类指标统计完成,统计结果:{self.calculated_value}")
  318. return self.calculated_value
  319. class SlowdownViolation(object):
  320. """减速让行违规类"""
  321. def __init__(self, df_data):
  322. print("减速让行违规类-------------------------")
  323. self.traffic_violations_type = "减速让行违规类"
  324. self.object_items = []
  325. self.data = df_data.ego_data
  326. self.ego_data = (
  327. self.data[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
  328. ) # Copy to avoid modifying the original DataFrame
  329. self.pedestrian_data = pd.DataFrame()
  330. self.object_items = set(df_data.object_df.type.tolist())
  331. if 13 in self.object_items: # 行人的type是13
  332. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  333. self.pedestrian_data = (
  334. self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
  335. )
  336. self.slow_down_in_crosswalk_count = 0
  337. self.avoid_pedestrian_in_crosswalk_count = 0
  338. self.avoid_pedestrian_in_the_road_count = 0
  339. self.aviod_pedestrian_when_turning_count = 0
  340. def pedestrian_in_front_of_car(self):
  341. if len(self.pedestrian_data) == 0:
  342. return []
  343. else:
  344. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  345. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  346. self.ego_data["dist"] = np.sqrt(
  347. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  348. )
  349. self.ego_data["rela_pos"] = (
  350. self.ego_data["dx"] * self.ego_data["speedX"]
  351. + self.ego_data["dy"] * self.ego_data["speedY"]
  352. )
  353. simtime = self.ego_data[
  354. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  355. ]["simTime"].tolist()
  356. return simtime
  357. def different_road_area_simtime(self, df, threshold=0.6):
  358. if not df:
  359. return []
  360. simtime_group = []
  361. current_simtime_group = [df[0]]
  362. for i in range(1, len(df)):
  363. if abs(df[i] - df[i - 1]) <= threshold:
  364. current_simtime_group.append(df[i])
  365. else:
  366. simtime_group.append(current_simtime_group)
  367. current_simtime_group = [df[i]]
  368. simtime_group.append(current_simtime_group)
  369. return simtime_group
  370. def slow_down_in_crosswalk(self):
  371. # 筛选出路口或隧道区域的时间点
  372. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  373. "simTime"
  374. ].tolist()
  375. if len(crosswalk_simTime) == 0:
  376. self.slow_down_in_crosswalk_count += 0
  377. return
  378. else:
  379. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  380. for crosswalk_simtime in crosswalk_simTime_divide:
  381. # 筛选出当前时间段内的数据
  382. # start_time, end_time = crosswalk_simtime
  383. start_time = crosswalk_simtime[0]
  384. end_time = crosswalk_simtime[-1]
  385. print(f"当前时间段:{start_time} - {end_time}")
  386. crosswalk_objstate = self.ego_data[
  387. (self.ego_data["simTime"] >= start_time)
  388. & (self.ego_data["simTime"] <= end_time)
  389. ]
  390. # 计算车辆速度
  391. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  392. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  393. ego_speed = np.sqrt(ego_speedx**2 + ego_speedy**2)
  394. # 判断是否超速
  395. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  396. self.slow_down_in_crosswalk_count += 1
  397. # 输出总次数
  398. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  399. def avoid_pedestrian_in_crosswalk(self):
  400. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  401. "simTime"
  402. ].tolist()
  403. if len(crosswalk_simTime) == 0:
  404. self.avoid_pedestrian_in_crosswalk_count += 0
  405. return
  406. else:
  407. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  408. for crosswalk_simtime in crosswalk_simTime_devide:
  409. if not self.pedestrian_data.empty:
  410. crosswalk_objstate = self.pedestrian_data[
  411. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  412. ]
  413. else:
  414. crosswalk_objstate = pd.DataFrame()
  415. if len(crosswalk_objstate) > 0:
  416. pedestrian_simtime = crosswalk_objstate["simTime"]
  417. pedestrian_objstate = crosswalk_objstate[
  418. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  419. ]
  420. ego_speed = np.sqrt(
  421. pedestrian_objstate["speedX"] ** 2
  422. + pedestrian_objstate["speedY"] ** 2
  423. )
  424. if ego_speed.any() > 0:
  425. self.avoid_pedestrian_in_crosswalk_count += 1
  426. def avoid_pedestrian_in_the_road(self):
  427. simtime = self.pedestrian_in_front_of_car()
  428. if len(simtime) == 0:
  429. self.avoid_pedestrian_in_the_road_count += 0
  430. return
  431. else:
  432. pedestrian_on_the_road = self.pedestrian_data[
  433. self.pedestrian_data["simTime"].isin(simtime)
  434. ]
  435. simTime = pedestrian_on_the_road["simTime"].tolist()
  436. simTime_devide = self.different_road_area_simtime(simTime)
  437. if len(simTime_devide) == 0:
  438. pass
  439. else:
  440. for simtime1 in simTime_devide:
  441. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  442. pedestrian_on_the_road["simTime"].isin(simtime1)
  443. ]
  444. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  445. dist = np.sqrt(
  446. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  447. ** 2
  448. + (
  449. ego_car["posY"].values
  450. - sub_pedestrian_on_the_road["posY"].values
  451. )
  452. ** 2
  453. )
  454. speed = np.sqrt(
  455. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  456. )
  457. data = {"dist": dist, "speed": speed}
  458. new_ego_car = pd.DataFrame(data)
  459. new_ego_car = new_ego_car.assign(
  460. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  461. )
  462. if new_ego_car["Column3"].any():
  463. self.avoid_pedestrian_in_the_road_count += 1
  464. def aviod_pedestrian_when_turning(self):
  465. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  466. if len(pedestrian_simtime_list) > 0:
  467. simtime_list = self.ego_data[
  468. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  469. & (self.ego_data["lane_type"] == 20)
  470. ]["simTime"].tolist()
  471. simTime_list = self.different_road_area_simtime(simtime_list)
  472. pedestrian_on_the_road = self.pedestrian_data[
  473. self.pedestrian_data["simTime"].isin(simtime_list)
  474. ]
  475. if len(simTime_list) == 0:
  476. pass
  477. else:
  478. for simtime in simTime_list:
  479. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  480. pedestrian_on_the_road["simTime"].isin(simtime)
  481. ]
  482. if len(sub_pedestrian_on_the_road) > 0:
  483. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  484. ego_car["dist"] = np.sqrt(
  485. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  486. ** 2
  487. + (
  488. ego_car["posY"].values
  489. - sub_pedestrian_on_the_road["posY"].values
  490. )
  491. ** 2
  492. )
  493. ego_car["speed"] = np.sqrt(
  494. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  495. )
  496. if any(ego_car["speed"].tolist()) != 0:
  497. self.aviod_pedestrian_when_turning_count += 1
  498. def statistic(self):
  499. self.slow_down_in_crosswalk()
  500. self.avoid_pedestrian_in_crosswalk()
  501. self.avoid_pedestrian_in_the_road()
  502. self.aviod_pedestrian_when_turning()
  503. self.calculated_value = {
  504. "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
  505. "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count,
  506. "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count,
  507. "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count,
  508. }
  509. # self.logger.info(f"减速让行类指标统计完成,统计结果:{self.calculated_value}")
  510. return self.calculated_value
  511. class TurnaroundViolation(object):
  512. def __init__(self, df_data):
  513. print("掉头违规类初始化中...")
  514. self.traffic_violations_type = "掉头违规类"
  515. self.data = df_data.obj_data[1]
  516. self.ego_data = (
  517. self.data[config.TURNAROUND_INFO].copy().reset_index(drop=True)
  518. ) # Copy to avoid modifying the original DataFrame
  519. self.pedestrian_data = pd.DataFrame()
  520. self.object_items = set(df_data.object_df.type.tolist())
  521. if 13 in self.object_items: # 行人的type是13
  522. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  523. self.pedestrian_data = (
  524. self.pedestrian_df[config.SLOWDOWN_INFO].copy().reset_index(drop=True)
  525. )
  526. self.turning_in_forbiden_turn_back_sign_count = 0
  527. self.turning_in_forbiden_turn_left_sign_count = 0
  528. self.avoid_pedestrian_when_turn_back_count = 0
  529. def pedestrian_in_front_of_car(self):
  530. if len(self.pedestrian_data) == 0:
  531. return []
  532. else:
  533. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  534. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  535. self.ego_data["dist"] = np.sqrt(
  536. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  537. )
  538. self.ego_data["rela_pos"] = (
  539. self.ego_data["dx"] * self.ego_data["speedX"]
  540. + self.ego_data["dy"] * self.ego_data["speedY"]
  541. )
  542. simtime = self.ego_data[
  543. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  544. ]["simTime"].tolist()
  545. return simtime
  546. def different_road_area_simtime(self, df, threshold=0.5):
  547. if not df:
  548. return []
  549. simtime_group = []
  550. current_simtime_group = [df[0]]
  551. for i in range(1, len(df)):
  552. if abs(df[i] - df[i - 1]) <= threshold:
  553. current_simtime_group.append(df[i])
  554. else:
  555. simtime_group.append(current_simtime_group)
  556. current_simtime_group = [df[i]]
  557. simtime_group.append(current_simtime_group)
  558. return simtime_group
  559. def turn_back_in_forbiden_sign(self):
  560. """
  561. 禁止掉头type = 8
  562. """
  563. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  564. "simTime"
  565. ].tolist()
  566. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  567. "simTime"
  568. ].tolist()
  569. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  570. forbiden_turn_back_simTime
  571. )
  572. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  573. forbiden_turn_left_simTime
  574. )
  575. if len(forbiden_turn_back_simtime_devide) == 0:
  576. pass
  577. else:
  578. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  579. ego_car1 = self.ego_data.loc[
  580. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  581. ]
  582. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  583. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  584. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  585. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  586. if (
  587. ego_end_speedx1 * ego_start_speedx1
  588. + ego_end_speedy1 * ego_start_speedy1
  589. < 0
  590. ):
  591. self.turning_in_forbiden_turn_back_sign_count += 1
  592. if len(forbiden_turn_left_simtime_devide) == 0:
  593. pass
  594. else:
  595. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  596. ego_car2 = self.ego_data.loc[
  597. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  598. ]
  599. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  600. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  601. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  602. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  603. if (
  604. ego_end_speedx2 * ego_start_speedx2
  605. + ego_end_speedy2 * ego_start_speedy2
  606. < 0
  607. ):
  608. self.turning_in_forbiden_turn_left_sign_count += 1
  609. def avoid_pedestrian_when_turn_back(self):
  610. sensor_on_intersection = self.pedestrian_in_front_of_car()
  611. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  612. self.ego_data["lane_type"] == 20
  613. ]["simTime"].tolist()
  614. avoid_pedestrian_when_turn_back_simTime_devide = (
  615. self.different_road_area_simtime(
  616. avoid_pedestrian_when_turn_back_simTime_list
  617. )
  618. )
  619. if (len(sensor_on_intersection) > 0) and (len(avoid_pedestrian_when_turn_back_simTime_devide) > 0):
  620. for (
  621. avoid_pedestrian_when_turn_back_simtime
  622. ) in avoid_pedestrian_when_turn_back_simTime_devide:
  623. pedestrian_in_intersection_simtime = self.pedestrian_data[
  624. self.pedestrian_data["simTime"].isin(
  625. avoid_pedestrian_when_turn_back_simtime
  626. )
  627. ].tolist()
  628. ego_df = self.ego_data[
  629. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  630. ].reset_index(drop=True)
  631. pedestrian_df = self.pedestrian_data[
  632. self.pedestrian_data["simTime"].isin(
  633. pedestrian_in_intersection_simtime
  634. )
  635. ].reset_index(drop=True)
  636. ego_df["dist"] = np.sqrt(
  637. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  638. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  639. )
  640. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  641. if (any(ego_df["speed"].tolist()) != 0) or (any(ego_df["dist"].tolist()) == 0):
  642. self.avoid_pedestrian_when_turn_back_count += 1
  643. def statistic(self):
  644. self.turn_back_in_forbiden_sign()
  645. self.avoid_pedestrian_when_turn_back()
  646. self.calculated_value = {
  647. "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count,
  648. "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count,
  649. "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count,
  650. }
  651. # self.logger.info(f"掉头违规类指标统计完成,统计结果:{self.calculated_value}")
  652. return self.calculated_value
  class WrongWayViolation:
      """Stopping / emergency-lane violations on expressways and highways.

      Despite the class name, the metrics produced here are about stopping in
      a driving lane, stopping in the emergency lane, and driving in the
      emergency lane. Each metric is the number of data rows (frames) that
      match the rule, not the number of distinct events.
      """

      def __init__(self, df_data):
          """Take a private copy of the tracked object's frame.

          :param df_data: object exposing ``obj_data``; ``obj_data[1]`` must be
              a DataFrame with the columns ``road_fc``, ``lane_type`` and ``v``.
          """
          print("停车违规类初始化中...")
          self.traffic_violations_type = "停车违规类"
          # obj_data[1] is handed to several checkers; copy it so the label
          # column added below never leaks into the shared frame.
          self.data = df_data.obj_data[1].copy()
          # Initial (all-zero) violation statistics.
          self.violation_count = {
              "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
              "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
              "urbanExpresswayEmergencyLaneDriving": 0,
          }

      def process_violations(self):
          """Label every row with the rule it breaks and count rows per rule.

          The result is stored in ``self.violation_count``; every metric key is
          always present (0 when no row matches). Calling this repeatedly
          yields the same result.
          """
          urban_expressway_or_highway = {1, 2}
          driving_lane = {1, 4, 5, 6}
          emergency_lane = {12}

          frame = self.data
          on_fast_road = frame["road_fc"].isin(urban_expressway_or_highway)
          in_driving_lane = frame["lane_type"].isin(driving_lane)
          in_emergency_lane = frame["lane_type"].isin(emergency_lane)
          # "Stopped" is a comparison against zero, which does not depend on
          # the speed unit, so the speed column is deliberately NOT rescaled
          # (the former in-place ``*= 3.6`` compounded on every call).
          stopped = frame["v"] == 0

          rules = {
              "urbanExpresswayOrHighwayDrivingLaneStopped": (
                  on_fast_road & in_driving_lane & stopped
              ),
              "urbanExpresswayOrHighwayEmergencyLaneStopped": (
                  on_fast_road & in_emergency_lane & stopped
              ),
              "urbanExpresswayEmergencyLaneDriving": (
                  on_fast_road & in_emergency_lane & ~stopped
              ),
          }

          # Tag each row with its violation type (None when compliant).
          frame["violation_type"] = None
          for violation_type, mask in rules.items():
              frame.loc[mask, "violation_type"] = violation_type

          # Count rows per type, keeping zero entries for unseen types.
          counts = (
              frame["violation_type"]
              .value_counts()
              .reindex(list(rules), fill_value=0)
          )
          self.violation_count = {name: int(n) for name, n in counts.items()}

      def statistic(self) -> dict:
          """Evaluate the rules and return ``{metric name: row count}``."""
          self.process_violations()
          return self.violation_count
  class SpeedingViolation(object):
      """Speeding violations.

      There is no road-sign speed-limit metric here because, per the original
      author's note, the shp map does not carry that information. Each metric
      is the number of data rows (frames) that match the rule.
      """

      # Metric names, in rule order.
      _VIOLATION_TYPES = (
          "urbanExpresswayOrHighwaySpeedOverLimit50",
          "urbanExpresswayOrHighwaySpeedOverLimit20to50",
          "urbanExpresswayOrHighwaySpeedOverLimit0to20",
          "urbanExpresswayOrHighwaySpeedUnderLimit",
          "generalRoadSpeedOverLimit50",
          "generalRoadSpeedOverLimit20to50",
      )

      def __init__(self, df_data):
          """Take a private copy of the tracked object's frame.

          :param df_data: object exposing ``obj_data``; ``obj_data[1]`` must be
              a DataFrame with the columns ``road_fc``, ``v``,
              ``road_speed_max`` and ``road_speed_min``.
          """
          print("超速违规类初始化中...")
          self.traffic_violations_type = "超速违规类"
          # Really copy: obj_data[1] is shared with other checkers, and this
          # class adds a label column to its frame.
          self.data = df_data.obj_data[1].copy()
          # Initial (all-zero) violation statistics.
          self.violation_counts = {name: 0 for name in self._VIOLATION_TYPES}

      def process_violations(self):
          """Classify each row against the road speed limits and count rows.

          ``v`` is multiplied by 3.6 before being compared with
          ``road_speed_max`` / ``road_speed_min`` (m/s -> km/h, assuming ``v``
          is stored in m/s). The conversion is done on a temporary Series so
          the stored column is untouched and repeated calls give the same
          result. ``self.violation_counts`` always contains every metric key.
          """
          urban_expressway_or_highway = {1, 2}
          general_road = {3}

          frame = self.data
          speed_kmh = frame["v"] * 3.6
          limit_max = frame["road_speed_max"]
          limit_min = frame["road_speed_min"]

          on_fast_road = frame["road_fc"].isin(urban_expressway_or_highway)
          on_general_road = frame["road_fc"].isin(general_road)

          over_50 = speed_kmh > limit_max * 1.5
          over_20_to_50 = (speed_kmh > limit_max * 1.2) & (
              speed_kmh <= limit_max * 1.5
          )
          over_0_to_20 = (speed_kmh > limit_max) & (speed_kmh <= limit_max * 1.2)
          under_minimum = speed_kmh < limit_min

          masks = (
              on_fast_road & over_50,
              on_fast_road & over_20_to_50,
              on_fast_road & over_0_to_20,
              on_fast_road & under_minimum,
              on_general_road & over_50,
              on_general_road & over_20_to_50,
          )

          # Tag each row with its violation type (None when compliant).
          frame["violation_type"] = None
          for mask, violation_type in zip(masks, self._VIOLATION_TYPES):
              frame.loc[mask, "violation_type"] = violation_type

          # Keep zero entries so downstream consumers always see every metric.
          counts = (
              frame["violation_type"]
              .value_counts()
              .reindex(list(self._VIOLATION_TYPES), fill_value=0)
          )
          self.violation_counts = {name: int(n) for name, n in counts.items()}

      def statistic(self) -> dict:
          """Evaluate the rules and return ``{metric name: row count}``."""
          self.process_violations()
          return self.violation_counts
  class TrafficLightViolation(object):
      """Traffic-light violation checks.

      Produces two counters from the ego frames that reference a stop line and
      a traffic light: ``trafficSignalViolation`` and
      ``illegalDrivingOrParkingAtCrossroads``.

      TODO (carried over from the original notes): determine whether the
      vehicle turns left, goes straight or turns right, and whether the light
      is a directional or a pass-through light.
      """

      def __init__(self, df_data):
          """Store config and ego data, then evaluate the violations at once.

          :param df_data: object exposing ``vehicle_config`` (a mapping that
              contains ``"EGO_WHEELBASS"``) and ``ego_data`` (a DataFrame).
          """
          self.traffic_violations_type = "违反交通灯类"
          print("违反交通灯类 类初始化中...")
          self.config = df_data.vehicle_config
          self.data_ego = df_data.ego_data
          self.violation_counts = {
              "trafficSignalViolation": 0,
              "illegalDrivingOrParkingAtCrossroads": 0,
          }
          # Evaluate immediately so statistic() is a plain getter.
          self.process_violations()

      def is_point_cross_line(self, point, stop_line_points):
          """Tell whether a vehicle point counts as having crossed a stop line.

          The point must be collinear with the two stop-line end points (2-D
          cross product equal to zero) and the angle between the vehicle
          heading and the vector from the line's midpoint to the point must be
          at most 90 degrees.

          :param point: ``(x, y, heading)``; heading in radians.
          :param stop_line_points: ``[[x1, y1], [x2, y2]]`` end points.
          :return: True if both conditions hold, otherwise False.
          """
          start_x, start_y = stop_line_points[0][0], stop_line_points[0][1]
          line_vector = np.array(
              [stop_line_points[1][0] - start_x, stop_line_points[1][1] - start_y]
          )
          point_vector = np.array([point[0] - start_x, point[1] - start_y])

          # z-component of the 2-D cross product, written out by hand because
          # np.cross on 2-element vectors is deprecated in NumPy 2.x.
          cross_product = (
              line_vector[0] * point_vector[1] - line_vector[1] * point_vector[0]
          )
          # NOTE(review): this is an exact comparison, so only points lying
          # exactly on the (infinite) line pass; with measured float
          # coordinates that is rare. A tolerance may be intended - confirm.
          if cross_product != 0:
              return False

          mid_point = np.array([start_x, start_y]) + 0.5 * line_vector
          mid_to_point = np.array([point[0] - mid_point[0], point[1] - mid_point[1]])
          direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])

          norm_mid_to_point = np.linalg.norm(mid_to_point)
          norm_direction = np.linalg.norm(direction_vector)
          if norm_mid_to_point == 0 or norm_direction == 0:
              return False

          cos_theta = np.dot(mid_to_point, direction_vector) / (
              norm_mid_to_point * norm_direction
          )
          # Rounding can push the cosine a hair outside [-1, 1], which would
          # make math.acos raise ValueError.
          cos_theta = max(-1.0, min(1.0, float(cos_theta)))
          angle_theta = math.degrees(math.acos(cos_theta))
          return angle_theta <= 90

      def _filter_data(self):
          """Return the ego rows that have a stop line of type 1 and a light."""
          ego = self.data_ego
          relevant = (
              (ego["stopline_id"] != -1)
              & (ego["stopline_type"] == 1)
              & (ego["trafficlight_id"] != -1)
          )
          return ego[relevant]

      def _group_data(self, filtered_data):
          """Split the rows into runs separated by a gap of more than 0.5 in simTime.

          Works on a copy so the helper columns ``time_diff`` and ``group`` are
          not written into a slice of the caller's frame.
          """
          threshold = 0.5
          frames = filtered_data.copy()
          frames["time_diff"] = frames["simTime"].diff().fillna(0)
          frames["group"] = (frames["time_diff"] > threshold).cumsum()
          return frames.groupby("group")

      def _analyze_group(self, group_data):
          """Evaluate the per-frame flags for one run of frames.

          :param group_data: DataFrame holding one run produced by
              ``_group_data``.
          :return: ``(photos, stop_in_intersection)`` where ``photos`` is a flat
              list with four booleans per frame (front crossed, rear crossed,
              passed intersection, stopped in intersection) and
              ``stop_in_intersection`` is the "passed intersection" flag of the
              last frame.
          """
          photos = []
          stop_in_intersection = False
          wheelbase = self.config["EGO_WHEELBASS"]

          for _, row in group_data.iterrows():
              heading = row["posH"]
              position = np.array([row["posX"], row["posY"]])
              stop_line_points = [
                  [row["stopline_x1"], row["stopline_y1"]],
                  [row["stopline_x2"], row["stopline_y2"]],
              ]
              # presumably stateMask == 1 means the light is red - confirm.
              state_matches = row["stateMask"] == 1

              heading_vector = np.array([np.cos(heading), np.sin(heading)])
              heading_vector = heading_vector / np.linalg.norm(heading_vector)

              # is_point_cross_line expects (x, y, heading); passing only the
              # 2-element wheel position raised IndexError as soon as a wheel
              # was on the line.
              front_xy = position + wheelbase * heading_vector
              rear_xy = position - wheelbase * heading_vector
              front_wheel = (front_xy[0], front_xy[1], heading)
              rear_wheel = (rear_xy[0], rear_xy[1], heading)

              dist = math.sqrt(
                  (row["posX"] - row["traffic_light_x"]) ** 2
                  + (row["posY"] - row["traffic_light_y"]) ** 2
              )

              # The flags are computed for every frame. They used to sit
              # behind `abs(speedH) > 0.01 or abs(speedH) < 0.01`, which is
              # false only for exactly 0.01 or NaN and then left the flags
              # unbound (or stale from the previous frame).
              has_crossed_line_front = bool(
                  self.is_point_cross_line(front_wheel, stop_line_points)
                  and state_matches
              )
              has_crossed_line_rear = bool(
                  self.is_point_cross_line(rear_wheel, stop_line_points)
                  and row["v"] > 0
                  and state_matches
              )
              has_stop_in_intersection = bool(
                  has_crossed_line_front and row["v"] == 0
              )
              has_passed_intersection = bool(has_crossed_line_front and dist < 1.0)

              photos.extend(
                  [
                      has_crossed_line_front,
                      has_crossed_line_rear,
                      has_passed_intersection,
                      has_stop_in_intersection,
                  ]
              )
              # NOTE(review): this keeps the "passed" flag, not the "stopped"
              # flag, and only the last frame's value survives - confirm.
              stop_in_intersection = has_passed_intersection

          return photos, stop_in_intersection

      def is_vehicle_run_a_red_light(self):
          """Analyze every run of frames and store the per-run results.

          Fills ``self.photos_group`` and ``self.stop_in_intersections`` with
          one entry per run.
          """
          grouped_data = self._group_data(self._filter_data())
          self.photos_group = []
          self.stop_in_intersections = []
          for _, group_data in grouped_data:
              photos, stop_in_intersection = self._analyze_group(group_data)
              self.photos_group.append(photos)
              self.stop_in_intersections.append(stop_in_intersection)

      def process_violations(self):
          """Turn the per-run results into the two violation counters."""
          self.is_vehicle_run_a_red_light()
          # NOTE(review): every frame contributes both a "rear crossed" flag
          # (needs v > 0) and a "stopped" flag (needs v == 0), so all(photos)
          # cannot be True for a non-empty run. The aggregation rule is kept
          # unchanged until the intended definition is confirmed.
          signal_violations = sum(all(photos) for photos in self.photos_group)
          crossroad_violations = sum(
              1 for flagged in self.stop_in_intersections if flagged
          )
          self.violation_counts["trafficSignalViolation"] = signal_violations
          self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = (
              crossroad_violations
          )

      def statistic(self):
          """Return the counters computed during construction."""
          return self.violation_counts
  class WarningViolation(object):
      """Warning-level violations.

      Counts, as numbers of continuous episodes:

      * ``generalRoadIrregularLaneUse`` - driving outside the prescribed lane
        on roads other than expressways/highways;
      * ``urbanExpresswayOrHighwayRideLaneDivider`` - riding on or over the
        lane divider on an expressway or highway.
      """

      def __init__(self, df_data):
          """Store the vehicle config and a private copy of the object frame.

          :param df_data: object exposing ``vehicle_config`` (a mapping that
              contains ``"CAR_WIDTH"``) and ``obj_data`` (``obj_data[1]`` is a
              DataFrame).
          """
          self.traffic_violations_type = "警告性违规类"
          print("警告性违规类 类初始化中...")
          self.config = df_data.vehicle_config
          self.data_ego = df_data.obj_data[1]
          self.data = self.data_ego.copy()  # never modify the shared frame
          self.violation_counts = {
              "generalRoadIrregularLaneUse": 0,
              "urbanExpresswayOrHighwayRideLaneDivider": 0,
          }

      def process_violations(self):
          """Recompute both counters from ``self.data``.

          The counters are assigned (not accumulated), so calling this method
          or ``statistic()`` repeatedly does not inflate the result.
          """
          general_road = {3}
          # Same road_fc codes the other checkers in this file use for
          # expressways / highways.
          urban_expressway_or_highway = {1, 2}
          # Per the original note: 10 = motor-vehicle lane,
          # 11 = non-motor-vehicle lane.
          non_motor_lane = {11}

          frame = self.data
          times = frame["simTime"]
          car_width = self.config["CAR_WIDTH"]

          # 1) General road: vehicle is in a non-motor-vehicle lane.
          wrong_lane = frame["road_fc"].isin(general_road) & frame[
              "lane_type"
          ].isin(non_motor_lane)
          self.violation_counts["generalRoadIrregularLaneUse"] = len(
              self.count_continuous_violations(wrong_lane, times)
          )

          # 2) Expressway/highway: the body reaches the lane divider once the
          # lateral offset exceeds half of the free width. The road-class
          # filter matches the metric's definition, and abs() catches drift to
          # either side (it is a no-op if laneOffset is already unsigned).
          threshold = (frame["lane_width"] - car_width) / 2
          riding_divider = frame["road_fc"].isin(urban_expressway_or_highway) & (
              frame["laneOffset"].abs() > threshold
          )
          self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(
              self.count_continuous_violations(riding_divider, times)
          )

      def count_continuous_violations(self, violation_series, time_series):
          """Collect the runs of consecutive truthy flags.

          :param violation_series: iterable of per-frame violation flags.
          :param time_series: iterable of timestamps aligned with the flags.
          :return: list with one entry per run; each entry is a one-element
              list holding the timestamp at which the run started.
          """
          continuous_segments = []
          current_segment = []
          for is_violation, time in zip(violation_series, time_series):
              if is_violation:
                  if not current_segment:  # a new run starts here
                      current_segment.append(time)
              elif current_segment:  # the running segment just ended
                  continuous_segments.append(current_segment)
                  current_segment = []
          # A run that is still open at the end of the data also counts.
          if current_segment:
              continuous_segments.append(current_segment)
          return continuous_segments

      def statistic(self):
          """Evaluate the rules and return ``{metric name: episode count}``."""
          self.process_violations()
          return self.violation_counts
  class TrafficSignViolation(object):
      """Traffic-sign violations.

      Sign codes handled (column ``sign_type1``): 7 = no straight-through,
      12 = speed limit, 13 = minimum speed limit. Each metric is the number of
      data rows (frames) that break the rule while the sign applies.
      """

      def __init__(self, df_data):
          """Take private copies of the tracked object's frame.

          :param df_data: object exposing ``obj_data``; ``obj_data[1]`` is a
              DataFrame that contains the ``config.TRFFICSIGN_INFO`` columns.
          """
          self.traffic_violations_type = "交通标志违规类"
          print("交通标志违规类 类初始化中...")
          self.data_ego = df_data.obj_data[1]
          self.ego_data = (
              self.data_ego[config.TRFFICSIGN_INFO].copy().reset_index(drop=True)
          )
          self.data_ego = self.data_ego.copy()  # never modify the shared frame
          self.violation_counts = {
              "NoStraightThrough": 0,  # going straight past a no-straight sign
              "SpeedLimitViolation": 0,  # exceeding the signed speed limit
              "MinimumSpeedLimitViolation": 0,  # below the signed minimum speed
          }

      def checkForProhibitionViolation(self):
          """Prohibition signs: 7 = no straight-through, 12 = speed limit.

          Updates ``NoStraightThrough`` and ``SpeedLimitViolation``.
          """
          import math  # local import keeps this block self-contained

          frame = self.data_ego

          # --- No straight-through (sign 7) --------------------------------
          straight_sign_rows = frame[frame["sign_type1"] == 7].sort_values("simTime")
          # posH is a heading in radians elsewhere in this file (it is fed to
          # cos/sin), so the 5-degree straightness threshold is converted to
          # radians before comparing. The first row has no predecessor (NaN
          # difference) and therefore never counts.
          threshold_rad = math.radians(5)
          heading_change = straight_sign_rows["posH"].diff().abs()
          going_straight = (heading_change <= threshold_rad) & (
              straight_sign_rows["v"] > 0
          )
          # Write to the declared metric key (the count used to land in an
          # undeclared "prohibition_straight" key, leaving this one at 0).
          self.violation_counts["NoStraightThrough"] = int(going_straight.sum())

          # --- Speed limit (sign 12) ---------------------------------------
          # Only rows where the sign applies are evaluated; the check used to
          # run on the whole frame and only when NO such row existed.
          # NOTE(review): v and sign_speed are compared as-is - confirm that
          # both use the same unit.
          limit_rows = frame[frame["sign_type1"] == 12]
          self.violation_counts["SpeedLimitViolation"] = int(
              (limit_rows["v"] > limit_rows["sign_speed"]).sum()
          )

      def checkForInstructionViolation(self):
          """Instruction signs: 13 = minimum speed limit.

          Updates ``MinimumSpeedLimitViolation`` from the rows where the sign
          applies (same unit caveat as the speed-limit check).
          """
          frame = self.data_ego
          minimum_rows = frame[frame["sign_type1"] == 13]
          self.violation_counts["MinimumSpeedLimitViolation"] = int(
              (minimum_rows["v"] < minimum_rows["sign_speed"]).sum()
          )

      def statistic(self):
          """Run both checks and return ``{metric name: row count}``."""
          self.checkForProhibitionViolation()
          self.checkForInstructionViolation()
          return self.violation_counts
  class ViolationManager:
      """Builds the individual violation checkers and scores their results."""

      def __init__(self, data_processed):
          """Create one checker of each kind from the same processed data.

          :param data_processed: processed-data object; must expose
              ``traffic_config`` plus whatever the individual checkers read.
          """
          self.violations = []
          self.data = data_processed
          self.config = data_processed.traffic_config

          source = self.data
          self.over_take_violation = OvertakingViolation(source)
          self.slow_down_violation = SlowdownViolation(source)
          self.wrong_way_violation = WrongWayViolation(source)
          self.speeding_violation = SpeedingViolation(source)
          self.traffic_light_violation = TrafficLightViolation(source)
          self.warning_violation = WarningViolation(source)

      def report_statistic(self):
          """Merge every checker's statistics and return the evaluated score.

          The dict returned by the overtaking checker is extended in place with
          the results of the remaining checkers, then passed to
          ``Score(self.config).evaluate``.
          """
          merged = self.over_take_violation.statistic()
          # Order matters only in that later results overwrite equal keys.
          for checker_attr in (
              "slow_down_violation",
              "traffic_light_violation",
              "wrong_way_violation",
              "speeding_violation",
              "warning_violation",
          ):
              merged.update(getattr(self, checker_attr).statistic())

          result = Score(self.config).evaluate(merged)
          print("\n[交规类表现及得分情况]")
          return result
  1057. # Example usage
  1058. if __name__ == "__main__":
  1059. pass