traffic.py 73 KB


  1. # ... 保留原有导入和常量定义 ...
  2. import math
  3. import operator
  4. import copy
  5. import numpy as np
  6. import pandas as pd
  7. from typing import Dict, Any, List, Optional
  8. from modules.lib import log_manager
  9. from modules.lib.score import Score
  10. from modules.lib.log_manager import LogManager
  11. from modules.lib import data_process
  12. OVERTAKE_INFO = [
  13. "simTime",
  14. "simFrame",
  15. "playerId",
  16. "speedX",
  17. "speedY",
  18. "posX",
  19. "posY",
  20. "posH",
  21. "lane_id",
  22. "lane_type",
  23. "road_type",
  24. "interid",
  25. "crossid",
  26. ]
  27. SLOWDOWN_INFO = [
  28. "simTime",
  29. "simFrame",
  30. "playerId",
  31. "speedX",
  32. "speedY",
  33. "posX",
  34. "posY",
  35. "crossid",
  36. "lane_type",
  37. ]
  38. TURNAROUND_INFO = [
  39. "simTime",
  40. "simFrame",
  41. "playerId",
  42. "speedX",
  43. "speedY",
  44. "posX",
  45. "posY",
  46. "sign_type1",
  47. "lane_type",
  48. ]
  49. TRFFICSIGN_INFO = [
  50. "simTime",
  51. "simFrame",
  52. "playerId",
  53. "speedX",
  54. "speedY",
  55. "v",
  56. "posX",
  57. "posY",
  58. "sign_type1",
  59. "sign_ref_link",
  60. "sign_x",
  61. "sign_y",
  62. ]
  63. # 修改指标函数名称为 calculate_xxx 格式
  64. def calculate_overtake_when_passing_car(data_processed):
  65. """计算会车时超车指标"""
  66. overtakingviolation = OvertakingViolation(data_processed)
  67. overtake_when_passing_car_count = overtakingviolation.calculate_overtake_when_passing_car_count()
  68. return {"overtake_when_passing_car": overtake_when_passing_car_count}
  69. def calculate_overtake_on_right(data_processed):
  70. """计算右侧超车指标"""
  71. overtakingviolation = OvertakingViolation(data_processed)
  72. overtake_on_right_count = overtakingviolation.calculate_overtake_on_right_count()
  73. return {"overtake_on_right": overtake_on_right_count}
  74. def calculate_overtake_when_turn_around(data_processed):
  75. """计算掉头时超车指标"""
  76. overtakingviolation = OvertakingViolation(data_processed)
  77. overtake_when_turn_around_count = overtakingviolation.calculate_overtake_when_turn_around_count()
  78. return {"overtake_when_turn_around": overtake_when_turn_around_count}
  79. def calculate_overtake_in_forbid_lane(data_processed):
  80. """计算在禁止车道超车指标"""
  81. overtakingviolation = OvertakingViolation(data_processed)
  82. overtake_in_forbid_lane_count = overtakingviolation.calculate_overtake_in_forbid_lane_count()
  83. return {"overtake_in_forbid_lane": overtake_in_forbid_lane_count}
  84. def calculate_overtake_in_ramp(data_processed):
  85. """计算在匝道超车指标"""
  86. overtakingviolation = OvertakingViolation(data_processed)
  87. overtake_in_ramp_area_count = overtakingviolation.calculate_overtake_in_ramp_area_count()
  88. return {"overtake_in_ramp": overtake_in_ramp_area_count}
  89. def calculate_overtake_in_tunnel(data_processed):
  90. """计算在隧道超车指标"""
  91. overtakingviolation = OvertakingViolation(data_processed)
  92. overtake_in_tunnel_area_count = overtakingviolation.calculate_overtake_in_tunnel_area_count()
  93. return {"overtake_in_tunnel": overtake_in_tunnel_area_count}
  94. def calculate_overtake_on_accelerate_lane(data_processed):
  95. """计算在加速车道超车指标"""
  96. overtakingviolation = OvertakingViolation(data_processed)
  97. overtake_on_accelerate_lane_count = overtakingviolation.calculate_overtake_on_accelerate_lane_count()
  98. return {"overtake_on_accelerate_lane": overtake_on_accelerate_lane_count}
  99. def calculate_overtake_on_decelerate_lane(data_processed):
  100. """计算在减速车道超车指标"""
  101. overtakingviolation = OvertakingViolation(data_processed)
  102. overtake_on_decelerate_lane_count = overtakingviolation.calculate_overtake_on_decelerate_lane_count()
  103. return {"overtake_on_decelerate_lane": overtake_on_decelerate_lane_count}
  104. def calculate_overtake_in_different_senerios(data_processed):
  105. """计算在不同场景超车指标"""
  106. overtakingviolation = OvertakingViolation(data_processed)
  107. overtake_in_different_senerios_count = overtakingviolation.calculate_overtake_in_different_senerios_count()
  108. return {"overtake_in_different_senerios": overtake_in_different_senerios_count}
  109. def calculate_slow_down_in_crosswalk(data_processed):
  110. """计算在人行横道减速指标"""
  111. slowdownviolation = SlowdownViolation(data_processed)
  112. slow_down_in_crosswalk_count = slowdownviolation.calculate_slow_down_in_crosswalk_count()
  113. return {"slowdown_down_in_crosswalk": slow_down_in_crosswalk_count}
  114. def calculate_avoid_pedestrian_in_crosswalk(data_processed):
  115. """计算在人行横道避让行人指标"""
  116. avoidpedestrianincrosswalk = SlowdownViolation(data_processed)
  117. avoid_pedestrian_in_crosswalk_count = avoidpedestrianincrosswalk.calculate_avoid_pedestrian_in_the_crosswalk_count()
  118. return {"avoid_pedestrian_in_crosswalk": avoid_pedestrian_in_crosswalk_count}
  119. def calculate_avoid_pedestrian_in_the_road(data_processed):
  120. """计算在道路上避让行人指标"""
  121. avoidpedestrianintheroad = SlowdownViolation(data_processed)
  122. avoid_pedestrian_in_the_road_count = avoidpedestrianintheroad.calculate_avoid_pedestrian_in_the_road_count()
  123. return {"avoid_pedestrian_in_the_road": avoid_pedestrian_in_the_road_count}
  124. def calculate_avoid_pedestrian_when_turning(data_processed):
  125. """计算转弯时避让行人指标"""
  126. avoidpedestrianwhenturning = SlowdownViolation(data_processed)
  127. avoid_pedestrian_when_turning_count = avoidpedestrianwhenturning.calculate_avoid_pedestrian_when_turning_count()
  128. return {"avoid_pedestrian_when_turning_count": avoid_pedestrian_when_turning_count}
  129. def calculate_turn_in_forbiden_turn_left_sign(data_processed):
  130. """计算在禁止左转标志处左转指标"""
  131. turnaroundviolation = TurnaroundViolation(data_processed)
  132. turn_in_forbiden_turn_left_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_left_sign_count()
  133. return {"turn_in_forbiden_turn_left_sign": turn_in_forbiden_turn_left_sign_count}
  134. def calculate_turn_in_forbiden_turn_back_sign(data_processed):
  135. """计算在禁止掉头标志处掉头指标"""
  136. turnaroundviolation = TurnaroundViolation(data_processed)
  137. turn_in_forbiden_turn_back_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_back_sign_count()
  138. return {"turn_in_forbiden_turn_back_sign": turn_in_forbiden_turn_back_sign_count}
  139. def calculate_avoid_pedestrian_when_turn_back(data_processed):
  140. """计算掉头时避让行人指标"""
  141. turnaroundviolation = TurnaroundViolation(data_processed)
  142. avoid_pedestrian_when_turn_back_count = turnaroundviolation.calaulate_avoid_pedestrian_when_turn_back_count()
  143. return {"avoid_pedestrian_when_turn_back": avoid_pedestrian_when_turn_back_count}
  144. def calculate_urbanexpresswayorhighwaydrivinglanestopped(data_processed):
  145. """计算城市快速路或高速公路行车道停车指标"""
  146. wrongwayviolation = WrongWayViolation(data_processed)
  147. urbanExpresswayOrHighwayDrivingLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  148. return {"urbanExpresswayOrHighwayDrivingLaneStopped": urbanExpresswayOrHighwayDrivingLaneStopped_count}
  149. def calculate_urbanexpresswayorhighwayemergencylanestopped(data_processed):
  150. """计算城市快速路或高速公路应急车道停车指标"""
  151. wrongwayviolation = WrongWayViolation(data_processed)
  152. urbanExpresswayOrHighwayEmergencyLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  153. return {"urbanExpresswayOrHighwayEmergencyLaneStopped": urbanExpresswayOrHighwayEmergencyLaneStopped_count}
  154. def calculate_urbanexpresswayemergencylanedriving(data_processed):
  155. """计算城市快速路应急车道行驶指标"""
  156. wrongwayviolation = WrongWayViolation(data_processed)
  157. urbanExpresswayEmergencyLaneDriving_count = wrongwayviolation.calculate_urbanExpresswayEmergencyLaneDriving()
  158. return {"urbanExpresswayEmergencyLaneDriving": urbanExpresswayEmergencyLaneDriving_count}
  159. def calculate_urbanexpresswayorhighwayspeedoverlimit50(data_processed):
  160. """计算城市快速路或高速公路超速50%以上指标"""
  161. speedingviolation = SpeedingViolation(data_processed)
  162. urbanExpresswayOrHighwaySpeedOverLimit50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
  163. return {"urbanExpresswayOrHighwaySpeedOverLimit50": urbanExpresswayOrHighwaySpeedOverLimit50_count}
  164. def calculate_urbanexpresswayorhighwayspeedoverlimit20to50(data_processed):
  165. """计算城市快速路或高速公路超速20%-50%指标"""
  166. speedingviolation = SpeedingViolation(data_processed)
  167. urbanExpresswayOrHighwaySpeedOverLimit20to50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
  168. return {"urbanExpresswayOrHighwaySpeedOverLimit20to50": urbanExpresswayOrHighwaySpeedOverLimit20to50_count}
  169. def calculate_urbanexpresswayorhighwayspeedoverlimit0to20(data_processed):
  170. """计算城市快速路或高速公路超速0-20%指标"""
  171. speedingviolation = SpeedingViolation(data_processed)
  172. urbanExpresswayOrHighwaySpeedOverLimit0to20_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
  173. return {"urbanExpresswayOrHighwaySpeedOverLimit0to20": urbanExpresswayOrHighwaySpeedOverLimit0to20_count}
  174. def calculate_urbanexpresswayorhighwayspeedunderlimit(data_processed):
  175. """计算城市快速路或高速公路低于最低限速指标"""
  176. speedingviolation = SpeedingViolation(data_processed)
  177. urbanExpresswayOrHighwaySpeedUnderLimit_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
  178. return {"urbanExpresswayOrHighwaySpeedUnderLimit": urbanExpresswayOrHighwaySpeedUnderLimit_count}
  179. def calculate_generalroadspeedoverlimit50(data_processed):
  180. """计算一般道路超速50%以上指标"""
  181. speedingviolation = SpeedingViolation(data_processed)
  182. generalRoadSpeedOverLimit50_count = speedingviolation.calculate_generalRoadSpeedOverLimit50()
  183. return {"generalRoadSpeedOverLimit50": generalRoadSpeedOverLimit50_count}
  184. def calculate_generalroadspeedoverlimit20to50(data_processed):
  185. """计算一般道路超速20%-50%指标"""
  186. speedingviolation = SpeedingViolation(data_processed)
  187. generalRoadSpeedOverLimit20to50_count = speedingviolation.calculate_generalRoadSpeedOverLimit20to50_count()
  188. return {"generalRoadSpeedOverLimit20to50": generalRoadSpeedOverLimit20to50_count}
  189. def calculate_trafficsignalviolation(data_processed):
  190. """计算交通信号违规指标"""
  191. trafficlightviolation = TrafficLightViolation(data_processed)
  192. trafficSignalViolation_count = trafficlightviolation.calculate_trafficSignalViolation_count()
  193. return {"trafficSignalViolation": trafficSignalViolation_count}
  194. def calculate_illegaldrivingorparkingatcrossroads(data_processed):
  195. """计算交叉路口违法行驶或停车指标"""
  196. trafficlightviolation = TrafficLightViolation(data_processed)
  197. illegalDrivingOrParkingAtCrossroads_count = trafficlightviolation.calculate_illegalDrivingOrParkingAtCrossroads()
  198. return {"illegalDrivingOrParkingAtCrossroads": illegalDrivingOrParkingAtCrossroads_count}
  199. def calculate_generalroadirregularlaneuse(data_processed):
  200. """计算一般道路不按规定车道行驶指标"""
  201. warningviolation = WarningViolation(data_processed)
  202. generalRoadIrregularLaneUse_count = warningviolation.calculate_generalRoadIrregularLaneUse_count()
  203. return {"generalRoadIrregularLaneUse": generalRoadIrregularLaneUse_count}
  204. def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
  205. """计算城市快速路或高速公路骑车道线行驶指标"""
  206. warningviolation = WarningViolation(data_processed)
  207. urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider_count()
  208. return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
  209. def calculate_nostraightthrough(data_processed):
  210. """计算禁止直行标志牌处直行指标"""
  211. trafficsignviolation = TrafficSignViolation(data_processed)
  212. noStraightThrough_count = trafficsignviolation.calculate_NoStraightThrough_count()
  213. return {"NoStraightThrough": noStraightThrough_count}
  214. def calculate_speedlimitviolation(data_processed):
  215. """计算违反限速规定指标"""
  216. trafficsignviolation = TrafficSignViolation(data_processed)
  217. SpeedLimitViolation_count = trafficsignviolation.calculate_SpeedLimitViolation_count()
  218. return {"SpeedLimitViolation": SpeedLimitViolation_count}
  219. def calculate_minimumspeedlimitviolation(data_processed):
  220. """计算违反最低限速规定指标"""
  221. trafficsignviolation = TrafficSignViolation(data_processed)
  222. calculate_MinimumSpeedLimitViolation_count = trafficsignviolation.calculate_MinimumSpeedLimitViolation_count()
  223. return {"MinimumSpeedLimitViolation": calculate_MinimumSpeedLimitViolation_count}
  224. # 修改 TrafficRegistry 类的 _build_registry 方法
  225. class TrafficRegistry:
  226. """交通违规指标注册器"""
  227. def __init__(self, data_processed):
  228. self.logger = LogManager().get_logger()
  229. self.data = data_processed
  230. self.traffic_config = data_processed.traffic_config["traffic"]
  231. self.metrics = self._extract_metrics(self.traffic_config)
  232. self._registry = self._build_registry()
  233. def _extract_metrics(self, config_node: dict) -> list:
  234. """从配置中提取指标名称"""
  235. metrics = []
  236. def _recurse(node):
  237. if isinstance(node, dict):
  238. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  239. metrics.append(node['name'])
  240. for v in node.values():
  241. _recurse(v)
  242. _recurse(config_node)
  243. self.logger.info(f'评比的交通违规指标列表:{metrics}')
  244. return metrics
  245. def _build_registry(self) -> dict:
  246. """构建指标函数注册表"""
  247. registry = {}
  248. for metric_name in self.metrics:
  249. func_name = f"calculate_{metric_name.lower()}"
  250. try:
  251. registry[metric_name] = globals()[func_name]
  252. except KeyError:
  253. self.logger.error(f"未实现交通违规指标函数: {func_name}")
  254. return registry
  255. def batch_execute(self) -> dict:
  256. """批量执行指标计算"""
  257. results = {}
  258. for name, func in self._registry.items():
  259. try:
  260. result = func(self.data)
  261. results.update(result)
  262. # 新增:将每个指标的结果写入日志
  263. self.logger.info(f'交通违规指标[{name}]计算结果: {result}')
  264. except Exception as e:
  265. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  266. results[name] = None
  267. self.logger.info(f'交通违规指标计算结果:{results}')
  268. return results
  269. class TrafficManager:
  270. """交通违规指标管理类"""
  271. def __init__(self, data_processed):
  272. self.data = data_processed
  273. self.logger = LogManager().get_logger()
  274. self.registry = TrafficRegistry(self.data)
  275. def report_statistic(self):
  276. """计算并报告交通违规指标结果"""
  277. traffic_result = self.registry.batch_execute()
  278. return traffic_result
  279. class OvertakingViolation(object):
  280. """超车违规类"""
  281. def __init__(self, df_data):
  282. print("超车违规类初始化中...")
  283. self.traffic_violations_type = "超车违规类"
  284. # 存储原始数据引用,不进行拷贝
  285. self._raw_data = df_data.obj_data[1] # 自车数据
  286. # 安全获取其他车辆数据
  287. self._data_obj = None
  288. self._other_obj_data1 = None
  289. # 检查是否存在ID为2的对象数据
  290. if 2 in df_data.obj_id_list:
  291. self._data_obj = df_data.obj_data[2]
  292. # 检查是否存在ID为3的对象数据
  293. if 3 in df_data.obj_id_list:
  294. self._other_obj_data1 = df_data.obj_data[3]
  295. # 初始化属性,但不立即创建数据副本
  296. self._ego_data = None
  297. self._obj_data = None
  298. self._other_obj_data = None
  299. # 使用字典统一管理违规计数器
  300. self.violation_counts = {
  301. "overtake_on_right": 0,
  302. "overtake_when_turn_around": 0,
  303. "overtake_when_passing_car": 0,
  304. "overtake_in_forbid_lane": 0,
  305. "overtake_in_ramp": 0,
  306. "overtake_in_tunnel": 0,
  307. "overtake_on_accelerate_lane": 0,
  308. "overtake_on_decelerate_lane": 0,
  309. "overtake_in_different_senerios": 0
  310. }
  311. # 标记计算状态
  312. self._calculated = {
  313. "illegal_overtake": False,
  314. "forbid_lane": False,
  315. "ramp_area": False,
  316. "tunnel_area": False,
  317. "accelerate_lane": False,
  318. "decelerate_lane": False,
  319. "different_senerios": False
  320. }
  321. @property
  322. def ego_data(self):
  323. """懒加载方式获取ego数据,只在首次访问时创建副本"""
  324. if self._ego_data is None:
  325. self._ego_data = self._raw_data[OVERTAKE_INFO].copy().reset_index(drop=True)
  326. return self._ego_data
  327. @property
  328. def obj_data(self):
  329. """懒加载方式获取obj数据"""
  330. if self._obj_data is None:
  331. if self._data_obj is not None:
  332. self._obj_data = self._data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
  333. else:
  334. # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
  335. self._obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
  336. return self._obj_data
  337. @property
  338. def other_obj_data(self):
  339. """懒加载方式获取other_obj数据"""
  340. if self._other_obj_data is None:
  341. if self._other_obj_data1 is not None:
  342. self._other_obj_data = self._other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
  343. else:
  344. # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
  345. self._other_obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
  346. return self._other_obj_data
  347. def different_road_area_simtime(self, df, threshold=0.5):
  348. if not df:
  349. return []
  350. simtime_group = []
  351. current_simtime_group = [df[0]]
  352. for i in range(1, len(df)):
  353. if abs(df[i] - df[i - 1]) <= threshold:
  354. current_simtime_group.append(df[i])
  355. else:
  356. simtime_group.append(current_simtime_group)
  357. current_simtime_group = [df[i]]
  358. simtime_group.append(current_simtime_group)
  359. return simtime_group
  360. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  361. lane_start = lane_id[0]
  362. lane_end = lane_id[-1]
  363. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  364. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  365. return lane_start == lane_end and start_condition and end_condition
  366. def _is_dxy_of_car(self, ego_df, obj_df):
  367. """
  368. :param df: objstate.csv and so on
  369. :param id: playerId
  370. :param string_type: posX/Y or speedX/Y and so on
  371. :return: dataframe of dx/y and so on
  372. """
  373. car_dx = obj_df["posX"].values - ego_df["posX"].values
  374. car_dy = obj_df["posY"].values - ego_df["posY"].values
  375. return car_dx, car_dy
  376. def illegal_overtake_with_car_detector(self, window_width=250):
  377. """检测超车违规"""
  378. # 如果已经计算过,直接返回
  379. if self._calculated["illegal_overtake"]:
  380. return
  381. # 如果没有其他车辆数据,直接返回,保持默认值0
  382. if self.obj_data.empty:
  383. print("没有其他车辆数据,无法检测超车违规,默认为0")
  384. self._calculated["illegal_overtake"] = True
  385. return
  386. # 获取csv文件中最短的帧数
  387. frame_id_length = len(self.ego_data["simFrame"])
  388. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  389. while (start_frame_id + window_width) < frame_id_length:
  390. simframe_window1 = list(
  391. np.arange(start_frame_id, start_frame_id + window_width)
  392. )
  393. simframe_window = list(map(int, simframe_window1))
  394. # 读取滑动窗口的dataframe数据
  395. ego_data_frames = self.ego_data[
  396. self.ego_data["simFrame"].isin(simframe_window)
  397. ]
  398. # 确保有足够的数据进行处理
  399. if len(ego_data_frames) == 0:
  400. start_frame_id += 1
  401. continue
  402. obj_data_frames = self.obj_data[
  403. self.obj_data["simFrame"].isin(simframe_window)
  404. ]
  405. # 如果没有其他车辆数据,跳过当前窗口
  406. if len(obj_data_frames) == 0:
  407. start_frame_id += 1
  408. continue
  409. other_data_frames = self.other_obj_data[
  410. self.other_obj_data["simFrame"].isin(simframe_window)
  411. ]
  412. # 读取前后的laneId
  413. lane_id = ego_data_frames["lane_id"].tolist()
  414. # 读取前后方向盘转角steeringWheel
  415. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  416. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  417. # 读取车辆前后的位置信息
  418. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  419. ego_speedx = ego_data_frames["speedX"].tolist()
  420. ego_speedy = ego_data_frames["speedY"].tolist()
  421. # 安全获取obj_speedx和obj_speedy
  422. obj_with_id_2 = obj_data_frames[obj_data_frames["playerId"] == 2]
  423. if not obj_with_id_2.empty:
  424. obj_speedx = obj_with_id_2["speedX"].tolist()
  425. obj_speedy = obj_with_id_2["speedY"].tolist()
  426. else:
  427. obj_speedx = []
  428. obj_speedy = []
  429. # 检查会车时超车
  430. if len(other_data_frames) > 0:
  431. other_start_speedx = other_data_frames["speedX"].iloc[0]
  432. other_start_speedy = other_data_frames["speedY"].iloc[0]
  433. if (
  434. ego_speedx[0] * other_start_speedx
  435. + ego_speedy[0] * other_start_speedy
  436. < 0
  437. ):
  438. self.violation_counts["overtake_when_passing_car"] += self._is_overtake(
  439. lane_id, dx, dy, ego_speedx, ego_speedy
  440. )
  441. start_frame_id += window_width
  442. continue
  443. # 检查右侧超车
  444. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  445. self.violation_counts["overtake_on_right"] += self._is_overtake(
  446. lane_id, dx, dy, ego_speedx, ego_speedy
  447. )
  448. start_frame_id += window_width
  449. continue
  450. # 检查掉头时超车
  451. if obj_speedx and obj_speedy: # 确保列表不为空
  452. if ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
  453. self.violation_counts["overtake_when_turn_around"] += self._is_overtake(
  454. lane_id, dx, dy, ego_speedx, ego_speedy
  455. )
  456. start_frame_id += window_width
  457. continue
  458. # 如果没有检测到任何违规,移动窗口
  459. start_frame_id += 1
  460. self._calculated["illegal_overtake"] = True
  461. # 借道超车场景
  462. def overtake_in_forbid_lane_detector(self):
  463. """检测借道超车违规"""
  464. # 如果已经计算过,直接返回
  465. if self._calculated["forbid_lane"]:
  466. return
  467. # 如果没有其他车辆数据,直接返回,保持默认值0
  468. if self.obj_data.empty:
  469. print("没有其他车辆数据,无法检测借道超车违规,默认为0")
  470. self._calculated["forbid_lane"] = True
  471. return
  472. simTime = self.obj_data["simTime"].tolist()
  473. simtime_devide = self.different_road_area_simtime(simTime)
  474. for simtime in simtime_devide:
  475. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  476. try:
  477. lane_type = lane_overtake["lane_type"].tolist()
  478. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  479. 50002 not in lane_type and len(set(lane_type)) > 1
  480. ):
  481. self.violation_counts["overtake_in_forbid_lane"] += 1
  482. except Exception as e:
  483. print("数据缺少lane_type信息")
  484. self._calculated["forbid_lane"] = True
  485. # 在匝道超车
  486. def overtake_in_ramp_area_detector(self):
  487. """检测匝道超车违规"""
  488. # 如果已经计算过,直接返回
  489. if self._calculated["ramp_area"]:
  490. return
  491. # 如果没有其他车辆数据,直接返回,保持默认值0
  492. if self.obj_data.empty:
  493. print("没有其他车辆数据,无法检测匝道超车违规,默认为0")
  494. self._calculated["ramp_area"] = True
  495. return
  496. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  497. "simTime"
  498. ].tolist()
  499. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  500. for ramp_simtime in ramp_simTime_list:
  501. lane_id = self.ego_data["lane_id"].tolist()
  502. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  503. objstate_in_ramp = self.obj_data[
  504. self.obj_data["simTime"].isin(ramp_simtime)
  505. ]
  506. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  507. ego_speedx = ego_in_ramp["speedX"].tolist()
  508. ego_speedy = ego_in_ramp["speedY"].tolist()
  509. if len(lane_id) > 0:
  510. self.violation_counts["overtake_in_ramp"] += self._is_overtake(
  511. lane_id, dx, dy, ego_speedx, ego_speedy
  512. )
  513. else:
  514. continue
  515. self._calculated["ramp_area"] = True
  516. def overtake_in_tunnel_area_detector(self):
  517. """检测隧道超车违规"""
  518. # 如果已经计算过,直接返回
  519. if self._calculated["tunnel_area"]:
  520. return
  521. # 如果没有其他车辆数据,直接返回,保持默认值0
  522. if self.obj_data.empty:
  523. print("没有其他车辆数据,无法检测隧道超车违规,默认为0")
  524. self._calculated["tunnel_area"] = True
  525. return
  526. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  527. "simTime"
  528. ].tolist()
  529. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  530. for tunnel_simtime in tunnel_simTime_list:
  531. lane_id = self.ego_data["lane_id"].tolist()
  532. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  533. objstate_in_tunnel = self.obj_data[
  534. self.obj_data["simTime"].isin(tunnel_simtime)
  535. ]
  536. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  537. ego_speedx = ego_in_tunnel["speedX"].tolist()
  538. ego_speedy = ego_in_tunnel["speedY"].tolist()
  539. if len(lane_id) > 0:
  540. self.violation_counts["overtake_in_tunnel"] += self._is_overtake(
  541. lane_id, dx, dy, ego_speedx, ego_speedy
  542. )
  543. else:
  544. continue
  545. self._calculated["tunnel_area"] = True
  546. # 加速车道超车
  547. def overtake_on_accelerate_lane_detector(self):
  548. """检测加速车道超车违规"""
  549. # 如果已经计算过,直接返回
  550. if self._calculated["accelerate_lane"]:
  551. return
  552. # 如果没有其他车辆数据,直接返回,保持默认值0
  553. if self.obj_data.empty:
  554. print("没有其他车辆数据,无法检测加速车道超车违规,默认为0")
  555. self._calculated["accelerate_lane"] = True
  556. return
  557. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  558. "simTime"
  559. ].tolist()
  560. accelerate_simTime_list = self.different_road_area_simtime(
  561. accelerate_simtime_list
  562. )
  563. for accelerate_simtime in accelerate_simTime_list:
  564. lane_id = self.ego_data["lane_id"].tolist()
  565. ego_in_accelerate = self.ego_data[
  566. self.ego_data["simTime"].isin(accelerate_simtime)
  567. ]
  568. objstate_in_accelerate = self.obj_data[
  569. self.obj_data["simTime"].isin(accelerate_simtime)
  570. ]
  571. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  572. ego_speedx = ego_in_accelerate["speedX"].tolist()
  573. ego_speedy = ego_in_accelerate["speedY"].tolist()
  574. self.violation_counts["overtake_on_accelerate_lane"] += self._is_overtake(
  575. lane_id, dx, dy, ego_speedx, ego_speedy
  576. )
  577. self._calculated["accelerate_lane"] = True
  578. # 减速车道超车
  579. def overtake_on_decelerate_lane_detector(self):
  580. """检测减速车道超车违规"""
  581. # 如果已经计算过,直接返回
  582. if self._calculated["decelerate_lane"]:
  583. return
  584. # 如果没有其他车辆数据,直接返回,保持默认值0
  585. if self.obj_data.empty:
  586. print("没有其他车辆数据,无法检测减速车道超车违规,默认为0")
  587. self._calculated["decelerate_lane"] = True
  588. return
  589. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  590. "simTime"
  591. ].tolist()
  592. decelerate_simTime_list = self.different_road_area_simtime(
  593. decelerate_simtime_list
  594. )
  595. for decelerate_simtime in decelerate_simTime_list:
  596. lane_id = self.ego_data["id"].tolist()
  597. ego_in_decelerate = self.ego_data[
  598. self.ego_data["simTime"].isin(decelerate_simtime)
  599. ]
  600. objstate_in_decelerate = self.obj_data[
  601. self.obj_data["simTime"].isin(decelerate_simtime)
  602. ]
  603. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  604. ego_speedx = ego_in_decelerate["speedX"].tolist()
  605. ego_speedy = ego_in_decelerate["speedY"].tolist()
  606. self.violation_counts["overtake_on_decelerate_lane"] += self._is_overtake(
  607. lane_id, dx, dy, ego_speedx, ego_speedy
  608. )
  609. self._calculated["decelerate_lane"] = True
  610. # 在交叉路口
  611. def overtake_in_different_senerios_detector(self):
  612. """检测不同场景超车违规"""
  613. # 如果已经计算过,直接返回
  614. if self._calculated["different_senerios"]:
  615. return
  616. # 如果没有其他车辆数据,直接返回,保持默认值0
  617. if self.obj_data.empty:
  618. print("没有其他车辆数据,无法检测不同场景超车违规,默认为0")
  619. self._calculated["different_senerios"] = True
  620. return
  621. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  622. "simTime"
  623. ].tolist() # 判断是路口或者隧道区域
  624. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  625. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  626. crossroad_objstate = self.obj_data[
  627. self.obj_data["simTime"].isin(crossroad_simTime)
  628. ]
  629. # 读取前后的laneId
  630. lane_id = crossroad_ego["lane_id"].tolist()
  631. # 读取车辆前后的位置信息
  632. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  633. ego_speedx = crossroad_ego["speedX"].tolist()
  634. ego_speedy = crossroad_ego["speedY"].tolist()
  635. """
  636. 如果滑动窗口开始和最后的laneid一致;
  637. 自车和前车的位置发生的交换;
  638. 则认为发生超车
  639. """
  640. if len(lane_id) > 0:
  641. self.violation_counts["overtake_in_different_senerios"] += self._is_overtake(
  642. lane_id, dx, dy, ego_speedx, ego_speedy
  643. )
  644. self._calculated["different_senerios"] = True
  645. def calculate_overtake_when_passing_car_count(self):
  646. """计算会车时超车违规次数"""
  647. self.illegal_overtake_with_car_detector()
  648. return self.violation_counts["overtake_when_passing_car"]
  649. def calculate_overtake_on_right_count(self):
  650. """计算右侧超车违规次数"""
  651. self.illegal_overtake_with_car_detector()
  652. return self.violation_counts["overtake_on_right"]
  653. def calculate_overtake_when_turn_around_count(self):
  654. """计算掉头时超车违规次数"""
  655. self.illegal_overtake_with_car_detector()
  656. return self.violation_counts["overtake_when_turn_around"]
  657. def calculate_overtake_in_forbid_lane_count(self):
  658. """计算借道超车违规次数"""
  659. self.overtake_in_forbid_lane_detector()
  660. return self.violation_counts["overtake_in_forbid_lane"]
  661. def calculate_overtake_in_ramp_area_count(self):
  662. """计算匝道超车违规次数"""
  663. self.overtake_in_ramp_area_detector()
  664. return self.violation_counts["overtake_in_ramp"]
  665. def calculate_overtake_in_tunnel_area_count(self):
  666. """计算隧道超车违规次数"""
  667. self.overtake_in_tunnel_area_detector()
  668. return self.violation_counts["overtake_in_tunnel"]
  669. def calculate_overtake_on_accelerate_lane_count(self):
  670. """计算加速车道超车违规次数"""
  671. self.overtake_on_accelerate_lane_detector()
  672. return self.violation_counts["overtake_on_accelerate_lane"]
  673. def calculate_overtake_on_decelerate_lane_count(self):
  674. """计算减速车道超车违规次数"""
  675. self.overtake_on_decelerate_lane_detector()
  676. return self.violation_counts["overtake_on_decelerate_lane"]
  677. def calculate_overtake_in_different_senerios_count(self):
  678. """计算不同场景超车违规次数"""
  679. self.overtake_in_different_senerios_detector()
  680. return self.violation_counts["overtake_in_different_senerios"]
  681. class SlowdownViolation(object):
  682. """减速让行违规类"""
  683. def __init__(self, df_data):
  684. print("减速让行违规类-------------------------")
  685. self.traffic_violations_type = "减速让行违规类"
  686. # 存储原始数据引用
  687. self._raw_data = df_data.obj_data[1]
  688. self.object_items = set(df_data.object_df.type.tolist())
  689. # 存储行人数据引用
  690. self._pedestrian_df = None
  691. if 13 in self.object_items: # 行人的type是13
  692. self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  693. # 初始化属性,但不立即创建数据副本
  694. self._ego_data = None
  695. self._pedestrian_data = None
  696. # 初始化计数器
  697. self.slow_down_in_crosswalk_count = 0
  698. self.avoid_pedestrian_in_crosswalk_count = 0
  699. self.avoid_pedestrian_in_the_road_count = 0
  700. self.aviod_pedestrian_when_turning_count = 0
  701. @property
  702. def ego_data(self):
  703. """懒加载方式获取ego数据"""
  704. if self._ego_data is None:
  705. self._ego_data = self._raw_data[SLOWDOWN_INFO].copy().reset_index(drop=True)
  706. return self._ego_data
  707. @property
  708. def pedestrian_data(self):
  709. """懒加载方式获取行人数据"""
  710. if self._pedestrian_data is None:
  711. if self._pedestrian_df is not None:
  712. # 使用浅拷贝代替深拷贝
  713. self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  714. else:
  715. self._pedestrian_data = pd.DataFrame()
  716. return self._pedestrian_data
  717. def pedestrian_in_front_of_car(self):
  718. if len(self.pedestrian_data) == 0:
  719. return []
  720. else:
  721. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  722. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  723. self.ego_data["dist"] = np.sqrt(
  724. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  725. )
  726. self.ego_data["rela_pos"] = (
  727. self.ego_data["dx"] * self.ego_data["speedX"]
  728. + self.ego_data["dy"] * self.ego_data["speedY"]
  729. )
  730. simtime = self.ego_data[
  731. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  732. ]["simTime"].tolist()
  733. return simtime
  734. def different_road_area_simtime(self, df, threshold=0.6):
  735. if not df:
  736. return []
  737. simtime_group = []
  738. current_simtime_group = [df[0]]
  739. for i in range(1, len(df)):
  740. if abs(df[i] - df[i - 1]) <= threshold:
  741. current_simtime_group.append(df[i])
  742. else:
  743. simtime_group.append(current_simtime_group)
  744. current_simtime_group = [df[i]]
  745. simtime_group.append(current_simtime_group)
  746. return simtime_group
  747. def slow_down_in_crosswalk_detector(self):
  748. # 筛选出路口或隧道区域的时间点
  749. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  750. "simTime"
  751. ].tolist()
  752. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  753. for crosswalk_simtime in crosswalk_simTime_divide:
  754. # 筛选出当前时间段内的数据
  755. # start_time, end_time = crosswalk_simtime
  756. start_time = crosswalk_simtime[0]
  757. end_time = crosswalk_simtime[-1]
  758. print(f"当前时间段:{start_time} - {end_time}")
  759. crosswalk_objstate = self.ego_data[
  760. (self.ego_data["simTime"] >= start_time)
  761. & (self.ego_data["simTime"] <= end_time)
  762. ]
  763. # 计算车辆速度
  764. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  765. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  766. ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
  767. # 判断是否超速
  768. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  769. self.slow_down_in_crosswalk_count += 1
  770. # 输出总次数
  771. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  772. def avoid_pedestrian_in_crosswalk_detector(self):
  773. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  774. "simTime"
  775. ].tolist()
  776. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  777. for crosswalk_simtime in crosswalk_simTime_devide:
  778. if not self.pedestrian_data.empty:
  779. crosswalk_objstate = self.pedestrian_data[
  780. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  781. ]
  782. else:
  783. crosswalk_objstate = pd.DataFrame()
  784. if len(crosswalk_objstate) > 0:
  785. pedestrian_simtime = crosswalk_objstate["simTime"]
  786. pedestrian_objstate = crosswalk_objstate[
  787. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  788. ]
  789. ego_speed = np.sqrt(
  790. pedestrian_objstate["speedX"] ** 2
  791. + pedestrian_objstate["speedY"] ** 2
  792. )
  793. if ego_speed.any() > 0:
  794. self.avoid_pedestrian_in_crosswalk_count += 1
  795. def avoid_pedestrian_in_the_road_detector(self):
  796. simtime = self.pedestrian_in_front_of_car()
  797. if len(simtime) == 0:
  798. self.avoid_pedestrian_in_the_road_count += 0
  799. else:
  800. pedestrian_on_the_road = self.pedestrian_data[
  801. self.pedestrian_data["simTime"].isin(simtime)
  802. ]
  803. simTime = pedestrian_on_the_road["simTime"].tolist()
  804. simTime_devide = self.different_road_area_simtime(simTime)
  805. for simtime1 in simTime_devide:
  806. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  807. pedestrian_on_the_road["simTime"].isin(simtime1)
  808. ]
  809. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  810. dist = np.sqrt(
  811. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  812. ** 2
  813. + (
  814. ego_car["posY"].values
  815. - sub_pedestrian_on_the_road["posY"].values
  816. )
  817. ** 2
  818. )
  819. speed = np.sqrt(
  820. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  821. )
  822. data = {"dist": dist, "speed": speed}
  823. new_ego_car = pd.DataFrame(data)
  824. new_ego_car = new_ego_car.assign(
  825. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  826. )
  827. if new_ego_car["Column3"].any():
  828. self.avoid_pedestrian_in_the_road_count += 1
  829. def aviod_pedestrian_when_turning_detector(self):
  830. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  831. if len(pedestrian_simtime_list) > 0:
  832. simtime_list = self.ego_data[
  833. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  834. & (self.ego_data["lane_type"] == 20)
  835. ]["simTime"].tolist()
  836. simTime_list = self.different_road_area_simtime(simtime_list)
  837. pedestrian_on_the_road = self.pedestrian_data[
  838. self.pedestrian_data["simTime"].isin(simtime_list)
  839. ]
  840. for simtime in simTime_list:
  841. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  842. pedestrian_on_the_road["simTime"].isin(simtime)
  843. ]
  844. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  845. ego_car["dist"] = np.sqrt(
  846. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  847. ** 2
  848. + (
  849. ego_car["posY"].values
  850. - sub_pedestrian_on_the_road["posY"].values
  851. )
  852. ** 2
  853. )
  854. ego_car["speed"] = np.sqrt(
  855. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  856. )
  857. if any(ego_car["speed"].tolist()) != 0:
  858. self.aviod_pedestrian_when_turning_count += 1
  859. def calculate_slow_down_in_crosswalk_count(self):
  860. self.slow_down_in_crosswalk_detector()
  861. return self.slow_down_in_crosswalk_count
  862. def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
  863. self.avoid_pedestrian_in_crosswalk_detector()
  864. return self.avoid_pedestrian_in_crosswalk_count
  865. def calculate_avoid_pedestrian_in_the_road_count(self):
  866. self.avoid_pedestrian_in_the_road_detector()
  867. return self.avoid_pedestrian_in_the_road_count
  868. def calculate_avoid_pedestrian_when_turning_count(self):
  869. self.aviod_pedestrian_when_turning_detector()
  870. return self.aviod_pedestrian_when_turning_count
  871. class TurnaroundViolation(object):
  872. def __init__(self, df_data):
  873. print("掉头违规类初始化中...")
  874. self.traffic_violations_type = "掉头违规类"
  875. # 存储原始数据引用
  876. self._raw_data = df_data.obj_data[1]
  877. self.object_items = set(df_data.object_df.type.tolist())
  878. # 存储行人数据引用
  879. self._pedestrian_df = None
  880. if 13 in self.object_items: # 行人的type是13
  881. self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  882. # 初始化属性,但不立即创建数据副本
  883. self._ego_data = None
  884. self._pedestrian_data = None
  885. # 初始化计数器
  886. self.turning_in_forbiden_turn_back_sign_count = 0
  887. self.turning_in_forbiden_turn_left_sign_count = 0
  888. self.avoid_pedestrian_when_turn_back_count = 0
  889. @property
  890. def ego_data(self):
  891. """懒加载方式获取ego数据"""
  892. if self._ego_data is None:
  893. self._ego_data = self._raw_data[TURNAROUND_INFO].copy().reset_index(drop=True)
  894. return self._ego_data
  895. @property
  896. def pedestrian_data(self):
  897. """懒加载方式获取行人数据"""
  898. if self._pedestrian_data is None:
  899. if self._pedestrian_df is not None:
  900. self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  901. else:
  902. self._pedestrian_data = pd.DataFrame()
  903. return self._pedestrian_data
  904. def pedestrian_in_front_of_car(self):
  905. if len(self.pedestrian_data) == 0:
  906. return []
  907. else:
  908. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  909. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  910. self.ego_data["dist"] = np.sqrt(
  911. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  912. )
  913. self.ego_data["rela_pos"] = (
  914. self.ego_data["dx"] * self.ego_data["speedX"]
  915. + self.ego_data["dy"] * self.ego_data["speedY"]
  916. )
  917. simtime = self.ego_data[
  918. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  919. ]["simTime"].tolist()
  920. return simtime
  921. def different_road_area_simtime(self, df, threshold=0.5):
  922. if not df:
  923. return []
  924. simtime_group = []
  925. current_simtime_group = [df[0]]
  926. for i in range(1, len(df)):
  927. if abs(df[i] - df[i - 1]) <= threshold:
  928. current_simtime_group.append(df[i])
  929. else:
  930. simtime_group.append(current_simtime_group)
  931. current_simtime_group = [df[i]]
  932. simtime_group.append(current_simtime_group)
  933. return simtime_group
  934. def turn_back_in_forbiden_sign_detector(self):
  935. """
  936. 禁止掉头type = 8
  937. """
  938. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  939. "simTime"
  940. ].tolist()
  941. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  942. "simTime"
  943. ].tolist()
  944. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  945. forbiden_turn_back_simTime
  946. )
  947. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  948. forbiden_turn_left_simTime
  949. )
  950. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  951. ego_car1 = self.ego_data.loc[
  952. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  953. ]
  954. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  955. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  956. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  957. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  958. if (
  959. ego_end_speedx1 * ego_start_speedx1
  960. + ego_end_speedy1 * ego_start_speedy1
  961. < 0
  962. ):
  963. self.turning_in_forbiden_turn_back_sign_count += 1
  964. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  965. ego_car2 = self.ego_data.loc[
  966. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  967. ]
  968. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  969. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  970. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  971. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  972. if (
  973. ego_end_speedx2 * ego_start_speedx2
  974. + ego_end_speedy2 * ego_start_speedy2
  975. < 0
  976. ):
  977. self.turning_in_forbiden_turn_left_sign_count += 1
  978. def avoid_pedestrian_when_turn_back_detector(self):
  979. sensor_on_intersection = self.pedestrian_in_front_of_car()
  980. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  981. self.ego_data["lane_type"] == 20
  982. ]["simTime"].tolist()
  983. avoid_pedestrian_when_turn_back_simTime_devide = (
  984. self.different_road_area_simtime(
  985. avoid_pedestrian_when_turn_back_simTime_list
  986. )
  987. )
  988. if len(sensor_on_intersection) > 0:
  989. for avoid_pedestrian_when_turn_back_simtime in avoid_pedestrian_when_turn_back_simTime_devide:
  990. pedestrian_in_intersection_simtime = self.pedestrian_data[
  991. self.pedestrian_data["simTime"].isin(
  992. avoid_pedestrian_when_turn_back_simtime
  993. )
  994. ].tolist()
  995. ego_df = self.ego_data[
  996. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  997. ].reset_index(drop=True)
  998. pedestrian_df = self.pedestrian_data[
  999. self.pedestrian_data["simTime"].isin(
  1000. pedestrian_in_intersection_simtime
  1001. )
  1002. ].reset_index(drop=True)
  1003. ego_df["dist"] = np.sqrt(
  1004. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  1005. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  1006. )
  1007. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  1008. if any(ego_df["speed"].tolist()) != 0:
  1009. self.avoid_pedestrian_when_turn_back_count += 1
  1010. def calculate_turn_in_forbiden_turn_left_sign_count(self):
  1011. self.turn_back_in_forbiden_sign_detector()
  1012. return self.turning_in_forbiden_turn_left_sign_count
  1013. def calculate_turn_in_forbiden_turn_back_sign_count(self):
  1014. self.turn_back_in_forbiden_sign_detector()
  1015. return self.turning_in_forbiden_turn_back_sign_count
  1016. def calaulate_avoid_pedestrian_when_turn_back_count(self):
  1017. self.avoid_pedestrian_when_turn_back_detector()
  1018. return self.avoid_pedestrian_when_turn_back_count
  1019. class WrongWayViolation(object):
  1020. """停车违规类"""
  1021. def __init__(self, df_data):
  1022. print("停车违规类初始化中...")
  1023. self.traffic_violations_type = "停车违规类"
  1024. # 存储原始数据引用
  1025. self._raw_data = df_data.obj_data[1]
  1026. # 初始化属性,但不立即创建数据副本
  1027. self._data = None
  1028. # 初始化违规统计
  1029. self.violation_count = {
  1030. "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
  1031. "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
  1032. "urbanExpresswayEmergencyLaneDriving": 0,
  1033. }
  1034. @property
  1035. def data(self):
  1036. """懒加载方式获取数据"""
  1037. if self._data is None:
  1038. # 使用浅拷贝代替深拷贝
  1039. self._data = self._raw_data.copy()
  1040. return self._data
  1041. def process_violations(self):
  1042. """处理停车或者紧急车道行驶违规数据"""
  1043. # 提取有效道路类型
  1044. urban_expressway_or_highway = {1, 2}
  1045. driving_lane = {1, 4, 5, 6}
  1046. emergency_lane = {12}
  1047. self.data["v"] *= 3.6 # 转换速度
  1048. # 使用向量化和条件判断进行违规判定
  1049. conditions = [
  1050. (
  1051. self.data["road_fc"].isin(urban_expressway_or_highway)
  1052. & self.data["lane_type"].isin(driving_lane)
  1053. & (self.data["v"] == 0)
  1054. ),
  1055. (
  1056. self.data["road_fc"].isin(urban_expressway_or_highway)
  1057. & self.data["lane_type"].isin(emergency_lane)
  1058. & (self.data["v"] == 0)
  1059. ),
  1060. (
  1061. self.data["road_fc"].isin(urban_expressway_or_highway)
  1062. & self.data["lane_type"].isin(emergency_lane)
  1063. & (self.data["v"] != 0)
  1064. ),
  1065. ]
  1066. violation_types = [
  1067. "urbanExpresswayOrHighwayDrivingLaneStopped",
  1068. "urbanExpresswayOrHighwayEmergencyLaneStopped",
  1069. "urbanExpresswayEmergencyLaneDriving",
  1070. ]
  1071. # 设置违规类型
  1072. self.data["violation_type"] = None
  1073. for condition, violation_type in zip(conditions, violation_types):
  1074. self.data.loc[condition, "violation_type"] = violation_type
  1075. # 统计违规情况
  1076. self.violation_count = (
  1077. self.data["violation_type"]
  1078. .value_counts()
  1079. .reindex(violation_types, fill_value=0)
  1080. .to_dict()
  1081. )
  1082. def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
  1083. self.process_violations()
  1084. return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]
  1085. def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
  1086. self.process_violations()
  1087. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  1088. def calculate_urbanExpresswayEmergencyLaneDriving(self):
  1089. self.process_violations()
  1090. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  1091. class SpeedingViolation(object):
  1092. """超速违规类"""
  1093. """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
  1094. def __init__(self, df_data):
  1095. print("超速违规类初始化中...")
  1096. self.traffic_violations_type = "超速违规类"
  1097. # 存储原始数据引用
  1098. self._raw_data = df_data.obj_data[1]
  1099. # 初始化属性,但不立即创建数据副本
  1100. self._data = None
  1101. # 初始化违规统计
  1102. self.violation_counts = {
  1103. "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
  1104. "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
  1105. "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
  1106. "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
  1107. "generalRoadSpeedOverLimit50": 0,
  1108. "generalRoadSpeedOverLimit20to50": 0,
  1109. }
  1110. @property
  1111. def data(self):
  1112. """懒加载方式获取数据"""
  1113. if self._data is None:
  1114. # 使用浅拷贝代替深拷贝
  1115. self._data = self._raw_data.copy()
  1116. # 预处理数据 - 转换速度单位
  1117. self._data["v"] *= 3.6 # 转换为 km/h
  1118. return self._data
  1119. def process_violations(self):
  1120. """处理数据帧,检查超速和其他违规行为"""
  1121. # 提取有效道路类型
  1122. urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
  1123. general_road = {3} # 直接创建包含一个元素的集合
  1124. self.data["v"] *= 3.6 # 转换速度
  1125. # 违规判定
  1126. conditions = [
  1127. (
  1128. self.data["road_fc"].isin(urban_expressway_or_highway)
  1129. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  1130. ),
  1131. (
  1132. self.data["road_fc"].isin(urban_expressway_or_highway)
  1133. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  1134. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  1135. ),
  1136. (
  1137. self.data["road_fc"].isin(urban_expressway_or_highway)
  1138. & (self.data["v"] > self.data["road_speed_max"])
  1139. & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
  1140. ),
  1141. (
  1142. self.data["road_fc"].isin(urban_expressway_or_highway)
  1143. & (self.data["v"] < self.data["road_speed_min"])
  1144. ),
  1145. (
  1146. self.data["road_fc"].isin(general_road)
  1147. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  1148. ),
  1149. (
  1150. self.data["road_fc"].isin(general_road)
  1151. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  1152. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  1153. ),
  1154. ]
  1155. violation_types = [
  1156. "urbanExpresswayOrHighwaySpeedOverLimit50",
  1157. "urbanExpresswayOrHighwaySpeedOverLimit20to50",
  1158. "urbanExpresswayOrHighwaySpeedOverLimit0to20",
  1159. "urbanExpresswayOrHighwaySpeedUnderLimit",
  1160. "generalRoadSpeedOverLimit50",
  1161. "generalRoadSpeedOverLimit20to50",
  1162. ]
  1163. # 设置违规类型
  1164. self.data["violation_type"] = None
  1165. for condition, violation_type in zip(conditions, violation_types):
  1166. self.data.loc[condition, "violation_type"] = violation_type
  1167. # 统计各类违规情况
  1168. self.violation_counts = self.data["violation_type"].value_counts().to_dict()
  1169. def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
  1170. self.process_violations()
  1171. return self.violation_counts.get("urbanExpresswayOrHighwaySpeedOverLimit50") if self.violation_counts.get(
  1172. "urbanExpresswayOrHighwaySpeedOverLimit50") else 0
  1173. def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
  1174. self.process_violations()
  1175. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"] if self.violation_counts.get(
  1176. "urbanExpresswayOrHighwaySpeedOverLimit20to50") else 0
  1177. def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
  1178. self.process_violations()
  1179. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"] if self.violation_counts.get(
  1180. "urbanExpresswayOrHighwaySpeedOverLimit0to20") else 0
  1181. def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
  1182. self.process_violations()
  1183. return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"] if self.violation_counts.get(
  1184. "urbanExpresswayOrHighwaySpeedUnderLimit") else 0
  1185. def calculate_generalRoadSpeedOverLimit50(self):
  1186. self.process_violations()
  1187. return self.violation_counts["generalRoadSpeedOverLimit50"] if self.violation_counts.get(
  1188. "generalRoadSpeedOverLimit50") else 0
  1189. def calculate_generalRoadSpeedOverLimit20to50_count(self):
  1190. self.process_violations()
  1191. return self.violation_counts["generalRoadSpeedOverLimit20to50"] if self.violation_counts.get(
  1192. "generalRoadSpeedOverLimit20to50") else 0
  1193. class TrafficLightViolation(object):
  1194. """违反交通灯类"""
  1195. """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
  1196. def __init__(self, df_data):
  1197. """初始化方法"""
  1198. self.traffic_violations_type = "违反交通灯类"
  1199. print("违反交通灯类 类初始化中...")
  1200. self.config = df_data.vehicle_config
  1201. self.data_ego = df_data.ego_data # 获取数据
  1202. self.violation_counts = {
  1203. "trafficSignalViolation": 0,
  1204. "illegalDrivingOrParkingAtCrossroads": 0,
  1205. }
  1206. # 处理数据并判定违规
  1207. self.process_violations()
  1208. def is_point_cross_line(self, point, stop_line_points):
  1209. """
  1210. 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
  1211. 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
  1212. :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
  1213. :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
  1214. :return: True 如果车辆跨越了停止线,否则 False
  1215. """
  1216. line_vector = np.array(
  1217. [
  1218. stop_line_points[1][0] - stop_line_points[0][0],
  1219. stop_line_points[1][1] - stop_line_points[0][1],
  1220. ]
  1221. )
  1222. point_vector = np.array(
  1223. [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
  1224. )
  1225. cross_product = np.cross(line_vector, point_vector)
  1226. if cross_product != 0:
  1227. return False
  1228. mid_point = (
  1229. np.array([stop_line_points[0][0], stop_line_points[0][1]])
  1230. + 0.5 * line_vector
  1231. )
  1232. axletree_to_mid_vector = np.array(
  1233. [point[0] - mid_point[0], point[1] - mid_point[1]]
  1234. )
  1235. direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
  1236. norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
  1237. norm_direction = np.linalg.norm(direction_vector)
  1238. if norm_axletree_to_mid == 0 or norm_direction == 0:
  1239. return False
  1240. cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
  1241. norm_axletree_to_mid * norm_direction
  1242. )
  1243. angle_theta = math.degrees(math.acos(cos_theta))
  1244. return angle_theta <= 90
  1245. def _filter_data(self):
  1246. """过滤数据,筛选出需要分析的记录"""
  1247. return self.data_ego[
  1248. (self.data_ego["stopline_id"] != -1)
  1249. & (self.data_ego["stopline_type"] == 1)
  1250. & (self.data_ego["trafficlight_id"] != -1)
  1251. ]
  1252. def _group_data(self, filtered_data):
  1253. """按时间差对数据进行分组"""
  1254. filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
  1255. threshold = 0.5
  1256. filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
  1257. return filtered_data.groupby("group")
  1258. def _analyze_group(self, group_data):
  1259. """分析单个分组的数据,判断是否闯红灯"""
  1260. photos = []
  1261. stop_in_intersection = False
  1262. for _, row in group_data.iterrows():
  1263. vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
  1264. stop_line_points = [
  1265. [row["stopline_x1"], row["stopline_y1"]],
  1266. [row["stopline_x2"], row["stopline_y2"]],
  1267. ]
  1268. traffic_light_status = row["traffic_light_status"]
  1269. heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
  1270. heading_vector = heading_vector / np.linalg.norm(heading_vector)
  1271. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  1272. # config = yaml.load(f, Loader=yaml.FullLoader)
  1273. front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
  1274. rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
  1275. dist = math.sqrt(
  1276. (row["posX"] - row["traffic_light_x"]) ** 2
  1277. + (row["posY"] - row["traffic_light_y"]) ** 2
  1278. )
  1279. if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
  1280. has_crossed_line_front = (
  1281. self.is_point_cross_line(front_wheel_pos, stop_line_points)
  1282. and traffic_light_status == 1
  1283. )
  1284. has_crossed_line_rear = (
  1285. self.is_point_cross_line(rear_wheel_pos, stop_line_points)
  1286. and row["v"] > 0
  1287. and traffic_light_status == 1
  1288. )
  1289. has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
  1290. has_passed_intersection = has_crossed_line_front and dist < 1.0
  1291. # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
  1292. photos.extend(
  1293. [
  1294. has_crossed_line_front,
  1295. has_crossed_line_rear,
  1296. has_passed_intersection,
  1297. has_stop_in_intersection,
  1298. ]
  1299. )
  1300. stop_in_intersection = has_passed_intersection
  1301. return photos, stop_in_intersection
  1302. def is_vehicle_run_a_red_light(self):
  1303. """判断车辆是否闯红灯"""
  1304. filtered_data = self._filter_data()
  1305. grouped_data = self._group_data(filtered_data)
  1306. self.photos_group = []
  1307. self.stop_in_intersections = []
  1308. for _, group_data in grouped_data:
  1309. photos, stop_in_intersection = self._analyze_group(group_data)
  1310. self.photos_group.append(photos)
  1311. self.stop_in_intersections.append(stop_in_intersection)
  1312. def process_violations(self):
  1313. """处理数据并判定违规"""
  1314. self.is_vehicle_run_a_red_light()
  1315. count_1 = sum(all(photos) for photos in self.photos_group)
  1316. count_2 = sum(
  1317. stop_in_intersection for stop_in_intersection in self.stop_in_intersections
  1318. )
  1319. self.violation_counts["trafficSignalViolation"] = count_1
  1320. self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
  1321. def calculate_trafficSignalViolation_count(self):
  1322. self.process_violations()
  1323. return self.violation_counts["trafficSignalViolation"]
  1324. def calculate_illegalDrivingOrParkingAtCrossroads(self):
  1325. self.process_violations()
  1326. return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]
  1327. class WarningViolation(object):
  1328. """警告性违规类"""
  1329. def __init__(self, df_data):
  1330. print("警告性违规类初始化中...")
  1331. self.traffic_violations_type = "警告性违规类"
  1332. # 存储原始数据引用
  1333. self.config = df_data.vehicle_config
  1334. self._raw_data = df_data.obj_data[1]
  1335. # 初始化属性,但不立即创建数据副本
  1336. self._data = None
  1337. # 初始化违规计数器
  1338. self.violation_counts = {
  1339. "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1340. "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1341. }
  1342. @property
  1343. def data(self):
  1344. """懒加载方式获取数据"""
  1345. if self._data is None:
  1346. # 使用浅拷贝代替深拷贝
  1347. self._data = self._raw_data.copy()
  1348. return self._data
  1349. def process_violations(self):
  1350. """处理所有违规类型"""
  1351. # 处理普通道路不按规定车道行驶违规
  1352. self._process_irregular_lane_use()
  1353. # 处理骑、轧车行道分界线违规
  1354. self._process_lane_divider_violation()
  1355. def _process_irregular_lane_use(self):
  1356. """处理普通道路不按规定车道行驶违规"""
  1357. # 定义道路和车道类型
  1358. general_road = {3} # 普通道路
  1359. lane_type = {11} # 非机动车道
  1360. # 使用布尔索引来筛选满足条件的行
  1361. condition = (self.data["road_fc"].isin(general_road)) & (
  1362. self.data["lane_type"].isin(lane_type)
  1363. )
  1364. # 创建一个新的列,并根据条件设置值
  1365. self.data["is_violation"] = condition
  1366. # 统计满足条件的连续时间段
  1367. violation_segments = self.count_continuous_violations(
  1368. self.data["is_violation"], self.data["simTime"]
  1369. )
  1370. # 更新违规计数
  1371. self.violation_counts["generalRoadIrregularLaneUse"] = len(violation_segments)
  1372. def _process_lane_divider_violation(self):
  1373. """处理骑、轧车行道分界线违规"""
  1374. # 获取车辆和车道宽度
  1375. car_width = self.config["CAR_WIDTH"]
  1376. lane_width = self.data["lane_width"]
  1377. # 计算阈值
  1378. threshold = (lane_width - car_width) / 2
  1379. # 找到满足条件的行
  1380. self.data["is_violation"] = self.data["laneOffset"] > threshold
  1381. # 统计满足条件的连续时间段
  1382. violation_segments = self.count_continuous_violations(
  1383. self.data["is_violation"], self.data["simTime"]
  1384. )
  1385. # 更新违规计数
  1386. self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(
  1387. violation_segments
  1388. )
  1389. def count_continuous_violations(self, violation_series, time_series):
  1390. """统计连续违规的时间段数量
  1391. Args:
  1392. violation_series: 表示是否违规的布尔序列
  1393. time_series: 对应的时间序列
  1394. Returns:
  1395. list: 连续违规时间段列表
  1396. """
  1397. continuous_segments = []
  1398. current_segment = []
  1399. for is_violation, time in zip(violation_series, time_series):
  1400. if is_violation:
  1401. if not current_segment: # 新的连续段开始
  1402. current_segment.append(time)
  1403. else:
  1404. if current_segment: # 连续段结束
  1405. current_segment.append(time) # 添加结束时间
  1406. continuous_segments.append(current_segment)
  1407. current_segment = []
  1408. # 检查是否有一个未结束的连续段在最后
  1409. if current_segment:
  1410. current_segment.append(time_series.iloc[-1]) # 使用最后的时间作为结束时间
  1411. continuous_segments.append(current_segment)
  1412. return continuous_segments
  1413. def calculate_generalRoadIrregularLaneUse_count(self):
  1414. """计算普通道路不按规定车道行驶违规次数"""
  1415. # 只处理普通道路不按规定车道行驶违规
  1416. self._process_irregular_lane_use()
  1417. return self.violation_counts["generalRoadIrregularLaneUse"]
  1418. def calculate_urbanExpresswayOrHighwayRideLaneDivider_count(self):
  1419. """计算骑、轧车行道分界线违规次数"""
  1420. # 只处理骑、轧车行道分界线违规
  1421. self._process_lane_divider_violation()
  1422. return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
  1423. class TrafficSignViolation:
  1424. """交通标志违规类"""
  1425. PROHIBITED_STRAIGHT_THRESHOLD = 5
  1426. SIGN_TYPE_STRAIGHT_PROHIBITED = 7
  1427. SIGN_TYPE_SPEED_LIMIT = 12
  1428. SIGN_TYPE_MIN_SPEED_LIMIT = 13
  1429. def __init__(self, df_data):
  1430. print("交通标志违规类初始化中...")
  1431. self.traffic_violations_type = "交通标志违规类"
  1432. # 存储原始数据引用
  1433. self._raw_data = df_data.obj_data[1]
  1434. # 初始化属性,但不立即创建数据副本
  1435. self._data = None
  1436. # 延迟计算标志
  1437. self._calculated = False
  1438. self._violation_counts = {
  1439. "NoStraightThrough": 0,
  1440. "SpeedLimitViolation": 0,
  1441. "MinimumSpeedLimitViolation": 0
  1442. }
  1443. @property
  1444. def data(self):
  1445. """懒加载方式获取数据"""
  1446. if self._data is None:
  1447. # 使用浅拷贝代替深拷贝
  1448. self._data = self._raw_data.copy()
  1449. # 预处理数据 - 按时间排序
  1450. self._data = self._data.sort_values('simTime').reset_index(drop=True)
  1451. return self._data
  1452. def _ensure_calculated(self):
  1453. """保证计算只执行一次"""
  1454. if not self._calculated:
  1455. self._check_prohibition_violations()
  1456. self._check_instruction_violations()
  1457. self._calculated = True
  1458. def calculate_NoStraightThrough_count(self):
  1459. """计算禁止直行违规次数"""
  1460. self._ensure_calculated()
  1461. return self._violation_counts["NoStraightThrough"]
  1462. def calculate_SpeedLimitViolation_count(self):
  1463. """计算超速违规次数"""
  1464. self._ensure_calculated()
  1465. return self._violation_counts["SpeedLimitViolation"]
  1466. def calculate_MinimumSpeedLimitViolation_count(self):
  1467. """计算最低限速违规次数"""
  1468. self._ensure_calculated()
  1469. return self._violation_counts["MinimumSpeedLimitViolation"]
  1470. def _check_prohibition_violations(self):
  1471. """处理禁令标志违规(禁止直行和限速)"""
  1472. self._check_straight_violation()
  1473. self._check_speed_violation(
  1474. self.SIGN_TYPE_SPEED_LIMIT,
  1475. operator.gt,
  1476. "SpeedLimitViolation"
  1477. )
  1478. def _check_instruction_violations(self):
  1479. """处理指示标志违规(最低限速)"""
  1480. self._check_speed_violation(
  1481. self.SIGN_TYPE_MIN_SPEED_LIMIT,
  1482. operator.lt,
  1483. "MinimumSpeedLimitViolation"
  1484. )
  1485. def _check_straight_violation(self):
  1486. """检查禁止直行违规"""
  1487. straight_df = self.data[self.data["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED]
  1488. if not straight_df.empty:
  1489. # 计算航向角变化并填充缺失值
  1490. straight_df = straight_df.copy()
  1491. straight_df['posH_diff'] = straight_df['posH'].diff().abs().fillna(0)
  1492. # 创建筛选条件
  1493. mask = (
  1494. (straight_df['posH_diff'] <= self.PROHIBITED_STRAIGHT_THRESHOLD) &
  1495. (straight_df['v'] > 0)
  1496. )
  1497. self._violation_counts["NoStraightThrough"] = mask.sum()
  1498. def _check_speed_violation(self, sign_type, compare_op, count_key):
  1499. """通用速度违规检查方法
  1500. Args:
  1501. sign_type: 标志类型
  1502. compare_op: 比较操作符
  1503. count_key: 违规计数键名
  1504. """
  1505. violation_df = self.data[self.data["sign_type1"] == sign_type]
  1506. if not violation_df.empty:
  1507. mask = compare_op(violation_df['v'], violation_df['sign_speed'])
  1508. self._violation_counts[count_key] = mask.sum()