efficient.py 8.4 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2024 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen
  10. @Data: 2024/12/23
  11. @Last Modified: 2024/12/23
  12. @Summary: Efficient metrics calculation
  13. """
  14. from modules.lib.score import Score
  15. from modules.lib.log_manager import LogManager
  16. import numpy as np
  17. from typing import Dict, Tuple, Optional, Callable, Any
  18. import pandas as pd
  19. class Efficient:
  20. """高效性指标计算类"""
  21. def __init__(self, data_processed):
  22. """初始化高效性指标计算类
  23. Args:
  24. data_processed: 预处理后的数据对象
  25. """
  26. self.logger = LogManager().get_logger()
  27. self.data_processed = data_processed
  28. self.df = data_processed.object_df.copy() # 浅拷贝
  29. self.ego_df = data_processed.ego_data.copy() # 浅拷贝
  30. # 配置参数
  31. self.STOP_SPEED_THRESHOLD = 0.05 # 停车速度阈值 (m/s)
  32. self.STOP_TIME_THRESHOLD = 0.5 # 停车时间阈值 (秒)
  33. self.FRAME_RANGE = 13 # 停车帧数阈值
  34. # 初始化结果变量
  35. self.stop_count = 0 # 停车次数
  36. self.stop_duration = 0 # 平均停车时长
  37. self.average_v = 0 # 平均速度
  38. def _max_speed(self):
  39. """计算最大速度
  40. Returns:
  41. float: 最大速度 (m/s)
  42. """
  43. return self.ego_df['v'].max()
  44. def _deviation_speed(self):
  45. """计算速度方差
  46. Returns:
  47. float: 速度方差
  48. """
  49. return self.ego_df['v'].var()
  50. def average_velocity(self):
  51. """计算平均速度
  52. Returns:
  53. float: 平均速度 (m/s)
  54. """
  55. self.average_v = self.ego_df['v'].mean()
  56. return self.average_v
  57. def stop_duration_and_count(self):
  58. """计算停车次数和平均停车时长
  59. Returns:
  60. float: 平均停车时长 (秒)
  61. """
  62. # 获取速度低于阈值的时间和帧号
  63. stop_mask = self.ego_df['v'] <= self.STOP_SPEED_THRESHOLD
  64. if not any(stop_mask):
  65. return 0 # 如果没有停车,直接返回0
  66. stop_time_list = self.ego_df.loc[stop_mask, 'simTime'].values.tolist()
  67. stop_frame_list = self.ego_df.loc[stop_mask, 'simFrame'].values.tolist()
  68. if not stop_frame_list:
  69. return 0 # 防止空列表导致的索引错误
  70. stop_frame_group = []
  71. stop_time_group = []
  72. sum_stop_time = 0
  73. f1, t1 = stop_frame_list[0], stop_time_list[0]
  74. # 检测停车段
  75. for i in range(1, len(stop_frame_list)):
  76. if stop_frame_list[i] - stop_frame_list[i - 1] != 1: # 帧不连续
  77. f2, t2 = stop_frame_list[i - 1], stop_time_list[i - 1]
  78. # 如果停车有效(帧数差 >= FRAME_RANGE)
  79. if f2 - f1 >= self.FRAME_RANGE:
  80. stop_frame_group.append((f1, f2))
  81. stop_time_group.append((t1, t2))
  82. sum_stop_time += (t2 - t1)
  83. self.stop_count += 1
  84. # 更新起始点
  85. f1, t1 = stop_frame_list[i], stop_time_list[i]
  86. # 检查最后一段停车
  87. if len(stop_frame_list) > 0:
  88. f2, t2 = stop_frame_list[-1], stop_time_list[-1]
  89. last_frame = self.ego_df['simFrame'].values[-1]
  90. # 确保不是因为数据结束导致的停车
  91. if f2 - f1 >= self.FRAME_RANGE and f2 != last_frame:
  92. stop_frame_group.append((f1, f2))
  93. stop_time_group.append((t1, t2))
  94. sum_stop_time += (t2 - t1)
  95. self.stop_count += 1
  96. # 计算平均停车时长
  97. self.stop_duration = sum_stop_time / self.stop_count if self.stop_count > 0 else 0
  98. self.logger.info(f"检测到停车次数: {self.stop_count}, 平均停车时长: {self.stop_duration:.2f}秒")
  99. return self.stop_duration
  100. def report_statistic(self):
  101. """生成统计报告
  102. Returns:
  103. dict: 高效性评估结果
  104. """
  105. # 计算各项指标
  106. max_speed_ms = self._max_speed()
  107. deviation_speed_ms = self._deviation_speed()
  108. average_speed_ms = self.average_velocity()
  109. # 将 m/s 转换为 km/h 用于评分
  110. max_speed_kmh = max_speed_ms * 3.6
  111. deviation_speed_kmh = deviation_speed_ms * 3.6
  112. average_speed_kmh = average_speed_ms * 3.6
  113. efficient_result = {
  114. 'maxSpeed': max_speed_kmh, # 转换为 km/h
  115. 'deviationSpeed': deviation_speed_kmh, # 转换为 km/h
  116. 'averagedSpeed': average_speed_kmh, # 转换为 km/h
  117. 'stopDuration': self.stop_duration_and_count()
  118. }
  119. self.logger.info(f"高效性指标计算完成,结果: {efficient_result}")
  120. return efficient_result
  121. # ----------------------
  122. # 基础指标计算函数
  123. # ----------------------
  124. def maxSpeed(data_processed) -> dict:
  125. """计算最大速度"""
  126. efficient = Efficient(data_processed)
  127. max_speed = efficient._max_speed() * 3.6 # 转换为 km/h
  128. return {"maxSpeed": float(max_speed)}
  129. def deviationSpeed(data_processed) -> dict:
  130. """计算速度方差"""
  131. efficient = Efficient(data_processed)
  132. deviation = efficient._deviation_speed() * 3.6 # 转换为 km/h
  133. return {"deviationSpeed": float(deviation)}
  134. def averagedSpeed(data_processed) -> dict:
  135. """计算平均速度"""
  136. efficient = Efficient(data_processed)
  137. avg_speed = efficient.average_velocity() * 3.6 # 转换为 km/h
  138. return {"averagedSpeed": float(avg_speed)}
  139. def stopDuration(data_processed) -> dict:
  140. """计算停车持续时间和次数"""
  141. efficient = Efficient(data_processed)
  142. stop_duration = efficient.stop_duration_and_count()
  143. return {"stopDuration": float(stop_duration)}
  144. class EfficientRegistry:
  145. """高效性指标注册器"""
  146. def __init__(self, data_processed):
  147. self.logger = LogManager().get_logger() # 获取全局日志实例
  148. self.data = data_processed
  149. self.eff_config = data_processed.efficient_config["efficient"]
  150. self.metrics = self._extract_metrics(self.eff_config)
  151. self._registry = self._build_registry()
  152. def _extract_metrics(self, config_node: dict) -> list:
  153. """DFS遍历提取指标"""
  154. metrics = []
  155. def _recurse(node):
  156. if isinstance(node, dict):
  157. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  158. metrics.append(node['name'])
  159. for v in node.values():
  160. _recurse(v)
  161. _recurse(config_node)
  162. self.logger.info(f'评比的高效性指标列表:{metrics}')
  163. return metrics
  164. def _build_registry(self) -> dict:
  165. """自动注册指标函数"""
  166. registry = {}
  167. for metric_name in self.metrics:
  168. try:
  169. registry[metric_name] = globals()[metric_name]
  170. except KeyError:
  171. self.logger.error(f"未实现指标函数: {metric_name}")
  172. return registry
  173. def batch_execute(self) -> dict:
  174. """批量执行指标计算"""
  175. results = {}
  176. for name, func in self._registry.items():
  177. try:
  178. result = func(self.data)
  179. results.update(result)
  180. # 新增:将每个指标的结果写入日志
  181. self.logger.info(f'高效性指标[{name}]计算结果: {result}')
  182. except Exception as e:
  183. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  184. results[name] = None
  185. self.logger.info(f'高效性指标计算结果:{results}')
  186. return results
  187. class EfficientManager:
  188. """高效性指标管理类"""
  189. def __init__(self, data_processed):
  190. self.data = data_processed
  191. self.efficient = EfficientRegistry(self.data)
  192. def report_statistic(self):
  193. """Generate the statistics and report the results."""
  194. # 使用注册表批量执行指标计算
  195. efficient_result = self.efficient.batch_execute()
  196. return efficient_result