123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
# evaluation_engine.py
import argparse
import importlib
import importlib.util  # explicit import: used by _load_custom_metrics (spec_from_file_location)
import sys
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Dict, Any, List, Optional

import yaml  # third-party: PyYAML
# Resolve the project root (dynamic path management).
# When running as a frozen bundle (PyInstaller), resources are unpacked into
# the temporary directory exposed via sys._MEIPASS; in development mode the
# root is two levels above this file.
_ROOT_PATH = (
    Path(sys._MEIPASS)
    if hasattr(sys, "_MEIPASS")
    else Path(__file__).resolve().parent.parent
)
# Make project-local packages (modules.*) importable everywhere.
sys.path.insert(0, str(_ROOT_PATH))
print(f"当前根目录:{_ROOT_PATH}")
print(f'当前系统路径:{sys.path}')
class EvaluationCore:
    """Core of the evaluation engine (singleton).

    Loads the built-in and custom metric configurations, dynamically imports
    the built-in metric manager classes and any custom metric scripts, then
    evaluates a preprocessed data set with all of them (built-ins in
    parallel).
    """

    _instance = None  # shared singleton instance

    def __new__(cls, logPath: str, configPath: str = None, customConfigPath: str = None, customMetricsPath: str = None):
        # Eager-init singleton: the first call builds and initialises the
        # instance; every later call returns it and ignores its arguments.
        if not cls._instance:
            cls._instance = super().__new__(cls)
            cls._instance._init(logPath, configPath, customConfigPath, customMetricsPath)
        return cls._instance

    def _init(self, logPath: str = None, configPath: str = None, customConfigPath: str = None, customMetricsPath: str = None) -> None:
        """One-time initialisation of all engine components.

        Args:
            logPath: path of the log file.
            configPath: YAML file with the built-in metric configuration.
            customConfigPath: YAML file with the custom metric configuration.
            customMetricsPath: directory containing custom metric scripts.
        """
        self.log_path = logPath
        self.config_path = configPath
        self.custom_config_path = customConfigPath
        self.custom_metrics_path = customMetricsPath

        # Loaded configurations.
        self.metrics_config = {}
        self.custom_metrics_config = {}
        self.merged_config = {}  # merged view of built-in + custom config

        # Custom metric script modules, keyed by "level1.level2.level3".
        self.custom_metrics_modules = {}

        # Per-instance cache for dynamically loaded metric manager classes.
        # (functools.lru_cache on the bound method would key on `self` and
        # keep the instance alive — ruff B019 — so a plain dict is used.)
        self._module_cache: Dict[Any, Any] = {}

        self._init_log_system()
        self._load_configs()   # load and merge configuration first
        self._init_metrics()
        self._load_custom_metrics()

    def _init_log_system(self) -> None:
        """Centralised logging setup; exits the process if it fails."""
        try:
            from modules.lib.log_manager import LogManager
            log_manager = LogManager(self.log_path)
            self.logger = log_manager.get_logger()
        except (PermissionError, IOError) as e:
            sys.stderr.write(f"日志系统初始化失败: {str(e)}\n")
            sys.exit(1)

    def _init_metrics(self) -> None:
        """Initialise the built-in evaluation modules (strategy pattern)."""
        self.metric_modules = {
            "safety": self._load_module("modules.metric.safety", "SafeManager"),
            "comfort": self._load_module("modules.metric.comfort", "ComfortManager"),
            "traffic": self._load_module("modules.metric.traffic", "ViolationManager"),
            "efficient": self._load_module("modules.metric.efficient", "EfficientManager"),
            "function": self._load_module("modules.metric.function", "FunctionManager"),
        }

    def _load_module(self, module_path: str, class_name: str) -> Any:
        """Dynamically import *module_path* and return *class_name* from it.

        Results are memoised in a per-instance dict so each manager class is
        resolved only once.

        Raises:
            ImportError, AttributeError: when the module or class is missing.
        """
        key = (module_path, class_name)
        if key not in self._module_cache:
            try:
                __import__(module_path)
                self._module_cache[key] = getattr(sys.modules[module_path], class_name)
            except (ImportError, AttributeError) as e:
                self.logger.error(f"模块加载失败: {module_path}.{class_name} - {str(e)}")
                raise
        return self._module_cache[key]

    def _load_configs(self) -> None:
        """Load the built-in and custom metric configs, then merge them."""
        # Built-in metric configuration.
        if self.config_path and Path(self.config_path).exists():
            try:
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    # yaml.safe_load returns None for an empty document; fall
                    # back to {} so later .copy()/.items() keep working.
                    self.metrics_config = yaml.safe_load(f) or {}
                self.logger.info(f"成功加载内置指标配置: {self.config_path}")
            except Exception as e:
                self.logger.error(f"加载内置指标配置失败: {str(e)}")
                self.metrics_config = {}

        # Custom metric configuration.
        if self.custom_config_path and Path(self.custom_config_path).exists():
            try:
                with open(self.custom_config_path, 'r', encoding='utf-8') as f:
                    self.custom_metrics_config = yaml.safe_load(f) or {}
                self.logger.info(f"成功加载自定义指标配置: {self.custom_config_path}")
            except Exception as e:
                self.logger.error(f"加载自定义指标配置失败: {str(e)}")
                self.custom_metrics_config = {}

        # Merge the two views.
        self.merged_config = self._merge_configs(self.metrics_config, self.custom_metrics_config)

    def _merge_configs(self, base_config: Dict, custom_config: Dict) -> Dict:
        """Merge the custom metric config into the built-in one.

        Strategy:
            1. A level-1 key shared with the built-in config merges its
               level-2 children.
            2. A shared level-2 key merges its level-3 children.
            3. A completely new key at any level is added as-is; existing
               built-in level-3 entries are never overwritten.

        Note: this is a shallow merge — nested dicts of *base_config* are
        mutated in place through the copied top-level mapping.
        """
        merged = base_config.copy()

        for level1_key, level1_value in custom_config.items():
            # Non-metric sections (e.g. vehicle parameters) carry no 'name';
            # copy them over only when the key is not already present.
            if not isinstance(level1_value, dict) or 'name' not in level1_value:
                if level1_key not in merged:
                    merged[level1_key] = level1_value
                continue

            if level1_key not in merged:
                # Entirely new level-1 metric.
                merged[level1_key] = level1_value
                continue

            # Merge the children of an existing level-1 metric.
            for level2_key, level2_value in level1_value.items():
                if level2_key in ('name', 'priority'):
                    continue
                if not isinstance(level2_value, dict):
                    continue
                if level2_key not in merged[level1_key]:
                    # New level-2 metric.
                    merged[level1_key][level2_key] = level2_value
                    continue
                # Merge the children of an existing level-2 metric.
                for level3_key, level3_value in level2_value.items():
                    if level3_key in ('name', 'priority'):
                        continue
                    if isinstance(level3_value, dict) and level3_key not in merged[level1_key][level2_key]:
                        # New level-3 metric.
                        merged[level1_key][level2_key][level3_key] = level3_value

        return merged

    def _load_custom_metrics(self) -> None:
        """Discover and import custom metric scripts (metric_<l1>_<l2>_<l3>.py)."""
        if not self.custom_metrics_path or not Path(self.custom_metrics_path).exists():
            return

        custom_metrics_dir = Path(self.custom_metrics_path)
        if not custom_metrics_dir.is_dir():
            self.logger.warning(f"自定义指标路径不是目录: {custom_metrics_dir}")
            return

        # The glob pattern already enforces the "metric_" prefix and the
        # ".py" suffix, so no extra name checks are needed.
        for file_path in custom_metrics_dir.glob("metric_*.py"):
            try:
                # Derive the metric hierarchy from the file name
                # (strip the 7-char "metric_" prefix, then split on "_").
                parts = file_path.stem[7:].split('_')
                if len(parts) < 3:
                    self.logger.warning(f"自定义指标脚本 {file_path.name} 命名不符合规范,应为 metric_<level1>_<level2>_<level3>.py")
                    continue

                level1, level2, level3 = parts[0], parts[1], parts[2]

                # The metric must be declared in the custom config.
                if not self._check_metric_in_config(level1, level2, level3, self.custom_metrics_config):
                    self.logger.warning(f"自定义指标 {level1}.{level2}.{level3} 在配置中不存在,跳过加载")
                    continue

                # Import the script as an isolated module.
                module_name = f"custom_metric_{level1}_{level2}_{level3}"
                spec = importlib.util.spec_from_file_location(module_name, file_path)
                if spec is None or spec.loader is None:
                    # Guard: spec creation can fail; surface it via the
                    # generic error handler below.
                    raise ImportError(f"无法为 {file_path} 创建模块加载器")
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)

                # Each script must expose an evaluate(data) entry point.
                if not hasattr(module, 'evaluate'):
                    self.logger.warning(f"自定义指标脚本 {file_path.name} 缺少 evaluate 函数")
                    continue

                # Keep a reference for the evaluation phase.
                key = f"{level1}.{level2}.{level3}"
                self.custom_metrics_modules[key] = module
                self.logger.info(f"成功加载自定义指标脚本: {file_path.name}")

            except Exception as e:
                self.logger.error(f"加载自定义指标脚本 {file_path.name} 失败: {str(e)}")

    def _check_metric_in_config(self, level1: str, level2: str, level3: str, config: Dict) -> bool:
        """Return True when the three-level metric path exists in *config*.

        Defensive try/except keeps malformed (non-dict) config from raising.
        """
        try:
            return (level1 in config and
                    isinstance(config[level1], dict) and
                    level2 in config[level1] and
                    isinstance(config[level1][level2], dict) and
                    level3 in config[level1][level2] and
                    isinstance(config[level1][level2][level3], dict))
        except Exception:
            return False

    def parallel_evaluate(self, data: Any) -> Dict[str, Any]:
        """Evaluate *data* with every metric and return the combined results.

        Built-in metrics run first (in parallel); custom metrics are then
        evaluated and merged into the same result mapping.
        """
        results: Dict[str, Any] = {}
        self._evaluate_built_in_metrics(data, results)
        self._evaluate_and_merge_custom_metrics(data, results)
        return results

    def _evaluate_built_in_metrics(self, data: Any, results: Dict[str, Any]) -> None:
        """Run every built-in metric module and fold its scores into *results*."""
        from modules.lib.score import Score  # hoisted out of the loop: import once

        # One worker per module so every metric runs concurrently.
        with ThreadPoolExecutor(max_workers=len(self.metric_modules)) as executor:
            # Map module name -> future so results are handled in module order.
            futures = {
                module_name: executor.submit(
                    self._run_module, module, data, module_name
                )
                for module_name, module in self.metric_modules.items()
            }
            for module_name, future in futures.items():
                try:
                    evaluator = Score(self.merged_config, module_name)
                    result_module = future.result()
                    result = evaluator.evaluate(result_module)
                    results.update(result)
                except Exception as e:
                    self.logger.error(
                        f"{module_name} 评估失败: {str(e)}",
                        exc_info=True,
                        extra={"stack": True},  # record the full stack
                    )
                    # Store the failure in a structured form.
                    results[module_name] = {
                        "status": "error",
                        "message": str(e),
                        "timestamp": datetime.now().isoformat(),
                    }

    def _evaluate_and_merge_custom_metrics(self, data: Any, results: Dict[str, Any]) -> None:
        """Run all custom metrics and merge their scores into *results*."""
        if not self.custom_metrics_modules:
            return

        # Group the custom metrics by their level-1 category.
        grouped_metrics: Dict[str, List[str]] = {}
        for metric_key in self.custom_metrics_modules:
            level1 = metric_key.split('.')[0]
            grouped_metrics.setdefault(level1, []).append(metric_key)

        for level1, metric_keys in grouped_metrics.items():
            # Does this category already exist as a built-in level-1 metric?
            is_built_in = level1 in self.metrics_config and 'name' in self.metrics_config[level1]
            level1_name = self.merged_config[level1].get('name', level1) if level1 in self.merged_config else level1

            if is_built_in and level1_name in results:
                # Merge into the existing built-in result bucket.
                for metric_key in metric_keys:
                    self._evaluate_and_merge_single_metric(data, results, metric_key, level1_name)
            else:
                # Brand-new level-1 category: create its result bucket first.
                if level1_name not in results:
                    results[level1_name] = {}
                for metric_key in metric_keys:
                    self._evaluate_and_merge_single_metric(data, results, metric_key, level1_name)

    def _evaluate_and_merge_single_metric(self, data: Any, results: Dict[str, Any], metric_key: str, level1_name: str) -> None:
        """Evaluate one custom metric module and merge its score into *results*."""
        try:
            level1, level2, level3 = metric_key.split('.')
            module = self.custom_metrics_modules[metric_key]

            # Configuration and display names for this metric.
            metric_config = self.custom_metrics_config[level1][level2][level3]
            level2_name = self.custom_metrics_config[level1][level2].get('name', level2)
            level3_name = metric_config.get('name', level3)

            # Make sure the nested result bucket exists.
            if level2_name not in results[level1_name]:
                results[level1_name][level2_name] = {}

            # Run the custom metric and score it with the merged config.
            from modules.lib.score import Score
            metric_result = module.evaluate(data)
            evaluator = Score(self.merged_config, level1_name)
            result = evaluator.evaluate(metric_result)
            results.update(result)

            self.logger.info(f"评测自定义指标: {level1_name}.{level2_name}.{level3_name}")

        except Exception as e:
            self.logger.error(f"评测自定义指标 {metric_key} 失败: {str(e)}")
            # Best effort: record the failure in the result tree.
            try:
                level1, level2, level3 = metric_key.split('.')
                level2_name = self.custom_metrics_config[level1][level2].get('name', level2)
                level3_name = self.custom_metrics_config[level1][level2][level3].get('name', level3)

                if level2_name not in results[level1_name]:
                    results[level1_name][level2_name] = {}

                results[level1_name][level2_name][level3_name] = {
                    "status": "error",
                    "message": str(e),
                    "timestamp": datetime.now().isoformat(),
                }
            except Exception:
                pass  # deliberately best-effort; evaluation already failed

    def _run_module(
        self, module_class: Any, data: Any, module_name: str
    ) -> Dict[str, Any]:
        """Run one built-in metric module; failures degrade to an error dict."""
        try:
            instance = module_class(data)
            return {module_name: instance.report_statistic()}
        except Exception as e:
            self.logger.error(f"{module_name} 执行异常: {str(e)}", stack_info=True)
            return {module_name: {"error": str(e)}}
class EvaluationPipeline:
    """Controller for the end-to-end evaluation pipeline.

    Wires together configuration paths, the (singleton) evaluation engine,
    the data preprocessor, and report generation for one test case.
    """

    def __init__(self, configPath: str, logPath: str, dataPath: str, resultPath: str, customMetricsPath: Optional[str] = None, customConfigPath: Optional[str] = None):
        self.configPath = Path(configPath)
        self.data_path = Path(dataPath)
        self.report_path = Path(resultPath)
        # Optional inputs stay None when not supplied (or empty).
        self.custom_config_path = Path(customConfigPath) if customConfigPath else None
        self.custom_metrics_path = Path(customMetricsPath) if customMetricsPath else None

        # Build the evaluation engine with every path it needs.
        self.engine = EvaluationCore(
            logPath,
            configPath=str(self.configPath),
            customConfigPath=str(self.custom_config_path) if self.custom_config_path else None,
            customMetricsPath=str(self.custom_metrics_path) if self.custom_metrics_path else None,
        )

        self.data_processor = self._load_data_processor()

    def _load_data_processor(self) -> Any:
        """Dynamically load and construct the data-preprocessing component."""
        try:
            from modules.lib import data_process
        except ImportError as e:
            raise RuntimeError(f"数据处理器加载失败: {str(e)}") from e
        return data_process.DataPreprocessing(self.data_path, self.configPath)

    def execute_pipeline(self) -> Dict[str, Any]:
        """Run the whole evaluation flow and return the report dict.

        On failure a ``{"error": ...}`` mapping is returned instead of
        raising, after logging the exception.
        """
        self._validate_case()
        try:
            metrics = self.engine.parallel_evaluate(self.data_processor)
            return self._generate_report(self.data_processor.case_name, metrics)
        except Exception as e:
            self.engine.logger.critical(f"流程执行失败: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _validate_case(self) -> None:
        """Validate the test-case path: it must exist and be a directory."""
        case_dir = self.data_path
        if not case_dir.exists():
            raise FileNotFoundError(f"用例路径不存在: {case_dir}")
        if not case_dir.is_dir():
            raise NotADirectoryError(f"无效的用例目录: {case_dir}")

    def _generate_report(self, case_name: str, results: Dict) -> Dict:
        """Persist *results* as JSON under the report directory and return them."""
        from modules.lib.common import dict2json
        out_dir = self.report_path
        out_dir.mkdir(parents=True, exist_ok=True, mode=0o777)
        out_file = out_dir / f"{case_name}_report.json"
        dict2json(results, out_file)
        self.engine.logger.info(f"评估报告已生成: {out_file}")
        return results
def main():
    """Command-line entry point (factory pattern).

    Parses CLI arguments, builds the evaluation pipeline, runs it, and
    reports total wall-clock time on success. Exits non-zero on failure.
    """
    parser = argparse.ArgumentParser(
        description="自动驾驶评估系统 V3.0 - 支持动态指标选择和自定义指标",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Argument definitions with help text and defaults.
    parser.add_argument(
        "--logPath",
        type=str,
        default="d:/Kevin/zhaoyuan/zhaoyuan_new/logs/test.log",
        help="日志文件存储路径",
    )
    parser.add_argument(
        "--dataPath",
        type=str,
        default="d:/Kevin/zhaoyuan/zhaoyuan_new/data/zhaoyuan1",
        help="预处理后的输入数据目录",
    )
    parser.add_argument(
        "--configPath",
        type=str,
        default="d:/Kevin/zhaoyuan/zhaoyuan_new/config/metrics_config.yaml",
        help="评估指标配置文件路径",
    )
    parser.add_argument(
        "--reportPath",
        type=str,
        default="d:/Kevin/zhaoyuan/zhaoyuan_new/reports",
        help="评估报告输出目录",
    )
    # Optional: directory containing custom metric scripts.
    parser.add_argument(
        "--customMetricsPath",
        type=str,
        default="d:/Kevin/zhaoyuan/zhaoyuan_new/custom_metrics",
        help="自定义指标脚本目录(可选)",
    )
    # Optional: YAML config for the custom metrics.
    # (Help text fixed: this argument is a config FILE path, not a script
    # directory — the old text was copy-pasted from --customMetricsPath.)
    parser.add_argument(
        "--customConfigPath",
        type=str,
        default="d:/Kevin/zhaoyuan/zhaoyuan_new/config/custom_metrics_config.yaml",
        help="自定义指标配置文件路径(可选)",
    )
    args = parser.parse_args()

    try:
        pipeline = EvaluationPipeline(
            args.configPath, args.logPath, args.dataPath, args.reportPath, args.customMetricsPath, args.customConfigPath
        )
        start_time = time.perf_counter()
        result = pipeline.execute_pipeline()
        if "error" in result:
            sys.exit(1)
        print(f"评估完成,耗时: {time.perf_counter()-start_time:.2f}s")
        print(f"报告路径: {pipeline.report_path}")
    except KeyboardInterrupt:
        print("\n用户中断操作")
        sys.exit(130)
    except Exception as e:
        print(f"程序执行异常: {str(e)}")
        sys.exit(1)
if __name__ == "__main__":
    # Suppress noisy library warnings before running the CLI entry point.
    warnings.filterwarnings("ignore")
    main()
|