# evaluation_engine.py import sys import warnings import time from pathlib import Path import argparse from concurrent.futures import ThreadPoolExecutor from functools import lru_cache from typing import Dict, Any from datetime import datetime # 强制导入所有可能动态加载的模块 # 安全设置根目录路径(动态路径管理) # 判断是否处于编译模式 if hasattr(sys, "_MEIPASS"): # 编译模式下使用临时资源目录 _ROOT_PATH = Path(sys._MEIPASS) else: # 开发模式下使用原工程路径 _ROOT_PATH = Path(__file__).resolve().parent.parent sys.path.insert(0, str(_ROOT_PATH)) print(f"当前根目录:{_ROOT_PATH}") print(f'当前系统路径:{sys.path}') class EvaluationCore: """评估引擎核心类(单例模式)""" _instance = None def __new__(cls, logPath: str): if not cls._instance: cls._instance = super().__new__(cls) cls._instance._init(logPath) return cls._instance def _init(self, logPath: str = None) -> None: """初始化引擎组件""" # configPath: str, logPath: str, dataPath: str, resultPath: str self.log_path = logPath self._init_log_system() self._init_metrics() def _init_log_system(self) -> None: """集中式日志管理""" try: from modules.lib.log_manager import LogManager log_manager = LogManager(self.log_path) self.logger = log_manager.get_logger() except (PermissionError, IOError) as e: sys.stderr.write(f"日志系统初始化失败: {str(e)}\n") sys.exit(1) def _init_metrics(self) -> None: """初始化评估模块(策略模式)""" # from modules.metric import safety, comfort, traffic, efficient, function self.metric_modules = { "safety": self._load_module("modules.metric.safety", "Safe"), "comfort": self._load_module("modules.metric.comfort", "Comfort"), "traffic": self._load_module("modules.metric.traffic", "ViolationManager"), "efficient": self._load_module("modules.metric.efficient", "Efficient"), "function": self._load_module("modules.metric.function", "FunctionManager"), } @lru_cache(maxsize=32) def _load_module(self, module_path: str, class_name: str) -> Any: """动态加载评估模块(带缓存)""" try: __import__(module_path) return getattr(sys.modules[module_path], class_name) except (ImportError, AttributeError) as e: self.logger.error(f"模块加载失败: {module_path}.{class_name} - {str(e)}") raise def parallel_evaluate(self, data: Any) -> Dict[str, Any]: """并行化评估引擎(动态线程池)""" results = {} # 关键修改点1:线程数=模块数 with ThreadPoolExecutor(max_workers=len(self.metric_modules)) as executor: # 关键修改点2:按模块名创建future映射 futures = { module_name: executor.submit( self._run_module, module, data, module_name ) for module_name, module in self.metric_modules.items() } # 关键修改点3:按模块顺序处理结果 for module_name, future in futures.items(): try: result = future.result() results.update(result[module_name]) # 结构化合并结果(保留模块名键) # results[module_name] = result.get(module_name, {}) except Exception as e: self.logger.error( f"{module_name} 评估失败: {str(e)}", exc_info=True, extra={"stack": True}, # 记录完整堆栈 ) # 错误信息结构化存储 results[module_name] = { "status": "error", "message": str(e), "timestamp": datetime.now().isoformat(), } return results def _run_module( self, module_class: Any, data: Any, module_name: str ) -> Dict[str, Any]: """执行单个评估模块(带熔断机制)""" try: instance = module_class(data) return {module_name: instance.report_statistic()} except Exception as e: self.logger.error(f"{module_name} 执行异常: {str(e)}", stack_info=True) return {module_name: {"error": str(e)}} class EvaluationPipeline: """评估流水线控制器""" def __init__(self, configPath: str, logPath: str, dataPath: str, resultPath: str): self.engine = EvaluationCore(logPath) self.configPath = configPath self.data_path = dataPath self.report_path = resultPath # self.case_name = os.path.basename(os.path.dirname(dataPath)) self.data_processor = self._load_data_processor() def _load_data_processor(self) -> Any: """动态加载数据预处理模块""" try: from modules.lib import data_process return data_process.DataPreprocessing(self.data_path, self.configPath) except ImportError as e: raise RuntimeError(f"数据处理器加载失败: {str(e)}") from e def execute_pipeline(self) -> Dict[str, Any]: """端到端执行评估流程""" self._validate_case() try: metric_results = self.engine.parallel_evaluate(self.data_processor) from modules.lib.score import get_overall_result all_result = get_overall_result(metric_results, self.configPath) report = self._generate_report( self.data_processor.case_name, all_result ) return report except Exception as e: self.engine.logger.critical(f"流程执行失败: {str(e)}", exc_info=True) return {"error": str(e)} def _validate_case( self, ) -> None: """用例路径验证""" case_path = self.data_path if not case_path.exists(): raise FileNotFoundError(f"用例路径不存在: {case_path}") if not case_path.is_dir(): raise NotADirectoryError(f"无效的用例目录: {case_path}") def _generate_report(self, case_name: str, results: Dict) -> Dict: """生成评估报告(模板方法模式)""" from modules.lib.common import dict2json report_path = self.report_path report_path.mkdir(parents=True, exist_ok=True, mode=0o777) dict2json(results, report_path / f"{case_name}_report.json") return results def main(): """命令行入口(工厂模式)""" parser = argparse.ArgumentParser( description="自动驾驶评估系统 V2.0", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) # 带帮助说明的参数定义,设置为必传参数 parser.add_argument( "--logPath", type=Path, default="log/runtime.log", help="日志文件存储路径", ) parser.add_argument( "--dataPath", type=Path, default="data/V2V_CSAE53-2020_ForwardCollisionW_LST_01-01", help="预处理后的输入数据目录", ) parser.add_argument( "--configPath", type=Path, default="config/metric_config.yaml", help="评估指标配置文件路径", ) parser.add_argument( "--reportPath", type=Path, default="result", help="评估报告输出目录", ) args = parser.parse_args() try: ############################################## # 新增:动态生成日志路径 ############################################## # 从dataPath提取用例名称 data_name = args.dataPath.name # 获取数据目录名称 log_dir = args.logPath.parent # 保持原日志目录配置 # 创建日志目录(如果不存在) log_dir.mkdir(parents=True, exist_ok=True) # 生成新的日志路径:日志目录/数据名称.log new_log_path = log_dir / f"{data_name}.log" args.logPath = new_log_path # 覆盖原logPath参数 ############################################## pipeline = EvaluationPipeline( args.configPath, args.logPath, args.dataPath, args.reportPath ) start_time = time.perf_counter() result = pipeline.execute_pipeline() if "error" in result: sys.exit(1) print(f"评估完成,耗时: {time.perf_counter()-start_time:.2f}s") print(f"报告路径: {pipeline.report_path}") except KeyboardInterrupt: print("\n用户中断操作") sys.exit(130) if __name__ == "__main__": warnings.filterwarnings("ignore") main()