123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import os
- import sys
- import yaml
- import json
- sys.path.append('/home/kevin/kevin/zhaoyuan/evaluate_zhaoyuan/')
- from config import config
class Score:
    """Hierarchical safety-metric evaluator.

    Loads a three-level metric tree from a YAML config and checks measured
    metric values against each leaf metric's [min, max] range.  Leaf
    failures are aggregated upward per priority (0/1/2) using the global
    thresholds ``config.T0`` / ``config.T1`` / ``config.T2``.
    """

    def __init__(self, config_path, calculated_metrics=None):
        # Path to the YAML safety config describing the metric hierarchy.
        self.config_path = config_path
        # Mapping of metric name -> measured value; may also be supplied
        # later through evaluate().
        self.calculated_metrics = calculated_metrics
        self.safety_config = self.load_config()
        # NOTE(review): attribute keeps the original spelling ("merics")
        # for backward compatibility with any external callers.
        self.level_3_merics = self._extract_level_3_metrics(self.safety_config)
        self.result = {}

    def load_config(self):
        """Parse and return the YAML config file as a nested dict."""
        with open(self.config_path, 'r') as file:
            return yaml.safe_load(file)

    def _extract_level_3_metrics(self, d):
        """Recursively collect every ``'name'`` value in the config tree.

        Bug fix: the original discarded the recursive call's return value,
        so names below the first level were never collected.
        """
        names = []
        for key, value in d.items():
            if isinstance(value, dict):
                # Descend and keep what the subtree yields.
                names.extend(self._extract_level_3_metrics(value))
            elif key == 'name':
                names.append(value)
        return names

    def is_within_range(self, value, min_val, max_val):
        """Return True when ``min_val <= value <= max_val``."""
        return min_val <= value <= max_val

    def evaluate_level_3(self, metrics):
        """Evaluate a single leaf metric.

        Args:
            metrics: dict with keys 'name', 'priority', 'min', 'max'.

        Returns:
            ``{name: {'result': bool, 'priority': int}}``.  A metric with
            no calculated value is treated as passing.

        Bug fixes vs. the original:
        - an out-of-range metric now fails for ANY priority (the original
          only failed priority 0 and raised KeyError on a nonexistent
          'priority_1_count' key for priority 1, silently passing priority 2);
        - removed the dead "more than 3 priority-1 failures" block, which
          could never trigger on a single-entry dict — that aggregation
          belongs to the parent levels;
        - no longer appends to self.level_3_merics on every call (the list
          grew without bound across evaluations; it is now populated once
          in __init__).
        """
        name = metrics.get('name')
        priority = metrics.get('priority')
        max_val = metrics.get('max')
        min_val = metrics.get('min')

        result3 = {name: {'result': True, 'priority': priority}}

        metric_value = self.calculated_metrics.get(name)
        if metric_value is None:
            # Missing measurement: leave the default passing verdict.
            return result3

        if not self.is_within_range(metric_value, min_val, max_val):
            result3[name]['result'] = False
        return result3

    def _aggregate(self, container, priority):
        """Fold child failure counts into ``container`` in place.

        Counts failed children per priority, compares the counts against
        config.T0/T1/T2, and stores the aggregate 'result', 'priority' and
        the three counts on ``container``.  Shared by level 1 and level 2,
        which previously duplicated this logic.

        Bug fix: the original never set the aggregate 'result' key when a
        T1/T2 threshold branch was taken, causing a KeyError in the parent
        level's aggregation.
        """
        # Snapshot children before scalar keys are added alongside them.
        children = list(container.values())
        counts = {
            p: sum(1 for v in children if v['priority'] == p and not v['result'])
            for p in (0, 1, 2)
        }

        overall = True
        if counts[0] > config.T0:
            overall = False
        elif counts[1] > config.T1:
            overall = False
            for child in children:  # cascade the failure to all children
                child['result'] = False
        elif counts[2] > config.T2:
            overall = False
            for child in children:
                child['result'] = False

        container['result'] = overall
        container['priority'] = priority
        container['priority_0_count'] = counts[0]
        container['priority_1_count'] = counts[1]
        container['priority_2_count'] = counts[2]

    def evaluate_level_2(self, metrics):
        """Evaluate one level-2 group: all child leaf metrics plus aggregation."""
        name = metrics.get('name')
        result2 = {name: {}}
        for key, sub_metrics in metrics.items():
            if key not in ('name', 'priority'):
                result2[name].update(self.evaluate_level_3(sub_metrics))
        self._aggregate(result2[name], metrics.get('priority'))
        return result2

    def evaluate_level_1(self):
        """Evaluate the whole config tree: all level-2 groups plus aggregation."""
        name = self.safety_config.get('name')
        result1 = {name: {}}
        for key, metrics in self.safety_config.items():
            if key not in ('name', 'priority'):
                result1[name].update(self.evaluate_level_2(metrics))
        self._aggregate(result1[name], self.safety_config.get('priority'))
        return result1

    def evaluate(self, calculated_metrics):
        """Run a full evaluation with fresh measured values; cache and return it."""
        self.calculated_metrics = calculated_metrics
        self.result = self.evaluate_level_1()
        return self.result
-
-
def main():
    """Script entry point.

    Builds a Score from the hard-coded safety YAML config, evaluates the
    sample metric values below, and writes the resulting verdict tree next
    to the config as pretty-printed JSON.
    """
    yaml_path = r'/home/kevin/kevin/zhaoyuan/evaluate_zhaoyuan/models/safety/safety_config.yaml'
    json_path = r'/home/kevin/kevin/zhaoyuan/evaluate_zhaoyuan/models/safety/safety_config.json'

    # Sample measured values, keyed by metric name.
    sample_metrics = {
        'TTC': 1.0,
        'MTTC': 1.0,
        'THW': 1.0,
        'LonSD': 50.0,
        'LatSD': 3.0,
        'DRAC': 3.0,
        'BTN': -1000.0,
        'STN': 0.5,
        'collisionRisk': 5.0,
        'collisionSeverity': 2.0,
    }

    scorer = Score(yaml_path)
    verdicts = scorer.evaluate(sample_metrics)

    # Persist the full verdict tree; `indent` pretty-prints the output.
    with open(json_path, 'w') as json_file:
        json.dump(verdicts, json_file, indent=4)
-
# Run the evaluation only when executed as a script, not on import.
if __name__ == '__main__':
    main()
|