#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Safety metric calculation module.

Defines the per-metric ``calculate_*`` functions, a registry that maps the
metric names found in the safety configuration onto those functions, and a
small manager class that runs them and returns the collected results.
"""

import re
from typing import Dict, Any, List, Optional

import numpy as np
import pandas as pd

from modules.lib.score import Score
from modules.lib.log_manager import LogManager

# Zero-width boundary between a lowercase letter/digit and an uppercase
# letter, e.g. the position between "n" and "R" in "collisionRisk".
_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")


# ---------------------------------------------------------------------------
# Safety metric functions
#
# Each function takes the processed data object and returns a dict that maps
# the metric name to its value. The bodies below are placeholders that return
# fixed example values and do not read ``data_processed`` yet.
# ---------------------------------------------------------------------------

def calculate_ttc(data_processed) -> dict:
    """Compute TTC (Time To Collision).

    Placeholder implementation: returns a fixed example value.
    """
    # TODO: implement the TTC calculation.
    return {"TTC": 3.5}  # example return value


def calculate_mttc(data_processed) -> dict:
    """Compute MTTC (Modified Time To Collision).

    Placeholder implementation: returns a fixed example value.
    """
    # TODO: implement the MTTC calculation.
    return {"MTTC": 4.2}  # example return value


def calculate_thw(data_processed) -> dict:
    """Compute THW (Time Headway).

    Placeholder implementation: returns a fixed example value.
    """
    # TODO: implement the THW calculation.
    return {"THW": 2.1}  # example return value


def calculate_collision_risk(data_processed) -> dict:
    """Compute the collision risk.

    Placeholder implementation: returns a fixed example value.
    """
    # TODO: implement the collision-risk calculation.
    return {"collisionRisk": 0.15}  # example return value


def _candidate_function_names(metric_name: str) -> list:
    """Return the function names to try for *metric_name*, in priority order.

    The first candidate is the flat lowercase form (``calculate_ttc``). The
    second, added only when it differs, converts camelCase to snake_case so
    that a metric such as ``collisionRisk`` resolves to
    ``calculate_collision_risk``.
    """
    flat = f"calculate_{metric_name.lower()}"
    snake = f"calculate_{_CAMEL_BOUNDARY.sub('_', metric_name).lower()}"
    return [flat] if flat == snake else [flat, snake]


class SafetyRegistry:
    """Registry that binds configured safety metrics to their functions."""

    def __init__(self, data_processed):
        """Read the safety configuration and build the function registry.

        Args:
            data_processed: Processed data object. Its ``safety_config``
                attribute is indexed with the key ``"safety"``; the object
                itself is later passed to every metric function.
        """
        self.logger = LogManager().get_logger()
        self.data = data_processed
        self.safety_config = data_processed.safety_config["safety"]
        self.metrics = self._extract_metrics(self.safety_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Collect metric names from the (nested) configuration.

        A dict counts as a metric entry when it has a ``'name'`` key and none
        of its values is itself a dict. Only dict values are descended into;
        lists and other containers are not traversed.

        Args:
            config_node: Root of the configuration tree to scan.

        Returns:
            The ``'name'`` values of all metric entries, in traversal order.
        """
        metrics = []

        def _walk(node):
            if not isinstance(node, dict):
                return
            children = [v for v in node.values() if isinstance(v, dict)]
            # A leaf entry: it is named and has no nested dict below it.
            if 'name' in node and not children:
                metrics.append(node['name'])
            for child in children:
                _walk(child)

        _walk(config_node)
        self.logger.info(f'评比的安全指标列表:{metrics}')
        return metrics

    def _build_registry(self) -> dict:
        """Map each configured metric name to its module-level function.

        For every metric the candidate names from
        ``_candidate_function_names`` are looked up in this module's globals
        and the first callable match is registered. Metrics without a
        matching callable are skipped with a warning.

        Returns:
            Dict of metric name -> callable.
        """
        registry = {}
        module_scope = globals()
        for metric_name in self.metrics:
            candidates = _candidate_function_names(metric_name)
            func = next(
                (
                    module_scope[name]
                    for name in candidates
                    if callable(module_scope.get(name))
                ),
                None,
            )
            if func is None:
                tried = " / ".join(candidates)
                self.logger.warning(f"未实现安全指标函数: {tried}")
            else:
                registry[metric_name] = func
        return registry

    def batch_execute(self) -> dict:
        """Run every registered metric function and merge the results.

        Each function is called with the processed data object and its
        returned dict is merged into the overall result. If a function
        raises (or returns something ``dict.update`` rejects), the error is
        logged and the metric's name is stored with the value ``None``; the
        remaining metrics are still executed.

        Returns:
            Dict with the merged metric results.
        """
        results = {}
        for name, func in self._registry.items():
            try:
                results.update(func(self.data))
            except Exception as e:
                # Deliberately broad: one failing metric must not abort the
                # rest of the batch.
                self.logger.error(f"{name} 执行失败: {e}", exc_info=True)
                results[name] = None
        self.logger.info(f'安全指标计算结果:{results}')
        return results


class SafeManager:
    """Manager that runs the safety metrics for one processed data object."""

    def __init__(self, data_processed):
        """Store the data object and build its ``SafetyRegistry``."""
        self.data = data_processed
        self.registry = SafetyRegistry(self.data)

    def report_statistic(self):
        """Compute the safety metrics and return the raw results.

        Returns:
            The dict produced by ``SafetyRegistry.batch_execute``.
        """
        safety_result = self.registry.batch_execute()
        # TODO: scoring is currently disabled. The intended follow-up step is
        # to build Score(self.data.safety_config), call its evaluate() on
        # safety_result and return that instead of the raw metric values.
        return safety_result