123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: yangzihao(yangzihao@china-icv.cn)
- @Date: 2023/11/02
- @Last Modified: 2023/11/02
- @Summary: This module provides the function to parse the config json file.
- """
- import pandas as pd
- import numpy as np
- from common import json2dict
- from collections import OrderedDict
class ConfigParse(object):
    """Parse an evaluation config (loaded from a JSON file) into lookup tables.

    The config must provide ``dimensionWeight``, ``dimensionName``,
    ``typeWeight`` and ``typeName`` plus one section per dimension laid out as
    ``config[dimension][type][metric] -> {field: value}``.

    Attributes filled in while parsing:
        scoreModel: ``config['scoreModel']``, or ``"builtin"`` when absent.
        dimension_weight, dimension_name, dimension_list: dimension-level info.
        type_weight, type_name, type_list: type-level info.
        metric_list: every metric name, in parse order.
        metric_dict: ``{dimension: {type: [metric, ...]}}``.
        name_dict, unit_dict: ``{metric: name}`` and ``{metric: unit}``.
        config: ``{dimension: parsed result dict}``.
        config_dict: the raw dictionary returned by ``json2dict``.
        parse_dict: the same mapping as ``config``.
    """

    def __init__(self, json_file):
        """Load ``json_file`` with ``json2dict`` and parse it immediately."""
        # weight info
        self.scoreModel = ""
        self.dimension_weight = {}
        self.dimension_list = []
        self.dimension_name = {}
        self.type_weight = {}
        self.type_list = []
        self.type_name = {}
        self.metric_list = []
        self.metric_dict = {}
        self.name_dict = {}
        self.unit_dict = {}

        self.builtinDimensionList = ["safe", "function", "compliance", "comfort", "efficient"]
        # Preferred ordering of the built-in types inside each dimension.
        # NOTE(review): `_config_parse` never consults the "compliance" entry
        # (compliance has its own parser); its content looks copy-pasted from
        # other dimensions -- confirm before relying on it.
        self.builtinTypeDict = {
            "safe": ['safeTime', 'safeDistance', 'safeAcceleration', 'safeProbability'],
            "function": ['function_ACC'],
            "compliance": ['function_ACC', 'functionLKA', 'safeAcceleration', 'safeProbability'],
            "comfort": ['comfortLat', 'comfortLon'],
            "efficient": ['efficientDrive', 'efficientStop']
        }
        # Metrics whose paramList entries carry kind/optimal/multiple/spare.
        self.builtinMetricList = ["TTC", "MTTC", "THW", "LatSD", "LonSD", "DRAC", "BTN", "STN", "collisionRisk",
                                  "collisionSeverity", "followSpeedDeviation", "followDistanceDeviation",
                                  "followStopDistance", "followResponseTime", 'laneDistance',
                                  'centerDistanceExpectation', 'centerDistanceStandardDeviation',
                                  'centerDistanceMax', 'centerDistanceMin', 'centerDistanceFrequency',
                                  'centerDistanceRange',
                                  "zigzag", "shake", "cadence", "slamBrake", "slamAccelerate",
                                  "pressSolidLine", "runRedLight", "overspeed20_50", "overspeed50", "overspeed10",
                                  "overspeed10_20",
                                  "averageSpeed", "stopDuration", "stopCount"]

        # score info
        self.config = {}

        # initialization
        self.config_dict = json2dict(json_file)
        self.parse_dict = self._config_parse(self.config_dict)

    # ------------------------------------------------------------------
    # shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _has_custom_weight(weights):
        """Return True when at least one weight in ``weights`` is not None."""
        return any(weight is not None for weight in weights)

    @staticmethod
    def _collect_metrics(section, type_names):
        """Merge the metrics of ``type_names`` into one table.

        Returns:
            (table, metrics_by_type): ``table`` is a DataFrame with one row per
            metric and one column per field; ``metrics_by_type`` maps each type
            name to the list of its metric names.
        """
        merged = {}
        metrics_by_type = {}
        for type_name in type_names:
            metrics = section[type_name]
            merged.update(metrics)
            metrics_by_type[type_name] = list(metrics.keys())
        return pd.DataFrame(merged).T, metrics_by_type

    @staticmethod
    def _typed_column(table, column, caster):
        """Return ``{metric: value}`` for ``column`` after casting with ``caster``."""
        return table[column].astype(caster).to_dict()

    @staticmethod
    def _flatten(slot_dicts):
        """Concatenate the values of a list of dicts, slot by slot."""
        return [value for slot in slot_dicts for value in slot.values()]

    # ------------------------------------------------------------------
    # parsing
    # ------------------------------------------------------------------
    def _config_parse(self, config_dict):
        """Populate every attribute from ``config_dict``.

        Args:
            config_dict: the full configuration dictionary.

        Returns:
            ``self.config``, i.e. ``{dimension: parsed result dict}``.

        Raises:
            KeyError: when a required section or field is missing.
        """
        self.scoreModel = config_dict.get('scoreModel', 'builtin')

        # dimension info
        dimension_weight = config_dict['dimensionWeight']
        self.dimension_weight = dimension_weight
        self.dimension_name = config_dict['dimensionName']
        self.dimension_list = list(dimension_weight.keys())

        # type info
        type_weight = config_dict['typeWeight']
        self.type_weight = type_weight
        self.type_name = config_dict["typeName"]

        for dimension in self.dimension_list:
            if dimension == 'compliance':
                # Compliance metrics carry no priority/paramList fields.
                parsed = self._compliance_config_parse(config_dict, type_weight, dimension_weight)
            else:
                parsed = self._dimension_config_parse(dimension, config_dict, type_weight,
                                                      dimension_weight)
            self.config[dimension] = parsed
            self.name_dict.update(parsed['name'])
            self.unit_dict.update(parsed['unit'])
            self.metric_dict[dimension] = parsed['typeMetricDict']
            self.metric_list.extend(parsed['metric'])
            self.type_list.extend(parsed['type'])
        return self.config

    def _dimension_config_parse(self, dimension, config_dict, typeWeight, dimensionWeight):
        """Parse one non-compliance dimension.

        Args:
            dimension: key of the dimension section in ``config_dict``.
            config_dict: the full configuration dictionary.
            typeWeight: ``{dimension: {type: weight or None}}``.
            dimensionWeight: ``{dimension: weight}``.

        Returns:
            A dict with the dimension weight, the type and metric listings, and
            the per-metric name/unit/weight/priority/kind/optimal/multiple/spare
            tables. ``kind``, ``optimal``, ``multiple`` and ``spare`` are plain
            ``{metric: value}`` dicts when there is a single parameter slot,
            otherwise lists holding one such dict per slot.
        """
        type_weights = typeWeight[dimension]
        configured_types = list(type_weights.keys())
        if dimension in self.builtinDimensionList:
            # Built-in types first (in their canonical order), then custom ones.
            builtin_types = self.builtinTypeDict[dimension]
            ordered_types = [name for name in builtin_types if name in type_weights]
            ordered_types += [name for name in configured_types if name not in builtin_types]
        else:
            ordered_types = configured_types

        # NOTE: kept in config order, which may differ from `ordered_types`.
        type_weight_list = list(type_weights.values())
        type_names = self.type_name[dimension]

        table, metrics_by_type = self._collect_metrics(config_dict[dimension], ordered_types)
        metric_names = table.index.tolist()
        names = table['name'].to_dict()
        units = {metric: (unit if unit is not None else "")
                 for metric, unit in table['unit'].to_dict().items()}
        weights = self._typed_column(table, 'weight', float)
        priorities = self._typed_column(table, 'priority', int)

        # Split paramList per slot: built-in metrics are decoded field by
        # field, custom metrics keep their raw parameter list.
        params = table['paramList'].to_dict()
        builtin_metrics = set(self.builtinMetricList)
        builtin_lengths = [len(entries) for metric, entries in params.items()
                           if metric in builtin_metrics]
        if builtin_lengths:
            # Size by the longest list so a later, longer metric cannot
            # overflow the slot lists.
            param_num = max(builtin_lengths)
        else:
            param_num = len(next(iter(params.values())))

        kind = [{} for _ in range(param_num)]
        optimal = [{} for _ in range(param_num)]
        multiple = [{} for _ in range(param_num)]
        spare = [{} for _ in range(param_num)]
        custom_params = {}
        for metric, entries in params.items():
            if metric not in builtin_metrics:
                custom_params[metric] = entries
                continue
            for slot, entry in enumerate(entries):
                kind[slot][metric] = int(entry['kind'])
                optimal[slot][metric] = float(entry['optimal'])
                multiple[slot][metric] = [float(x) for x in entry['multiple']]
                spare[slot][metric] = [item["param"] for item in entry["spare"]]

        kind_list = self._flatten(kind)
        optimal_list = self._flatten(optimal)
        multiple_list = self._flatten(multiple)
        spare_list = self._flatten(spare)

        if param_num == 1:
            # Single slot: expose the dicts directly instead of 1-element lists.
            kind, optimal, multiple, spare = kind[0], optimal[0], multiple[0], spare[0]

        return {
            "weightDimension": float(dimensionWeight[dimension]),
            "weightCustom": self._has_custom_weight(type_weight_list),
            "type": ordered_types,
            "typeWeight": type_weights,
            "typeWeightList": type_weight_list,
            "typeName": type_names,
            "customMetricParam": custom_params,
            "metric": metric_names,
            "typeMetricDict": metrics_by_type,
            "name": names,
            "unit": units,
            "weight": weights,
            "weightList": list(weights.values()),
            "priority": priorities,
            "priorityList": list(priorities.values()),
            "kind": kind,
            "kindList": kind_list,
            "optimal": optimal,
            "optimalList": optimal_list,
            "multiple": multiple,
            "multipleList": multiple_list,
            "spare": spare,
            "spareList": spare_list,
        }

    def _safe_config_parse(self, config_dict, typeWeight, dimensionWeight):
        """Parse the ``safe`` section using the ``spare1``/``spare2`` layout.

        Not called by ``_config_parse``. Every metric's ``paramList`` entries
        must provide ``kind``, ``optimal``, ``multiple``, ``spare1`` and
        ``spare2``. The per-slot tables are always returned as lists.
        """
        type_weights = typeWeight['safe']
        type_names = list(type_weights.keys())
        type_weight_list = list(type_weights.values())

        table, _ = self._collect_metrics(config_dict['safe'], type_names)
        weights = self._typed_column(table, 'weight', float)
        priorities = self._typed_column(table, 'priority', int)

        params = table['paramList'].to_dict()
        # Longest list decides the number of slots (see _dimension_config_parse).
        param_num = max(len(entries) for entries in params.values())
        kind = [{} for _ in range(param_num)]
        optimal = [{} for _ in range(param_num)]
        multiple = [{} for _ in range(param_num)]
        spare1 = [{} for _ in range(param_num)]
        spare2 = [{} for _ in range(param_num)]
        for metric, entries in params.items():
            for slot, entry in enumerate(entries):
                kind[slot][metric] = int(entry['kind'])
                optimal[slot][metric] = float(entry['optimal'])
                multiple[slot][metric] = [float(x) for x in entry['multiple']]
                spare1[slot][metric] = entry['spare1']
                spare2[slot][metric] = entry['spare2']

        return {
            "weightSafe": float(dimensionWeight['safe']),
            "weightCustom": self._has_custom_weight(type_weight_list),
            "type": type_names,
            "typeWeight": type_weights,
            "typeWeightList": type_weight_list,
            "metric": table.index.tolist(),
            "weight": weights,
            "weightList": list(weights.values()),
            "priority": priorities,
            "priorityList": list(priorities.values()),
            "kind": kind,
            "kindList": self._flatten(kind),
            "optimal": optimal,
            "optimalList": self._flatten(optimal),
            "multiple": multiple,
            "multipleList": self._flatten(multiple),
            "spare1": spare1,
            "spare1List": self._flatten(spare1),
            "spare2": spare2,
            "spare2List": self._flatten(spare2),
        }

    def _function_config_parse(self, config_dict, typeWeight, dimensionWeight):
        """Parse the ``function`` section using flat per-metric fields.

        Not called by ``_config_parse``. Types whose weight equals 0 are
        skipped; ``typeWeightList`` holds only the remaining weights.
        """
        type_weights = typeWeight['function']
        active_types = [name for name, weight in type_weights.items() if weight != 0]
        active_weights = [weight for weight in type_weights.values() if weight != 0]

        table, _ = self._collect_metrics(config_dict['function'], active_types)
        weights = self._typed_column(table, 'weight', float)
        priorities = self._typed_column(table, 'priority', int)
        kind = self._typed_column(table, 'kind', int)
        optimal = self._typed_column(table, 'optimal', float)
        multiple = self._typed_column(table, 'multiple', float)

        return {
            "weightFunction": float(dimensionWeight['function']),
            "weightCustom": self._has_custom_weight(active_weights),
            "type": active_types,
            "typeWeight": type_weights,
            "typeWeightList": active_weights,
            "metric": table.index.tolist(),
            "weight": weights,
            "weightList": list(weights.values()),
            "priority": priorities,
            "priorityList": list(priorities.values()),
            "kind": kind,
            "kindList": list(kind.values()),
            "optimal": optimal,
            "optimalList": list(optimal.values()),
            "multiple": multiple,
            "multipleList": list(multiple.values()),
        }

    def _compliance_config_parse(self, config_dict, typeWeight, dimensionWeight):
        """Parse the ``compliance`` section (name, unit and weight only).

        Unlike ``_dimension_config_parse`` the units are returned as stored,
        so a ``None`` unit is not replaced by an empty string.
        """
        type_weights = typeWeight['compliance']
        type_names = list(type_weights.keys())
        type_weight_list = list(type_weights.values())
        display_names = self.type_name["compliance"]

        table, metrics_by_type = self._collect_metrics(config_dict['compliance'], type_names)
        weights = self._typed_column(table, 'weight', float)

        return {
            "weightDimension": float(dimensionWeight['compliance']),
            "weightCustom": self._has_custom_weight(type_weight_list),
            "type": type_names,
            "typeWeight": type_weights,
            "typeWeightList": type_weight_list,
            "typeName": display_names,
            "typeMetricDict": metrics_by_type,
            "metric": table.index.tolist(),
            "name": table['name'].to_dict(),
            "unit": table['unit'].to_dict(),
            "weight": weights,
            "weightList": list(weights.values()),
        }

    def _comfort_config_parse(self, config_dict, typeWeight, dimensionWeight):
        """Parse the ``comfort`` section using the ``kind1..3`` field layout.

        Not called by ``_config_parse``. Types whose weight equals 0 are
        skipped when collecting metrics.
        """
        type_weights = typeWeight['comfort']
        active_types = [name for name, weight in type_weights.items() if weight != 0]
        active_weights = [weight for weight in type_weights.values() if weight != 0]

        table, _ = self._collect_metrics(config_dict['comfort'], active_types)
        weights = self._typed_column(table, 'weight', float)
        priorities = self._typed_column(table, 'priority', int)
        kind = {"kind%d" % n: self._typed_column(table, "kind%d" % n, int) for n in (1, 2, 3)}
        optimal = {"optimal%d" % n: self._typed_column(table, "optimal%d" % n, float)
                   for n in (1, 2, 3)}
        multiple = {"multiple%d" % n: self._typed_column(table, "multiple%d" % n, float)
                    for n in (1, 2, 3)}

        return {
            "weightComfort": float(dimensionWeight['comfort']),
            "weightCustom": self._has_custom_weight(active_weights),
            "type": active_types,
            "typeWeight": type_weights,
            # NOTE: unfiltered, so it can be longer than "type" when a type
            # has weight 0 (behaviour kept as-is).
            "typeWeightList": list(type_weights.values()),
            "metric": table.index.tolist(),
            "weight": weights,
            "weightList": list(weights.values()),
            "priority": priorities,
            "priorityList": list(priorities.values()),
            "kind": kind,
            "kindList": list(
                self._merge_dict(kind["kind1"], kind["kind2"], kind["kind3"]).values()),
            "optimal": optimal,
            "optimalList": list(
                self._merge_dict(optimal["optimal1"], optimal["optimal2"],
                                 optimal["optimal3"]).values()),
            "multiple": multiple,
            "multipleList": list(
                self._merge_dict(multiple["multiple1"], multiple["multiple2"],
                                 multiple["multiple3"]).values()),
        }

    def _efficient_config_parse(self, config_dict, typeWeight, dimensionWeight):
        """Parse the ``efficient`` section using flat per-metric fields.

        Not called by ``_config_parse``. Types whose weight equals 0 are
        skipped when collecting metrics.
        """
        type_weights = typeWeight['efficient']
        active_types = [name for name, weight in type_weights.items() if weight != 0]
        active_weights = [weight for weight in type_weights.values() if weight != 0]

        table, _ = self._collect_metrics(config_dict['efficient'], active_types)
        weights = self._typed_column(table, 'weight', float)
        priorities = self._typed_column(table, 'priority', int)
        kind = self._typed_column(table, 'kind', int)
        optimal = self._typed_column(table, 'optimal', float)
        multiple = self._typed_column(table, 'multiple', float)

        return {
            "weightEfficient": float(dimensionWeight['efficient']),
            "weightCustom": self._has_custom_weight(active_weights),
            "type": active_types,
            "typeWeight": type_weights,
            # NOTE: unfiltered, so it can be longer than "type" when a type
            # has weight 0 (behaviour kept as-is).
            "typeWeightList": list(type_weights.values()),
            "metric": table.index.tolist(),
            "weight": weights,
            "weightList": list(weights.values()),
            "priority": priorities,
            "priorityList": list(priorities.values()),
            "kind": kind,
            "kindList": list(kind.values()),
            "optimal": optimal,
            "optimalList": list(optimal.values()),
            "multiple": multiple,
            "multipleList": list(multiple.values()),
        }

    def _merge_dict(self, dict1, dict2, dict3):
        """Interleave three dicts position by position.

        For every position ``i`` in ``dict1`` the i-th entries of the three
        dicts are inserted in turn, with ``1``, ``2`` and ``3`` appended to
        their respective keys.

        Raises:
            IndexError: when ``dict2`` or ``dict3`` is shorter than ``dict1``.
        """
        items2 = list(dict2.items())
        items3 = list(dict3.items())
        merged = {}
        for position, (key1, value1) in enumerate(dict1.items()):
            key2, value2 = items2[position]
            key3, value3 = items3[position]
            merged[f"{key1}1"] = value1
            merged[f"{key2}2"] = value2
            merged[f"{key3}3"] = value3
        return merged
|