import os
import sys
import yaml
import json

# The project-local ``config`` module is resolved through this hard-coded
# path, so the append must run before the import below.
sys.path.append('/home/kevin/kevin/zhaoyuan/evaluate_zhaoyuan/')

from config import config


class Score:
    """Score calculated safety metrics against a nested YAML configuration.

    The configuration is read as nested mappings. Every node may carry a
    ``name`` and a ``priority``; any other key whose value is a mapping is
    treated as a child node. Leaf nodes additionally provide the ``min`` and
    ``max`` bounds of the metric they describe. ``evaluate`` walks three
    levels: the root (level 1), its children (level 2) and their children
    (level 3, the individual metrics).

    Attributes:
        config_path: Path of the YAML configuration file.
        calculated_metrics: Mapping of metric name to measured value, or
            ``None`` until ``evaluate`` is called.
        safety_config: Parsed content of the configuration file.
        level_3_merics: Names of the leaf metrics found in the configuration.
            The misspelled attribute name is kept for backward compatibility.
        result: Result of the most recent ``evaluate`` call.
    """

    # Keys that describe a node itself instead of naming a child node.
    _META_KEYS = ('name', 'priority')

    def __init__(self, config_path, calculated_metrics=None):
        """Load the configuration and index its leaf metric names.

        Args:
            config_path: Path of the YAML configuration file.
            calculated_metrics: Optional mapping of metric name to value.
        """
        self.config_path = config_path
        self.calculated_metrics = calculated_metrics
        self.safety_config = self.load_config()
        self.level_3_merics = self._extract_level_3_metrics(self.safety_config)
        self.result = {}

    def load_config(self):
        """Parse the YAML file at ``self.config_path`` and return its content."""
        with open(self.config_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def _extract_level_3_metrics(self, d):
        """Return the ``name`` of every leaf node below (or at) ``d``.

        A leaf is a mapping that contains no nested mapping. The previous
        implementation discarded the result of its recursive call and so
        never reported anything below the top level.

        Args:
            d: A configuration node (mapping).

        Returns:
            List of leaf names in configuration order.
        """
        children = [value for value in d.values() if isinstance(value, dict)]
        if not children:
            return [d['name']] if 'name' in d else []

        names = []
        for child in children:
            names.extend(self._extract_level_3_metrics(child))
        return names

    def is_within_range(self, value, min_val, max_val):
        """Return whether ``min_val <= value <= max_val`` holds.

        A bound that is ``None`` (not configured) is treated as unbounded
        instead of raising ``TypeError``.
        """
        lower_ok = min_val is None or min_val <= value
        upper_ok = max_val is None or value <= max_val
        return lower_ok and upper_ok

    def evaluate_level_3(self, metrics):
        """Check one metric against its configured bounds.

        Args:
            metrics: Leaf configuration node; ``name``, ``priority``, ``min``
                and ``max`` are read from it.

        Returns:
            ``{name: {'result': bool, 'priority': priority}}``. ``result`` is
            ``False`` only when a value was supplied for the metric and it
            lies outside the bounds; a metric without a value passes.
        """
        name = metrics.get('name')
        priority = metrics.get('priority')

        # Keep the name index complete without growing it on every call.
        if name not in self.level_3_merics:
            self.level_3_merics.append(name)

        entry = {'result': True, 'priority': priority}
        metric_value = (self.calculated_metrics or {}).get(name)

        # Every violation is recorded, whatever its priority: the parent
        # level counts failures per priority and applies the thresholds, so
        # hiding priority 1/2 violations here would make those thresholds
        # unreachable.
        if metric_value is not None and not self.is_within_range(
                metric_value, metrics.get('min'), metrics.get('max')):
            entry['result'] = False

        return {name: entry}

    def _evaluate_group(self, node, evaluate_child):
        """Evaluate every child of ``node`` and aggregate the outcome.

        Args:
            node: Configuration node whose mapping-valued entries (other than
                ``name``/``priority``) are its children.
            evaluate_child: Callable returning ``{child_name: child_result}``
                for one child node.

        Returns:
            ``{name: group}`` where ``group`` holds each child result plus
            ``result``, ``priority`` and ``priority_{0,1,2}_count``.
        """
        name = node.get('name')
        priority = node.get('priority')

        children = {}
        for key, child in node.items():
            if key in self._META_KEYS or not isinstance(child, dict):
                continue
            children.update(evaluate_child(child))

        def count_failed(level):
            return sum(
                1 for child in children.values()
                if child['priority'] == level and not child['result']
            )

        failed_0 = count_failed(0)
        failed_1 = count_failed(1)
        failed_2 = count_failed(2)

        # The group fails as soon as one priority class exceeds its threshold
        # (config.T0 / config.T1 / config.T2). Child results are left intact
        # so the report still shows which metrics actually failed.
        passed = not (
            failed_0 > config.T0
            or failed_1 > config.T1
            or failed_2 > config.T2
        )

        group = dict(children)
        group['result'] = passed
        group['priority'] = priority
        group['priority_0_count'] = failed_0
        group['priority_1_count'] = failed_1
        group['priority_2_count'] = failed_2
        return {name: group}

    def evaluate_level_2(self, metrics):
        """Evaluate one level-2 node from the results of its metrics.

        Args:
            metrics: Level-2 configuration node.

        Returns:
            ``{name: group}`` as described in ``_evaluate_group``.
        """
        return self._evaluate_group(metrics, self.evaluate_level_3)

    def evaluate_level_1(self):
        """Evaluate the configuration root from its level-2 nodes.

        Returns:
            ``{name: group}`` as described in ``_evaluate_group``.
        """
        return self._evaluate_group(self.safety_config, self.evaluate_level_2)

    def evaluate(self, calculated_metrics):
        """Evaluate ``calculated_metrics`` against the whole configuration.

        Args:
            calculated_metrics: Mapping of metric name to measured value.

        Returns:
            The nested result dictionary, also stored in ``self.result``.
        """
        self.calculated_metrics = calculated_metrics
        self.result = self.evaluate_level_1()
        return self.result


def main():
    """Evaluate a sample metric set and write the result as JSON."""
    config_path = r'/home/kevin/kevin/zhaoyuan/evaluate_zhaoyuan/models/safety/safety_config.yaml'
    output_path = r'/home/kevin/kevin/zhaoyuan/evaluate_zhaoyuan/models/safety/safety_config.json'

    calculated_metrics = {
        'TTC': 1.0,
        'MTTC': 1.0,
        'THW': 1.0,
        'LonSD': 50.0,
        'LatSD': 3.0,
        'DRAC': 3.0,
        'BTN': -1000.0,
        'STN': 0.5,
        'collisionRisk': 5.0,
        'collisionSeverity': 2.0,
    }

    evaluator = Score(config_path)
    result = evaluator.evaluate(calculated_metrics)

    with open(output_path, 'w') as json_file:
        # ``indent`` pretty-prints the output.
        json.dump(result, json_file, indent=4)


if __name__ == '__main__':
    main()