123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 安全指标计算模块
- """
- import numpy as np
- import pandas as pd
- from typing import Dict, Any, List, Optional
- from modules.lib.score import Score
- from modules.lib.log_manager import LogManager
- # 安全指标计算函数
def calculate_ttc(data_processed) -> dict:
    """Compute the TTC (Time To Collision) safety metric.

    Placeholder implementation: ``data_processed`` is not read yet and a
    fixed sample value is returned.

    Args:
        data_processed: Processed input data (currently unused).

    Returns:
        A single-entry dict keyed by ``"TTC"``.
    """
    # TODO: implement the real TTC computation.
    sample_result = {"TTC": 3.5}  # example return value
    return sample_result
def calculate_mttc(data_processed) -> dict:
    """Compute the MTTC (Modified Time To Collision) safety metric.

    Placeholder implementation: ``data_processed`` is not read yet and a
    fixed sample value is returned.

    Args:
        data_processed: Processed input data (currently unused).

    Returns:
        A single-entry dict keyed by ``"MTTC"``.
    """
    # TODO: implement the real MTTC computation.
    sample_result = {"MTTC": 4.2}  # example return value
    return sample_result
def calculate_thw(data_processed) -> dict:
    """Compute the THW (Time Headway) safety metric.

    Placeholder implementation: ``data_processed`` is not read yet and a
    fixed sample value is returned.

    Args:
        data_processed: Processed input data (currently unused).

    Returns:
        A single-entry dict keyed by ``"THW"``.
    """
    # TODO: implement the real THW computation.
    sample_result = {"THW": 2.1}  # example return value
    return sample_result
def calculate_collision_risk(data_processed) -> dict:
    """Compute the collision-risk safety metric.

    Placeholder implementation: ``data_processed`` is not read yet and a
    fixed sample value is returned.

    Args:
        data_processed: Processed input data (currently unused).

    Returns:
        A single-entry dict keyed by ``"collisionRisk"``.
    """
    # TODO: implement the real collision-risk computation.
    sample_result = {"collisionRisk": 0.15}  # example return value
    return sample_result
class SafetyRegistry:
    """Registry mapping configured safety metrics to calculation functions.

    Metric names are read from the ``"safety"`` section of
    ``data_processed.safety_config``. Each name is resolved to a module-level
    ``calculate_*`` function, and ``batch_execute`` runs every resolved
    function against the processed data.
    """

    def __init__(self, data_processed):
        """Collect the configured metric names and resolve their functions.

        Args:
            data_processed: Object exposing a ``safety_config`` mapping that
                contains a ``"safety"`` key. The same object is later passed
                unchanged to every metric function.

        Raises:
            KeyError: If ``safety_config`` has no ``"safety"`` entry.
        """
        self.logger = LogManager().get_logger()
        self.data = data_processed
        self.safety_config = data_processed.safety_config["safety"]
        self.metrics = self._extract_metrics(self.safety_config)
        self._registry = self._build_registry()

    def _extract_metrics(self, config_node: dict) -> list:
        """Extract metric names from a nested configuration mapping.

        A dict counts as a metric (leaf) when it has a ``'name'`` key and none
        of its values is itself a dict. Dicts that contain nested dicts are
        treated as groups and only traversed.

        Args:
            config_node: Root of the configuration tree to walk.

        Returns:
            Metric names in traversal (dict insertion) order.
        """
        metrics = []

        def _recurse(node):
            if not isinstance(node, dict):
                return
            is_leaf = not any(isinstance(v, dict) for v in node.values())
            if 'name' in node and is_leaf:
                metrics.append(node['name'])
            for child in node.values():
                _recurse(child)

        _recurse(config_node)
        self.logger.info(f'评比的安全指标列表:{metrics}')
        return metrics

    @staticmethod
    def _candidate_function_names(metric_name: str) -> list:
        """Return the function names that may implement ``metric_name``.

        The legacy all-lowercase form comes first so existing functions keep
        resolving. A snake_case form follows when it differs, so camelCase
        metrics match conventionally named functions
        (``collisionRisk`` -> ``calculate_collision_risk``).
        """
        import re  # local import keeps this block self-contained

        legacy_name = f"calculate_{metric_name.lower()}"
        # Insert "_" only at lower/digit -> upper boundaries, so acronyms
        # such as "TTC" stay intact ("ttc", not "t_t_c").
        snake = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", metric_name).lower()
        snake_name = f"calculate_{snake}"
        if snake_name == legacy_name:
            return [legacy_name]
        return [legacy_name, snake_name]

    def _build_registry(self) -> dict:
        """Build the metric-name -> function table from module globals.

        Metrics without a callable implementation are skipped and reported
        with a warning.

        Returns:
            Dict mapping each resolvable metric name to its function.
        """
        registry = {}
        module_namespace = globals()
        for metric_name in self.metrics:
            candidates = self._candidate_function_names(metric_name)
            func = next(
                (
                    module_namespace[candidate]
                    for candidate in candidates
                    # A same-named non-callable global must not be registered.
                    if callable(module_namespace.get(candidate))
                ),
                None,
            )
            if func is None:
                self.logger.warning(f"未实现安全指标函数: {' / '.join(candidates)}")
                continue
            registry[metric_name] = func
        return registry

    def batch_execute(self) -> dict:
        """Run every registered metric function and merge the results.

        Each function receives ``self.data`` and its returned dict is merged
        into the combined result. A function that raises is logged and
        recorded as ``None`` under its metric name, so one failing metric
        does not abort the others.

        Returns:
            Combined dict of metric results.
        """
        results = {}
        for name, func in self._registry.items():
            try:
                result = func(self.data)
                results.update(result)
            except Exception as e:
                # Best-effort by design: keep going and mark the metric failed.
                self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
                results[name] = None
        self.logger.info(f'安全指标计算结果:{results}')
        return results
class SafeManager:
    """Facade that owns a ``SafetyRegistry`` and reports its metric results."""

    def __init__(self, data_processed):
        """Store the processed data and build the metric registry from it.

        Args:
            data_processed: Processed input data handed to ``SafetyRegistry``.
        """
        self.data = data_processed
        self.registry = SafetyRegistry(data_processed)

    def report_statistic(self):
        """Compute all registered safety metrics and return the raw values.

        Returns:
            Whatever ``self.registry.batch_execute()`` returns.
        """
        # NOTE: scoring through ``Score(self.data.safety_config).evaluate(...)``
        # is currently disabled, so the unscored metric values are returned.
        return self.registry.batch_execute()
-
-
|