#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: yangzihao(yangzihao@china-icv.cn) @Data: 2023/11/02 @Last Modified: 2023/11/02 @Summary: This module provides the function to parse the config json file. """ import pandas as pd import numpy as np from common import json2dict from collections import OrderedDict class ConfigParse(object): """ """ def __init__(self, json_file): # weight info self.scoreModel = "" self.dimension_weight = {} self.dimension_list = [] self.dimension_name = {} self.type_weight = {} self.type_list = [] self.type_name = {} self.metric_list = [] self.metric_dict = {} self.name_dict = {} self.unit_dict = {} self.builtinDimensionList = ["safe", "function", "compliance", "comfort", "efficient"] self.builtinTypeDict = { "safe": ['safeTime', 'safeDistance', 'safeAcceleration', 'safeProbability'], # "function": ['functionACC', 'functionLKA'], "function": ['function_ACC'], "compliance": ['function_ACC', 'functionLKA', 'safeAcceleration', 'safeProbability'], "comfort": ['comfortLat', 'comfortLon'], "efficient": ['efficientDrive', 'efficientStop'] } self.builtinMetricList = ["TTC", "MTTC", "THW", "LatSD", "LonSD", "DRAC", "BTN", "STN", "collisionRisk", "collisionSeverity", "followSpeedDeviation", "followDistanceDeviation", "followStopDistance", "followResponseTime", 'laneDistance', 'centerDistanceExpectation', 'centerDistanceStandardDeviation', 'centerDistanceMax', 'centerDistanceMin', 'centerDistanceFrequency', 'centerDistanceRange', "zigzag", "shake", "cadence", "slamBrake", "slamAccelerate", "pressSolidLine", "runRedLight", "overspeed20_50", "overspeed50", "overspeed10", "overspeed10_20", "averageSpeed", "stopDuration", "stopCount"] # score info self.config = {} # initialization self.config_dict = json2dict(json_file) self.parse_dict = self._config_parse(self.config_dict) def _config_parse(self, config_dict): # score if 'scoreModel' in list(config_dict.keys()): self.scoreModel = config_dict['scoreModel'] else: self.scoreModel = "builtin" # dimension info dimensionWeight = config_dict['dimensionWeight'] self.dimension_weight = dimensionWeight self.dimension_name = config_dict['dimensionName'] self.dimension_list = list(self.dimension_weight.keys()) # type info typeWeight = config_dict['typeWeight'] self.type_weight = typeWeight self.type_name = config_dict["typeName"] for dimension in self.dimension_list: if dimension == 'compliance': self.config[dimension] = self._compliance_config_parse(config_dict, typeWeight, dimensionWeight) else: self.config[dimension] = self._dimension_config_parse(dimension, config_dict, typeWeight, dimensionWeight) # self.name_dict[dimension] = self.config[dimension]['name'] # self.unit_dict[dimension] = self.config[dimension]['unit'] self.name_dict.update(self.config[dimension]['name']) self.unit_dict.update(self.config[dimension]['unit']) self.metric_dict[dimension] = self.config[dimension]['typeMetricDict'] self.metric_list.extend(self.config[dimension]['metric']) self.type_list.extend(self.config[dimension]['type']) print() def _dimension_config_parse(self, dimension, config_dict, typeWeight, dimensionWeight): # get weight type typeWeightDimension = typeWeight[dimension] typeDimensionList = list(typeWeightDimension.keys()) if dimension in self.builtinDimensionList: typeWeightDimension_builtin = OrderedDict( (key, typeWeightDimension[key]) for key in self.builtinTypeDict[dimension] if key in typeDimensionList) typeWeightDimension_builtin = list(typeWeightDimension_builtin.keys()) typeDimension = typeWeightDimension_builtin + [x for x in typeDimensionList if x not in self.builtinTypeDict[dimension]] else: typeDimension = typeDimensionList typeWeightDimensionList = list(typeWeightDimension.values()) flagCustomDimension = not all(x is None for x in typeWeightDimensionList) # get type name typeNameDimension = self.type_name[dimension] # Dimension dimension_dict = config_dict[dimension] dimension_value_dict = {} typeMetricDict = {} for type in typeDimension: dimension_value_dict.update(dimension_dict[type]) typeMetricDict[type] = list(dimension_dict[type].keys()) df_dimension_value = pd.DataFrame(dimension_value_dict).T # get metric list metricDimension = df_dimension_value.index.tolist() # get name list nameDimension = df_dimension_value['name'].to_dict() # nameDimensionList = list(nameDimension.values()) # get unit list unitDimension = df_dimension_value['unit'].to_dict() unitDimension = {key: (value if value is not None else "") for key, value in unitDimension.items()} # unitDimensionList = list(unitDimension.values()) # get weight list weightDimension = df_dimension_value['weight'].astype(float).to_dict() weightDimensionList = list(weightDimension.values()) # get priority list priorityDimension = df_dimension_value['priority'].astype(int).to_dict() priorityDimensionList = list(priorityDimension.values()) # get paramList paramDimension = df_dimension_value['paramList'].to_dict() bulitin_first_key = next((key for key in paramDimension if key in self.builtinMetricList), None) first_key = next(iter(paramDimension)) if not bulitin_first_key else bulitin_first_key paramNum = len(paramDimension[first_key]) kindDimension = [{} for _ in range(paramNum)] optimalDimension = [{} for _ in range(paramNum)] multipleDimension = [{} for _ in range(paramNum)] spareDimension = [{} for _ in range(paramNum)] # spare1Dimension = [{} for _ in range(paramNum)] # spare2Dimension = [{} for _ in range(paramNum)] customMetricParam = {} # custiom metric paramList for key, value_list in paramDimension.items(): if key in self.builtinMetricList: for i in range(len(value_list)): kindDimension[i][key] = int(value_list[i]['kind']) optimalDimension[i][key] = float(value_list[i]['optimal']) multipleDimension[i][key] = [float(x) for x in value_list[i]['multiple']] spareDimension[i][key] = [item["param"] for item in value_list[i]["spare"]] # spareDimension[i][key] = [float(item["param"]) for item in value_list[i]["spare"]] # spare1Dimension[i][key] = (value_list[i]['spare1']) # spare2Dimension[i][key] = (value_list[i]['spare2']) else: customMetricParam[key] = value_list kindDimensionList = [value for dict_val in kindDimension for value in dict_val.values()] optimalDimensionList = [value for dict_val in optimalDimension for value in dict_val.values()] multipleDimensionList = [value for dict_val in multipleDimension for value in dict_val.values()] spareDimensionList = [value for dict_val in spareDimension for value in dict_val.values()] # spare1DimensionList = [value for dict_val in spare1Dimension for value in dict_val.values()] # spare2DimensionList = [value for dict_val in spare2Dimension for value in dict_val.values()] if paramNum == 1: kindDimension = kindDimension[0] optimalDimension = optimalDimension[0] multipleDimension = multipleDimension[0] spareDimension = spareDimension[0] # spare1Dimension = spare1Dimension[0] # spare2Dimension = spare2Dimension[0] result = { "weightDimension": float(dimensionWeight[dimension]), "weightCustom": flagCustomDimension, "type": typeDimension, "typeWeight": typeWeightDimension, "typeWeightList": typeWeightDimensionList, "typeName": typeNameDimension, "customMetricParam": customMetricParam, "metric": metricDimension, "typeMetricDict": typeMetricDict, "name": nameDimension, # "nameList": nameDimensionList, "unit": unitDimension, # "unitList": unitDimensionList, "weight": weightDimension, "weightList": weightDimensionList, "priority": priorityDimension, "priorityList": priorityDimensionList, "kind": kindDimension, "kindList": kindDimensionList, "optimal": optimalDimension, "optimalList": optimalDimensionList, "multiple": multipleDimension, "multipleList": multipleDimensionList, "spare": spareDimension, "spareList": spareDimensionList, # "spare1": spare1Dimension, # "spare1List": spare1DimensionList, # "spare2": spare2Dimension, # "spare2List": spare2DimensionList } return result def _safe_config_parse(self, config_dict, typeWeight, dimensionWeight): # get weight type typeWeightSafe = typeWeight['safe'] # typeSafe = [key for key, value in typeWeightSafe.items() if value != 0] typeSafe = list(typeWeightSafe.keys()) # get type list # typeWeightSafeDict = {key: value for key, value in typeWeightSafe.items() if value != 0} # typeWeightSafeList = list(typeWeightSafeDict.values()) typeWeightSafeList = list(typeWeightSafe.values()) flagCustomSafe = not all(x is None for x in typeWeightSafeList) # safe safe_dict = config_dict['safe'] safe_value_dict = {} for type in typeSafe: safe_value_dict.update(safe_dict[type]) df_safe_value = pd.DataFrame(safe_value_dict).T # safe_value_dict.update(safe_dict['safeTime']) # safe_value_dict.update(safe_dict['safeDistance']) # safe_value_dict.update(safe_dict['safeAcceleration']) # safe_value_dict.update(safe_dict['safeProbability']) # df_safe_value = pd.DataFrame(safe_value_dict).T # df_safe_value = df_safe_value[df_safe_value['enable'] == True] # get metric list metricSafe = df_safe_value.index.tolist() # get weight list weightSafe = df_safe_value['weight'].astype(float).to_dict() weightSafeList = list(weightSafe.values()) # get priority list prioritySafe = df_safe_value['priority'].astype(int).to_dict() prioritySafeList = list(prioritySafe.values()) # get paramList paramSafe = df_safe_value['paramList'].to_dict() first_key = next(iter(paramSafe)) paramNum = len(paramSafe[first_key]) kindSafe = [{} for _ in range(paramNum)] optimalSafe = [{} for _ in range(paramNum)] multipleSafe = [{} for _ in range(paramNum)] spare1Safe = [{} for _ in range(paramNum)] spare2Safe = [{} for _ in range(paramNum)] for key, value_list in paramSafe.items(): for i in range(len(value_list)): kindSafe[i][key] = int(value_list[i]['kind']) optimalSafe[i][key] = float(value_list[i]['optimal']) multipleSafe[i][key] = [float(x) for x in value_list[i]['multiple']] spare1Safe[i][key] = (value_list[i]['spare1']) spare2Safe[i][key] = (value_list[i]['spare2']) kindSafeList = [value for dict_val in kindSafe for value in dict_val.values()] optimalSafeList = [value for dict_val in optimalSafe for value in dict_val.values()] multipleSafeList = [value for dict_val in multipleSafe for value in dict_val.values()] spare1SafeList = [value for dict_val in spare1Safe for value in dict_val.values()] spare2SafeList = [value for dict_val in spare2Safe for value in dict_val.values()] safe = { "weightSafe": float(dimensionWeight['safe']), "weightCustom": flagCustomSafe, "type": typeSafe, "typeWeight": typeWeightSafe, "typeWeightList": typeWeightSafeList, "metric": metricSafe, "weight": weightSafe, "weightList": weightSafeList, "priority": prioritySafe, "priorityList": prioritySafeList, "kind": kindSafe, "kindList": kindSafeList, "optimal": optimalSafe, "optimalList": optimalSafeList, "multiple": multipleSafe, "multipleList": multipleSafeList, "spare1": spare1Safe, "spare1List": spare1SafeList, "spare2": spare2Safe, "spare2List": spare2SafeList } return safe def _function_config_parse(self, config_dict, typeWeight, dimensionWeight): # get weight type typeWeightFunction = typeWeight['function'] typeFunction = [key for key, value in typeWeightFunction.items() if value != 0] typeWeightFunctionDict = {key: value for key, value in typeWeightFunction.items() if value != 0} typeWeightFunctionList = list(typeWeightFunctionDict.values()) flagCustomFunction = not all(x is None for x in typeWeightFunctionList) # function function_dict = config_dict['function'] function_value_dict = {} for type in typeFunction: function_value_dict.update(function_dict[type]) df_function_value = pd.DataFrame(function_value_dict).T # get metric list metricFunction = df_function_value.index.tolist() # get weight list weightFunction = df_function_value['weight'].astype(float).to_dict() weightFunctionList = list(weightFunction.values()) # get priority list priorityFunction = df_function_value['priority'].astype(int).to_dict() priorityFunctionList = list(priorityFunction.values()) # get kind list kindFunction = df_function_value['kind'].astype(int).to_dict() kindFunctionList = list(kindFunction.values()) # get optimal list optimalFunction = df_function_value['optimal'].astype(float).to_dict() optimalFunctionList = list(optimalFunction.values()) # get multiple list multipleFunction = df_function_value['multiple'].astype(float).to_dict() multipleFunctionList = list(multipleFunction.values()) function = { "weightFunction": float(dimensionWeight['function']), "weightCustom": flagCustomFunction, "type": typeFunction, "typeWeight": typeWeightFunction, "typeWeightList": typeWeightFunctionList, "metric": metricFunction, "weight": weightFunction, "weightList": weightFunctionList, "priority": priorityFunction, "priorityList": priorityFunctionList, "kind": kindFunction, "kindList": kindFunctionList, "optimal": optimalFunction, "optimalList": optimalFunctionList, "multiple": multipleFunction, "multipleList": multipleFunctionList } return function def _compliance_config_parse(self, config_dict, typeWeight, dimensionWeight): # get weight type typeWeightCompliance = typeWeight['compliance'] typeCompliance = list(typeWeightCompliance.keys()) typeWeightComplianceList = list(typeWeightCompliance.values()) flagCustomCompliance = not all(x is None for x in typeWeightComplianceList) # get type name typeNameCompliance = self.type_name["compliance"] # compliance compliance_dict = config_dict['compliance'] compliance_value_dict = {} typeMetricDict = {} for type in typeCompliance: compliance_value_dict.update(compliance_dict[type]) typeMetricDict[type] = list(compliance_dict[type].keys()) df_compliance_value = pd.DataFrame(compliance_value_dict).T # get metric list metricCompliance = df_compliance_value.index.tolist() # get weight type typeWeightCompliance = typeWeight['compliance'] typeWeightComplianceList = list(typeWeightCompliance.values()) # get name list nameCompliance = df_compliance_value['name'].to_dict() # get unit list unitCompliance = df_compliance_value['unit'].to_dict() # get weight list weightCompliance = df_compliance_value['weight'].astype(float).to_dict() weightComplianceList = list(weightCompliance.values()) compliance = { "weightDimension": float(dimensionWeight['compliance']), "weightCustom": flagCustomCompliance, "type": typeCompliance, "typeWeight": typeWeightCompliance, "typeWeightList": typeWeightComplianceList, "typeName": typeNameCompliance, "typeMetricDict": typeMetricDict, "metric": metricCompliance, "name": nameCompliance, "unit": unitCompliance, "weight": weightCompliance, "weightList": weightComplianceList } return compliance def _comfort_config_parse(self, config_dict, typeWeight, dimensionWeight): # get weight type typeWeightComfort = typeWeight['comfort'] typeComfort = [key for key, value in typeWeightComfort.items() if value != 0] typeWeightComfortDict = {key: value for key, value in typeWeightComfort.items() if value != 0} typeWeightComfortList = list(typeWeightComfortDict.values()) flagCustomComfort = not all(x is None for x in typeWeightComfortList) # comfort comfort_dict = config_dict['comfort'] comfort_value_dict = {} for type in typeComfort: comfort_value_dict.update(comfort_dict[type]) df_comfort_value = pd.DataFrame(comfort_value_dict).T # comfort_value_dict.update(comfort_dict['comfortLat']) # comfort_value_dict.update(comfort_dict['comfortLon']) # df_comfort_value = pd.DataFrame(comfort_value_dict).T # df_comfort_value = df_comfort_value[df_comfort_value['enable'] == True] # get metric list metricComfort = df_comfort_value.index.tolist() # get weight type typeWeightComfort = typeWeight['comfort'] typeWeightComfortList = list(typeWeightComfort.values()) # get weight list weightComfort = df_comfort_value['weight'].astype(float).to_dict() weightComfortList = list(weightComfort.values()) # get priority list priorityComfort = df_comfort_value['priority'].astype(int).to_dict() priorityComfortList = list(priorityComfort.values()) # get kind list kind1 = df_comfort_value['kind1'].astype(int).to_dict() kind2 = df_comfort_value['kind2'].astype(int).to_dict() kind3 = df_comfort_value['kind3'].astype(int).to_dict() kindComfort = { "kind1": kind1, "kind2": kind2, "kind3": kind3 } kindComfortList = list(self._merge_dict(kind1, kind2, kind3).values()) # get optimal list optimal1 = df_comfort_value['optimal1'].astype(float).to_dict() optimal2 = df_comfort_value['optimal2'].astype(float).to_dict() optimal3 = df_comfort_value['optimal3'].astype(float).to_dict() optimalComfort = { "optimal1": optimal1, "optimal2": optimal2, "optimal3": optimal3 } optimalComfortList = list(self._merge_dict(optimal1, optimal2, optimal3).values()) # get multiple list multiple1 = df_comfort_value['multiple1'].astype(float).to_dict() multiple2 = df_comfort_value['multiple2'].astype(float).to_dict() multiple3 = df_comfort_value['multiple3'].astype(float).to_dict() multipleComfort = { "multiple1": multiple1, "multiple2": multiple2, "multiple3": multiple3 } multipleComfortList = list(self._merge_dict(multiple1, multiple2, multiple3).values()) comfort = { "weightComfort": float(dimensionWeight['comfort']), "weightCustom": flagCustomComfort, "type": typeComfort, "typeWeight": typeWeightComfort, "typeWeightList": typeWeightComfortList, "metric": metricComfort, "weight": weightComfort, "weightList": weightComfortList, "priority": priorityComfort, "priorityList": priorityComfortList, "kind": kindComfort, "kindList": kindComfortList, "optimal": optimalComfort, "optimalList": optimalComfortList, "multiple": multipleComfort, "multipleList": multipleComfortList } return comfort def _efficient_config_parse(self, config_dict, typeWeight, dimensionWeight): # get weight type typeWeightEfficient = typeWeight['efficient'] typeEfficient = [key for key, value in typeWeightEfficient.items() if value != 0] typeWeightEfficientDict = {key: value for key, value in typeWeightEfficient.items() if value != 0} typeWeightEfficientList = list(typeWeightEfficientDict.values()) flagCustomEfficient = not all(x is None for x in typeWeightEfficientList) # efficient efficient_dict = config_dict['efficient'] efficient_value_dict = {} for type in typeEfficient: efficient_value_dict.update(efficient_dict[type]) df_efficient_value = pd.DataFrame(efficient_value_dict).T # efficient_value_dict.update(efficient_dict['efficientDrive']) # efficient_value_dict.update(efficient_dict['efficientStop']) # df_efficient_value = pd.DataFrame(efficient_value_dict).T # df_efficient_value = df_efficient_value[df_efficient_value['enable'] == True] # get metric list metricEfficient = df_efficient_value.index.tolist() # get weight type typeWeightEfficient = typeWeight['efficient'] typeWeightEfficientList = list(typeWeightEfficient.values()) # get weight list weightEfficient = df_efficient_value['weight'].astype(float).to_dict() weightEfficientList = list(weightEfficient.values()) # get priority list priorityEfficient = df_efficient_value['priority'].astype(int).to_dict() priorityEfficientList = list(priorityEfficient.values()) # get kind list kindEfficient = df_efficient_value['kind'].astype(int).to_dict() kindEfficientList = list(kindEfficient.values()) # get optimal list optimalEfficient = df_efficient_value['optimal'].astype(float).to_dict() optimalEfficientList = list(optimalEfficient.values()) # get multiple list multipleEfficient = df_efficient_value['multiple'].astype(float).to_dict() multipleEfficientList = list(multipleEfficient.values()) efficient = { "weightEfficient": float(dimensionWeight['efficient']), "weightCustom": flagCustomEfficient, "type": typeEfficient, "typeWeight": typeWeightEfficient, "typeWeightList": typeWeightEfficientList, "metric": metricEfficient, "weight": weightEfficient, "weightList": weightEfficientList, "priority": priorityEfficient, "priorityList": priorityEfficientList, "kind": kindEfficient, "kindList": kindEfficientList, "optimal": optimalEfficient, "optimalList": optimalEfficientList, "multiple": multipleEfficient, "multipleList": multipleEfficientList } return efficient def _merge_dict(self, dict1, dict2, dict3): merged_dict = {} # save the key and values lists dict1_key = list(dict1.keys()) dict2_key = list(dict2.keys()) dict3_key = list(dict3.keys()) dict1_values = list(dict1.values()) dict2_values = list(dict2.values()) dict3_values = list(dict3.values()) for i in range(len(dict1)): merged_dict[f"{dict1_key[i]}1"] = dict1_values[i] merged_dict[f"{dict2_key[i]}2"] = dict2_values[i] merged_dict[f"{dict3_key[i]}3"] = dict3_values[i] return merged_dict