Source code for autoflow.hdl2shps.hdl2shps

import re
from collections import defaultdict, Counter
from copy import deepcopy
from importlib import import_module
from typing import Dict, List

import numpy as np
from ConfigSpace import CategoricalHyperparameter, Constant
from ConfigSpace import ConfigurationSpace
from ConfigSpace import ForbiddenInClause, ForbiddenEqualsClause, ForbiddenAndConjunction
from ConfigSpace import InCondition, EqualsCondition
from hyperopt import fmin, tpe, hp

import autoflow.hdl.smac as smac_hdl
from autoflow.constants import PHASE2, SERIES_CONNECT_LEADER_TOKEN
from autoflow.hdl.utils import is_hdl_bottom, get_origin_models, purify_keys, purify_key, add_leader_model
from autoflow.utils.dict import filter_item_by_key_condition
from autoflow.utils.klass import StrSignatureMixin
from autoflow.utils.logging_ import get_logger
from autoflow.utils.ml_task import MLTask
from autoflow.utils.packages import get_class_name_of_module


[docs]class RelyModels: info = []
[docs]class HDL2SHPS(StrSignatureMixin): def __init__(self): self.ml_task = None self.logger = get_logger(__name__)
[docs] def set_task(self, ml_task: MLTask): self.ml_task = ml_task
[docs] def get_forbid_hit_in_models_by_rely(self, models, rely_model="boost_model"): forbid_in_value = [] hit = [] models = get_origin_models(models) for model in models: module_path = f"autoflow.pipeline.components.{self.ml_task.mainTask}.{model}" _class = get_class_name_of_module(module_path) M = import_module(module_path) cls = getattr(M, _class) is_hit = getattr(cls, rely_model, False) if not is_hit: forbid_in_value.append(model) else: hit.append(model) return forbid_in_value, hit
[docs] def set_probabilities_in_cs( self, cs: ConfigurationSpace, relied2models: Dict[str, List[str]], relied2AllModels: Dict[str, List[str]], all_models: List[str], **kwargs ): estimator = cs.get_hyperparameter(f"{PHASE2}:__choice__") probabilities = [] model2prob = {} L = 0 for rely_model in relied2models: cur_models = relied2models[rely_model] L += len(cur_models) for model in cur_models: model2prob[model] = kwargs[rely_model] / len(cur_models) p_rest = (1 - sum(model2prob.values())) / (len(all_models) - L) for model in estimator.choices: probabilities.append(model2prob.get(model, p_rest)) estimator.probabilities = probabilities default_estimator_choice = None for models in relied2models.values(): if models: default_estimator_choice = models[0] estimator.default_value = default_estimator_choice for rely_model, path in RelyModels.info: forbid_eq_value = path[-1] path = path[:-1] forbid_eq_key = ":".join(path + ["__choice__"]) forbid_eq_key_hp = cs.get_hyperparameter(forbid_eq_key) forbid_in_key = f"{PHASE2}:__choice__" hit = relied2AllModels.get(rely_model) if not hit: choices = list(forbid_eq_key_hp.choices) choices.remove(forbid_eq_value) forbid_eq_key_hp.choices = tuple(choices) forbid_eq_key_hp.default_value = choices[0] forbid_eq_key_hp.probabilities = [1 / len(choices)] * len(choices) # fixme 最后我放弃了在这上面进行修改,在hdl部分就做了预处理 continue forbid_in_value = list(set(all_models) - set(hit)) # 只选择了boost模型 if not forbid_in_value: continue choices = forbid_eq_key_hp.choices probabilities = [] p: float = kwargs[rely_model] p_rest = (1 - p) * (len(choices) - 1) for choice in choices: if choice == forbid_eq_value: probabilities.append(p) else: probabilities.append(p_rest) forbid_eq_key_hp.probabilities = probabilities cs.add_forbidden_clause(ForbiddenAndConjunction( ForbiddenEqualsClause(forbid_eq_key_hp, forbid_eq_value), ForbiddenInClause(cs.get_hyperparameter(forbid_in_key), forbid_in_value), ))
def __rely_model(self, cs: ConfigurationSpace): if not RelyModels.info: return all_models = list(cs.get_hyperparameter(f"{PHASE2}:__choice__").choices) rely_model_counter = Counter([x[0] for x in RelyModels.info]) # 依赖模式->所有相应模型 relied2AllModels = {} # 依赖模式->无交集相应模型 relied2models = {} for rely_model in rely_model_counter.keys(): _, hit = self.get_forbid_hit_in_models_by_rely(all_models, rely_model) relied2AllModels[rely_model] = hit # 如果某依赖模式不对应任何模型,删除 for k, v in list(relied2AllModels.items()): if not v: relied2AllModels.pop(k) rely_model_counter.pop(k) has_any_hit = any(relied2AllModels.values()) if not has_any_hit: return # 按照规则计算 relied2models : 无交集相应模型 relied_cnts_tuples = [(k, v) for k, v in rely_model_counter.items()] relied_cnts_tuples.sort(key=lambda x: x[-1]) visited = set() for rely_model, _ in relied_cnts_tuples: models = relied2AllModels[rely_model] for other in set(rely_model_counter.keys()) - {rely_model}: if (rely_model, other) in visited: continue other_models = relied2AllModels[other] if len(other_models) <= len(models): models = list(set(models) - set(other_models)) visited.add((rely_model, other)) visited.add((other, rely_model)) relied2models[rely_model] = models # 键的顺序遵循rely_model_counter.keys() def objective(relyModel2prob, debug=False): # relyModel2prob = {rely_model: prob for rely_model, prob in zip(list(rely_model_counter.keys()), args)} cur_cs = deepcopy(cs) self.set_probabilities_in_cs(cur_cs, relied2models, relied2AllModels, all_models, **relyModel2prob) cur_cs.seed(42) try: sample_times = len(all_models) * 15 counter = Counter([_hp.get(f"{PHASE2}:__choice__") for _hp in cur_cs.sample_configuration(sample_times)]) if debug: self.logger.info(f"Finally, sample {sample_times} times in estimator list's frequency: \n{counter}") except Exception: return np.inf vl = list(counter.values()) return np.var(vl) + 100 * (len(models) - len(vl)) space = {} eps = 0.001 N_rely_model = len(rely_model_counter.keys()) for rely_model in rely_model_counter.keys(): space[rely_model] = hp.uniform(rely_model, eps, (1 / N_rely_model) - eps) best = fmin( fn=objective, space=space, algo=tpe.suggest, max_evals=100, rstate=np.random.RandomState(42), show_progressbar=False, ) self.logger.info(f"The best probability is {best}") objective(best, debug=True) self.set_probabilities_in_cs(cs, relied2models, relied2AllModels, all_models, **best) # todo: 将计算的概率缓存
[docs] def purify_isolate_rely_in_hdl(self, hdl: Dict, models: List[str]): # 为了应对estimator中全为boost的情况 # 做法: 删除 __rely_model键 for key, value in hdl.items(): if isinstance(value, dict): ok = False for name, sub_dict in value.items(): if isinstance(sub_dict, dict) and "__rely_model" in purify_keys(sub_dict): ok = True endswith_rely_model_dicts = filter_item_by_key_condition( sub_dict, lambda x: x.endswith("__rely_model")) rely_models = list(endswith_rely_model_dicts.values()) rely_model = rely_models[0] _, hit = self.get_forbid_hit_in_models_by_rely(models, rely_model) if set(hit) == set(models): for key in endswith_rely_model_dicts: sub_dict.pop(key) if not ok: self.purify_isolate_rely_in_hdl(value, models)
[docs] def drop_invalid_rely_in_hdl(self, hdl: Dict, models: List[str]): # 为了应对estimator中没有boost,但是特征工程序列中却有依赖boost的特征工程的情况 # 做法:将这样的特征工程删除 for key, value in hdl.items(): if isinstance(value, dict): ok = False deleted_keys = [] for name, sub_dict in value.items(): if isinstance(sub_dict, dict) and "__rely_model" in purify_keys(sub_dict): ok = True rely_models = filter_item_by_key_condition( sub_dict, lambda x: x.endswith("__rely_model")).values() rely_models = list(rely_models) rely_model = rely_models[0] _, hit = self.get_forbid_hit_in_models_by_rely(models, rely_model) if not hit: deleted_keys.append(name) for deleted_key in deleted_keys: value.pop(deleted_key) if not ok: self.drop_invalid_rely_in_hdl(value, models)
def __call__(self, hdl: Dict): # 对HDL进行处理 models = hdl[f"{PHASE2}(choice)"] self.drop_invalid_rely_in_hdl(hdl, models) self.purify_isolate_rely_in_hdl(hdl, models) RelyModels.info = [] cs = self.recursion(hdl) self.__rely_model(cs) return cs # return { # "shps":cs, # "p":p # } def __condition(self, item: Dict, store: Dict, leader_model): child = add_leader_model(item["_child"], leader_model, SERIES_CONNECT_LEADER_TOKEN) child = store[child] parent = add_leader_model(item["_parent"], leader_model, SERIES_CONNECT_LEADER_TOKEN) parent = store[parent] value = (item["_values"]) if (isinstance(value, list) and len(value) == 1): value = value[0] if isinstance(value, list): cond = InCondition(child, parent, list(map(smac_hdl._encode, value))) else: cond = EqualsCondition(child, parent, smac_hdl._encode(value)) return cond def __forbidden(self, value: List, store: Dict, cs: ConfigurationSpace, leader_model): assert isinstance(value, list) for item in value: assert isinstance(item, dict) clauses = [] for name, forbidden_values in item.items(): true_name = add_leader_model(name, leader_model, SERIES_CONNECT_LEADER_TOKEN) if isinstance(forbidden_values, list) and len(forbidden_values) == 1: forbidden_values = forbidden_values[0] if isinstance(forbidden_values, list): clauses.append(ForbiddenInClause(store[true_name], list(map(smac_hdl._encode, forbidden_values)))) else: clauses.append(ForbiddenEqualsClause(store[true_name], smac_hdl._encode(forbidden_values))) cs.add_forbidden_clause(ForbiddenAndConjunction(*clauses)) # def activate_helper(self,value):
[docs] def reverse_dict(self, dict_: Dict): reversed_dict = defaultdict(list) for key, value in dict_.items(): if isinstance(value, list): for v in value: reversed_dict[v].append(key) else: reversed_dict[value].append(key) reversed_dict = dict(reversed_dict) for key, value in reversed_dict.items(): reversed_dict[key] = list(set(value)) return reversed_dict
[docs] def pop_covered_item(self, dict_: Dict, length: int): dict_ = deepcopy(dict_) should_pop = [] for key, value in dict_.items(): assert isinstance(value, list) if len(value) > length: self.logger.warning("len(value) > length") should_pop.append(key) elif len(value) == length: should_pop.append(key) for key in should_pop: dict_.pop(key) return dict_
def __activate(self, value: Dict, store: Dict, cs: ConfigurationSpace, leader_model): assert isinstance(value, dict) for k, v in value.items(): assert isinstance(v, dict) reversed_dict = self.reverse_dict(v) reversed_dict = self.pop_covered_item(reversed_dict, len(v)) for sk, sv in reversed_dict.items(): cond = self.__condition( { "_child": sk, "_values": sv, "_parent": k }, store, leader_model ) cs.add_condition(cond)
[docs] def recursion(self, hdl: Dict, path=()) -> ConfigurationSpace: cs = ConfigurationSpace() # 检测一下这个dict是否在直接描述超参 key_list = list(hdl.keys()) if len(key_list) == 0: cs.add_hyperparameter(Constant("placeholder", "placeholder")) return cs else: sample_key = key_list[0] sample_value = hdl[sample_key] if is_hdl_bottom(sample_key, sample_value): store = {} conditions_dict = {} for key, value in hdl.items(): if purify_key(key).startswith("__"): conditions_dict[key] = value else: hp = self.__parse_dict_to_config(key, value) cs.add_hyperparameter(hp) store[key] = hp for key, value in conditions_dict.items(): if SERIES_CONNECT_LEADER_TOKEN in key: leader_model, condition_indicator = key.split(SERIES_CONNECT_LEADER_TOKEN) else: leader_model, condition_indicator = None, key if condition_indicator == "__condition": assert isinstance(value, list) for item in value: cond = self.__condition(item, store, leader_model) cs.add_condition(cond) elif condition_indicator == "__activate": self.__activate(value, store, cs, leader_model) elif condition_indicator == "__forbidden": self.__forbidden(value, store, cs, leader_model) elif condition_indicator == "__rely_model": RelyModels.info.append([ value, deepcopy(path) ]) return cs pattern = re.compile(r"(.*)\((.*)\)") for key, value in hdl.items(): mat = pattern.match(key) if mat: groups = mat.groups() assert len(groups) == 2 prefix_name, method = groups value_list = list(value.keys()) assert len(value_list) >= 1 if method == "choice": pass else: raise NotImplementedError() cur_cs = ConfigurationSpace() assert isinstance(value, dict) # 不能用constant,会报错 choice2proba = {} not_specific_proba_choices = [] sum_proba = 0 for k in value_list: v = value[k] if isinstance(v, dict) and "__proba" in v: proba = v.pop("__proba") choice2proba[k] = proba sum_proba += proba else: not_specific_proba_choices.append(k) if sum_proba <= 1: if len(not_specific_proba_choices) > 0: p_rest = (1 - sum_proba) / len(not_specific_proba_choices) for not_specific_proba_choice in not_specific_proba_choices: choice2proba[not_specific_proba_choice] = p_rest else: choice2proba = {k: 1 / len(value_list) for k in value_list} proba_list = [choice2proba[k] for k in value_list] value_list = list(map(smac_hdl._encode, value_list)) # choices must be str option_param = CategoricalHyperparameter('__choice__', value_list, weights=proba_list) # todo : default cur_cs.add_hyperparameter(option_param) for sub_key, sub_value in value.items(): assert isinstance(sub_value, dict) sub_cs = self.recursion(sub_value, path=list(path) + [prefix_name, sub_key]) parent_hyperparameter = {'parent': option_param, 'value': sub_key} cur_cs.add_configuration_space(sub_key, sub_cs, parent_hyperparameter=parent_hyperparameter) cs.add_configuration_space(prefix_name, cur_cs) elif isinstance(value, dict): sub_cs = self.recursion(value, path=list(path) + [key]) cs.add_configuration_space(key, sub_cs) else: raise NotImplementedError() return cs
def __parse_dict_to_config(self, key, value): if isinstance(value, dict): _type = value.get("_type") _value = value.get("_value") _default = value.get("_default") assert _value is not None if _type == "choice": return smac_hdl.choice(key, _value, _default) else: return eval(f'''smac_hdl.{_type}("{key}",*_value,default=_default)''') else: return Constant(key, smac_hdl._encode(value))