efficient.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2024 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen
  10. @Date: 2024/12/23
  11. @Last Modified: 2024/12/23
  12. @Summary: Efficient metrics calculation
  13. """
  14. from modules.lib.score import Score
  15. from modules.lib.log_manager import LogManager
  16. import numpy as np
  17. from typing import Dict, Tuple, Optional, Callable, Any
  18. import pandas as pd
  19. # ----------------------
  20. # 基础指标计算函数
  21. # ----------------------
  22. def maxSpeed(data_processed) -> dict:
  23. """计算最大速度"""
  24. max_speed = data_processed.ego_data['v'].max()
  25. return {"maxSpeed": float(max_speed)}
  26. def deviationSpeed(data_processed) -> dict:
  27. """计算速度方差"""
  28. deviation = data_processed.ego_data['v'].var()
  29. return {"deviationSpeed": float(deviation)}
  30. def averagedSpeed(data_processed) -> dict:
  31. """计算平均速度"""
  32. avg_speed = data_processed.ego_data['v'].mean()
  33. return {"averagedSpeed": float(avg_speed)}
  34. def stopDuration(data_processed) -> dict:
  35. """计算停车持续时间和次数"""
  36. STOP_SPEED_THRESHOLD = 0.05 # 停车速度阈值
  37. FRAME_RANGE = 13 # 停车帧数阈值
  38. ego_df = data_processed.ego_data
  39. stop_time_list = ego_df[ego_df['v'] <= STOP_SPEED_THRESHOLD]['simTime'].values.tolist()
  40. stop_frame_list = ego_df[ego_df['v'] <= STOP_SPEED_THRESHOLD]['simFrame'].values.tolist()
  41. stop_frame_group = []
  42. stop_time_group = []
  43. sum_stop_time = 0
  44. stop_count = 0
  45. if not stop_frame_list:
  46. return {"stopDuration": 0.0, "stopCount": 0}
  47. f1, t1 = stop_frame_list[0], stop_time_list[0]
  48. for i in range(1, len(stop_frame_list)):
  49. if stop_frame_list[i] - stop_frame_list[i - 1] != 1: # 帧不连续
  50. f2, t2 = stop_frame_list[i - 1], stop_time_list[i - 1]
  51. # 如果停车有效(帧间隔 >= FRAME_RANGE)
  52. if f2 - f1 >= FRAME_RANGE:
  53. stop_frame_group.append((f1, f2))
  54. stop_time_group.append((t1, t2))
  55. sum_stop_time += (t2 - t1)
  56. stop_count += 1
  57. # 更新 f1, t1
  58. f1, t1 = stop_frame_list[i], stop_time_list[i]
  59. # 检查最后一段停车
  60. if len(stop_frame_list) > 0:
  61. f2, t2 = stop_frame_list[-1], stop_time_list[-1]
  62. if f2 - f1 >= FRAME_RANGE and f2 != ego_df['simFrame'].values[-1]:
  63. stop_frame_group.append((f1, f2))
  64. stop_time_group.append((t1, t2))
  65. sum_stop_time += (t2 - t1)
  66. stop_count += 1
  67. # 计算停车持续时间
  68. stop_duration = sum_stop_time / stop_count if stop_count != 0 else 0
  69. return {"stopDuration": float(stop_duration), "stopCount": stop_count}
  70. class EfficientRegistry:
  71. """高效性指标注册器"""
  72. def __init__(self, data_processed):
  73. self.logger = LogManager().get_logger() # 获取全局日志实例
  74. self.data = data_processed
  75. self.eff_config = data_processed.efficient_config["efficient"]
  76. self.metrics = self._extract_metrics(self.eff_config)
  77. self._registry = self._build_registry()
  78. def _extract_metrics(self, config_node: dict) -> list:
  79. """DFS遍历提取指标"""
  80. metrics = []
  81. def _recurse(node):
  82. if isinstance(node, dict):
  83. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  84. metrics.append(node['name'])
  85. for v in node.values():
  86. _recurse(v)
  87. _recurse(config_node)
  88. self.logger.info(f'评比的高效性指标列表:{metrics}')
  89. return metrics
  90. def _build_registry(self) -> dict:
  91. """自动注册指标函数"""
  92. registry = {}
  93. for metric_name in self.metrics:
  94. try:
  95. registry[metric_name] = globals()[metric_name]
  96. except KeyError:
  97. self.logger.error(f"未实现指标函数: {metric_name}")
  98. return registry
  99. def batch_execute(self) -> dict:
  100. """批量执行指标计算"""
  101. results = {}
  102. for name, func in self._registry.items():
  103. try:
  104. result = func(self.data)
  105. results.update(result)
  106. except Exception as e:
  107. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  108. results[name] = None
  109. self.logger.info(f'高效性指标计算结果:{results}')
  110. return results
  111. class EfficientManager:
  112. """高效性指标管理类"""
  113. def __init__(self, data_processed):
  114. self.data = data_processed
  115. self.efficient = EfficientRegistry(self.data)
  116. def report_statistic(self):
  117. """Generate the statistics and report the results."""
  118. # 使用注册表批量执行指标计算
  119. efficient_result = self.efficient.batch_execute()
  120. # evaluator = Score(self.data.efficient_config)
  121. # result = evaluator.evaluate(efficient_result)
  122. # return result
  123. return efficient_result