@@ -39,87 +39,98 @@ class ConfigManager:
         self.base_config: Dict[str, Any] = {}
         self.custom_config: Dict[str, Any] = {}
         self.merged_config: Dict[str, Any] = {}
+        self._config_cache = {}

    def split_configs(self, all_metrics_path: Path, builtin_metrics_path: Path, custom_metrics_path: Path) -> None:
        """Split all_metrics_config.yaml into built-in and custom configs"""
+        # Skip the split if the extracted config files already exist
+        # (note: when a previous run produced no custom metrics, custom_metrics_path
+        # is absent and the split runs again)
+        extracted_builtin_path = builtin_metrics_path.parent / f"{builtin_metrics_path.stem}_extracted{builtin_metrics_path.suffix}"
+        if extracted_builtin_path.exists() and custom_metrics_path.exists():
+            self.logger.info(f"Using existing split config files: {extracted_builtin_path}")
+            return
+
        try:
-            with open(all_metrics_path, 'r', encoding='utf-8') as f:
-                all_metrics_dict = yaml.safe_load(f) or {}
-            with open(builtin_metrics_path, 'r', encoding='utf-8') as f:
-                builtin_metrics_dict = yaml.safe_load(f) or {}
-            custom_metrics_dict = self._find_custom_metrics(all_metrics_dict, builtin_metrics_dict)
+            # Load config files through the cache to avoid repeated reads
+            all_metrics_dict = self._safe_load_config(all_metrics_path)
+            builtin_metrics_dict = self._safe_load_config(builtin_metrics_path)
+
+            # Recursively extract built-in and custom metrics
+            extracted_builtin_metrics, custom_metrics_dict = self._split_metrics_recursive(
+                all_metrics_dict, builtin_metrics_dict
+            )
+
+            # Save the extracted built-in metrics to a new file
+            with open(extracted_builtin_path, 'w', encoding='utf-8') as f:
+                yaml.dump(extracted_builtin_metrics, f, allow_unicode=True, sort_keys=False, indent=2)
+            self.logger.info(f"Split configs: extracted built-in metrics saved to {extracted_builtin_path}")
+
            if custom_metrics_dict:
                with open(custom_metrics_path, 'w', encoding='utf-8') as f:
                    yaml.dump(custom_metrics_dict, f, allow_unicode=True, sort_keys=False, indent=2)
                self.logger.info(f"Split configs: custom metrics saved to {custom_metrics_path}")
+
        except Exception as err:
            self.logger.error(f"Failed to split configs: {str(err)}")
            raise

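For reference, this is how the `_extracted` sibling path used above is derived with pathlib; a standalone sketch with a hypothetical file name:

    from pathlib import Path

    p = Path("config/builtin_metrics_config.yaml")
    extracted = p.parent / f"{p.stem}_extracted{p.suffix}"
    print(extracted)  # config/builtin_metrics_config_extracted.yaml
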
-    def _find_custom_metrics(self, all_metrics, builtin_metrics, current_path=""):
-        """Recursively compare to find custom metrics"""
+    def _split_metrics_recursive(self, all_dict: Dict, builtin_dict: Dict) -> Tuple[Dict, Dict]:  # Tuple must be in the typing import
+        """Recursively split built-in and custom metric configs"""
+        extracted_builtin = {}
        custom_metrics = {}

-        if isinstance(all_metrics, dict) and isinstance(builtin_metrics, dict):
-            for key in all_metrics:
-                if key not in builtin_metrics:
-                    custom_metrics[key] = all_metrics[key]
+        for key, value in all_dict.items():
+            if key in builtin_dict:
+                # Recurse into values that are dicts on both sides
+                if isinstance(value, dict) and isinstance(builtin_dict[key], dict):
+                    sub_builtin, sub_custom = self._split_metrics_recursive(value, builtin_dict[key])
+                    if sub_builtin:
+                        extracted_builtin[key] = sub_builtin
+                    if sub_custom:
+                        custom_metrics[key] = sub_custom
                else:
-                    child_custom = self._find_custom_metrics(
-                        all_metrics[key],
-                        builtin_metrics[key],
-                        f"{current_path}.{key}" if current_path else key
-                    )
-                    if child_custom:
-                        custom_metrics[key] = child_custom
-        elif all_metrics != builtin_metrics:
-            return all_metrics
-
-        if custom_metrics:
-            return self._ensure_structure(custom_metrics, all_metrics, current_path)
-        return None
-
-    def _ensure_structure(self, metrics_dict, full_dict, path):
-        """Ensure each level carries name and priority"""
-        if not isinstance(metrics_dict, dict):
-            return metrics_dict
-
-        current = full_dict
-        for key in path.split('.'):
-            if key in current:
-                current = current[key]
+                    # Non-dict values are copied over as-is
+                    extracted_builtin[key] = value
            else:
-                break
+                # Keys absent from the built-in config are custom metrics
+                custom_metrics[key] = value

-        result = {}
-        if isinstance(current, dict):
-            if 'name' in current:
-                result['name'] = current['name']
-            if 'priority' in current:
-                result['priority'] = current['priority']
-
-        for key, value in metrics_dict.items():
-            if key not in ['name', 'priority']:
-                result[key] = self._ensure_structure(value, full_dict, f"{path}.{key}" if path else key)
-
-        return result
-
+        return extracted_builtin, custom_metrics
+
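A minimal sketch of what the new splitter returns, using hypothetical metric dicts and a hypothetical `manager` instance (not from the repo). Values always come from the full config, so the built-in side picks up any overrides made there:

    # 'safety' exists in the built-in config; 'comfort_ext' does not.
    all_cfg = {"safety": {"name": "Safety", "ttc": {"max": 2.0}}, "comfort_ext": {"name": "Comfort+"}}
    builtin_cfg = {"safety": {"name": "Safety", "ttc": {"max": 1.5}}}
    builtin_part, custom_part = manager._split_metrics_recursive(all_cfg, builtin_cfg)
    # builtin_part == {"safety": {"name": "Safety", "ttc": {"max": 2.0}}}  (2.0 wins)
    # custom_part  == {"comfort_ext": {"name": "Comfort+"}}
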
    def load_configs(self, all_config_path: Optional[Path], builtin_metrics_path: Optional[Path], custom_metrics_path: Optional[Path]) -> Dict[str, Any]:
        """Load and merge configs"""
+        # Return the cached result if this combination of paths was already loaded
+        cache_key = f"{all_config_path}_{builtin_metrics_path}_{custom_metrics_path}"
+        if cache_key in self._config_cache:
+            self.logger.info("Using cached config data")
+            return self._config_cache[cache_key]
+
        # Auto-split configs
+        extracted_builtin_path = None

-        if all_config_path.exists():
+        if all_config_path and all_config_path.exists():
+            # Derive the path of the extracted built-in metrics config
+            extracted_builtin_path = builtin_metrics_path.parent / f"{builtin_metrics_path.stem}_extracted{builtin_metrics_path.suffix}"
            self.split_configs(all_config_path, builtin_metrics_path, custom_metrics_path)

-        self.base_config = self._safe_load_config(builtin_metrics_path) if builtin_metrics_path else {}
+        # Prefer the extracted built-in metrics config when available
+        if extracted_builtin_path and extracted_builtin_path.exists():
+            self.base_config = self._safe_load_config(extracted_builtin_path)
+        else:
+            self.base_config = self._safe_load_config(builtin_metrics_path) if builtin_metrics_path else {}
+
        self.custom_config = self._safe_load_config(custom_metrics_path) if custom_metrics_path else {}
-        self.merged_config = self._merge_configs(self.base_config, self.custom_config)
-        return self.merged_config
-
+        if all_config_path and all_config_path.exists():
+            self.merged_config = self._safe_load_config(all_config_path)
+        else:
+            # Fall back to a shallow merge so the configs loaded above are not
+            # silently discarded when the full config file is missing
+            self.merged_config = {**self.base_config, **self.custom_config}
+        # Cache the result
+        self._config_cache[cache_key] = self.merged_config
+        return self.merged_config
+
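A runnable stand-in for the f-string cache-key scheme used above (the YAML loading is replaced by a placeholder dict; names are hypothetical):

    from pathlib import Path

    cache = {}

    def load_configs(a: Path, b: Path, c: Path) -> dict:
        key = f"{a}_{b}_{c}"
        if key not in cache:
            cache[key] = {"loaded_from": (str(a), str(b), str(c))}  # stand-in for YAML loading
        return cache[key]

    paths = Path("all.yaml"), Path("builtin.yaml"), Path("custom.yaml")
    assert load_configs(*paths) is load_configs(*paths)  # second call hits the cache
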
+    @lru_cache(maxsize=16)  # assumes `from functools import lru_cache` is imported above
    def _safe_load_config(self, config_path: Path) -> Dict[str, Any]:
-        """Safely load a YAML config"""
+        """Safely load a YAML config, using lru_cache to reduce repeated reads"""
        try:
-            if not config_path.exists():
+            if not config_path or not config_path.exists():
                self.logger.warning(f"Config file not found: {config_path}")
                return {}
            with config_path.open('r', encoding='utf-8') as f:
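One caveat with `lru_cache` on an instance method: the cache key includes `self` (pinning the instance alive), and every caller receives the same dict object, so one caller's mutation corrupts the cache for all. A minimal sketch (an assumption, not the repo's code) of a per-instance cache that avoids both pitfalls:

    from pathlib import Path
    from typing import Any, Dict
    import copy
    import yaml

    class YamlLoader:
        def __init__(self) -> None:
            self._yaml_cache: Dict[Path, Dict[str, Any]] = {}

        def load(self, config_path: Path) -> Dict[str, Any]:
            if config_path not in self._yaml_cache:
                with config_path.open('r', encoding='utf-8') as f:
                    self._yaml_cache[config_path] = yaml.safe_load(f) or {}
            # Hand out a deep copy so callers cannot corrupt the cached entry
            return copy.deepcopy(self._yaml_cache[config_path])
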
@@ -130,32 +141,6 @@ class ConfigManager:
            self.logger.error(f"Failed to load config {config_path}: {str(err)}")
            return {}

-    def _merge_configs(self, builtin_config: Dict, custom_config: Dict) -> Dict:
-        """Smart-merge configs"""
-        merged_config = builtin_config.copy()
-        for level1_key, level1_value in custom_config.items():
-            if not isinstance(level1_value, dict) or 'name' not in level1_value:
-                if level1_key not in merged_config:
-                    merged_config[level1_key] = level1_value
-                continue
-            if level1_key not in merged_config:
-                merged_config[level1_key] = level1_value
-            else:
-                for level2_key, level2_value in level1_value.items():
-                    if level2_key in ['name', 'priority']:
-                        continue
-                    if isinstance(level2_value, dict):
-                        if level2_key not in merged_config[level1_key]:
-                            merged_config[level1_key][level2_key] = level2_value
-                        else:
-                            for level3_key, level3_value in level2_value.items():
-                                if level3_key in ['name', 'priority']:
-                                    continue
-                                if isinstance(level3_value, dict):
-                                    if level3_key not in merged_config[level1_key][level2_key]:
-                                        merged_config[level1_key][level2_key][level3_key] = level3_value
-        return merged_config
-
    def get_config(self) -> Dict[str, Any]:
        return self.merged_config

@@ -208,6 +193,16 @@ class MetricLoader:
            self.logger.info("No custom metrics path or the path does not exist")
            return {}

+        # Check for new custom metric files
+        current_files = set(f.name for f in custom_metrics_path.glob(CUSTOM_METRIC_FILE_PATTERN)
+                            if f.name.startswith(CUSTOM_METRIC_PREFIX))
+        loaded_files = set(self.custom_metric_modules.keys())
+
+        # If nothing new arrived and modules are already loaded, return them directly
+        # (assumes the module dict is keyed by file name; deleted files are never evicted)
+        if self.custom_metric_modules and not (current_files - loaded_files):
+            self.logger.info(f"No new custom metrics to load, using {len(self.custom_metric_modules)} cached modules")
+            return self.custom_metric_modules
+
        loaded_count = 0
        for py_file in custom_metrics_path.glob(CUSTOM_METRIC_FILE_PATTERN):
            if py_file.name.startswith(CUSTOM_METRIC_PREFIX):
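A generic illustration of the set-difference check above (file names hypothetical):

    current_files = {"metric_custom_a.py", "metric_custom_b.py"}
    loaded_files = {"metric_custom_a.py"}
    new_files = current_files - loaded_files
    print(new_files)  # {'metric_custom_b.py'} -> a reload is needed
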
@@ -311,12 +306,28 @@ class EvaluationEngine:
        metric_modules = self.metric_loader.get_builtin_metrics()
        raw_results: Dict[str, Any] = {}

-        with ThreadPoolExecutor(max_workers=len(metric_modules)) as executor:
+        # Collect the metrics that actually appear in the config
+        config = self.config_manager.get_config()
+        available_metrics = {
+            metric_name for metric_name in metric_modules.keys()
+            if metric_name in config and isinstance(config[metric_name], dict)
+        }
+
+        # Only evaluate metrics present in the config
+        filtered_modules = {
+            name: module for name, module in metric_modules.items()
+            if name in available_metrics
+        }
+
+        # Bound the pool size; max(1, ...) also avoids ThreadPoolExecutor(max_workers=0),
+        # which raises ValueError when no metrics pass the filter
+        max_workers = max(1, min(len(filtered_modules), DEFAULT_WORKERS))
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self._run_module, module, data, module_name): module_name
-                for module_name, module in metric_modules.items()
+                for module_name, module in filtered_modules.items()
            }
-
+
            for future in futures:
                module_name = futures[future]
                try:
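Iterating the futures dict as above blocks in submission order; when results should be handled as each task finishes, `concurrent.futures.as_completed` is the usual pattern. A self-contained sketch under that assumption (the `run_one` callable stands in for `_run_module`):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_all(modules, data, run_one, max_workers=4):
        """Run run_one(module, data, name) for each module; collect results as they finish."""
        results = {}
        with ThreadPoolExecutor(max_workers=max(1, min(len(modules), max_workers))) as executor:
            futures = {executor.submit(run_one, m, data, name): name for name, m in modules.items()}
            for future in as_completed(futures):
                name = futures[future]
                try:
                    results[name] = future.result()
                except Exception as exc:  # keep one failure from killing the batch
                    results[name] = {"status": "error", "message": str(exc)}
        return results
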
@@ -343,42 +354,55 @@

        custom_results = {}

-        for metric_key, metric_info in custom_metrics.items():
-            try:
-                level1, level2, level3 = metric_key.split('.')
-
-                if metric_info['type'] == 'class':
-                    metric_class = metric_info['class']
-                    metric_instance = metric_class(data)
-                    metric_result = metric_instance.calculate()
-                else:
-                    module = metric_info['module']
-                    metric_result = module.evaluate(data)
-
-                if level1 not in custom_results:
-                    custom_results[level1] = {}
-                custom_results[level1] = metric_result
-
-                self.logger.info(f"Calculated custom metric: {level1}.{level2}.{level3}")
-
-            except Exception as e:
-                self.logger.error(f"Custom metric {metric_key} failed: {str(e)}")
-
+        # Run custom metrics in parallel with a thread pool;
+        # max(1, ...) avoids ThreadPoolExecutor(max_workers=0) when the dict is empty
+        max_workers = max(1, min(len(custom_metrics), DEFAULT_WORKERS))
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            futures = {}
+
+            # Submit every custom metric as its own task
+            for metric_key, metric_info in custom_metrics.items():
+                futures[executor.submit(self._run_custom_metric, metric_key, metric_info, data)] = metric_key
+
+            # Collect results
+            for future in futures:
+                metric_key = futures[future]
                try:
-                    level1, level2, level3 = metric_key.split('.')
-
-                    if level1 not in custom_results:
-                        custom_results[level1] = {}
-
-                    custom_results[level1] = {
-                        "status": "error",
-                        "message": str(e),
-                        "timestamp": datetime.now().isoformat(),
-                    }
-                except Exception:
-                    pass
+                    level1, result = future.result()
+                    if level1:
+                        custom_results[level1] = result
+                except Exception as e:
+                    self.logger.error(f"Custom metric {metric_key} execution failed: {str(e)}")

        return custom_results
+
+    def _run_custom_metric(self, metric_key: str, metric_info: Dict, data: Any) -> Tuple[str, Dict]:
+        """Run a single custom metric"""
+        try:
+            level1, level2, level3 = metric_key.split('.')
+
+            if metric_info['type'] == 'class':
+                metric_class = metric_info['class']
+                metric_instance = metric_class(data)
+                metric_result = metric_instance.calculate()
+            else:
+                module = metric_info['module']
+                metric_result = module.evaluate(data)
+
+            self.logger.info(f"Calculated custom metric: {level1}.{level2}.{level3}")
+            return level1, metric_result
+
+        except Exception as e:
+            self.logger.error(f"Custom metric {metric_key} failed: {str(e)}")
+            try:
+                level1 = metric_key.split('.')[0]
+                return level1, {
+                    "status": "error",
+                    "message": str(e),
+                    "timestamp": datetime.now().isoformat(),
+                }
+            except Exception:
+                return "", {}
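A quick illustration of the three-level metric-key convention used above (the key is hypothetical). Since `_run_custom_metric` returns only the first level, two custom metrics that share a first level will overwrite each other in `custom_results`:

    key = "safety.collision.ttc_custom"      # hypothetical metric key
    level1, level2, level3 = key.split('.')  # ('safety', 'collision', 'ttc_custom')
    # custom_results is keyed by level1 only, so "safety.collision.X"
    # and "safety.comfort.Y" would collide on the "safety" slot.
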

    def _process_merged_results(self, raw_results: Dict, custom_results: Dict) -> Dict:
        """Process the merged evaluation results"""
@@ -454,14 +478,25 @@ class DataProcessor:
        self.logger = logger
        self.data_path = data_path
        self.config_path = config_path
-        self.processor = self._load_processor()
        self.case_name = self.data_path.name
+        self._processor = None
+
+    @property
+    def processor(self) -> Any:
+        """Lazily create the data processor on first access"""
+        if self._processor is None:
+            self._processor = self._load_processor()
+        return self._processor

    def _load_processor(self) -> Any:
        """Load the data processor"""
        try:
+            start_time = time.perf_counter()
            from modules.lib import data_process
-            return data_process.DataPreprocessing(self.data_path, self.config_path)
+            processor = data_process.DataPreprocessing(self.data_path, self.config_path)
+            elapsed_time = time.perf_counter() - start_time
+            self.logger.info(f"Data processor loaded in {elapsed_time:.2f}s")
+            return processor
        except ImportError as e:
            self.logger.error(f"Failed to load data processor: {str(e)}")
            raise RuntimeError(f"Failed to load data processor: {str(e)}") from e
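The manual None-check property above is equivalent to `functools.cached_property` (Python 3.8+). A runnable sketch of that alternative, with a placeholder in place of the expensive DataPreprocessing construction:

    from functools import cached_property

    class LazyProcessor:
        """Sketch: cached_property computes once on first access, then stores
        the value on the instance, replacing the manual None-check pattern."""
        def __init__(self, data_path: str) -> None:
            self.data_path = data_path

        @cached_property
        def processor(self) -> dict:
            # Stand-in for the expensive DataPreprocessing construction
            return {"source": self.data_path}

    lp = LazyProcessor("./data/case_01")
    assert lp.processor is lp.processor  # built exactly once
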
@@ -505,15 +540,26 @@ class EvaluationPipeline:
    def execute(self) -> Dict[str, Any]:
        """Run the evaluation pipeline"""
        try:
+            # Validate the data path before evaluating
            self.data_processor.validate()

            self.logger.info(f"Start evaluation: {self.data_path.name}")
            start_time = time.perf_counter()
+
+            # Timing breakdown for performance analysis
+            eval_start = time.perf_counter()
            results = self.evaluation_engine.evaluate(self.data_processor.processor)
-            elapsed_time = time.perf_counter() - start_time
-            self.logger.info(f"Evaluation completed, time: {elapsed_time:.2f}s")
+            eval_time = time.perf_counter() - eval_start

+            # Generate the report
+            report_start = time.perf_counter()
            report = self._generate_report(self.data_processor.case_name, results)
+            report_time = time.perf_counter() - report_start
+
+            # Total elapsed time
+            elapsed_time = time.perf_counter() - start_time
+            self.logger.info(f"Evaluation completed, time: {elapsed_time:.2f}s (evaluate: {eval_time:.2f}s, report: {report_time:.2f}s)")
+
            return report

        except Exception as e:
@@ -532,32 +578,32 @@ class EvaluationPipeline:
        # Initialize counters
        counters = {'p0': 0, 'p1': 0, 'p2': 0}

-        # Walk every key in the report, including built-in and custom level-1 metrics
-        for category, category_data in report.items():
-            # Skip non-metric keys (such as metadata)
-            if not isinstance(category_data, dict) or category == "metadata":
-                continue
-
-            # If this dimension failed, bump the counter for its priority
-            if not category_data.get('result', True):
-                priority = category_data.get('priority')
-                if priority == 0:
-                    counters['p0'] += 1
-                elif priority == 1:
-                    counters['p1'] += 1
-                elif priority == 2:
-                    counters['p2'] += 1
+        # Collect all failed metrics in one pass
+        failed_categories = [
+            (category, category_data.get('priority'))
+            for category, category_data in report.items()
+            if isinstance(category_data, dict) and category != "metadata" and not category_data.get('result', True)
+        ]
+
+        # Count failures by priority
+        for _, priority in failed_categories:
+            if priority == 0:
+                counters['p0'] += 1
+            elif priority == 1:
+                counters['p1'] += 1
+            elif priority == 2:
+                counters['p2'] += 1

        # Threshold check logic
-        thresholds_exceeded = (
-            counters['p0'] > thresholds['T0'],
-            counters['p1'] > thresholds['T1'],
+        overall_result = not (
+            counters['p0'] > thresholds['T0'] or
+            counters['p1'] > thresholds['T1'] or
            counters['p2'] > thresholds['T2']
        )

        # Build the processed report
        processed_report = report.copy()
-        processed_report['overall_result'] = not any(thresholds_exceeded)
+        processed_report['overall_result'] = overall_result

        # Add statistics
        processed_report['threshold_checks'] = {
@@ -567,7 +613,7 @@ class EvaluationPipeline:
            'actual_counts': counters
        }

-        self.logger.info(f"Added overall result: {processed_report['overall_result']}")
+        self.logger.info(f"Added overall result: {overall_result}")
        return processed_report

    def _generate_report(self, case_name: str, results: Dict[str, Any]) -> Dict[str, Any]:
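A worked illustration of the threshold logic with hypothetical counts and thresholds (not from the repo):

    counters = {'p0': 0, 'p1': 2, 'p2': 5}
    thresholds = {'T0': 0, 'T1': 3, 'T2': 4}
    overall_result = not (
        counters['p0'] > thresholds['T0'] or   # 0 > 0 -> False
        counters['p1'] > thresholds['T1'] or   # 2 > 3 -> False
        counters['p2'] > thresholds['T2']      # 5 > 4 -> True
    )
    # overall_result == False: the p2 failure count exceeds its threshold
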
@@ -579,7 +625,7 @@ class EvaluationPipeline:
        results["metadata"] = {
            "case_name": case_name,
            "timestamp": datetime.now().isoformat(),
-            "version": "3.1.0",
+            "version": "1.0",
        }

        # Add the overall result assessment
@@ -598,50 +644,58 @@ def main():
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

-    parser.add_argument(
-        "--logPath",
-        type=str,
-        default="test.log",
-        help="Log file path",
-    )
+    # Input data argument
    parser.add_argument(
        "--dataPath",
        type=str,
-        default=r"D:\Cicv\招远\AD_GBT41798-2022_TrafficSignalRecognitionAndResponse_LST_01",
+        default=r"D:\Kevin\zhaoyuan\data\V2V_CSAE53-2020_ForwardCollision_LST_01-02",
        help="Input data directory",
    )

-    parser.add_argument(
+    # Configuration arguments
+    config_group = parser.add_argument_group('Configuration')
+    config_group.add_argument(
        "--allConfigPath",
        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\config\all_metrics_config.yaml",
+        default=r"config/all_metrics_config.yaml",
        help="Full metrics config file path (built-in + custom)",
    )
-
-    parser.add_argument(
+    config_group.add_argument(
        "--baseConfigPath",
        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\config\builtin_metrics_config.yaml",
+        default=r"config/builtin_metrics_config.yaml",
        help="Built-in metrics config file path",
    )
-    parser.add_argument(
+    config_group.add_argument(
+        "--customConfigPath",
+        type=str,
+        default=r"config/custom_metrics_config.yaml",
+        help="Custom metrics config path (optional)",
+    )
+
+    # Output arguments
+    output_group = parser.add_argument_group('Output')
+    output_group.add_argument(
+        "--logPath",
+        type=str,
+        default="test.log",
+        help="Log file path",
+    )
+    output_group.add_argument(
        "--reportPath",
        type=str,
        default="reports",
        help="Output report directory",
    )
-    parser.add_argument(
+
+    # Extension arguments
+    ext_group = parser.add_argument_group('Extensions')
+    ext_group.add_argument(
        "--customMetricsPath",
        type=str,
        default="custom_metrics",
        help="Custom metrics scripts directory (optional)",
    )
-    parser.add_argument(
-        "--customConfigPath",
-        type=str,
-        default=r"D:\Cicv\招远\zhaoyuan\config\custom_metrics_config.yaml",
-        help="Custom metrics config path (optional)",
-    )

    args = parser.parse_args()
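Worth noting on the regrouping: argument groups affect only the `--help` layout, not parsing. A minimal runnable sketch (names generic, not the repo's parser):

    import argparse

    parser = argparse.ArgumentParser(description="demo")
    output_group = parser.add_argument_group('Output')
    output_group.add_argument("--reportPath", type=str, default="reports")

    args = parser.parse_args([])   # groups are invisible to parse_args
    print(args.reportPath)         # -> "reports"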