import datetime
import sys
from collections import defaultdict
from contextlib import redirect_stderr
from io import StringIO
from time import time
from typing import Dict, Optional, List
import numpy as np
from autoflow.constants import PHASE2, PHASE1, SERIES_CONNECT_LEADER_TOKEN, SERIES_CONNECT_SEPARATOR_TOKEN, \
SUBSAMPLES_BUDGET_MODE, ITERATIONS_BUDGET_MODE
from autoflow.data_container import DataFrameContainer, NdArrayContainer
from autoflow.data_manager import DataManager
from autoflow.ensemble.utils import vote_predicts, mean_predicts
from autoflow.evaluation.budget import implement_subsample_budget
from autoflow.hdl.shp2dhp import SHP2DHP
from autoflow.hpbandster.core.worker import Worker
from autoflow.metrics import Scorer, calculate_score, calculate_confusion_matrix
from autoflow.resource_manager.base import ResourceManager
from autoflow.utils.dict_ import group_dict_items_before_first_token
from autoflow.utils.hash import get_hash_of_config
from autoflow.utils.klass import StrSignatureMixin
from autoflow.utils.ml_task import MLTask
from autoflow.utils.packages import get_class_object_in_pipeline_components
from autoflow.utils.pipeline import concat_pipeline
from autoflow.utils.sys_ import get_trance_back_msg
from autoflow.workflow.components.base import AutoFlowIterComponent
from autoflow.workflow.ml_workflow import ML_Workflow
class TrainEvaluator(Worker, StrSignatureMixin):
def __init__(
self,
run_id,
data_manager: DataManager,
resource_manager: ResourceManager,
random_state: int,
metric: Scorer,
groups: List[int],
should_calc_all_metric: bool,
splitter,
should_store_intermediate_result: bool,
should_stack_X: bool,
should_finally_fit: bool,
model_registry: dict,
budget2kfold: Optional[Dict[float, int]] = None,
algo2budget_mode: Optional[Dict[str, str]] = None,
algo2iter: Optional[Dict[str, int]] = None,
nameserver=None,
nameserver_port=None,
host=None,
worker_id=None,
timeout=None,
):
super(TrainEvaluator, self).__init__(
run_id, nameserver, nameserver_port, host, worker_id, timeout
)
self.algo2iter = algo2iter
self.algo2budget_mode = algo2budget_mode
self.budget2kfold = budget2kfold
self.timeout = timeout
self.id = worker_id
self.host = host
self.nameserver_port = nameserver_port
self.nameserver = nameserver
self.model_registry = model_registry
self.should_finally_fit = should_finally_fit
self.resource_manager = resource_manager
self.should_stack_X = should_stack_X
self.should_store_intermediate_result = should_store_intermediate_result
self.splitter = splitter
self.should_calc_all_metric = should_calc_all_metric
self.groups = groups
self.metric = metric
self.data_manager = data_manager
self.random_state = random_state
self.run_id = run_id
# ---- member variables ----
self.debug = False
self.ml_task: MLTask = self.data_manager.ml_task
if self.ml_task.mainTask == "regression":
self.predict_function = self._predict_regression
else:
self.predict_function = self._predict_proba
self.X_train = self.data_manager.X_train
self.y_train = self.data_manager.y_train
self.X_test = self.data_manager.X_test
self.y_test = self.data_manager.y_test
self.rng = np.random.RandomState(self.random_state)
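# Note: loss() below converts any score into an error of the form
# (metric._optimum - score), so a smaller value is always better,
# independent of whether the metric itself is maximized or minimized.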
def loss(self, y_true, y_hat):
score, true_score = calculate_score(
y_true, y_hat, self.ml_task, self.metric,
should_calc_all_metric=self.should_calc_all_metric)
if isinstance(score, dict):
err = self.metric._optimum - score[self.metric.name]
all_score = true_score
elif isinstance(score, (int, float)):
err = self.metric._optimum - score
all_score = None
else:
raise TypeError
return err, all_score
def _predict_proba(self, X, model):
y_pred = model.predict_proba(X)
return y_pred
def _predict_regression(self, X, model):
y_pred = model.predict(X)
if len(y_pred.shape) == 1:
y_pred = y_pred.reshape((-1, 1))
return y_pred
def get_Xy(self):
# fixme: the returned data can be mutated by downstream components!
# This bug has shown up in autoflow.workflow.components.preprocessing.operate.keep_going.KeepGoing
# fixme: autoflow.manager.data_container.dataframe.DataFrameContainer#sub_sample uses deepcopy,
# which should fix the X_train mutation at its source, but X_test still needs attention.
return (self.X_train), (self.y_train), (self.X_test), (self.y_test)
# return deepcopy(self.X_train), deepcopy(self.y_train), deepcopy(self.X_test), deepcopy(self.y_test)
def evaluate(self, config_id, model: ML_Workflow, X, y, X_test, y_test, budget):
warning_info = StringIO()
additional_info = {}
final_model = model[-1]
final_model_name = final_model.__class__.__name__
support_early_stopping = getattr(final_model, "support_early_stopping", False)
budget_mode = self.algo2budget_mode[final_model_name]
is_iter_algo = self.algo2iter.get(final_model_name) is not None
max_iter = -1
# if the final model is an iterative algorithm, max_iter must be specified
if is_iter_algo:
if budget_mode == ITERATIONS_BUDGET_MODE:
fraction = min(1, budget)
else:
fraction = 1
max_iter = max(round(self.algo2iter[final_model_name] * fraction), 1)
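# e.g. (illustrative values) with algo2iter = {"LGBMClassifier": 1000} and budget = 0.25
# under the iterations budget mode, max_iter becomes round(1000 * 0.25) = 250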
with redirect_stderr(warning_info):
losses = []
models = []
y_true_indexes = []
y_preds = []
y_test_preds = []
all_scores = []
status = "SUCCESS"
failed_info = ""
intermediate_results = []
start_time = datetime.datetime.now()
confusion_matrices = []
best_iterations = []
for fold_ix, (train_index, valid_index) in enumerate(self.splitter.split(X.data, y.data, self.groups)):
cloned_model = model.copy()
X: DataFrameContainer
X_train = X.sub_sample(train_index)
X_valid = X.sub_sample(valid_index)
y_train = y.sub_sample(train_index)
y_valid = y.sub_sample(valid_index)
# subsamples budget mode: with budget < 1, train on a subsample of the training fold
if fold_ix == 0 and budget_mode == SUBSAMPLES_BUDGET_MODE and budget < 1:
X_train, y_train, (X_valid, X_test) = implement_subsample_budget(
X_train, y_train, [X_valid, X_test],
budget, self.random_state
)
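# assumption: implement_subsample_budget shrinks the training fold to roughly
# `budget` * n_rows; how X_valid / X_test are handled is defined in
# autoflow.evaluation.budget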
cache_key = self.get_cache_key(config_id, X_train, y_train)
cached_model = self.resource_manager.cache.get(cache_key)
if cached_model is not None:
cloned_model = cached_model
# In iterations budget mode, adjust max_iter through a unified interface.
# Future work: cache the ML_Workflow and only retrain the final estimator.
try:
procedure_result = cloned_model.procedure(
self.ml_task, X_train, y_train, X_valid, y_valid,
X_test, y_test, max_iter
)
except Exception as e:
self.logger.error(str(e))
failed_info = get_trance_back_msg()
status = "FAILED" # todo: 实现 timeout, memory out
if self.debug:
self.logger.error("re-raise exception")
raise sys.exc_info()[1]
break
# cache the fitted workflow so later evaluations of the same config and data can reuse it
if (budget_mode == ITERATIONS_BUDGET_MODE and budget <= 1 and
isinstance(final_model, AutoFlowIterComponent)) or \
(budget == 1):
self.resource_manager.cache.set(cache_key, cloned_model)
intermediate_results.append(cloned_model.intermediate_result)
models.append(cloned_model)
y_true_indexes.append(valid_index)
y_pred = procedure_result["pred_valid"]
y_test_pred = procedure_result["pred_test"]
if self.ml_task.mainTask == "classification":
confusion_matrices.append(calculate_confusion_matrix(y_valid.data, y_pred))
if support_early_stopping:
estimator = cloned_model.steps[-1][1]
# todo: refactor LGBM and similar components
best_iterations.append(getattr(
estimator, "best_iteration_", getattr(estimator.component, "best_iteration_", -1)))
y_preds.append(y_pred)
if y_test_pred is not None:
y_test_preds.append(y_test_pred)
loss, all_score = self.loss(y_valid.data, y_pred)  # todo: support user-defined scorers when y is not a 1d-array
losses.append(float(loss))
all_scores.append(all_score)
# when budget <= 1, use hold-out validation (only the first fold is evaluated)
if fold_ix == 0 and budget <= 1:
break
# when budget > 1, budget is mapped to a number of CV folds via 'budget2kfold'.
# For example, with budget = 4 and budget2kfold = {4: 10} we run 10-fold cross-validation,
# so we break when fold_ix == 10 - 1 == 9.
if budget > 1 and fold_ix == self.budget2kfold[budget] - 1:
break
if self.ml_task.mainTask == "classification":
additional_info["confusion_matrices"] = confusion_matrices
if support_early_stopping:
additional_info["best_iterations"] = best_iterations
end_time = datetime.datetime.now()
# finally fit: optionally refit on the full training data
if status == "SUCCESS" and self.should_finally_fit:
# make sure resource_manager is attached so the workflow can e.g. connect to redis
model.resource_manager = self.resource_manager
finally_fit_model = model.fit(X, y, X_test=X_test, y_test=y_test)
if self.ml_task.mainTask == "classification":
y_test_pred_by_finally_fit_model = model.predict_proba(X_test)
else:
y_test_pred_by_finally_fit_model = model.predict(X_test)
model.resource_manager = None
else:
finally_fit_model = None
y_test_pred_by_finally_fit_model = None
if len(losses) > 0:
final_loss = float(np.array(losses).mean())
else:
final_loss = 65535
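# 65535 is a sentinel "worst possible" loss used when no fold finished successfully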
if len(all_scores) > 0 and all_scores[0]:
all_score = defaultdict(list)
for cur_all_score in all_scores:
if isinstance(cur_all_score, dict):
for key, value in cur_all_score.items():
all_score[key].append(value)
else:
self.logger.warning(f"TypeError: cur_all_score is not dict.\ncur_all_score = {cur_all_score}")
for key in all_score.keys():
all_score[key] = float(np.mean(all_score[key]))
else:
all_score = {}
all_scores = []
info = {
"loss": final_loss,
"losses": losses,
"all_score": all_score,
"all_scores": all_scores,
"models": models,
"finally_fit_model": finally_fit_model,
"y_true_indexes": y_true_indexes,
"y_preds": y_preds,
"intermediate_results": intermediate_results,
"status": status,
"failed_info": failed_info,
"start_time": start_time,
"end_time": end_time,
"additional_info": additional_info
}
# todo
if y_test is not None:
# combine the fold models trained on the validation splits to predict the test set
if self.should_finally_fit:
y_test_pred = y_test_pred_by_finally_fit_model
else:
if self.ml_task.mainTask == "classification":
y_test_pred = vote_predicts(y_test_preds)
else:
y_test_pred = mean_predicts(y_test_preds)
test_loss, test_all_score = self.loss(y_test.data, y_test_pred)
# todo: support user-defined scorers when y is not a 1d-array
info.update({
"test_loss": test_loss,
"test_all_score": test_all_score,
# "y_test_true": y_test,
"y_test_pred": y_test_pred
})
info["warning_info"] = warning_info.getvalue()
return info
def compute(self, config: dict, config_info: dict, budget: float, **kwargs):
# 1. turn the shp (program hyper-params) into a model
config_id = get_hash_of_config(config)
start = time()
dhp, model = self.shp2model(config)
# 2. fetch the data
X_train, y_train, X_test, y_test = self.get_Xy()
# todo: support the iterations budget type
# 3. evaluate
info = self.evaluate(config_id, model, X_train, y_train, X_test, y_test, budget)
# 4. persist the results
cost_time = time() - start
info["config_id"] = config_id
info["config"] = config
info["config_info"] = config_info
info["budget"] = budget
# info["instance_id"] = self.run_id
# info["run_id"] = self.run_id
# info["program_hyper_param"] = shp
info["dict_hyper_param"] = dhp
estimator = list(dhp.get(PHASE2, {"unk": ""}).keys())[0]
info["component"] = estimator
info["cost_time"] = cost_time
# info["additional_info"].update({
# "config_origin": getattr(shp, "origin", "unk")
# })
# fixme: move this into result_logger
trial_id = self.resource_manager.insert_trial_record(info)
return {
"loss": info["loss"],
"info": {
"config_id": config_id,
"trial_id": trial_id
},
}
def shp2model(self, shp):
shp2dhp = SHP2DHP()
dhp = shp2dhp(shp)
# todo: introduce a parameter describing the run mode (3 modes: normal, deep learning, big data) and refactor the three translation steps below
preprocessor = self.create_preprocessor(dhp)
estimator = self.create_estimator(dhp)
pipeline = concat_pipeline(preprocessor, estimator)
return dhp, pipeline
def parse_key(self, key: str):
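# Strips the leading numeric prefix, then reads the feature groups around "->".
# e.g. (assuming keys carry such a prefix) parse_key("0cat->num") -> ("cat", "num");
# a key without "->" yields (None, None).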
cnt = ""
ix = 0
for i, c in enumerate(key):
if c.isdigit():
cnt += c
else:
ix = i
break
cnt = int(cnt)
key = key[ix:]
# todo: support multi-node inputs/outputs; currently coupled with dataframe.py
if "->" in key:
_from, _to = key.split("->")
in_feature_groups = _from.split(",")[0]
out_feature_groups = _to.split(",")[0]
else:
in_feature_groups, out_feature_groups = None, None
if not in_feature_groups:
in_feature_groups = None
if not out_feature_groups:
out_feature_groups = None
return in_feature_groups, out_feature_groups
def create_preprocessor(self, dhp: Dict) -> Optional[ML_Workflow]:
preprocessing_dict: dict = dhp[PHASE1]
pipeline_list = []
for key, value in preprocessing_dict.items():
name = key # like: "cat->num"
in_feature_groups, out_feature_groups = self.parse_key(key)
sub_dict = preprocessing_dict[name]
if sub_dict is None:
continue
preprocessor = self.create_component(sub_dict, PHASE1, name, in_feature_groups, out_feature_groups,
)
pipeline_list.extend(preprocessor)
if pipeline_list:
return ML_Workflow(pipeline_list, self.should_store_intermediate_result, self.resource_manager)
else:
return None
def create_estimator(self, dhp: Dict) -> ML_Workflow:
# build an estimator from the hyper-parameters
return ML_Workflow(self.create_component(dhp[PHASE2], PHASE2, self.ml_task.role),
self.should_store_intermediate_result, self.resource_manager)
def _create_component(self, key1, key2, params):
cls = get_class_object_in_pipeline_components(key1, key2, self.model_registry)
component = cls(**params)
# component.set_inside_dict(self.addition_info)
return component
def create_component(self, sub_dhp: Dict, phase: str, step_name, in_feature_groups="all", out_feature_groups="all",
outsideEdge_info=None):
pipeline_list = []
assert phase in (PHASE1, PHASE2)
packages = list(sub_dhp.keys())[0]
params = sub_dhp[packages]
packages = packages.split(SERIES_CONNECT_SEPARATOR_TOKEN)
grouped_params = group_dict_items_before_first_token(params, SERIES_CONNECT_LEADER_TOKEN)
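# A key may describe several series-connected components separated by
# SERIES_CONNECT_SEPARATOR_TOKEN; every package except the last acts as a
# preprocessor, and its hyper-parameters are recovered from the groups prefixed
# with SERIES_CONNECT_LEADER_TOKEN (token values live in autoflow.constants).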
if len(packages) == 1:
if bool(grouped_params):
grouped_params[packages[0]] = grouped_params.pop("single")
else:
grouped_params[packages[0]] = {}
for package in packages[:-1]:
preprocessor = self._create_component(PHASE1, package, grouped_params[package])
preprocessor.in_feature_groups = in_feature_groups
preprocessor.out_feature_groups = in_feature_groups
pipeline_list.append([
package,
preprocessor
])
key1 = PHASE1 if phase == PHASE1 else self.ml_task.mainTask
hyperparams = grouped_params[packages[-1]]
if outsideEdge_info:
hyperparams.update(outsideEdge_info)
component = self._create_component(key1, packages[-1], hyperparams)
component.in_feature_groups = in_feature_groups
component.out_feature_groups = out_feature_groups
if not self.should_stack_X:
setattr(component, "need_y", True)
pipeline_list.append([
step_name,
component
])
return pipeline_list
def get_cache_key(self, config_id, X_train: DataFrameContainer, y_train: NdArrayContainer):
return "-".join([config_id, X_train.get_hash(), y_train.get_hash()])