
Fix the custom-metric logic confusion bug; add per-case result information to the final report

cicv 1 day ago
parent
commit
2398085e06

+ 206 - 96
README.md

@@ -1,48 +1,200 @@
-# Custom Metric Development Guide
+# Autonomous Driving Evaluation System
 
-## Overview
+## System Overview
 
-This system supports user-defined evaluation metrics: you can implement your own metric calculation logic in a Python script and integrate it into the evaluation system.
+This system is a modular framework for evaluating the performance of autonomous driving systems across a full set of metrics. It uses a component-based design, supports flexible configuration of both built-in and custom metrics, runs multiple evaluation tasks in parallel, and produces structured evaluation reports.
+
+## System Architecture
+
+The system is built from the following core components:
+
+### 1. Configuration Manager (ConfigManager)
+
+Loads, parses, and merges configuration files, including merging built-in and custom metric configurations. Main responsibilities:
+- Loading and parsing configuration files
+- Merging the built-in and custom configurations
+- Configuration splitting (extracting the custom portion from the full configuration)
+
+**Configuration management features:**
+- **Automatic splitting**: extracts the custom-metric portion from the full configuration file (all_metrics_config.yaml) and writes it to a standalone custom configuration file (custom_metrics_config.yaml)
+- **Three-level merge strategy**: merges configurations level by level so that custom metric entries correctly extend or override the built-in configuration (see the sketch below)
+- **Structural integrity**: key attributes such as name and priority are preserved at every level during splitting, keeping the configuration structure intact
+
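+The three-level merge can be pictured with the sketch below. It is illustrative only, simplified from `ConfigManager._merge_configs` in `scripts/evaluator_enhanced.py`; the real method also handles top-level `name`/`priority` keys:
+
+```python
+def merge_configs(builtin: dict, custom: dict) -> dict:
+    """Merge the custom metric config into the built-in config, level by level.
+
+    New level-1/2/3 entries from the custom config are added; entries that
+    already exist in the built-in config are left untouched.
+    """
+    merged = {k: (v.copy() if isinstance(v, dict) else v) for k, v in builtin.items()}
+    for l1, l1_val in custom.items():
+        if l1 not in merged or not isinstance(l1_val, dict):
+            merged.setdefault(l1, l1_val)            # brand-new level-1 entry
+            continue
+        for l2, l2_val in l1_val.items():
+            if l2 in ("name", "priority") or not isinstance(l2_val, dict):
+                continue
+            if l2 not in merged[l1]:
+                merged[l1][l2] = l2_val              # brand-new level-2 metric
+                continue
+            for l3, l3_val in l2_val.items():
+                if l3 in ("name", "priority") or not isinstance(l3_val, dict):
+                    continue
+                merged[l1][l2].setdefault(l3, l3_val)  # brand-new level-3 metric
+    return merged
+```
+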
+### 2. Metric Loader (MetricLoader)
+
+Dynamically loads the built-in and custom evaluation metric modules. Main responsibilities:
+- Dynamic loading of the built-in metric modules
+- Discovery and loading of custom metric scripts
+- Validation and registration of metric modules
+
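+Custom metric scripts are discovered by file name and imported dynamically. A condensed sketch of that mechanism, simplified from `MetricLoader._process_custom_metric_file` (naming validation and error handling omitted):
+
+```python
+import importlib.util
+import inspect
+from pathlib import Path
+
+from modules.lib.metric_registry import BaseMetric
+
+def load_custom_metric(py_file: Path) -> dict:
+    """Import one metric_<level1>_<level2>_<level3>.py file and return its metric."""
+    spec = importlib.util.spec_from_file_location(f"custom_metric_{py_file.stem}", py_file)
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+
+    # Prefer a class-based metric: any BaseMetric subclass defined in the module.
+    for _, obj in inspect.getmembers(module, inspect.isclass):
+        if issubclass(obj, BaseMetric) and obj is not BaseMetric:
+            return {"type": "class", "class": obj}
+
+    # Fall back to a function-based metric exposing evaluate(data).
+    if hasattr(module, "evaluate"):
+        return {"type": "function", "module": module}
+    raise AttributeError(f"{py_file.name} defines neither a BaseMetric subclass nor evaluate()")
+```
+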
+### 3. Evaluation Engine (EvaluationEngine)
+
+The core component of the system; it coordinates the metric modules that perform the evaluation. Main responsibilities:
+- Running multiple evaluation metrics in parallel (built-in and custom metrics alike)
+- Collecting and consolidating evaluation results, including the statistics of built-in and custom metrics
+- Handling exceptions raised during evaluation
+- Computing overall result statistics (such as overall_result and threshold_checks) and merging them into the final report (see the sketch below)
+- Delegating score statistics to the Score module, which computes per-level scores from the configured weights and thresholds
+
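+The overall-result statistics count failed top-level metrics per priority and compare them with the T0/T1/T2 thresholds from the configuration. A condensed sketch (the full version is `EvaluationPipeline._add_overall_result`):
+
+```python
+def overall_result(report: dict, thresholds: dict) -> bool:
+    """True when no priority level exceeds its failure threshold (T0/T1/T2)."""
+    counters = {0: 0, 1: 0, 2: 0}
+    for category, data in report.items():
+        if category == "metadata" or not isinstance(data, dict):
+            continue                              # skip non-metric keys
+        if not data.get("result", True):          # this top-level metric failed
+            priority = data.get("priority")
+            if priority in counters:
+                counters[priority] += 1
+    return not (counters[0] > thresholds["T0"] or
+                counters[1] > thresholds["T1"] or
+                counters[2] > thresholds["T2"])
+```
+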
+### 4. Logging Manager (LoggingManager)
+
+Configures and manages system logging, with multi-level log records. Main responsibilities:
+- Logging system initialization
+- Multi-level logging
+- Log file management
+
+### 5. Data Processor (DataProcessor)
+
+Loads and preprocesses the data required for evaluation. Main responsibilities:
+- Loading and validating data files
+- Data format conversion and preprocessing
+- Providing a unified data access interface
+
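+Internally the data processor wraps `modules.lib.data_process.DataPreprocessing`. A minimal usage sketch (the paths are placeholders):
+
+```python
+from pathlib import Path
+
+from modules.lib import data_process
+
+data_path = Path("data_dir/some_case")                 # placeholder case directory
+config_path = Path("config/all_metrics_config.yaml")
+
+# DataPreprocessing loads and validates the case data; the resulting object
+# is what the metric modules receive as their `data` argument.
+processor = data_process.DataPreprocessing(data_path, config_path)
+case_name = data_path.name
+```
+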
+### 6. Evaluation Pipeline (EvaluationPipeline)
+
+The controller of the whole system; it coordinates the components through the evaluation flow. Main responsibilities:
+- Component initialization and configuration
+- Executing and monitoring the evaluation flow (including parallel evaluation of built-in and custom metrics)
+- Automatically invoking the score statistics logic and consolidating all evaluation results
+- Generating and writing the evaluation report, which contains the overall statistics and the details of every metric
+
+### 7. Chart Generator (ChartGenerator)
+
+Generates visualization charts for the evaluation metrics. Main responsibilities:
+- Function metric charts
+- Comfort metric charts
+- Safety metric charts
+- Traffic metric charts
+- Multiple chart types (line, scatter, dual-axis, etc.)
+- High-quality chart output (configurable DPI)
+
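+The chart generator's own API is not documented in this README; purely as an illustration of the kind of output it targets, here is a generic matplotlib sketch of a dual-axis line chart with configurable DPI (the function name and data are hypothetical, not part of chart_generator.py):
+
+```python
+import matplotlib.pyplot as plt
+
+def plot_speed_and_ttc(time_s, speed_mps, ttc_s, output_path="speed_ttc.png", dpi=300):
+    """Plot speed and TTC over time on a dual-axis chart and save it as a PNG."""
+    fig, ax_speed = plt.subplots(figsize=(10, 4))
+    ax_speed.plot(time_s, speed_mps, color="tab:blue", label="speed (m/s)")
+    ax_speed.set_xlabel("time (s)")
+    ax_speed.set_ylabel("speed (m/s)", color="tab:blue")
+
+    ax_ttc = ax_speed.twinx()                     # second y-axis sharing the x-axis
+    ax_ttc.plot(time_s, ttc_s, color="tab:red", label="TTC (s)")
+    ax_ttc.set_ylabel("TTC (s)", color="tab:red")
+
+    fig.tight_layout()
+    fig.savefig(output_path, dpi=dpi)             # configurable DPI for high-quality output
+    plt.close(fig)
+```
+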
+## Directory Structure
+
+```
+├── scripts/
+│   └── evaluator_enhanced.py    # main evaluation entry point
+├── modules/
+│   ├── lib/
+│   │   ├── metric_registry.py   # metric registration system
+│   │   ├── data_process.py      # data processing module
+│   │   ├── score.py             # score calculation module
+│   │   ├── common.py            # common utility functions
+│   │   ├── chart_generator.py   # chart generation module
+│   │   └── log_manager.py       # log management module
+│   └── metric/
+│       ├── safety.py            # safety metrics module
+│       ├── comfort.py           # comfort metrics module
+│       ├── traffic.py           # traffic-rule metrics module
+│       ├── efficient.py         # efficiency metrics module
+│       └── function.py          # function metrics module
+├── config/
+│   ├── all_metrics_config.yaml  # full metrics configuration (built-in + custom)
+│   ├── builtin_metrics_config.yaml # built-in metrics configuration
+│   └── custom_metrics_config.yaml # custom metrics configuration (generated by config splitting)
+├── templates/
+│   ├── custom_metric_template.py  # custom metric template
+│   └── unified_custom_metric_template.py # unified custom metric template
+└── custom_metrics/                # user custom metrics directory
+```
+
+## Module Details
 
+### modules directory
+
+#### lib subdirectory
+
+- **metric_registry.py**: metric registration infrastructure, including the BaseMetric base class that every custom metric must inherit from
+- **data_process.py**: data preprocessing and a unified data access interface
+- **score.py**: score calculation logic; computes the final score from the weights and thresholds in the configuration, supports overall-result statistics (overall_result) and threshold checks (threshold_checks), and is integrated with the evaluation engine so that these statistics are written into the final report (see the usage sketch after this list)
+- **common.py**: common utility functions and helper methods
+- **chart_generator.py**: chart generation for the evaluation metrics
+- **log_manager.py**: centralized log management with multi-level logging and file rotation
+
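+In outline, the engine hands each category's raw results to the score module like this (simplified; the `Score(config, category)` / `evaluate(...)` calls follow the usage in the evaluation engine):
+
+```python
+from modules.lib.score import Score
+
+def score_category(merged_config: dict, category: str, module_results: dict) -> dict:
+    """Score one category's raw results against the configured weights and thresholds."""
+    evaluator = Score(merged_config, category)    # e.g. category = "safety"
+    return evaluator.evaluate(module_results)     # module_results: output of report_statistic()
+```
+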
+#### metric subdirectory
+
+- **safety.py**: safety metrics such as TTC (time to collision) and collision risk
+- **comfort.py**: comfort metrics such as acceleration smoothness and jerk
+- **traffic.py**: traffic-rule metrics such as lane keeping and traffic-light compliance
+- **efficient.py**: efficiency metrics such as travel time and energy consumption
+- **function.py**: functional metrics such as perception accuracy and decision soundness
+
+### custom_metrics directory
+
+Holds user-defined metric scripts. The system automatically scans this directory for Python scripts that follow the naming convention and loads them as custom metrics.
+
+**Naming convention**:
+- The file name must start with `metric_`
+- followed by the three metric level names, e.g. `metric_safety_safeTime_CustomTTC.py`
+- The three-level metric name must have a matching entry in the custom configuration file
 
-## Quick Start
+**Implementation styles**:
+- Class-based: inherit from the BaseMetric base class and implement the calculate method
+- Function-based: provide an evaluate function (see the sketch below)
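+
+A minimal function-based custom metric could look like the sketch below; the file name reuses the `metric_safety_safeTime_CustomTTC.py` example above, and the returned fields mirror the class-based example later in this guide (the computed value is a placeholder):
+
+```python
+# custom_metrics/metric_safety_safeTime_CustomTTC.py
+# Function-based style: the loader calls evaluate(data) directly.
+
+def evaluate(data):
+    """Compute the metric from the preprocessed case data and return a result dict."""
+    value = 4.2  # placeholder: derive this from `data` in a real metric
+    return {
+        "value": value,
+        "score": 85,
+        "details": {"max": 20.0, "min": 3.5},
+    }
+```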
 
-1. Copy the `custom_metric_template.py` template file
-2. Adapt the metric calculation logic to your needs
-3. Place your custom metric script in the designated directory
-4. When running an evaluation, point the `--customMetricsPath` argument at the custom metrics directory
+### templates directory
 
-## Custom Metric Requirements
+Template files for custom metric development; users can start from these templates to build their own metrics quickly.
 
-### Prerequisites
+- **custom_metric_template.py**: basic custom metric template with a complete class skeleton and comments
+- **unified_custom_metric_template.py**: unified-style custom metric template with more standardized implementations
 
-1. Every metric class must inherit from the `BaseMetric` base class
-2. It must implement the `calculate()` method
-3. The file must define a `METRIC_CATEGORY` variable specifying the metric category
+## Execution Flow
 
-### Metric Categories
+### 1. Initialization
 
-Available metric categories:
-- safety: safety metrics
-- comfort: comfort metrics
-- traffic: traffic-rule metrics
-- efficient: efficiency metrics
-- function: function metrics
-- custom: custom category
+1. Parse command-line arguments to obtain the configuration, data, report, and log paths
+2. Initialize the logging manager and configure the logging system
+3. Initialize the configuration manager; load and merge the built-in and custom configurations
+4. Initialize the metric loader; load the built-in metric modules and custom metric scripts
+5. Initialize the evaluation engine and the data processor (see the sketch below)
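+
+The same initialization can be driven programmatically through `EvaluationPipeline`; a usage sketch with placeholder paths (the import assumes the script is reachable as a module):
+
+```python
+from scripts.evaluator_enhanced import EvaluationPipeline  # assumes scripts/ is importable
+
+pipeline = EvaluationPipeline(
+    all_config_path="config/all_metrics_config.yaml",
+    base_config_path="config/builtin_metrics_config.yaml",
+    log_path="logs/test.log",                    # placeholder
+    data_path="data_dir/some_case",              # placeholder case directory
+    report_path="report_dir",
+    custom_metrics_path="custom_metrics",        # optional
+    custom_config_path="config/custom_metrics_config.yaml",  # optional
+)
+report = pipeline.execute()                      # runs the evaluation and writes the JSON report
+```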
 
-### Return Value Format
+### 2. Evaluation
 
-The `calculate()` method should return a dictionary with the following fields:
-- value: the computed metric value
-- score: the score (0-100)
-- details: additional details (optional)
+1. The data processor validates and loads the evaluation data
+2. The evaluation engine runs the metric modules in parallel
+3. Results from built-in and custom metrics are collected
+4. The results are processed and merged
 
-## Example
+### 3. Report Generation
+
+1. Consolidate the evaluation results and add metadata
+2. Generate the evaluation report in JSON format
+3. Save the report to the specified directory
+
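+The generated `<case_name>_report.json` roughly follows the shape below (shown as a Python dict; the metric payload and threshold values are illustrative, while the `metadata`, `overall_result`, and `threshold_checks` fields come from the report-generation code):
+
+```python
+report = {
+    "safety": {"...": "per-metric results and scores for this category"},
+    "metadata": {
+        "case_name": "some_case",                # evaluated case directory name
+        "timestamp": "2025-01-01T12:00:00",
+        "version": "3.1.0",
+    },
+    "overall_result": True,
+    "threshold_checks": {
+        "T0_threshold": 0,
+        "T1_threshold": 0,
+        "T2_threshold": 0,
+        "actual_counts": {"p0": 0, "p1": 0, "p2": 0},
+    },
+}
+```
+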
+## Usage
+
+### Basic Usage
+
+```bash
+python scripts/evaluator_enhanced.py --allConfigPath config/all_metrics_config.yaml --dataPath data_dir --reportPath report_dir --logPath logs
+```
+
+### Arguments
+
+- `--allConfigPath`: path to the full metrics configuration file (built-in + custom)
+- `--baseConfigPath`: path to the built-in metrics configuration file
+- `--dataPath`: evaluation data directory
+- `--reportPath`: evaluation report output directory
+- `--logPath`: log file path
+- `--customMetricsPath`: directory of custom metric scripts (optional)
+- `--customConfigPath`: path to the custom metrics configuration file (optional)
+
+## Custom Metric Development Guide
+
+### Overview
+
+The system supports user-defined evaluation metrics: you can implement your own metric calculation logic in a Python script and integrate it into the evaluation system.
+
+### Development Steps
+
+1. Create a new Python script in the `custom_metrics` directory, following the naming convention
+2. Implement the metric logic based on one of the templates
+3. Add the corresponding entry to the custom configuration file
+4. Run the evaluation system with the custom metrics directory and configuration file specified
+
+### Example
 
 ```python
 from modules.lib.metric_registry import BaseMetric
 
-METRIC_CATEGORY = "custom"
-
 class MyCustomMetric(BaseMetric):
     def __init__(self, data):
         super().__init__(data)
@@ -54,78 +206,36 @@ class MyCustomMetric(BaseMetric):
             "score": 85,
             "details": {"max": 100, "min": 0}
         }
+```
 
-python evaluator.py --configPath config.yaml --dataPath data_dir --reportPath report_dir --logPath logs --customMetricsPath custom_metrics
-## Architecture Notes
+### Running with Custom Metrics
 
-The new architecture consists of the following main parts:
+```bash
+python scripts/evaluator_enhanced.py --allConfigPath config/all_metrics_config.yaml --dataPath data_dir --reportPath report_dir --logPath logs --customMetricsPath custom_metrics --customConfigPath config/custom_metrics_config.yaml
+```
 
-1. **Metric registry**: implemented by the `MetricRegistry` class, which manages all available metrics (built-in and custom).
-
-2. **Metric base class**: every metric inherits from the `BaseMetric` base class, ensuring a consistent interface.
-
-3. **Dynamic metric selection**: only the metrics selected in the configuration file are run, improving efficiency.
-
-4. **Custom metric loading**: user-defined metric scripts can be loaded from a specified directory to extend the system.
+## Extensibility
 
-5. **Compatibility**: the original modules such as `safety.py` and `comfort.py` are kept, ensuring backward compatibility.
+### 1. Adding a Built-in Metric
+
+- Add a new method to the corresponding category module
+- The system registers and recognizes it automatically
 
-This design makes the system more flexible: it supports selective metric execution and user-defined metrics while keeping the original code structure stable.
+### 2. Adding a Custom Metric
+
+- Create a new metric script from a template
+- Place it in the custom metrics directory
+- No changes to the core code are required
 
-├── scripts/
-│   └── evaluator.py          # main evaluation entry point
-├── modules/
-│   ├── lib/
-│   │   ├── metric_registry.py  # metric registration system
-│   │   ├── data_process.py     # data processing module
-│   │   └── log_manager.py      # log management module
-│   └── metric/
-│       ├── safety.py           # safety metrics module
-│       ├── comfort.py          # comfort metrics module
-│       ├── traffic.py          # traffic-rule metrics module
-│       ├── efficient.py        # efficiency metrics module
-│       └── function.py         # function metrics module
-├── templates/
-│   ├── custom_metric_template.py  # custom metric template
-│   └── README.md                  # custom metric development guide
-└── custom_metrics/                # user custom metrics directory
+### 3. Adding a New Metric Category
+
+- Create a new category module
+- Add support for the category in the registration system (see the sketch below)
 
+## System Strengths
+
-## Workflow
-1. Initialization
-
-   - Load the configuration file
-   - Register the built-in metric modules
-   - Load the custom metric scripts
-   - Extract the list of enabled metrics
-2. Evaluation
-
-   - Load and preprocess the data
-   - Instantiate the enabled metrics
-   - Run the metric calculations in parallel
-   - Collect and organize the results
-3. Reporting
-
-   - Generate a structured evaluation report
-   - Write it to the specified directory
-## Extensibility
-1. Adding a built-in metric
-
-   - Add a new method to the corresponding category module
-   - The system registers and recognizes it automatically
-2. Adding a custom metric
-
-   - Create a new metric script from a template
-   - Place it in the custom metrics directory
-   - No changes to the core code are required
-3. Adding a new metric category
-
-   - Create a new category module
-   - Add support for the category in the registration system
-## Strengths
-1. Flexibility: metrics can be run selectively based on the configuration, improving efficiency
-2. Extensibility: user-defined metrics are supported without modifying the core code
-3. Compatibility: the original module structure is preserved, ensuring backward compatibility
-4. Parallel processing: multithreading improves evaluation efficiency
-5. Modularity: clear separation of responsibilities makes maintenance and extension easier
-This architecture supports selective metric execution from the configuration file and user-defined metric scripts, while preserving the original modular structure.
+1. **Modular design**: each component has a clear responsibility, making maintenance and extension easier
+2. **Flexible configuration**: metrics can be run selectively based on the configuration, improving efficiency
+3. **Extensibility**: user-defined metrics are supported without modifying the core code
+4. **Parallel processing**: metrics are evaluated in parallel, improving throughput
+5. **Exception handling**: thorough exception capture and logging improve system stability

+ 15 - 15
config/all_metrics_config.yaml

@@ -43,21 +43,21 @@ safety:
       priority: 0
       max: 2000.0
       min: 1.5
-    # TLC:
-    #   name: TLC
-    #   priority: 0
-    #   max: 2000.0
-    #   min: 1.5
-    # TTB:
-    #   name: TTB
-    #   priority: 0
-    #   max: 2000.0
-    #   min: 1.5
-    # TM:
-    #   name: TM
-    #   priority: 0
-    #   max: 2000.0
-    #   min: 1.5
+    TLC:
+      name: TLC
+      priority: 0
+      max: 2000.0
+      min: 1.5
+    TTB:
+      name: TTB
+      priority: 0
+      max: 2000.0
+      min: 1.5
+    TM:
+      name: TM
+      priority: 0
+      max: 2000.0
+      min: 1.5
   safeDistance:
     name: safeDistance
     priority: 0

+ 94 - 23
config/metrics_config.yaml → config/builtin_metrics_config.yaml

@@ -1,10 +1,14 @@
-
 safety:
   name: safety
   priority: 0
   safeTime:
     name: safetime
     priority: 0
+    CustomTTC:  
+      name: CustomTTC
+      priority: 0
+      max: 20.0
+      min: 3.5
     TTC:
       name: TTC
       priority: 0
@@ -20,6 +24,21 @@ safety:
       priority: 0
       max: 2000.0
       min: 1.5
+    TLC:
+      name: TLC
+      priority: 0
+      max: 2000.0
+      min: 1.5
+    TTB:
+      name: TTB
+      priority: 0
+      max: 2000.0
+      min: 1.5
+    TM:
+      name: TM
+      priority: 0
+      max: 2000.0
+      min: 1.5
   safeDistance:
     name: safeDistance
     priority: 0
@@ -55,14 +74,26 @@ safety:
       max: 10.0
       min: 0.0
 
+user:
+  name: user
+  priority: 0
+  safeTime:
+    name: safetime
+    priority: 0
+    CustomTTC:
+      name: customTTC
+      priority: 0
+      max: 20.0
+      min: 3.5
+
 comfort:
   name: comfort
   priority: 0
   comfortLat:
     name: comfortLat
     priority: 0
-    weaving:
-      name: weaving
+    zigzag:
+      name: zigzag
       priority: 0
       max: 0
       min: 0
@@ -89,6 +120,39 @@ comfort:
       priority: 0
       max: 0
       min: 0
+  comforDynamic:
+    name: comforDynamic
+    priority: 0
+    rideQualityScore:
+      name: rideQualityScore
+      priority: 0
+      max: 0
+      min: 0
+    motionSickness:
+      name: motionSickness
+      priority: 0
+      max: 0.0
+      min: 0.0
+    motionComfortIndex:
+      name: motionComfortIndex
+      priority: 0
+      max: 0.0
+      min: 0.0
+    vdv:
+      name: vdv
+      priority: 0
+      max: 0
+      min: 0
+    ava_vav:
+      name: ava_vav
+      priority: 0
+      max: 0
+      min: 0.0
+    msdv:
+      name: msdv
+      priority: 0
+      max: 0.0
+      min: 0.0
 
 efficient:
   name: efficient
@@ -111,6 +175,16 @@ efficient:
       priority: 0
       max: 80.0
       min: 30.0
+    speedUtilizationRatio:
+      name: speedUtilizationRatio
+      priority: 0
+      max: 1.0
+      min: 0.0
+    accelerationSmoothness:
+      name: accelerationSmoothness
+      priority: 0
+      max: 1.0
+      min: 0.0
   parkingMode:
     name: parkingMode
     priority: 0
@@ -123,9 +197,9 @@ efficient:
 function:
   name: function
   priority: 0
-  scenario:
+  ForwardCollision:
     name: ForwardCollision
-    priority: 0 
+    priority: 0
     latestWarningDistance_TTC_LST:
       name: latestWarningDistance_TTC_LST
       priority: 0
@@ -146,7 +220,7 @@ function:
       priority: 0
       max: 17.29
       min: 10.51
-
+      
 traffic:
   name: traffic
   priority: 0
@@ -158,17 +232,6 @@ traffic:
       priority: 0
       max: 0
       min: 0
-    urbanExpresswayOrHighwayReverse:
-      name: higwayreverse
-      priority: 0
-      max: 0
-      min: 0
-    urbanExpresswayOrHighwayDrivingAgainst:
-      name: higwayDrivingAgainst
-      priority: 0
-      max: 0
-      min: 0
-
   seriousViolation:
     name: seriousViolation
     priority: 0
@@ -178,7 +241,7 @@ traffic:
       max: 0
       min: 0
     urbanExpresswayOrHighwayEmergencyLaneStopped:
-      name: highwayEmergencyLaneStopped
+      name: urbanExpresswayOrHighwayEmergencyLaneStopped
       priority: 0
       max: 0
       min: 0
@@ -206,7 +269,6 @@ traffic:
       priority: 0
       max: 0
       min: 0
-
   generalViolation:
     name: generalViolation
     priority: 0
@@ -309,12 +371,21 @@ traffic:
   minorViolation:
     name: minorViolation
     priority: 0
-    noUTurnViolation:
-      name: noUTurnViolation
+    turn_in_forbiden_turn_left_sign:
+      name: turn_in_forbiden_turn_left_sign
+      priority: 0
+      max: 0
+      min: 0
+    turn_in_forbiden_turn_back_sign:
+      name: turn_in_forbiden_turn_back_sign
+      priority: 0
+      max: 0
+      min: 0
+    avoid_pedestrian_when_turn_back:
+      name: avoid_pedestrian_when_turn_back
       priority: 0
       max: 0
       min: 0
-
   warningViolation:
     name: warningViolation
     priority: 0
@@ -332,4 +403,4 @@ traffic:
       name: generalRoadIrregularLaneUse
       priority: 0
       max: 0
-      min: 0
+      min: 0

+ 0 - 60
config/custom_metrics_config.yaml

@@ -1,60 +0,0 @@
-safety:
-  name: safety
-  priority: 0
-  safeTime:
-    name: safetime
-    priority: 0
-    CustomTTC:
-      name: CustomTTC
-      priority: 0
-      max: 20.0
-      min: 3.5
-user:
-  name: user
-  priority: 0
-  safeTime:
-    name: safetime
-    priority: 0
-    CustomTTC:
-      name: CustomTTC
-      priority: 0
-      max: 20.0
-      min: 3.5
-comfort:
-  name: comfort
-  priority: 0
-  comfortLat:
-    name: comfortLat
-    priority: 0
-    zigzag:
-      name: zigzag
-      priority: 0
-      max: 0
-      min: 0
-function:
-  name: function
-  priority: 0
-  LKA:
-    name: LKA
-    priority: 0
-    latestWarningDistance_TTC:
-      name: latestWarningDistance_TTC
-      priority: 0
-      max: 5
-      min: 1.98
-    latestWarningDistance:
-      name: latestWarningDistance
-      priority: 0
-      max: 150
-      min: 0
-traffic:
-  name: traffic
-  priority: 0
-  generalViolation:
-    name: generalViolation
-    priority: 0
-    aviod_pedestrian_when_turning:
-      name: aviod_pedestrian_when_turning
-      priority: 0
-      max: 0
-      min: 0

+ 195 - 147
scripts/evaluator_enhanced.py

@@ -17,6 +17,7 @@ import traceback
 import json
 import inspect
 
+
 # 常量定义
 DEFAULT_WORKERS = 4
 CUSTOM_METRIC_PREFIX = "metric_"
@@ -30,46 +31,42 @@ else:
 
 sys.path.insert(0, str(_ROOT_PATH))
 
-
 class ConfigManager:
     """配置管理组件"""
-
+    
     def __init__(self, logger: logging.Logger):
         self.logger = logger
         self.base_config: Dict[str, Any] = {}
         self.custom_config: Dict[str, Any] = {}
         self.merged_config: Dict[str, Any] = {}
-
-    def split_configs(self, all_config_path: Path, base_config_path: Path, custom_config_path: Path) -> None:
+    
+    def split_configs(self, all_metrics_path: Path, builtin_metrics_path: Path, custom_metrics_path: Path) -> None:
         """从all_metrics_config.yaml拆分成内置和自定义配置"""
         try:
-            with open(all_config_path, 'r', encoding='utf-8') as f:
-                all_metrics = yaml.safe_load(f) or {}
-
-            with open(base_config_path, 'r', encoding='utf-8') as f:
-                builtin_metrics = yaml.safe_load(f) or {}
-
-            custom_metrics = self._find_custom_metrics(all_metrics, builtin_metrics)
-
-            if custom_metrics:
-                with open(custom_config_path, 'w', encoding='utf-8') as f:
-                    yaml.dump(custom_metrics, f, allow_unicode=True, sort_keys=False, indent=2)
-                self.logger.info(f"Split configs: custom metrics saved to {custom_config_path}")
-        except Exception as e:
-            self.logger.error(f"Failed to split configs: {str(e)}")
+            with open(all_metrics_path, 'r', encoding='utf-8') as f:
+                all_metrics_dict = yaml.safe_load(f) or {}
+            with open(builtin_metrics_path, 'r', encoding='utf-8') as f:
+                builtin_metrics_dict = yaml.safe_load(f) or {}
+            custom_metrics_dict = self._find_custom_metrics(all_metrics_dict, builtin_metrics_dict)
+            if custom_metrics_dict:
+                with open(custom_metrics_path, 'w', encoding='utf-8') as f:
+                    yaml.dump(custom_metrics_dict, f, allow_unicode=True, sort_keys=False, indent=2)
+                self.logger.info(f"Split configs: custom metrics saved to {custom_metrics_path}")
+        except Exception as err:
+            self.logger.error(f"Failed to split configs: {str(err)}")
             raise
-
+    
     def _find_custom_metrics(self, all_metrics, builtin_metrics, current_path=""):
         """递归比较找出自定义指标"""
         custom_metrics = {}
-
+        
         if isinstance(all_metrics, dict) and isinstance(builtin_metrics, dict):
             for key in all_metrics:
                 if key not in builtin_metrics:
                     custom_metrics[key] = all_metrics[key]
                 else:
                     child_custom = self._find_custom_metrics(
-                        all_metrics[key],
+                        all_metrics[key], 
                         builtin_metrics[key],
                         f"{current_path}.{key}" if current_path else key
                     )
@@ -77,116 +74,106 @@ class ConfigManager:
                         custom_metrics[key] = child_custom
         elif all_metrics != builtin_metrics:
             return all_metrics
-
+        
         if custom_metrics:
             return self._ensure_structure(custom_metrics, all_metrics, current_path)
         return None
-
+    
     def _ensure_structure(self, metrics_dict, full_dict, path):
         """确保每级包含name和priority"""
         if not isinstance(metrics_dict, dict):
             return metrics_dict
-
+        
         current = full_dict
         for key in path.split('.'):
             if key in current:
                 current = current[key]
             else:
                 break
-
+        
         result = {}
         if isinstance(current, dict):
             if 'name' in current:
                 result['name'] = current['name']
             if 'priority' in current:
                 result['priority'] = current['priority']
-
+        
         for key, value in metrics_dict.items():
             if key not in ['name', 'priority']:
                 result[key] = self._ensure_structure(value, full_dict, f"{path}.{key}" if path else key)
-
+        
         return result
 
-    def load_configs(self, base_config_path: Optional[Path], custom_config_path: Optional[Path]) -> Dict[str, Any]:
+    def load_configs(self, all_config_path: Optional[Path], builtin_metrics_path: Optional[Path], custom_metrics_path: Optional[Path]) -> Dict[str, Any]:
         """加载并合并配置"""
         # 自动拆分配置
-        if base_config_path and base_config_path.exists():
-            all_config_path = base_config_path.parent / "all_metrics_config.yaml"
-            if all_config_path.exists():
-                target_custom_path = custom_config_path or (base_config_path.parent / "custom_metrics_config.yaml")
-                self.split_configs(all_config_path, base_config_path, target_custom_path)
-                custom_config_path = target_custom_path
-
-        self.base_config = self._safe_load_config(base_config_path) if base_config_path else {}
-        self.custom_config = self._safe_load_config(custom_config_path) if custom_config_path else {}
+        
+        if all_config_path.exists():
+            self.split_configs(all_config_path, builtin_metrics_path, custom_metrics_path)
+            
+        self.base_config = self._safe_load_config(builtin_metrics_path) if builtin_metrics_path else {}
+        self.custom_config = self._safe_load_config(custom_metrics_path) if custom_metrics_path else {}
         self.merged_config = self._merge_configs(self.base_config, self.custom_config)
         return self.merged_config
-
+    
     def _safe_load_config(self, config_path: Path) -> Dict[str, Any]:
         """安全加载YAML配置"""
         try:
             if not config_path.exists():
                 self.logger.warning(f"Config file not found: {config_path}")
                 return {}
-
             with config_path.open('r', encoding='utf-8') as f:
-                config = yaml.safe_load(f) or {}
+                config_dict = yaml.safe_load(f) or {}
                 self.logger.info(f"Loaded config: {config_path}")
-                return config
-        except Exception as e:
-            self.logger.error(f"Failed to load config {config_path}: {str(e)}")
+                return config_dict
+        except Exception as err:
+            self.logger.error(f"Failed to load config {config_path}: {str(err)}")
             return {}
-
-    def _merge_configs(self, base_config: Dict, custom_config: Dict) -> Dict:
+    
+    def _merge_configs(self, builtin_config: Dict, custom_config: Dict) -> Dict:
         """智能合并配置"""
-        merged = base_config.copy()
-
+        merged_config = builtin_config.copy()
         for level1_key, level1_value in custom_config.items():
             if not isinstance(level1_value, dict) or 'name' not in level1_value:
-                if level1_key not in merged:
-                    merged[level1_key] = level1_value
+                if level1_key not in merged_config:
+                    merged_config[level1_key] = level1_value
                 continue
-
-            if level1_key not in merged:
-                merged[level1_key] = level1_value
+            if level1_key not in merged_config:
+                merged_config[level1_key] = level1_value
             else:
                 for level2_key, level2_value in level1_value.items():
                     if level2_key in ['name', 'priority']:
                         continue
-
                     if isinstance(level2_value, dict):
-                        if level2_key not in merged[level1_key]:
-                            merged[level1_key][level2_key] = level2_value
+                        if level2_key not in merged_config[level1_key]:
+                            merged_config[level1_key][level2_key] = level2_value
                         else:
                             for level3_key, level3_value in level2_value.items():
                                 if level3_key in ['name', 'priority']:
                                     continue
-
                                 if isinstance(level3_value, dict):
-                                    if level3_key not in merged[level1_key][level2_key]:
-                                        merged[level1_key][level2_key][level3_key] = level3_value
-
-        return merged
-
+                                    if level3_key not in merged_config[level1_key][level2_key]:
+                                        merged_config[level1_key][level2_key][level3_key] = level3_value
+        return merged_config
+    
     def get_config(self) -> Dict[str, Any]:
         return self.merged_config
-
+    
     def get_base_config(self) -> Dict[str, Any]:
         return self.base_config
-
+    
     def get_custom_config(self) -> Dict[str, Any]:
         return self.custom_config
 
-
 class MetricLoader:
     """指标加载器组件"""
-
+    
     def __init__(self, logger: logging.Logger, config_manager: ConfigManager):
         self.logger = logger
         self.config_manager = config_manager
         self.metric_modules: Dict[str, Type] = {}
         self.custom_metric_modules: Dict[str, Any] = {}
-
+    
     def load_builtin_metrics(self) -> Dict[str, Type]:
         """加载内置指标模块"""
         module_mapping = {
@@ -196,15 +183,15 @@ class MetricLoader:
             "efficient": ("modules.metric.efficient", "EfficientManager"),
             "function": ("modules.metric.function", "FunctionManager"),
         }
-
+        
         self.metric_modules = {
             name: self._load_module(*info)
             for name, info in module_mapping.items()
         }
-
+        
         self.logger.info(f"Loaded builtin metrics: {', '.join(self.metric_modules.keys())}")
         return self.metric_modules
-
+    
     @lru_cache(maxsize=32)
     def _load_module(self, module_path: str, class_name: str) -> Type:
         """动态加载Python模块"""
@@ -214,7 +201,7 @@ class MetricLoader:
         except (ImportError, AttributeError) as e:
             self.logger.error(f"Failed to load module: {module_path}.{class_name} - {str(e)}")
             raise
-
+    
     def load_custom_metrics(self, custom_metrics_path: Optional[Path]) -> Dict[str, Any]:
         """加载自定义指标模块"""
         if not custom_metrics_path or not custom_metrics_path.is_dir():
@@ -226,30 +213,30 @@ class MetricLoader:
             if py_file.name.startswith(CUSTOM_METRIC_PREFIX):
                 if self._process_custom_metric_file(py_file):
                     loaded_count += 1
-
+        
         self.logger.info(f"Loaded {loaded_count} custom metric modules")
         return self.custom_metric_modules
-
+    
     def _process_custom_metric_file(self, file_path: Path) -> bool:
         """处理单个自定义指标文件"""
         try:
             metric_key = self._validate_metric_file(file_path)
-
+            
             module_name = f"custom_metric_{file_path.stem}"
             spec = importlib.util.spec_from_file_location(module_name, file_path)
             module = importlib.util.module_from_spec(spec)
             spec.loader.exec_module(module)
-
+            
             from modules.lib.metric_registry import BaseMetric
             metric_class = None
-
+            
             for name, obj in inspect.getmembers(module):
-                if (inspect.isclass(obj) and
-                        issubclass(obj, BaseMetric) and
-                        obj != BaseMetric):
+                if (inspect.isclass(obj) and 
+                    issubclass(obj, BaseMetric) and 
+                    obj != BaseMetric):
                     metric_class = obj
                     break
-
+            
             if metric_class:
                 self.custom_metric_modules[metric_key] = {
                     'type': 'class',
@@ -265,7 +252,7 @@ class MetricLoader:
                 self.logger.info(f"Loaded function-based custom metric: {metric_key}")
             else:
                 raise AttributeError(f"Missing evaluate() function or BaseMetric subclass: {file_path.name}")
-
+                
             return True
         except ValueError as e:
             self.logger.warning(str(e))
@@ -273,25 +260,24 @@ class MetricLoader:
         except Exception as e:
             self.logger.error(f"Failed to load custom metric {file_path}: {str(e)}")
             return False
-
+    
     def _validate_metric_file(self, file_path: Path) -> str:
         """验证自定义指标文件命名规范"""
         stem = file_path.stem[len(CUSTOM_METRIC_PREFIX):]
         parts = stem.split('_')
         if len(parts) < 3:
-            raise ValueError(
-                f"Invalid custom metric filename: {file_path.name}, should be metric_<level1>_<level2>_<level3>.py")
+            raise ValueError(f"Invalid custom metric filename: {file_path.name}, should be metric_<level1>_<level2>_<level3>.py")
 
         level1, level2, level3 = parts[:3]
         if not self._is_metric_configured(level1, level2, level3):
             raise ValueError(f"Unconfigured metric: {level1}.{level2}.{level3}")
         return f"{level1}.{level2}.{level3}"
-
+    
     def _is_metric_configured(self, level1: str, level2: str, level3: str) -> bool:
         """检查指标是否在配置中注册"""
         custom_config = self.config_manager.get_custom_config()
         try:
-            return (level1 in custom_config and
+            return (level1 in custom_config and 
                     isinstance(custom_config[level1], dict) and
                     level2 in custom_config[level1] and
                     isinstance(custom_config[level1][level2], dict) and
@@ -299,33 +285,32 @@ class MetricLoader:
                     isinstance(custom_config[level1][level2][level3], dict))
         except Exception:
             return False
-
+    
     def get_builtin_metrics(self) -> Dict[str, Type]:
         return self.metric_modules
-
+    
     def get_custom_metrics(self) -> Dict[str, Any]:
         return self.custom_metric_modules
 
-
 class EvaluationEngine:
     """评估引擎组件"""
-
+    
     def __init__(self, logger: logging.Logger, config_manager: ConfigManager, metric_loader: MetricLoader):
         self.logger = logger
         self.config_manager = config_manager
         self.metric_loader = metric_loader
-
+    
     def evaluate(self, data: Any) -> Dict[str, Any]:
         """执行评估流程"""
         raw_results = self._collect_builtin_metrics(data)
         custom_results = self._collect_custom_metrics(data)
         return self._process_merged_results(raw_results, custom_results)
-
+    
     def _collect_builtin_metrics(self, data: Any) -> Dict[str, Any]:
         """收集内置指标结果"""
         metric_modules = self.metric_loader.get_builtin_metrics()
         raw_results: Dict[str, Any] = {}
-
+        
         with ThreadPoolExecutor(max_workers=len(metric_modules)) as executor:
             futures = {
                 executor.submit(self._run_module, module, data, module_name): module_name
@@ -347,21 +332,21 @@ class EvaluationEngine:
                         "message": str(e),
                         "timestamp": datetime.now().isoformat(),
                     }
-
+        
         return raw_results
-
+    
     def _collect_custom_metrics(self, data: Any) -> Dict[str, Dict]:
         """收集自定义指标结果"""
         custom_metrics = self.metric_loader.get_custom_metrics()
         if not custom_metrics:
             return {}
-
+            
         custom_results = {}
-
+        
         for metric_key, metric_info in custom_metrics.items():
             try:
                 level1, level2, level3 = metric_key.split('.')
-
+                
                 if metric_info['type'] == 'class':
                     metric_class = metric_info['class']
                     metric_instance = metric_class(data)
@@ -369,22 +354,22 @@ class EvaluationEngine:
                 else:
                     module = metric_info['module']
                     metric_result = module.evaluate(data)
-
+                
                 if level1 not in custom_results:
                     custom_results[level1] = {}
                 custom_results[level1] = metric_result
-
+                
                 self.logger.info(f"Calculated custom metric: {level1}.{level2}.{level3}")
-
+                
             except Exception as e:
                 self.logger.error(f"Custom metric {metric_key} failed: {str(e)}")
-
+                
                 try:
                     level1, level2, level3 = metric_key.split('.')
-
+                    
                     if level1 not in custom_results:
                         custom_results[level1] = {}
-
+                        
                     custom_results[level1] = {
                         "status": "error",
                         "message": str(e),
@@ -392,9 +377,9 @@ class EvaluationEngine:
                     }
                 except Exception:
                     pass
-
+        
         return custom_results
-
+    
     def _process_merged_results(self, raw_results: Dict, custom_results: Dict) -> Dict:
         """处理合并后的评估结果"""
         from modules.lib.score import Score
@@ -420,14 +405,14 @@ class EvaluationEngine:
                     final_results[level1] = self._format_error(e)
 
         return final_results
-
+        
     def _format_error(self, e: Exception) -> Dict:
         return {
             "status": "error",
             "message": str(e),
             "timestamp": datetime.now().isoformat()
         }
-
+                
     def _run_module(self, module_class: Any, data: Any, module_name: str) -> Dict[str, Any]:
         """执行单个评估模块"""
         try:
@@ -437,14 +422,13 @@ class EvaluationEngine:
             self.logger.error(f"{module_name} execution error: {str(e)}", exc_info=True)
             return {module_name: {"error": str(e)}}
 
-
 class LoggingManager:
     """日志管理组件"""
-
+    
     def __init__(self, log_path: Path):
         self.log_path = log_path
         self.logger = self._init_logger()
-
+    
     def _init_logger(self) -> logging.Logger:
         """初始化日志系统"""
         try:
@@ -459,21 +443,20 @@ class LoggingManager:
             logger.addHandler(console_handler)
             logger.warning(f"Failed to init standard logger: {str(e)}, using fallback logger")
             return logger
-
+    
     def get_logger(self) -> logging.Logger:
         return self.logger
 
-
 class DataProcessor:
     """数据处理组件"""
-
+    
     def __init__(self, logger: logging.Logger, data_path: Path, config_path: Optional[Path] = None):
         self.logger = logger
         self.data_path = data_path
         self.config_path = config_path
         self.processor = self._load_processor()
         self.case_name = self.data_path.name
-
+    
     def _load_processor(self) -> Any:
         """加载数据处理器"""
         try:
@@ -482,7 +465,7 @@ class DataProcessor:
         except ImportError as e:
             self.logger.error(f"Failed to load data processor: {str(e)}")
             raise RuntimeError(f"Failed to load data processor: {str(e)}") from e
-
+    
     def validate(self) -> None:
         """验证数据路径"""
         if not self.data_path.exists():
@@ -490,74 +473,131 @@ class DataProcessor:
         if not self.data_path.is_dir():
             raise NotADirectoryError(f"Invalid data directory: {self.data_path}")
 
-
 class EvaluationPipeline:
     """评估流水线控制器"""
-
-    def __init__(self, config_path: str, log_path: str, data_path: str, report_path: str,
+    
+    def __init__(self, all_config_path: str, base_config_path: str, log_path: str, data_path: str, report_path: str, 
                  custom_metrics_path: Optional[str] = None, custom_config_path: Optional[str] = None):
         # 路径初始化
-        self.config_path = Path(config_path) if config_path else None
+        self.all_config_path = Path(all_config_path) if all_config_path else None
+        self.base_config_path = Path(base_config_path) if base_config_path else None
         self.custom_config_path = Path(custom_config_path) if custom_config_path else None
         self.data_path = Path(data_path)
         self.report_path = Path(report_path)
         self.custom_metrics_path = Path(custom_metrics_path) if custom_metrics_path else None
-
-        # Component initialization
+        
+        # Logging
         self.logging_manager = LoggingManager(Path(log_path))
         self.logger = self.logging_manager.get_logger()
+        # Configuration
         self.config_manager = ConfigManager(self.logger)
-        self.config_manager.load_configs(self.config_path, self.custom_config_path)
+        self.config = self.config_manager.load_configs(
+            self.all_config_path, self.base_config_path, self.custom_config_path
+        )
+        # Metric loading
         self.metric_loader = MetricLoader(self.logger, self.config_manager)
         self.metric_loader.load_builtin_metrics()
         self.metric_loader.load_custom_metrics(self.custom_metrics_path)
+        # Data processing
+        self.data_processor = DataProcessor(self.logger, self.data_path, self.all_config_path)
         self.evaluation_engine = EvaluationEngine(self.logger, self.config_manager, self.metric_loader)
-        self.data_processor = DataProcessor(self.logger, self.data_path, self.config_path)
-
+    
     def execute(self) -> Dict[str, Any]:
         """执行评估流水线"""
         try:
             self.data_processor.validate()
-
+            
             self.logger.info(f"Start evaluation: {self.data_path.name}")
             start_time = time.perf_counter()
             results = self.evaluation_engine.evaluate(self.data_processor.processor)
             elapsed_time = time.perf_counter() - start_time
             self.logger.info(f"Evaluation completed, time: {elapsed_time:.2f}s")
-
+            
             report = self._generate_report(self.data_processor.case_name, results)
             return report
-
+            
         except Exception as e:
             self.logger.critical(f"Evaluation failed: {str(e)}", exc_info=True)
             return {"error": str(e), "traceback": traceback.format_exc()}
-
+    
+    def _add_overall_result(self, report: Dict[str, Any]) -> Dict[str, Any]:
+        """处理评测报告并添加总体结果字段"""
+        # 加载阈值参数
+        thresholds = {
+            "T0": self.config['T_threshold']['T0_threshold'],
+            "T1": self.config['T_threshold']['T1_threshold'],
+            "T2": self.config['T_threshold']['T2_threshold']
+        }
+        
+        # Initialize counters
+        counters = {'p0': 0, 'p1': 0, 'p2': 0}
+        
+        # Iterate over all keys in the report, covering built-in and custom level-1 metrics
+        for category, category_data in report.items():
+            # Skip non-metric keys (e.g. metadata)
+            if not isinstance(category_data, dict) or category == "metadata":
+                continue
+                
+            # If this category's result is False, increment the counter for its priority
+            if not category_data.get('result', True):
+                priority = category_data.get('priority')
+                if priority == 0:
+                    counters['p0'] += 1
+                elif priority == 1:
+                    counters['p1'] += 1
+                elif priority == 2:
+                    counters['p2'] += 1
+        
+        # Threshold check logic
+        thresholds_exceeded = (
+            counters['p0'] > thresholds['T0'],
+            counters['p1'] > thresholds['T1'],
+            counters['p2'] > thresholds['T2']
+        )
+        
+        # Build the processed report
+        processed_report = report.copy()
+        processed_report['overall_result'] = not any(thresholds_exceeded)
+        
+        # Add threshold statistics
+        processed_report['threshold_checks'] = {
+            'T0_threshold': thresholds['T0'],
+            'T1_threshold': thresholds['T1'],
+            'T2_threshold': thresholds['T2'],
+            'actual_counts': counters
+        }
+        
+        self.logger.info(f"Added overall result: {processed_report['overall_result']}")
+        return processed_report
+        
     def _generate_report(self, case_name: str, results: Dict[str, Any]) -> Dict[str, Any]:
         """生成评估报告"""
         from modules.lib.common import dict2json
-
+        
         self.report_path.mkdir(parents=True, exist_ok=True)
-
+        
         results["metadata"] = {
             "case_name": case_name,
             "timestamp": datetime.now().isoformat(),
             "version": "3.1.0",
         }
-
+        
+        # Add the overall result evaluation
+        results = self._add_overall_result(results)
+        
         report_file = self.report_path / f"{case_name}_report.json"
         dict2json(results, report_file)
         self.logger.info(f"Report generated: {report_file}")
-
+        
         return results
 
-
 def main():
     """命令行入口"""
     parser = argparse.ArgumentParser(
         description="Autonomous Driving Evaluation System V3.1",
         formatter_class=argparse.ArgumentDefaultsHelpFormatter,
     )
-
+    
     parser.add_argument(
         "--logPath",
         type=str,
@@ -570,11 +610,19 @@ def main():
         default=r"D:\Kevin\zhaoyuan\data\V2V_CSAE53-2020_ForwardCollision_LST_01-02",
         help="Input data directory",
     )
+    
     parser.add_argument(
-        "--configPath",
+        "--allConfigPath",
         type=str,
         default="config/all_metrics_config.yaml",
-        help="Metrics config file path",
+        help="Full metrics config file path (built-in + custom)",
+    )
+    
+    parser.add_argument(
+        "--baseConfigPath",
+        type=str,
+        default="config/builtin_metrics_config.yaml",
+        help="Built-in metrics config file path",
     )
     parser.add_argument(
         "--reportPath",
@@ -594,19 +642,20 @@ def main():
         default="config/custom_metrics_config.yaml",
         help="Custom metrics config path (optional)",
     )
-
+    
     args = parser.parse_args()
 
     try:
         pipeline = EvaluationPipeline(
-            args.configPath,
-            args.logPath,
-            args.dataPath,
-            args.reportPath,
-            args.customMetricsPath,
-            args.customConfigPath
+            all_config_path=args.allConfigPath,
+            base_config_path=args.baseConfigPath,
+            log_path=args.logPath, 
+            data_path=args.dataPath, 
+            report_path=args.reportPath, 
+            custom_metrics_path=args.customMetricsPath, 
+            custom_config_path=args.customConfigPath
         )
-
+        
         start_time = time.perf_counter()
         result = pipeline.execute()
         elapsed_time = time.perf_counter() - start_time
@@ -617,7 +666,7 @@ def main():
 
         print(f"Evaluation completed, total time: {elapsed_time:.2f}s")
         print(f"Report path: {pipeline.report_path}")
-
+        
     except KeyboardInterrupt:
         print("\nUser interrupted")
         sys.exit(130)
@@ -626,7 +675,6 @@ def main():
         traceback.print_exc()
         sys.exit(1)
 
-
 if __name__ == "__main__":
     warnings.filterwarnings("ignore")
-    main()
+    main()

+ 0 - 498
scripts/evaluator_optimized.py

@@ -1,498 +0,0 @@
-# evaluation_engine.py
-import sys
-import warnings
-import time
-import importlib
-import yaml  # 添加yaml模块导入
-from pathlib import Path
-import argparse
-from concurrent.futures import ThreadPoolExecutor
-from functools import lru_cache
-from typing import Dict, Any, List, Optional
-from datetime import datetime
-
-# 强制导入所有可能动态加载的模块
-
-
-# 安全设置根目录路径(动态路径管理)
-# 判断是否处于编译模式
-if hasattr(sys, "_MEIPASS"):
-    # 编译模式下使用临时资源目录
-    _ROOT_PATH = Path(sys._MEIPASS)
-else:
-    # 开发模式下使用原工程路径
-    _ROOT_PATH = Path(__file__).resolve().parent.parent
-
-sys.path.insert(0, str(_ROOT_PATH))
-print(f"当前根目录:{_ROOT_PATH}")
-print(f'当前系统路径:{sys.path}')
-
-
-class EvaluationCore:
-    """评估引擎核心类(单例模式)"""
-
-    _instance = None
-
-    def __new__(cls, logPath: str, configPath: str = None, customConfigPath: str = None, customMetricsPath: str = None):
-        if not cls._instance:
-            cls._instance = super().__new__(cls)
-            cls._instance._init(logPath, configPath, customConfigPath, customMetricsPath)
-        return cls._instance
-
-    def _init(self, logPath: str = None, configPath: str = None, customConfigPath: str = None,
-              customMetricsPath: str = None) -> None:
-        """初始化引擎组件"""
-        self.log_path = logPath
-        self.config_path = configPath
-        self.custom_config_path = customConfigPath
-        self.custom_metrics_path = customMetricsPath
-
-        # 加载配置
-        self.metrics_config = {}
-        self.custom_metrics_config = {}
-        self.merged_config = {}  # 添加合并后的配置
-
-        # 自定义指标脚本模块
-        self.custom_metrics_modules = {}
-
-        self._init_log_system()
-        self._load_configs()  # 加载并合并配置
-        self._init_metrics()
-        self._load_custom_metrics()
-
-    def _init_log_system(self) -> None:
-        """集中式日志管理"""
-        try:
-            from modules.lib.log_manager import LogManager
-
-            log_manager = LogManager(self.log_path)
-            self.logger = log_manager.get_logger()
-        except (PermissionError, IOError) as e:
-            sys.stderr.write(f"日志系统初始化失败: {str(e)}\n")
-            sys.exit(1)
-
-    def _init_metrics(self) -> None:
-        """初始化评估模块(策略模式)"""
-        # from modules.metric import safety, comfort, traffic, efficient, function
-        self.metric_modules = {
-            "safety": self._load_module("modules.metric.safety", "SafeManager"),
-            "comfort": self._load_module("modules.metric.comfort", "ComfortManager"),
-            "traffic": self._load_module("modules.metric.traffic", "TrafficManager"),
-            "efficient": self._load_module("modules.metric.efficient", "EfficientManager"),
-            "function": self._load_module("modules.metric.function", "FunctionManager"),
-        }
-
-    @lru_cache(maxsize=32)
-    def _load_module(self, module_path: str, class_name: str) -> Any:
-        """动态加载评估模块(带缓存)"""
-        try:
-            __import__(module_path)
-            return getattr(sys.modules[module_path], class_name)
-        except (ImportError, AttributeError) as e:
-            self.logger.error(f"模块加载失败: {module_path}.{class_name} - {str(e)}")
-            raise
-
-    def _load_configs(self) -> None:
-        """加载并合并内置指标和自定义指标配置"""
-        # 加载内置指标配置
-        if self.config_path and Path(self.config_path).exists():
-            try:
-                with open(self.config_path, 'r', encoding='utf-8') as f:
-                    self.metrics_config = yaml.safe_load(f)
-                self.logger.info(f"成功加载内置指标配置: {self.config_path}")
-            except Exception as e:
-                self.logger.error(f"加载内置指标配置失败: {str(e)}")
-                self.metrics_config = {}
-
-        # 加载自定义指标配置
-        if self.custom_config_path and Path(self.custom_config_path).exists():
-            try:
-                with open(self.custom_config_path, 'r', encoding='utf-8') as f:
-                    self.custom_metrics_config = yaml.safe_load(f)
-                self.logger.info(f"成功加载自定义指标配置: {self.custom_config_path}")
-            except Exception as e:
-                self.logger.error(f"加载自定义指标配置失败: {str(e)}")
-                self.custom_metrics_config = {}
-
-        # 合并配置
-        self.merged_config = self._merge_configs(self.metrics_config, self.custom_metrics_config)
-
-    def _merge_configs(self, base_config: Dict, custom_config: Dict) -> Dict:
-        """
-        合并内置指标和自定义指标配置
-
-        策略:
-        1. 如果自定义指标与内置指标有相同的一级指标,则合并其下的二级指标
-        2. 如果自定义指标与内置指标有相同的二级指标,则合并其下的三级指标
-        3. 如果是全新的指标,则直接添加
-        """
-        merged = base_config.copy()
-
-        for level1_key, level1_value in custom_config.items():
-            # 跳过非指标配置项(如vehicle等)
-            if not isinstance(level1_value, dict) or 'name' not in level1_value:
-                if level1_key not in merged:
-                    merged[level1_key] = level1_value
-                continue
-
-            if level1_key not in merged:
-                # 全新的一级指标
-                merged[level1_key] = level1_value
-            else:
-                # 合并已存在的一级指标下的内容
-                for level2_key, level2_value in level1_value.items():
-                    if level2_key == 'name' or level2_key == 'priority':
-                        continue
-
-                    if isinstance(level2_value, dict):
-                        if level2_key not in merged[level1_key]:
-                            # 新的二级指标
-                            merged[level1_key][level2_key] = level2_value
-                        else:
-                            # 合并已存在的二级指标下的内容
-                            for level3_key, level3_value in level2_value.items():
-                                if level3_key == 'name' or level3_key == 'priority':
-                                    continue
-
-                                if isinstance(level3_value, dict):
-                                    if level3_key not in merged[level1_key][level2_key]:
-                                        # 新的三级指标
-                                        merged[level1_key][level2_key][level3_key] = level3_value
-
-        return merged
-
-    def _load_custom_metrics(self) -> None:
-        """加载自定义指标脚本"""
-        if not self.custom_metrics_path or not Path(self.custom_metrics_path).exists():
-            return
-
-        custom_metrics_dir = Path(self.custom_metrics_path)
-        if not custom_metrics_dir.is_dir():
-            self.logger.warning(f"自定义指标路径不是目录: {custom_metrics_dir}")
-            return
-
-        # 遍历自定义指标脚本目录
-        for file_path in custom_metrics_dir.glob("*.py"):
-            if file_path.name.startswith("metric_") and file_path.name.endswith(".py"):
-                try:
-                    # 解析脚本名称,获取指标层级信息
-                    parts = file_path.stem[7:].split('_')  # 去掉'metric_'前缀
-                    if len(parts) < 3:
-                        self.logger.warning(
-                            f"自定义指标脚本 {file_path.name} 命名不符合规范,应为 metric_<level1>_<level2>_<level3>.py")
-                        continue
-
-                    level1, level2, level3 = parts[0], parts[1], parts[2]
-
-                    # 检查指标是否在配置中
-                    if not self._check_metric_in_config(level1, level2, level3, self.custom_metrics_config):
-                        self.logger.warning(f"自定义指标 {level1}.{level2}.{level3} 在配置中不存在,跳过加载")
-                        continue
-
-                    # 加载脚本模块
-                    module_name = f"custom_metric_{level1}_{level2}_{level3}"
-                    spec = importlib.util.spec_from_file_location(module_name, file_path)
-                    module = importlib.util.module_from_spec(spec)
-                    spec.loader.exec_module(module)
-
-                    # 检查模块是否包含必要的函数
-                    if not hasattr(module, 'evaluate'):
-                        self.logger.warning(f"自定义指标脚本 {file_path.name} 缺少 evaluate 函数")
-                        continue
-
-                    # 存储模块引用
-                    key = f"{level1}.{level2}.{level3}"
-                    self.custom_metrics_modules[key] = module
-                    self.logger.info(f"成功加载自定义指标脚本: {file_path.name}")
-
-                except Exception as e:
-                    self.logger.error(f"加载自定义指标脚本 {file_path.name} 失败: {str(e)}")
-
-    def _check_metric_in_config(self, level1: str, level2: str, level3: str, config: Dict) -> bool:
-        """检查指标是否在配置中存在"""
-        try:
-            return (level1 in config and
-                    isinstance(config[level1], dict) and
-                    level2 in config[level1] and
-                    isinstance(config[level1][level2], dict) and
-                    level3 in config[level1][level2] and
-                    isinstance(config[level1][level2][level3], dict))
-        except Exception:
-            return False
-
-    def parallel_evaluate(self, data: Any) -> Dict[str, Any]:
-        """并行化评估引擎(动态线程池)"""
-        # 存储所有评估结果
-        results = {}
-
-        # 1. 先评估内置指标
-        self._evaluate_built_in_metrics(data, results)
-
-        # 2. 再评估自定义指标并合并结果
-        self._evaluate_and_merge_custom_metrics(data, results)
-
-        return results
-
-    def _evaluate_built_in_metrics(self, data: Any, results: Dict[str, Any]) -> None:
-        """评估内置指标"""
-        # 关键修改点1:线程数=模块数
-        with ThreadPoolExecutor(max_workers=len(self.metric_modules)) as executor:
-            # 关键修改点2:按模块名创建future映射
-            futures = {
-                module_name: executor.submit(
-                    self._run_module, module, data, module_name
-                )
-                for module_name, module in self.metric_modules.items()
-            }
-
-            # 关键修改点3:按模块顺序处理结果
-            for module_name, future in futures.items():
-                try:
-                    from modules.lib.score import Score
-                    evaluator = Score(self.merged_config, module_name)
-                    result_module = future.result()
-                    result = evaluator.evaluate(result_module)
-                    # results.update(result[module_name])
-                    results.update(result)
-                except Exception as e:
-                    self.logger.error(
-                        f"{module_name} 评估失败: {str(e)}",
-                        exc_info=True,
-                        extra={"stack": True},  # 记录完整堆栈
-                    )
-                    # 错误信息结构化存储
-                    results[module_name] = {
-                        "status": "error",
-                        "message": str(e),
-                        "timestamp": datetime.now().isoformat(),
-                    }
-
-    def _evaluate_and_merge_custom_metrics(self, data: Any, results: Dict[str, Any]) -> None:
-        """评估自定义指标并合并结果"""
-        if not self.custom_metrics_modules:
-            return
-
-        # 按一级指标分组自定义指标
-        grouped_metrics = {}
-        for metric_key in self.custom_metrics_modules:
-            level1 = metric_key.split('.')[0]
-            if level1 not in grouped_metrics:
-                grouped_metrics[level1] = []
-            grouped_metrics[level1].append(metric_key)
-
-        # 处理每个一级指标组
-        for level1, metric_keys in grouped_metrics.items():
-            # 检查是否为内置一级指标
-            is_built_in = level1 in self.metrics_config and 'name' in self.metrics_config[level1]
-            level1_name = self.merged_config[level1].get('name', level1) if level1 in self.merged_config else level1
-
-            # 如果是内置一级指标,将结果合并到已有结果中
-            if is_built_in and level1_name in results:
-                for metric_key in metric_keys:
-                    self._evaluate_and_merge_single_metric(data, results, metric_key, level1_name)
-            else:
-                # 如果是新的一级指标,创建新的结果结构
-                if level1_name not in results:
-                    results[level1_name] = {}
-
-                # 评估该一级指标下的所有自定义指标
-                for metric_key in metric_keys:
-                    self._evaluate_and_merge_single_metric(data, results, metric_key, level1_name)
-
-    def _evaluate_and_merge_single_metric(self, data: Any, results: Dict[str, Any], metric_key: str,
-                                          level1_name: str) -> None:
-        """评估单个自定义指标并合并结果"""
-        try:
-            level1, level2, level3 = metric_key.split('.')
-            module = self.custom_metrics_modules[metric_key]
-
-            # 获取指标配置
-            metric_config = self.custom_metrics_config[level1][level2][level3]
-
-            # 获取指标名称
-            level2_name = self.custom_metrics_config[level1][level2].get('name', level2)
-            level3_name = metric_config.get('name', level3)
-
-            # 确保结果字典结构存在
-            if level2_name not in results[level1_name]:
-                results[level1_name][level2_name] = {}
-
-            # 调用自定义指标评测函数
-            metric_result = module.evaluate(data)
-            from modules.lib.score import Score
-            evaluator = Score(self.merged_config, level1_name)
-
-            result = evaluator.evaluate(metric_result)
-
-            results.update(result)
-
-            self.logger.info(f"评测自定义指标: {level1_name}.{level2_name}.{level3_name}")
-
-        except Exception as e:
-            self.logger.error(f"评测自定义指标 {metric_key} 失败: {str(e)}")
-
-            # 尝试添加错误信息到结果中
-            try:
-                level1, level2, level3 = metric_key.split('.')
-                level2_name = self.custom_metrics_config[level1][level2].get('name', level2)
-                level3_name = self.custom_metrics_config[level1][level2][level3].get('name', level3)
-
-                if level2_name not in results[level1_name]:
-                    results[level1_name][level2_name] = {}
-
-                results[level1_name][level2_name][level3_name] = {
-                    "status": "error",
-                    "message": str(e),
-                    "timestamp": datetime.now().isoformat(),
-                }
-            except Exception:
-                pass
-
-    def _run_module(
-            self, module_class: Any, data: Any, module_name: str
-    ) -> Dict[str, Any]:
-        """执行单个评估模块(带熔断机制)"""
-        try:
-            instance = module_class(data)
-            return {module_name: instance.report_statistic()}
-        except Exception as e:
-            self.logger.error(f"{module_name} 执行异常: {str(e)}", stack_info=True)
-            return {module_name: {"error": str(e)}}
-
-
-class EvaluationPipeline:
-    """评估流水线控制器"""
-
-    def __init__(self, configPath: str, logPath: str, dataPath: str, resultPath: str,
-                 customMetricsPath: Optional[str] = None, customConfigPath: Optional[str] = None):
-        self.configPath = Path(configPath)
-        self.custom_config_path = Path(customConfigPath) if customConfigPath else None
-        self.data_path = Path(dataPath)
-        self.report_path = Path(resultPath)
-        self.custom_metrics_path = Path(customMetricsPath) if customMetricsPath else None
-
-        # 创建评估引擎实例,传入所有必要参数
-        self.engine = EvaluationCore(
-            logPath,
-            configPath=str(self.configPath),
-            customConfigPath=str(self.custom_config_path) if self.custom_config_path else None,
-            customMetricsPath=str(self.custom_metrics_path) if self.custom_metrics_path else None
-        )
-
-        self.data_processor = self._load_data_processor()
-
-    def _load_data_processor(self) -> Any:
-        """动态加载数据预处理模块"""
-        try:
-            from modules.lib import data_process
-
-            return data_process.DataPreprocessing(self.data_path, self.configPath)
-        except ImportError as e:
-            raise RuntimeError(f"数据处理器加载失败: {str(e)}") from e
-
-    def execute_pipeline(self) -> Dict[str, Any]:
-        """端到端执行评估流程"""
-        self._validate_case()
-
-        try:
-            metric_results = self.engine.parallel_evaluate(self.data_processor)
-            report = self._generate_report(
-                self.data_processor.case_name, metric_results
-            )
-            return report
-        except Exception as e:
-            self.engine.logger.critical(f"流程执行失败: {str(e)}", exc_info=True)
-            return {"error": str(e)}
-
-    def _validate_case(self) -> None:
-        """用例路径验证"""
-        case_path = self.data_path
-        if not case_path.exists():
-            raise FileNotFoundError(f"用例路径不存在: {case_path}")
-        if not case_path.is_dir():
-            raise NotADirectoryError(f"无效的用例目录: {case_path}")
-
-    def _generate_report(self, case_name: str, results: Dict) -> Dict:
-        """生成评估报告(模板方法模式)"""
-        from modules.lib.common import dict2json
-
-        report_path = self.report_path
-        report_path.mkdir(parents=True, exist_ok=True, mode=0o777)
-
-        report_file = report_path / f"{case_name}_report.json"
-        dict2json(results, report_file)
-        self.engine.logger.info(f"评估报告已生成: {report_file}")
-        return results
-
-
-def main():
-    """命令行入口(工厂模式)"""
-    parser = argparse.ArgumentParser(
-        description="自动驾驶评估系统 V3.0 - 支持动态指标选择和自定义指标",
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
-    )
-    # 带帮助说明的参数定义,增加默认值
-    parser.add_argument(
-        "--logPath",
-        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\test.log",
-        help="日志文件存储路径",
-    )
-    parser.add_argument(
-        "--dataPath",
-        type=str,
-        default=r"D:\Cicv\招远\V2V_CSAE53-2020_ForwardCollision_LST_01-02_new",
-        help="预处理后的输入数据目录",
-    )
-    parser.add_argument(
-        "--configPath",
-        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\zhaoyuan\config\all_metrics_config.yaml",
-        help="评估指标配置文件路径",
-    )
-    parser.add_argument(
-        "--reportPath",
-        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\zhaoyuan\result",
-        help="评估报告输出目录",
-    )
-    # 新增自定义指标路径参数(可选)
-    parser.add_argument(
-        "--customMetricsPath",
-        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\zhaoyuan\custom_metrics",
-        help="自定义指标脚本目录(可选)",
-    )
-    # 新增自定义指标路径参数(可选)
-    parser.add_argument(
-        "--customConfigPath",
-        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\zhaoyuan\test\custom_metrics_config.yaml",
-        help="自定义指标脚本目录(可选)",
-    )
-    args = parser.parse_args()
-
-    try:
-        pipeline = EvaluationPipeline(
-            args.configPath, args.logPath, args.dataPath, args.reportPath, args.customMetricsPath, args.customConfigPath
-        )
-        start_time = time.perf_counter()
-
-        result = pipeline.execute_pipeline()
-
-        if "error" in result:
-            sys.exit(1)
-
-        print(f"评估完成,耗时: {time.perf_counter() - start_time:.2f}s")
-        print(f"报告路径: {pipeline.report_path}")
-    except KeyboardInterrupt:
-        print("\n用户中断操作")
-        sys.exit(130)
-    except Exception as e:
-        print(f"程序执行异常: {str(e)}")
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    warnings.filterwarnings("ignore")
-    main()