traffic.py 74 KB


  1. # ... 保留原有导入和常量定义 ...
  2. import math
  3. import operator
  4. import copy
  5. import numpy as np
  6. import pandas as pd
  7. from typing import Dict, Any, List, Optional
  8. from modules.lib import log_manager
  9. from modules.lib.score import Score
  10. from modules.lib.log_manager import LogManager
  11. from modules.lib import data_process
  12. OVERTAKE_INFO = [
  13. "simTime",
  14. "simFrame",
  15. "playerId",
  16. "speedX",
  17. "speedY",
  18. "posX",
  19. "posY",
  20. "posH",
  21. "lane_id",
  22. "lane_type",
  23. "road_type",
  24. "interid",
  25. "crossid",
  26. ]
  27. SLOWDOWN_INFO = [
  28. "simTime",
  29. "simFrame",
  30. "playerId",
  31. "speedX",
  32. "speedY",
  33. "posX",
  34. "posY",
  35. "crossid",
  36. "lane_type",
  37. ]
  38. TURNAROUND_INFO = [
  39. "simTime",
  40. "simFrame",
  41. "playerId",
  42. "speedX",
  43. "speedY",
  44. "posX",
  45. "posY",
  46. "sign_type1",
  47. "lane_type",
  48. ]
  49. TRFFICSIGN_INFO = [
  50. "simTime",
  51. "simFrame",
  52. "playerId",
  53. "speedX",
  54. "speedY",
  55. "v",
  56. "posX",
  57. "posY",
  58. "sign_type1",
  59. "sign_ref_link",
  60. "sign_x",
  61. "sign_y",
  62. ]
  63. # 修改指标函数名称为 calculate_xxx 格式
  64. def calculate_overtake_when_passing_car(data_processed):
  65. """计算会车时超车指标"""
  66. overtakingviolation = OvertakingViolation(data_processed)
  67. overtake_when_passing_car_count = overtakingviolation.calculate_overtake_when_passing_car_count()
  68. return {"overtake_when_passing_car": overtake_when_passing_car_count}
  69. def calculate_overtake_on_right(data_processed):
  70. """计算右侧超车指标"""
  71. overtakingviolation = OvertakingViolation(data_processed)
  72. overtake_on_right_count = overtakingviolation.calculate_overtake_on_right_count()
  73. return {"overtake_on_right": overtake_on_right_count}
  74. def calculate_overtake_when_turn_around(data_processed):
  75. """计算掉头时超车指标"""
  76. overtakingviolation = OvertakingViolation(data_processed)
  77. overtake_when_turn_around_count = overtakingviolation.calculate_overtake_when_turn_around_count()
  78. return {"overtake_when_turn_around": overtake_when_turn_around_count}
  79. def calculate_overtake_in_forbid_lane(data_processed):
  80. """计算在禁止车道超车指标"""
  81. overtakingviolation = OvertakingViolation(data_processed)
  82. overtake_in_forbid_lane_count = overtakingviolation.calculate_overtake_in_forbid_lane_count()
  83. return {"overtake_in_forbid_lane": overtake_in_forbid_lane_count}
  84. def calculate_overtake_in_ramp(data_processed):
  85. """计算在匝道超车指标"""
  86. overtakingviolation = OvertakingViolation(data_processed)
  87. overtake_in_ramp_area_count = overtakingviolation.calculate_overtake_in_ramp_area_count()
  88. return {"overtake_in_ramp": overtake_in_ramp_area_count}
  89. def calculate_overtake_in_tunnel(data_processed):
  90. """计算在隧道超车指标"""
  91. overtakingviolation = OvertakingViolation(data_processed)
  92. overtake_in_tunnel_area_count = overtakingviolation.calculate_overtake_in_tunnel_area_count()
  93. return {"overtake_in_tunnel": overtake_in_tunnel_area_count}
  94. def calculate_overtake_on_accelerate_lane(data_processed):
  95. """计算在加速车道超车指标"""
  96. overtakingviolation = OvertakingViolation(data_processed)
  97. overtake_on_accelerate_lane_count = overtakingviolation.calculate_overtake_on_accelerate_lane_count()
  98. return {"overtake_on_accelerate_lane": overtake_on_accelerate_lane_count}
  99. def calculate_overtake_on_decelerate_lane(data_processed):
  100. """计算在减速车道超车指标"""
  101. overtakingviolation = OvertakingViolation(data_processed)
  102. overtake_on_decelerate_lane_count = overtakingviolation.calculate_overtake_on_decelerate_lane_count()
  103. return {"overtake_on_decelerate_lane": overtake_on_decelerate_lane_count}
  104. def calculate_overtake_in_different_senerios(data_processed):
  105. """计算在不同场景超车指标"""
  106. overtakingviolation = OvertakingViolation(data_processed)
  107. overtake_in_different_senerios_count = overtakingviolation.calculate_overtake_in_different_senerios_count()
  108. return {"overtake_in_different_senerios": overtake_in_different_senerios_count}
  109. def calculate_slow_down_in_crosswalk(data_processed):
  110. """计算在人行横道减速指标"""
  111. slowdownviolation = SlowdownViolation(data_processed)
  112. slow_down_in_crosswalk_count = slowdownviolation.calculate_slow_down_in_crosswalk_count()
  113. return {"slowdown_down_in_crosswalk": slow_down_in_crosswalk_count}
  114. def calculate_avoid_pedestrian_in_crosswalk(data_processed):
  115. """计算在人行横道避让行人指标"""
  116. avoidpedestrianincrosswalk = SlowdownViolation(data_processed)
  117. avoid_pedestrian_in_crosswalk_count = avoidpedestrianincrosswalk.calculate_avoid_pedestrian_in_the_crosswalk_count()
  118. return {"avoid_pedestrian_in_crosswalk": avoid_pedestrian_in_crosswalk_count}
  119. def calculate_avoid_pedestrian_in_the_road(data_processed):
  120. """计算在道路上避让行人指标"""
  121. avoidpedestrianintheroad = SlowdownViolation(data_processed)
  122. avoid_pedestrian_in_the_road_count = avoidpedestrianintheroad.calculate_avoid_pedestrian_in_the_road_count()
  123. return {"avoid_pedestrian_in_the_road": avoid_pedestrian_in_the_road_count}
  124. def calculate_avoid_pedestrian_when_turning(data_processed):
  125. """计算转弯时避让行人指标"""
  126. avoidpedestrianwhenturning = SlowdownViolation(data_processed)
  127. avoid_pedestrian_when_turning_count = avoidpedestrianwhenturning.calculate_avoid_pedestrian_when_turning_count()
  128. return {"avoid_pedestrian_when_turning_count": avoid_pedestrian_when_turning_count}
  129. def calculate_turn_in_forbiden_turn_left_sign(data_processed):
  130. """计算在禁止左转标志处左转指标"""
  131. turnaroundviolation = TurnaroundViolation(data_processed)
  132. turn_in_forbiden_turn_left_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_left_sign_count()
  133. return {"turn_in_forbiden_turn_left_sign": turn_in_forbiden_turn_left_sign_count}
  134. def calculate_turn_in_forbiden_turn_back_sign(data_processed):
  135. """计算在禁止掉头标志处掉头指标"""
  136. turnaroundviolation = TurnaroundViolation(data_processed)
  137. turn_in_forbiden_turn_back_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_back_sign_count()
  138. return {"turn_in_forbiden_turn_back_sign": turn_in_forbiden_turn_back_sign_count}
  139. def calculate_avoid_pedestrian_when_turn_back(data_processed):
  140. """计算掉头时避让行人指标"""
  141. turnaroundviolation = TurnaroundViolation(data_processed)
  142. avoid_pedestrian_when_turn_back_count = turnaroundviolation.calaulate_avoid_pedestrian_when_turn_back_count()
  143. return {"avoid_pedestrian_when_turn_back": avoid_pedestrian_when_turn_back_count}
  144. def calculate_urbanexpresswayorhighwaydrivinglanestopped(data_processed):
  145. """计算城市快速路或高速公路行车道停车指标"""
  146. wrongwayviolation = WrongWayViolation(data_processed)
  147. urbanExpresswayOrHighwayDrivingLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  148. return {"urbanExpresswayOrHighwayDrivingLaneStopped": urbanExpresswayOrHighwayDrivingLaneStopped_count}
  149. def calculate_urbanexpresswayorhighwayemergencylanestopped(data_processed):
  150. """计算城市快速路或高速公路应急车道停车指标"""
  151. wrongwayviolation = WrongWayViolation(data_processed)
  152. urbanExpresswayOrHighwayEmergencyLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  153. return {"urbanExpresswayOrHighwayEmergencyLaneStopped": urbanExpresswayOrHighwayEmergencyLaneStopped_count}
  154. def calculate_urbanexpresswayemergencylanedriving(data_processed):
  155. """计算城市快速路应急车道行驶指标"""
  156. wrongwayviolation = WrongWayViolation(data_processed)
  157. urbanExpresswayEmergencyLaneDriving_count = wrongwayviolation.calculate_urbanExpresswayEmergencyLaneDriving()
  158. return {"urbanExpresswayEmergencyLaneDriving": urbanExpresswayEmergencyLaneDriving_count}
  159. def calculate_urbanexpresswayorhighwayspeedoverlimit50(data_processed):
  160. """计算城市快速路或高速公路超速50%以上指标"""
  161. speedingviolation = SpeedingViolation(data_processed)
  162. urbanExpresswayOrHighwaySpeedOverLimit50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
  163. return {"urbanExpresswayOrHighwaySpeedOverLimit50": urbanExpresswayOrHighwaySpeedOverLimit50_count}
  164. def calculate_urbanexpresswayorhighwayspeedoverlimit20to50(data_processed):
  165. """计算城市快速路或高速公路超速20%-50%指标"""
  166. speedingviolation = SpeedingViolation(data_processed)
  167. urbanExpresswayOrHighwaySpeedOverLimit20to50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
  168. return {"urbanExpresswayOrHighwaySpeedOverLimit20to50": urbanExpresswayOrHighwaySpeedOverLimit20to50_count}
  169. def calculate_urbanexpresswayorhighwayspeedoverlimit0to20(data_processed):
  170. """计算城市快速路或高速公路超速0-20%指标"""
  171. speedingviolation = SpeedingViolation(data_processed)
  172. urbanExpresswayOrHighwaySpeedOverLimit0to20_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
  173. return {"urbanExpresswayOrHighwaySpeedOverLimit0to20": urbanExpresswayOrHighwaySpeedOverLimit0to20_count}
  174. def calculate_urbanexpresswayorhighwayspeedunderlimit(data_processed):
  175. """计算城市快速路或高速公路低于最低限速指标"""
  176. speedingviolation = SpeedingViolation(data_processed)
  177. urbanExpresswayOrHighwaySpeedUnderLimit_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
  178. return {"urbanExpresswayOrHighwaySpeedUnderLimit": urbanExpresswayOrHighwaySpeedUnderLimit_count}
  179. def calculate_generalroadspeedoverlimit50(data_processed):
  180. """计算一般道路超速50%以上指标"""
  181. speedingviolation = SpeedingViolation(data_processed)
  182. generalRoadSpeedOverLimit50_count = speedingviolation.calculate_generalRoadSpeedOverLimit50()
  183. return {"generalRoadSpeedOverLimit50": generalRoadSpeedOverLimit50_count}
  184. def calculate_generalroadspeedoverlimit20to50(data_processed):
  185. """计算一般道路超速20%-50%指标"""
  186. speedingviolation = SpeedingViolation(data_processed)
  187. generalRoadSpeedOverLimit20to50_count = speedingviolation.calculate_generalRoadSpeedOverLimit20to50_count()
  188. return {"generalRoadSpeedOverLimit20to50": generalRoadSpeedOverLimit20to50_count}
  189. def calculate_trafficsignalviolation(data_processed):
  190. """计算交通信号违规指标"""
  191. trafficlightviolation = TrafficLightViolation(data_processed)
  192. trafficSignalViolation_count = trafficlightviolation.calculate_trafficSignalViolation_count()
  193. return {"trafficSignalViolation": trafficSignalViolation_count}
  194. def calculate_illegaldrivingorparkingatcrossroads(data_processed):
  195. """计算交叉路口违法行驶或停车指标"""
  196. trafficlightviolation = TrafficLightViolation(data_processed)
  197. illegalDrivingOrParkingAtCrossroads_count = trafficlightviolation.calculate_illegalDrivingOrParkingAtCrossroads()
  198. return {"illegalDrivingOrParkingAtCrossroads": illegalDrivingOrParkingAtCrossroads_count}
  199. def calculate_generalroadirregularlaneuse(data_processed):
  200. """计算一般道路不按规定车道行驶指标"""
  201. warningviolation = WarningViolation(data_processed)
  202. generalRoadIrregularLaneUse_count = warningviolation.calculate_generalRoadIrregularLaneUse_count()
  203. return {"generalRoadIrregularLaneUse": generalRoadIrregularLaneUse_count}
  204. def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
  205. """计算城市快速路或高速公路骑车道线行驶指标"""
  206. warningviolation = WarningViolation(data_processed)
  207. urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider_count()
  208. return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
  209. def calculate_nostraightthrough(data_processed):
  210. """计算禁止直行标志牌处直行指标"""
  211. trafficsignviolation = TrafficSignViolation(data_processed)
  212. noStraightThrough_count = trafficsignviolation.calculate_NoStraightThrough_count()
  213. return {"NoStraightThrough": noStraightThrough_count}
  214. def calculate_speedlimitviolation(data_processed):
  215. """计算违反限速规定指标"""
  216. trafficsignviolation = TrafficSignViolation(data_processed)
  217. SpeedLimitViolation_count = trafficsignviolation.calculate_SpeedLimitViolation_count()
  218. return {"SpeedLimitViolation": SpeedLimitViolation_count}
  219. def calculate_minimumspeedlimitviolation(data_processed):
  220. """计算违反最低限速规定指标"""
  221. trafficsignviolation = TrafficSignViolation(data_processed)
  222. calculate_MinimumSpeedLimitViolation_count = trafficsignviolation.calculate_MinimumSpeedLimitViolation_count()
  223. return {"MinimumSpeedLimitViolation": calculate_MinimumSpeedLimitViolation_count}
  224. # 修改 TrafficRegistry 类的 _build_registry 方法
  225. class TrafficRegistry:
  226. """交通违规指标注册器"""
  227. def __init__(self, data_processed):
  228. self.logger = LogManager().get_logger()
  229. self.data = data_processed
  230. # 检查traffic_config是否为空
  231. if not hasattr(data_processed, 'traffic_config') or not data_processed.traffic_config:
  232. self.logger.warning("交通违规配置为空,跳过交通违规指标计算")
  233. self.traffic_config = {}
  234. self.metrics = []
  235. self._registry = {}
  236. return
  237. self.traffic_config = data_processed.traffic_config.get("traffic", {})
  238. self.metrics = self._extract_metrics(self.traffic_config)
  239. self._registry = self._build_registry()
  240. def _extract_metrics(self, config_node: dict) -> list:
  241. """从配置中提取指标名称"""
  242. metrics = []
  243. def _recurse(node):
  244. if isinstance(node, dict):
  245. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  246. metrics.append(node['name'])
  247. for v in node.values():
  248. _recurse(v)
  249. _recurse(config_node)
  250. self.logger.info(f'评比的交通违规指标列表:{metrics}')
  251. return metrics
  252. def _build_registry(self) -> dict:
  253. """构建指标函数注册表"""
  254. registry = {}
  255. for metric_name in self.metrics:
  256. func_name = f"calculate_{metric_name.lower()}"
  257. try:
  258. registry[metric_name] = globals()[func_name]
  259. except KeyError:
  260. self.logger.error(f"未实现交通违规指标函数: {func_name}")
  261. return registry
  262. def batch_execute(self) -> dict:
  263. """批量执行指标计算"""
  264. results = {}
  265. # 如果配置为空或没有注册的指标,直接返回空结果
  266. if not hasattr(self, 'traffic_config') or not self.traffic_config or not self._registry:
  267. self.logger.info("交通违规配置为空或无注册指标,返回空结果")
  268. return results
  269. for name, func in self._registry.items():
  270. try:
  271. result = func(self.data)
  272. results.update(result)
  273. # 新增:将每个指标的结果写入日志
  274. self.logger.info(f'交通违规指标[{name}]计算结果: {result}')
  275. except Exception as e:
  276. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  277. results[name] = None
  278. self.logger.info(f'交通违规指标计算结果:{results}')
  279. return results
  280. class TrafficManager:
  281. """交通违规指标管理类"""
  282. def __init__(self, data_processed, plot_path):
  283. self.data = data_processed
  284. self.logger = LogManager().get_logger()
  285. self.plot_path = plot_path
  286. # 检查traffic_config是否为空
  287. if not hasattr(data_processed, 'traffic_config') or not data_processed.traffic_config:
  288. self.logger.warning("交通违规配置为空,跳过交通违规指标计算初始化")
  289. self.registry = None
  290. else:
  291. self.registry = TrafficRegistry(self.data)
  292. def report_statistic(self):
  293. """计算并报告交通违规指标结果"""
  294. # 如果registry为None,直接返回空字典
  295. if self.registry is None:
  296. self.logger.info("交通违规指标管理器未初始化,返回空结果")
  297. return {}
  298. traffic_result = self.registry.batch_execute()
  299. return traffic_result
  300. class OvertakingViolation(object):
  301. """超车违规类"""
  302. def __init__(self, df_data):
  303. print("超车违规类初始化中...")
  304. self.traffic_violations_type = "超车违规类"
  305. # 存储原始数据引用,不进行拷贝
  306. self._raw_data = df_data.obj_data[1] # 自车数据
  307. # 安全获取其他车辆数据
  308. self._data_obj = None
  309. self._other_obj_data1 = None
  310. # 检查是否存在ID为2的对象数据
  311. if 2 in df_data.obj_id_list:
  312. self._data_obj = df_data.obj_data[2]
  313. # 检查是否存在ID为3的对象数据
  314. if 3 in df_data.obj_id_list:
  315. self._other_obj_data1 = df_data.obj_data[3]
  316. # 初始化属性,但不立即创建数据副本
  317. self._ego_data = None
  318. self._obj_data = None
  319. self._other_obj_data = None
  320. # 使用字典统一管理违规计数器
  321. self.violation_counts = {
  322. "overtake_on_right": 0,
  323. "overtake_when_turn_around": 0,
  324. "overtake_when_passing_car": 0,
  325. "overtake_in_forbid_lane": 0,
  326. "overtake_in_ramp": 0,
  327. "overtake_in_tunnel": 0,
  328. "overtake_on_accelerate_lane": 0,
  329. "overtake_on_decelerate_lane": 0,
  330. "overtake_in_different_senerios": 0
  331. }
  332. # 标记计算状态
  333. self._calculated = {
  334. "illegal_overtake": False,
  335. "forbid_lane": False,
  336. "ramp_area": False,
  337. "tunnel_area": False,
  338. "accelerate_lane": False,
  339. "decelerate_lane": False,
  340. "different_senerios": False
  341. }
  342. @property
  343. def ego_data(self):
  344. """懒加载方式获取ego数据,只在首次访问时创建副本"""
  345. if self._ego_data is None:
  346. self._ego_data = self._raw_data[OVERTAKE_INFO].copy().reset_index(drop=True)
  347. return self._ego_data
  348. @property
  349. def obj_data(self):
  350. """懒加载方式获取obj数据"""
  351. if self._obj_data is None:
  352. if self._data_obj is not None:
  353. self._obj_data = self._data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
  354. else:
  355. # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
  356. self._obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
  357. return self._obj_data
  358. @property
  359. def other_obj_data(self):
  360. """懒加载方式获取other_obj数据"""
  361. if self._other_obj_data is None:
  362. if self._other_obj_data1 is not None:
  363. self._other_obj_data = self._other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
  364. else:
  365. # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
  366. self._other_obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
  367. return self._other_obj_data
  368. def different_road_area_simtime(self, df, threshold=0.5):
  369. if not df:
  370. return []
  371. simtime_group = []
  372. current_simtime_group = [df[0]]
  373. for i in range(1, len(df)):
  374. if abs(df[i] - df[i - 1]) <= threshold:
  375. current_simtime_group.append(df[i])
  376. else:
  377. simtime_group.append(current_simtime_group)
  378. current_simtime_group = [df[i]]
  379. simtime_group.append(current_simtime_group)
  380. return simtime_group
  381. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  382. lane_start = lane_id[0]
  383. lane_end = lane_id[-1]
  384. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  385. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  386. return lane_start == lane_end and start_condition and end_condition
  387. def _is_dxy_of_car(self, ego_df, obj_df):
  388. """
  389. :param df: objstate.csv and so on
  390. :param id: playerId
  391. :param string_type: posX/Y or speedX/Y and so on
  392. :return: dataframe of dx/y and so on
  393. """
  394. car_dx = obj_df["posX"].values - ego_df["posX"].values
  395. car_dy = obj_df["posY"].values - ego_df["posY"].values
  396. return car_dx, car_dy
  397. def illegal_overtake_with_car_detector(self, window_width=250):
  398. """检测超车违规"""
  399. # 如果已经计算过,直接返回
  400. if self._calculated["illegal_overtake"]:
  401. return
  402. # 如果没有其他车辆数据,直接返回,保持默认值0
  403. if self.obj_data.empty:
  404. print("没有其他车辆数据,无法检测超车违规,默认为0")
  405. self._calculated["illegal_overtake"] = True
  406. return
  407. # 获取csv文件中最短的帧数
  408. frame_id_length = len(self.ego_data["simFrame"])
  409. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  410. while (start_frame_id + window_width) < frame_id_length:
  411. simframe_window1 = list(
  412. np.arange(start_frame_id, start_frame_id + window_width)
  413. )
  414. simframe_window = list(map(int, simframe_window1))
  415. # 读取滑动窗口的dataframe数据
  416. ego_data_frames = self.ego_data[
  417. self.ego_data["simFrame"].isin(simframe_window)
  418. ]
  419. # 确保有足够的数据进行处理
  420. if len(ego_data_frames) == 0:
  421. start_frame_id += 1
  422. continue
  423. obj_data_frames = self.obj_data[
  424. self.obj_data["simFrame"].isin(simframe_window)
  425. ]
  426. # 如果没有其他车辆数据,跳过当前窗口
  427. if len(obj_data_frames) == 0:
  428. start_frame_id += 1
  429. continue
  430. other_data_frames = self.other_obj_data[
  431. self.other_obj_data["simFrame"].isin(simframe_window)
  432. ]
  433. # 读取前后的laneId
  434. lane_id = ego_data_frames["lane_id"].tolist()
  435. # 读取前后方向盘转角steeringWheel
  436. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  437. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  438. # 读取车辆前后的位置信息
  439. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  440. ego_speedx = ego_data_frames["speedX"].tolist()
  441. ego_speedy = ego_data_frames["speedY"].tolist()
  442. # 安全获取obj_speedx和obj_speedy
  443. obj_with_id_2 = obj_data_frames[obj_data_frames["playerId"] == 2]
  444. if not obj_with_id_2.empty:
  445. obj_speedx = obj_with_id_2["speedX"].tolist()
  446. obj_speedy = obj_with_id_2["speedY"].tolist()
  447. else:
  448. obj_speedx = []
  449. obj_speedy = []
  450. # 检查会车时超车
  451. if len(other_data_frames) > 0:
  452. other_start_speedx = other_data_frames["speedX"].iloc[0]
  453. other_start_speedy = other_data_frames["speedY"].iloc[0]
  454. if (
  455. ego_speedx[0] * other_start_speedx
  456. + ego_speedy[0] * other_start_speedy
  457. < 0
  458. ):
  459. self.violation_counts["overtake_when_passing_car"] += self._is_overtake(
  460. lane_id, dx, dy, ego_speedx, ego_speedy
  461. )
  462. start_frame_id += window_width
  463. continue
  464. # 检查右侧超车
  465. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  466. self.violation_counts["overtake_on_right"] += self._is_overtake(
  467. lane_id, dx, dy, ego_speedx, ego_speedy
  468. )
  469. start_frame_id += window_width
  470. continue
  471. # 检查掉头时超车
  472. if obj_speedx and obj_speedy: # 确保列表不为空
  473. if ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
  474. self.violation_counts["overtake_when_turn_around"] += self._is_overtake(
  475. lane_id, dx, dy, ego_speedx, ego_speedy
  476. )
  477. start_frame_id += window_width
  478. continue
  479. # 如果没有检测到任何违规,移动窗口
  480. start_frame_id += 1
  481. self._calculated["illegal_overtake"] = True
  482. # 借道超车场景
  483. def overtake_in_forbid_lane_detector(self):
  484. """检测借道超车违规"""
  485. # 如果已经计算过,直接返回
  486. if self._calculated["forbid_lane"]:
  487. return
  488. # 如果没有其他车辆数据,直接返回,保持默认值0
  489. if self.obj_data.empty:
  490. print("没有其他车辆数据,无法检测借道超车违规,默认为0")
  491. self._calculated["forbid_lane"] = True
  492. return
  493. simTime = self.obj_data["simTime"].tolist()
  494. simtime_devide = self.different_road_area_simtime(simTime)
  495. for simtime in simtime_devide:
  496. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  497. try:
  498. lane_type = lane_overtake["lane_type"].tolist()
  499. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  500. 50002 not in lane_type and len(set(lane_type)) > 1
  501. ):
  502. self.violation_counts["overtake_in_forbid_lane"] += 1
  503. except Exception as e:
  504. print("数据缺少lane_type信息")
  505. self._calculated["forbid_lane"] = True
  506. # 在匝道超车
  507. def overtake_in_ramp_area_detector(self):
  508. """检测匝道超车违规"""
  509. # 如果已经计算过,直接返回
  510. if self._calculated["ramp_area"]:
  511. return
  512. # 如果没有其他车辆数据,直接返回,保持默认值0
  513. if self.obj_data.empty:
  514. print("没有其他车辆数据,无法检测匝道超车违规,默认为0")
  515. self._calculated["ramp_area"] = True
  516. return
  517. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  518. "simTime"
  519. ].tolist()
  520. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  521. for ramp_simtime in ramp_simTime_list:
  522. lane_id = self.ego_data["lane_id"].tolist()
  523. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  524. objstate_in_ramp = self.obj_data[
  525. self.obj_data["simTime"].isin(ramp_simtime)
  526. ]
  527. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  528. ego_speedx = ego_in_ramp["speedX"].tolist()
  529. ego_speedy = ego_in_ramp["speedY"].tolist()
  530. if len(lane_id) > 0:
  531. self.violation_counts["overtake_in_ramp"] += self._is_overtake(
  532. lane_id, dx, dy, ego_speedx, ego_speedy
  533. )
  534. else:
  535. continue
  536. self._calculated["ramp_area"] = True
  537. def overtake_in_tunnel_area_detector(self):
  538. """检测隧道超车违规"""
  539. # 如果已经计算过,直接返回
  540. if self._calculated["tunnel_area"]:
  541. return
  542. # 如果没有其他车辆数据,直接返回,保持默认值0
  543. if self.obj_data.empty:
  544. print("没有其他车辆数据,无法检测隧道超车违规,默认为0")
  545. self._calculated["tunnel_area"] = True
  546. return
  547. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  548. "simTime"
  549. ].tolist()
  550. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  551. for tunnel_simtime in tunnel_simTime_list:
  552. lane_id = self.ego_data["lane_id"].tolist()
  553. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  554. objstate_in_tunnel = self.obj_data[
  555. self.obj_data["simTime"].isin(tunnel_simtime)
  556. ]
  557. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  558. ego_speedx = ego_in_tunnel["speedX"].tolist()
  559. ego_speedy = ego_in_tunnel["speedY"].tolist()
  560. if len(lane_id) > 0:
  561. self.violation_counts["overtake_in_tunnel"] += self._is_overtake(
  562. lane_id, dx, dy, ego_speedx, ego_speedy
  563. )
  564. else:
  565. continue
  566. self._calculated["tunnel_area"] = True
  567. # 加速车道超车
  568. def overtake_on_accelerate_lane_detector(self):
  569. """检测加速车道超车违规"""
  570. # 如果已经计算过,直接返回
  571. if self._calculated["accelerate_lane"]:
  572. return
  573. # 如果没有其他车辆数据,直接返回,保持默认值0
  574. if self.obj_data.empty:
  575. print("没有其他车辆数据,无法检测加速车道超车违规,默认为0")
  576. self._calculated["accelerate_lane"] = True
  577. return
  578. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  579. "simTime"
  580. ].tolist()
  581. accelerate_simTime_list = self.different_road_area_simtime(
  582. accelerate_simtime_list
  583. )
  584. for accelerate_simtime in accelerate_simTime_list:
  585. lane_id = self.ego_data["lane_id"].tolist()
  586. ego_in_accelerate = self.ego_data[
  587. self.ego_data["simTime"].isin(accelerate_simtime)
  588. ]
  589. objstate_in_accelerate = self.obj_data[
  590. self.obj_data["simTime"].isin(accelerate_simtime)
  591. ]
  592. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  593. ego_speedx = ego_in_accelerate["speedX"].tolist()
  594. ego_speedy = ego_in_accelerate["speedY"].tolist()
  595. self.violation_counts["overtake_on_accelerate_lane"] += self._is_overtake(
  596. lane_id, dx, dy, ego_speedx, ego_speedy
  597. )
  598. self._calculated["accelerate_lane"] = True
  599. # 减速车道超车
  600. def overtake_on_decelerate_lane_detector(self):
  601. """检测减速车道超车违规"""
  602. # 如果已经计算过,直接返回
  603. if self._calculated["decelerate_lane"]:
  604. return
  605. # 如果没有其他车辆数据,直接返回,保持默认值0
  606. if self.obj_data.empty:
  607. print("没有其他车辆数据,无法检测减速车道超车违规,默认为0")
  608. self._calculated["decelerate_lane"] = True
  609. return
  610. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  611. "simTime"
  612. ].tolist()
  613. decelerate_simTime_list = self.different_road_area_simtime(
  614. decelerate_simtime_list
  615. )
  616. for decelerate_simtime in decelerate_simTime_list:
  617. lane_id = self.ego_data["id"].tolist()
  618. ego_in_decelerate = self.ego_data[
  619. self.ego_data["simTime"].isin(decelerate_simtime)
  620. ]
  621. objstate_in_decelerate = self.obj_data[
  622. self.obj_data["simTime"].isin(decelerate_simtime)
  623. ]
  624. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  625. ego_speedx = ego_in_decelerate["speedX"].tolist()
  626. ego_speedy = ego_in_decelerate["speedY"].tolist()
  627. self.violation_counts["overtake_on_decelerate_lane"] += self._is_overtake(
  628. lane_id, dx, dy, ego_speedx, ego_speedy
  629. )
  630. self._calculated["decelerate_lane"] = True
  631. # 在交叉路口
  632. def overtake_in_different_senerios_detector(self):
  633. """检测不同场景超车违规"""
  634. # 如果已经计算过,直接返回
  635. if self._calculated["different_senerios"]:
  636. return
  637. # 如果没有其他车辆数据,直接返回,保持默认值0
  638. if self.obj_data.empty:
  639. print("没有其他车辆数据,无法检测不同场景超车违规,默认为0")
  640. self._calculated["different_senerios"] = True
  641. return
  642. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  643. "simTime"
  644. ].tolist() # 判断是路口或者隧道区域
  645. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  646. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  647. crossroad_objstate = self.obj_data[
  648. self.obj_data["simTime"].isin(crossroad_simTime)
  649. ]
  650. # 读取前后的laneId
  651. lane_id = crossroad_ego["lane_id"].tolist()
  652. # 读取车辆前后的位置信息
  653. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  654. ego_speedx = crossroad_ego["speedX"].tolist()
  655. ego_speedy = crossroad_ego["speedY"].tolist()
  656. """
  657. 如果滑动窗口开始和最后的laneid一致;
  658. 自车和前车的位置发生的交换;
  659. 则认为发生超车
  660. """
  661. if len(lane_id) > 0:
  662. self.violation_counts["overtake_in_different_senerios"] += self._is_overtake(
  663. lane_id, dx, dy, ego_speedx, ego_speedy
  664. )
  665. self._calculated["different_senerios"] = True
  666. def calculate_overtake_when_passing_car_count(self):
  667. """计算会车时超车违规次数"""
  668. self.illegal_overtake_with_car_detector()
  669. return self.violation_counts["overtake_when_passing_car"]
  670. def calculate_overtake_on_right_count(self):
  671. """计算右侧超车违规次数"""
  672. self.illegal_overtake_with_car_detector()
  673. return self.violation_counts["overtake_on_right"]
  674. def calculate_overtake_when_turn_around_count(self):
  675. """计算掉头时超车违规次数"""
  676. self.illegal_overtake_with_car_detector()
  677. return self.violation_counts["overtake_when_turn_around"]
  678. def calculate_overtake_in_forbid_lane_count(self):
  679. """计算借道超车违规次数"""
  680. self.overtake_in_forbid_lane_detector()
  681. return self.violation_counts["overtake_in_forbid_lane"]
  682. def calculate_overtake_in_ramp_area_count(self):
  683. """计算匝道超车违规次数"""
  684. self.overtake_in_ramp_area_detector()
  685. return self.violation_counts["overtake_in_ramp"]
  686. def calculate_overtake_in_tunnel_area_count(self):
  687. """计算隧道超车违规次数"""
  688. self.overtake_in_tunnel_area_detector()
  689. return self.violation_counts["overtake_in_tunnel"]
  690. def calculate_overtake_on_accelerate_lane_count(self):
  691. """计算加速车道超车违规次数"""
  692. self.overtake_on_accelerate_lane_detector()
  693. return self.violation_counts["overtake_on_accelerate_lane"]
  694. def calculate_overtake_on_decelerate_lane_count(self):
  695. """计算减速车道超车违规次数"""
  696. self.overtake_on_decelerate_lane_detector()
  697. return self.violation_counts["overtake_on_decelerate_lane"]
  698. def calculate_overtake_in_different_senerios_count(self):
  699. """计算不同场景超车违规次数"""
  700. self.overtake_in_different_senerios_detector()
  701. return self.violation_counts["overtake_in_different_senerios"]
  702. class SlowdownViolation(object):
  703. """减速让行违规类"""
  704. def __init__(self, df_data):
  705. print("减速让行违规类-------------------------")
  706. self.traffic_violations_type = "减速让行违规类"
  707. # 存储原始数据引用
  708. self._raw_data = df_data.obj_data[1]
  709. self.object_items = set(df_data.object_df.type.tolist())
  710. # 存储行人数据引用
  711. self._pedestrian_df = None
  712. if 13 in self.object_items: # 行人的type是13
  713. self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  714. # 初始化属性,但不立即创建数据副本
  715. self._ego_data = None
  716. self._pedestrian_data = None
  717. # 初始化计数器
  718. self.slow_down_in_crosswalk_count = 0
  719. self.avoid_pedestrian_in_crosswalk_count = 0
  720. self.avoid_pedestrian_in_the_road_count = 0
  721. self.aviod_pedestrian_when_turning_count = 0
  722. @property
  723. def ego_data(self):
  724. """懒加载方式获取ego数据"""
  725. if self._ego_data is None:
  726. self._ego_data = self._raw_data[SLOWDOWN_INFO].copy().reset_index(drop=True)
  727. return self._ego_data
  728. @property
  729. def pedestrian_data(self):
  730. """懒加载方式获取行人数据"""
  731. if self._pedestrian_data is None:
  732. if self._pedestrian_df is not None:
  733. # 使用浅拷贝代替深拷贝
  734. self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  735. else:
  736. self._pedestrian_data = pd.DataFrame()
  737. return self._pedestrian_data
  738. def pedestrian_in_front_of_car(self):
  739. if len(self.pedestrian_data) == 0:
  740. return []
  741. else:
  742. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  743. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  744. self.ego_data["dist"] = np.sqrt(
  745. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  746. )
  747. self.ego_data["rela_pos"] = (
  748. self.ego_data["dx"] * self.ego_data["speedX"]
  749. + self.ego_data["dy"] * self.ego_data["speedY"]
  750. )
  751. simtime = self.ego_data[
  752. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  753. ]["simTime"].tolist()
  754. return simtime
  755. def different_road_area_simtime(self, df, threshold=0.6):
  756. if not df:
  757. return []
  758. simtime_group = []
  759. current_simtime_group = [df[0]]
  760. for i in range(1, len(df)):
  761. if abs(df[i] - df[i - 1]) <= threshold:
  762. current_simtime_group.append(df[i])
  763. else:
  764. simtime_group.append(current_simtime_group)
  765. current_simtime_group = [df[i]]
  766. simtime_group.append(current_simtime_group)
  767. return simtime_group
  768. def slow_down_in_crosswalk_detector(self):
  769. # 筛选出路口或隧道区域的时间点
  770. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  771. "simTime"
  772. ].tolist()
  773. crosswalk_simTime_divide = self.different_road_area_simtime(crosswalk_simTime)
  774. for crosswalk_simtime in crosswalk_simTime_divide:
  775. # 筛选出当前时间段内的数据
  776. # start_time, end_time = crosswalk_simtime
  777. start_time = crosswalk_simtime[0]
  778. end_time = crosswalk_simtime[-1]
  779. print(f"当前时间段:{start_time} - {end_time}")
  780. crosswalk_objstate = self.ego_data[
  781. (self.ego_data["simTime"] >= start_time)
  782. & (self.ego_data["simTime"] <= end_time)
  783. ]
  784. # 计算车辆速度
  785. ego_speedx = np.array(crosswalk_objstate["speedX"].tolist())
  786. ego_speedy = np.array(crosswalk_objstate["speedY"].tolist())
  787. ego_speed = np.sqrt(ego_speedx ** 2 + ego_speedy ** 2)
  788. # 判断是否超速
  789. if max(ego_speed) > 15 / 3.6: # 15 km/h 转换为 m/s
  790. self.slow_down_in_crosswalk_count += 1
  791. # 输出总次数
  792. print(f"在人行横道超车总次数:{self.slow_down_in_crosswalk_count}次")
  793. def avoid_pedestrian_in_crosswalk_detector(self):
  794. crosswalk_simTime = self.ego_data[self.ego_data["crossid"] != 20000][
  795. "simTime"
  796. ].tolist()
  797. crosswalk_simTime_devide = self.different_road_area_simtime(crosswalk_simTime)
  798. for crosswalk_simtime in crosswalk_simTime_devide:
  799. if not self.pedestrian_data.empty:
  800. crosswalk_objstate = self.pedestrian_data[
  801. self.pedestrian_data["simTime"].isin(crosswalk_simtime)
  802. ]
  803. else:
  804. crosswalk_objstate = pd.DataFrame()
  805. if len(crosswalk_objstate) > 0:
  806. pedestrian_simtime = crosswalk_objstate["simTime"]
  807. pedestrian_objstate = crosswalk_objstate[
  808. crosswalk_objstate["simTime"].isin(pedestrian_simtime)
  809. ]
  810. ego_speed = np.sqrt(
  811. pedestrian_objstate["speedX"] ** 2
  812. + pedestrian_objstate["speedY"] ** 2
  813. )
  814. if ego_speed.any() > 0:
  815. self.avoid_pedestrian_in_crosswalk_count += 1
  816. def avoid_pedestrian_in_the_road_detector(self):
  817. simtime = self.pedestrian_in_front_of_car()
  818. if len(simtime) == 0:
  819. self.avoid_pedestrian_in_the_road_count += 0
  820. else:
  821. pedestrian_on_the_road = self.pedestrian_data[
  822. self.pedestrian_data["simTime"].isin(simtime)
  823. ]
  824. simTime = pedestrian_on_the_road["simTime"].tolist()
  825. simTime_devide = self.different_road_area_simtime(simTime)
  826. for simtime1 in simTime_devide:
  827. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  828. pedestrian_on_the_road["simTime"].isin(simtime1)
  829. ]
  830. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime1))]
  831. dist = np.sqrt(
  832. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  833. ** 2
  834. + (
  835. ego_car["posY"].values
  836. - sub_pedestrian_on_the_road["posY"].values
  837. )
  838. ** 2
  839. )
  840. speed = np.sqrt(
  841. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  842. )
  843. data = {"dist": dist, "speed": speed}
  844. new_ego_car = pd.DataFrame(data)
  845. new_ego_car = new_ego_car.assign(
  846. Column3=lambda x: (x["dist"] < 1) & (x["speed"] == 0)
  847. )
  848. if new_ego_car["Column3"].any():
  849. self.avoid_pedestrian_in_the_road_count += 1
  850. def aviod_pedestrian_when_turning_detector(self):
  851. pedestrian_simtime_list = self.pedestrian_in_front_of_car()
  852. if len(pedestrian_simtime_list) > 0:
  853. simtime_list = self.ego_data[
  854. (self.ego_data["simTime"].isin(pedestrian_simtime_list))
  855. & (self.ego_data["lane_type"] == 20)
  856. ]["simTime"].tolist()
  857. simTime_list = self.different_road_area_simtime(simtime_list)
  858. pedestrian_on_the_road = self.pedestrian_data[
  859. self.pedestrian_data["simTime"].isin(simtime_list)
  860. ]
  861. for simtime in simTime_list:
  862. sub_pedestrian_on_the_road = pedestrian_on_the_road[
  863. pedestrian_on_the_road["simTime"].isin(simtime)
  864. ]
  865. ego_car = self.ego_data.loc[(self.ego_data["simTime"].isin(simtime))]
  866. ego_car["dist"] = np.sqrt(
  867. (ego_car["posX"].values - sub_pedestrian_on_the_road["posX"].values)
  868. ** 2
  869. + (
  870. ego_car["posY"].values
  871. - sub_pedestrian_on_the_road["posY"].values
  872. )
  873. ** 2
  874. )
  875. ego_car["speed"] = np.sqrt(
  876. ego_car["speedX"].values ** 2 + ego_car["speedY"].values ** 2
  877. )
  878. if any(ego_car["speed"].tolist()) != 0:
  879. self.aviod_pedestrian_when_turning_count += 1
  880. def calculate_slow_down_in_crosswalk_count(self):
  881. self.slow_down_in_crosswalk_detector()
  882. return self.slow_down_in_crosswalk_count
  883. def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
  884. self.avoid_pedestrian_in_crosswalk_detector()
  885. return self.avoid_pedestrian_in_crosswalk_count
  886. def calculate_avoid_pedestrian_in_the_road_count(self):
  887. self.avoid_pedestrian_in_the_road_detector()
  888. return self.avoid_pedestrian_in_the_road_count
  889. def calculate_avoid_pedestrian_when_turning_count(self):
  890. self.aviod_pedestrian_when_turning_detector()
  891. return self.aviod_pedestrian_when_turning_count
  892. class TurnaroundViolation(object):
  893. def __init__(self, df_data):
  894. print("掉头违规类初始化中...")
  895. self.traffic_violations_type = "掉头违规类"
  896. # 存储原始数据引用
  897. self._raw_data = df_data.obj_data[1]
  898. self.object_items = set(df_data.object_df.type.tolist())
  899. # 存储行人数据引用
  900. self._pedestrian_df = None
  901. if 13 in self.object_items: # 行人的type是13
  902. self._pedestrian_df = df_data.object_df[df_data.object_df.type == 13]
  903. # 初始化属性,但不立即创建数据副本
  904. self._ego_data = None
  905. self._pedestrian_data = None
  906. # 初始化计数器
  907. self.turning_in_forbiden_turn_back_sign_count = 0
  908. self.turning_in_forbiden_turn_left_sign_count = 0
  909. self.avoid_pedestrian_when_turn_back_count = 0
  910. @property
  911. def ego_data(self):
  912. """懒加载方式获取ego数据"""
  913. if self._ego_data is None:
  914. self._ego_data = self._raw_data[TURNAROUND_INFO].copy().reset_index(drop=True)
  915. return self._ego_data
  916. @property
  917. def pedestrian_data(self):
  918. """懒加载方式获取行人数据"""
  919. if self._pedestrian_data is None:
  920. if self._pedestrian_df is not None:
  921. self._pedestrian_data = self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
  922. else:
  923. self._pedestrian_data = pd.DataFrame()
  924. return self._pedestrian_data
  925. def pedestrian_in_front_of_car(self):
  926. if len(self.pedestrian_data) == 0:
  927. return []
  928. else:
  929. self.ego_data["dx"] = self.ego_data["posX"] - self.pedestrian_data["posX"]
  930. self.ego_data["dy"] = self.ego_data["posY"] - self.pedestrian_data["posY"]
  931. self.ego_data["dist"] = np.sqrt(
  932. self.ego_data["dx"] ** 2 + self.ego_data["dy"] ** 2
  933. )
  934. self.ego_data["rela_pos"] = (
  935. self.ego_data["dx"] * self.ego_data["speedX"]
  936. + self.ego_data["dy"] * self.ego_data["speedY"]
  937. )
  938. simtime = self.ego_data[
  939. (self.ego_data["rela_pos"] > 0) & (self.ego_data["dist"] < 50)
  940. ]["simTime"].tolist()
  941. return simtime
  942. def different_road_area_simtime(self, df, threshold=0.5):
  943. if not df:
  944. return []
  945. simtime_group = []
  946. current_simtime_group = [df[0]]
  947. for i in range(1, len(df)):
  948. if abs(df[i] - df[i - 1]) <= threshold:
  949. current_simtime_group.append(df[i])
  950. else:
  951. simtime_group.append(current_simtime_group)
  952. current_simtime_group = [df[i]]
  953. simtime_group.append(current_simtime_group)
  954. return simtime_group
  955. def turn_back_in_forbiden_sign_detector(self):
  956. """
  957. 禁止掉头type = 8
  958. """
  959. forbiden_turn_back_simTime = self.ego_data[self.ego_data["sign_type1"] == 8][
  960. "simTime"
  961. ].tolist()
  962. forbiden_turn_left_simTime = self.ego_data[self.ego_data["sign_type1"] == 9][
  963. "simTime"
  964. ].tolist()
  965. forbiden_turn_back_simtime_devide = self.different_road_area_simtime(
  966. forbiden_turn_back_simTime
  967. )
  968. forbiden_turn_left_simtime_devide = self.different_road_area_simtime(
  969. forbiden_turn_left_simTime
  970. )
  971. for forbiden_turn_back_simtime in forbiden_turn_back_simtime_devide:
  972. ego_car1 = self.ego_data.loc[
  973. (self.ego_data["simFrame"].isin(forbiden_turn_back_simtime))
  974. ]
  975. ego_start_speedx1 = ego_car1["speedX"].iloc[0]
  976. ego_start_speedy1 = ego_car1["speedY"].iloc[0]
  977. ego_end_speedx1 = ego_car1["speedX"].iloc[-1]
  978. ego_end_speedy1 = ego_car1["speedY"].iloc[-1]
  979. if (
  980. ego_end_speedx1 * ego_start_speedx1
  981. + ego_end_speedy1 * ego_start_speedy1
  982. < 0
  983. ):
  984. self.turning_in_forbiden_turn_back_sign_count += 1
  985. for forbiden_turn_left_simtime in forbiden_turn_left_simtime_devide:
  986. ego_car2 = self.ego_data.loc[
  987. (self.ego_data["simFrame"].isin(forbiden_turn_left_simtime))
  988. ]
  989. ego_start_speedx2 = ego_car2["speedX"].iloc[0]
  990. ego_start_speedy2 = ego_car2["speedY"].iloc[0]
  991. ego_end_speedx2 = ego_car2["speedX"].iloc[-1]
  992. ego_end_speedy2 = ego_car2["speedY"].iloc[-1]
  993. if (
  994. ego_end_speedx2 * ego_start_speedx2
  995. + ego_end_speedy2 * ego_start_speedy2
  996. < 0
  997. ):
  998. self.turning_in_forbiden_turn_left_sign_count += 1
  999. def avoid_pedestrian_when_turn_back_detector(self):
  1000. sensor_on_intersection = self.pedestrian_in_front_of_car()
  1001. avoid_pedestrian_when_turn_back_simTime_list = self.ego_data[
  1002. self.ego_data["lane_type"] == 20
  1003. ]["simTime"].tolist()
  1004. avoid_pedestrian_when_turn_back_simTime_devide = (
  1005. self.different_road_area_simtime(
  1006. avoid_pedestrian_when_turn_back_simTime_list
  1007. )
  1008. )
  1009. if len(sensor_on_intersection) > 0:
  1010. for avoid_pedestrian_when_turn_back_simtime in avoid_pedestrian_when_turn_back_simTime_devide:
  1011. pedestrian_in_intersection_simtime = self.pedestrian_data[
  1012. self.pedestrian_data["simTime"].isin(
  1013. avoid_pedestrian_when_turn_back_simtime
  1014. )
  1015. ].tolist()
  1016. ego_df = self.ego_data[
  1017. self.ego_data["simTime"].isin(pedestrian_in_intersection_simtime)
  1018. ].reset_index(drop=True)
  1019. pedestrian_df = self.pedestrian_data[
  1020. self.pedestrian_data["simTime"].isin(
  1021. pedestrian_in_intersection_simtime
  1022. )
  1023. ].reset_index(drop=True)
  1024. ego_df["dist"] = np.sqrt(
  1025. (ego_df["posx"] - pedestrian_df["posx"]) ** 2
  1026. + (ego_df["posy"] - pedestrian_df["posy"]) ** 2
  1027. )
  1028. ego_df["speed"] = np.sqrt(ego_df["speedx"] ** 2 + ego_df["speedy"] ** 2)
  1029. if any(ego_df["speed"].tolist()) != 0:
  1030. self.avoid_pedestrian_when_turn_back_count += 1
  1031. def calculate_turn_in_forbiden_turn_left_sign_count(self):
  1032. self.turn_back_in_forbiden_sign_detector()
  1033. return self.turning_in_forbiden_turn_left_sign_count
  1034. def calculate_turn_in_forbiden_turn_back_sign_count(self):
  1035. self.turn_back_in_forbiden_sign_detector()
  1036. return self.turning_in_forbiden_turn_back_sign_count
  1037. def calaulate_avoid_pedestrian_when_turn_back_count(self):
  1038. self.avoid_pedestrian_when_turn_back_detector()
  1039. return self.avoid_pedestrian_when_turn_back_count
  1040. class WrongWayViolation(object):
  1041. """停车违规类"""
  1042. def __init__(self, df_data):
  1043. print("停车违规类初始化中...")
  1044. self.traffic_violations_type = "停车违规类"
  1045. # 存储原始数据引用
  1046. self._raw_data = df_data.obj_data[1]
  1047. # 初始化属性,但不立即创建数据副本
  1048. self._data = None
  1049. # 初始化违规统计
  1050. self.violation_count = {
  1051. "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
  1052. "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
  1053. "urbanExpresswayEmergencyLaneDriving": 0,
  1054. }
  1055. @property
  1056. def data(self):
  1057. """懒加载方式获取数据"""
  1058. if self._data is None:
  1059. # 使用浅拷贝代替深拷贝
  1060. self._data = self._raw_data.copy()
  1061. return self._data
  1062. def process_violations(self):
  1063. """处理停车或者紧急车道行驶违规数据"""
  1064. # 提取有效道路类型
  1065. urban_expressway_or_highway = {1, 2}
  1066. driving_lane = {1, 4, 5, 6}
  1067. emergency_lane = {12}
  1068. self.data["v"] *= 3.6 # 转换速度
  1069. # 使用向量化和条件判断进行违规判定
  1070. conditions = [
  1071. (
  1072. self.data["road_fc"].isin(urban_expressway_or_highway)
  1073. & self.data["lane_type"].isin(driving_lane)
  1074. & (self.data["v"] == 0)
  1075. ),
  1076. (
  1077. self.data["road_fc"].isin(urban_expressway_or_highway)
  1078. & self.data["lane_type"].isin(emergency_lane)
  1079. & (self.data["v"] == 0)
  1080. ),
  1081. (
  1082. self.data["road_fc"].isin(urban_expressway_or_highway)
  1083. & self.data["lane_type"].isin(emergency_lane)
  1084. & (self.data["v"] != 0)
  1085. ),
  1086. ]
  1087. violation_types = [
  1088. "urbanExpresswayOrHighwayDrivingLaneStopped",
  1089. "urbanExpresswayOrHighwayEmergencyLaneStopped",
  1090. "urbanExpresswayEmergencyLaneDriving",
  1091. ]
  1092. # 设置违规类型
  1093. self.data["violation_type"] = None
  1094. for condition, violation_type in zip(conditions, violation_types):
  1095. self.data.loc[condition, "violation_type"] = violation_type
  1096. # 统计违规情况
  1097. self.violation_count = (
  1098. self.data["violation_type"]
  1099. .value_counts()
  1100. .reindex(violation_types, fill_value=0)
  1101. .to_dict()
  1102. )
  1103. def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
  1104. self.process_violations()
  1105. return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]
  1106. def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
  1107. self.process_violations()
  1108. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  1109. def calculate_urbanExpresswayEmergencyLaneDriving(self):
  1110. self.process_violations()
  1111. return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  1112. class SpeedingViolation(object):
  1113. """超速违规类"""
  1114. """ 这里没有道路标志牌限速指标,因为shp地图中没有这个信息"""
  1115. def __init__(self, df_data):
  1116. print("超速违规类初始化中...")
  1117. self.traffic_violations_type = "超速违规类"
  1118. # 存储原始数据引用
  1119. self._raw_data = df_data.obj_data[1]
  1120. # 初始化属性,但不立即创建数据副本
  1121. self._data = None
  1122. # 初始化违规统计
  1123. self.violation_counts = {
  1124. "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
  1125. "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
  1126. "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
  1127. "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
  1128. "generalRoadSpeedOverLimit50": 0,
  1129. "generalRoadSpeedOverLimit20to50": 0,
  1130. }
  1131. @property
  1132. def data(self):
  1133. """懒加载方式获取数据"""
  1134. if self._data is None:
  1135. # 使用浅拷贝代替深拷贝
  1136. self._data = self._raw_data.copy()
  1137. # 预处理数据 - 转换速度单位
  1138. self._data["v"] *= 3.6 # 转换为 km/h
  1139. return self._data
  1140. def process_violations(self):
  1141. """处理数据帧,检查超速和其他违规行为"""
  1142. # 提取有效道路类型
  1143. urban_expressway_or_highway = {1, 2} # 使用大括号直接创建集合
  1144. general_road = {3} # 直接创建包含一个元素的集合
  1145. self.data["v"] *= 3.6 # 转换速度
  1146. # 违规判定
  1147. conditions = [
  1148. (
  1149. self.data["road_fc"].isin(urban_expressway_or_highway)
  1150. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  1151. ),
  1152. (
  1153. self.data["road_fc"].isin(urban_expressway_or_highway)
  1154. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  1155. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  1156. ),
  1157. (
  1158. self.data["road_fc"].isin(urban_expressway_or_highway)
  1159. & (self.data["v"] > self.data["road_speed_max"])
  1160. & (self.data["v"] <= self.data["road_speed_max"] * 1.2)
  1161. ),
  1162. (
  1163. self.data["road_fc"].isin(urban_expressway_or_highway)
  1164. & (self.data["v"] < self.data["road_speed_min"])
  1165. ),
  1166. (
  1167. self.data["road_fc"].isin(general_road)
  1168. & (self.data["v"] > self.data["road_speed_max"] * 1.5)
  1169. ),
  1170. (
  1171. self.data["road_fc"].isin(general_road)
  1172. & (self.data["v"] > self.data["road_speed_max"] * 1.2)
  1173. & (self.data["v"] <= self.data["road_speed_max"] * 1.5)
  1174. ),
  1175. ]
  1176. violation_types = [
  1177. "urbanExpresswayOrHighwaySpeedOverLimit50",
  1178. "urbanExpresswayOrHighwaySpeedOverLimit20to50",
  1179. "urbanExpresswayOrHighwaySpeedOverLimit0to20",
  1180. "urbanExpresswayOrHighwaySpeedUnderLimit",
  1181. "generalRoadSpeedOverLimit50",
  1182. "generalRoadSpeedOverLimit20to50",
  1183. ]
  1184. # 设置违规类型
  1185. self.data["violation_type"] = None
  1186. for condition, violation_type in zip(conditions, violation_types):
  1187. self.data.loc[condition, "violation_type"] = violation_type
  1188. # 统计各类违规情况
  1189. self.violation_counts = self.data["violation_type"].value_counts().to_dict()
  1190. def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
  1191. self.process_violations()
  1192. return self.violation_counts.get("urbanExpresswayOrHighwaySpeedOverLimit50") if self.violation_counts.get(
  1193. "urbanExpresswayOrHighwaySpeedOverLimit50") else 0
  1194. def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
  1195. self.process_violations()
  1196. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit20to50"] if self.violation_counts.get(
  1197. "urbanExpresswayOrHighwaySpeedOverLimit20to50") else 0
  1198. def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
  1199. self.process_violations()
  1200. return self.violation_counts["urbanExpresswayOrHighwaySpeedOverLimit0to20"] if self.violation_counts.get(
  1201. "urbanExpresswayOrHighwaySpeedOverLimit0to20") else 0
  1202. def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
  1203. self.process_violations()
  1204. return self.violation_counts["urbanExpresswayOrHighwaySpeedUnderLimit"] if self.violation_counts.get(
  1205. "urbanExpresswayOrHighwaySpeedUnderLimit") else 0
  1206. def calculate_generalRoadSpeedOverLimit50(self):
  1207. self.process_violations()
  1208. return self.violation_counts["generalRoadSpeedOverLimit50"] if self.violation_counts.get(
  1209. "generalRoadSpeedOverLimit50") else 0
  1210. def calculate_generalRoadSpeedOverLimit20to50_count(self):
  1211. self.process_violations()
  1212. return self.violation_counts["generalRoadSpeedOverLimit20to50"] if self.violation_counts.get(
  1213. "generalRoadSpeedOverLimit20to50") else 0
  1214. class TrafficLightViolation(object):
  1215. """违反交通灯类"""
  1216. """需要补充判断车辆是左转直行还是右转,判断红绿灯是方向性红绿灯还是通过性红绿灯"""
  1217. def __init__(self, df_data):
  1218. """初始化方法"""
  1219. self.traffic_violations_type = "违反交通灯类"
  1220. print("违反交通灯类 类初始化中...")
  1221. self.config = df_data.vehicle_config
  1222. self.data_ego = df_data.ego_data # 获取数据
  1223. self.violation_counts = {
  1224. "trafficSignalViolation": 0,
  1225. "illegalDrivingOrParkingAtCrossroads": 0,
  1226. }
  1227. # 处理数据并判定违规
  1228. self.process_violations()
  1229. def is_point_cross_line(self, point, stop_line_points):
  1230. """
  1231. 判断车辆的某一坐标点是否跨越了由两个点定义的停止线(线段)。
  1232. 使用向量叉积判断点是否在线段上,并通过计算车辆的航向角来判断是否跨越了停止线。
  1233. :param point: 车辆位置点 (x, y, heading),包括 x, y 位置以及朝向角度(弧度制)
  1234. :param stop_line_points: 停止线两个端点 [[x1, y1], [x2, y2]]
  1235. :return: True 如果车辆跨越了停止线,否则 False
  1236. """
  1237. line_vector = np.array(
  1238. [
  1239. stop_line_points[1][0] - stop_line_points[0][0],
  1240. stop_line_points[1][1] - stop_line_points[0][1],
  1241. ]
  1242. )
  1243. point_vector = np.array(
  1244. [point[0] - stop_line_points[0][0], point[1] - stop_line_points[0][1]]
  1245. )
  1246. cross_product = np.cross(line_vector, point_vector)
  1247. if cross_product != 0:
  1248. return False
  1249. mid_point = (
  1250. np.array([stop_line_points[0][0], stop_line_points[0][1]])
  1251. + 0.5 * line_vector
  1252. )
  1253. axletree_to_mid_vector = np.array(
  1254. [point[0] - mid_point[0], point[1] - mid_point[1]]
  1255. )
  1256. direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
  1257. norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
  1258. norm_direction = np.linalg.norm(direction_vector)
  1259. if norm_axletree_to_mid == 0 or norm_direction == 0:
  1260. return False
  1261. cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
  1262. norm_axletree_to_mid * norm_direction
  1263. )
  1264. angle_theta = math.degrees(math.acos(cos_theta))
  1265. return angle_theta <= 90
  1266. def _filter_data(self):
  1267. """过滤数据,筛选出需要分析的记录"""
  1268. return self.data_ego[
  1269. (self.data_ego["stopline_id"] != -1)
  1270. & (self.data_ego["stopline_type"] == 1)
  1271. & (self.data_ego["trafficlight_id"] != -1)
  1272. ]
  1273. def _group_data(self, filtered_data):
  1274. """按时间差对数据进行分组"""
  1275. filtered_data["time_diff"] = filtered_data["simTime"].diff().fillna(0)
  1276. threshold = 0.5
  1277. filtered_data["group"] = (filtered_data["time_diff"] > threshold).cumsum()
  1278. return filtered_data.groupby("group")
  1279. def _analyze_group(self, group_data):
  1280. """分析单个分组的数据,判断是否闯红灯"""
  1281. photos = []
  1282. stop_in_intersection = False
  1283. for _, row in group_data.iterrows():
  1284. vehicle_pos = np.array([row["posX"], row["posY"], row["posH"]])
  1285. stop_line_points = [
  1286. [row["stopline_x1"], row["stopline_y1"]],
  1287. [row["stopline_x2"], row["stopline_y2"]],
  1288. ]
  1289. traffic_light_status = row["stateMask"]
  1290. heading_vector = np.array([np.cos(row["posH"]), np.sin(row["posH"])])
  1291. heading_vector = heading_vector / np.linalg.norm(heading_vector)
  1292. # with open(self.config_path / "vehicle_config.yaml", 'r') as f:
  1293. # config = yaml.load(f, Loader=yaml.FullLoader)
  1294. front_wheel_pos = vehicle_pos[:2] + self.config["EGO_WHEELBASS"] * heading_vector
  1295. rear_wheel_pos = vehicle_pos[:2] - self.config["EGO_WHEELBASS"] * heading_vector
  1296. dist = math.sqrt(
  1297. (row["posX"] - row["traffic_light_x"]) ** 2
  1298. + (row["posY"] - row["traffic_light_y"]) ** 2
  1299. )
  1300. if abs(row["speedH"]) > 0.01 or abs(row["speedH"]) < 0.01:
  1301. has_crossed_line_front = (
  1302. self.is_point_cross_line(front_wheel_pos, stop_line_points)
  1303. and traffic_light_status == 1
  1304. )
  1305. has_crossed_line_rear = (
  1306. self.is_point_cross_line(rear_wheel_pos, stop_line_points)
  1307. and row["v"] > 0
  1308. and traffic_light_status == 1
  1309. )
  1310. has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
  1311. has_passed_intersection = has_crossed_line_front and dist < 1.0
  1312. # print(f'time: {row["simTime"]}, speed: {row["speedH"]}, posH: {row["posH"]}, dist: {dist:.2f}, has_stop_in_intersection: {has_stop_in_intersection}, has_passed_intersection: {has_passed_intersection}')
  1313. photos.extend(
  1314. [
  1315. has_crossed_line_front,
  1316. has_crossed_line_rear,
  1317. has_passed_intersection,
  1318. has_stop_in_intersection,
  1319. ]
  1320. )
  1321. stop_in_intersection = has_passed_intersection
  1322. return photos, stop_in_intersection
  1323. def is_vehicle_run_a_red_light(self):
  1324. """判断车辆是否闯红灯"""
  1325. filtered_data = self._filter_data()
  1326. grouped_data = self._group_data(filtered_data)
  1327. self.photos_group = []
  1328. self.stop_in_intersections = []
  1329. for _, group_data in grouped_data:
  1330. photos, stop_in_intersection = self._analyze_group(group_data)
  1331. self.photos_group.append(photos)
  1332. self.stop_in_intersections.append(stop_in_intersection)
  1333. def process_violations(self):
  1334. """处理数据并判定违规"""
  1335. self.is_vehicle_run_a_red_light()
  1336. count_1 = sum(all(photos) for photos in self.photos_group)
  1337. count_2 = sum(
  1338. stop_in_intersection for stop_in_intersection in self.stop_in_intersections
  1339. )
  1340. self.violation_counts["trafficSignalViolation"] = count_1
  1341. self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = count_2
  1342. def calculate_trafficSignalViolation_count(self):
  1343. self.process_violations()
  1344. return self.violation_counts["trafficSignalViolation"]
  1345. def calculate_illegalDrivingOrParkingAtCrossroads(self):
  1346. self.process_violations()
  1347. return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]
  1348. class WarningViolation(object):
  1349. """警告性违规类"""
  1350. def __init__(self, df_data):
  1351. print("警告性违规类初始化中...")
  1352. self.traffic_violations_type = "警告性违规类"
  1353. # 存储原始数据引用
  1354. self.config = df_data.vehicle_config
  1355. self._raw_data = df_data.obj_data[1]
  1356. # 初始化属性,但不立即创建数据副本
  1357. self._data = None
  1358. # 初始化违规计数器
  1359. self.violation_counts = {
  1360. "generalRoadIrregularLaneUse": 0, # 驾驶机动车在高速公路、城市快速路以外的道路上不按规定车道行驶
  1361. "urbanExpresswayOrHighwayRideLaneDivider": 0, # 机动车在高速公路或者城市快速路上骑、轧车行道分界线
  1362. }
  1363. @property
  1364. def data(self):
  1365. """懒加载方式获取数据"""
  1366. if self._data is None:
  1367. # 使用浅拷贝代替深拷贝
  1368. self._data = self._raw_data.copy()
  1369. return self._data
  1370. def process_violations(self):
  1371. """处理所有违规类型"""
  1372. # 处理普通道路不按规定车道行驶违规
  1373. self._process_irregular_lane_use()
  1374. # 处理骑、轧车行道分界线违规
  1375. self._process_lane_divider_violation()
  1376. def _process_irregular_lane_use(self):
  1377. """处理普通道路不按规定车道行驶违规"""
  1378. # 定义道路和车道类型
  1379. general_road = {3} # 普通道路
  1380. lane_type = {11} # 非机动车道
  1381. # 使用布尔索引来筛选满足条件的行
  1382. condition = (self.data["road_fc"].isin(general_road)) & (
  1383. self.data["lane_type"].isin(lane_type)
  1384. )
  1385. # 创建一个新的列,并根据条件设置值
  1386. self.data["is_violation"] = condition
  1387. # 统计满足条件的连续时间段
  1388. violation_segments = self.count_continuous_violations(
  1389. self.data["is_violation"], self.data["simTime"]
  1390. )
  1391. # 更新违规计数
  1392. self.violation_counts["generalRoadIrregularLaneUse"] = len(violation_segments)
  1393. def _process_lane_divider_violation(self):
  1394. """处理骑、轧车行道分界线违规"""
  1395. # 获取车辆和车道宽度
  1396. car_width = self.config["CAR_WIDTH"]
  1397. lane_width = self.data["lane_width"]
  1398. # 计算阈值
  1399. threshold = (lane_width - car_width) / 2
  1400. # 找到满足条件的行
  1401. self.data["is_violation"] = self.data["laneOffset"] > threshold
  1402. # 统计满足条件的连续时间段
  1403. violation_segments = self.count_continuous_violations(
  1404. self.data["is_violation"], self.data["simTime"]
  1405. )
  1406. # 更新违规计数
  1407. self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(
  1408. violation_segments
  1409. )
  1410. def count_continuous_violations(self, violation_series, time_series):
  1411. """统计连续违规的时间段数量
  1412. Args:
  1413. violation_series: 表示是否违规的布尔序列
  1414. time_series: 对应的时间序列
  1415. Returns:
  1416. list: 连续违规时间段列表
  1417. """
  1418. continuous_segments = []
  1419. current_segment = []
  1420. for is_violation, time in zip(violation_series, time_series):
  1421. if is_violation:
  1422. if not current_segment: # 新的连续段开始
  1423. current_segment.append(time)
  1424. else:
  1425. if current_segment: # 连续段结束
  1426. current_segment.append(time) # 添加结束时间
  1427. continuous_segments.append(current_segment)
  1428. current_segment = []
  1429. # 检查是否有一个未结束的连续段在最后
  1430. if current_segment:
  1431. current_segment.append(time_series.iloc[-1]) # 使用最后的时间作为结束时间
  1432. continuous_segments.append(current_segment)
  1433. return continuous_segments
  1434. def calculate_generalRoadIrregularLaneUse_count(self):
  1435. """计算普通道路不按规定车道行驶违规次数"""
  1436. # 只处理普通道路不按规定车道行驶违规
  1437. self._process_irregular_lane_use()
  1438. return self.violation_counts["generalRoadIrregularLaneUse"]
  1439. def calculate_urbanExpresswayOrHighwayRideLaneDivider_count(self):
  1440. """计算骑、轧车行道分界线违规次数"""
  1441. # 只处理骑、轧车行道分界线违规
  1442. self._process_lane_divider_violation()
  1443. return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
  1444. class TrafficSignViolation:
  1445. """交通标志违规类"""
  1446. PROHIBITED_STRAIGHT_THRESHOLD = 5
  1447. SIGN_TYPE_STRAIGHT_PROHIBITED = 7
  1448. SIGN_TYPE_SPEED_LIMIT = 12
  1449. SIGN_TYPE_MIN_SPEED_LIMIT = 13
  1450. def __init__(self, df_data):
  1451. print("交通标志违规类初始化中...")
  1452. self.traffic_violations_type = "交通标志违规类"
  1453. # 存储原始数据引用
  1454. self._raw_data = df_data.obj_data[1]
  1455. # 初始化属性,但不立即创建数据副本
  1456. self._data = None
  1457. # 延迟计算标志
  1458. self._calculated = False
  1459. self._violation_counts = {
  1460. "NoStraightThrough": 0,
  1461. "SpeedLimitViolation": 0,
  1462. "MinimumSpeedLimitViolation": 0
  1463. }
  1464. @property
  1465. def data(self):
  1466. """懒加载方式获取数据"""
  1467. if self._data is None:
  1468. # 使用浅拷贝代替深拷贝
  1469. self._data = self._raw_data.copy()
  1470. # 预处理数据 - 按时间排序
  1471. self._data = self._data.sort_values('simTime').reset_index(drop=True)
  1472. return self._data
  1473. def _ensure_calculated(self):
  1474. """保证计算只执行一次"""
  1475. if not self._calculated:
  1476. self._check_prohibition_violations()
  1477. self._check_instruction_violations()
  1478. self._calculated = True
  1479. def calculate_NoStraightThrough_count(self):
  1480. """计算禁止直行违规次数"""
  1481. self._ensure_calculated()
  1482. return self._violation_counts["NoStraightThrough"]
  1483. def calculate_SpeedLimitViolation_count(self):
  1484. """计算超速违规次数"""
  1485. self._ensure_calculated()
  1486. return self._violation_counts["SpeedLimitViolation"]
  1487. def calculate_MinimumSpeedLimitViolation_count(self):
  1488. """计算最低限速违规次数"""
  1489. self._ensure_calculated()
  1490. return self._violation_counts["MinimumSpeedLimitViolation"]
  1491. def _check_prohibition_violations(self):
  1492. """处理禁令标志违规(禁止直行和限速)"""
  1493. self._check_straight_violation()
  1494. self._check_speed_violation(
  1495. self.SIGN_TYPE_SPEED_LIMIT,
  1496. operator.gt,
  1497. "SpeedLimitViolation"
  1498. )
  1499. def _check_instruction_violations(self):
  1500. """处理指示标志违规(最低限速)"""
  1501. self._check_speed_violation(
  1502. self.SIGN_TYPE_MIN_SPEED_LIMIT,
  1503. operator.lt,
  1504. "MinimumSpeedLimitViolation"
  1505. )
  1506. def _check_straight_violation(self):
  1507. """检查禁止直行违规"""
  1508. straight_df = self.data[self.data["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED]
  1509. if not straight_df.empty:
  1510. # 计算航向角变化并填充缺失值
  1511. straight_df = straight_df.copy()
  1512. straight_df['posH_diff'] = straight_df['posH'].diff().abs().fillna(0)
  1513. # 创建筛选条件
  1514. mask = (
  1515. (straight_df['posH_diff'] <= self.PROHIBITED_STRAIGHT_THRESHOLD) &
  1516. (straight_df['v'] > 0)
  1517. )
  1518. self._violation_counts["NoStraightThrough"] = mask.sum()
  1519. def _check_speed_violation(self, sign_type, compare_op, count_key):
  1520. """通用速度违规检查方法
  1521. Args:
  1522. sign_type: 标志类型
  1523. compare_op: 比较操作符
  1524. count_key: 违规计数键名
  1525. """
  1526. violation_df = self.data[self.data["sign_type1"] == sign_type]
  1527. if not violation_df.empty:
  1528. mask = compare_op(violation_df['v'], violation_df['sign_speed'])
  1529. self._violation_counts[count_key] = mask.sum()