#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2024 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           zhanghaiwen
@Date:              2024/12/23
@Last Modified:     2024/12/23
@Summary:           Efficient metrics calculation
"""

from modules.lib.score import Score
from modules.lib.log_manager import LogManager
import numpy as np
from typing import Dict, Tuple, Optional, Callable, Any
import pandas as pd


# ----------------------
# Basic metric functions
# ----------------------
# Each metric function takes the processed-data object and returns a dict
# mapping result key -> value.  EfficientRegistry looks these functions up
# by name via globals(), so a function's name must match the metric name
# used in the configuration.

def maxSpeed(data_processed) -> dict:
    """Return the maximum of the ego ``v`` column.

    Args:
        data_processed: Object exposing ``ego_data`` with a ``'v'`` column
            (presumably a pandas DataFrame -- confirm against the caller).

    Returns:
        ``{"maxSpeed": <float>}``.
    """
    max_speed = data_processed.ego_data['v'].max()
    return {"maxSpeed": float(max_speed)}


def deviationSpeed(data_processed) -> dict:
    """Return the variance of the ego ``v`` column.

    Args:
        data_processed: Object exposing ``ego_data`` with a ``'v'`` column.

    Returns:
        ``{"deviationSpeed": <float>}``, the result of ``.var()`` on the
        column.
    """
    deviation = data_processed.ego_data['v'].var()
    return {"deviationSpeed": float(deviation)}


def averagedSpeed(data_processed) -> dict:
    """Return the mean of the ego ``v`` column.

    Args:
        data_processed: Object exposing ``ego_data`` with a ``'v'`` column.

    Returns:
        ``{"averagedSpeed": <float>}``.
    """
    avg_speed = data_processed.ego_data['v'].mean()
    return {"averagedSpeed": float(avg_speed)}


def stopDuration(data_processed) -> dict:
    """Compute the mean duration and the number of stop intervals.

    A row counts as "stopped" when ``v <= 0.05``.  Stopped rows whose
    ``simFrame`` values increase by exactly 1 form one interval.  An interval
    is counted only if its last frame minus its first frame is at least 13.
    The final interval is additionally skipped when it ends on the very last
    ``simFrame`` of ``ego_data``.

    Args:
        data_processed: Object exposing ``ego_data`` with ``'v'``,
            ``'simTime'`` and ``'simFrame'`` columns.

    Returns:
        ``{"stopDuration": <float>, "stopCount": <int>}`` where
        ``stopDuration`` is the summed ``simTime`` span of the counted
        intervals divided by their count (``0.0`` when none are counted).
    """
    STOP_SPEED_THRESHOLD = 0.05  # speed at or below which the ego is "stopped"
    FRAME_RANGE = 13  # minimum frame span for an interval to count as a stop

    ego_df = data_processed.ego_data

    # Filter once and read both columns from the same selection.
    stopped_rows = ego_df[ego_df['v'] <= STOP_SPEED_THRESHOLD]
    stop_frames = stopped_rows['simFrame'].values.tolist()
    stop_times = stopped_rows['simTime'].values.tolist()

    if not stop_frames:
        return {"stopDuration": 0.0, "stopCount": 0}

    # Safe to index: at least one stopped row exists, so ego_df is non-empty.
    last_recorded_frame = ego_df['simFrame'].values[-1]

    total_stop_time = 0
    stop_count = 0

    # (start_frame, start_time) mark the beginning of the current interval;
    # (prev_frame, prev_time) track the most recently visited stopped row.
    start_frame, start_time = stop_frames[0], stop_times[0]
    prev_frame, prev_time = start_frame, start_time

    for frame, sim_time in zip(stop_frames[1:], stop_times[1:]):
        if frame - prev_frame != 1:
            # Frames are not consecutive: the current interval ended at the
            # previous row.  Count it if it spans enough frames.
            if prev_frame - start_frame >= FRAME_RANGE:
                total_stop_time += prev_time - start_time
                stop_count += 1
            # Start a new interval at the current row.
            start_frame, start_time = frame, sim_time
        prev_frame, prev_time = frame, sim_time

    # Close the trailing interval.  It is skipped when it ends on the last
    # recorded frame.
    # NOTE(review): presumably so that standing still at the end of the run
    # is not counted as a stop -- confirm with the metric owner.
    if (prev_frame - start_frame >= FRAME_RANGE
            and prev_frame != last_recorded_frame):
        total_stop_time += prev_time - start_time
        stop_count += 1

    # Mean duration of the counted intervals.
    stop_duration = total_stop_time / stop_count if stop_count != 0 else 0
    return {"stopDuration": float(stop_duration), "stopCount": stop_count}


class EfficientRegistry:
    """Registry that maps configured efficiency metrics to their functions.

    Metric names are collected from ``data_processed.efficient_config
    ["efficient"]`` and resolved to same-named module-level objects.
    """

    def __init__(self, data_processed):
        """Collect the configured metric names and build the registry.

        Args:
            data_processed: Object exposing ``efficient_config`` (a mapping
                with an ``"efficient"`` entry) and the data the metric
                functions read.
        """
        self.logger = LogManager().get_logger()  # shared logger instance
        self.data = data_processed
        self.eff_config = data_processed.efficient_config["efficient"]
        self.metrics = self._extract_metrics(self.eff_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Walk the config depth-first and collect leaf metric names.

        A dict is treated as a metric leaf when it has a ``'name'`` key and
        none of its values is itself a dict.

        Args:
            config_node: Root of the (nested) efficiency configuration.

        Returns:
            List of the ``'name'`` values of all leaf dicts, in traversal
            order.
        """
        metrics = []

        def _recurse(node):
            # Only dicts are inspected; other value types end the descent.
            if isinstance(node, dict):
                has_nested_dict = any(
                    isinstance(v, dict) for v in node.values()
                )
                if 'name' in node and not has_nested_dict:
                    metrics.append(node['name'])
                for v in node.values():
                    _recurse(v)

        _recurse(config_node)
        self.logger.info(f'评比的高效性指标列表:{metrics}')
        return metrics

    def _build_registry(self) -> dict:
        """Resolve each metric name to the module-level object of that name.

        Names with no matching module-level object are logged as errors and
        left out of the registry.

        Returns:
            Dict mapping metric name to the resolved object.
        """
        registry = {}
        for metric_name in self.metrics:
            try:
                # Lookup is by exact name in this module's global namespace.
                registry[metric_name] = globals()[metric_name]
            except KeyError:
                self.logger.error(f"未实现指标函数: {metric_name}")
        return registry

    def batch_execute(self) -> dict:
        """Run every registered metric and merge the results.

        A metric that raises is logged with its traceback and recorded as
        ``results[<metric name>] = None``; the remaining metrics still run.

        Returns:
            Dict combining the result dicts of all metrics.
        """
        results = {}
        for name, func in self._registry.items():
            try:
                result = func(self.data)
                results.update(result)
            except Exception as e:
                # Best effort: one failing metric must not abort the batch.
                self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
                results[name] = None
        self.logger.info(f'高效性指标计算结果:{results}')
        return results


class EfficientManager:
    """Entry point for computing the efficiency metrics."""

    def __init__(self, data_processed):
        """Store the processed data and create the metric registry.

        Args:
            data_processed: Processed-data object handed to
                ``EfficientRegistry``.
        """
        self.data = data_processed
        self.efficient = EfficientRegistry(self.data)

    def report_statistic(self):
        """Generate the statistics and report the results.

        Returns:
            The dict produced by ``EfficientRegistry.batch_execute``.
        """
        # Run all registered metrics in one batch.
        efficient_result = self.efficient.batch_execute()

        # NOTE: scoring is currently disabled; the raw metric values are
        # returned.  The original scoring step was:
        # evaluator = Score(self.data.efficient_config)
        # result = evaluator.evaluate(efficient_result)
        # return result
        return efficient_result