# traffic.py: traffic-rule violation checks (overtaking, yielding to pedestrians, U-turns, stopping, speeding).
import math

import numpy as np
import pandas as pd

from modules.lib import log_manager
from modules.lib.score import Score
  6. OVERTAKE_INFO = [
  7. "simTime",
  8. "simFrame",
  9. "playerId",
  10. "speedX",
  11. "speedY",
  12. "posX",
  13. "posY",
  14. "posH",
  15. "lane_id",
  16. "lane_type",
  17. "road_type",
  18. "interid",
  19. "crossid",
  20. ]
  21. SLOWDOWN_INFO = [
  22. "simTime",
  23. "simFrame",
  24. "playerId",
  25. "speedX",
  26. "speedY",
  27. "posX",
  28. "posY",
  29. "crossid",
  30. "lane_type",
  31. ]
  32. TURNAROUND_INFO = [
  33. "simTime",
  34. "simFrame",
  35. "playerId",
  36. "speedX",
  37. "speedY",
  38. "posX",
  39. "posY",
  40. "sign_type1",
  41. "lane_type",
  42. ]
  43. TRFFICSIGN_INFO = [
  44. "simTime",
  45. "simFrame",
  46. "playerId",
  47. "speedX",
  48. "speedY",
  49. "v",
  50. "posX",
  51. "posY",
  52. "sign_type1",
  53. "sign_ref_link",
  54. "sign_x",
  55. "sign_y",
  56. ]
  57. class OvertakingViolation(object):
  58. """超车违规类"""
  59. def __init__(self, df_data):
  60. print("超车违规类初始化中...")
  61. self.traffic_violations_type = "超车违规类"
  62. # self.logger = log.get_logger() # 使用时再初始化
  63. self.data = df_data.obj_data[1]
  64. self.ego_data = (
  65. self.data[OVERTAKE_INFO].copy().reset_index(drop=True)
  66. ) # Copy to avoid modifying the original DataFrame
  67. self.data_obj = df_data.obj_data[2]
  68. self.obj_data = (
  69. self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
  70. ) # Copy to avoid modifying the original DataFrame
  71. self.object_items = []
  72. for i, item in df_data.obj_data.items():
  73. self.object_items.append(i)
  74. if 3 in self.object_items:
  75. self.other_obj_data1 = df_data.obj_data[3]
  76. self.other_obj_data = (
  77. self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
  78. )
  79. self.overtake_on_right_count = 0
  80. self.overtake_when_turn_around_count = 0
  81. self.overtake_when_passing_car_count = 0
  82. self.overtake_in_forbid_lane_count = 0
  83. self.overtake_in_ramp_count = 0
  84. self.overtake_in_tunnel_count = 0
  85. self.overtake_on_accelerate_lane_count = 0
  86. self.overtake_on_decelerate_lane_count = 0
  87. self.overtake_in_different_senerios_count = 0
  88. def different_road_area_simtime(self, df, threshold=0.5):
  89. if not df:
  90. return []
  91. simtime_group = []
  92. current_simtime_group = [df[0]]
  93. for i in range(1, len(df)):
  94. if abs(df[i] - df[i - 1]) <= threshold:
  95. current_simtime_group.append(df[i])
  96. else:
  97. simtime_group.append(current_simtime_group)
  98. current_simtime_group = [df[i]]
  99. simtime_group.append(current_simtime_group)
  100. return simtime_group
  101. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  102. lane_start = lane_id[0]
  103. lane_end = lane_id[-1]
  104. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  105. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  106. return lane_start == lane_end and start_condition and end_condition
  107. def _is_dxy_of_car(self, ego_df, obj_df):
  108. """
  109. :param df: objstate.csv and so on
  110. :param id: playerId
  111. :param string_type: posX/Y or speedX/Y and so on
  112. :return: dataframe of dx/y and so on
  113. """
  114. car_dx = obj_df["posX"].values - ego_df["posX"].values
  115. car_dy = obj_df["posY"].values - ego_df["posY"].values
  116. return car_dx, car_dy
  117. # 在前车右侧超车、会车时超车、前车掉头时超车
  118. def illegal_overtake_with_car(self, window_width=250):
  119. # 获取csv文件中最短的帧数
  120. frame_id_length = len(self.ego_data["simFrame"])
  121. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  122. while (start_frame_id + window_width) < frame_id_length:
  123. # if start_frame_id == 828:
  124. # print("end")
  125. simframe_window1 = list(
  126. np.arange(start_frame_id, start_frame_id + window_width)
  127. )
  128. simframe_window = list(map(int, simframe_window1))
  129. # 读取滑动窗口的dataframe数据
  130. ego_data_frames = self.ego_data[
  131. self.ego_data["simFrame"].isin(simframe_window)
  132. ]
  133. obj_data_frames = self.obj_data[
  134. self.obj_data["simFrame"].isin(simframe_window)
  135. ]
  136. other_data_frames = self.other_obj_data[
  137. self.other_obj_data["simFrame"].isin(simframe_window)
  138. ]
  139. # 读取前后的laneId
  140. lane_id = ego_data_frames["lane_id"].tolist()
  141. # 读取前后方向盘转角steeringWheel,
  142. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  143. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  144. # 读取车辆前后的位置信息
  145. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  146. ego_speedx = ego_data_frames["speedX"].tolist()
  147. ego_speedy = ego_data_frames["speedY"].tolist()
  148. obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][
  149. "speedX"
  150. ].tolist()
  151. obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][
  152. "speedY"
  153. ].tolist()
  154. if len(other_data_frames) > 0:
  155. other_start_speedx = other_data_frames["speedX"].iloc[0]
  156. other_start_speedy = other_data_frames["speedY"].iloc[0]
  157. if (
  158. ego_speedx[0] * other_start_speedx
  159. + ego_speedy[0] * other_start_speedy
  160. < 0
  161. ):
  162. self.overtake_when_passing_car_count += self._is_overtake(
  163. lane_id, dx, dy, ego_speedx, ego_speedy
  164. )
  165. start_frame_id += window_width
  166. """
  167. 如果滑动窗口开始和最后的laneid一致;
  168. 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
  169. 自车和前车的位置发生的交换;
  170. 则认为右超车
  171. """
  172. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  173. self.overtake_on_right_count += self._is_overtake(
  174. lane_id, dx, dy, ego_speedx, ego_speedy
  175. )
  176. start_frame_id += window_width
  177. elif ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
  178. self.overtake_when_turn_around_count += self._is_overtake(
  179. lane_id, dx, dy, ego_speedx, ego_speedy
  180. )
  181. start_frame_id += window_width
  182. else:
  183. start_frame_id += 1
  184. # print(
  185. # f"在会车时超车{self.overtake_when_passing_car_count}次, 右侧超车{self.overtake_on_right_count}次, 在前车掉头时超车{self.overtake_when_turn_around_count}次")
  186. # 借道超车场景
  187. def overtake_in_forbid_lane(self):
  188. simTime = self.obj_data["simTime"].tolist()
  189. simtime_devide = self.different_road_area_simtime(simTime)
  190. for simtime in simtime_devide:
  191. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  192. try:
  193. lane_type = lane_overtake["lane_type"].tolist()
  194. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  195. 50002 not in lane_type and len(set(lane_type)) > 1
  196. ):
  197. self.overtake_in_forbid_lane_count += 1
  198. except Exception as e:
  199. print("数据缺少lane_type信息")
  200. # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
  201. # 在匝道超车
  202. def overtake_in_ramp_area(self):
  203. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  204. "simTime"
  205. ].tolist()
  206. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  207. for ramp_simtime in ramp_simTime_list:
  208. lane_id = self.ego_data["lane_id"].tolist()
  209. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  210. objstate_in_ramp = self.obj_data[
  211. self.obj_data["simTime"].isin(ramp_simtime)
  212. ]
  213. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  214. ego_speedx = ego_in_ramp["speedX"].tolist()
  215. ego_speedy = ego_in_ramp["speedY"].tolist()
  216. if len(lane_id) > 0:
  217. self.overtake_in_ramp_count += self._is_overtake(
  218. lane_id, dx, dy, ego_speedx, ego_speedy
  219. )
  220. else:
  221. continue
  222. # print(f"在匝道超车{self.overtake_in_ramp_count}次")
  223. def overtake_in_tunnel_area(self):
  224. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  225. "simTime"
  226. ].tolist()
  227. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  228. for tunnel_simtime in tunnel_simTime_list:
  229. lane_id = self.ego_data["lane_id"].tolist()
  230. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  231. objstate_in_tunnel = self.obj_data[
  232. self.obj_data["simTime"].isin(tunnel_simtime)
  233. ]
  234. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  235. ego_speedx = ego_in_tunnel["speedX"].tolist()
  236. ego_speedy = ego_in_tunnel["speedY"].tolist()
  237. if len(lane_id) > 0:
  238. self.overtake_in_tunnel_count += self._is_overtake(
  239. lane_id, dx, dy, ego_speedx, ego_speedy
  240. )
  241. else:
  242. continue
  243. # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
  244. # 加速车道超车
  245. def overtake_on_accelerate_lane(self):
  246. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  247. "simTime"
  248. ].tolist()
  249. accelerate_simTime_list = self.different_road_area_simtime(
  250. accelerate_simtime_list
  251. )
  252. for accelerate_simtime in accelerate_simTime_list:
  253. lane_id = self.ego_data["lane_id"].tolist()
  254. ego_in_accelerate = self.ego_data[
  255. self.ego_data["simTime"].isin(accelerate_simtime)
  256. ]
  257. objstate_in_accelerate = self.obj_data[
  258. self.obj_data["simTime"].isin(accelerate_simtime)
  259. ]
  260. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  261. ego_speedx = ego_in_accelerate["speedX"].tolist()
  262. ego_speedy = ego_in_accelerate["speedY"].tolist()
  263. self.overtake_on_accelerate_lane_count += self._is_overtake(
  264. lane_id, dx, dy, ego_speedx, ego_speedy
  265. )
  266. # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
  267. # 减速车道超车
  268. def overtake_on_decelerate_lane(self):
  269. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  270. "simTime"
  271. ].tolist()
  272. decelerate_simTime_list = self.different_road_area_simtime(
  273. decelerate_simtime_list
  274. )
  275. for decelerate_simtime in decelerate_simTime_list:
  276. lane_id = self.ego_data["id"].tolist()
  277. ego_in_decelerate = self.ego_data[
  278. self.ego_data["simTime"].isin(decelerate_simtime)
  279. ]
  280. objstate_in_decelerate = self.obj_data[
  281. self.obj_data["simTime"].isin(decelerate_simtime)
  282. ]
  283. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  284. ego_speedx = ego_in_decelerate["speedX"].tolist()
  285. ego_speedy = ego_in_decelerate["speedY"].tolist()
  286. self.overtake_on_decelerate_lane_count += self._is_overtake(
  287. lane_id, dx, dy, ego_speedx, ego_speedy
  288. )
  289. # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
  290. # 在交叉路口
  291. def overtake_in_different_senerios(self):
  292. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  293. "simTime"
  294. ].tolist() # 判断是路口或者隧道区域
  295. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  296. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  297. crossroad_objstate = self.obj_data[
  298. self.obj_data["simTime"].isin(crossroad_simTime)
  299. ]
  300. # crossroad_laneinfo = self.laneinfo_new_data[self.laneinfo_new_data['simTime'].isin(crossroad_simTime)]
  301. # 读取前后的laneId
  302. lane_id = crossroad_ego["lane_id"].tolist()
  303. # 读取车辆前后的位置信息
  304. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  305. ego_speedx = crossroad_ego["speedX"].tolist()
  306. ego_speedy = crossroad_ego["speedY"].tolist()
  307. """
  308. 如果滑动窗口开始和最后的laneid一致;
  309. 自车和前车的位置发生的交换;
  310. 则认为发生超车
  311. """
  312. if len(lane_id) > 0:
  313. self.overtake_in_different_senerios_count += self._is_overtake(
  314. lane_id, dx, dy, ego_speedx, ego_speedy
  315. )
  316. else:
  317. pass
  318. # print(f"在路口超车{self.overtake_in_different_senerios_count}次")
  319. def statistic(self):
  320. self.overtake_in_forbid_lane()
  321. self.overtake_on_decelerate_lane()
  322. self.overtake_on_accelerate_lane()
  323. self.overtake_in_ramp_area()
  324. self.overtake_in_tunnel_area()
  325. self.overtake_in_different_senerios()
  326. self.illegal_overtake_with_car()
  327. self.calculated_value = {
  328. "overtake_on_right": self.overtake_on_right_count,
  329. "overtake_when_turn_around": self.overtake_when_turn_around_count,
  330. "overtake_when_passing_car": self.overtake_when_passing_car_count,
  331. "overtake_in_forbid_lane": self.overtake_in_forbid_lane_count,
  332. "overtake_in_ramp": self.overtake_in_ramp_count,
  333. "overtake_in_tunnel": self.overtake_in_tunnel_count,
  334. "overtake_on_accelerate_lane": self.overtake_on_accelerate_lane_count,
  335. "overtake_on_decelerate_lane": self.overtake_on_decelerate_lane_count,
  336. "overtake_in_different_senerios": self.overtake_in_different_senerios_count,
  337. }
  338. # self.logger.info(f"超车类指标统计完成,统计结果:{self.calculated_value}")
  339. return self.calculated_value
  340. class SlowdownViolation(object):
  341. """减速让行违规类"""
  342. def __init__(self, df_data):
  343. print("减速让行违规类-------------------------")
  344. self.traffic_violations_type = "减速让行违规类"
  345. self.object_items = []
  346. self.data = df_data.obj_data[1]
  347. self.ego_data = (
  348. self.data[SLOWDOWN_INFO].copy().reset_index(drop=True)
  349. ) # Copy to avoid modifying the original DataFrame
  350. self.pedestrian_data = pd.DataFrame()
  351. self.object_items = set(df_data.object_df.type.tolist())
  352. if 13 in self.object_items: # 行人的type是13
  353. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  354. self.pedestrian_data = (
  355. self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  356. )
  357. self.slow_down_in_crosswalk_count = 0
  358. self.avoid_pedestrian_in_crosswalk_count = 0
  359. self.avoid_pedestrian_in_the_road_count = 0
  360. self.aviod_pedestrian_when_turning_count = 0
  361. def pedestrian_in_front_of_car(self):
  362. if len(self.pedestrian_data) == 0:
  363. return []
  364. else:
  365. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  366. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  367. self.ego_data["dist"] = np.sqrt(
  368. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  369. )
  370. self.ego_data["rela_pos"] = (
  371. self.ego_data["dx"] * self.ego_data["speedX"]
  372. + self.ego_data["dy"] * self.ego_data["speedY"]
  373. )
  374. simtime = self.ego_data[
  375. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  376. ]["simTime"].tolist()
  377. return simtime
  378. def different_road_area_simtime(self, df, threshold=0.6):
  379. if not df:
  380. return []
  381. simtime_group = []
  382. current_simtime_group = [df[0]]
  383. for i in range(1, len(df)):
  384. if abs(df[i] - df[i - 1]) <= threshold:
  385. current_simtime_group.append(df[i])
  386. else:
  387. simtime_group.append(current_simtime_group)
  388. current_simtime_group = [df[i]]
  389. simtime_group.append(current_simtime_group)
  390. return simtime_group
  391. def slow_down_in_crosswalk(self):
  392. # 筛选出路口或隧道区域的时间点
  393. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  394. "simTime"
  395. ].tolist()
  396. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  397. for crosswalk_simtime in crosswalk_simTime_divide:
  398. # 筛选出当前时间段内的数据
  399. # start_time, end_time = crosswalk_simtime
  400. start_time = crosswalk_simtime[0]
  401. end_time = crosswalk_simtime[-1]
  402. print(f"当前时间段:{start_time} - {end_time}")
  403. crosswalk_objstate = self.ego_data[
  404. (self.ego_data["simTime"] >= start_time)
  405. & (self.ego_data["simTime"] <= end_time)
  406. ]
  407. # 计算车辆速度
  408. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  409. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  410. ego_speed = np.sqrt(ego_speedx**2 + ego_speedy**2)
  411. # 判断是否超速
  412. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  413. self.slow_down_in_crosswalk_count += 1
  414. # 输出总次数
  415. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  416. def avoid_pedestrian_in_crosswalk(self):
  417. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  418. "simTime"
  419. ].tolist()
  420. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  421. for crosswalk_simtime in crosswalk_simTime_devide:
  422. if not self.pedestrian_data.empty:
  423. crosswalk_objstate = self.pedestrian_data[
  424. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  425. ]
  426. else:
  427. crosswalk_objstate = pd.DataFrame()
  428. if len(crosswalk_objstate) > 0:
  429. pedestrian_simtime = crosswalk_objstate["simTime"]
  430. pedestrian_objstate = crosswalk_objstate[
  431. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  432. ]
  433. ego_speed = np.sqrt(
  434. pedestrian_objstate["speedX"] ** 2
  435. + pedestrian_objstate["speedY"] ** 2
  436. )
  437. if ego_speed.any() > 0:
  438. self.avoid_pedestrian_in_crosswalk_count += 1
  439. def avoid_pedestrian_in_the_road(self):
  440. simtime = self.pedestrian_in_front_of_car()
  441. if len(simtime) == 0:
  442. self.avoid_pedestrian_in_the_road_count += 0
  443. else:
  444. pedestrian_on_the_road = self.pedestrian_data[
  445. self.pedestrian_data["simTime"].isin(simtime)
  446. ]
  447. simTime = pedestrian_on_the_road["simTime"].tolist()
  448. simTime_devide = self.different_road_area_simtime(simTime)
  449. for simtime1 in simTime_devide:
  450. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  451. pedestrian_on_the_road["simTime"].isin(simtime1)
  452. ]
  453. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  454. dist = np.sqrt(
  455. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  456. ** 2
  457. + (
  458. ego_car["posY"].values
  459. - sub_pedestrian_on_the_road["posY"].values
  460. )
  461. ** 2
  462. )
  463. speed = np.sqrt(
  464. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  465. )
  466. data = {"dist": dist, "speed": speed}
  467. new_ego_car = pd.DataFrame(data)
  468. new_ego_car = new_ego_car.assign(
  469. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  470. )
  471. if new_ego_car["Column3"].any():
  472. self.avoid_pedestrian_in_the_road_count += 1
  473. def aviod_pedestrian_when_turning(self):
  474. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  475. if len(pedestrian_simtime_list) > 0:
  476. simtime_list = self.ego_data[
  477. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  478. & (self.ego_data["lane_type"] == 20)
  479. ]["simTime"].tolist()
  480. simTime_list = self.different_road_area_simtime(simtime_list)
  481. pedestrian_on_the_road = self.pedestrian_data[
  482. self.pedestrian_data["simTime"].isin(simtime_list)
  483. ]
  484. for simtime in simTime_list:
  485. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  486. pedestrian_on_the_road["simTime"].isin(simtime)
  487. ]
  488. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  489. ego_car["dist"] = np.sqrt(
  490. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  491. ** 2
  492. + (
  493. ego_car["posY"].values
  494. - sub_pedestrian_on_the_road["posY"].values
  495. )
  496. ** 2
  497. )
  498. ego_car["speed"] = np.sqrt(
  499. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  500. )
  501. if any(ego_car["speed"].tolist()) != 0:
  502. self.aviod_pedestrian_when_turning_count += 1
  503. def statistic(self):
  504. self.slow_down_in_crosswalk()
  505. self.avoid_pedestrian_in_crosswalk()
  506. self.avoid_pedestrian_in_the_road()
  507. self.aviod_pedestrian_when_turning()
  508. self.calculated_value = {
  509. "slow_down_in_crosswalk": self.slow_down_in_crosswalk_count,
  510. "avoid_pedestrian_in_crosswalk": self.avoid_pedestrian_in_crosswalk_count,
  511. "avoid_pedestrian_in_the_road": self.avoid_pedestrian_in_the_road_count,
  512. "aviod_pedestrian_when_turning": self.aviod_pedestrian_when_turning_count,
  513. }
  514. # self.logger.info(f"减速让行类指标统计完成,统计结果:{self.calculated_value}")
  515. return self.calculated_value
  516. class TurnaroundViolation(object):
  517. def __init__(self, df_data):
  518. print("掉头违规类初始化中...")
  519. self.traffic_violations_type = "掉头违规类"
  520. self.data = df_data.obj_data[1]
  521. self.ego_data = (
  522. self.data[TURNAROUND_INFO].copy().reset_index(drop=True)
  523. ) # Copy to avoid modifying the original DataFrame
  524. self.pedestrian_data = pd.DataFrame()
  525. self.object_items = set(df_data.object_df.type.tolist())
  526. if 13 in self.object_items: # 行人的type是13
  527. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  528. self.pedestrian_data = (
  529. self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  530. )
  531. self.turning_in_forbiden_turn_back_sign_count = 0
  532. self.turning_in_forbiden_turn_left_sign_count = 0
  533. self.avoid_pedestrian_when_turn_back_count = 0
  534. def pedestrian_in_front_of_car(self):
  535. if len(self.pedestrian_data) == 0:
  536. return []
  537. else:
  538. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  539. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  540. self.ego_data["dist"] = np.sqrt(
  541. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  542. )
  543. self.ego_data["rela_pos"] = (
  544. self.ego_data["dx"] * self.ego_data["speedX"]
  545. + self.ego_data["dy"] * self.ego_data["speedY"]
  546. )
  547. simtime = self.ego_data[
  548. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  549. ]["simTime"].tolist()
  550. return simtime
  551. def different_road_area_simtime(self, df, threshold=0.5):
  552. if not df:
  553. return []
  554. simtime_group = []
  555. current_simtime_group = [df[0]]
  556. for i in range(1, len(df)):
  557. if abs(df[i] - df[i - 1]) <= threshold:
  558. current_simtime_group.append(df[i])
  559. else:
  560. simtime_group.append(current_simtime_group)
  561. current_simtime_group = [df[i]]
  562. simtime_group.append(current_simtime_group)
  563. return simtime_group
  564. def turn_back_in_forbiden_sign(self):
  565. """
  566. 禁止掉头type = 8
  567. """
  568. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  569. "simTime"
  570. ].tolist()
  571. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  572. "simTime"
  573. ].tolist()
  574. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  575. forbiden_turn_back_simTime
  576. )
  577. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  578. forbiden_turn_left_simTime
  579. )
  580. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  581. ego_car1 = self.ego_data.loc[
  582. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  583. ]
  584. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  585. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  586. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  587. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  588. if (
  589. ego_end_speedx1 * ego_start_speedx1
  590. + ego_end_speedy1 * ego_start_speedy1
  591. < 0
  592. ):
  593. self.turning_in_forbiden_turn_back_sign_count += 1
  594. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  595. ego_car2 = self.ego_data.loc[
  596. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  597. ]
  598. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  599. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  600. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  601. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  602. if (
  603. ego_end_speedx2 * ego_start_speedx2
  604. + ego_end_speedy2 * ego_start_speedy2
  605. < 0
  606. ):
  607. self.turning_in_forbiden_turn_left_sign_count += 1
  608. def avoid_pedestrian_when_turn_back(self):
  609. sensor_on_intersection = self.pedestrian_in_front_of_car()
  610. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  611. self.ego_data["lane_type"] == 20
  612. ]["simTime"].tolist()
  613. avoid_pedestrian_when_turn_back_simTime_devide = (
  614. self.different_road_area_simtime(
  615. avoid_pedestrian_when_turn_back_simTime_list
  616. )
  617. )
  618. if len(sensor_on_intersection) > 0:
  619. for (
  620. avoid_pedestrian_when_turn_back_simtime
  621. ) in avoid_pedestrian_when_turn_back_simTime_devide:
  622. pedestrian_in_intersection_simtime = self.pedestrian_data[
  623. self.pedestrian_data["simTime"].isin(
  624. avoid_pedestrian_when_turn_back_simtime
  625. )
  626. ].tolist()
  627. ego_df = self.ego_data[
  628. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  629. ].reset_index(drop=True)
  630. pedestrian_df = self.pedestrian_data[
  631. self.pedestrian_data["simTime"].isin(
  632. pedestrian_in_intersection_simtime
  633. )
  634. ].reset_index(drop=True)
  635. ego_df["dist"] = np.sqrt(
  636. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  637. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  638. )
  639. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  640. if any(ego_df["speed"].tolist()) != 0:
  641. self.avoid_pedestrian_when_turn_back_count += 1
  642. def statistic(self):
  643. self.turn_back_in_forbiden_sign()
  644. self.avoid_pedestrian_when_turn_back()
  645. self.calculated_value = {
  646. "turn_back_in_forbiden_turn_back_sign": self.turning_in_forbiden_turn_back_sign_count,
  647. "turn_back_in_forbiden_turn_left_sign": self.turning_in_forbiden_turn_left_sign_count,
  648. "avoid_pedestrian_when_turn_back": self.avoid_pedestrian_when_turn_back_count,
  649. }
  650. # self.logger.info(f"掉头违规类指标统计完成,统计结果:{self.calculated_value}")
  651. return self.calculated_value
  652. class WrongWayViolation:
  653. """停车违规类"""
  654. def __init__(self, df_data):
  655. print("停车违规类初始化中...")
  656. self.traffic_violations_type = "停车违规类"
  657. self.data = df_data.obj_data[1]
  658. # 初始化违规统计
  659. self.violation_count = {
  660. "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
  661. "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
  662. "urbanExpresswayEmergencyLaneDriving": 0,
  663. }
  664. def process_violations(self):
  665. """处理停车或者紧急车道行驶违规数据"""
  666. # 提取有效道路类型
  667. urban_expressway_or_highway = {1, 2}
  668. driving_lane = {1, 4, 5, 6}
  669. emergency_lane = {12}
  670. self.data["v"] *= 3.6 # 转换速度
  671. # 使用向量化和条件判断进行违规判定
  672. conditions = [
  673. (
  674. self.data["road_fc"].isin(urban_expressway_or_highway)
  675. & self.data["lane_type"].isin(driving_lane)
  676. & (self.data["v"] == 0)
  677. ),
  678. (
  679. self.data["road_fc"].isin(urban_expressway_or_highway)
  680. & self.data["lane_type"].isin(emergency_lane)
  681. & (self.data["v"] == 0)
  682. ),
  683. (
  684. self.data["road_fc"].isin(urban_expressway_or_highway)
  685. & self.data["lane_type"].isin(emergency_lane)
  686. & (self.data["v"] != 0)
  687. ),
  688. ]
  689. violation_types = [
  690. "urbanExpresswayOrHighwayDrivingLaneStopped",
  691. "urbanExpresswayOrHighwayEmergencyLaneStopped",
  692. "urbanExpresswayEmergencyLaneDriving",
  693. ]
  694. # 设置违规类型
  695. self.data["violation_type"] = None
  696. for condition, violation_type in zip(conditions, violation_types):
  697. self.data.loc[condition, "violation_type"] = violation_type
  698. # 统计违规情况
  699. self.violation_count = (
  700. self.data["violation_type"]
  701. .value_counts()
  702. .reindex(violation_types, fill_value=0)
  703. .to_dict()
  704. )
  705. def statistic(self) -> str:
  706. self.process_violations()
  707. # self.logger.info(f"停车违规类指标统计完成,统计结果:{self.violation_count}")
  708. return self.violation_count
  709. class SpeedingViolation(object):
  710. """超速违规类"""
  711. """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
  712. def __init__(self, df_data):
  713. print("超速违规类初始化中...")
  714. self.traffic_violations_type = "超速违规类"
  715. self.data = df_data.obj_data[
  716. 1
  717. ] # Copy to avoid modifying the original DataFrame
  718. # 初始化违规统计
  719. self.violation_counts = {
  720. "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
  721. "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
  722. "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
  723. "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
  724. "generalRoadSpeedOverLimit50": 0,
  725. "generalRoadSpeedOverLimit20to50": 0,
  726. }
  727. def process_violations(self):
  728. """处理数据帧,检查超速和其他违规行为"""
  729. # 提取有效道路类型
  730. urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
  731. general_road = {3} # 直接创建包含一个元素的集合
  732. self.data["v"] *= 3.6 # 转换速度
  733. # 违规判定
  734. conditions = [
  735. (
  736. self.data["road_fc"].isin(urban_expressway_or_highway)
  737. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  738. ),
  739. (
  740. self.data["road_fc"].isin(urban_expressway_or_highway)
  741. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  742. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  743. ),
  744. (
  745. self.data["road_fc"].isin(urban_expressway_or_highway)
  746. & (self.data["v"] > self.data["road_speed_max"])
  747. & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
  748. ),
  749. (
  750. self.data["road_fc"].isin(urban_expressway_or_highway)
  751. & (self.data["v"] < self.data["road_speed_min"])
  752. ),
  753. (
  754. self.data["road_fc"].isin(general_road)
  755. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  756. ),
  757. (
  758. self.data["road_fc"].isin(general_road)
  759. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  760. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  761. ),
  762. ]
  763. violation_types = [
  764. "urbanExpresswayOrHighwaySpeedOverLimit50",
  765. "urbanExpresswayOrHighwaySpeedOverLimit20to50",
  766. "urbanExpresswayOrHighwaySpeedOverLimit0to20",
  767. "urbanExpresswayOrHighwaySpeedUnderLimit",
  768. "generalRoadSpeedOverLimit50",
  769. "generalRoadSpeedOverLimit20to50",
  770. ]
  771. # 设置违规类型
  772. self.data["violation_type"] = None
  773. for condition, violation_type in zip(conditions, violation_types):
  774. self.data.loc[condition, "violation_type"] = violation_type
  775. # 统计各类违规情况
  776. self.violation_counts = self.data["violation_type"].value_counts().to_dict()
  777. def statistic(self) -> str:
  778. # 处理数据
  779. self.process_violations()
  780. # self.logger.info(f"超速违规类指标统计完成,统计结果:{self.violation_counts}")
  781. return self.violation_counts
  782. class TrafficLightViolation(object):
  783. """违反交通灯类"""
  784. """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
  785. def __init__(self, df_data):
  786. """初始化方法"""
  787. self.traffic_violations_type = "违反交通灯类"
  788. print("违反交通灯类 类初始化中...")
  789. self.config = df_data.vehicle_config
  790. self.data_ego = df_data.ego_data # 获取数据
  791. self.violation_counts = {
  792. "trafficSignalViolation": 0,
  793. "illegalDrivingOrParkingAtCrossroads": 0,
  794. }
  795. # 处理数据并判定违规
  796. self.process_violations()
  797. def is_point_cross_line(self, point, stop_line_points):
  798. """
  799. 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
  800. 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
  801. :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
  802. :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
  803. :return: True 如果车辆跨越了停止线,否则 False
  804. """
  805. line_vector = np.array(
  806. [
  807. stop_line_points[1][0] - stop_line_points[0][0],
  808. stop_line_points[1][1] - stop_line_points[0][1],
  809. ]
  810. )
  811. point_vector = np.array(
  812. [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
  813. )
  814. cross_product = np.cross(line_vector, point_vector)
  815. if cross_product != 0:
  816. return False
  817. mid_point = (
  818. np.array([stop_line_points[0][0], stop_line_points[0][1]])
  819. + 0.5 * line_vector
  820. )
  821. axletree_to_mid_vector = np.array(
  822. [point[0] - mid_point[0], point[1] - mid_point[1]]
  823. )
  824. direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
  825. norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
  826. norm_direction = np.linalg.norm(direction_vector)
  827. if norm_axletree_to_mid == 0 or norm_direction == 0:
  828. return False
  829. cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
  830. norm_axletree_to_mid * norm_direction
  831. )
  832. angle_theta = math.degrees(math.acos(cos_theta))
  833. return angle_theta <= 90
  834. def _filter_data(self):
  835. """过滤数据,筛选出需要分析的记录"""
  836. return self.data_ego[
  837. (self.data_ego["stopline_id"] != -1)
  838. & (self.data_ego["stopline_type"] == 1)
  839. & (self.data_ego["trafficlight_id"] != -1)
  840. ]
  841. def _group_data(self, filtered_data):
  842. """按时间差对数据进行分组"""
  843. filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
  844. threshold = 0.5
  845. filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
  846. return filtered_data.groupby("group")
  847. def _analyze_group(self, group_data):
  848. """分析单个分组的数据,判断是否闯红灯"""
  849. photos = []
  850. stop_in_intersection = False
  851. for _, row in group_data.iterrows():
  852. vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
  853. stop_line_points = [
  854. [row["stopline_x1"], row["stopline_y1"]],
  855. [row["stopline_x2"], row["stopline_y2"]],
  856. ]
  857. traffic_light_status = row["traffic_light_status"]
  858. heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
  859. heading_vector = heading_vector / np.linalg.norm(heading_vector)
  860. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  861. # config = yaml.load(f, Loader=yaml.FullLoader)
  862. front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
  863. rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
  864. dist = math.sqrt(
  865. (row["posX"] - row["traffic_light_x"]) ** 2
  866. + (row["posY"] - row["traffic_light_y"]) ** 2
  867. )
  868. if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
  869. has_crossed_line_front = (
  870. self.is_point_cross_line(front_wheel_pos, stop_line_points)
  871. and traffic_light_status == 1
  872. )
  873. has_crossed_line_rear = (
  874. self.is_point_cross_line(rear_wheel_pos, stop_line_points)
  875. and row["v"] > 0
  876. and traffic_light_status == 1
  877. )
  878. has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
  879. has_passed_intersection = has_crossed_line_front and dist < 1.0
  880. # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
  881. photos.extend(
  882. [
  883. has_crossed_line_front,
  884. has_crossed_line_rear,
  885. has_passed_intersection,
  886. has_stop_in_intersection,
  887. ]
  888. )
  889. stop_in_intersection = has_passed_intersection
  890. return photos, stop_in_intersection
  891. def is_vehicle_run_a_red_light(self):
  892. """判断车辆是否闯红灯"""
  893. filtered_data = self._filter_data()
  894. grouped_data = self._group_data(filtered_data)
  895. self.photos_group = []
  896. self.stop_in_intersections = []
  897. for _, group_data in grouped_data:
  898. photos, stop_in_intersection = self._analyze_group(group_data)
  899. self.photos_group.append(photos)
  900. self.stop_in_intersections.append(stop_in_intersection)
  901. def process_violations(self):
  902. """处理数据并判定违规"""
  903. self.is_vehicle_run_a_red_light()
  904. count_1 = sum(all(photos) for photos in self.photos_group)
  905. count_2 = sum(
  906. stop_in_intersection for stop_in_intersection in self.stop_in_intersections
  907. )
  908. self.violation_counts["trafficSignalViolation"] = count_1
  909. self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
  910. def statistic(self):
  911. """返回统计结果"""
  912. return self.violation_counts
  913. class WarningViolation(object):
  914. """警告性违规类"""
  915. def __init__(self, df_data):
  916. self.traffic_violations_type = "警告性违规类"
  917. print("警告性违规类 类初始化中...")
  918. self.config = df_data.vehicle_config
  919. self.data_ego = df_data.obj_data[1]
  920. self.data = self.data_ego.copy() # 避免修改原始 DataFrame
  921. self.violation_counts = {
  922. "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  923. "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  924. }
  925. def process_violations(self):
  926. general_road = {3} # 普通道路
  927. lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道
  928. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  929. # config = yaml.load(f, Loader=yaml.FullLoader)
  930. car_width = self.config["CAR_WIDTH"]
  931. lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在
  932. # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  933. # 使用布尔索引来筛选满足条件的行
  934. condition = (self.data["road_fc"].isin(general_road)) & (
  935. self.data["lane_type"].isin(lane_type)
  936. )
  937. # 创建一个新的列,并根据条件设置值
  938. self.data["is_violation"] = condition
  939. # 统计满足条件的连续时间段
  940. violation_segments = self.count_continuous_violations(
  941. self.data["is_violation"], self.data["simTime"]
  942. )
  943. # 更新骑行车道线违规计数
  944. self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments)
  945. # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  946. # 计算阈值
  947. threshold = (lane_width - car_width) / 2
  948. # 找到满足条件的行
  949. self.data["is_violation"] = self.data["laneOffset"] > threshold
  950. # 统计满足条件的连续时间段
  951. violation_segments = self.count_continuous_violations(
  952. self.data["is_violation"], self.data["simTime"]
  953. )
  954. # 更新骑行车道线违规计数
  955. self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len(
  956. violation_segments
  957. )
  958. def count_continuous_violations(self, violation_series, time_series):
  959. """统计连续违规的时间段数量"""
  960. continuous_segments = []
  961. current_segment = []
  962. for is_violation, time in zip(violation_series, time_series):
  963. if is_violation:
  964. if not current_segment: # 新的连续段开始
  965. current_segment.append(time)
  966. else:
  967. if current_segment: # 连续段结束
  968. continuous_segments.append(current_segment)
  969. current_segment = []
  970. # 检查是否有一个未结束的连续段在最后
  971. if current_segment:
  972. continuous_segments.append(current_segment)
  973. return continuous_segments
  974. def statistic(self):
  975. # 处理数据
  976. self.process_violations()
  977. # self.logger.info(f"警告性违规类指标统计完成,统计结果:{self.violation_counts}")
  978. return self.violation_counts
  979. class TrafficSignViolation(object):
  980. """交通标志违规类"""
  981. def __init__(self, df_data):
  982. self.traffic_violations_type = "交通标志违规类"
  983. print("交通标志违规类 类初始化中...")
  984. self.data_ego = df_data.obj_data[1]
  985. self.ego_data = (
  986. self.data_ego[TRFFICSIGN_INFO].copy().reset_index(drop=True)
  987. )
  988. self.data_ego = self.data_ego.copy() # 避免修改原始 DataFrame
  989. self.violation_counts = {
  990. "NoStraightThrough": 0, # 禁止直行标志地方直行
  991. "SpeedLimitViolation": 0, # 违反限速规定
  992. "MinimumSpeedLimitViolation": 0, # 违反最低限速规定
  993. }
  994. # def checkForProhibitionViolation(self):
  995. # """禁令标志判断违规:7 禁止直行,12:限制速度"""
  996. # # 筛选出sign_type1为7(禁止直行)
  997. # violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7]
  998. # violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
  999. def checkForProhibitionViolation(self):
  1000. """禁令标志判断违规:7 禁止直行,12:限制速度"""
  1001. # 筛选出 sign_type1 为7(禁止直行)的数据
  1002. violation_straight_df = self.data_ego[self.data_ego["sign_type1"] == 7].copy()
  1003. # 判断车辆是否在禁止直行路段直行
  1004. if not violation_straight_df.empty:
  1005. # 按时间戳排序(假设数据按时间顺序处理)
  1006. violation_straight_df = violation_straight_df.sort_values('simTime')
  1007. # 计算航向角变化(前后时间点的差值绝对值)
  1008. violation_straight_df['posH_diff'] = violation_straight_df['posH'].diff().abs()
  1009. # 筛选条件:航向角变化小于阈值(例如5度)且速度不为0
  1010. threshold = 5 # 单位:度(根据场景调整)
  1011. mask = (violation_straight_df['posH_diff'] <= threshold) & (violation_straight_df['v'] > 0)
  1012. straight_violations = violation_straight_df[mask]
  1013. # 统计违规次数或记录违规数据
  1014. self.violation_counts["prohibition_straight"] = len(straight_violations)
  1015. # 限制速度判断(原代码)
  1016. violation_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 12]
  1017. if violation_speed_limit_df.empty:
  1018. mask = self.data_ego["v"] > self.data_ego["sign_speed"]
  1019. self.violation_counts["SpeedLimitViolation"] = len(self.data_ego[mask])
  1020. def checkForInstructionViolation(self):
  1021. """限速标志属于指示性标志:13:最低限速"""
  1022. violation_minimum_speed_limit_df = self.data_ego[self.data_ego["sign_type1"] == 13]
  1023. if violation_minimum_speed_limit_df.empty:
  1024. mask = self.data_ego["v"] < self.data_ego["sign_speed"]
  1025. self.violation_counts["MinimumSpeedLimitViolation"] = len(self.data_ego[mask])
  1026. def statistic(self):
  1027. self.checkForProhibitionViolation()
  1028. self.checkForInstructionViolation()
  1029. # self.logger.info(f"交通标志违规类指标统计完成,统计结果:{self.violation_counts}")
  1030. return self.violation_counts
  1031. class ViolationManager:
  1032. """违规管理类,用于管理所有违规行为"""
  1033. def __init__(self, data_processed):
  1034. self.violations = []
  1035. self.data = data_processed
  1036. self.config = data_processed.traffic_config
  1037. self.over_take_violation = OvertakingViolation(self.data)
  1038. self.slow_down_violation = SlowdownViolation(self.data)
  1039. self.wrong_way_violation = WrongWayViolation(self.data)
  1040. self.speeding_violation = SpeedingViolation(self.data)
  1041. self.traffic_light_violation = TrafficLightViolation(self.data)
  1042. self.warning_violation = WarningViolation(self.data)
  1043. # self.report_statistic()
  1044. def report_statistic(self):
  1045. traffic_result = self.over_take_violation.statistic()
  1046. traffic_result.update(self.slow_down_violation.statistic())
  1047. traffic_result.update(self.traffic_light_violation.statistic())
  1048. traffic_result.update(self.wrong_way_violation.statistic())
  1049. traffic_result.update(self.speeding_violation.statistic())
  1050. traffic_result.update(self.warning_violation.statistic())
  1051. # evaluator = Score(self.config)
  1052. # result = evaluator.evaluate(traffic_result)
  1053. # print("\n[交规类表现及得分情况]")
  1054. # # self.logger.info(f"Traffic Result:{traffic_result}")
  1055. # return result
  1056. return traffic_result
# Example usage
if __name__ == "__main__":
    pass