#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : qichun tang
# @Contact : qichun.tang@bupt.edu.cn
from collections import defaultdict
from copy import deepcopy
from math import inf
from time import time
from typing import Tuple, Union, List, Dict
import numpy as np
from ConfigSpace import Configuration
from sklearn.utils.validation import check_random_state
from ultraopt.structure import Job
from ultraopt.utils.config_space import add_configs_origin, get_dict_from_config
from ultraopt.utils.hash import get_hash_of_config
from ultraopt.utils.logging_ import get_logger
[文档]def runId_info():
return {"start_time": time(), "end_time": -1}
[文档]class BaseOptimizer():
def __init__(self):
self.logger = get_logger(self)
self.is_init = False
self.configId2config: Dict[str, dict] = {}
self.runId2info: Dict[Tuple[str, float], dict] = defaultdict(runId_info)
[文档] def initialize(self, config_space, budgets=(1,), random_state=42, initial_points=None, budget2obvs=None):
if self.is_init:
return
self.is_init = True
self.initial_points = initial_points
self.random_state = random_state
self.config_space = config_space
self.config_space.seed(random_state)
self.budgets = budgets
if budget2obvs is None:
budget2obvs = self.get_initial_budget2obvs(self.budgets)
self.budget2obvs = budget2obvs
# other variable
self.rng = check_random_state(self.random_state)
self.initial_points_index = 0
[文档] @classmethod
def get_initial_budget2obvs(cls, budgets):
return {budget: {"losses": [], "configs": [], "vectors": [], "locks": []} for budget in budgets}
[文档] def tell(self, config: Union[dict, Configuration], loss: float, budget: float = 1, update_model=True):
config = get_dict_from_config(config)
job = Job(get_hash_of_config(config))
job.kwargs = {
"budget": budget,
"config": config,
"config_info": {}
}
job.result = {
"loss": loss
}
self.new_result(job, update_model=update_model)
[文档] def new_result(self, job: Job, update_model=True):
##############################
### 1. update observations ###
##############################
if job.result is None:
# One could skip crashed results, but we decided to
# assign a +inf loss and count them as bad configurations
loss = np.inf
else:
# same for non numeric losses.
# Note that this means losses of minus infinity will count as bad!
loss = job.result["loss"] if np.isfinite(job.result["loss"]) else np.inf
budget = job.kwargs["budget"]
config_dict = job.kwargs["config"]
configId = get_hash_of_config(config_dict)
runId = (configId, budget)
if runId in self.runId2info:
self.runId2info[runId]["end_time"] = time()
self.runId2info[runId]["loss"] = loss
else:
self.logger.error(f"runId {runId} not in runId2info, it's impossible!!!")
# config_info = job.kwargs["config_info"]
config = Configuration(self.config_space, config_dict)
# add lock (It may be added twice, but it does not affect)
self.budget2obvs[budget]["locks"].append(config.get_array().copy())
self.budget2obvs[budget]["configs"].append(deepcopy(config))
self.budget2obvs[budget]["vectors"].append(config.get_array())
self.budget2obvs[budget]["losses"].append(loss)
losses = np.array(self.budget2obvs[budget]["losses"])
vectors = np.array(self.budget2obvs[budget]["vectors"])
###################################################################
### 2. Judge whether the EPM training conditions are satisfied ###
###################################################################
if not update_model:
return
self._new_result(budget, vectors, losses)
def _new_result(self, budget, vectors: np.ndarray, losses: np.ndarray):
raise NotImplementedError
[文档] def ask(self, budget=1, n_points=None, strategy="cl_min") -> Union[List[Tuple[dict, dict]], Tuple[dict, dict]]:
if n_points is None:
return self.get_config(budget)
supported_strategies = ["cl_min", "cl_mean", "cl_max"]
if not (isinstance(n_points, int) and n_points > 0):
raise ValueError(
"n_points should be int > 0, got " + str(n_points)
)
if strategy not in supported_strategies:
raise ValueError(
"Expected parallel_strategy to be one of " +
str(supported_strategies) + ", " + "got %s" % strategy
)
opt = deepcopy(self)
config_info_pairs = []
for i in range(n_points):
start_time = time()
config, config_info = opt.get_config(budget=budget)
config_info_pairs.append((config, config_info))
losses = opt.budget2obvs[budget]["losses"]
if strategy == "cl_min":
y_lie = np.min(losses) if losses else 0.0 # CL-min lie
elif strategy == "cl_mean":
y_lie = np.mean(losses) if losses else 0.0 # CL-mean lie
elif strategy == "cl_max":
y_lie = np.max(losses) if losses else 0.0 # CL-max lie
else:
raise NotImplementedError
opt.tell(config, y_lie)
self.register_config(config, budget, start_time=start_time)
return config_info_pairs
[文档] def get_config(self, budget) -> Tuple[dict, dict]:
# get max_budget
# calc by budget2epm
start_time = time()
max_budget = self.get_available_max_budget()
# initial points
if self.initial_points is not None and self.initial_points_index < len(self.initial_points):
while True:
if self.initial_points_index >= len(self.initial_points):
break
initial_point_dict = self.initial_points[self.initial_points_index]
initial_point = Configuration(self.config_space, initial_point_dict)
self.initial_points_index += 1
initial_point.origin = "User Defined"
if not self.is_config_exist(budget, initial_point):
self.logger.debug(f"Using initial points [{self.initial_points_index - 1}]")
return self.process_config_info_pair(initial_point, {}, budget)
config, config_info = self._get_config(budget, max_budget)
self.register_config(config, budget, start_time)
return config, config_info
[文档] def register_config(self, config, budget, start_time=None):
configId = get_hash_of_config(config)
runId = (configId, budget)
if runId in self.runId2info: # don't set second time
return
self.configId2config[configId] = config
info = self.runId2info[runId] # auto set start_time
if start_time:
info["start_time"] = start_time
def _get_config(self, budget, max_budget):
raise NotImplementedError
[文档] def is_config_exist(self, budget, config: Configuration):
vectors_list = []
budgets = [budget_ for budget_ in list(self.budget2obvs.keys()) if budget_ >= budget]
for budget_ in budgets:
vectors = np.array(self.budget2obvs[budget_]["locks"])
if vectors.size:
vectors_list.append(vectors)
if len(vectors_list) == 0:
return False
vectors = np.vstack(vectors_list)
if np.any(np.array(vectors.shape) == 0):
return False
vectors[np.isnan(vectors)] = -1
vector = config.get_array().copy()
vector[np.isnan(vector)] = -1
if np.any(np.all(vector == vectors, axis=1)):
return True
return False
[文档] def get_available_max_budget(self):
raise NotImplementedError
[文档] def process_config_info_pair(self, config: Configuration, info_dict: dict, budget):
self.budget2obvs[budget]["locks"].append(config.get_array().copy())
info_dict = deepcopy(info_dict)
if config.origin is None:
config.origin = "unknown"
info_dict.update({
"origin": config.origin
})
return config.get_dictionary(), info_dict
[文档] def process_all_configs_exist(self, info_dict, budget):
seed = self.rng.randint(1, 8888)
self.config_space.seed(seed)
config = self.config_space.sample_configuration()
add_configs_origin(config, "Initial Design")
info_dict.update({"sampling_different_samples_failed": True, "seed": seed})
return self.process_config_info_pair(config, info_dict, budget)
[文档] def pick_random_initial_config(self, budget, max_sample=1000, origin="Initial Design"):
i = 0
info_dict = {"model_based_pick": False}
while i < max_sample:
i += 1
config = self.config_space.sample_configuration()
add_configs_origin(config, origin)
if self.is_config_exist(budget, config):
self.logger.debug(f"The sample already exists and needs to be resampled. "
f"It's the {i}-th time sampling in random sampling. ")
else:
return self.process_config_info_pair(config, info_dict, budget)
return self.process_all_configs_exist(info_dict, budget)
[文档] def reset_time(self):
min_time = inf
for info in self.runId2info.values():
min_time = min(info["start_time"], min_time)
for info in self.runId2info.values():
info["start_time"] -= min_time
info["end_time"] -= min_time
[文档] def resume_time(self):
max_time = -inf
for info in self.runId2info.values():
max_time = max(info["end_time"], max_time)
delta_time = time() - max_time
for info in self.runId2info.values():
info["start_time"] += delta_time
info["end_time"] += delta_time