Source code for ultraopt.optimizer.bo.sampling_sort_opt

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : qichun tang
# @Date    : 2020-12-14
# @Contact    : qichun.tang@bupt.edu.cn

import itertools
from copy import deepcopy
from functools import partial
from typing import Tuple, List

import numpy as np
from ConfigSpace import Configuration
from ConfigSpace.util import get_one_exchange_neighbourhood
from skopt.learning.forest import ExtraTreesRegressor

from ultraopt.optimizer.base_opt import BaseOptimizer
from ultraopt.optimizer.bo.config_evaluator import ConfigEvaluator
from ultraopt.utils.config_space import add_configs_origin
from ultraopt.utils.config_transformer import ConfigTransformer
from ultraopt.utils.loss_transformer import LossTransformer, LogScaledLossTransformer, ScaledLossTransformer

get_one_exchange_neighbourhood = partial(get_one_exchange_neighbourhood, stdev=0.05, num_neighbors=8)
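# ConfigSpace's get_one_exchange_neighbourhood yields neighbors of a given
# configuration that each differ in exactly one hyperparameter; the partial
# pins the neighborhood size (num_neighbors candidates per hyperparameter)
# and the sampling spread for continuous hyperparameters (stdev) used by
# local search.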


class SamplingSortOptimizer(BaseOptimizer):
    def __init__(
            self,
            # model related
            epm=None,
            config_transformer=None,
            # several hyper-parameters
            use_local_search=False,
            loss_transformer="log_scaled",
            min_points_in_model=15,
            n_samples=5000,
            acq_func="LogEI",
            xi=0
    ):
        super(SamplingSortOptimizer, self).__init__()
        # ---------- member variables -----------------
        self.xi = xi
        self.acq_func = acq_func
        self.use_local_search = use_local_search
        self.n_samples = n_samples
        self.min_points_in_model = min_points_in_model
        # ---------- components -----------------
        # empirical performance model (EPM)
        self.epm = epm if epm is not None else ExtraTreesRegressor()
        # config transformer
        self.config_transformer = config_transformer if config_transformer is not None else ConfigTransformer()
        # loss transformer
        if loss_transformer is None:
            self.loss_transformer = LossTransformer()
        elif loss_transformer == "log_scaled":
            self.loss_transformer = LogScaledLossTransformer()
        elif loss_transformer == "scaled":
            self.loss_transformer = ScaledLossTransformer()
        else:
            raise NotImplementedError
    def initialize(self, config_space, budgets=(1,), random_state=42, initial_points=None, budget2obvs=None):
        super(SamplingSortOptimizer, self).initialize(
            config_space, budgets, random_state, initial_points, budget2obvs)
        self.budget2epm = {budget: None for budget in budgets}
        self.config_transformer.fit(config_space)
        self.budget2confevt = {}
        for budget in budgets:
            config_evaluator = ConfigEvaluator(self.budget2epm, budget, self.acq_func, {"xi": self.xi})
            self.budget2confevt[budget] = config_evaluator
        self.update_weight_cnt = 0
    def _new_result(self, budget, vectors: np.ndarray, losses: np.ndarray):
        if len(losses) < self.min_points_in_model:
            return
        X_obvs = self.config_transformer.transform(vectors)
        y_obvs = self.loss_transformer.fit_transform(losses)
        if self.budget2epm[budget] is None:
            epm = deepcopy(self.epm)
        else:
            epm = self.budget2epm[budget]
        self.budget2epm[budget] = epm.fit(X_obvs, y_obvs)

    def _get_config(self, budget, max_budget):
        # choose the model fitted on the max budget
        epm = self.budget2epm[max_budget]
        # fall back to random sampling until enough points are observed
        if epm is None:
            return self.pick_random_initial_config(budget)
        # model-based pick
        info_dict = {"model_based_pick": True}
        # evaluate random samples with the config_evaluator
        configs = self.config_space.sample_configuration(self.n_samples)
        losses, configs_sorted = self.evaluate(configs, max_budget, return_loss_config=True)
        add_configs_origin(configs_sorted, "Random Search (Sorted)")
        if self.use_local_search:
            start_points = self.get_local_search_initial_points(max_budget, 10, configs_sorted)
            local_losses, local_configs = self.local_search(start_points, max_budget)
            add_configs_origin(local_configs, "Local Search")
            concat_losses = np.hstack([losses.flatten(), local_losses.flatten()])
            # keep the configs aligned with the (already sorted) losses
            concat_configs = configs_sorted + local_configs
            random_var = self.rng.rand(len(concat_losses))
            indexes = np.lexsort((random_var.flatten(), concat_losses))
            concat_configs_sorted = [concat_configs[i] for i in indexes]
            concat_losses = concat_losses[indexes]
        else:
            concat_losses, concat_configs_sorted = losses, configs_sorted
        # pick the best-scoring config that has not been evaluated yet
        for i, config in enumerate(concat_configs_sorted):
            if self.is_config_exist(budget, config):
                self.logger.debug(f"The sampled config already exists and will be skipped. "
                                  f"It is the {i}-th candidate in the sorted list.")
            else:
                return self.process_config_info_pair(config, info_dict, budget)
        return self.process_all_configs_exist(info_dict, budget)
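    # Note: ConfigEvaluator returns acquisition scores ("rewards") where larger
    # is better; `evaluate` (defined below) negates them, so the candidate lists
    # handled in `_get_config` are ordered by ascending pseudo-loss.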
    def get_available_max_budget(self):
        budgets = [budget for budget in self.budget2epm.keys() if budget > 0]
        sorted_budgets = sorted(budgets)
        # return the largest budget that already has a fitted model;
        # fall back to the smallest budget if no model is fitted yet
        for budget in reversed(sorted_budgets):
            if self.budget2epm[budget] is not None:
                return budget
        return sorted_budgets[0]
    def get_local_search_initial_points(self, budget, num_points, additional_start_points):
        # score the previously observed samples
        # 1. the top-`num_points` historical samples, sorted by acquisition value
        config_evaluator = self.budget2confevt[budget]
        configs_previous_runs = self.budget2obvs[budget]["configs"]
        X_trans = self.transform(configs_previous_runs)
        y_opt = np.min(self.budget2obvs[budget]["losses"])
        rewards = config_evaluator(X_trans, y_opt)
        random_var = self.rng.rand(len(rewards))
        indexes = np.lexsort((random_var.flatten(), -rewards.flatten()))
        configs_previous_runs_sorted_by_acq = [configs_previous_runs[ix] for ix in indexes[:num_points]]
        # 2. the top-`num_points` historical samples, sorted by loss
        losses = np.array(self.budget2obvs[budget]["losses"])
        random_var = self.rng.rand(len(losses))
        indexes = np.lexsort((random_var.flatten(), losses.flatten()))
        configs_previous_runs_sorted_by_loss = [configs_previous_runs[ix] for ix in indexes[:num_points]]
        additional_start_points = additional_start_points[:num_points]
        # merge the three candidate lists, dropping duplicates
        init_points = []
        init_points_as_set = set()
        for cand in itertools.chain(
                configs_previous_runs_sorted_by_acq,
                configs_previous_runs_sorted_by_loss,
                additional_start_points,
        ):
            if cand not in init_points_as_set:
                init_points.append(cand)
                init_points_as_set.add(cand)
        return init_points
    def get_y_opt(self, budget):
        y_opt = np.min(self.budget2obvs[budget]["losses"])
        return y_opt
    def transform(self, configs: List[Configuration]):
        X = np.array([config.get_array() for config in configs], dtype="float32")
        X_trans = self.config_transformer.transform(X)
        return X_trans
    # return [(a, i) for a, i in zip(acq_val_incumbents, incumbents)]
    def evaluate(self, configs: List[Configuration], budget, y_opt=None,
                 return_loss_config_pairs=False, return_loss=False, return_loss_config=False):
        config_evaluator = self.budget2confevt[budget]
        if isinstance(configs, Configuration):
            configs = [configs]
        X_trans = self.transform(configs)
        if y_opt is None:
            y_opt = self.get_y_opt(budget)
        rewards = config_evaluator(X_trans, y_opt)
        # sort by descending reward, breaking ties at random
        random_var = self.rng.rand(len(rewards))
        indexes = np.lexsort((random_var.flatten(), -rewards.flatten()))
        rewards_sorted = rewards[indexes]
        configs_sorted = [configs[ix] for ix in indexes]
        if return_loss_config_pairs:
            return list(zip(-rewards_sorted, configs_sorted))
        if return_loss:
            return -rewards
        if return_loss_config:
            return -rewards_sorted, configs_sorted
        return configs_sorted
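    # A note on the `np.lexsort` idiom used throughout this class: lexsort's
    # LAST key is the primary sort key, so `np.lexsort((random_var, -rewards))`
    # orders candidates by descending reward and resolves exact ties uniformly
    # at random instead of by input position. A minimal standalone sketch
    # (illustrative, not part of the module):
    #
    #     import numpy as np
    #     rewards = np.array([0.5, 0.3, 0.5])
    #     order = np.lexsort((np.random.rand(3), -rewards))
    #     # indices 0 and 2 (reward 0.5) come first, in random relative order;
    #     # index 1 (reward 0.3) comes last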
# todo: develop and test weight update
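

# ---------------------------------------------------------------------------
# Illustrative usage sketch (hypothetical, not part of the module). The
# constructor and `initialize` are defined above; `get_config` / `new_result`
# are ASSUMED to be BaseOptimizer wrappers around the `_get_config` /
# `_new_result` hooks -- verify against ultraopt's public API before use.
#
#     from ConfigSpace import ConfigurationSpace, UniformFloatHyperparameter
#
#     cs = ConfigurationSpace()
#     cs.add_hyperparameter(UniformFloatHyperparameter("x", -5.0, 5.0))
#     opt = SamplingSortOptimizer(n_samples=1000, min_points_in_model=10)
#     opt.initialize(cs, budgets=(1,), random_state=0)
#     for _ in range(30):
#         config, info = opt.get_config(budget=1)   # assumed wrapper
#         loss = (config["x"] - 1.0) ** 2           # toy objective
#         opt.new_result(config, loss, budget=1)    # assumed wrapper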