unified_custom_metric_template.py 6.6 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 自定义指标统一模板
  5. 本模板提供两种实现自定义指标的方式:
  6. 1. 基于类继承的方式(推荐):继承BaseMetric基类,实现calculate方法
  7. 2. 基于函数式的方式:实现evaluate函数
  8. 用户可以根据自己的需求选择合适的实现方式。
  9. """
  10. from typing import Dict, Any, Union, Optional
  11. import numpy as np
  12. import logging
  13. from modules.lib.score import Score
  14. # 导入基类
  15. # 注意:实际使用时,请确保路径正确
  16. from modules.lib.metric_registry import BaseMetric
  17. # 指定指标类别(必须)
  18. # 可选值: safety, comfort, traffic, efficient, function, custom
  19. METRIC_CATEGORY = "custom"
  20. #############################################################
  21. # 方式一:基于类继承的实现方式(推荐)
  22. # 优点:结构清晰,易于扩展,支持复杂指标计算
  23. # 适用场景:需要复杂状态管理、多步骤计算的指标
  24. #############################################################
  25. class CustomMetricExample(BaseMetric):
  26. """自定义指标示例 - 计算平均速度"""
  27. def __init__(self, data: Any):
  28. """
  29. 初始化指标
  30. Args:
  31. data: 输入数据,通常包含场景、轨迹等信息
  32. """
  33. super().__init__(data)
  34. # 在这里添加自定义初始化代码
  35. def calculate(self) -> Dict[str, Any]:
  36. """
  37. 计算指标
  38. Returns:
  39. 计算结果字典,包含以下字段:
  40. - value: 指标值
  41. - score: 评分(0-100)
  42. - details: 详细信息(可选)
  43. """
  44. # 在这里实现指标计算逻辑
  45. result = {
  46. "value": 0.0, # 指标值
  47. "score": 100, # 评分
  48. "details": {} # 详细信息
  49. }
  50. # 示例:计算平均速度
  51. try:
  52. if hasattr(self.data, 'ego_data') and hasattr(self.data.ego_data, 'v'):
  53. # 获取速度数据
  54. speeds = self.data.ego_data['v'].values
  55. # 计算平均速度
  56. avg_speed = np.mean(speeds)
  57. result['value'] = float(avg_speed)
  58. # 简单评分逻辑
  59. if avg_speed < 10:
  60. result['score'] = 60 # 速度过低
  61. elif avg_speed > 50:
  62. result['score'] = 70 # 速度过高
  63. else:
  64. result['score'] = 100 # 速度适中
  65. # 添加详细信息
  66. result['details'] = {
  67. "max_speed": float(np.max(speeds)),
  68. "min_speed": float(np.min(speeds)),
  69. "std_speed": float(np.std(speeds))
  70. }
  71. except Exception as e:
  72. # 出错时记录错误信息
  73. logging.error(f"计算指标失败: {str(e)}")
  74. result['value'] = 0.0
  75. result['score'] = 0
  76. result['details'] = {"error": str(e)}
  77. return result
  78. def report_statistic(self) -> Dict[str, Any]:
  79. """
  80. 报告统计结果
  81. 可以在这里自定义结果格式
  82. Returns:
  83. 统计结果字典
  84. """
  85. result = self.calculate()
  86. # 可以在这里添加额外的处理逻辑
  87. # 例如:添加时间戳、格式化结果等
  88. return result
  89. #############################################################
  90. # 方式二:基于函数式的实现方式
  91. # 优点:简单直接,易于理解
  92. # 适用场景:简单的指标计算,无需复杂状态管理
  93. #############################################################
  94. def evaluate(data) -> Dict[str, Any]:
  95. """
  96. 评测自定义指标
  97. Args:
  98. data: 评测数据,包含场景、轨迹等信息
  99. Returns:
  100. 评测结果,包含指标值、分数、详情等
  101. """
  102. try:
  103. # 计算指标值
  104. result = calculate_metric(data)
  105. # 可以使用Score类评估结果
  106. # evaluator = Score(config)
  107. # result = evaluator.evaluate(result)
  108. return result
  109. except Exception as e:
  110. logging.error(f"评测指标失败: {str(e)}")
  111. # 发生异常时返回错误信息
  112. return {
  113. "value": 0.0,
  114. "score": 0,
  115. "details": {
  116. "error": str(e)
  117. }
  118. }
  119. def calculate_metric(data) -> Dict[str, Any]:
  120. """
  121. 计算指标值
  122. Args:
  123. data: 输入数据
  124. Returns:
  125. 指标计算结果
  126. """
  127. # 这里是计算指标的具体逻辑
  128. # 以下是一个简化的示例
  129. if data is None:
  130. raise ValueError("输入数据不能为空")
  131. try:
  132. # 示例:计算TTC (Time To Collision)
  133. if hasattr(data, 'ego_data'):
  134. # 这里应该实现实际的指标计算逻辑
  135. # 临时使用固定值代替实际计算
  136. metric_value = 1.5
  137. # 返回结果
  138. return {
  139. "value": metric_value,
  140. "score": 85, # 示例评分
  141. "details": {
  142. "min_value": metric_value,
  143. "max_value": metric_value * 2
  144. }
  145. }
  146. else:
  147. raise ValueError("数据格式不正确,缺少ego_data")
  148. except Exception as e:
  149. logging.error(f"计算指标失败: {str(e)}")
  150. raise
  151. #############################################################
  152. # 使用说明
  153. #############################################################
  154. """
  155. 如何选择实现方式:
  156. 1. 基于类继承的方式(推荐):
  157. - 适用于复杂指标计算
  158. - 需要维护状态或多步骤计算
  159. - 需要与系统深度集成
  160. 2. 基于函数式的方式:
  161. - 适用于简单指标计算
  162. - 逻辑简单,无需复杂状态管理
  163. - 快速实现原型
  164. 文件命名规范:
  165. - 文件名应以 metric_ 开头
  166. - 后跟指标类别、二级指标名、三级指标名
  167. - 例如:metric_safety_safeTime_CustomTTC.py
  168. 必要条件:
  169. 1. 类实现方式:必须继承 BaseMetric 基类并实现 calculate() 方法
  170. 2. 函数实现方式:必须实现 evaluate() 函数
  171. 3. 必须在文件中定义 METRIC_CATEGORY 变量,指定指标类别
  172. 返回结果格式:
  173. {
  174. "value": 0.0, # 指标值
  175. "score": 100, # 评分(0-100)
  176. "details": {} # 详细信息(可选)
  177. }
  178. """
  179. # 测试代码(实际使用时可删除)
  180. if __name__ == "__main__":
  181. # 这里可以添加测试代码
  182. pass