123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2024 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen
- @Date: 2024/12/23
- @Last Modified: 2024/12/23
- @Summary: Efficient metrics calculation
- """
- from modules.lib.score import Score
- from modules.lib.log_manager import LogManager
- import numpy as np
- from typing import Dict, Tuple, Optional, Callable, Any
- import pandas as pd
- # ----------------------
- # 基础指标计算函数
- # ----------------------
def maxSpeed(data_processed) -> dict:
    """Return the largest value found in the ego 'v' column.

    Args:
        data_processed: Object exposing ``ego_data``, a table indexable by
            the column name ``'v'``.

    Returns:
        A one-entry dict ``{"maxSpeed": <float>}``.
    """
    speed_column = data_processed.ego_data['v']
    peak = speed_column.max()
    return {"maxSpeed": float(peak)}
def deviationSpeed(data_processed) -> dict:
    """Return the variance of the ego 'v' column.

    The value comes from ``.var()`` on the column; with pandas this is the
    sample variance (``ddof=1``), so a single-row input yields NaN.

    Args:
        data_processed: Object exposing ``ego_data``, a table indexable by
            the column name ``'v'``.

    Returns:
        A one-entry dict ``{"deviationSpeed": <float>}``.
    """
    speed_column = data_processed.ego_data['v']
    variance = speed_column.var()
    return {"deviationSpeed": float(variance)}
def averagedSpeed(data_processed) -> dict:
    """Return the arithmetic mean of the ego 'v' column.

    Args:
        data_processed: Object exposing ``ego_data``, a table indexable by
            the column name ``'v'``.

    Returns:
        A one-entry dict ``{"averagedSpeed": <float>}``.
    """
    speed_column = data_processed.ego_data['v']
    mean_value = speed_column.mean()
    return {"averagedSpeed": float(mean_value)}
def stopDuration(data_processed) -> dict:
    """Compute the mean duration and the number of stop events.

    A row is "stopped" when its ``'v'`` value is at or below
    ``STOP_SPEED_THRESHOLD``. Stopped rows whose ``simFrame`` values increase
    by exactly 1 form one segment. A segment counts as a stop when the
    difference between its last and first frame is at least ``FRAME_RANGE``.
    The final segment is additionally skipped when it ends on the last
    ``simFrame`` of the whole table, i.e. a stop still in progress when the
    recording ends is not counted.

    Args:
        data_processed: Object exposing ``ego_data``, a table with the
            columns ``'v'``, ``'simTime'`` and ``'simFrame'``.

    Returns:
        ``{"stopDuration": <float>, "stopCount": <int>}`` where
        ``stopDuration`` is the summed ``simTime`` span of all counted stops
        divided by ``stopCount`` (``0.0`` when nothing is counted).
    """
    STOP_SPEED_THRESHOLD = 0.05  # rows with v <= this value are "stopped"
    FRAME_RANGE = 13  # minimum (last frame - first frame) for a valid stop

    ego_df = data_processed.ego_data

    # Filter once and read both columns from the same filtered view.
    stopped_rows = ego_df[ego_df['v'] <= STOP_SPEED_THRESHOLD]
    stop_frames = stopped_rows['simFrame'].values.tolist()
    stop_times = stopped_rows['simTime'].values.tolist()

    if not stop_frames:
        return {"stopDuration": 0.0, "stopCount": 0}

    total_stop_time = 0
    stop_count = 0

    # (seg_frame, seg_time) mark the start of the segment being scanned;
    # (prev_frame, prev_time) trail one stopped row behind the loop variable.
    seg_frame, seg_time = stop_frames[0], stop_times[0]
    prev_frame, prev_time = seg_frame, seg_time
    for frame, sim_time in zip(stop_frames[1:], stop_times[1:]):
        if frame - prev_frame != 1:  # gap in frames: previous segment ended
            if prev_frame - seg_frame >= FRAME_RANGE:
                total_stop_time += prev_time - seg_time
                stop_count += 1
            seg_frame, seg_time = frame, sim_time
        prev_frame, prev_time = frame, sim_time

    # Close the trailing segment; it is ignored when it reaches the very last
    # recorded frame.
    final_frame = ego_df['simFrame'].values[-1]
    if prev_frame - seg_frame >= FRAME_RANGE and prev_frame != final_frame:
        total_stop_time += prev_time - seg_time
        stop_count += 1

    mean_stop_time = total_stop_time / stop_count if stop_count != 0 else 0

    return {"stopDuration": float(mean_stop_time), "stopCount": stop_count}
class EfficientRegistry:
    """Registry that maps configured efficiency metric names to functions.

    Metric names are read from the ``"efficient"`` section of the
    configuration, resolved against this module's global names, and run in
    one batch against the processed data.
    """

    def __init__(self, data_processed):
        """Collect the configured metric names and resolve them to functions.

        Args:
            data_processed: Object exposing ``efficient_config`` (a mapping
                with an ``"efficient"`` entry); it is also the argument
                handed to every metric function.
        """
        self.logger = LogManager().get_logger()  # shared logger instance
        self.data = data_processed
        self.eff_config = data_processed.efficient_config["efficient"]
        self.metrics = self._extract_metrics(self.eff_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Collect metric names from the configuration tree, depth first.

        A dict is treated as a metric when it has a ``'name'`` key and none
        of its values is itself a dict. Only dict values are descended into;
        lists and scalars are skipped.

        Args:
            config_node: Root of the configuration tree.

        Returns:
            Metric names in pre-order traversal order.
        """
        def _walk(node):
            if not isinstance(node, dict):
                return
            children = node.values()
            if 'name' in node and all(not isinstance(child, dict) for child in children):
                yield node['name']
            for child in children:
                yield from _walk(child)

        metrics = list(_walk(config_node))
        self.logger.info(f'评比的高效性指标列表:{metrics}')
        return metrics

    def _build_registry(self) -> dict:
        """Look up each metric name among this module's global names.

        Names with no matching global are reported through the logger and
        left out of the registry.

        Returns:
            Mapping of metric name to the global object bound to that name.
        """
        module_scope = globals()
        registry = {}
        for metric_name in self.metrics:
            if metric_name in module_scope:
                registry[metric_name] = module_scope[metric_name]
            else:
                self.logger.error(f"未实现指标函数: {metric_name}")
        return registry

    def batch_execute(self) -> dict:
        """Run every registered metric and merge the returned dicts.

        A metric that raises is logged with its traceback and recorded as
        ``None`` under its own name, so one failure does not abort the rest.

        Returns:
            The merged results of all metric functions.
        """
        results = {}
        for name, func in self._registry.items():
            try:
                results.update(func(self.data))
            except Exception as e:
                self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
                results[name] = None
        self.logger.info(f'高效性指标计算结果:{results}')
        return results
class EfficientManager:
    """Entry point that owns an EfficientRegistry for one data set."""

    def __init__(self, data_processed):
        """Store the processed data and build its metric registry.

        Args:
            data_processed: Processed data object passed on to
                ``EfficientRegistry``.
        """
        self.data = data_processed
        self.efficient = EfficientRegistry(self.data)

    def report_statistic(self):
        """Run all registered metrics and return their raw results.

        Returns:
            The dict produced by ``EfficientRegistry.batch_execute``.
        """
        # NOTE: scoring through ``Score(self.data.efficient_config)`` is
        # disabled here; the unscored metric values are returned directly.
        return self.efficient.batch_execute()
-
|