traffic.py 64 KB


  1. # ... 保留原有导入和常量定义 ...
  2. import math
  3. import operator
  4. import numpy as np
  5. import pandas as pd
  6. from typing import Dict, Any, List, Optional
  7. from modules.lib import log_manager
  8. from modules.lib.score import Score
  9. from modules.lib.log_manager import LogManager
  10. from modules.lib import data_process
  11. OVERTAKE_INFO = [
  12. "simTime",
  13. "simFrame",
  14. "playerId",
  15. "speedX",
  16. "speedY",
  17. "posX",
  18. "posY",
  19. "posH",
  20. "lane_id",
  21. "lane_type",
  22. "road_type",
  23. "interid",
  24. "crossid",
  25. ]
  26. SLOWDOWN_INFO = [
  27. "simTime",
  28. "simFrame",
  29. "playerId",
  30. "speedX",
  31. "speedY",
  32. "posX",
  33. "posY",
  34. "crossid",
  35. "lane_type",
  36. ]
  37. TURNAROUND_INFO = [
  38. "simTime",
  39. "simFrame",
  40. "playerId",
  41. "speedX",
  42. "speedY",
  43. "posX",
  44. "posY",
  45. "sign_type1",
  46. "lane_type",
  47. ]
  48. TRFFICSIGN_INFO = [
  49. "simTime",
  50. "simFrame",
  51. "playerId",
  52. "speedX",
  53. "speedY",
  54. "v",
  55. "posX",
  56. "posY",
  57. "sign_type1",
  58. "sign_ref_link",
  59. "sign_x",
  60. "sign_y",
  61. ]
  62. # 修改指标函数名称为 calculate_xxx 格式
  63. def calculate_overtake_when_passing_car(data_processed):
  64. """计算会车时超车指标"""
  65. overtakingviolation = OvertakingViolation(data_processed)
  66. overtake_when_passing_car_count = overtakingviolation.calculate_overtake_when_passing_car_count()
  67. return {"overtake_when_passing_car": overtake_when_passing_car_count}
  68. def calculate_overtake_on_right(data_processed):
  69. """计算右侧超车指标"""
  70. overtakingviolation = OvertakingViolation(data_processed)
  71. overtake_on_right_count = overtakingviolation.calculate_overtake_on_right_count()
  72. return {"overtake_on_right": overtake_on_right_count}
  73. def calculate_overtake_when_turn_around(data_processed):
  74. """计算掉头时超车指标"""
  75. overtakingviolation = OvertakingViolation(data_processed)
  76. overtake_when_turn_around_count = overtakingviolation.calculate_overtake_when_turn_around_count()
  77. return {"overtake_when_turn_around": overtake_when_turn_around_count}
  78. def calculate_overtake_in_forbid_lane(data_processed):
  79. """计算在禁止车道超车指标"""
  80. overtakingviolation = OvertakingViolation(data_processed)
  81. overtake_in_forbid_lane_count = overtakingviolation.calculate_overtake_in_forbid_lane_count()
  82. return {"overtake_in_forbid_lane": overtake_in_forbid_lane_count}
  83. def calculate_overtake_in_ramp(data_processed):
  84. """计算在匝道超车指标"""
  85. overtakingviolation = OvertakingViolation(data_processed)
  86. overtake_in_ramp_area_count = overtakingviolation.calculate_overtake_in_ramp_area_count()
  87. return {"overtake_in_ramp": overtake_in_ramp_area_count}
  88. def calculate_overtake_in_tunnel(data_processed):
  89. """计算在隧道超车指标"""
  90. overtakingviolation = OvertakingViolation(data_processed)
  91. overtake_in_tunnel_area_count = overtakingviolation.calculate_overtake_in_tunnel_area_count()
  92. return {"overtake_in_tunnel": overtake_in_tunnel_area_count}
  93. def calculate_overtake_on_accelerate_lane(data_processed):
  94. """计算在加速车道超车指标"""
  95. overtakingviolation = OvertakingViolation(data_processed)
  96. overtake_on_accelerate_lane_count = overtakingviolation.calculate_overtake_on_accelerate_lane_count()
  97. return {"overtake_on_accelerate_lane": overtake_on_accelerate_lane_count}
  98. def calculate_overtake_on_decelerate_lane(data_processed):
  99. """计算在减速车道超车指标"""
  100. overtakingviolation = OvertakingViolation(data_processed)
  101. overtake_on_decelerate_lane_count = overtakingviolation.calculate_overtake_on_decelerate_lane_count()
  102. return {"overtake_on_decelerate_lane": overtake_on_decelerate_lane_count}
  103. def calculate_overtake_in_different_senerios(data_processed):
  104. """计算在不同场景超车指标"""
  105. overtakingviolation = OvertakingViolation(data_processed)
  106. overtake_in_different_senerios_count = overtakingviolation.calculate_overtake_in_different_senerios_count()
  107. return {"overtake_in_different_senerios": overtake_in_different_senerios_count}
  108. def calculate_slow_down_in_crosswalk(data_processed):
  109. """计算在人行横道减速指标"""
  110. slowdownviolation = SlowdownViolation(data_processed)
  111. slow_down_in_crosswalk_count = slowdownviolation.calculate_slow_down_in_crosswalk_count()
  112. return {"slowdown_down_in_crosswalk": slow_down_in_crosswalk_count}
  113. def calculate_avoid_pedestrian_in_crosswalk(data_processed):
  114. """计算在人行横道避让行人指标"""
  115. avoidpedestrianincrosswalk = SlowdownViolation(data_processed)
  116. avoid_pedestrian_in_crosswalk_count = avoidpedestrianincrosswalk.calculate_avoid_pedestrian_in_the_crosswalk_count()
  117. return {"avoid_pedestrian_in_crosswalk": avoid_pedestrian_in_crosswalk_count}
  118. def calculate_avoid_pedestrian_in_the_road(data_processed):
  119. """计算在道路上避让行人指标"""
  120. avoidpedestrianintheroad = SlowdownViolation(data_processed)
  121. avoid_pedestrian_in_the_road_count = avoidpedestrianintheroad.calculate_avoid_pedestrian_in_the_road_count()
  122. return {"avoid_pedestrian_in_the_road": avoid_pedestrian_in_the_road_count}
  123. def calculate_avoid_pedestrian_when_turning(data_processed):
  124. """计算转弯时避让行人指标"""
  125. avoidpedestrianwhenturning = SlowdownViolation(data_processed)
  126. avoid_pedestrian_when_turning_count = avoidpedestrianwhenturning.calculate_avoid_pedestrian_when_turning_count()
  127. return {"avoid_pedestrian_when_turning_count": avoid_pedestrian_when_turning_count}
  128. def calculate_turn_in_forbiden_turn_left_sign(data_processed):
  129. """计算在禁止左转标志处左转指标"""
  130. turnaroundviolation = TurnaroundViolation(data_processed)
  131. turn_in_forbiden_turn_left_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_left_sign_count()
  132. return {"turn_in_forbiden_turn_left_sign": turn_in_forbiden_turn_left_sign_count}
  133. def calculate_turn_in_forbiden_turn_back_sign(data_processed):
  134. """计算在禁止掉头标志处掉头指标"""
  135. turnaroundviolation = TurnaroundViolation(data_processed)
  136. turn_in_forbiden_turn_back_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_back_sign_count()
  137. return {"turn_in_forbiden_turn_back_sign": turn_in_forbiden_turn_back_sign_count}
  138. def calculate_avoid_pedestrian_when_turn_back(data_processed):
  139. """计算掉头时避让行人指标"""
  140. turnaroundviolation = TurnaroundViolation(data_processed)
  141. avoid_pedestrian_when_turn_back_count = turnaroundviolation.calaulate_avoid_pedestrian_when_turn_back_count()
  142. return {"avoid_pedestrian_when_turn_back": avoid_pedestrian_when_turn_back_count}
  143. def calculate_urbanexpresswayorhighwaydrivinglanestopped(data_processed):
  144. """计算城市快速路或高速公路行车道停车指标"""
  145. wrongwayviolation = WrongWayViolation(data_processed)
  146. urbanExpresswayOrHighwayDrivingLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  147. return {"urbanExpresswayOrHighwayDrivingLaneStopped": urbanExpresswayOrHighwayDrivingLaneStopped_count}
  148. def calculate_urbanexpresswayorhighwayemergencylanestopped(data_processed):
  149. """计算城市快速路或高速公路应急车道停车指标"""
  150. wrongwayviolation = WrongWayViolation(data_processed)
  151. urbanExpresswayOrHighwayEmergencyLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  152. return {"urbanExpresswayOrHighwayEmergencyLaneStopped": urbanExpresswayOrHighwayEmergencyLaneStopped_count}
  153. def calculate_urbanexpresswayemergencylanedriving(data_processed):
  154. """计算城市快速路应急车道行驶指标"""
  155. wrongwayviolation = WrongWayViolation(data_processed)
  156. urbanExpresswayEmergencyLaneDriving_count = wrongwayviolation.calculate_urbanExpresswayEmergencyLaneDriving()
  157. return {"urbanExpresswayEmergencyLaneDriving": urbanExpresswayEmergencyLaneDriving_count}
  158. def calculate_urbanexpresswayorhighwayspeedoverlimit50(data_processed):
  159. """计算城市快速路或高速公路超速50%以上指标"""
  160. speedingviolation = SpeedingViolation(data_processed)
  161. urbanExpresswayOrHighwaySpeedOverLimit50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
  162. return {"urbanExpresswayOrHighwaySpeedOverLimit50": urbanExpresswayOrHighwaySpeedOverLimit50_count}
  163. def calculate_urbanexpresswayorhighwayspeedoverlimit20to50(data_processed):
  164. """计算城市快速路或高速公路超速20%-50%指标"""
  165. speedingviolation = SpeedingViolation(data_processed)
  166. urbanExpresswayOrHighwaySpeedOverLimit20to50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
  167. return {"urbanExpresswayOrHighwaySpeedOverLimit20to50": urbanExpresswayOrHighwaySpeedOverLimit20to50_count}
  168. def calculate_urbanexpresswayorhighwayspeedoverlimit0to20(data_processed):
  169. """计算城市快速路或高速公路超速0-20%指标"""
  170. speedingviolation = SpeedingViolation(data_processed)
  171. urbanExpresswayOrHighwaySpeedOverLimit0to20_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
  172. return {"urbanExpresswayOrHighwaySpeedOverLimit0to20": urbanExpresswayOrHighwaySpeedOverLimit0to20_count}
  173. def calculate_urbanexpresswayorhighwayspeedunderlimit(data_processed):
  174. """计算城市快速路或高速公路低于最低限速指标"""
  175. speedingviolation = SpeedingViolation(data_processed)
  176. urbanExpresswayOrHighwaySpeedUnderLimit_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
  177. return {"urbanExpresswayOrHighwaySpeedUnderLimit": urbanExpresswayOrHighwaySpeedUnderLimit_count}
  178. def calculate_generalroadspeedoverlimit50(data_processed):
  179. """计算一般道路超速50%以上指标"""
  180. speedingviolation = SpeedingViolation(data_processed)
  181. generalRoadSpeedOverLimit50_count = speedingviolation.calculate_generalRoadSpeedOverLimit50()
  182. return {"generalRoadSpeedOverLimit50": generalRoadSpeedOverLimit50_count}
  183. def calculate_generalroadspeedoverlimit20to50(data_processed):
  184. """计算一般道路超速20%-50%指标"""
  185. speedingviolation = SpeedingViolation(data_processed)
  186. generalRoadSpeedOverLimit20to50_count = speedingviolation.calculate_generalRoadSpeedOverLimit20to50_count()
  187. return {"generalRoadSpeedOverLimit20to50": generalRoadSpeedOverLimit20to50_count}
  188. def calculate_trafficsignalviolation(data_processed):
  189. """计算交通信号违规指标"""
  190. trafficlightviolation = TrafficLightViolation(data_processed)
  191. trafficSignalViolation_count = trafficlightviolation.calculate_trafficSignalViolation_count()
  192. return {"trafficSignalViolation": trafficSignalViolation_count}
  193. def calculate_illegaldrivingorparkingatcrossroads(data_processed):
  194. """计算交叉路口违法行驶或停车指标"""
  195. trafficlightviolation = TrafficLightViolation(data_processed)
  196. illegalDrivingOrParkingAtCrossroads_count = trafficlightviolation.calculate_illegalDrivingOrParkingAtCrossroads()
  197. return {"illegalDrivingOrParkingAtCrossroads": illegalDrivingOrParkingAtCrossroads_count}
  198. def calculate_generalroadirregularlaneuse(data_processed):
  199. """计算一般道路不按规定车道行驶指标"""
  200. warningviolation = WarningViolation(data_processed)
  201. generalRoadIrregularLaneUse_count = warningviolation.calculate_generalRoadIrregularLaneUse_count()
  202. return {"generalRoadIrregularLaneUse": generalRoadIrregularLaneUse_count}
  203. def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
  204. """计算城市快速路或高速公路骑车道线行驶指标"""
  205. warningviolation = WarningViolation(data_processed)
  206. urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider()
  207. return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
  208. def calculate_nostraightthrough(data_processed):
  209. """计算禁止直行标志牌处直行指标"""
  210. trafficsignviolation = TrafficSignViolation(data_processed)
  211. noStraightThrough_count = trafficsignviolation.calculate_NoStraightThrough_count()
  212. return {"NoStraightThrough": noStraightThrough_count}
  213. def calculate_speedlimitviolation(data_processed):
  214. """计算违反限速规定指标"""
  215. trafficsignviolation = TrafficSignViolation(data_processed)
  216. SpeedLimitViolation_count = trafficsignviolation.calculate_SpeedLimitViolation_count()
  217. return {"SpeedLimitViolation": SpeedLimitViolation_count}
  218. def calculate_minimumspeedlimitviolation(data_processed):
  219. """计算违反最低限速规定指标"""
  220. trafficsignviolation = TrafficSignViolation(data_processed)
  221. calculate_MinimumSpeedLimitViolation_count = trafficsignviolation.calculate_MinimumSpeedLimitViolation_count()
  222. return {"MinimumSpeedLimitViolation": calculate_MinimumSpeedLimitViolation_count}
  223. # ... 保留原有类定义 ...
  224. # 修改 TrafficRegistry 类的 _build_registry 方法
  225. class TrafficRegistry:
  226. """交通违规指标注册器"""
  227. def __init__(self, data_processed):
  228. self.logger = LogManager().get_logger()
  229. self.data = data_processed
  230. self.traffic_config = data_processed.traffic_config["traffic"]
  231. self.metrics = self._extract_metrics(self.traffic_config)
  232. self._registry = self._build_registry()
  233. def _extract_metrics(self, config_node: dict) -> list:
  234. """从配置中提取指标名称"""
  235. metrics = []
  236. def _recurse(node):
  237. if isinstance(node, dict):
  238. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  239. metrics.append(node['name'])
  240. for v in node.values():
  241. _recurse(v)
  242. _recurse(config_node)
  243. self.logger.info(f'评比的交通违规指标列表:{metrics}')
  244. return metrics
  245. def _build_registry(self) -> dict:
  246. """构建指标函数注册表"""
  247. registry = {}
  248. for metric_name in self.metrics:
  249. func_name = f"calculate_{metric_name.lower()}"
  250. try:
  251. registry[metric_name] = globals()[func_name]
  252. except KeyError:
  253. self.logger.error(f"未实现交通违规指标函数: {func_name}")
  254. return registry
  255. def batch_execute(self) -> dict:
  256. """批量执行指标计算"""
  257. results = {}
  258. for name, func in self._registry.items():
  259. try:
  260. result = func(self.data)
  261. results.update(result)
  262. # 新增:将每个指标的结果写入日志
  263. self.logger.info(f'交通违规指标[{name}]计算结果: {result}')
  264. except Exception as e:
  265. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  266. results[name] = None
  267. self.logger.info(f'交通违规指标计算结果:{results}')
  268. return results
  269. class TrafficManager:
  270. """交通违规指标管理类"""
  271. def __init__(self, data_processed):
  272. self.data = data_processed
  273. self.logger = LogManager().get_logger()
  274. self.registry = TrafficRegistry(self.data)
  275. def report_statistic(self):
  276. """计算并报告交通违规指标结果"""
  277. traffic_result = self.registry.batch_execute()
  278. return traffic_result
  279. # ... 保留原有类定义和实现 ...
  280. class OvertakingViolation(object):
  281. """超车违规类"""
  282. def __init__(self, df_data):
  283. print("超车违规类初始化中...")
  284. self.traffic_violations_type = "超车违规类"
  285. # self.logger = log.get_logger() # 使用时再初始化
  286. self.data = df_data.obj_data[1]
  287. self.ego_data = (
  288. self.data[OVERTAKE_INFO].copy().reset_index(drop=True)
  289. ) # Copy to avoid modifying the original DataFrame
  290. header = self.ego_data.columns
  291. if 2 in df_data.obj_id_list:
  292. self.data_obj = df_data.obj_data[2]
  293. self.obj_data = (
  294. self.data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
  295. ) # Copy to avoid modifying the original DataFrame
  296. else:
  297. self.obj_data = pd.DataFrame(columns=header)
  298. if 3 in df_data.obj_id_list:
  299. self.other_obj_data1 = df_data.obj_data[3]
  300. self.other_obj_data = (
  301. self.other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
  302. )
  303. else:
  304. self.other_obj_data = pd.DataFrame(columns=header)
  305. self.overtake_on_right_count = 0
  306. self.overtake_when_turn_around_count = 0
  307. self.overtake_when_passing_car_count = 0
  308. self.overtake_in_forbid_lane_count = 0
  309. self.overtake_in_ramp_count = 0
  310. self.overtake_in_tunnel_count = 0
  311. self.overtake_on_accelerate_lane_count = 0
  312. self.overtake_on_decelerate_lane_count = 0
  313. self.overtake_in_different_senerios_count = 0
  314. def different_road_area_simtime(self, df, threshold=0.5):
  315. if not df:
  316. return []
  317. simtime_group = []
  318. current_simtime_group = [df[0]]
  319. for i in range(1, len(df)):
  320. if abs(df[i] - df[i - 1]) <= threshold:
  321. current_simtime_group.append(df[i])
  322. else:
  323. simtime_group.append(current_simtime_group)
  324. current_simtime_group = [df[i]]
  325. simtime_group.append(current_simtime_group)
  326. return simtime_group
  327. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  328. lane_start = lane_id[0]
  329. lane_end = lane_id[-1]
  330. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  331. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  332. return lane_start == lane_end and start_condition and end_condition
  333. def _is_dxy_of_car(self, ego_df, obj_df):
  334. """
  335. :param df: objstate.csv and so on
  336. :param id: playerId
  337. :param string_type: posX/Y or speedX/Y and so on
  338. :return: dataframe of dx/y and so on
  339. """
  340. car_dx = obj_df["posX"].values - ego_df["posX"].values
  341. car_dy = obj_df["posY"].values - ego_df["posY"].values
  342. return car_dx, car_dy
  343. # 在前车右侧超车、会车时超车、前车掉头时超车
  344. def illegal_overtake_with_car_detector(self, window_width=250):
  345. # 获取csv文件中最短的帧数
  346. frame_id_length = len(self.ego_data["simFrame"])
  347. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  348. while (start_frame_id + window_width) < frame_id_length:
  349. simframe_window1 = list(
  350. np.arange(start_frame_id, start_frame_id + window_width)
  351. )
  352. simframe_window = list(map(int, simframe_window1))
  353. # 读取滑动窗口的dataframe数据
  354. ego_data_frames = self.ego_data[
  355. self.ego_data["simFrame"].isin(simframe_window)
  356. ]
  357. obj_data_frames = self.obj_data[
  358. self.obj_data["simFrame"].isin(simframe_window)
  359. ]
  360. other_data_frames = self.other_obj_data[
  361. self.other_obj_data["simFrame"].isin(simframe_window)
  362. ]
  363. # 读取前后的laneId
  364. lane_id = ego_data_frames["lane_id"].tolist()
  365. # 读取前后方向盘转角steeringWheel,
  366. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  367. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  368. # 读取车辆前后的位置信息
  369. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  370. ego_speedx = ego_data_frames["speedX"].tolist()
  371. ego_speedy = ego_data_frames["speedY"].tolist()
  372. obj_speedx = obj_data_frames[obj_data_frames["playerId"] == 2][
  373. "speedX"
  374. ].tolist()
  375. obj_speedy = obj_data_frames[obj_data_frames["playerId"] == 2][
  376. "speedY"
  377. ].tolist()
  378. if len(other_data_frames) > 0:
  379. other_start_speedx = other_data_frames["speedX"].iloc[0]
  380. other_start_speedy = other_data_frames["speedY"].iloc[0]
  381. if (
  382. ego_speedx[0] * other_start_speedx
  383. + ego_speedy[0] * other_start_speedy
  384. < 0
  385. ):
  386. self.overtake_when_passing_car_count += self._is_overtake(
  387. lane_id, dx, dy, ego_speedx, ego_speedy
  388. )
  389. start_frame_id += window_width
  390. """
  391. 如果滑动窗口开始和最后的laneid一致;
  392. 方向盘转角前后方向相反(开始方向盘转角向右后来方向盘转角向左);
  393. 自车和前车的位置发生的交换;
  394. 则认为右超车
  395. """
  396. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  397. self.overtake_on_right_count += self._is_overtake(
  398. lane_id, dx, dy, ego_speedx, ego_speedy
  399. )
  400. start_frame_id += window_width
  401. elif ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
  402. self.overtake_when_turn_around_count += self._is_overtake(
  403. lane_id, dx, dy, ego_speedx, ego_speedy
  404. )
  405. start_frame_id += window_width
  406. else:
  407. start_frame_id += 1
  408. # 借道超车场景
  409. def overtake_in_forbid_lane_detector(self):
  410. simTime = self.obj_data["simTime"].tolist()
  411. simtime_devide = self.different_road_area_simtime(simTime)
  412. for simtime in simtime_devide:
  413. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  414. try:
  415. lane_type = lane_overtake["lane_type"].tolist()
  416. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  417. 50002 not in lane_type and len(set(lane_type)) > 1
  418. ):
  419. self.overtake_in_forbid_lane_count += 1
  420. except Exception as e:
  421. print("数据缺少lane_type信息")
  422. # print(f"在不该占用车道超车{self.overtake_in_forbid_lane_count}次")
  423. # 在匝道超车
  424. def overtake_in_ramp_area_detector(self):
  425. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  426. "simTime"
  427. ].tolist()
  428. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  429. for ramp_simtime in ramp_simTime_list:
  430. lane_id = self.ego_data["lane_id"].tolist()
  431. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  432. objstate_in_ramp = self.obj_data[
  433. self.obj_data["simTime"].isin(ramp_simtime)
  434. ]
  435. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  436. ego_speedx = ego_in_ramp["speedX"].tolist()
  437. ego_speedy = ego_in_ramp["speedY"].tolist()
  438. if len(lane_id) > 0:
  439. self.overtake_in_ramp_count += self._is_overtake(
  440. lane_id, dx, dy, ego_speedx, ego_speedy
  441. )
  442. else:
  443. continue
  444. # print(f"在匝道超车{self.overtake_in_ramp_count}次")
  445. def overtake_in_tunnel_area_detector(self):
  446. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  447. "simTime"
  448. ].tolist()
  449. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  450. for tunnel_simtime in tunnel_simTime_list:
  451. lane_id = self.ego_data["lane_id"].tolist()
  452. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  453. objstate_in_tunnel = self.obj_data[
  454. self.obj_data["simTime"].isin(tunnel_simtime)
  455. ]
  456. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  457. ego_speedx = ego_in_tunnel["speedX"].tolist()
  458. ego_speedy = ego_in_tunnel["speedY"].tolist()
  459. if len(lane_id) > 0:
  460. self.overtake_in_tunnel_count += self._is_overtake(
  461. lane_id, dx, dy, ego_speedx, ego_speedy
  462. )
  463. else:
  464. continue
  465. # print(f"在隧道超车{self.overtake_in_tunnel_count}次")
  466. # 加速车道超车
  467. def overtake_on_accelerate_lane_detector(self):
  468. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  469. "simTime"
  470. ].tolist()
  471. accelerate_simTime_list = self.different_road_area_simtime(
  472. accelerate_simtime_list
  473. )
  474. for accelerate_simtime in accelerate_simTime_list:
  475. lane_id = self.ego_data["lane_id"].tolist()
  476. ego_in_accelerate = self.ego_data[
  477. self.ego_data["simTime"].isin(accelerate_simtime)
  478. ]
  479. objstate_in_accelerate = self.obj_data[
  480. self.obj_data["simTime"].isin(accelerate_simtime)
  481. ]
  482. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  483. ego_speedx = ego_in_accelerate["speedX"].tolist()
  484. ego_speedy = ego_in_accelerate["speedY"].tolist()
  485. self.overtake_on_accelerate_lane_count += self._is_overtake(
  486. lane_id, dx, dy, ego_speedx, ego_speedy
  487. )
  488. # print(f"在加速车道超车{self.overtake_on_accelerate_lane_count}次")
  489. # 减速车道超车
  490. def overtake_on_decelerate_lane_detector(self):
  491. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  492. "simTime"
  493. ].tolist()
  494. decelerate_simTime_list = self.different_road_area_simtime(
  495. decelerate_simtime_list
  496. )
  497. for decelerate_simtime in decelerate_simTime_list:
  498. lane_id = self.ego_data["id"].tolist()
  499. ego_in_decelerate = self.ego_data[
  500. self.ego_data["simTime"].isin(decelerate_simtime)
  501. ]
  502. objstate_in_decelerate = self.obj_data[
  503. self.obj_data["simTime"].isin(decelerate_simtime)
  504. ]
  505. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  506. ego_speedx = ego_in_decelerate["speedX"].tolist()
  507. ego_speedy = ego_in_decelerate["speedY"].tolist()
  508. self.overtake_on_decelerate_lane_count += self._is_overtake(
  509. lane_id, dx, dy, ego_speedx, ego_speedy
  510. )
  511. # print(f"在减速车道超车{self.overtake_on_decelerate_lane_count}次")
  512. # 在交叉路口
  513. def overtake_in_different_senerios_detector(self):
  514. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  515. "simTime"
  516. ].tolist() # 判断是路口或者隧道区域
  517. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  518. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  519. crossroad_objstate = self.obj_data[
  520. self.obj_data["simTime"].isin(crossroad_simTime)
  521. ]
  522. # 读取前后的laneId
  523. lane_id = crossroad_ego["lane_id"].tolist()
  524. # 读取车辆前后的位置信息
  525. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  526. ego_speedx = crossroad_ego["speedX"].tolist()
  527. ego_speedy = crossroad_ego["speedY"].tolist()
  528. """
  529. 如果滑动窗口开始和最后的laneid一致;
  530. 自车和前车的位置发生的交换;
  531. 则认为发生超车
  532. """
  533. if len(lane_id) > 0:
  534. self.overtake_in_different_senerios_count += self._is_overtake(
  535. lane_id, dx, dy, ego_speedx, ego_speedy
  536. )
  537. else:
  538. pass
  539. def calculate_overtake_when_passing_car_count(self):
  540. self.illegal_overtake_with_car_detector()
  541. return self.overtake_when_passing_car_count
  542. def calculate_overtake_on_right_count(self):
  543. self.illegal_overtake_with_car_detector()
  544. return self.overtake_on_right_count
  545. def calculate_overtake_when_turn_around_count(self):
  546. self.illegal_overtake_with_car_detector()
  547. return self.overtake_when_turn_around_count
  548. def calculate_overtake_in_forbid_lane_count(self):
  549. self.overtake_in_forbid_lane_detector()
  550. return self.overtake_in_forbid_lane_count
  551. def calculate_overtake_in_ramp_area_count(self):
  552. self.overtake_in_ramp_area_detector()
  553. return self.overtake_in_ramp_count
  554. def calculate_overtake_in_tunnel_area_count(self):
  555. self.overtake_in_tunnel_area_detector()
  556. return self.overtake_in_tunnel_count
  557. def calculate_overtake_on_accelerate_lane_count(self):
  558. self.overtake_on_accelerate_lane_detector()
  559. return self.overtake_on_accelerate_lane_count
  560. def calculate_overtake_on_decelerate_lane_count(self):
  561. self.overtake_on_decelerate_lane_detector()
  562. return self.overtake_on_decelerate_lane_count
  563. def calculate_overtake_in_different_senerios_count(self):
  564. self.overtake_in_different_senerios_detector()
  565. return self.overtake_in_different_senerios_count
  566. class SlowdownViolation(object):
  567. """减速让行违规类"""
  568. def __init__(self, df_data):
  569. print("减速让行违规类-------------------------")
  570. self.traffic_violations_type = "减速让行违规类"
  571. self.object_items = []
  572. self.data = df_data.obj_data[1]
  573. self.ego_data = (
  574. self.data[SLOWDOWN_INFO].copy().reset_index(drop=True)
  575. ) # Copy to avoid modifying the original DataFrame
  576. self.pedestrian_data = pd.DataFrame()
  577. self.object_items = set(df_data.object_df.type.tolist())
  578. if 13 in self.object_items: # 行人的type是13
  579. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  580. self.pedestrian_data = (
  581. self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  582. )
  583. self.slow_down_in_crosswalk_count = 0
  584. self.avoid_pedestrian_in_crosswalk_count = 0
  585. self.avoid_pedestrian_in_the_road_count = 0
  586. self.aviod_pedestrian_when_turning_count = 0
  587. def pedestrian_in_front_of_car(self):
  588. if len(self.pedestrian_data) == 0:
  589. return []
  590. else:
  591. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  592. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  593. self.ego_data["dist"] = np.sqrt(
  594. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  595. )
  596. self.ego_data["rela_pos"] = (
  597. self.ego_data["dx"] * self.ego_data["speedX"]
  598. + self.ego_data["dy"] * self.ego_data["speedY"]
  599. )
  600. simtime = self.ego_data[
  601. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  602. ]["simTime"].tolist()
  603. return simtime
  604. def different_road_area_simtime(self, df, threshold=0.6):
  605. if not df:
  606. return []
  607. simtime_group = []
  608. current_simtime_group = [df[0]]
  609. for i in range(1, len(df)):
  610. if abs(df[i] - df[i - 1]) <= threshold:
  611. current_simtime_group.append(df[i])
  612. else:
  613. simtime_group.append(current_simtime_group)
  614. current_simtime_group = [df[i]]
  615. simtime_group.append(current_simtime_group)
  616. return simtime_group
  617. def slow_down_in_crosswalk_detector(self):
  618. # 筛选出路口或隧道区域的时间点
  619. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  620. "simTime"
  621. ].tolist()
  622. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  623. for crosswalk_simtime in crosswalk_simTime_divide:
  624. # 筛选出当前时间段内的数据
  625. # start_time, end_time = crosswalk_simtime
  626. start_time = crosswalk_simtime[0]
  627. end_time = crosswalk_simtime[-1]
  628. print(f"当前时间段:{start_time} - {end_time}")
  629. crosswalk_objstate = self.ego_data[
  630. (self.ego_data["simTime"] >= start_time)
  631. & (self.ego_data["simTime"] <= end_time)
  632. ]
  633. # 计算车辆速度
  634. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  635. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  636. ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
  637. # 判断是否超速
  638. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  639. self.slow_down_in_crosswalk_count += 1
  640. # 输出总次数
  641. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  642. def avoid_pedestrian_in_crosswalk_detector(self):
  643. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  644. "simTime"
  645. ].tolist()
  646. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  647. for crosswalk_simtime in crosswalk_simTime_devide:
  648. if not self.pedestrian_data.empty:
  649. crosswalk_objstate = self.pedestrian_data[
  650. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  651. ]
  652. else:
  653. crosswalk_objstate = pd.DataFrame()
  654. if len(crosswalk_objstate) > 0:
  655. pedestrian_simtime = crosswalk_objstate["simTime"]
  656. pedestrian_objstate = crosswalk_objstate[
  657. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  658. ]
  659. ego_speed = np.sqrt(
  660. pedestrian_objstate["speedX"] ** 2
  661. + pedestrian_objstate["speedY"] ** 2
  662. )
  663. if ego_speed.any() > 0:
  664. self.avoid_pedestrian_in_crosswalk_count += 1
  665. def avoid_pedestrian_in_the_road_detector(self):
  666. simtime = self.pedestrian_in_front_of_car()
  667. if len(simtime) == 0:
  668. self.avoid_pedestrian_in_the_road_count += 0
  669. else:
  670. pedestrian_on_the_road = self.pedestrian_data[
  671. self.pedestrian_data["simTime"].isin(simtime)
  672. ]
  673. simTime = pedestrian_on_the_road["simTime"].tolist()
  674. simTime_devide = self.different_road_area_simtime(simTime)
  675. for simtime1 in simTime_devide:
  676. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  677. pedestrian_on_the_road["simTime"].isin(simtime1)
  678. ]
  679. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  680. dist = np.sqrt(
  681. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  682. ** 2
  683. + (
  684. ego_car["posY"].values
  685. - sub_pedestrian_on_the_road["posY"].values
  686. )
  687. ** 2
  688. )
  689. speed = np.sqrt(
  690. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  691. )
  692. data = {"dist": dist, "speed": speed}
  693. new_ego_car = pd.DataFrame(data)
  694. new_ego_car = new_ego_car.assign(
  695. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  696. )
  697. if new_ego_car["Column3"].any():
  698. self.avoid_pedestrian_in_the_road_count += 1
  699. def aviod_pedestrian_when_turning_detector(self):
  700. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  701. if len(pedestrian_simtime_list) > 0:
  702. simtime_list = self.ego_data[
  703. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  704. & (self.ego_data["lane_type"] == 20)
  705. ]["simTime"].tolist()
  706. simTime_list = self.different_road_area_simtime(simtime_list)
  707. pedestrian_on_the_road = self.pedestrian_data[
  708. self.pedestrian_data["simTime"].isin(simtime_list)
  709. ]
  710. for simtime in simTime_list:
  711. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  712. pedestrian_on_the_road["simTime"].isin(simtime)
  713. ]
  714. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  715. ego_car["dist"] = np.sqrt(
  716. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  717. ** 2
  718. + (
  719. ego_car["posY"].values
  720. - sub_pedestrian_on_the_road["posY"].values
  721. )
  722. ** 2
  723. )
  724. ego_car["speed"] = np.sqrt(
  725. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  726. )
  727. if any(ego_car["speed"].tolist()) != 0:
  728. self.aviod_pedestrian_when_turning_count += 1
  729. def calculate_slow_down_in_crosswalk_count(self):
  730. self.slow_down_in_crosswalk_detector()
  731. return self.slow_down_in_crosswalk_count
  732. def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
  733. self.avoid_pedestrian_in_crosswalk_detector()
  734. return self.avoid_pedestrian_in_crosswalk_count
  735. def calculate_avoid_pedestrian_in_the_road_count(self):
  736. self.avoid_pedestrian_in_the_road_detector()
  737. return self.avoid_pedestrian_in_the_road_count
  738. def calculate_avoid_pedestrian_when_turning_count(self):
  739. self.aviod_pedestrian_when_turning_detector()
  740. return self.aviod_pedestrian_when_turning_count
  741. class TurnaroundViolation(object):
  742. def __init__(self, df_data):
  743. print("掉头违规类初始化中...")
  744. self.traffic_violations_type = "掉头违规类"
  745. self.data = df_data.obj_data[1]
  746. self.ego_data = (
  747. self.data[TURNAROUND_INFO].copy().reset_index(drop=True)
  748. ) # Copy to avoid modifying the original DataFrame
  749. self.pedestrian_data = pd.DataFrame()
  750. self.object_items = set(df_data.object_df.type.tolist())
  751. if 13 in self.object_items: # 行人的type是13
  752. self.pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  753. self.pedestrian_data = (
  754. self.pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  755. )
  756. self.turning_in_forbiden_turn_back_sign_count = 0
  757. self.turning_in_forbiden_turn_left_sign_count = 0
  758. self.avoid_pedestrian_when_turn_back_count = 0
  759. def pedestrian_in_front_of_car(self):
  760. if len(self.pedestrian_data) == 0:
  761. return []
  762. else:
  763. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  764. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  765. self.ego_data["dist"] = np.sqrt(
  766. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  767. )
  768. self.ego_data["rela_pos"] = (
  769. self.ego_data["dx"] * self.ego_data["speedX"]
  770. + self.ego_data["dy"] * self.ego_data["speedY"]
  771. )
  772. simtime = self.ego_data[
  773. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  774. ]["simTime"].tolist()
  775. return simtime
  776. def different_road_area_simtime(self, df, threshold=0.5):
  777. if not df:
  778. return []
  779. simtime_group = []
  780. current_simtime_group = [df[0]]
  781. for i in range(1, len(df)):
  782. if abs(df[i] - df[i - 1]) <= threshold:
  783. current_simtime_group.append(df[i])
  784. else:
  785. simtime_group.append(current_simtime_group)
  786. current_simtime_group = [df[i]]
  787. simtime_group.append(current_simtime_group)
  788. return simtime_group
  789. def turn_back_in_forbiden_sign_detector(self):
  790. """
  791. 禁止掉头type = 8
  792. """
  793. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  794. "simTime"
  795. ].tolist()
  796. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  797. "simTime"
  798. ].tolist()
  799. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  800. forbiden_turn_back_simTime
  801. )
  802. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  803. forbiden_turn_left_simTime
  804. )
  805. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  806. ego_car1 = self.ego_data.loc[
  807. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  808. ]
  809. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  810. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  811. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  812. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  813. if (
  814. ego_end_speedx1 * ego_start_speedx1
  815. + ego_end_speedy1 * ego_start_speedy1
  816. < 0
  817. ):
  818. self.turning_in_forbiden_turn_back_sign_count += 1
  819. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  820. ego_car2 = self.ego_data.loc[
  821. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  822. ]
  823. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  824. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  825. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  826. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  827. if (
  828. ego_end_speedx2 * ego_start_speedx2
  829. + ego_end_speedy2 * ego_start_speedy2
  830. < 0
  831. ):
  832. self.turning_in_forbiden_turn_left_sign_count += 1
  833. def avoid_pedestrian_when_turn_back_detector(self):
  834. sensor_on_intersection = self.pedestrian_in_front_of_car()
  835. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  836. self.ego_data["lane_type"] == 20
  837. ]["simTime"].tolist()
  838. avoid_pedestrian_when_turn_back_simTime_devide = (
  839. self.different_road_area_simtime(
  840. avoid_pedestrian_when_turn_back_simTime_list
  841. )
  842. )
  843. if len(sensor_on_intersection) > 0:
  844. for avoid_pedestrian_when_turn_back_simtime in avoid_pedestrian_when_turn_back_simTime_devide:
  845. pedestrian_in_intersection_simtime = self.pedestrian_data[
  846. self.pedestrian_data["simTime"].isin(
  847. avoid_pedestrian_when_turn_back_simtime
  848. )
  849. ].tolist()
  850. ego_df = self.ego_data[
  851. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  852. ].reset_index(drop=True)
  853. pedestrian_df = self.pedestrian_data[
  854. self.pedestrian_data["simTime"].isin(
  855. pedestrian_in_intersection_simtime
  856. )
  857. ].reset_index(drop=True)
  858. ego_df["dist"] = np.sqrt(
  859. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  860. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  861. )
  862. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  863. if any(ego_df["speed"].tolist()) != 0:
  864. self.avoid_pedestrian_when_turn_back_count += 1
  865. def calculate_turn_in_forbiden_turn_left_sign_count(self):
  866. self.turn_back_in_forbiden_sign_detector()
  867. return self.turning_in_forbiden_turn_left_sign_count
  868. def calculate_turn_in_forbiden_turn_back_sign_count(self):
  869. self.turn_back_in_forbiden_sign_detector()
  870. return self.turning_in_forbiden_turn_back_sign_count
  871. def calaulate_avoid_pedestrian_when_turn_back_count(self):
  872. self.avoid_pedestrian_when_turn_back_detector()
  873. return self.avoid_pedestrian_when_turn_back_count
  874. class WrongWayViolation:
  875. """停车违规类"""
  876. def __init__(self, df_data):
  877. print("停车违规类初始化中...")
  878. self.traffic_violations_type = "停车违规类"
  879. self.data = df_data.obj_data[1]
  880. # 初始化违规统计
  881. self.violation_count = {
  882. "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
  883. "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
  884. "urbanExpresswayEmergencyLaneDriving": 0,
  885. }
  886. def process_violations(self):
  887. """处理停车或者紧急车道行驶违规数据"""
  888. # 提取有效道路类型
  889. urban_expressway_or_highway = {1, 2}
  890. driving_lane = {1, 4, 5, 6}
  891. emergency_lane = {12}
  892. self.data["v"] *= 3.6 # 转换速度
  893. # 使用向量化和条件判断进行违规判定
  894. conditions = [
  895. (
  896. self.data["road_fc"].isin(urban_expressway_or_highway)
  897. & self.data["lane_type"].isin(driving_lane)
  898. & (self.data["v"] == 0)
  899. ),
  900. (
  901. self.data["road_fc"].isin(urban_expressway_or_highway)
  902. & self.data["lane_type"].isin(emergency_lane)
  903. & (self.data["v"] == 0)
  904. ),
  905. (
  906. self.data["road_fc"].isin(urban_expressway_or_highway)
  907. & self.data["lane_type"].isin(emergency_lane)
  908. & (self.data["v"] != 0)
  909. ),
  910. ]
  911. violation_types = [
  912. "urbanExpresswayOrHighwayDrivingLaneStopped",
  913. "urbanExpresswayOrHighwayEmergencyLaneStopped",
  914. "urbanExpresswayEmergencyLaneDriving",
  915. ]
  916. # 设置违规类型
  917. self.data["violation_type"] = None
  918. for condition, violation_type in zip(conditions, violation_types):
  919. self.data.loc[condition, "violation_type"] = violation_type
  920. # 统计违规情况
  921. self.violation_count = (
  922. self.data["violation_type"]
  923. .value_counts()
  924. .reindex(violation_types, fill_value=0)
  925. .to_dict()
  926. )
  927. def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
  928. self.process_violations()
  929. return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]
  930. def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
  931. self.process_violations()
  932. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  933. def calculate_urbanExpresswayEmergencyLaneDriving(self):
  934. self.process_violations()
  935. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  936. class SpeedingViolation(object):
  937. """超速违规类"""
  938. """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
  939. def __init__(self, df_data):
  940. print("超速违规类初始化中...")
  941. self.traffic_violations_type = "超速违规类"
  942. self.data = df_data.obj_data[
  943. 1
  944. ] # Copy to avoid modifying the original DataFrame
  945. # 初始化违规统计
  946. self.violation_counts = {
  947. "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
  948. "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
  949. "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
  950. "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
  951. "generalRoadSpeedOverLimit50": 0,
  952. "generalRoadSpeedOverLimit20to50": 0,
  953. }
  954. def process_violations(self):
  955. """处理数据帧,检查超速和其他违规行为"""
  956. # 提取有效道路类型
  957. urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
  958. general_road = {3} # 直接创建包含一个元素的集合
  959. self.data["v"] *= 3.6 # 转换速度
  960. # 违规判定
  961. conditions = [
  962. (
  963. self.data["road_fc"].isin(urban_expressway_or_highway)
  964. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  965. ),
  966. (
  967. self.data["road_fc"].isin(urban_expressway_or_highway)
  968. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  969. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  970. ),
  971. (
  972. self.data["road_fc"].isin(urban_expressway_or_highway)
  973. & (self.data["v"] > self.data["road_speed_max"])
  974. & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
  975. ),
  976. (
  977. self.data["road_fc"].isin(urban_expressway_or_highway)
  978. & (self.data["v"] < self.data["road_speed_min"])
  979. ),
  980. (
  981. self.data["road_fc"].isin(general_road)
  982. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  983. ),
  984. (
  985. self.data["road_fc"].isin(general_road)
  986. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  987. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  988. ),
  989. ]
  990. violation_types = [
  991. "urbanExpresswayOrHighwaySpeedOverLimit50",
  992. "urbanExpresswayOrHighwaySpeedOverLimit20to50",
  993. "urbanExpresswayOrHighwaySpeedOverLimit0to20",
  994. "urbanExpresswayOrHighwaySpeedUnderLimit",
  995. "generalRoadSpeedOverLimit50",
  996. "generalRoadSpeedOverLimit20to50",
  997. ]
  998. # 设置违规类型
  999. self.data["violation_type"] = None
  1000. for condition, violation_type in zip(conditions, violation_types):
  1001. self.data.loc[condition, "violation_type"] = violation_type
  1002. # 统计各类违规情况
  1003. self.violation_counts = self.data["violation_type"].value_counts().to_dict()
  1004. def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
  1005. self.process_violations()
  1006. return self.violation_counts.get("urbanExpresswayOrHighwaySpeedOverLimit50") if self.violation_counts.get(
  1007. "urbanExpresswayOrHighwaySpeedOverLimit50") else 0
  1008. def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
  1009. self.process_violations()
  1010. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"] if self.violation_counts.get(
  1011. "urbanExpresswayOrHighwaySpeedOverLimit20to50") else 0
  1012. def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
  1013. self.process_violations()
  1014. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"] if self.violation_counts.get(
  1015. "urbanExpresswayOrHighwaySpeedOverLimit0to20") else 0
  1016. def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
  1017. self.process_violations()
  1018. return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"] if self.violation_counts.get(
  1019. "urbanExpresswayOrHighwaySpeedUnderLimit") else 0
  1020. def calculate_generalRoadSpeedOverLimit50(self):
  1021. self.process_violations()
  1022. return self.violation_counts["generalRoadSpeedOverLimit50"] if self.violation_counts.get(
  1023. "generalRoadSpeedOverLimit50") else 0
  1024. def calculate_generalRoadSpeedOverLimit20to50_count(self):
  1025. self.process_violations()
  1026. return self.violation_counts["generalRoadSpeedOverLimit20to50"] if self.violation_counts.get(
  1027. "generalRoadSpeedOverLimit20to50") else 0
  1028. class TrafficLightViolation(object):
  1029. """违反交通灯类"""
  1030. """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
  1031. def __init__(self, df_data):
  1032. """初始化方法"""
  1033. self.traffic_violations_type = "违反交通灯类"
  1034. print("违反交通灯类 类初始化中...")
  1035. self.config = df_data.vehicle_config
  1036. self.data_ego = df_data.ego_data # 获取数据
  1037. self.violation_counts = {
  1038. "trafficSignalViolation": 0,
  1039. "illegalDrivingOrParkingAtCrossroads": 0,
  1040. }
  1041. # 处理数据并判定违规
  1042. self.process_violations()
  1043. def is_point_cross_line(self, point, stop_line_points):
  1044. """
  1045. 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
  1046. 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
  1047. :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
  1048. :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
  1049. :return: True 如果车辆跨越了停止线,否则 False
  1050. """
  1051. line_vector = np.array(
  1052. [
  1053. stop_line_points[1][0] - stop_line_points[0][0],
  1054. stop_line_points[1][1] - stop_line_points[0][1],
  1055. ]
  1056. )
  1057. point_vector = np.array(
  1058. [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
  1059. )
  1060. cross_product = np.cross(line_vector, point_vector)
  1061. if cross_product != 0:
  1062. return False
  1063. mid_point = (
  1064. np.array([stop_line_points[0][0], stop_line_points[0][1]])
  1065. + 0.5 * line_vector
  1066. )
  1067. axletree_to_mid_vector = np.array(
  1068. [point[0] - mid_point[0], point[1] - mid_point[1]]
  1069. )
  1070. direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
  1071. norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
  1072. norm_direction = np.linalg.norm(direction_vector)
  1073. if norm_axletree_to_mid == 0 or norm_direction == 0:
  1074. return False
  1075. cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
  1076. norm_axletree_to_mid * norm_direction
  1077. )
  1078. angle_theta = math.degrees(math.acos(cos_theta))
  1079. return angle_theta <= 90
  1080. def _filter_data(self):
  1081. """过滤数据,筛选出需要分析的记录"""
  1082. return self.data_ego[
  1083. (self.data_ego["stopline_id"] != -1)
  1084. & (self.data_ego["stopline_type"] == 1)
  1085. & (self.data_ego["trafficlight_id"] != -1)
  1086. ]
  1087. def _group_data(self, filtered_data):
  1088. """按时间差对数据进行分组"""
  1089. filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
  1090. threshold = 0.5
  1091. filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
  1092. return filtered_data.groupby("group")
  1093. def _analyze_group(self, group_data):
  1094. """分析单个分组的数据,判断是否闯红灯"""
  1095. photos = []
  1096. stop_in_intersection = False
  1097. for _, row in group_data.iterrows():
  1098. vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
  1099. stop_line_points = [
  1100. [row["stopline_x1"], row["stopline_y1"]],
  1101. [row["stopline_x2"], row["stopline_y2"]],
  1102. ]
  1103. traffic_light_status = row["traffic_light_status"]
  1104. heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
  1105. heading_vector = heading_vector / np.linalg.norm(heading_vector)
  1106. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  1107. # config = yaml.load(f, Loader=yaml.FullLoader)
  1108. front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
  1109. rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
  1110. dist = math.sqrt(
  1111. (row["posX"] - row["traffic_light_x"]) ** 2
  1112. + (row["posY"] - row["traffic_light_y"]) ** 2
  1113. )
  1114. if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
  1115. has_crossed_line_front = (
  1116. self.is_point_cross_line(front_wheel_pos, stop_line_points)
  1117. and traffic_light_status == 1
  1118. )
  1119. has_crossed_line_rear = (
  1120. self.is_point_cross_line(rear_wheel_pos, stop_line_points)
  1121. and row["v"] > 0
  1122. and traffic_light_status == 1
  1123. )
  1124. has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
  1125. has_passed_intersection = has_crossed_line_front and dist < 1.0
  1126. # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
  1127. photos.extend(
  1128. [
  1129. has_crossed_line_front,
  1130. has_crossed_line_rear,
  1131. has_passed_intersection,
  1132. has_stop_in_intersection,
  1133. ]
  1134. )
  1135. stop_in_intersection = has_passed_intersection
  1136. return photos, stop_in_intersection
  1137. def is_vehicle_run_a_red_light(self):
  1138. """判断车辆是否闯红灯"""
  1139. filtered_data = self._filter_data()
  1140. grouped_data = self._group_data(filtered_data)
  1141. self.photos_group = []
  1142. self.stop_in_intersections = []
  1143. for _, group_data in grouped_data:
  1144. photos, stop_in_intersection = self._analyze_group(group_data)
  1145. self.photos_group.append(photos)
  1146. self.stop_in_intersections.append(stop_in_intersection)
  1147. def process_violations(self):
  1148. """处理数据并判定违规"""
  1149. self.is_vehicle_run_a_red_light()
  1150. count_1 = sum(all(photos) for photos in self.photos_group)
  1151. count_2 = sum(
  1152. stop_in_intersection for stop_in_intersection in self.stop_in_intersections
  1153. )
  1154. self.violation_counts["trafficSignalViolation"] = count_1
  1155. self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
  1156. def calculate_trafficSignalViolation_count(self):
  1157. self.process_violations()
  1158. return self.violation_counts["trafficSignalViolation"]
  1159. def calculate_illegalDrivingOrParkingAtCrossroads(self):
  1160. self.process_violations()
  1161. return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]
  1162. class WarningViolation(object):
  1163. """警告性违规类"""
  1164. def __init__(self, df_data):
  1165. self.traffic_violations_type = "警告性违规类"
  1166. print("警告性违规类 类初始化中...")
  1167. self.config = df_data.vehicle_config
  1168. self.data_ego = df_data.obj_data[1]
  1169. self.data = self.data_ego.copy() # 避免修改原始 DataFrame
  1170. self.violation_counts = {
  1171. "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1172. "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1173. }
  1174. def process_violations(self):
  1175. general_road = {3} # 普通道路
  1176. lane_type = {11} # 车道类型 # 10: 机动车道,11: 非机动车道
  1177. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  1178. # config = yaml.load(f, Loader=yaml.FullLoader)
  1179. car_width = self.config["CAR_WIDTH"]
  1180. lane_width = self.data["lane_width"] # 假定 'lane_width' 在数据中存在
  1181. # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1182. # 使用布尔索引来筛选满足条件的行
  1183. condition = (self.data["road_fc"].isin(general_road)) & (
  1184. self.data["lane_type"].isin(lane_type)
  1185. )
  1186. # 创建一个新的列,并根据条件设置值
  1187. self.data["is_violation"] = condition
  1188. # 统计满足条件的连续时间段
  1189. violation_segments = self.count_continuous_violations(
  1190. self.data["is_violation"], self.data["simTime"]
  1191. )
  1192. # 更新骑行车道线违规计数
  1193. self.violation_counts["generalRoadIrregularLaneUse"] += len(violation_segments)
  1194. # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1195. # 计算阈值
  1196. threshold = (lane_width - car_width) / 2
  1197. # 找到满足条件的行
  1198. self.data["is_violation"] = self.data["laneOffset"] > threshold
  1199. # 统计满足条件的连续时间段
  1200. violation_segments = self.count_continuous_violations(
  1201. self.data["is_violation"], self.data["simTime"]
  1202. )
  1203. # 更新骑行车道线违规计数
  1204. self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] += len(
  1205. violation_segments
  1206. )
  1207. def count_continuous_violations(self, violation_series, time_series):
  1208. """统计连续违规的时间段数量"""
  1209. continuous_segments = []
  1210. current_segment = []
  1211. for is_violation, time in zip(violation_series, time_series):
  1212. if is_violation:
  1213. if not current_segment: # 新的连续段开始
  1214. current_segment.append(time)
  1215. else:
  1216. if current_segment: # 连续段结束
  1217. continuous_segments.append(current_segment)
  1218. current_segment = []
  1219. # 检查是否有一个未结束的连续段在最后
  1220. if current_segment:
  1221. continuous_segments.append(current_segment)
  1222. return continuous_segments
  1223. def calculate_generalRoadIrregularLaneUse_count(self):
  1224. self.process_violations()
  1225. return self.violation_counts["generalRoadIrregularLaneUse"]
  1226. def calculate_urbanExpresswayOrHighwayRideLaneDivider(self):
  1227. self.process_violations()
  1228. return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
  1229. class TrafficSignViolation:
  1230. """交通标志违规类"""
  1231. PROHIBITED_STRAIGHT_THRESHOLD = 5
  1232. SIGN_TYPE_STRAIGHT_PROHIBITED = 7
  1233. SIGN_TYPE_SPEED_LIMIT = 12
  1234. SIGN_TYPE_MIN_SPEED_LIMIT = 13
  1235. def __init__(self, df_data):
  1236. self.traffic_violations_type = "交通标志违规类"
  1237. print("交通标志违规类 初始化中...")
  1238. # 数据预处理
  1239. self._raw_data = df_data.obj_data[1].copy()
  1240. self.data_ego = self._raw_data.sort_values('simTime').reset_index(drop=True)
  1241. # 延迟计算标志
  1242. self._calculated = False
  1243. self._violation_counts = {
  1244. "NoStraightThrough": 0,
  1245. "SpeedLimitViolation": 0,
  1246. "MinimumSpeedLimitViolation": 0
  1247. }
  1248. def _ensure_calculated(self):
  1249. """保证计算只执行一次"""
  1250. if not self._calculated:
  1251. self._check_prohibition_violations()
  1252. self._check_instruction_violations()
  1253. self._calculated = True
  1254. def calculate_NoStraightThrough_count(self):
  1255. """计算禁止直行违规次数"""
  1256. self._ensure_calculated()
  1257. return self._violation_counts["NoStraightThrough"]
  1258. def calculate_SpeedLimitViolation_count(self):
  1259. """计算超速违规次数"""
  1260. self._ensure_calculated()
  1261. return self._violation_counts["SpeedLimitViolation"]
  1262. def calculate_MinimumSpeedLimitViolation_count(self):
  1263. """计算最低限速违规次数"""
  1264. self._ensure_calculated()
  1265. return self._violation_counts["MinimumSpeedLimitViolation"]
  1266. def _check_prohibition_violations(self):
  1267. """处理禁令标志违规(禁止直行和限速)"""
  1268. self._check_straight_violation()
  1269. self._check_speed_violation(
  1270. self.SIGN_TYPE_SPEED_LIMIT,
  1271. operator.gt,
  1272. "SpeedLimitViolation"
  1273. )
  1274. def _check_instruction_violations(self):
  1275. """处理指示标志违规(最低限速)"""
  1276. self._check_speed_violation(
  1277. self.SIGN_TYPE_MIN_SPEED_LIMIT,
  1278. operator.lt,
  1279. "MinimumSpeedLimitViolation"
  1280. )
  1281. def _check_straight_violation(self):
  1282. """检查禁止直行违规"""
  1283. straight_df = self.data_ego[self.data_ego["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED]
  1284. if not straight_df.empty:
  1285. # 计算航向角变化并填充缺失值
  1286. straight_df = straight_df.copy()
  1287. straight_df['posH_diff'] = straight_df['posH'].diff().abs().fillna(0)
  1288. # 创建筛选条件
  1289. mask = (
  1290. (straight_df['posH_diff'] <= self.PROHIBITED_STRAIGHT_THRESHOLD) &
  1291. (straight_df['v'] > 0)
  1292. )
  1293. self.violation_counts["NoStraightThrough"] = mask.sum()
  1294. def _check_speed_violation(self, sign_type, compare_op, count_key):
  1295. """通用速度违规检查方法"""
  1296. violation_df = self.data_ego[self.data_ego["sign_type1"] == sign_type]
  1297. if not violation_df.empty:
  1298. mask = compare_op(violation_df['v'], violation_df['sign_speed'])
  1299. self.violation_counts[count_key] = mask.sum()