efficient.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2024 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen
  10. @Data: 2024/12/23
  11. @Last Modified: 2024/12/23
  12. @Summary: Efficient metrics calculation
  13. """
  14. from modules.lib.score import Score
  15. from modules.lib.log_manager import LogManager
  16. import numpy as np
  17. from typing import Dict, Tuple, Optional, Callable, Any
  18. import pandas as pd
  19. class Efficient:
  20. """高效性指标计算类"""
  21. def __init__(self, data_processed):
  22. """初始化高效性指标计算类
  23. Args:
  24. data_processed: 预处理后的数据对象
  25. """
  26. self.logger = LogManager().get_logger()
  27. self.data_processed = data_processed
  28. self.df = data_processed.object_df.copy() # 浅拷贝
  29. self.ego_df = data_processed.ego_data.copy() # 浅拷贝
  30. # 配置参数
  31. self.STOP_SPEED_THRESHOLD = 0.05 # 停车速度阈值 (m/s)
  32. self.STOP_TIME_THRESHOLD = 0.5 # 停车时间阈值 (秒)
  33. self.FRAME_RANGE = 13 # 停车帧数阈值
  34. # 初始化结果变量
  35. self.stop_count = 0 # 停车次数
  36. self.stop_duration = 0 # 平均停车时长
  37. self.average_v = 0 # 平均速度
  38. # 统计指标结果字典
  39. self.calculated_value = {
  40. 'maxSpeed': 0,
  41. 'deviationSpeed': 0,
  42. 'averagedSpeed': 0,
  43. 'stopDuration': 0,
  44. 'speedUtilizationRatio': 0,
  45. 'accelerationSmoothness': 0 # 添加新指标的默认值
  46. }
  47. def _max_speed(self):
  48. """计算最大速度
  49. Returns:
  50. float: 最大速度 (m/s)
  51. """
  52. max_speed = self.ego_df['v'].max() * 3.6 # 转换为 km/h
  53. self.calculated_value['maxSpeed'] = max_speed
  54. return max_speed
  55. def _deviation_speed(self):
  56. """计算速度方差(单位 km²/h²) 改成标准差比较好"""
  57. deviation = (self.ego_df['v'] * 3.6).std() # m/s → km/h,再计算标准差
  58. self.calculated_value['deviationSpeed'] = deviation
  59. return deviation
  60. def average_velocity(self):
  61. """计算平均速度
  62. Returns:
  63. float: 平均速度 (km/h)
  64. """
  65. self.average_v = self.ego_df['v'].mean() * 3.6 # 转换为 km/h
  66. self.calculated_value['averagedSpeed'] = self.average_v
  67. return self.average_v
  68. def acceleration_smoothness(self):
  69. """计算加速度平稳度
  70. 加速度平稳度用以衡量车辆加减速过程的平滑程度,
  71. 通过计算加速度序列的波动程度(标准差)来评估。
  72. 平稳度指标定义为 1-σ_a/a_max(归一化后靠近1代表加速度更稳定)。
  73. Returns:
  74. float: 加速度平稳度 (0-1之间的比率,越接近1表示越平稳)
  75. """
  76. # 获取加速度数据
  77. # 优先使用车辆坐标系下的加速度数据
  78. if 'lon_acc_vehicle' in self.ego_df.columns and 'lat_acc_vehicle' in self.ego_df.columns:
  79. # 使用车辆坐标系下的加速度计算合成加速度
  80. lon_acc = self.ego_df['lon_acc_vehicle'].values
  81. lat_acc = self.ego_df['lat_acc_vehicle'].values
  82. accel_magnitude = np.sqrt(lon_acc ** 2 + lat_acc ** 2)
  83. self.logger.info("使用车辆坐标系下的加速度计算合成加速度")
  84. elif 'accelX' in self.ego_df.columns and 'accelY' in self.ego_df.columns:
  85. # 计算合成加速度(考虑X和Y方向)
  86. accel_x = self.ego_df['accelX'].values
  87. accel_y = self.ego_df['accelY'].values
  88. accel_magnitude = np.sqrt(accel_x ** 2 + accel_y ** 2)
  89. self.logger.info("使用accelX和accelY计算合成加速度")
  90. else:
  91. # 从速度差分计算加速度
  92. velocity = self.ego_df['v'].values
  93. time_diff = self.ego_df['simTime'].diff().fillna(0).values
  94. # 避免除以零
  95. time_diff[time_diff == 0] = 1e-6
  96. accel_magnitude = np.abs(np.diff(velocity, prepend=velocity[0]) / time_diff)
  97. self.logger.info("从速度差分计算加速度")
  98. # 过滤掉异常值(可选)
  99. # 使用3倍标准差作为阈值
  100. mean_accel = np.mean(accel_magnitude)
  101. std_accel = np.std(accel_magnitude)
  102. threshold = mean_accel + 3 * std_accel
  103. filtered_accel = accel_magnitude[accel_magnitude <= threshold]
  104. # 如果过滤后数据太少,则使用原始数据
  105. if len(filtered_accel) < len(accel_magnitude) * 0.8:
  106. filtered_accel = accel_magnitude
  107. self.logger.info("过滤后数据太少,使用原始加速度数据")
  108. else:
  109. self.logger.info(f"过滤掉 {len(accel_magnitude) - len(filtered_accel)} 个异常加速度值")
  110. # 计算加速度标准差
  111. accel_std = np.std(filtered_accel)
  112. # 计算最大加速度(使用95百分位数以避免极端值影响)
  113. accel_max = np.percentile(filtered_accel, 95)
  114. # 防止除以零
  115. if accel_max < 0.001:
  116. accel_max = 0.001
  117. # 计算平稳度指标: 1 - σ_a/a_max
  118. smoothness = 1.0 - (accel_std / accel_max)
  119. # 限制在0-1范围内
  120. smoothness = np.clip(smoothness, 0.0, 1.0)
  121. self.calculated_value['accelerationSmoothness'] = smoothness
  122. self.logger.info(f"加速度标准差: {accel_std:.4f} m/s²")
  123. self.logger.info(f"加速度最大值(95百分位): {accel_max:.4f} m/s²")
  124. self.logger.info(f"加速度平稳度(Acceleration Smoothness): {smoothness:.4f}")
  125. return smoothness
  126. def stop_duration_and_count(self):
  127. """计算停车次数和平均停车时长
  128. Returns:
  129. float: 平均停车时长 (秒)
  130. """
  131. # 获取速度低于阈值的时间和帧号
  132. stop_mask = self.ego_df['v'] <= self.STOP_SPEED_THRESHOLD
  133. if not any(stop_mask):
  134. self.calculated_value['stopDuration'] = 0
  135. return 0 # 如果没有停车,直接返回0
  136. stop_time_list = self.ego_df.loc[stop_mask, 'simTime'].values.tolist()
  137. stop_frame_list = self.ego_df.loc[stop_mask, 'simFrame'].values.tolist()
  138. if not stop_frame_list:
  139. return 0 # 防止空列表导致的索引错误
  140. stop_frame_group = []
  141. stop_time_group = []
  142. sum_stop_time = 0
  143. f1, t1 = stop_frame_list[0], stop_time_list[0]
  144. # 检测停车段
  145. for i in range(1, len(stop_frame_list)):
  146. if stop_frame_list[i] - stop_frame_list[i - 1] != 1: # 帧不连续
  147. f2, t2 = stop_frame_list[i - 1], stop_time_list[i - 1]
  148. # 如果停车有效(帧数差 >= FRAME_RANGE)
  149. if f2 - f1 >= self.FRAME_RANGE:
  150. stop_frame_group.append((f1, f2))
  151. stop_time_group.append((t1, t2))
  152. sum_stop_time += (t2 - t1)
  153. self.stop_count += 1
  154. # 更新起始点
  155. f1, t1 = stop_frame_list[i], stop_time_list[i]
  156. # 检查最后一段停车
  157. if len(stop_frame_list) > 0:
  158. f2, t2 = stop_frame_list[-1], stop_time_list[-1]
  159. last_frame = self.ego_df['simFrame'].values[-1]
  160. # 确保不是因为数据结束导致的停车
  161. if f2 - f1 >= self.FRAME_RANGE and f2 != last_frame:
  162. stop_frame_group.append((f1, f2))
  163. stop_time_group.append((t1, t2))
  164. sum_stop_time += (t2 - t1)
  165. self.stop_count += 1
  166. # 计算平均停车时长
  167. self.stop_duration = sum_stop_time / self.stop_count if self.stop_count > 0 else 0
  168. self.calculated_value['stopDuration'] = self.stop_duration
  169. self.logger.info(f"检测到停车次数: {self.stop_count}, 平均停车时长: {self.stop_duration:.2f}秒")
  170. return self.stop_duration
  171. def speed_utilization_ratio(self, default_speed_limit=60.0):
  172. """计算速度利用率
  173. 速度利用率度量车辆实际速度与道路限速之间的比率,
  174. 反映车辆对道路速度资源的利用程度。
  175. 计算公式: R_v = v_actual / v_limit
  176. Args:
  177. default_speed_limit: 默认道路限速 (km/h),当无法获取实际限速时使用
  178. Returns:
  179. float: 速度利用率 (0-1之间的比率)
  180. """
  181. # 获取车辆速度数据 (m/s)
  182. speeds = self.ego_df['v'].values
  183. # 尝试从数据中获取道路限速信息
  184. # 首先检查road_speed_max列,其次检查speedLimit列,最后使用默认值
  185. if 'road_speed_max' in self.ego_df.columns:
  186. speed_limits = self.ego_df['road_speed_max'].values
  187. # self.logger.info(f"使用road_speed_max列作为道路限速信息: {speed_limits} km/h")
  188. elif 'speedLimit' in self.ego_df.columns:
  189. speed_limits = self.ego_df['speedLimit'].values
  190. # self.logger.info("使用speedLimit列作为道路限速信息")
  191. else:
  192. # 默认限速转换为 m/s
  193. default_limit_ms = default_speed_limit / 3.6
  194. speed_limits = np.full_like(speeds, default_limit_ms)
  195. # self.logger.info(f"未找到道路限速信息,使用默认限速: {default_speed_limit} km/h")
  196. # 确保限速值为m/s单位,如果数据是km/h需要转换
  197. # 假设如果限速值大于30,则认为是km/h单位,需要转换为m/s
  198. if np.mean(speed_limits) > 30:
  199. speed_limits = speed_limits / 3.6
  200. # self.logger.info(f"将限速单位从km/h转换为m/s: {speed_limits} m/s")
  201. # 计算每一帧的速度利用率
  202. ratios = np.divide(speeds, speed_limits,
  203. out=np.zeros_like(speeds),
  204. where=speed_limits != 0)
  205. # 限制比率不超过1(超速按1计算)
  206. ratios = np.minimum(ratios, 1.0)
  207. # 计算平均速度利用率
  208. avg_ratio = np.mean(ratios)
  209. self.calculated_value['speedUtilizationRatio'] = avg_ratio
  210. self.logger.info(f"速度利用率(Speed Utilization Ratio): {avg_ratio:.4f}")
  211. return avg_ratio
  212. class EfficientManager:
  213. """高效性指标管理类"""
  214. def __init__(self, data_processed, plot_path):
  215. self.data = data_processed
  216. self.efficient = EfficientRegistry(self.data)
  217. self.plot_path = plot_path
  218. def report_statistic(self):
  219. """Generate the statistics and report the results."""
  220. # 使用注册表批量执行指标计算
  221. efficient_result = self.efficient.batch_execute()
  222. return efficient_result
  223. # ----------------------
  224. # 基础指标计算函数
  225. # ----------------------
  226. def maxSpeed(data_processed) -> dict:
  227. """计算最大速度"""
  228. efficient = Efficient(data_processed)
  229. max_speed = efficient._max_speed()
  230. return {"maxSpeed": float(max_speed)}
  231. def deviationSpeed(data_processed) -> dict:
  232. """计算速度方差"""
  233. efficient = Efficient(data_processed)
  234. deviation = efficient._deviation_speed()
  235. return {"deviationSpeed": float(deviation)}
  236. def averagedSpeed(data_processed) -> dict:
  237. """计算平均速度"""
  238. efficient = Efficient(data_processed)
  239. avg_speed = efficient.average_velocity()
  240. return {"averagedSpeed": float(avg_speed)}
  241. def stopDuration(data_processed) -> dict:
  242. """计算停车持续时间和次数"""
  243. efficient = Efficient(data_processed)
  244. stop_duration = efficient.stop_duration_and_count()
  245. return {"stopDuration": float(stop_duration)}
  246. def speedUtilizationRatio(data_processed) -> dict:
  247. """计算速度利用率"""
  248. efficient = Efficient(data_processed)
  249. ratio = efficient.speed_utilization_ratio()
  250. return {"speedUtilizationRatio": float(ratio)}
  251. def accelerationSmoothness(data_processed) -> dict:
  252. """计算加速度平稳度"""
  253. efficient = Efficient(data_processed)
  254. smoothness = efficient.acceleration_smoothness()
  255. return {"accelerationSmoothness": float(smoothness)}
  256. class EfficientManager:
  257. """高效性指标管理类"""
  258. def __init__(self, data_processed, plot_path):
  259. self.data = data_processed
  260. self.logger = LogManager().get_logger()
  261. self.plot_path = plot_path
  262. # 检查efficient_config是否为空
  263. if not hasattr(data_processed, 'efficient_config') or not data_processed.efficient_config:
  264. self.logger.warning("高效性配置为空,跳过高效性指标计算初始化")
  265. self.registry = None
  266. else:
  267. self.registry = EfficientRegistry(self.data)
  268. def report_statistic(self):
  269. """计算并报告高效性指标结果"""
  270. # 如果registry为None,直接返回空字典
  271. if self.registry is None:
  272. self.logger.info("高效性指标管理器未初始化,返回空结果")
  273. return {}
  274. efficient_result = self.registry.batch_execute()
  275. return efficient_result
  276. class EfficientRegistry:
  277. """高效性指标注册器"""
  278. def __init__(self, data_processed):
  279. self.logger = LogManager().get_logger() # 获取全局日志实例
  280. self.data = data_processed
  281. # 检查efficient_config是否为空
  282. if not hasattr(data_processed, 'efficient_config') or not data_processed.efficient_config:
  283. self.logger.warning("高效性配置为空,跳过高效性指标计算")
  284. self.eff_config = {}
  285. self.metrics = []
  286. self._registry = {}
  287. return
  288. self.eff_config = data_processed.efficient_config.get("efficient", {})
  289. self.metrics = self._extract_metrics(self.eff_config)
  290. self._registry = self._build_registry()
  291. def _extract_metrics(self, config_node: dict) -> list:
  292. """DFS遍历提取指标"""
  293. metrics = []
  294. def _recurse(node):
  295. if isinstance(node, dict):
  296. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  297. metrics.append(node['name'])
  298. for v in node.values():
  299. _recurse(v)
  300. _recurse(config_node)
  301. self.logger.info(f'评比的高效性指标列表:{metrics}')
  302. return metrics
  303. def _build_registry(self) -> dict:
  304. """自动注册指标函数"""
  305. registry = {}
  306. for metric_name in self.metrics:
  307. try:
  308. registry[metric_name] = globals()[metric_name]
  309. except KeyError:
  310. self.logger.error(f"未实现指标函数: {metric_name}")
  311. return registry
  312. def batch_execute(self) -> dict:
  313. """批量执行指标计算"""
  314. results = {}
  315. # 如果配置为空或没有注册的指标,直接返回空结果
  316. if not hasattr(self, 'eff_config') or not self.eff_config or not self._registry:
  317. self.logger.info("高效性配置为空或无注册指标,返回空结果")
  318. return results
  319. for name, func in self._registry.items():
  320. try:
  321. result = func(self.data)
  322. results.update(result)
  323. # 新增:将每个指标的结果写入日志
  324. self.logger.info(f'高效性指标[{name}]计算结果: {result}')
  325. except Exception as e:
  326. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  327. results[name] = None
  328. self.logger.info(f'高效性指标计算结果:{results}')
  329. return results