Source code for autoflow.hdl.hdl_constructor

from collections import OrderedDict
from copy import deepcopy
from typing import Union, Tuple, List, Any, Dict

import numpy as np
import pandas as pd
from frozendict import frozendict

from autoflow.constants import PHASE1, PHASE2, SERIES_CONNECT_LEADER_TOKEN, SERIES_CONNECT_SEPARATOR_TOKEN
from autoflow.hdl.smac import _encode
from autoflow.hdl.utils import get_hdl_bank, get_default_hdl_bank
from autoflow.utils.dict_ import add_prefix_in_dict_keys, sort_dict
from autoflow.utils.graphviz import ColorSelector
from autoflow.utils.klass import StrSignatureMixin
from autoflow.utils.logging_ import get_logger
from autoflow.utils.math_ import get_int_length
from autoflow.utils.packages import get_class_object_in_pipeline_components


class HDL_Constructor(StrSignatureMixin):
    '''
    ``HDL`` is the abbreviation of Hyper-parameter Descriptions Language.
    It describes an abstract hyper-parameter space that is independent of any concrete implementation.

    ``HDL_Constructor`` is the class responsible for translating a dict-type ``DAG-workflow`` into ``HDL``.

    If ``DAG-workflow`` is not assigned explicitly (the string "generic_recommend" is the default),
    a generic ``DAG-workflow`` is recommended by analyzing the input data in :meth:`run`.

    Then, by calling :meth:`run`, the ``DAG-workflow`` is translated into ``HDL``.
    '''

    def __init__(
            self,
            DAG_workflow: Union[str, Dict[str, Any]] = "generic_recommend",
            hdl_bank_path=None,
            hdl_bank=None,
            hdl_metadata=frozendict(),
            included_classifiers=(
                    "adaboost", "catboost", "decision_tree", "extra_trees", "gaussian_nb", "knn",
                    "linearsvc", "svc", "lightgbm", "logistic_regression", "random_forest", "sgd"),
            included_regressors=(
                    "adaboost", "bayesian_ridge", "catboost", "decision_tree", "elasticnet", "extra_trees",
                    "gaussian_process", "knn", "kernel_ridge", "linearsvr", "lightgbm", "random_forest", "sgd"),
            included_highR_nan_imputers=("operate.drop", "operate.keep_going"),
            included_imputers=("impute.adaptive_fill",),
            included_highC_cat_encoders=("operate.drop", "encode.ordinal", "encode.cat_boost"),
            included_cat_encoders=("encode.one_hot", "encode.ordinal", "encode.cat_boost"),
            num2purified_workflow=frozendict({
                "num->scaled": ["scale.standardize", "operate.keep_going"],
                "scaled->purified": ["operate.keep_going", "transform.power"]
            }),
            text2purified_workflow=frozendict({
                "text->tokenized": "text.tokenize.simple",
                "tokenized->purified": [
                    "text.topic.tsvd",
                    "text.topic.lsi",
                    "text.topic.nmf",
                ]
            }),
            date2purified_workflow=frozendict({}),
            purified2final_workflow=frozendict({
                "purified->final": ["operate.keep_going"]
            })
    ):
        '''
        Parameters
        ----------
        DAG_workflow: str or dict, default="generic_recommend"

            A directed acyclic graph (DAG) workflow describing the machine-learning procedure.

            By default this value is "generic_recommend", which means ``HDL_Constructor`` will analyze
            the training data and recommend a valid DAG workflow.

            If you want to design the DAG workflow yourself, you can pass a dict.

        hdl_bank_path: str, default=None

            ``hdl_bank`` is a JSON file containing all the hyper-parameters of the algorithms.

            ``hdl_bank_path`` is this file's path. If it is None, ``autoflow/hdl/hdl_bank.json`` will be chosen.

        hdl_bank: dict, default=None

            If you pass ``hdl_bank_path=None`` and pass ``hdl_bank`` as a dict, the program will not load
            ``hdl_bank.json`` but will use the passed ``hdl_bank`` directly.

        included_classifiers: list or tuple

            Active if ``DAG_workflow="generic_recommend"``; all of the following parameters are only
            active in that situation as well.

            Decides which **classifiers** are considered in the algorithm selection.

        included_regressors: list or tuple

            Decides which **regressors** are considered in the algorithm selection.

        included_highR_nan_imputers: list or tuple

            ``highR_nan`` is a feature group in which ``NaN`` has a high ratio in a column.

            For example:

            >>> from numpy import NaN
            >>> column = [1, 2, NaN, NaN, NaN]  # nan ratio is 60%, more than 50% (default highR_nan_threshold)

            ``highR_nan_imputers`` algorithms handle such columns with a high ratio of missing values.

        included_cat_nan_imputers: list or tuple

            ``cat_nan`` is a feature group in which a categorical feature column contains ``NaN`` values.

            For example:

            >>> column = ["a", "b", "c", "d", NaN]

            ``cat_nan_imputers`` algorithms handle such columns.

        included_num_nan_imputers: list or tuple

            ``num_nan`` is a feature group in which a numerical feature column contains ``NaN`` values.

            For example:

            >>> column = [1, 2, 3, 4, NaN]

            ``num_nan_imputers`` algorithms handle such columns.

        included_highC_cat_encoders: list or tuple

            ``highC_cat`` is a feature group in which a categorical feature column has a high cardinality ratio.

            For example:

            >>> import numpy as np
            >>> column = ["a", "b", "c", "d", "a"]
            >>> rows = len(column)
            >>> np.unique(column).size / rows  # result is 0.8, higher than 0.5 (the default threshold)
            0.8

            ``highC_cat_encoders`` algorithms handle such columns.

        included_lowR_cat_encoders: list or tuple

            ``lowR_cat`` is a feature group in which a categorical feature column has a low cardinality ratio.

            For example:

            >>> import numpy as np
            >>> column = ["a", "a", "a", "d", "a"]
            >>> rows = len(column)
            >>> np.unique(column).size / rows  # result is 0.4, lower than 0.5 (the default threshold)
            0.4

            ``lowR_cat_encoders`` algorithms handle such columns.

        Attributes
        ----------
        random_state: int

        ml_task: :class:`autoflow.utils.ml_task.MLTask`

        data_manager: :class:`autoflow.manager.data_manager.DataManager`

        hdl: dict
            Constructed by :meth:`run`.

        Examples
        --------
        >>> import numpy as np
        >>> from autoflow.manager.data_manager import DataManager
        >>> from autoflow.hdl.hdl_constructor import HDL_Constructor
        >>> hdl_constructor = HDL_Constructor(DAG_workflow={"num->target": ["lightgbm"]},
        ...     hdl_bank={"classification": {"lightgbm": {"boosting_type": {"_type": "choice", "_value": ["gbdt", "dart", "goss"]}}}})
        >>> data_manager = DataManager(X_train=np.random.rand(3, 3), y_train=np.arange(3))
        >>> hdl_constructor.run(data_manager)
        >>> hdl_constructor.hdl
        {'preprocessing': {}, 'estimating(choice)': {'lightgbm': {'boosting_type': {'_type': 'choice', '_value': ['gbdt', 'dart', 'goss']}}}}
        '''
        self.date2purified_workflow = date2purified_workflow
        self.text2purified_workflow = text2purified_workflow
        self.purified2final_workflow = purified2final_workflow
        self.num2purified_workflow = num2purified_workflow
        self.hdl_metadata = dict(hdl_metadata)
        self.included_cat_encoders = included_cat_encoders
        self.included_highC_cat_encoders = included_highC_cat_encoders
        self.included_imputers = included_imputers
        self.included_highR_nan_imputers = included_highR_nan_imputers
        self.included_regressors = included_regressors
        self.included_classifiers = included_classifiers
        self.logger = get_logger(self)
        self.hdl_bank_path = hdl_bank_path
        self.DAG_workflow = DAG_workflow
        if hdl_bank is None:
            if hdl_bank_path:
                hdl_bank = get_hdl_bank(hdl_bank_path)
            else:
                hdl_bank = get_default_hdl_bank()
        if hdl_bank is None:
            hdl_bank = {}
            self.logger.warning("No hdl_bank, will use DAG_descriptions only.")
        self.hdl_bank = hdl_bank
        self.random_state = 42
        self.ml_task = None
        self.data_manager = None

    def parse_item(self, value: Union[dict, str]) -> Tuple[str, dict, bool]:
        value = deepcopy(value)
        if isinstance(value, dict):
            packages = value.pop("_name")
            if "_vanilla" in value:
                is_vanilla = value.pop("_vanilla")
            else:
                is_vanilla = False
            addition_dict = value
        elif isinstance(value, str):
            packages = value
            addition_dict = {}
            is_vanilla = False
        elif value is None:
            packages = "None"
            addition_dict = {}
            is_vanilla = False
        else:
            raise TypeError
        return packages, addition_dict, is_vanilla
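
    # Illustrative sketch of parse_item: each entry of a DAG_workflow value list is normalized
    # into (packages, addition_dict, is_vanilla). The inputs below are hypothetical examples:
    #
    # >>> hdl_constructor = HDL_Constructor()
    # >>> hdl_constructor.parse_item("encode.one_hot")
    # ('encode.one_hot', {}, False)
    # >>> hdl_constructor.parse_item({"_name": "scale.standardize", "_vanilla": True, "copy": False})
    # ('scale.standardize', {'copy': False}, True)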

    def purify_DAG_describe(self):
        DAG_describe = {}
        for k, v in self.DAG_workflow.items():
            if not isinstance(v, (list, tuple)):
                v = [v]
            DAG_describe[k.replace(" ", "").replace("\n", "").replace("\t", "")] = v
        self.DAG_workflow = DAG_describe
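
    # purify_DAG_describe strips whitespace from step names and wraps scalar values in lists.
    # Illustrative example: {"num -> scaled": "scale.standardize"} becomes
    # {"num->scaled": ["scale.standardize"]}.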

    def _get_params_in_dict(self, dict_: dict, package: str) -> dict:
        result = deepcopy(dict_)
        for path in package.split("."):
            result = result.get(path, {})
        return result
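
    # _get_params_in_dict walks a dotted package name through nested dicts; illustrative data:
    #   _get_params_in_dict({"scale": {"standardize": {"copy": True}}}, "scale.standardize")
    # returns {"copy": True}, and a missing path yields {}.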

    def get_params_in_dict(self, hdl_bank: dict, packages: str, phase: str, mainTask):
        assert phase in (PHASE1, PHASE2)
        packages: list = packages.split(SERIES_CONNECT_SEPARATOR_TOKEN)
        params_list: List[dict] = [self._get_params_in_dict(hdl_bank[PHASE1], package)
                                   for package in packages[:-1]]
        last_phase_key = PHASE1 if phase == PHASE1 else mainTask
        params_list += [self._get_params_in_dict(hdl_bank[last_phase_key], packages[-1])]
        if len(params_list) == 0:
            raise AttributeError
        elif len(params_list) == 1:
            return params_list[0]
        else:
            result = {}
            for params, package in zip(params_list, packages):
                result.update(add_prefix_in_dict_keys(params, package + SERIES_CONNECT_LEADER_TOKEN))
            return result
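
    # When several components are series-connected in one step (joined by
    # SERIES_CONNECT_SEPARATOR_TOKEN), their hyper-parameters are merged into a single dict,
    # each key prefixed with "<package><SERIES_CONNECT_LEADER_TOKEN>" so the components stay
    # distinguishable; a single package simply returns its hdl_bank entry unchanged.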

    def generic_recommend(self) -> Dict[str, List[Union[str, Dict[str, Any]]]]:
        '''
        Recommend a generic DAG workflow-space.

        Returns
        -------
        DAG_workflow: dict
        '''
        DAG_workflow = OrderedDict()
        contain_highR_nan = False
        essential_feature_groups = self.data_manager.essential_feature_groups
        nan_column2essential_fg = self.data_manager.nan_column2essential_fg
        fg_set = set(essential_feature_groups)
        nan_fg_set = set(nan_column2essential_fg.values())
        # -------- Start imputing missing (nan) values --------------------
        if "highR_nan" in self.data_manager.feature_groups:
            DAG_workflow["highR_nan->nan"] = self.included_highR_nan_imputers
            contain_highR_nan = True
        if contain_highR_nan or "nan" in self.data_manager.feature_groups:
            DAG_workflow["nan->imputed"] = self.included_imputers
            if len(nan_fg_set) > 1:
                sorted_nan_column2essential_fg = sort_dict(nan_column2essential_fg)
                sorted_nan_fg = sort_dict(list(nan_fg_set))
                DAG_workflow[f"imputed->{','.join(sorted_nan_fg)}"] = {
                    "_name": "operate.split",
                    "column2fg": _encode(sorted_nan_column2essential_fg)
                }
            elif len(nan_fg_set) == 1:
                elem = list(nan_fg_set)[0]
                DAG_workflow[f"imputed->{elem}"] = "operate.keep_going"
            else:
                raise NotImplementedError
        # -------- Start encoding categorical (cat) values --------------------
        if "cat" in fg_set:
            DAG_workflow["cat->purified"] = self.included_cat_encoders
        if "highC_cat" in fg_set:
            DAG_workflow["highC_cat->purified"] = self.included_highC_cat_encoders
        # -------- Processing text features --------------------
        if "text" in fg_set:
            for k, v in self.text2purified_workflow.items():
                DAG_workflow[k] = v
        # -------- Processing numerical features --------------------
        if "num" in fg_set:
            for k, v in self.num2purified_workflow.items():
                DAG_workflow[k] = v
        # -------- Final processing --------------------
        for k, v in self.purified2final_workflow.items():
            DAG_workflow[k] = v
        # -------- Start estimating --------------------
        mainTask = self.ml_task.mainTask
        if mainTask == "classification":
            DAG_workflow["final->target"] = self.included_classifiers
        elif mainTask == "regression":
            DAG_workflow["final->target"] = self.included_regressors
        else:
            raise NotImplementedError
        # todo: if there are many features, do feature selection or dimensionality reduction;
        #       if there are few features, expand the dimensionality
        # todo: handle class imbalance
        return DAG_workflow
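
    # Illustrative shape of a recommended workflow for a dataset whose columns fall into the
    # "num" and "cat" feature groups; the exact content depends on data_manager and the
    # included_* lists passed to __init__:
    #
    # OrderedDict([
    #     ("cat->purified", ("encode.one_hot", "encode.ordinal", "encode.cat_boost")),
    #     ("num->scaled", ["scale.standardize", "operate.keep_going"]),
    #     ("scaled->purified", ["operate.keep_going", "transform.power"]),
    #     ("purified->final", ["operate.keep_going"]),
    #     ("final->target", ...),  # included_classifiers or included_regressors, depending on ml_task
    # ])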

    def draw_workflow_space(
            self,
            colorful=True,
            candidates_colors=("#663366", "#663300", "#666633", "#333366", "#660033"),
            feature_groups_colors=("#0099CC", "#0066CC", "#339933", "#FFCC33", "#33CC99",
                                   "#FF0033", "#663399", "#FF6600")
    ):
        '''
        Notes
        -----
        You must have graphviz installed on your computer.

        If you are using Ubuntu or another Debian-based Linux, you can run::

            $ sudo apt-get install graphviz

        You can also install graphviz with conda::

            $ conda install -c conda-forge graphviz

        Returns
        -------
        graph: :class:`graphviz.dot.Digraph`
            Usage of :class:`graphviz.dot.Digraph` is documented at
            https://graphviz.readthedocs.io/en/stable/manual.html
        '''
        cand2c = ColorSelector(list(candidates_colors), colorful)
        feat2c = ColorSelector(list(feature_groups_colors), colorful)

        def parsed_algorithms(algorithms):
            parsed_algorithms = []
            for algorithm in algorithms:
                if isinstance(algorithm, dict):
                    parsed_algorithms.append(algorithm["_name"])
                else:
                    parsed_algorithms.append(algorithm)
            return parsed_algorithms

        def get_node_label(node_name):
            if colorful:
                return f'''<<font color="{feat2c[node_name]}">{node_name}</font>>'''
            else:
                return node_name

        def get_multi_out_edge_label(label_name, tail):
            if colorful:
                return f'''<{label_name}: <font color="{feat2c[tail]}">{tail}</font>>'''
            else:
                return f'''{label_name}: {tail}'''

        def get_single_algo_edge_label(algorithm):
            if colorful:
                return f'<<font color="{cand2c[algorithm]}">{algorithm}</font>>'
            else:
                return algorithm

        def get_algo_selection_edge_label(algorithms):
            if colorful:
                return "<{" + ", ".join(
                    [f'<font color="{cand2c[algorithm]}">{algorithm}</font>' for algorithm in algorithms]) + "}>"
            else:
                return "{" + ", ".join(algorithms) + "}"

        try:
            from graphviz import Digraph
        except Exception as e:
            self.logger.warning("Cannot import graphviz!")
            self.logger.error(str(e))
            return None
        DAG_workflow = deepcopy(self.DAG_workflow)
        graph = Digraph("workflow_space")
        graph.node("data")
        for parsed_node in pd.unique(self.data_manager.feature_groups):
            graph.node(parsed_node, color=feat2c[parsed_node], label=get_node_label(parsed_node))
            graph.edge("data", parsed_node, label=get_multi_out_edge_label("data_manager", parsed_node))
        for indicate, algorithms in DAG_workflow.items():
            if "->" not in indicate:
                continue
            _from, _to = indicate.split("->")
            graph.node(_from, color=feat2c[_from], label=get_node_label(_from))
            algorithms = parsed_algorithms(algorithms)
            if len(_to.split(",")) > 1:
                algorithm = algorithms[0]
                tails = []
                for item in _to.split(","):
                    tails.append(item.split("=")[-1])
                for tail in tails:
                    graph.node(tail, color=feat2c[tail], label=get_node_label(tail))
                    graph.edge(_from, tail, get_multi_out_edge_label(algorithm, tail))
            else:
                graph.node(_to, color=feat2c[_to], label=get_node_label(_to))
                if len(algorithms) == 1:
                    edge_label = get_single_algo_edge_label(algorithms[0])
                else:
                    edge_label = get_algo_selection_edge_label(algorithms)
                graph.edge(_from, _to, edge_label)
        graph.attr(label=r'WorkFlow Space')
        return graph
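
    # Usage sketch, assuming run() has already been called; "workflow_space" is a hypothetical
    # output file name (render writes a PDF by default):
    #
    # >>> graph = hdl_constructor.draw_workflow_space()
    # >>> graph.render("workflow_space")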

    def purify_step_name(self, step: str):
        # autoflow/evaluation/train_evaluator.py:252
        cnt = ""
        ix = 0
        for i, c in enumerate(step):
            if c.isdigit():
                cnt += c
            else:
                ix = i
                break
        cnt = int(cnt) if cnt else -1
        step = step[ix:]
        ignored_suffixs = ["(choice)"]
        for ignored_suffix in ignored_suffixs:
            if step.endswith(ignored_suffix):
                step = step[:len(step) - len(ignored_suffix)]
        return step
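
    # purify_step_name strips the leading step index and the "(choice)" suffix; illustrative
    # example: "0nan->imputed(choice)" -> "nan->imputed".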

    def get_hdl_dataframe(self) -> pd.DataFrame:
        preprocessing = self.hdl.get(PHASE1)
        dicts = []
        tuple_multi_index = []

        def push(step, algorithm_selection, hyper_params):
            step = self.purify_step_name(step)
            if hyper_params:
                for hp_name, hp_dict in hyper_params.items():
                    tuple_multi_index.append((step, algorithm_selection, hp_name))
                    if isinstance(hp_dict, dict):
                        dicts.append({"_type": hp_dict.get("_type"), "_value": hp_dict.get("_value"),
                                      "_default": hp_dict.get("_default")})
                    else:
                        dicts.append({"_type": f"constant {hp_dict.__class__.__name__}", "_value": hp_dict,
                                      "_default": ""})
            else:
                tuple_multi_index.append((step, algorithm_selection, ""))
                dicts.append({"_type": "", "_value": "", "_default": ""})

        if preprocessing is not None:
            for step, algorithm_selections in preprocessing.items():
                for algorithm_selection, hyper_params in algorithm_selections.items():
                    push(step, algorithm_selection, hyper_params)
        step = PHASE2
        algorithm_selections = self.hdl[f"{step}(choice)"]
        for algorithm_selection, hyper_params in algorithm_selections.items():
            push(step, algorithm_selection, hyper_params)
        df = pd.DataFrame(dicts)
        multi_index = pd.MultiIndex.from_tuples(
            tuple_multi_index, names=["step", "algorithm_selections", "hyper_param_name"])
        df.index = multi_index
        return df
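
    # The resulting DataFrame has a ("step", "algorithm_selections", "hyper_param_name")
    # MultiIndex and the columns "_type", "_value", "_default", one row per hyper-parameter
    # (or one placeholder row per algorithm without hyper-parameters) in the HDL.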

    def interactive_display_workflow_space(self):
        try:
            from IPython.display import display
        except Exception as e:
            self.logger.warning("Cannot import IPython")
            self.logger.error(str(e))
            return None
        display(self.draw_workflow_space())
        display(self.get_hdl_dataframe())

    def run(self, data_manager, model_registry=None):
        '''
        Parameters
        ----------
        data_manager: :class:`autoflow.manager.data_manager.DataManager`

        model_registry: dict, default=None
            Registry of user-defined models; an empty dict is used if None.
        '''
        if model_registry is None:
            model_registry = {}
        self.data_manager = data_manager
        self.ml_task = data_manager.ml_task
        self.highC_cat_threshold = data_manager.highC_cat_threshold
        self.highR_nan_threshold = data_manager.highR_nan_threshold
        self.consider_ordinal_as_cat = data_manager.consider_ordinal_as_cat
        if isinstance(self.DAG_workflow, str):
            if self.DAG_workflow == "generic_recommend":
                self.hdl_metadata.update({"source": "generic_recommend"})
                self.logger.info("Using the 'generic_recommend' method to initialize a generic DAG_workflow, \n"
                                 "to adapt to various data, such as NaN values and categorical features.")
                self.DAG_workflow = self.generic_recommend()
            else:
                raise NotImplementedError
        elif isinstance(self.DAG_workflow, dict):
            self.hdl_metadata.update({"source": "user_defined"})
            self.logger.info("DAG_workflow is specifically set by user.")
        else:
            raise NotImplementedError
        target_key = None
        self.purify_DAG_describe()
        # ---- start parsing DAG_workflow into HDL ----
        DAG_workflow = deepcopy(self.DAG_workflow)
        for step in DAG_workflow.keys():
            if step.split("->")[-1] == "target":
                target_key = step
        estimator_values = DAG_workflow.pop(target_key)
        if not isinstance(estimator_values, (list, tuple)):
            estimator_values = [estimator_values]
        preprocessing_dict = {}
        mainTask = self.ml_task.mainTask
        hdl_bank = deepcopy(self.hdl_bank)
        # iterate over DAG_describe and build the preprocessing part
        n_steps = len(DAG_workflow)
        int_len = get_int_length(n_steps)
        for i, (step, values) in enumerate(DAG_workflow.items()):
            formed_key = f"{i:0{int_len}d}{step}(choice)"
            sub_dict = {}
            for value in values:
                packages, addition_dict, is_vanilla = self.parse_item(value)
                assert get_class_object_in_pipeline_components(
                    "preprocessing", packages, model_registry) is not None, \
                    f"In step '{step}', user defined package '{packages}' does not exist!"
                # todo: adapt to user-defined models
                params = {} if is_vanilla else self.get_params_in_dict(hdl_bank, packages, PHASE1, mainTask)
                sub_dict[packages] = params
                sub_dict[packages].update(addition_dict)
            preprocessing_dict[formed_key] = sub_dict
        # build the estimator part
        estimator_dict = {}
        for estimator_value in estimator_values:
            packages, addition_dict, is_vanilla = self.parse_item(estimator_value)
            assert get_class_object_in_pipeline_components(
                data_manager.ml_task.mainTask, packages, model_registry) is not None, \
                f"In step '{target_key}', user defined package '{packages}' does not exist!"
            params = {} if is_vanilla else self.get_params_in_dict(hdl_bank, packages, PHASE2, mainTask)
            estimator_dict[packages] = params
            estimator_dict[packages].update(addition_dict)
        final_dict = {
            PHASE1: preprocessing_dict,
            f"{PHASE2}(choice)": estimator_dict
        }
        self.hdl = final_dict
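
    # End-to-end usage sketch, using the same data_manager setup as the class docstring example:
    #
    # >>> hdl_constructor = HDL_Constructor(DAG_workflow={"num->target": ["lightgbm"]})
    # >>> hdl_constructor.run(data_manager)
    # >>> sorted(hdl_constructor.get_hdl().keys())
    # ['estimating(choice)', 'preprocessing']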

    def get_hdl(self) -> Dict[str, Any]:
        '''
        Returns
        -------
        hdl: dict
        '''
        # return the constructed hdl
        return self.hdl