123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 自定义指标模板
- 用户可以基于此模板创建自己的指标
- """
- from typing import Dict, Any
- import numpy as np
- from modules.lib.score import Score
- # 导入基类
- # 注意:实际使用时,请确保路径正确
- from modules.lib.metric_registry import BaseMetric
- # 指定指标类别(必须)
- # 可选值: safety, comfort, traffic, efficient, function, custom
- METRIC_CATEGORY = "custom"
- class CustomMetricExample(BaseMetric):
- """自定义指标示例 - 计算平均速度"""
-
- def __init__(self, data: Any):
- """
- 初始化指标
-
- Args:
- data: 输入数据
- """
- super().__init__(data)
- # 在这里添加自定义初始化代码
-
- def calculate(self) -> Dict[str, Any]:
- """
- 计算指标
-
- Returns:
- 计算结果字典
- """
- # 在这里实现指标计算逻辑
- result = {
- "value": 0.0, # 指标值
- "score": 100, # 评分
- "details": {} # 详细信息
- }
-
- # 示例:计算平均速度
- try:
- if hasattr(self.data, 'velocities') and self.data.velocities:
- velocities = self.data.velocities
- if isinstance(velocities, dict) and 'vx' in velocities and 'vy' in velocities:
- # 计算合速度
- vx = np.array(velocities['vx'])
- vy = np.array(velocities['vy'])
- speeds = np.sqrt(vx**2 + vy**2)
-
- # 计算平均速度
- avg_speed = np.mean(speeds)
- result['value'] = float(avg_speed)
-
- # 简单评分逻辑
- if avg_speed < 10:
- result['score'] = 60 # 速度过低
- elif avg_speed > 50:
- result['score'] = 70 # 速度过高
- else:
- result['score'] = 100 # 速度适中
-
- # 添加详细信息
- result['details'] = {
- "max_speed": float(np.max(speeds)),
- "min_speed": float(np.min(speeds)),
- "std_speed": float(np.std(speeds))
- }
- except Exception as e:
- # 出错时记录错误信息
- result['value'] = 0.0
- result['score'] = 0
- result['details'] = {"error": str(e)}
-
- return result
-
- def report_statistic(self) -> Dict[str, Any]:
- """
- 报告统计结果
- 可以在这里自定义结果格式
- """
- result = self.calculate()
-
- # 可以在这里添加额外的处理逻辑
- # 例如:添加时间戳、格式化结果等
-
- return result
- # 可以在同一文件中定义多个指标类
- class AnotherCustomMetric(BaseMetric):
- """另一个自定义指标示例 - 计算加速度变化率"""
-
- def __init__(self, data: Any):
- super().__init__(data)
-
- def calculate(self) -> Dict[str, Any]:
- # 实现您的计算逻辑
- return {"value": 0.0, "score": 100, "details": {}}
|