traffic.py 74 KB


# ... original imports and constant definitions are kept unchanged ...
  2. import math
  3. import operator
  4. import copy
  5. import numpy as np
  6. import pandas as pd
  7. from typing import Dict, Any, List, Optional
  8. from modules.lib import log_manager
  9. from modules.lib.score import Score
  10. from modules.lib.log_manager import LogManager
  11. from modules.lib import data_process
  12. OVERTAKE_INFO = [
  13. "simTime",
  14. "simFrame",
  15. "playerId",
  16. "speedX",
  17. "speedY",
  18. "posX",
  19. "posY",
  20. "posH",
  21. "lane_id",
  22. "lane_type",
  23. "road_type",
  24. "interid",
  25. "crossid",
  26. ]
  27. SLOWDOWN_INFO = [
  28. "simTime",
  29. "simFrame",
  30. "playerId",
  31. "speedX",
  32. "speedY",
  33. "posX",
  34. "posY",
  35. "crossid",
  36. "lane_type",
  37. ]
  38. TURNAROUND_INFO = [
  39. "simTime",
  40. "simFrame",
  41. "playerId",
  42. "speedX",
  43. "speedY",
  44. "posX",
  45. "posY",
  46. "sign_type1",
  47. "lane_type",
  48. ]
  49. TRFFICSIGN_INFO = [
  50. "simTime",
  51. "simFrame",
  52. "playerId",
  53. "speedX",
  54. "speedY",
  55. "v",
  56. "posX",
  57. "posY",
  58. "sign_type1",
  59. "sign_ref_link",
  60. "sign_x",
  61. "sign_y",
  62. ]
# Metric entry points: every function is named calculate_<metric name in lower case>.
  64. def calculate_overtake_when_passing_car(data_processed):
  65. """计算会车时超车指标"""
  66. overtakingviolation = OvertakingViolation(data_processed)
  67. overtake_when_passing_car_count = overtakingviolation.calculate_overtake_when_passing_car_count()
  68. return {"overtake_when_passing_car": overtake_when_passing_car_count}
  69. def calculate_overtake_on_right(data_processed):
  70. """计算右侧超车指标"""
  71. overtakingviolation = OvertakingViolation(data_processed)
  72. overtake_on_right_count = overtakingviolation.calculate_overtake_on_right_count()
  73. return {"overtake_on_right": overtake_on_right_count}
  74. def calculate_overtake_when_turn_around(data_processed):
  75. """计算掉头时超车指标"""
  76. overtakingviolation = OvertakingViolation(data_processed)
  77. overtake_when_turn_around_count = overtakingviolation.calculate_overtake_when_turn_around_count()
  78. return {"overtake_when_turn_around": overtake_when_turn_around_count}
  79. def calculate_overtake_in_forbid_lane(data_processed):
  80. """计算在禁止车道超车指标"""
  81. overtakingviolation = OvertakingViolation(data_processed)
  82. overtake_in_forbid_lane_count = overtakingviolation.calculate_overtake_in_forbid_lane_count()
  83. return {"overtake_in_forbid_lane": overtake_in_forbid_lane_count}
  84. def calculate_overtake_in_ramp(data_processed):
  85. """计算在匝道超车指标"""
  86. overtakingviolation = OvertakingViolation(data_processed)
  87. overtake_in_ramp_area_count = overtakingviolation.calculate_overtake_in_ramp_area_count()
  88. return {"overtake_in_ramp": overtake_in_ramp_area_count}
  89. def calculate_overtake_in_tunnel(data_processed):
  90. """计算在隧道超车指标"""
  91. overtakingviolation = OvertakingViolation(data_processed)
  92. overtake_in_tunnel_area_count = overtakingviolation.calculate_overtake_in_tunnel_area_count()
  93. return {"overtake_in_tunnel": overtake_in_tunnel_area_count}
  94. def calculate_overtake_on_accelerate_lane(data_processed):
  95. """计算在加速车道超车指标"""
  96. overtakingviolation = OvertakingViolation(data_processed)
  97. overtake_on_accelerate_lane_count = overtakingviolation.calculate_overtake_on_accelerate_lane_count()
  98. return {"overtake_on_accelerate_lane": overtake_on_accelerate_lane_count}
  99. def calculate_overtake_on_decelerate_lane(data_processed):
  100. """计算在减速车道超车指标"""
  101. overtakingviolation = OvertakingViolation(data_processed)
  102. overtake_on_decelerate_lane_count = overtakingviolation.calculate_overtake_on_decelerate_lane_count()
  103. return {"overtake_on_decelerate_lane": overtake_on_decelerate_lane_count}
  104. def calculate_overtake_in_different_senerios(data_processed):
  105. """计算在不同场景超车指标"""
  106. overtakingviolation = OvertakingViolation(data_processed)
  107. overtake_in_different_senerios_count = overtakingviolation.calculate_overtake_in_different_senerios_count()
  108. return {"overtake_in_different_senerios": overtake_in_different_senerios_count}
  109. def calculate_slow_down_in_crosswalk(data_processed):
  110. """计算在人行横道减速指标"""
  111. slowdownviolation = SlowdownViolation(data_processed)
  112. slow_down_in_crosswalk_count = slowdownviolation.calculate_slow_down_in_crosswalk_count()
  113. return {"slowdown_down_in_crosswalk": slow_down_in_crosswalk_count}
  114. def calculate_avoid_pedestrian_in_crosswalk(data_processed):
  115. """计算在人行横道避让行人指标"""
  116. avoidpedestrianincrosswalk = SlowdownViolation(data_processed)
  117. avoid_pedestrian_in_crosswalk_count = avoidpedestrianincrosswalk.calculate_avoid_pedestrian_in_the_crosswalk_count()
  118. return {"avoid_pedestrian_in_crosswalk": avoid_pedestrian_in_crosswalk_count}
  119. def calculate_avoid_pedestrian_in_the_road(data_processed):
  120. """计算在道路上避让行人指标"""
  121. avoidpedestrianintheroad = SlowdownViolation(data_processed)
  122. avoid_pedestrian_in_the_road_count = avoidpedestrianintheroad.calculate_avoid_pedestrian_in_the_road_count()
  123. return {"avoid_pedestrian_in_the_road": avoid_pedestrian_in_the_road_count}
  124. def calculate_avoid_pedestrian_when_turning(data_processed):
  125. """计算转弯时避让行人指标"""
  126. avoidpedestrianwhenturning = SlowdownViolation(data_processed)
  127. avoid_pedestrian_when_turning_count = avoidpedestrianwhenturning.calculate_avoid_pedestrian_when_turning_count()
  128. return {"avoid_pedestrian_when_turning_count": avoid_pedestrian_when_turning_count}
  129. def calculate_turn_in_forbiden_turn_left_sign(data_processed):
  130. """计算在禁止左转标志处左转指标"""
  131. turnaroundviolation = TurnaroundViolation(data_processed)
  132. turn_in_forbiden_turn_left_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_left_sign_count()
  133. return {"turn_in_forbiden_turn_left_sign": turn_in_forbiden_turn_left_sign_count}
  134. def calculate_turn_in_forbiden_turn_back_sign(data_processed):
  135. """计算在禁止掉头标志处掉头指标"""
  136. turnaroundviolation = TurnaroundViolation(data_processed)
  137. turn_in_forbiden_turn_back_sign_count = turnaroundviolation.calculate_turn_in_forbiden_turn_back_sign_count()
  138. return {"turn_in_forbiden_turn_back_sign": turn_in_forbiden_turn_back_sign_count}
  139. def calculate_avoid_pedestrian_when_turn_back(data_processed):
  140. """计算掉头时避让行人指标"""
  141. turnaroundviolation = TurnaroundViolation(data_processed)
  142. avoid_pedestrian_when_turn_back_count = turnaroundviolation.calaulate_avoid_pedestrian_when_turn_back_count()
  143. return {"avoid_pedestrian_when_turn_back": avoid_pedestrian_when_turn_back_count}
  144. def calculate_urbanexpresswayorhighwaydrivinglanestopped(data_processed):
  145. """计算城市快速路或高速公路行车道停车指标"""
  146. wrongwayviolation = WrongWayViolation(data_processed)
  147. urbanExpresswayOrHighwayDrivingLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  148. return {"urbanExpresswayOrHighwayDrivingLaneStopped": urbanExpresswayOrHighwayDrivingLaneStopped_count}
  149. def calculate_urbanexpresswayorhighwayemergencylanestopped(data_processed):
  150. """计算城市快速路或高速公路应急车道停车指标"""
  151. wrongwayviolation = WrongWayViolation(data_processed)
  152. urbanExpresswayOrHighwayEmergencyLaneStopped_count = wrongwayviolation.calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count()
  153. return {"urbanExpresswayOrHighwayEmergencyLaneStopped": urbanExpresswayOrHighwayEmergencyLaneStopped_count}
  154. def calculate_urbanexpresswayemergencylanedriving(data_processed):
  155. """计算城市快速路应急车道行驶指标"""
  156. wrongwayviolation = WrongWayViolation(data_processed)
  157. urbanExpresswayEmergencyLaneDriving_count = wrongwayviolation.calculate_urbanExpresswayEmergencyLaneDriving()
  158. return {"urbanExpresswayEmergencyLaneDriving": urbanExpresswayEmergencyLaneDriving_count}
  159. def calculate_urbanexpresswayorhighwayspeedoverlimit50(data_processed):
  160. """计算城市快速路或高速公路超速50%以上指标"""
  161. speedingviolation = SpeedingViolation(data_processed)
  162. urbanExpresswayOrHighwaySpeedOverLimit50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count()
  163. return {"urbanExpresswayOrHighwaySpeedOverLimit50": urbanExpresswayOrHighwaySpeedOverLimit50_count}
  164. def calculate_urbanexpresswayorhighwayspeedoverlimit20to50(data_processed):
  165. """计算城市快速路或高速公路超速20%-50%指标"""
  166. speedingviolation = SpeedingViolation(data_processed)
  167. urbanExpresswayOrHighwaySpeedOverLimit20to50_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count()
  168. return {"urbanExpresswayOrHighwaySpeedOverLimit20to50": urbanExpresswayOrHighwaySpeedOverLimit20to50_count}
  169. def calculate_urbanexpresswayorhighwayspeedoverlimit0to20(data_processed):
  170. """计算城市快速路或高速公路超速0-20%指标"""
  171. speedingviolation = SpeedingViolation(data_processed)
  172. urbanExpresswayOrHighwaySpeedOverLimit0to20_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count()
  173. return {"urbanExpresswayOrHighwaySpeedOverLimit0to20": urbanExpresswayOrHighwaySpeedOverLimit0to20_count}
  174. def calculate_urbanexpresswayorhighwayspeedunderlimit(data_processed):
  175. """计算城市快速路或高速公路低于最低限速指标"""
  176. speedingviolation = SpeedingViolation(data_processed)
  177. urbanExpresswayOrHighwaySpeedUnderLimit_count = speedingviolation.calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count()
  178. return {"urbanExpresswayOrHighwaySpeedUnderLimit": urbanExpresswayOrHighwaySpeedUnderLimit_count}
  179. def calculate_generalroadspeedoverlimit50(data_processed):
  180. """计算一般道路超速50%以上指标"""
  181. speedingviolation = SpeedingViolation(data_processed)
  182. generalRoadSpeedOverLimit50_count = speedingviolation.calculate_generalRoadSpeedOverLimit50()
  183. return {"generalRoadSpeedOverLimit50": generalRoadSpeedOverLimit50_count}
  184. def calculate_generalroadspeedoverlimit20to50(data_processed):
  185. """计算一般道路超速20%-50%指标"""
  186. speedingviolation = SpeedingViolation(data_processed)
  187. generalRoadSpeedOverLimit20to50_count = speedingviolation.calculate_generalRoadSpeedOverLimit20to50_count()
  188. return {"generalRoadSpeedOverLimit20to50": generalRoadSpeedOverLimit20to50_count}
  189. def calculate_trafficsignalviolation(data_processed):
  190. """计算交通信号违规指标"""
  191. trafficlightviolation = TrafficLightViolation(data_processed)
  192. trafficSignalViolation_count = trafficlightviolation.calculate_trafficSignalViolation_count()
  193. return {"trafficSignalViolation": trafficSignalViolation_count}
  194. def calculate_illegaldrivingorparkingatcrossroads(data_processed):
  195. """计算交叉路口违法行驶或停车指标"""
  196. trafficlightviolation = TrafficLightViolation(data_processed)
  197. illegalDrivingOrParkingAtCrossroads_count = trafficlightviolation.calculate_illegalDrivingOrParkingAtCrossroads()
  198. return {"illegalDrivingOrParkingAtCrossroads": illegalDrivingOrParkingAtCrossroads_count}
  199. def calculate_generalroadirregularlaneuse(data_processed):
  200. """计算一般道路不按规定车道行驶指标"""
  201. warningviolation = WarningViolation(data_processed)
  202. generalRoadIrregularLaneUse_count = warningviolation.calculate_generalRoadIrregularLaneUse_count()
  203. return {"generalRoadIrregularLaneUse": generalRoadIrregularLaneUse_count}
  204. def calculate_urbanexpresswayorhighwayridelanedivider(data_processed):
  205. """计算城市快速路或高速公路骑车道线行驶指标"""
  206. warningviolation = WarningViolation(data_processed)
  207. urbanExpresswayOrHighwayRideLaneDivider_count = warningviolation.calculate_urbanExpresswayOrHighwayRideLaneDivider_count()
  208. return {"urbanExpresswayOrHighwayRideLaneDivider": urbanExpresswayOrHighwayRideLaneDivider_count}
  209. def calculate_nostraightthrough(data_processed):
  210. """计算禁止直行标志牌处直行指标"""
  211. trafficsignviolation = TrafficSignViolation(data_processed)
  212. noStraightThrough_count = trafficsignviolation.calculate_NoStraightThrough_count()
  213. return {"NoStraightThrough": noStraightThrough_count}
  214. def calculate_speedlimitviolation(data_processed):
  215. """计算违反限速规定指标"""
  216. trafficsignviolation = TrafficSignViolation(data_processed)
  217. SpeedLimitViolation_count = trafficsignviolation.calculate_SpeedLimitViolation_count()
  218. return {"SpeedLimitViolation": SpeedLimitViolation_count}
  219. def calculate_minimumspeedlimitviolation(data_processed):
  220. """计算违反最低限速规定指标"""
  221. trafficsignviolation = TrafficSignViolation(data_processed)
  222. calculate_MinimumSpeedLimitViolation_count = trafficsignviolation.calculate_MinimumSpeedLimitViolation_count()
  223. return {"MinimumSpeedLimitViolation": calculate_MinimumSpeedLimitViolation_count}
# TrafficRegistry: _build_registry maps configured metric names to the calculate_* functions above.
  225. class TrafficRegistry:
  226. """交通违规指标注册器"""
  227. def __init__(self, data_processed):
  228. self.logger = LogManager().get_logger()
  229. self.data = data_processed
  230. # 检查traffic_config是否为空
  231. if not hasattr(data_processed, 'traffic_config') or not data_processed.traffic_config:
  232. self.logger.warning("交通违规配置为空,跳过交通违规指标计算")
  233. self.traffic_config = {}
  234. self.metrics = []
  235. self._registry = {}
  236. return
  237. self.traffic_config = data_processed.traffic_config.get("traffic", {})
  238. self.metrics = self._extract_metrics(self.traffic_config)
  239. self._registry = self._build_registry()
  240. def _extract_metrics(self, config_node: dict) -> list:
  241. """从配置中提取指标名称"""
  242. metrics = []
  243. def _recurse(node):
  244. if isinstance(node, dict):
  245. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  246. metrics.append(node['name'])
  247. for v in node.values():
  248. _recurse(v)
  249. _recurse(config_node)
  250. self.logger.info(f'评比的交通违规指标列表:{metrics}')
  251. return metrics
  252. def _build_registry(self) -> dict:
  253. """构建指标函数注册表"""
  254. registry = {}
  255. for metric_name in self.metrics:
  256. func_name = f"calculate_{metric_name.lower()}"
  257. try:
  258. registry[metric_name] = globals()[func_name]
  259. except KeyError:
  260. self.logger.error(f"未实现交通违规指标函数: {func_name}")
  261. return registry
  262. def batch_execute(self) -> dict:
  263. """批量执行指标计算"""
  264. results = {}
  265. # 如果配置为空或没有注册的指标,直接返回空结果
  266. if not hasattr(self, 'traffic_config') or not self.traffic_config or not self._registry:
  267. self.logger.info("交通违规配置为空或无注册指标,返回空结果")
  268. return results
  269. for name, func in self._registry.items():
  270. try:
  271. result = func(self.data)
  272. results.update(result)
  273. # 新增:将每个指标的结果写入日志
  274. self.logger.info(f'交通违规指标[{name}]计算结果: {result}')
  275. except Exception as e:
  276. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  277. results[name] = None
  278. self.logger.info(f'交通违规指标计算结果:{results}')
  279. return results
  280. class TrafficManager:
  281. """交通违规指标管理类"""
  282. def __init__(self, data_processed):
  283. self.data = data_processed
  284. self.logger = LogManager().get_logger()
  285. # 检查traffic_config是否为空
  286. if not hasattr(data_processed, 'traffic_config') or not data_processed.traffic_config:
  287. self.logger.warning("交通违规配置为空,跳过交通违规指标计算初始化")
  288. self.registry = None
  289. else:
  290. self.registry = TrafficRegistry(self.data)
  291. def report_statistic(self):
  292. """计算并报告交通违规指标结果"""
  293. # 如果registry为None,直接返回空字典
  294. if self.registry is None:
  295. self.logger.info("交通违规指标管理器未初始化,返回空结果")
  296. return {}
  297. traffic_result = self.registry.batch_execute()
  298. return traffic_result
  299. class OvertakingViolation(object):
  300. """超车违规类"""
  301. def __init__(self, df_data):
  302. print("超车违规类初始化中...")
  303. self.traffic_violations_type = "超车违规类"
  304. # 存储原始数据引用,不进行拷贝
  305. self._raw_data = df_data.obj_data[1] # 自车数据
  306. # 安全获取其他车辆数据
  307. self._data_obj = None
  308. self._other_obj_data1 = None
  309. # 检查是否存在ID为2的对象数据
  310. if 2 in df_data.obj_id_list:
  311. self._data_obj = df_data.obj_data[2]
  312. # 检查是否存在ID为3的对象数据
  313. if 3 in df_data.obj_id_list:
  314. self._other_obj_data1 = df_data.obj_data[3]
  315. # 初始化属性,但不立即创建数据副本
  316. self._ego_data = None
  317. self._obj_data = None
  318. self._other_obj_data = None
  319. # 使用字典统一管理违规计数器
  320. self.violation_counts = {
  321. "overtake_on_right": 0,
  322. "overtake_when_turn_around": 0,
  323. "overtake_when_passing_car": 0,
  324. "overtake_in_forbid_lane": 0,
  325. "overtake_in_ramp": 0,
  326. "overtake_in_tunnel": 0,
  327. "overtake_on_accelerate_lane": 0,
  328. "overtake_on_decelerate_lane": 0,
  329. "overtake_in_different_senerios": 0
  330. }
  331. # 标记计算状态
  332. self._calculated = {
  333. "illegal_overtake": False,
  334. "forbid_lane": False,
  335. "ramp_area": False,
  336. "tunnel_area": False,
  337. "accelerate_lane": False,
  338. "decelerate_lane": False,
  339. "different_senerios": False
  340. }
  341. @property
  342. def ego_data(self):
  343. """懒加载方式获取ego数据,只在首次访问时创建副本"""
  344. if self._ego_data is None:
  345. self._ego_data = self._raw_data[OVERTAKE_INFO].copy().reset_index(drop=True)
  346. return self._ego_data
  347. @property
  348. def obj_data(self):
  349. """懒加载方式获取obj数据"""
  350. if self._obj_data is None:
  351. if self._data_obj is not None:
  352. self._obj_data = self._data_obj[OVERTAKE_INFO].copy().reset_index(drop=True)
  353. else:
  354. # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
  355. self._obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
  356. return self._obj_data
  357. @property
  358. def other_obj_data(self):
  359. """懒加载方式获取other_obj数据"""
  360. if self._other_obj_data is None:
  361. if self._other_obj_data1 is not None:
  362. self._other_obj_data = self._other_obj_data1[OVERTAKE_INFO].copy().reset_index(drop=True)
  363. else:
  364. # 如果没有数据,创建一个空的DataFrame,列名与ego_data相同
  365. self._other_obj_data = pd.DataFrame(columns=OVERTAKE_INFO)
  366. return self._other_obj_data
  367. def different_road_area_simtime(self, df, threshold=0.5):
  368. if not df:
  369. return []
  370. simtime_group = []
  371. current_simtime_group = [df[0]]
  372. for i in range(1, len(df)):
  373. if abs(df[i] - df[i - 1]) <= threshold:
  374. current_simtime_group.append(df[i])
  375. else:
  376. simtime_group.append(current_simtime_group)
  377. current_simtime_group = [df[i]]
  378. simtime_group.append(current_simtime_group)
  379. return simtime_group
  380. def _is_overtake(self, lane_id, dx, dy, ego_speedx, ego_speedy):
  381. lane_start = lane_id[0]
  382. lane_end = lane_id[-1]
  383. start_condition = dx[0] * ego_speedx[0] + dy[0] * ego_speedy[0] >= 0
  384. end_condition = dx[-1] * ego_speedx[-1] + dy[-1] * ego_speedy[-1] < 0
  385. return lane_start == lane_end and start_condition and end_condition
  386. def _is_dxy_of_car(self, ego_df, obj_df):
  387. """
  388. :param df: objstate.csv and so on
  389. :param id: playerId
  390. :param string_type: posX/Y or speedX/Y and so on
  391. :return: dataframe of dx/y and so on
  392. """
  393. car_dx = obj_df["posX"].values - ego_df["posX"].values
  394. car_dy = obj_df["posY"].values - ego_df["posY"].values
  395. return car_dx, car_dy
  396. def illegal_overtake_with_car_detector(self, window_width=250):
  397. """检测超车违规"""
  398. # 如果已经计算过,直接返回
  399. if self._calculated["illegal_overtake"]:
  400. return
  401. # 如果没有其他车辆数据,直接返回,保持默认值0
  402. if self.obj_data.empty:
  403. print("没有其他车辆数据,无法检测超车违规,默认为0")
  404. self._calculated["illegal_overtake"] = True
  405. return
  406. # 获取csv文件中最短的帧数
  407. frame_id_length = len(self.ego_data["simFrame"])
  408. start_frame_id = self.ego_data["simFrame"].iloc[0] # 获取起始点的帧数
  409. while (start_frame_id + window_width) < frame_id_length:
  410. simframe_window1 = list(
  411. np.arange(start_frame_id, start_frame_id + window_width)
  412. )
  413. simframe_window = list(map(int, simframe_window1))
  414. # 读取滑动窗口的dataframe数据
  415. ego_data_frames = self.ego_data[
  416. self.ego_data["simFrame"].isin(simframe_window)
  417. ]
  418. # 确保有足够的数据进行处理
  419. if len(ego_data_frames) == 0:
  420. start_frame_id += 1
  421. continue
  422. obj_data_frames = self.obj_data[
  423. self.obj_data["simFrame"].isin(simframe_window)
  424. ]
  425. # 如果没有其他车辆数据,跳过当前窗口
  426. if len(obj_data_frames) == 0:
  427. start_frame_id += 1
  428. continue
  429. other_data_frames = self.other_obj_data[
  430. self.other_obj_data["simFrame"].isin(simframe_window)
  431. ]
  432. # 读取前后的laneId
  433. lane_id = ego_data_frames["lane_id"].tolist()
  434. # 读取前后方向盘转角steeringWheel
  435. driverctrl_start_state = ego_data_frames["posH"].iloc[0]
  436. driverctrl_end_state = ego_data_frames["posH"].iloc[-1]
  437. # 读取车辆前后的位置信息
  438. dx, dy = self._is_dxy_of_car(ego_data_frames, obj_data_frames)
  439. ego_speedx = ego_data_frames["speedX"].tolist()
  440. ego_speedy = ego_data_frames["speedY"].tolist()
  441. # 安全获取obj_speedx和obj_speedy
  442. obj_with_id_2 = obj_data_frames[obj_data_frames["playerId"] == 2]
  443. if not obj_with_id_2.empty:
  444. obj_speedx = obj_with_id_2["speedX"].tolist()
  445. obj_speedy = obj_with_id_2["speedY"].tolist()
  446. else:
  447. obj_speedx = []
  448. obj_speedy = []
  449. # 检查会车时超车
  450. if len(other_data_frames) > 0:
  451. other_start_speedx = other_data_frames["speedX"].iloc[0]
  452. other_start_speedy = other_data_frames["speedY"].iloc[0]
  453. if (
  454. ego_speedx[0] * other_start_speedx
  455. + ego_speedy[0] * other_start_speedy
  456. < 0
  457. ):
  458. self.violation_counts["overtake_when_passing_car"] += self._is_overtake(
  459. lane_id, dx, dy, ego_speedx, ego_speedy
  460. )
  461. start_frame_id += window_width
  462. continue
  463. # 检查右侧超车
  464. if driverctrl_start_state > 0 and driverctrl_end_state < 0:
  465. self.violation_counts["overtake_on_right"] += self._is_overtake(
  466. lane_id, dx, dy, ego_speedx, ego_speedy
  467. )
  468. start_frame_id += window_width
  469. continue
  470. # 检查掉头时超车
  471. if obj_speedx and obj_speedy: # 确保列表不为空
  472. if ego_speedx[0] * obj_speedx[0] + ego_speedy[0] * obj_speedy[0] < 0:
  473. self.violation_counts["overtake_when_turn_around"] += self._is_overtake(
  474. lane_id, dx, dy, ego_speedx, ego_speedy
  475. )
  476. start_frame_id += window_width
  477. continue
  478. # 如果没有检测到任何违规,移动窗口
  479. start_frame_id += 1
  480. self._calculated["illegal_overtake"] = True
  481. # 借道超车场景
  482. def overtake_in_forbid_lane_detector(self):
  483. """检测借道超车违规"""
  484. # 如果已经计算过,直接返回
  485. if self._calculated["forbid_lane"]:
  486. return
  487. # 如果没有其他车辆数据,直接返回,保持默认值0
  488. if self.obj_data.empty:
  489. print("没有其他车辆数据,无法检测借道超车违规,默认为0")
  490. self._calculated["forbid_lane"] = True
  491. return
  492. simTime = self.obj_data["simTime"].tolist()
  493. simtime_devide = self.different_road_area_simtime(simTime)
  494. for simtime in simtime_devide:
  495. lane_overtake = self.ego_data[self.ego_data["simTime"].isin(simtime)]
  496. try:
  497. lane_type = lane_overtake["lane_type"].tolist()
  498. if (50002 in lane_type and len(set(lane_type)) > 2) or (
  499. 50002 not in lane_type and len(set(lane_type)) > 1
  500. ):
  501. self.violation_counts["overtake_in_forbid_lane"] += 1
  502. except Exception as e:
  503. print("数据缺少lane_type信息")
  504. self._calculated["forbid_lane"] = True
  505. # 在匝道超车
  506. def overtake_in_ramp_area_detector(self):
  507. """检测匝道超车违规"""
  508. # 如果已经计算过,直接返回
  509. if self._calculated["ramp_area"]:
  510. return
  511. # 如果没有其他车辆数据,直接返回,保持默认值0
  512. if self.obj_data.empty:
  513. print("没有其他车辆数据,无法检测匝道超车违规,默认为0")
  514. self._calculated["ramp_area"] = True
  515. return
  516. ramp_simtime_list = self.ego_data[(self.ego_data["road_type"] == 19)][
  517. "simTime"
  518. ].tolist()
  519. ramp_simTime_list = self.different_road_area_simtime(ramp_simtime_list)
  520. for ramp_simtime in ramp_simTime_list:
  521. lane_id = self.ego_data["lane_id"].tolist()
  522. ego_in_ramp = self.ego_data[self.ego_data["simTime"].isin(ramp_simtime)]
  523. objstate_in_ramp = self.obj_data[
  524. self.obj_data["simTime"].isin(ramp_simtime)
  525. ]
  526. dx, dy = self._is_dxy_of_car(ego_in_ramp, objstate_in_ramp)
  527. ego_speedx = ego_in_ramp["speedX"].tolist()
  528. ego_speedy = ego_in_ramp["speedY"].tolist()
  529. if len(lane_id) > 0:
  530. self.violation_counts["overtake_in_ramp"] += self._is_overtake(
  531. lane_id, dx, dy, ego_speedx, ego_speedy
  532. )
  533. else:
  534. continue
  535. self._calculated["ramp_area"] = True
  536. def overtake_in_tunnel_area_detector(self):
  537. """检测隧道超车违规"""
  538. # 如果已经计算过,直接返回
  539. if self._calculated["tunnel_area"]:
  540. return
  541. # 如果没有其他车辆数据,直接返回,保持默认值0
  542. if self.obj_data.empty:
  543. print("没有其他车辆数据,无法检测隧道超车违规,默认为0")
  544. self._calculated["tunnel_area"] = True
  545. return
  546. tunnel_simtime_list = self.ego_data[(self.ego_data["road_type"] == 15)][
  547. "simTime"
  548. ].tolist()
  549. tunnel_simTime_list = self.different_road_area_simtime(tunnel_simtime_list)
  550. for tunnel_simtime in tunnel_simTime_list:
  551. lane_id = self.ego_data["lane_id"].tolist()
  552. ego_in_tunnel = self.ego_data[self.ego_data["simTime"].isin(tunnel_simtime)]
  553. objstate_in_tunnel = self.obj_data[
  554. self.obj_data["simTime"].isin(tunnel_simtime)
  555. ]
  556. dx, dy = self._is_dxy_of_car(ego_in_tunnel, objstate_in_tunnel)
  557. ego_speedx = ego_in_tunnel["speedX"].tolist()
  558. ego_speedy = ego_in_tunnel["speedY"].tolist()
  559. if len(lane_id) > 0:
  560. self.violation_counts["overtake_in_tunnel"] += self._is_overtake(
  561. lane_id, dx, dy, ego_speedx, ego_speedy
  562. )
  563. else:
  564. continue
  565. self._calculated["tunnel_area"] = True
  566. # 加速车道超车
  567. def overtake_on_accelerate_lane_detector(self):
  568. """检测加速车道超车违规"""
  569. # 如果已经计算过,直接返回
  570. if self._calculated["accelerate_lane"]:
  571. return
  572. # 如果没有其他车辆数据,直接返回,保持默认值0
  573. if self.obj_data.empty:
  574. print("没有其他车辆数据,无法检测加速车道超车违规,默认为0")
  575. self._calculated["accelerate_lane"] = True
  576. return
  577. accelerate_simtime_list = self.ego_data[self.ego_data["lane_type"] == 2][
  578. "simTime"
  579. ].tolist()
  580. accelerate_simTime_list = self.different_road_area_simtime(
  581. accelerate_simtime_list
  582. )
  583. for accelerate_simtime in accelerate_simTime_list:
  584. lane_id = self.ego_data["lane_id"].tolist()
  585. ego_in_accelerate = self.ego_data[
  586. self.ego_data["simTime"].isin(accelerate_simtime)
  587. ]
  588. objstate_in_accelerate = self.obj_data[
  589. self.obj_data["simTime"].isin(accelerate_simtime)
  590. ]
  591. dx, dy = self._is_dxy_of_car(ego_in_accelerate, objstate_in_accelerate)
  592. ego_speedx = ego_in_accelerate["speedX"].tolist()
  593. ego_speedy = ego_in_accelerate["speedY"].tolist()
  594. self.violation_counts["overtake_on_accelerate_lane"] += self._is_overtake(
  595. lane_id, dx, dy, ego_speedx, ego_speedy
  596. )
  597. self._calculated["accelerate_lane"] = True
  598. # 减速车道超车
  599. def overtake_on_decelerate_lane_detector(self):
  600. """检测减速车道超车违规"""
  601. # 如果已经计算过,直接返回
  602. if self._calculated["decelerate_lane"]:
  603. return
  604. # 如果没有其他车辆数据,直接返回,保持默认值0
  605. if self.obj_data.empty:
  606. print("没有其他车辆数据,无法检测减速车道超车违规,默认为0")
  607. self._calculated["decelerate_lane"] = True
  608. return
  609. decelerate_simtime_list = self.ego_data[(self.ego_data["lane_type"] == 3)][
  610. "simTime"
  611. ].tolist()
  612. decelerate_simTime_list = self.different_road_area_simtime(
  613. decelerate_simtime_list
  614. )
  615. for decelerate_simtime in decelerate_simTime_list:
  616. lane_id = self.ego_data["id"].tolist()
  617. ego_in_decelerate = self.ego_data[
  618. self.ego_data["simTime"].isin(decelerate_simtime)
  619. ]
  620. objstate_in_decelerate = self.obj_data[
  621. self.obj_data["simTime"].isin(decelerate_simtime)
  622. ]
  623. dx, dy = self._is_dxy_of_car(ego_in_decelerate, objstate_in_decelerate)
  624. ego_speedx = ego_in_decelerate["speedX"].tolist()
  625. ego_speedy = ego_in_decelerate["speedY"].tolist()
  626. self.violation_counts["overtake_on_decelerate_lane"] += self._is_overtake(
  627. lane_id, dx, dy, ego_speedx, ego_speedy
  628. )
  629. self._calculated["decelerate_lane"] = True
  630. # 在交叉路口
  631. def overtake_in_different_senerios_detector(self):
  632. """检测不同场景超车违规"""
  633. # 如果已经计算过,直接返回
  634. if self._calculated["different_senerios"]:
  635. return
  636. # 如果没有其他车辆数据,直接返回,保持默认值0
  637. if self.obj_data.empty:
  638. print("没有其他车辆数据,无法检测不同场景超车违规,默认为0")
  639. self._calculated["different_senerios"] = True
  640. return
  641. crossroad_simTime = self.ego_data[self.ego_data["interid"] != 10000][
  642. "simTime"
  643. ].tolist() # 判断是路口或者隧道区域
  644. # 筛选在路口或者隧道区域的objectstate、driverctrl、laneinfo数据
  645. crossroad_ego = self.ego_data[self.ego_data["simTime"].isin(crossroad_simTime)]
  646. crossroad_objstate = self.obj_data[
  647. self.obj_data["simTime"].isin(crossroad_simTime)
  648. ]
  649. # 读取前后的laneId
  650. lane_id = crossroad_ego["lane_id"].tolist()
  651. # 读取车辆前后的位置信息
  652. dx, dy = self._is_dxy_of_car(crossroad_ego, crossroad_objstate)
  653. ego_speedx = crossroad_ego["speedX"].tolist()
  654. ego_speedy = crossroad_ego["speedY"].tolist()
  655. """
  656. 如果滑动窗口开始和最后的laneid一致;
  657. 自车和前车的位置发生的交换;
  658. 则认为发生超车
  659. """
  660. if len(lane_id) > 0:
  661. self.violation_counts["overtake_in_different_senerios"] += self._is_overtake(
  662. lane_id, dx, dy, ego_speedx, ego_speedy
  663. )
  664. self._calculated["different_senerios"] = True
  665. def calculate_overtake_when_passing_car_count(self):
  666. """计算会车时超车违规次数"""
  667. self.illegal_overtake_with_car_detector()
  668. return self.violation_counts["overtake_when_passing_car"]
  669. def calculate_overtake_on_right_count(self):
  670. """计算右侧超车违规次数"""
  671. self.illegal_overtake_with_car_detector()
  672. return self.violation_counts["overtake_on_right"]
  673. def calculate_overtake_when_turn_around_count(self):
  674. """计算掉头时超车违规次数"""
  675. self.illegal_overtake_with_car_detector()
  676. return self.violation_counts["overtake_when_turn_around"]
  677. def calculate_overtake_in_forbid_lane_count(self):
  678. """计算借道超车违规次数"""
  679. self.overtake_in_forbid_lane_detector()
  680. return self.violation_counts["overtake_in_forbid_lane"]
  681. def calculate_overtake_in_ramp_area_count(self):
  682. """计算匝道超车违规次数"""
  683. self.overtake_in_ramp_area_detector()
  684. return self.violation_counts["overtake_in_ramp"]
  685. def calculate_overtake_in_tunnel_area_count(self):
  686. """计算隧道超车违规次数"""
  687. self.overtake_in_tunnel_area_detector()
  688. return self.violation_counts["overtake_in_tunnel"]
  689. def calculate_overtake_on_accelerate_lane_count(self):
  690. """计算加速车道超车违规次数"""
  691. self.overtake_on_accelerate_lane_detector()
  692. return self.violation_counts["overtake_on_accelerate_lane"]
  693. def calculate_overtake_on_decelerate_lane_count(self):
  694. """计算减速车道超车违规次数"""
  695. self.overtake_on_decelerate_lane_detector()
  696. return self.violation_counts["overtake_on_decelerate_lane"]
  697. def calculate_overtake_in_different_senerios_count(self):
  698. """计算不同场景超车违规次数"""
  699. self.overtake_in_different_senerios_detector()
  700. return self.violation_counts["overtake_in_different_senerios"]
  class SlowdownViolation(object):
      """Detectors for failing to slow down or give way to pedestrians.

      The ego trajectory is read from ``df_data.obj_data[1]`` and pedestrians
      from the rows of ``df_data.object_df`` whose ``type`` is 13.  Every
      ``calculate_*`` method runs one detector and returns its counter.  Each
      detector resets its counter before scanning, so calling it again on the
      same instance reports the same value instead of accumulating.
      """

      # Object type code of pedestrians in ``object_df``.
      PEDESTRIAN_TYPE = 13
      # Frames whose ``crossid`` differs from this value are treated as lying
      # in a crossing area (presumably the "no crossing" sentinel -- confirm).
      NO_CROSSING_ID = 20000
      # Speed that must not be exceeded inside a crossing area: 15 km/h in m/s.
      CROSSING_SPEED_LIMIT = 15 / 3.6
      # ``lane_type`` value inspected by the turning detector.
      TURNING_LANE_TYPE = 20
      # Only pedestrians closer than this distance to the ego are considered.
      PEDESTRIAN_RANGE = 50
      # Distance below which the ego counts as being right next to a pedestrian.
      CLOSE_DISTANCE = 1

      def __init__(self, df_data):
          print("减速让行违规类-------------------------")
          self.traffic_violations_type = "减速让行违规类"
          # Keep references only; copies are created lazily by the properties.
          self._raw_data = df_data.obj_data[1]
          self.object_items = set(df_data.object_df.type.tolist())
          self._pedestrian_df = None
          if self.PEDESTRIAN_TYPE in self.object_items:
              self._pedestrian_df = df_data.object_df[
                  df_data.object_df.type == self.PEDESTRIAN_TYPE
              ]
          self._ego_data = None
          self._pedestrian_data = None
          # Violation counters, one per detector.
          self.slow_down_in_crosswalk_count = 0
          self.avoid_pedestrian_in_crosswalk_count = 0
          self.avoid_pedestrian_in_the_road_count = 0
          self.aviod_pedestrian_when_turning_count = 0

      @property
      def ego_data(self):
          """Ego frames restricted to ``SLOWDOWN_INFO``, copied on first access."""
          if self._ego_data is None:
              self._ego_data = self._raw_data[SLOWDOWN_INFO].copy().reset_index(drop=True)
          return self._ego_data

      @property
      def pedestrian_data(self):
          """Pedestrian frames (``SLOWDOWN_INFO`` columns), or an empty frame."""
          if self._pedestrian_data is None:
              if self._pedestrian_df is not None:
                  self._pedestrian_data = (
                      self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
                  )
              else:
                  self._pedestrian_data = pd.DataFrame()
          return self._pedestrian_data

      def _ego_pedestrian_pairs(self):
          """Join every pedestrian sample to the ego frame with the same ``simTime``.

          Pairing by timestamp (instead of by row position) keeps the geometry
          correct when the pedestrian track starts later than the ego track,
          has gaps, or contains several pedestrians.  The cached ego frame is
          not modified.

          Returns:
              DataFrame with the ego columns plus ``posX_ped``/``posY_ped``,
              ``dx``/``dy`` (ego minus pedestrian), ``dist``, ``rela_pos``
              (dot product of (dx, dy) with the ego velocity) and ``speed``
              (ego speed).  Empty when there are no pedestrians.
          """
          pedestrians = self.pedestrian_data
          if len(pedestrians) == 0:
              return pd.DataFrame()
          pairs = self.ego_data.merge(
              pedestrians[["simTime", "posX", "posY"]],
              on="simTime",
              how="inner",
              suffixes=("", "_ped"),
          )
          pairs["dx"] = pairs["posX"] - pairs["posX_ped"]
          pairs["dy"] = pairs["posY"] - pairs["posY_ped"]
          pairs["dist"] = np.hypot(pairs["dx"], pairs["dy"])
          pairs["rela_pos"] = pairs["dx"] * pairs["speedX"] + pairs["dy"] * pairs["speedY"]
          pairs["speed"] = np.hypot(pairs["speedX"], pairs["speedY"])
          return pairs

      def pedestrian_in_front_of_car(self):
          """Return the timestamps at which a pedestrian is near and "in front".

          Returns:
              list of ``simTime`` values (each listed once) for which
              ``rela_pos > 0`` and the distance is below ``PEDESTRIAN_RANGE``;
              an empty list when there is no pedestrian data.
          """
          pairs = self._ego_pedestrian_pairs()
          if pairs.empty:
              return []
          # NOTE(review): dx/dy point from the pedestrian to the ego, so
          # ``rela_pos > 0`` holds when the ego velocity has a component away
          # from the pedestrian.  The sign is kept as in the original -- confirm
          # that this is the intended meaning of "in front".
          selected = (pairs["rela_pos"] > 0) & (pairs["dist"] < self.PEDESTRIAN_RANGE)
          return pairs.loc[selected, "simTime"].drop_duplicates().tolist()

      def different_road_area_simtime(self, df, threshold=0.6):
          """Split a list of timestamps into runs of consecutive samples.

          Args:
              df: list of timestamps (despite the name, not a DataFrame).
              threshold: largest gap between neighbours that still belongs to
                  the same run.

          Returns:
              list of lists, one per run; empty when ``df`` is empty.
          """
          if not df:
              return []
          runs = []
          current = [df[0]]
          for previous, stamp in zip(df, df[1:]):
              if abs(stamp - previous) <= threshold:
                  current.append(stamp)
              else:
                  runs.append(current)
                  current = [stamp]
          runs.append(current)
          return runs

      def _crossing_windows(self):
          """Return the runs of timestamps during which the ego is in a crossing area."""
          ego = self.ego_data
          stamps = ego.loc[ego["crossid"] != self.NO_CROSSING_ID, "simTime"].tolist()
          return self.different_road_area_simtime(stamps)

      def slow_down_in_crosswalk_detector(self):
          """Count crossing passages in which the ego exceeded 15 km/h."""
          self.slow_down_in_crosswalk_count = 0
          ego = self.ego_data
          for window in self._crossing_windows():
              start_time, end_time = window[0], window[-1]
              print(f"当前时间段:{start_time} - {end_time}")
              frames = ego[(ego["simTime"] >= start_time) & (ego["simTime"] <= end_time)]
              speed = np.hypot(frames["speedX"].to_numpy(), frames["speedY"].to_numpy())
              # Element-wise test instead of max(): safe for empty/NaN input.
              if (speed > self.CROSSING_SPEED_LIMIT).any():
                  self.slow_down_in_crosswalk_count += 1
          # The original message spoke of overtaking; this detector counts
          # passages without slowing down.
          print(f"在人行横道未减速总次数:{self.slow_down_in_crosswalk_count}次")

      def avoid_pedestrian_in_crosswalk_detector(self):
          """Count crossing passages during which a moving pedestrian was present."""
          self.avoid_pedestrian_in_crosswalk_count = 0
          pedestrians = self.pedestrian_data
          if pedestrians.empty:
              return
          for window in self._crossing_windows():
              present = pedestrians[pedestrians["simTime"].isin(window)]
              if present.empty:
                  continue
              # NOTE(review): the original named this value ``ego_speed`` but
              # computed it from the pedestrian rows; behaviour is kept --
              # confirm whether the ego speed was intended.
              pedestrian_speed = np.hypot(present["speedX"], present["speedY"])
              if (pedestrian_speed > 0).any():
                  self.avoid_pedestrian_in_crosswalk_count += 1

      def avoid_pedestrian_in_the_road_detector(self):
          """Count encounters in which the ego is stopped within 1 of a pedestrian."""
          self.avoid_pedestrian_in_the_road_count = 0
          ahead_times = self.pedestrian_in_front_of_car()
          if not ahead_times:
              return
          pairs = self._ego_pedestrian_pairs()
          pairs = pairs[pairs["simTime"].isin(ahead_times)]
          for window in self.different_road_area_simtime(ahead_times):
              encounter = pairs[pairs["simTime"].isin(window)]
              # NOTE(review): frames are pre-selected with ``rela_pos > 0``,
              # which requires a non-zero ego velocity, so ``speed == 0`` cannot
              # hold here and this detector cannot fire as written.  Criterion
              # kept from the original -- confirm the intended rule.
              stopped_close = (encounter["dist"] < self.CLOSE_DISTANCE) & (
                  encounter["speed"] == 0
              )
              if stopped_close.any():
                  self.avoid_pedestrian_in_the_road_count += 1

      def aviod_pedestrian_when_turning_detector(self):
          """Count turning passages with a pedestrian ahead during which the ego moved."""
          self.aviod_pedestrian_when_turning_count = 0
          ahead_times = self.pedestrian_in_front_of_car()
          if not ahead_times:
              return
          ego = self.ego_data
          turning = ego[
              ego["simTime"].isin(ahead_times) & (ego["lane_type"] == self.TURNING_LANE_TYPE)
          ]
          for window in self.different_road_area_simtime(turning["simTime"].tolist()):
              frames = turning[turning["simTime"].isin(window)]
              # The original also computed an (unused) ego-pedestrian distance
              # from positional arrays, which raised on unequal row counts.
              speed = np.hypot(frames["speedX"], frames["speedY"])
              if (speed > 0).any():
                  self.aviod_pedestrian_when_turning_count += 1

      def calculate_slow_down_in_crosswalk_count(self):
          """Run the crossing speed detector and return its count."""
          self.slow_down_in_crosswalk_detector()
          return self.slow_down_in_crosswalk_count

      def calculate_avoid_pedestrian_in_the_crosswalk_count(self):
          """Run the crosswalk pedestrian detector and return its count."""
          self.avoid_pedestrian_in_crosswalk_detector()
          return self.avoid_pedestrian_in_crosswalk_count

      def calculate_avoid_pedestrian_in_the_road_count(self):
          """Run the on-road pedestrian detector and return its count."""
          self.avoid_pedestrian_in_the_road_detector()
          return self.avoid_pedestrian_in_the_road_count

      def calculate_avoid_pedestrian_when_turning_count(self):
          """Run the turning pedestrian detector and return its count."""
          self.aviod_pedestrian_when_turning_detector()
          return self.aviod_pedestrian_when_turning_count
  class TurnaroundViolation(object):
      """Detectors for illegal U-turns / left turns and U-turns near pedestrians.

      The ego trajectory is read from ``df_data.obj_data[1]`` and pedestrians
      from the rows of ``df_data.object_df`` whose ``type`` is 13.  Detectors
      reset their counters before scanning, so repeated ``calculate_*`` calls
      on one instance do not accumulate.
      """

      # Object type code of pedestrians in ``object_df``.
      PEDESTRIAN_TYPE = 13
      # ``sign_type1`` code of the "no U-turn" sign.
      SIGN_NO_TURN_BACK = 8
      # ``sign_type1`` code used for the "no left turn" counter.
      SIGN_NO_TURN_LEFT = 9
      # ``lane_type`` value inspected by the pedestrian detector.
      TURNING_LANE_TYPE = 20
      # Only pedestrians closer than this distance to the ego are considered.
      PEDESTRIAN_RANGE = 50

      def __init__(self, df_data):
          print("掉头违规类初始化中...")
          self.traffic_violations_type = "掉头违规类"
          # Keep references only; copies are created lazily by the properties.
          self._raw_data = df_data.obj_data[1]
          self.object_items = set(df_data.object_df.type.tolist())
          self._pedestrian_df = None
          if self.PEDESTRIAN_TYPE in self.object_items:
              self._pedestrian_df = df_data.object_df[
                  df_data.object_df.type == self.PEDESTRIAN_TYPE
              ]
          self._ego_data = None
          self._pedestrian_data = None
          # Violation counters, one per rule.
          self.turning_in_forbiden_turn_back_sign_count = 0
          self.turning_in_forbiden_turn_left_sign_count = 0
          self.avoid_pedestrian_when_turn_back_count = 0

      @property
      def ego_data(self):
          """Ego frames restricted to ``TURNAROUND_INFO``, copied on first access."""
          if self._ego_data is None:
              self._ego_data = self._raw_data[TURNAROUND_INFO].copy().reset_index(drop=True)
          return self._ego_data

      @property
      def pedestrian_data(self):
          """Pedestrian frames, or an empty frame when there are no pedestrians.

          The pedestrian rows are cut down with ``SLOWDOWN_INFO`` (not
          ``TURNAROUND_INFO``), exactly as in the original code.
          """
          if self._pedestrian_data is None:
              if self._pedestrian_df is not None:
                  self._pedestrian_data = (
                      self._pedestrian_df[SLOWDOWN_INFO].copy().reset_index(drop=True)
                  )
              else:
                  self._pedestrian_data = pd.DataFrame()
          return self._pedestrian_data

      def pedestrian_in_front_of_car(self):
          """Return the timestamps at which a pedestrian is near and "in front".

          Ego and pedestrian samples are paired by ``simTime`` rather than by
          row position, and the cached ego frame is left untouched.

          Returns:
              list of ``simTime`` values (each listed once) for which the dot
              product of (ego - pedestrian) with the ego velocity is positive
              and the distance is below ``PEDESTRIAN_RANGE``; an empty list
              when there is no pedestrian data.
          """
          pedestrians = self.pedestrian_data
          if len(pedestrians) == 0:
              return []
          pairs = self.ego_data.merge(
              pedestrians[["simTime", "posX", "posY"]],
              on="simTime",
              how="inner",
              suffixes=("", "_ped"),
          )
          dx = pairs["posX"] - pairs["posX_ped"]
          dy = pairs["posY"] - pairs["posY_ped"]
          dist = np.hypot(dx, dy)
          # NOTE(review): (dx, dy) points from the pedestrian to the ego, so a
          # positive value means the ego velocity has a component away from
          # the pedestrian.  Sign kept as in the original -- confirm intent.
          rela_pos = dx * pairs["speedX"] + dy * pairs["speedY"]
          selected = (rela_pos > 0) & (dist < self.PEDESTRIAN_RANGE)
          return pairs.loc[selected, "simTime"].drop_duplicates().tolist()

      def different_road_area_simtime(self, df, threshold=0.5):
          """Split a list of timestamps into runs of consecutive samples.

          Args:
              df: list of timestamps (despite the name, not a DataFrame).
              threshold: largest gap between neighbours that still belongs to
                  the same run.

          Returns:
              list of lists, one per run; empty when ``df`` is empty.
          """
          if not df:
              return []
          runs = []
          current = [df[0]]
          for previous, stamp in zip(df, df[1:]):
              if abs(stamp - previous) <= threshold:
                  current.append(stamp)
              else:
                  runs.append(current)
                  current = [stamp]
          runs.append(current)
          return runs

      def _count_heading_reversals(self, sign_type):
          """Count passages under ``sign_type`` in which the ego velocity reversed.

          A passage is a run of consecutive frames whose ``sign_type1`` equals
          ``sign_type``.  It is counted when the dot product of the first and
          the last velocity vector of the run is negative.
          """
          ego = self.ego_data
          stamps = ego.loc[ego["sign_type1"] == sign_type, "simTime"].tolist()
          reversals = 0
          for window in self.different_road_area_simtime(stamps):
              # The runs hold simTime values, so they must be matched against
              # the simTime column (the original compared them with simFrame,
              # which left the selection empty and crashed on .iloc[0]).
              frames = ego[ego["simTime"].isin(window)]
              first, last = frames.iloc[0], frames.iloc[-1]
              alignment = (
                  last["speedX"] * first["speedX"] + last["speedY"] * first["speedY"]
              )
              if alignment < 0:
                  reversals += 1
          return reversals

      def turn_back_in_forbiden_sign_detector(self):
          """Recompute both sign counters (no U-turn: type 8, no left turn: type 9).

          NOTE(review): the left-turn counter uses the same velocity-reversal
          test as the U-turn counter, as in the original -- confirm that this
          is the intended criterion for a left turn.
          """
          self.turning_in_forbiden_turn_back_sign_count = self._count_heading_reversals(
              self.SIGN_NO_TURN_BACK
          )
          self.turning_in_forbiden_turn_left_sign_count = self._count_heading_reversals(
              self.SIGN_NO_TURN_LEFT
          )

      def avoid_pedestrian_when_turn_back_detector(self):
          """Count turning passages with a pedestrian present in which the ego moved.

          Nothing is counted unless ``pedestrian_in_front_of_car`` reports at
          least one timestamp.  For every run of frames with ``lane_type`` 20,
          the ego frames that share a timestamp with a pedestrian sample are
          inspected, and the run is counted when the ego speed is non-zero in
          any of them.
          """
          self.avoid_pedestrian_when_turn_back_count = 0
          if not self.pedestrian_in_front_of_car():
              return
          ego = self.ego_data
          pedestrians = self.pedestrian_data
          turning_stamps = ego.loc[
              ego["lane_type"] == self.TURNING_LANE_TYPE, "simTime"
          ].tolist()
          for window in self.different_road_area_simtime(turning_stamps):
              shared_stamps = pedestrians.loc[
                  pedestrians["simTime"].isin(window), "simTime"
              ]
              frames = ego[ego["simTime"].isin(shared_stamps)]
              # The original also derived an (unused) distance from columns
              # named "posx"/"posy", which do not exist; it has been dropped.
              speed = np.hypot(frames["speedX"], frames["speedY"])
              if (speed > 0).any():
                  self.avoid_pedestrian_when_turn_back_count += 1

      def calculate_turn_in_forbiden_turn_left_sign_count(self):
          """Run the sign detector and return the "no left turn" count."""
          self.turn_back_in_forbiden_sign_detector()
          return self.turning_in_forbiden_turn_left_sign_count

      def calculate_turn_in_forbiden_turn_back_sign_count(self):
          """Run the sign detector and return the "no U-turn" count."""
          self.turn_back_in_forbiden_sign_detector()
          return self.turning_in_forbiden_turn_back_sign_count

      def calculate_avoid_pedestrian_when_turn_back_count(self):
          """Run the pedestrian detector and return its count."""
          self.avoid_pedestrian_when_turn_back_detector()
          return self.avoid_pedestrian_when_turn_back_count

      # Historical (misspelled) name, kept so existing callers keep working.
      calaulate_avoid_pedestrian_when_turn_back_count = (
          calculate_avoid_pedestrian_when_turn_back_count
      )
  class WrongWayViolation(object):
      """Stopping / emergency-lane violations on expressways and highways.

      Despite the class name, the rules implemented here concern stopping in
      a driving lane, stopping in the emergency lane and driving in the
      emergency lane.  Counts are numbers of matching frames (rows).
      """

      # ``road_fc`` codes of urban expressways and highways.
      URBAN_EXPRESSWAY_OR_HIGHWAY = (1, 2)
      # ``lane_type`` codes treated as driving lanes.
      DRIVING_LANE = (1, 4, 5, 6)
      # ``lane_type`` code of the emergency lane.
      EMERGENCY_LANE = (12,)
      # Reported violation types, in evaluation order.
      VIOLATION_TYPES = (
          "urbanExpresswayOrHighwayDrivingLaneStopped",
          "urbanExpresswayOrHighwayEmergencyLaneStopped",
          "urbanExpresswayEmergencyLaneDriving",
      )

      def __init__(self, df_data):
          print("停车违规类初始化中...")
          self.traffic_violations_type = "停车违规类"
          # Keep a reference only; the copy is created lazily by ``data``.
          self._raw_data = df_data.obj_data[1]
          self._data = None
          self.violation_count = {
              "urbanExpresswayOrHighwayDrivingLaneStopped": 0,
              "urbanExpresswayOrHighwayEmergencyLaneStopped": 0,
              "urbanExpresswayEmergencyLaneDriving": 0,
          }

      @property
      def data(self):
          """Private copy of the ego frames, created on first access."""
          if self._data is None:
              self._data = self._raw_data.copy()
          return self._data

      def process_violations(self):
          """Label every frame with its violation type and refresh the counts.

          The rules only test whether ``v`` is zero, which does not depend on
          the speed unit, so the speed column is no longer rescaled here (the
          original multiplied the cached column by 3.6 on every call).
          A ``violation_type`` column is written into ``self.data``.
          """
          data = self.data
          on_fast_road = data["road_fc"].isin(self.URBAN_EXPRESSWAY_OR_HIGHWAY)
          in_driving_lane = data["lane_type"].isin(self.DRIVING_LANE)
          in_emergency_lane = data["lane_type"].isin(self.EMERGENCY_LANE)
          stopped = data["v"] == 0
          moving = data["v"] != 0
          rules = zip(
              self.VIOLATION_TYPES,
              (
                  on_fast_road & in_driving_lane & stopped,
                  on_fast_road & in_emergency_lane & stopped,
                  on_fast_road & in_emergency_lane & moving,
              ),
          )
          data["violation_type"] = None
          for violation_type, condition in rules:
              data.loc[condition, "violation_type"] = violation_type
          per_type = (
              data["violation_type"]
              .value_counts()
              .reindex(list(self.VIOLATION_TYPES), fill_value=0)
          )
          self.violation_count = {name: int(total) for name, total in per_type.items()}

      def calculate_urbanExpresswayOrHighwayDrivingLaneStopped_count(self):
          """Return the number of frames stopped in a driving lane."""
          self.process_violations()
          return self.violation_count["urbanExpresswayOrHighwayDrivingLaneStopped"]

      def calculate_urbanExpresswayOrHighwayEmergencyLaneStopped_count(self):
          """Return the number of frames stopped in the emergency lane."""
          self.process_violations()
          # The original returned the "EmergencyLaneDriving" entry here.
          return self.violation_count["urbanExpresswayOrHighwayEmergencyLaneStopped"]

      def calculate_urbanExpresswayEmergencyLaneDriving(self):
          """Return the number of frames driven in the emergency lane."""
          self.process_violations()
          return self.violation_count["urbanExpresswayEmergencyLaneDriving"]
  class SpeedingViolation(object):
      """Speeding violations derived from the road speed limits.

      There is no metric based on speed-limit road signs here because,
      according to the original note, the shp map does not carry that
      information.  Counts are numbers of matching frames (rows).
      """

      # ``road_fc`` codes of urban expressways and highways.
      URBAN_EXPRESSWAY_OR_HIGHWAY = (1, 2)
      # ``road_fc`` code of general roads.
      GENERAL_ROAD = (3,)
      # Reported violation types, in evaluation order.
      VIOLATION_TYPES = (
          "urbanExpresswayOrHighwaySpeedOverLimit50",
          "urbanExpresswayOrHighwaySpeedOverLimit20to50",
          "urbanExpresswayOrHighwaySpeedOverLimit0to20",
          "urbanExpresswayOrHighwaySpeedUnderLimit",
          "generalRoadSpeedOverLimit50",
          "generalRoadSpeedOverLimit20to50",
      )

      def __init__(self, df_data):
          print("超速违规类初始化中...")
          self.traffic_violations_type = "超速违规类"
          # Keep a reference only; the copy is created lazily by ``data``.
          self._raw_data = df_data.obj_data[1]
          self._data = None
          self.violation_counts = {
              "urbanExpresswayOrHighwaySpeedOverLimit50": 0,
              "urbanExpresswayOrHighwaySpeedOverLimit20to50": 0,
              "urbanExpresswayOrHighwaySpeedOverLimit0to20": 0,
              "urbanExpresswayOrHighwaySpeedUnderLimit": 0,
              "generalRoadSpeedOverLimit50": 0,
              "generalRoadSpeedOverLimit20to50": 0,
          }

      @property
      def data(self):
          """Private copy of the ego frames with ``v`` converted to km/h.

          The multiplication by 3.6 happens exactly once, when the copy is
          created; the raw data is not modified.
          """
          if self._data is None:
              frame = self._raw_data.copy()
              frame["v"] = frame["v"] * 3.6
              self._data = frame
          return self._data

      def process_violations(self):
          """Label every frame with its violation type and refresh the counts.

          ``v`` is already in km/h when it comes out of ``data``; the original
          multiplied it by 3.6 again on every call, which inflated the speed
          by 3.6 per invocation.  A ``violation_type`` column is written into
          ``self.data``.
          """
          data = self.data
          speed = data["v"]
          upper = data["road_speed_max"]
          on_fast_road = data["road_fc"].isin(self.URBAN_EXPRESSWAY_OR_HIGHWAY)
          on_general_road = data["road_fc"].isin(self.GENERAL_ROAD)
          over_50 = speed > upper * 1.5
          over_20_to_50 = (speed > upper * 1.2) & (speed <= upper * 1.5)
          over_0_to_20 = (speed > upper) & (speed <= upper * 1.2)
          too_slow = speed < data["road_speed_min"]
          rules = zip(
              self.VIOLATION_TYPES,
              (
                  on_fast_road & over_50,
                  on_fast_road & over_20_to_50,
                  on_fast_road & over_0_to_20,
                  on_fast_road & too_slow,
                  on_general_road & over_50,
                  on_general_road & over_20_to_50,
              ),
          )
          data["violation_type"] = None
          for violation_type, condition in rules:
              data.loc[condition, "violation_type"] = violation_type
          # Reindexing guarantees an entry (possibly 0) for every type.
          per_type = (
              data["violation_type"]
              .value_counts()
              .reindex(list(self.VIOLATION_TYPES), fill_value=0)
          )
          self.violation_counts = {name: int(total) for name, total in per_type.items()}

      def _count_for(self, violation_type):
          """Re-evaluate the rules and return the count for one violation type."""
          self.process_violations()
          return self.violation_counts.get(violation_type, 0)

      def calculate_urbanExpresswayOrHighwaySpeedOverLimit50_count(self):
          """Frames on expressway/highway faster than 150 % of the limit."""
          return self._count_for("urbanExpresswayOrHighwaySpeedOverLimit50")

      def calculate_urbanExpresswayOrHighwaySpeedOverLimit20to50_count(self):
          """Frames on expressway/highway between 120 % and 150 % of the limit."""
          return self._count_for("urbanExpresswayOrHighwaySpeedOverLimit20to50")

      def calculate_urbanExpresswayOrHighwaySpeedOverLimit0to20_count(self):
          """Frames on expressway/highway between 100 % and 120 % of the limit."""
          return self._count_for("urbanExpresswayOrHighwaySpeedOverLimit0to20")

      def calculate_urbanExpresswayOrHighwaySpeedUnderLimit_count(self):
          """Frames on expressway/highway slower than the minimum speed."""
          return self._count_for("urbanExpresswayOrHighwaySpeedUnderLimit")

      def calculate_generalRoadSpeedOverLimit50(self):
          """Frames on general roads faster than 150 % of the limit."""
          return self._count_for("generalRoadSpeedOverLimit50")

      def calculate_generalRoadSpeedOverLimit20to50_count(self):
          """Frames on general roads between 120 % and 150 % of the limit."""
          return self._count_for("generalRoadSpeedOverLimit20to50")
  class TrafficLightViolation(object):
      """Traffic-light violations (red-light running, stopping in a junction).

      Open point from the original notes: the vehicle's manoeuvre (left turn,
      straight, right turn) and the kind of traffic light (directional or
      general) are not distinguished yet.
      """

      # Frames further apart than this start a new stop-line approach.
      GROUP_GAP = 0.5
      # ``traffic_light_status`` value the rules react to (presumably "red").
      RED_STATUS = 1
      # ``stopline_type`` value selected for analysis.
      STOPLINE_TYPE = 1
      # Distance to the traffic light below which the junction counts as passed.
      PASSED_DISTANCE = 1.0

      def __init__(self, df_data):
          """Store the inputs and evaluate the violations once."""
          self.traffic_violations_type = "违反交通灯类"
          print("违反交通灯类 类初始化中...")
          self.config = df_data.vehicle_config
          self.data_ego = df_data.ego_data
          self.violation_counts = {
              "trafficSignalViolation": 0,
              "illegalDrivingOrParkingAtCrossroads": 0,
          }
          self.process_violations()

      def is_point_cross_line(self, point, stop_line_points):
          """Tell whether a point lies on the stop line with the heading test passed.

          Args:
              point: ``(x, y, heading)`` with the heading in radians.
              stop_line_points: the two end points ``[[x1, y1], [x2, y2]]``.

          Returns:
              False unless the point is exactly collinear with the two end
              points (cross product equal to 0).  For a collinear point, True
              when the angle between the heading direction and the vector from
              the line's mid point to the point is at most 90 degrees; False
              when the point coincides with the mid point.
          """
          (x1, y1), (x2, y2) = stop_line_points
          line_vector = np.array([x2 - x1, y2 - y1])
          point_vector = np.array([point[0] - x1, point[1] - y1])
          # Scalar 2-D cross product written out: np.cross on 2-element
          # vectors is deprecated in NumPy 2.
          cross_product = (
              line_vector[0] * point_vector[1] - line_vector[1] * point_vector[0]
          )
          # NOTE(review): exact float equality -- a point that is merely close
          # to the line is rejected.  Kept from the original; confirm whether
          # a tolerance or a side-change test was intended.
          if cross_product != 0:
              return False
          mid_point = np.array([x1, y1]) + 0.5 * line_vector
          axletree_to_mid_vector = np.array(
              [point[0] - mid_point[0], point[1] - mid_point[1]]
          )
          direction_vector = np.array([math.cos(point[2]), math.sin(point[2])])
          norm_axletree_to_mid = np.linalg.norm(axletree_to_mid_vector)
          norm_direction = np.linalg.norm(direction_vector)
          if norm_axletree_to_mid == 0 or norm_direction == 0:
              return False
          cos_theta = np.dot(axletree_to_mid_vector, direction_vector) / (
              norm_axletree_to_mid * norm_direction
          )
          # Rounding can push the ratio slightly outside [-1, 1], which would
          # make math.acos raise ValueError.
          cos_theta = float(np.clip(cos_theta, -1.0, 1.0))
          return math.degrees(math.acos(cos_theta)) <= 90

      def _filter_data(self):
          """Select the frames that reference a type-1 stop line and a traffic light."""
          ego = self.data_ego
          relevant = (
              (ego["stopline_id"] != -1)
              & (ego["stopline_type"] == self.STOPLINE_TYPE)
              & (ego["trafficlight_id"] != -1)
          )
          return ego[relevant]

      def _group_data(self, filtered_data):
          """Group frames into approaches separated by gaps longer than 0.5.

          Works on a copy so the helper columns ``time_diff`` and ``group``
          are not written into a slice of the source frame.
          """
          frames = filtered_data.copy()
          frames["time_diff"] = frames["simTime"].diff().fillna(0)
          frames["group"] = (frames["time_diff"] > self.GROUP_GAP).cumsum()
          return frames.groupby("group")

      def _analyze_group(self, group_data):
          """Evaluate the per-frame flags of one approach.

          Returns:
              tuple ``(photos, stop_in_intersection)`` where ``photos`` is the
              flat list of the four flags (front crossed, rear crossed, passed,
              stopped) of every frame and ``stop_in_intersection`` is the
              "passed" flag of the last frame.
          """
          photos = []
          stop_in_intersection = False
          wheelbase = self.config["EGO_WHEELBASS"]
          for _, row in group_data.iterrows():
              heading = row["posH"]
              direction = np.array([np.cos(heading), np.sin(heading)])
              centre = np.array([row["posX"], row["posY"]])
              stop_line_points = [
                  [row["stopline_x1"], row["stopline_y1"]],
                  [row["stopline_x2"], row["stopline_y2"]],
              ]
              light_is_red = row["traffic_light_status"] == self.RED_STATUS
              # is_point_cross_line reads the heading from index 2, so the
              # wheel points must carry it (the original passed bare (x, y)
              # pairs, which raised IndexError for collinear points).
              front_wheel_pos = np.append(centre + wheelbase * direction, heading)
              rear_wheel_pos = np.append(centre - wheelbase * direction, heading)
              dist = math.sqrt(
                  (row["posX"] - row["traffic_light_x"]) ** 2
                  + (row["posY"] - row["traffic_light_y"]) ** 2
              )
              # The original wrapped the flag computation in
              # ``abs(speedH) > 0.01 or abs(speedH) < 0.01``, which is true for
              # every value except exactly 0.01 (and NaN) and otherwise left
              # the flags unbound or stale; the guard has been removed.
              has_crossed_line_front = (
                  self.is_point_cross_line(front_wheel_pos, stop_line_points)
                  and light_is_red
              )
              has_crossed_line_rear = (
                  self.is_point_cross_line(rear_wheel_pos, stop_line_points)
                  and row["v"] > 0
                  and light_is_red
              )
              has_stop_in_intersection = has_crossed_line_front and row["v"] == 0
              has_passed_intersection = (
                  has_crossed_line_front and dist < self.PASSED_DISTANCE
              )
              photos.extend(
                  [
                      has_crossed_line_front,
                      has_crossed_line_rear,
                      has_passed_intersection,
                      has_stop_in_intersection,
                  ]
              )
              # NOTE(review): the "passed" flag (not the "stopped" flag) of the
              # most recent frame is reported, as in the original -- confirm.
              stop_in_intersection = has_passed_intersection
          return photos, stop_in_intersection

      def is_vehicle_run_a_red_light(self):
          """Analyse every approach and store the results on the instance."""
          grouped_data = self._group_data(self._filter_data())
          self.photos_group = []
          self.stop_in_intersections = []
          for _, group_data in grouped_data:
              photos, stop_in_intersection = self._analyze_group(group_data)
              self.photos_group.append(photos)
              self.stop_in_intersections.append(stop_in_intersection)

      def process_violations(self):
          """Recompute both violation counts from scratch."""
          self.is_vehicle_run_a_red_light()
          # NOTE(review): all() requires the "rear crossed" flag (v > 0) and
          # the "stopped" flag (v == 0) of the same frame to be true together,
          # so this count stays 0 as written -- confirm the intended rule.
          red_light_runs = sum(all(photos) for photos in self.photos_group)
          junction_stops = sum(flag for flag in self.stop_in_intersections)
          self.violation_counts["trafficSignalViolation"] = red_light_runs
          self.violation_counts["illegalDrivingOrParkingAtCrossroads"] = junction_stops

      def calculate_trafficSignalViolation_count(self):
          """Re-evaluate and return the red-light violation count."""
          self.process_violations()
          return self.violation_counts["trafficSignalViolation"]

      def calculate_illegalDrivingOrParkingAtCrossroads(self):
          """Re-evaluate and return the junction driving/parking violation count."""
          self.process_violations()
          return self.violation_counts["illegalDrivingOrParkingAtCrossroads"]
  class WarningViolation(object):
      """Warning-level violations: wrong lane use and riding the lane divider.

      Counts are numbers of contiguous violating episodes, not frames.
      """

      # ``road_fc`` codes of urban expressways and highways (same codes as
      # used by the other violation classes of this module).
      URBAN_EXPRESSWAY_OR_HIGHWAY = (1, 2)
      # ``road_fc`` code of general roads.
      GENERAL_ROAD = (3,)
      # ``lane_type`` code of the non-motorised lane.
      NON_MOTOR_LANE = (11,)

      def __init__(self, df_data):
          print("警告性违规类初始化中...")
          self.traffic_violations_type = "警告性违规类"
          self.config = df_data.vehicle_config
          # Keep a reference only; the copy is created lazily by ``data``.
          self._raw_data = df_data.obj_data[1]
          self._data = None
          self.violation_counts = {
              # Driving outside the prescribed lane on roads other than
              # highways and urban expressways.
              "generalRoadIrregularLaneUse": 0,
              # Riding on / crossing the lane divider on a highway or an
              # urban expressway.
              "urbanExpresswayOrHighwayRideLaneDivider": 0,
          }

      @property
      def data(self):
          """Private copy of the ego frames, created on first access."""
          if self._data is None:
              self._data = self._raw_data.copy()
          return self._data

      def process_violations(self):
          """Evaluate both violation types."""
          self._process_irregular_lane_use()
          self._process_lane_divider_violation()

      def _process_irregular_lane_use(self):
          """Count episodes of driving in the non-motorised lane of a general road."""
          data = self.data
          data["is_violation"] = data["road_fc"].isin(self.GENERAL_ROAD) & data[
              "lane_type"
          ].isin(self.NON_MOTOR_LANE)
          episodes = self.count_continuous_violations(
              data["is_violation"], data["simTime"]
          )
          self.violation_counts["generalRoadIrregularLaneUse"] = len(episodes)

      def _process_lane_divider_violation(self):
          """Count episodes of riding the lane divider on an expressway/highway.

          A frame violates when the magnitude of ``laneOffset`` exceeds
          ``(lane_width - CAR_WIDTH) / 2``.  The magnitude is used so that a
          deviation with a negative offset is caught as well, and only frames
          on urban expressways or highways are considered, matching the
          definition of this metric.
          """
          data = self.data
          threshold = (data["lane_width"] - self.config["CAR_WIDTH"]) / 2
          beyond_lane_edge = data["laneOffset"].abs() > threshold
          on_fast_road = data["road_fc"].isin(self.URBAN_EXPRESSWAY_OR_HIGHWAY)
          data["is_violation"] = beyond_lane_edge & on_fast_road
          episodes = self.count_continuous_violations(
              data["is_violation"], data["simTime"]
          )
          self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"] = len(episodes)

      def count_continuous_violations(self, violation_series, time_series):
          """Collect the contiguous violating episodes.

          Args:
              violation_series: boolean sequence, True where a frame violates.
              time_series: pandas Series of the matching timestamps.

          Returns:
              list of ``[start, end]`` pairs.  ``start`` is the time of the
              first violating frame of an episode; ``end`` is the time of the
              first following non-violating frame, or the last timestamp when
              the episode runs to the end of the data.
          """
          episodes = []
          start = None
          for is_violation, stamp in zip(violation_series, time_series):
              if is_violation:
                  if start is None:
                      start = stamp
              elif start is not None:
                  episodes.append([start, stamp])
                  start = None
          if start is not None:
              # The data ended while an episode was still open.
              episodes.append([start, time_series.iloc[-1]])
          return episodes

      def calculate_generalRoadIrregularLaneUse_count(self):
          """Evaluate only the lane-use rule and return its episode count."""
          self._process_irregular_lane_use()
          return self.violation_counts["generalRoadIrregularLaneUse"]

      def calculate_urbanExpresswayOrHighwayRideLaneDivider_count(self):
          """Evaluate only the lane-divider rule and return its episode count."""
          self._process_lane_divider_violation()
          return self.violation_counts["urbanExpresswayOrHighwayRideLaneDivider"]
  class TrafficSignViolation:
      """Violations of traffic signs: no-straight, speed limit, minimum speed.

      All three counts are numbers of violating frames.  They are computed
      together on the first ``calculate_*`` call and cached afterwards.
      """

      # Largest frame-to-frame change of ``posH`` that still counts as going
      # straight (unit as stored in ``posH`` -- not visible from this class).
      PROHIBITED_STRAIGHT_THRESHOLD = 5
      # ``sign_type1`` codes handled here.
      SIGN_TYPE_STRAIGHT_PROHIBITED = 7
      SIGN_TYPE_SPEED_LIMIT = 12
      SIGN_TYPE_MIN_SPEED_LIMIT = 13

      def __init__(self, df_data):
          print("交通标志违规类初始化中...")
          self.traffic_violations_type = "交通标志违规类"
          # Keep a reference only; the sorted copy is created lazily by ``data``.
          self._raw_data = df_data.obj_data[1]
          self._data = None
          # Becomes True once all checks have been evaluated.
          self._calculated = False
          self._violation_counts = {
              "NoStraightThrough": 0,
              "SpeedLimitViolation": 0,
              "MinimumSpeedLimitViolation": 0
          }

      @property
      def data(self):
          """Private copy of the ego frames sorted by ``simTime``, built once."""
          if self._data is not None:
              return self._data
          ordered = self._raw_data.copy().sort_values('simTime')
          self._data = ordered.reset_index(drop=True)
          return self._data

      def _ensure_calculated(self):
          """Evaluate every check exactly once per instance."""
          if self._calculated:
              return
          self._check_prohibition_violations()
          self._check_instruction_violations()
          self._calculated = True

      def calculate_NoStraightThrough_count(self):
          """Return the number of frames driven straight under a no-straight sign."""
          self._ensure_calculated()
          counts = self._violation_counts
          return counts["NoStraightThrough"]

      def calculate_SpeedLimitViolation_count(self):
          """Return the number of frames above the posted speed limit."""
          self._ensure_calculated()
          counts = self._violation_counts
          return counts["SpeedLimitViolation"]

      def calculate_MinimumSpeedLimitViolation_count(self):
          """Return the number of frames below the posted minimum speed."""
          self._ensure_calculated()
          counts = self._violation_counts
          return counts["MinimumSpeedLimitViolation"]

      def _check_prohibition_violations(self):
          """Evaluate the prohibition signs: no-straight and speed limit."""
          self._check_straight_violation()
          self._check_speed_violation(
              sign_type=self.SIGN_TYPE_SPEED_LIMIT,
              compare_op=operator.gt,
              count_key="SpeedLimitViolation",
          )

      def _check_instruction_violations(self):
          """Evaluate the instruction sign: minimum speed."""
          self._check_speed_violation(
              sign_type=self.SIGN_TYPE_MIN_SPEED_LIMIT,
              compare_op=operator.lt,
              count_key="MinimumSpeedLimitViolation",
          )

      def _check_straight_violation(self):
          """Count moving frames under a no-straight sign whose heading barely changes."""
          under_sign = self.data["sign_type1"] == self.SIGN_TYPE_STRAIGHT_PROHIBITED
          straight_df = self.data[under_sign]
          if straight_df.empty:
              return
          straight_df = straight_df.copy()
          # The first frame has no predecessor; its change is taken as 0.
          heading_change = straight_df['posH'].diff().abs().fillna(0)
          straight_df['posH_diff'] = heading_change
          keeps_heading = straight_df['posH_diff'] <= self.PROHIBITED_STRAIGHT_THRESHOLD
          is_moving = straight_df['v'] > 0
          self._violation_counts["NoStraightThrough"] = (keeps_heading & is_moving).sum()

      def _check_speed_violation(self, sign_type, compare_op, count_key):
          """Count frames whose speed violates the value posted on a sign.

          Args:
              sign_type: ``sign_type1`` code that selects the frames to check.
              compare_op: binary comparison applied as
                  ``compare_op(v, sign_speed)``; True marks a violation.
              count_key: key of ``_violation_counts`` that receives the count.

          NOTE(review): ``v`` is compared with ``sign_speed`` as stored;
          presumably both use the same unit -- confirm.
          """
          violation_df = self.data[self.data["sign_type1"] == sign_type]
          if violation_df.empty:
              return
          breaches = compare_op(violation_df['v'], violation_df['sign_speed'])
          self._violation_counts[count_key] = breaches.sum()