123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- # evaluation_engine.py
- import sys
- import warnings
- import time
- from pathlib import Path
- import argparse
- from concurrent.futures import ThreadPoolExecutor
- from functools import lru_cache
- from typing import Dict, Any
- from datetime import datetime
- # 强制导入所有可能动态加载的模块
- # 安全设置根目录路径(动态路径管理)
- # 判断是否处于编译模式
- if hasattr(sys, "_MEIPASS"):
- # 编译模式下使用临时资源目录
- _ROOT_PATH = Path(sys._MEIPASS)
- else:
- # 开发模式下使用原工程路径
- _ROOT_PATH = Path(__file__).resolve().parent.parent
- sys.path.insert(0, str(_ROOT_PATH))
- print(f"当前根目录:{_ROOT_PATH}")
- print(f'当前系统路径:{sys.path}')
- class EvaluationCore:
- """评估引擎核心类(单例模式)"""
- _instance = None
- def __new__(cls, logPath: str):
- if not cls._instance:
- cls._instance = super().__new__(cls)
- cls._instance._init(logPath)
- return cls._instance
- def _init(self, logPath: str = None) -> None:
- """初始化引擎组件"""
- # configPath: str, logPath: str, dataPath: str, resultPath: str
- self.log_path = logPath
- self._init_log_system()
- self._init_metrics()
- def _init_log_system(self) -> None:
- """集中式日志管理"""
- try:
- from modules.lib.log_manager import LogManager
- log_manager = LogManager(self.log_path)
- self.logger = log_manager.get_logger()
- except (PermissionError, IOError) as e:
- sys.stderr.write(f"日志系统初始化失败: {str(e)}\n")
- sys.exit(1)
- def _init_metrics(self) -> None:
- """初始化评估模块(策略模式)"""
- # from modules.metric import safety, comfort, traffic, efficient, function
- self.metric_modules = {
- "safety": self._load_module("modules.metric.safety", "Safe"),
- "comfort": self._load_module("modules.metric.comfort", "Comfort"),
- "traffic": self._load_module("modules.metric.traffic", "ViolationManager"),
- "efficient": self._load_module("modules.metric.efficient", "Efficient"),
- "function": self._load_module("modules.metric.function", "FunctionManager"),
- }
- @lru_cache(maxsize=32)
- def _load_module(self, module_path: str, class_name: str) -> Any:
- """动态加载评估模块(带缓存)"""
- try:
- __import__(module_path)
- return getattr(sys.modules[module_path], class_name)
- except (ImportError, AttributeError) as e:
- self.logger.error(f"模块加载失败: {module_path}.{class_name} - {str(e)}")
- raise
- def parallel_evaluate(self, data: Any) -> Dict[str, Any]:
- """并行化评估引擎(动态线程池)"""
- results = {}
- # 关键修改点1:线程数=模块数
- with ThreadPoolExecutor(max_workers=len(self.metric_modules)) as executor:
- # 关键修改点2:按模块名创建future映射
- futures = {
- module_name: executor.submit(
- self._run_module, module, data, module_name
- )
- for module_name, module in self.metric_modules.items()
- }
- # 关键修改点3:按模块顺序处理结果
- for module_name, future in futures.items():
- try:
- result = future.result()
- results.update(result[module_name])
- # 结构化合并结果(保留模块名键)
- # results[module_name] = result.get(module_name, {})
- except Exception as e:
- self.logger.error(
- f"{module_name} 评估失败: {str(e)}",
- exc_info=True,
- extra={"stack": True}, # 记录完整堆栈
- )
- # 错误信息结构化存储
- results[module_name] = {
- "status": "error",
- "message": str(e),
- "timestamp": datetime.now().isoformat(),
- }
- return results
- def _run_module(
- self, module_class: Any, data: Any, module_name: str
- ) -> Dict[str, Any]:
- """执行单个评估模块(带熔断机制)"""
- try:
- instance = module_class(data)
- return {module_name: instance.report_statistic()}
- except Exception as e:
- self.logger.error(f"{module_name} 执行异常: {str(e)}", stack_info=True)
- return {module_name: {"error": str(e)}}
- class EvaluationPipeline:
- """评估流水线控制器"""
- def __init__(self, configPath: str, logPath: str, dataPath: str, resultPath: str):
- self.engine = EvaluationCore(logPath)
- self.configPath = configPath
- self.data_path = dataPath
- self.report_path = resultPath
- # self.case_name = os.path.basename(os.path.dirname(dataPath))
- self.data_processor = self._load_data_processor()
- def _load_data_processor(self) -> Any:
- """动态加载数据预处理模块"""
- try:
- from modules.lib import data_process
- return data_process.DataPreprocessing(self.data_path, self.configPath)
- except ImportError as e:
- raise RuntimeError(f"数据处理器加载失败: {str(e)}") from e
- def execute_pipeline(self) -> Dict[str, Any]:
- """端到端执行评估流程"""
- self._validate_case()
- try:
- metric_results = self.engine.parallel_evaluate(self.data_processor)
- from modules.lib.score import get_overall_result
- all_result = get_overall_result(metric_results, self.configPath)
- report = self._generate_report(
- self.data_processor.case_name, all_result
- )
- return report
- except Exception as e:
- self.engine.logger.critical(f"流程执行失败: {str(e)}", exc_info=True)
- return {"error": str(e)}
- def _validate_case(
- self,
- ) -> None:
- """用例路径验证"""
- case_path = self.data_path
- if not case_path.exists():
- raise FileNotFoundError(f"用例路径不存在: {case_path}")
- if not case_path.is_dir():
- raise NotADirectoryError(f"无效的用例目录: {case_path}")
- def _generate_report(self, case_name: str, results: Dict) -> Dict:
- """生成评估报告(模板方法模式)"""
- from modules.lib.common import dict2json
- report_path = self.report_path
- report_path.mkdir(parents=True, exist_ok=True, mode=0o777)
- dict2json(results, report_path / f"{case_name}_report.json")
- return results
- def main():
- """命令行入口(工厂模式)"""
- parser = argparse.ArgumentParser(
- description="自动驾驶评估系统 V2.0",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- )
- # 带帮助说明的参数定义,设置为必传参数
- parser.add_argument(
- "--logPath",
- type=Path,
- default="log/runtime.log",
- help="日志文件存储路径",
- )
- parser.add_argument(
- "--dataPath",
- type=Path,
- default="/home/kevin/kevin/zhaoyuan/sqlite3_demo/docker_build/preprocess_run/data/V2V_CSAE53-2020_ForwardCollisionW_LST_01-01",
- help="预处理后的输入数据目录",
- )
- parser.add_argument(
- "--configPath",
- type=Path,
- default="config/metric_config.yaml",
- help="评估指标配置文件路径",
- )
- parser.add_argument(
- "--reportPath",
- type=Path,
- default="result",
- help="评估报告输出目录",
- )
- args = parser.parse_args()
- try:
- ##############################################
- # 新增:动态生成日志路径
- ##############################################
- # 从dataPath提取用例名称
- data_name = args.dataPath.name # 获取数据目录名称
- log_dir = args.logPath.parent # 保持原日志目录配置
-
- # 创建日志目录(如果不存在)
- log_dir.mkdir(parents=True, exist_ok=True)
-
- # 生成新的日志路径:日志目录/数据名称.log
- new_log_path = log_dir / f"{data_name}.log"
- args.logPath = new_log_path # 覆盖原logPath参数
- ##############################################
- pipeline = EvaluationPipeline(
- args.configPath, args.logPath, args.dataPath, args.reportPath
- )
- start_time = time.perf_counter()
- result = pipeline.execute_pipeline()
- if "error" in result:
- sys.exit(1)
- print(f"评估完成,耗时: {time.perf_counter()-start_time:.2f}s")
- print(f"报告路径: {pipeline.report_path}")
- except KeyboardInterrupt:
- print("\n用户中断操作")
- sys.exit(130)
- if __name__ == "__main__":
- warnings.filterwarnings("ignore")
- main()
|