Source code for autoflow.estimator.base

import datetime
import math
import multiprocessing
import os
from copy import deepcopy
from importlib import import_module
from multiprocessing import Manager
from typing import Union, Optional, Dict, List, Any

import numpy as np
import pandas as pd
from frozendict import frozendict
from sklearn.base import BaseEstimator
from sklearn.model_selection import KFold

from autoflow import constants
from autoflow.ensemble.base import EnsembleEstimator
from autoflow.ensemble.trained_data_fetcher import TrainedDataFetcher
from autoflow.ensemble.trials_fetcher import TrialsFetcher
from autoflow.hdl.hdl_constructor import HDL_Constructor
from autoflow.manager.data_manager import DataManager
from autoflow.manager.resource_manager import ResourceManager
from autoflow.metrics import r2, accuracy
from autoflow.pipeline.dataframe import GenericDataFrame
from autoflow.tuner.tuner import Tuner
from autoflow.utils.concurrence import get_chunks
from autoflow.utils.config_space import replace_phps, estimate_config_space_numbers
from autoflow.utils.dict import update_mask_from_other_dict
from autoflow.utils.klass import instancing, sequencing
from autoflow.utils.logging_ import get_logger, setup_logger
from autoflow.utils.packages import get_class_name_of_module


class AutoFlowEstimator(BaseEstimator):
    checked_mainTask = None

    def __init__(
            self,
            tuner: Union[Tuner, List[Tuner], None, dict] = None,
            hdl_constructor: Union[HDL_Constructor, List[HDL_Constructor], None, dict] = None,
            resource_manager: Union[ResourceManager, str] = None,
            random_state=42,
            log_file: str = None,
            log_config: Optional[dict] = None,
            highR_nan_threshold=0.5,
            highC_cat_threshold=0.5,
            should_store_intermediate_result=False,
            should_finally_fit=False,
            should_calc_all_metrics=True,
            **kwargs
    ):
        '''
        Parameters
        ----------
        tuner: :class:`autoflow.tuner.tuner.Tuner` or None
            ``Tuner`` is the class that drives the abstract search process.
        hdl_constructor: :class:`autoflow.hdl.hdl_constructor.HDL_Constructor` or None
            ``HDL`` is an abbreviation of Hyper-parameter Descriptions Language.
            It describes an abstract hyper-parameter space that is independent of any concrete implementation.
            ``HDL_Constructor`` is the class responsible for translating a dict-type ``DAG-workflow`` into ``HDL``.
        resource_manager: :class:`autoflow.manager.resource_manager.ResourceManager` or None
            ``ResourceManager`` is the class that manages computing resources, such as the ``file_system`` and the ``data_base``.
        random_state: int
            random state
        log_file: path
            the file in which to store the log; if None, ``autoflow.log`` will be used.
        log_config: dict
            logging configuration
        highR_nan_threshold: float
            high-ratio NaN threshold; examples and practice can be found in
            :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
        highC_cat_threshold: float
            threshold above which a categorical feature's cardinality counts as high; examples and practice
            can be found in :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
        kwargs
            if parameters like ``tuner``, ``hdl_constructor`` or ``resource_manager`` are passed as None,
            you can pass kwargs to make the implicitly constructed objects work. See the following example.

        Examples
        ---------
        This example shows a trick to seed parameters through kwargs without explicitly instantiating
        :class:`autoflow.hdl.hdl_constructor.HDL_Constructor` or other classes.

        The user passes ``DAG_workflow`` and ``hdl_bank`` as keyword arguments,
        and ``hdl_constructor`` is instantiated from kwargs implicitly.

        >>> from autoflow import AutoFlowClassifier
        >>> classifier = AutoFlowClassifier(DAG_workflow={"num->target":["lightgbm"]},
        ...     hdl_bank={"classification":{"lightgbm":{"boosting_type": {"_type": "choice", "_value":["gbdt","dart","goss"]}}}})
        AutoFlowClassifier(hdl_constructor=HDL_Constructor(
            DAG_workflow={'num->target': ['lightgbm']}
            hdl_bank_path=None
            hdl_bank={'classification': {'lightgbm': {'boosting_type': {'_type': 'choice', '_value': ['gbdt', 'dart', 'goss']}}}}
            included_classifiers=('adaboost', 'catboost', 'decision_tree', 'extra_trees', 'gaussian_nb', 'knn', 'linearsvc', 'lib...
        '''
        self.should_finally_fit = should_finally_fit
        self.should_store_intermediate_result = should_store_intermediate_result
        self.should_calc_all_metrics = should_calc_all_metrics
        self.log_config = log_config
        self.highR_nan_threshold = highR_nan_threshold
        self.highC_cat_threshold = highC_cat_threshold
        # ---logger------------------------------------
        self.log_file = log_file
        setup_logger(self.log_file, self.log_config)
        self.logger = get_logger(self)
        # ---random_state-----------------------------------
        self.random_state = random_state
        # ---tuner-----------------------------------
        tuner = instancing(tuner, Tuner, kwargs)
        # ---tuners-----------------------------------
        self.tuners = sequencing(tuner, Tuner)
        self.tuner = self.tuners[0]
        # ---hdl_constructor--------------------------
        hdl_constructor = instancing(hdl_constructor, HDL_Constructor, kwargs)
        # ---hdl_constructors-------------------------
        self.hdl_constructors = sequencing(hdl_constructor, HDL_Constructor)
        self.hdl_constructor = self.hdl_constructors[0]
        # ---resource_manager-----------------------------------
        self.resource_manager = instancing(resource_manager, ResourceManager, kwargs)
        # ---member_variable------------------------------------
        self.estimator = None
        self.ensemble_estimator = None
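
    # A minimal construction sketch (illustrative, not part of the original source).
    # The two estimators below are assumed to behave identically: kwargs that the
    # constructor itself does not consume are used by `instancing` to build the
    # implicit components; `run_limit` as a Tuner parameter is an assumption here.
    #
    #   from autoflow import AutoFlowClassifier
    #   from autoflow.tuner.tuner import Tuner
    #
    #   explicit = AutoFlowClassifier(tuner=Tuner(run_limit=10))
    #   implicit = AutoFlowClassifier(run_limit=10)  # Tuner built from kwargs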

    def fit(
            self,
            X_train: Union[np.ndarray, pd.DataFrame, GenericDataFrame],
            y_train=None,
            X_test=None,
            y_test=None,
            column_descriptions: Optional[Dict] = None,
            metric=None,
            splitter=KFold(5, True, 42),
            specific_task_token="",
            additional_info: dict = frozendict(),
            dataset_metadata: dict = frozendict(),
            task_metadata: dict = frozendict(),
            fit_ensemble_params: Union[str, Dict[str, Any], None, bool] = "auto",
    ):
        '''
        Parameters
        ----------
        X_train: :class:`numpy.ndarray` or :class:`pandas.DataFrame`
        y_train: :class:`numpy.ndarray` or str
        X_test: :class:`numpy.ndarray` or :class:`pandas.DataFrame`
        y_test: :class:`numpy.ndarray` or str or None
        column_descriptions: dict
            Description of each column's feature_group; the full definition can be found in
            :class:`autoflow.manager.data_manager.DataManager`.
        dataset_metadata: dict
            Dataset's metadata
        metric: :class:`autoflow.metrics.Scorer` or None
            If ``metric`` is None:
                if it's a classification task, :obj:`autoflow.metrics.accuracy` is used by default;
                if it's a regression task, :obj:`autoflow.metrics.r2` is used by default.
        should_calc_all_metrics: bool
            If ``True``, all the metrics supported by the current task will be calculated,
            and the results will be stored in the database.
        splitter: object
            Default is a ``KFold(5, True, 42)`` object. You can pass a splitter defined by yourself
            or by another package, like :class:`sklearn.model_selection.StratifiedKFold`.
        specific_task_token: str
        should_store_intermediate_result: bool
        additional_info: dict
        fit_ensemble_params: str, dict, None, bool
            If this param is None, the program will not do ensemble.
            If this param is "auto" or True, the top 10 models will be combined by a stacking ensemble.

        Returns
        -------
        self
        '''
        dataset_metadata = dict(dataset_metadata)
        additional_info = dict(additional_info)
        task_metadata = dict(task_metadata)
        # build data_manager
        self.data_manager = DataManager(
            X_train, y_train, X_test, y_test, dataset_metadata, column_descriptions, self.highR_nan_threshold
        )
        self.ml_task = self.data_manager.ml_task
        if self.checked_mainTask is not None:
            if self.checked_mainTask != self.ml_task.mainTask:
                if self.checked_mainTask == "regression":
                    self.ml_task = constants.regression_task
                    self.data_manager.ml_task = self.ml_task
                else:
                    self.logger.error(
                        f"This task is supposed to be a {self.checked_mainTask} task, but the target data is {self.ml_task}.")
                    raise ValueError
        # parse metric
        if metric is None:
            if self.ml_task.mainTask == "regression":
                metric = r2
            elif self.ml_task.mainTask == "classification":
                metric = accuracy
            else:
                raise NotImplementedError()
        self.metric = metric
        # get task_id, and insert a record into the "tasks.tasks" database table
        self.resource_manager.insert_to_tasks_table(self.data_manager, metric, splitter,
                                                    specific_task_token, dataset_metadata, task_metadata)
        self.resource_manager.close_tasks_table()
        # store other params
        self.splitter = splitter
        assert len(self.hdl_constructors) == len(self.tuners)
        n_step = len(self.hdl_constructors)
        general_experiment_timestamp = datetime.datetime.now()
        for step, (hdl_constructor, tuner) in enumerate(zip(self.hdl_constructors, self.tuners)):
            current_experiment_timestamp = datetime.datetime.now()
            hdl_constructor.run(self.data_manager, self.random_state, self.highC_cat_threshold)
            raw_hdl = hdl_constructor.get_hdl()
            if step != 0:
                last_best_dhp = self.resource_manager.load_best_dhp()
                hdl = update_mask_from_other_dict(raw_hdl, last_best_dhp)
                self.logger.debug(f"Updated HDL(Hyperparams Descriptions Language) in step {step}:\n{hdl}")
            else:
                hdl = raw_hdl
            # get hdl_id, and insert a record into the "{task_id}.hdls" database table
            self.resource_manager.insert_to_hdls_table(hdl, hdl_constructor.hdl_metadata)
            self.resource_manager.close_hdls_table()
            # now that we have task_id and hdl_id, we can insert the current runtime
            # information into the "experiments.experiments" database table
            self.resource_manager.insert_to_experiments_table(general_experiment_timestamp,
                                                              current_experiment_timestamp,
                                                              self.hdl_constructors, hdl_constructor, raw_hdl, hdl,
                                                              self.tuners, tuner, self.should_calc_all_metrics,
                                                              self.data_manager, column_descriptions,
                                                              dataset_metadata, metric, splitter,
                                                              self.should_store_intermediate_result,
                                                              fit_ensemble_params,
                                                              additional_info, self.should_finally_fit)
            self.resource_manager.close_experiments_table()
            self.task_id = self.resource_manager.task_id
            self.hdl_id = self.resource_manager.hdl_id
            self.experiment_id = self.resource_manager.experiment_id
            self.logger.info(f"task_id:\t{self.task_id}")
            self.logger.info(f"hdl_id:\t{self.hdl_id}")
            self.logger.info(f"experiment_id:\t{self.experiment_id}")
            result = self.start_tuner(tuner, hdl)
            if result["is_manual"]:
                break
            if step == n_step - 1:
                self.start_final_step(fit_ensemble_params)
        return self

    def get_sync_dict(self, n_jobs, tuner):
        # a shared Manager dict is only needed when several worker processes must
        # coordinate their exit; a single process or a grid search needs none
        if n_jobs > 1 and tuner.search_method != "grid":
            sync_dict = Manager().dict()
            sync_dict["exit_processes"] = tuner.exit_processes
        else:
            sync_dict = None
        return sync_dict
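
    # Lifecycle sketch of sync_dict (illustrative, mirrors `run` below): each
    # worker registers a flag under its own pid and flips it when finished.
    #
    #   sync_dict = self.get_sync_dict(n_jobs=4, tuner=tuner)
    #   # in every worker process:
    #   #   sync_dict[os.getpid()] = 0  # running
    #   #   ...                         # tuning loop
    #   #   sync_dict[os.getpid()] = 1  # finished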

    def start_tuner(self, tuner: Tuner, hdl: dict):
        self.logger.debug(f"Start fine tune task, \nwhich HDL(Hyperparams Descriptions Language) is:\n{hdl}")
        self.logger.debug(f"which Tuner is:\n{tuner}")
        tuner.set_data_manager(self.data_manager)
        tuner.set_random_state(self.random_state)
        tuner.set_hdl(hdl)  # just to get the shps of the tuner
        if estimate_config_space_numbers(tuner.shps) == 1:
            self.logger.info("HDL(Hyperparams Descriptions Language) is a constant space, using manual modeling.")
            dhp, self.estimator = tuner.evaluator.shp2model(tuner.shps.sample_configuration())
            self.estimator.fit(self.data_manager.X_train, self.data_manager.y_train)
            return {"is_manual": True}
        n_jobs = tuner.n_jobs
        run_limits = [math.ceil(tuner.run_limit / n_jobs)] * n_jobs
        is_master_list = [False] * n_jobs
        is_master_list[0] = True
        initial_configs_list = get_chunks(
            tuner.design_initial_configs(n_jobs),
            n_jobs)
        random_states = np.arange(n_jobs) + self.random_state
        sync_dict = self.get_sync_dict(n_jobs, tuner)
        self.resource_manager.clear_pid_list()
        self.resource_manager.close_all()
        resource_managers = [deepcopy(self.resource_manager) for i in range(n_jobs)]
        tuners = [deepcopy(tuner) for i in range(n_jobs)]
        processes = []
        # todo: refactor sync_dict
        for tuner, resource_manager, run_limit, initial_configs, is_master, random_state in \
                zip(tuners, resource_managers, run_limits, initial_configs_list, is_master_list, random_states):
            args = (tuner, resource_manager, run_limit, initial_configs, is_master, random_state, sync_dict)
            if n_jobs == 1:
                self.run(*args)
            else:
                p = multiprocessing.Process(
                    target=self.run,
                    args=args
                )
                processes.append(p)
                p.start()
        for p in processes:
            p.join()
        return {"is_manual": False}
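
    # Budget-splitting sketch (illustrative): with run_limit=100 and n_jobs=3,
    # every worker receives ceil(100 / 3) = 34 trials, so the pool slightly
    # overshoots the global budget (3 * 34 = 102) rather than undershooting it.
    #
    #   import math
    #   run_limits = [math.ceil(100 / 3)] * 3  # -> [34, 34, 34]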

    def start_final_step(self, fit_ensemble_params):
        if isinstance(fit_ensemble_params, str):
            if fit_ensemble_params == "auto":
                self.logger.info("'fit_ensemble_params' is 'auto', using default params to fit an ensemble.")
                self.estimator = self.fit_ensemble()
            else:
                raise NotImplementedError
        elif isinstance(fit_ensemble_params, bool):
            if fit_ensemble_params:
                self.logger.info("'fit_ensemble_params' is True, using default params to fit an ensemble.")
                self.estimator = self.fit_ensemble()
            else:
                self.logger.info(
                    "'fit_ensemble_params' is False, not fitting an ensemble; the best trial is used as the result.")
                self.estimator = self.resource_manager.load_best_estimator(self.ml_task)
        elif isinstance(fit_ensemble_params, dict):
            self.logger.info(
                f"'fit_ensemble_params' is specified: {fit_ensemble_params}.")
            self.estimator = self.fit_ensemble(**fit_ensemble_params)
        elif fit_ensemble_params is None:
            self.logger.info(
                "'fit_ensemble_params' is None, not fitting an ensemble; the best trial is used as the result.")
            self.estimator = self.resource_manager.load_best_estimator(self.ml_task)
        else:
            raise NotImplementedError
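
    # Dispatch summary for `fit_ensemble_params` (mirrors the branches above):
    #
    #   est.fit(..., fit_ensemble_params="auto")  # default stacking ensemble
    #   est.fit(..., fit_ensemble_params=True)    # same as "auto"
    #   est.fit(..., fit_ensemble_params=False)   # best single trial, no ensemble
    #   est.fit(..., fit_ensemble_params=None)    # best single trial, no ensemble
    #   est.fit(..., fit_ensemble_params={"trials_fetcher_params": {"k": 5}})
    #   # a dict is forwarded verbatim to `fit_ensemble(**fit_ensemble_params)`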

    def run(self, tuner, resource_manager, run_limit, initial_configs, is_master, random_state, sync_dict=None):
        if sync_dict:
            sync_dict[os.getpid()] = 0
            resource_manager.sync_dict = sync_dict
        resource_manager.set_is_master(is_master)
        resource_manager.push_pid_list()
        # random_state: 1. passed to phps in set_hdl; 2. passed to every configuration
        tuner.random_state = random_state
        tuner.run_limit = run_limit
        tuner.set_resource_manager(resource_manager)
        # replace the random_state in the search space
        replace_phps(tuner.shps, "random_state", int(random_state))
        tuner.shps.seed(random_state)
        # todo: add n_jobs? investigate its default value
        tuner.run(
            initial_configs=initial_configs,
            evaluator_params=dict(
                random_state=random_state,
                data_manager=self.data_manager,
                metric=self.metric,
                should_calc_all_metric=self.should_calc_all_metrics,
                splitter=self.splitter,
                should_store_intermediate_result=self.should_store_intermediate_result,
                resource_manager=resource_manager,
                should_finally_fit=self.should_finally_fit
            ),
            instance_id=resource_manager.task_id,
            rh_db_type=resource_manager.db_type,
            rh_db_params=resource_manager.runhistory_db_params,
            rh_db_table_name=resource_manager.runhistory_table_name
        )
        if sync_dict:
            sync_dict[os.getpid()] = 1

    def fit_ensemble(
            self,
            task_id=None,
            hdl_id=None,
            trials_fetcher="GetBestK",
            trials_fetcher_params=frozendict(k=10),
            ensemble_type="stack",
            ensemble_params=frozendict(),
            return_Xy_test=False
    ):
        if task_id is None:
            assert hasattr(self.resource_manager, "task_id") and self.resource_manager.task_id is not None
            task_id = self.resource_manager.task_id
        # if hdl_id is None:
        #     assert hasattr(self.resource_manager, "hdl_id") and self.resource_manager.hdl_id is not None
        #     hdl_id = self.resource_manager.hdl_id
        # the param `trials_fetcher` shadows the module of the same name,
        # so save the class name string before importing
        trials_fetcher_name = trials_fetcher
        from autoflow.ensemble import trials_fetcher
        assert hasattr(trials_fetcher, trials_fetcher_name)
        trials_fetcher_cls = getattr(trials_fetcher, trials_fetcher_name)
        trials_fetcher: TrialsFetcher = trials_fetcher_cls(resource_manager=self.resource_manager,
                                                           task_id=task_id,
                                                           hdl_id=hdl_id,
                                                           **trials_fetcher_params)
        trial_ids = trials_fetcher.fetch()
        estimator_list, y_true_indexes_list, y_preds_list = TrainedDataFetcher(
            task_id, hdl_id, trial_ids, self.resource_manager).fetch()
        ml_task, Xy_train, Xy_test = self.resource_manager.get_ensemble_needed_info(task_id, hdl_id)
        y_true = Xy_train[1]
        ensemble_estimator_package_name = f"autoflow.ensemble.{ensemble_type}.{ml_task.role}"
        ensemble_estimator_package = import_module(ensemble_estimator_package_name)
        ensemble_estimator_class_name = get_class_name_of_module(ensemble_estimator_package_name)
        ensemble_estimator_class = getattr(ensemble_estimator_package, ensemble_estimator_class_name)
        ensemble_estimator: EnsembleEstimator = ensemble_estimator_class(**ensemble_params)
        ensemble_estimator.fit_trained_data(estimator_list, y_true_indexes_list, y_preds_list, y_true)
        self.ensemble_estimator = ensemble_estimator
        if return_Xy_test:
            return self.ensemble_estimator, Xy_test
        else:
            return self.ensemble_estimator
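
    # Usage sketch for `fit_ensemble` (illustrative): stack the best 5 trials of
    # an estimator `est` that is assumed to have been fitted already.
    #
    #   ensemble = est.fit_ensemble(
    #       trials_fetcher="GetBestK",        # class looked up in autoflow.ensemble.trials_fetcher
    #       trials_fetcher_params={"k": 5},
    #       ensemble_type="stack",
    #   )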

    def auto_fit_ensemble(self):
        # todo: investigate how the performance of stacking and other ensemble methods should be evaluated
        pass

    def _predict(
            self,
            X_test,
            task_id=None,
            trial_id=None,
            experiment_id=None,
            column_descriptions: Optional[Dict] = None,
            highR_nan_threshold=0.5
    ):
        is_set_X_test = False
        if hasattr(self, "data_manager") and self.data_manager is not None:
            self.logger.info(
                "'data_manager' already exists in AutoFlowEstimator; it will not be loaded from the database or re-created.")
        else:
            if task_id is not None:
                _experiment_id = self.resource_manager.get_experiment_id_by_task_id(task_id)
            elif experiment_id is not None:
                _experiment_id = experiment_id
            elif hasattr(self, "experiment_id") and self.experiment_id is not None:
                _experiment_id = self.experiment_id
            else:
                _experiment_id = None
            if _experiment_id is None:
                self.logger.info(
                    "'_experiment_id' does not exist, initializing data_manager from user-given parameters.")
                self.data_manager = DataManager(X_test, column_descriptions=column_descriptions,
                                                highR_nan_threshold=highR_nan_threshold)
                is_set_X_test = True
            else:
                self.logger.info(
                    "'_experiment_id' exists, loading data_manager by querying the meta_record.experiments database.")
                self.data_manager: DataManager = self.resource_manager.load_data_manager_by_experiment_id(
                    _experiment_id)
        if not is_set_X_test:
            self.data_manager.set_data(X_test=X_test)
        if self.estimator is None:
            self.logger.warning(
                f"'{self.__class__.__name__}' 's estimator is None; maybe you didn't call the fit method to train on the data.\n"
                f"We will try to query the trials database if you specified 'trial_id'.")
            raise NotImplementedError