#!/usr/bin/env python # -*- coding: utf-8 -*- """ 自定义指标模板 用户可以基于此模板创建自己的指标 """ from typing import Dict, Any import numpy as np from modules.lib.score import Score # 导入基类 # 注意:实际使用时,请确保路径正确 from modules.lib.metric_registry import BaseMetric # 指定指标类别(必须) # 可选值: safety, comfort, traffic, efficient, function, custom METRIC_CATEGORY = "custom" class CustomMetricExample(BaseMetric): """自定义指标示例 - 计算平均速度""" def __init__(self, data: Any): """ 初始化指标 Args: data: 输入数据 """ super().__init__(data) # 在这里添加自定义初始化代码 def calculate(self) -> Dict[str, Any]: """ 计算指标 Returns: 计算结果字典 """ # 在这里实现指标计算逻辑 result = { "value": 0.0, # 指标值 "score": 100, # 评分 "details": {} # 详细信息 } # 示例:计算平均速度 try: if hasattr(self.data, 'velocities') and self.data.velocities: velocities = self.data.velocities if isinstance(velocities, dict) and 'vx' in velocities and 'vy' in velocities: # 计算合速度 vx = np.array(velocities['vx']) vy = np.array(velocities['vy']) speeds = np.sqrt(vx**2 + vy**2) # 计算平均速度 avg_speed = np.mean(speeds) result['value'] = float(avg_speed) # 简单评分逻辑 if avg_speed < 10: result['score'] = 60 # 速度过低 elif avg_speed > 50: result['score'] = 70 # 速度过高 else: result['score'] = 100 # 速度适中 # 添加详细信息 result['details'] = { "max_speed": float(np.max(speeds)), "min_speed": float(np.min(speeds)), "std_speed": float(np.std(speeds)) } except Exception as e: # 出错时记录错误信息 result['value'] = 0.0 result['score'] = 0 result['details'] = {"error": str(e)} return result def report_statistic(self) -> Dict[str, Any]: """ 报告统计结果 可以在这里自定义结果格式 """ result = self.calculate() # 可以在这里添加额外的处理逻辑 # 例如:添加时间戳、格式化结果等 return result # 可以在同一文件中定义多个指标类 class AnotherCustomMetric(BaseMetric): """另一个自定义指标示例 - 计算加速度变化率""" def __init__(self, data: Any): super().__init__(data) def calculate(self) -> Dict[str, Any]: # 实现您的计算逻辑 return {"value": 0.0, "score": 100, "details": {}}