# Source code for ultraopt.utils.config_space

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : qichun tang
# @Date    : 2020-12-14
# @Contact    : qichun.tang@bupt.edu.cn
from collections import Counter
from copy import deepcopy
from typing import List, Union

import numpy as np
from ConfigSpace import CategoricalHyperparameter, ConfigurationSpace, UniformFloatHyperparameter
from ConfigSpace import Configuration
from sklearn.utils.validation import check_random_state


def CS2HyperoptSpace(cs: ConfigurationSpace):
    """Convert a ConfigSpace configuration space into a hyperopt search space.

    Categorical hyperparameters become ``hp.choice`` nodes. Uniform float
    hyperparameters become ``hp.uniform`` nodes, or ``hp.loguniform`` nodes
    when the hyperparameter is flagged as log-scaled.

    Args:
        cs: The configuration space to convert.

    Returns:
        A dict mapping each hyperparameter name to its hyperopt expression.

    Raises:
        ValueError: If the space contains a hyperparameter type other than
            ``CategoricalHyperparameter`` or ``UniformFloatHyperparameter``.
    """
    # Local import: hyperopt is only needed by this function.
    from hyperopt import hp

    result = {}
    for hyperparameter in cs.get_hyperparameters():
        name = hyperparameter.name
        if isinstance(hyperparameter, CategoricalHyperparameter):
            result[name] = hp.choice(name, hyperparameter.choices)
        elif isinstance(hyperparameter, UniformFloatHyperparameter):
            lower = hyperparameter.lower
            upper = hyperparameter.upper
            if getattr(hyperparameter, "log", False):
                # hp.loguniform expects its bounds in log space; a plain
                # hp.uniform here would silently drop the log scaling.
                result[name] = hp.loguniform(name, np.log(lower), np.log(upper))
            else:
                result[name] = hp.uniform(name, lower, upper)
        else:
            # TODO: support more hyperparameter types.
            raise ValueError(
                f"Unsupported hyperparameter type "
                f"{type(hyperparameter).__name__!r} for hyperparameter {name!r}"
            )
    return result
def is_top_level_activated(config_space, config, hp_name, hp_value=None):
    """Check whether the top-level ancestor of ``hp_name`` activates it.

    The chain of parent conditions is followed upwards, always through the
    first condition returned by ``get_parent_conditions_of``. Once a
    hyperparameter without parent conditions is reached, its value in
    ``config`` is compared with the value demanded by the last condition
    that was followed.

    Args:
        config_space: Object providing ``get_parent_conditions_of(name)``.
        config: Mapping from hyperparameter name to its current value.
        hp_name: Name of the hyperparameter to start from.
        hp_value: Value required of ``hp_name`` if it has no parent
            conditions; ``None`` means no requirement.

    Returns:
        ``True`` if nothing is required of the top-level hyperparameter,
        otherwise the result of comparing its configured value with the
        required value.
    """
    current_name = hp_name
    required_value = hp_value
    while True:
        conditions = config_space.get_parent_conditions_of(current_name)
        if len(conditions) == 0:
            # No conditional dependency: this hyperparameter is the parent.
            break
        first_condition = conditions[0]
        required_value = first_condition.value
        current_name = first_condition.parent.name

    if required_value is None:
        return True
    return config[current_name] == required_value
def deactivate(config_space, vector):
    """Build a configuration with non-activated hyperparameters set to NaN.

    A configuration is first built from ``vector``. Every position whose
    hyperparameter is not reported as activated by
    ``is_top_level_activated`` is then overwritten with ``np.nan`` in a
    deep copy of the vector.

    Args:
        config_space: The configuration space the vector belongs to.
        vector: Vector representation of a configuration; it is not modified.

    Returns:
        A new ``Configuration`` built from the masked copy of ``vector``.
    """
    masked = deepcopy(vector)
    reference = Configuration(config_space, vector=vector)
    for position, hyperparameter in enumerate(config_space.get_hyperparameters()):
        if is_top_level_activated(config_space, reference, hyperparameter.name, None):
            continue
        masked[position] = np.nan
    return Configuration(configuration_space=config_space, vector=masked)
def add_configs_origin(configs: List[Configuration], origin):
    """Set the ``origin`` attribute on one or more configurations in place.

    Args:
        configs: A single ``Configuration`` or an iterable of them.
        origin: Value assigned to each configuration's ``origin`` attribute.
    """
    # A lone Configuration is treated as a one-element batch.
    targets = [configs] if isinstance(configs, Configuration) else configs
    for item in targets:
        item.origin = origin
def _record_high_cardinality_choices(config, tracked_ix, seen):
    """Add ``config``'s values at ``tracked_ix`` to ``seen``; return True if any was new."""
    vector = config.get_array()
    contributed = False
    for ix in tracked_ix:
        value = vector[ix]
        if np.isnan(value):
            # NaN entries carry no choice (presumably an inactive
            # hyperparameter), so they never count towards coverage.
            continue
        code = int(value)
        if code not in seen[ix]:
            seen[ix].add(code)
            contributed = True
    return contributed


def initial_design(cs, n_configs, max_extra_rounds=20):
    """Sample an initial design that tries to cover every categorical choice.

    ``n_configs`` random configurations are drawn first. For every
    categorical hyperparameter with at least three choices, the design
    should contain each choice at least once. If it does not, the default
    configuration is appended once. If choices are still missing, extra
    random batches are drawn and only configurations that contribute a
    previously unseen choice are kept.

    The extra sampling is capped by ``max_extra_rounds``. The earlier
    implementation kept appending the same default configuration, which
    cannot add new choices and therefore never terminated in that case.

    Args:
        cs: Configuration space to sample from (a deep copy is used).
        n_configs: Number of random configurations drawn initially and per
            extra round; must be >= 1.
        max_extra_rounds: Maximum number of extra sampling rounds. When it
            is exhausted, the best-effort design is returned.

    Returns:
        A list of configurations with at least ``n_configs`` entries.
    """
    # TODO: also take user-specified initial points into account.
    # TODO: find a smarter strategy.
    # FIXME: add unit tests once the HDL module is finished; the current
    #        unit tests live in the autoflow code base.
    cs = deepcopy(cs)
    n_choices_vec = np.array(
        [
            len(hp.choices) if isinstance(hp, CategoricalHyperparameter) else 0
            for hp in cs.get_hyperparameters()
        ],
        dtype=int,
    )
    # Indices of the high-cardinality (>= 3 choices) categorical columns.
    high_r_ix = [int(ix) for ix in np.flatnonzero(n_choices_vec >= 3)]
    samples = list(sample_configurations(cs, n_configs))
    if not high_r_ix:
        return samples

    seen = {ix: set() for ix in high_r_ix}

    def all_covered():
        return all(len(seen[ix]) >= n_choices_vec[ix] for ix in high_r_ix)

    for sample in samples:
        _record_high_cardinality_choices(sample, high_r_ix, seen)
    if all_covered():
        return samples

    # Same first fallback as before: add the default configuration once.
    default_config = cs.get_default_configuration()
    _record_high_cardinality_choices(default_config, high_r_ix, seen)
    samples.append(default_config)

    for _ in range(max_extra_rounds):
        if all_covered():
            break
        for candidate in sample_configurations(cs, n_configs):
            # Keep only candidates that add a missing choice, so the design
            # does not grow with redundant configurations.
            if _record_high_cardinality_choices(candidate, high_r_ix, seen):
                samples.append(candidate)
    return samples
def sample_vectors(cs, n_samples):
    """Sample ``n_samples`` configurations and return their vectors as one array.

    Args:
        cs: Configuration space to sample from.
        n_samples: Number of configurations to draw.

    Returns:
        A NumPy array built from the ``get_array()`` result of each sample.
    """
    drawn = sample_configurations(cs, n_samples)
    rows = [configuration.get_array() for configuration in drawn]
    return np.array(rows)
def sample_configuration_except_default(cs: ConfigurationSpace, idx2val: dict, is_child_list=None,
                                        sampled_vectors=None, rng=None):
    """Pick a random configuration whose selected entries take given values.

    A pool of sampled vectors is narrowed according to ``idx2val``. For an
    entry whose ``is_child_list`` flag is true, the column is overwritten
    with the requested value. Otherwise only rows that already hold the
    requested value are kept. If no row survives, the pool is enlarged with
    freshly sampled vectors and the narrowing restarts from the full pool.

    The earlier implementation did not rebuild the working set after
    enlarging the pool, so once it became empty the loop never ended.

    Args:
        cs: Configuration space to sample from.
        idx2val: Mapping from vector index to the value it must take.
        is_child_list: One flag per ``idx2val`` entry, in iteration order;
            defaults to all ``True``.
        sampled_vectors: Optional pool of pre-sampled vectors; 5000 vectors
            are sampled when omitted.
        rng: Seed or random state passed to ``check_random_state``.

    Returns:
        A tuple ``(configuration, sampled_vectors)``, where
        ``sampled_vectors`` is the (possibly enlarged) pool.

    Note:
        The retry loop is unbounded: it keeps enlarging the pool until at
        least one row matches every filtered value.
    """
    batch_size = 5000
    if is_child_list is None:
        is_child_list = [True] * len(idx2val)
    rng = check_random_state(rng)
    if sampled_vectors is None:
        sampled_vectors = sample_vectors(cs, batch_size)

    while True:
        # Restart from the full pool each round so new vectors are considered.
        refined_vectors = sampled_vectors.copy()
        exhausted = False
        for i, (idx, val) in enumerate(idx2val.items()):
            if is_child_list[i]:
                refined_vectors[:, idx] = val
            else:
                refined_vectors = refined_vectors[refined_vectors[:, idx] == val, :]
            if refined_vectors.shape[0] == 0:
                exhausted = True
                break
        if not exhausted:
            break
        sampled_vectors = np.vstack([sampled_vectors, sample_vectors(cs, batch_size)])

    which = rng.randint(0, refined_vectors.shape[0])
    vector = refined_vectors[which, :]
    return Configuration(cs, vector=vector), sampled_vectors
def sample_configurations(config_space, n_configs=1):
    """Sample ``n_configs`` configurations and always return them as a list.

    Args:
        config_space: Object providing ``sample_configuration(size)``.
        n_configs: Number of configurations to draw; must be >= 1.

    Returns:
        A list of sampled configurations.

    Raises:
        ValueError: If ``n_configs`` is smaller than 1.
    """
    if n_configs == 1:
        sampled = config_space.sample_configuration(1)
        # sample_configuration(1) may return a bare configuration or a
        # one-element list; normalise so the result is never a nested list.
        if isinstance(sampled, (list, tuple)):
            return list(sampled)
        return [sampled]
    if n_configs > 1:
        return config_space.sample_configuration(n_configs)
    raise ValueError("n_configs should >=1")
def get_array_from_configs(configs: List[Configuration]):
    """Collect the vector of every configuration into a single NumPy array.

    Args:
        configs: Iterable of objects exposing ``get_array()``.

    Returns:
        ``np.array`` built from the per-configuration vectors, in order.
    """
    rows = []
    for item in configs:
        rows.append(item.get_array())
    return np.array(rows)
def get_dict_from_config(config: Union[dict, Configuration]):
    """Return ``config`` as a dictionary.

    Args:
        config: Either a plain ``dict`` or an object exposing
            ``get_dictionary()``.

    Returns:
        The same ``dict`` object when one is given, otherwise the result of
        ``config.get_dictionary()``.
    """
    return config if isinstance(config, dict) else config.get_dictionary()
def initial_design_2(cs, n_configs, rng):
    """Build an initial design that balances top-level categorical choices.

    Only categorical hyperparameters that have no parents and at least
    three choices are considered. For each of them a column of choice
    indices is created by cycling through ``0..n_choices-1`` and shuffling,
    then one configuration per design row is obtained from
    ``sample_configuration_except_default``.

    Args:
        cs: Configuration space (a deep copy is used).
        n_configs: Requested number of configurations; raised to the largest
            number of choices among the selected hyperparameters if smaller.
        rng: Seed or random state passed to ``check_random_state``.

    Returns:
        A list with one sampled configuration per design row.
    """
    cs = deepcopy(cs)
    rng = check_random_state(rng)

    n_choices_by_name = {}
    column_indices = []
    no_children_flags = []
    for position, hyperparameter in enumerate(cs.get_hyperparameters()):
        if not isinstance(hyperparameter, CategoricalHyperparameter):
            continue
        if len(cs.get_parents_of(hyperparameter.name)) != 0:
            continue
        if len(hyperparameter.choices) < 3:
            continue
        n_choices_by_name[hyperparameter.name] = len(hyperparameter.choices)
        column_indices.append(position)
        # True when no child condition depends on this hyperparameter.
        no_children_flags.append(len(cs.get_child_conditions_of(hyperparameter)) == 0)

    # TODO: consider the case without any high-cardinality categorical variable.
    if n_choices_by_name:
        n_configs = max(n_configs, max(list(n_choices_by_name.values())))

    design = np.zeros([n_configs, len(n_choices_by_name)], dtype="int32")
    for column, n_choices in enumerate(n_choices_by_name.values()):
        # Cycle through the choice indices until the column is long enough,
        # then shuffle the first n_configs of them.
        one_cycle = np.arange(n_choices).tolist()
        pool = []
        while len(pool) < n_configs:
            pool += one_cycle
        design[:, column] = rng.choice(pool[:n_configs], n_configs, replace=False)

    samples = []
    vector_pool = sample_vectors(cs, 5000)
    for row in range(design.shape[0]):
        # TODO: develop a function that fixes some variables and randomises the rest.
        fixed_values = design[row, :].tolist()
        idx2val = dict(zip(column_indices, fixed_values))
        picked, vector_pool = sample_configuration_except_default(
            cs, idx2val, no_children_flags, vector_pool, rng)
        samples.append(picked)
    # TODO: turn the per-column distinct-value count check into a unit test.
    return samples
def initial_design_cat(cs, n_configs):
    """Sample configurations and patch them so categorical choices are covered.

    ``n_configs`` configurations are sampled. For each categorical
    hyperparameter whose sampled values do not include every choice, values
    that occur more than once are overwritten, row by row, with the missing
    choice indices.

    NaN entries are ignored when counting distinct values. Previously each
    NaN was counted as its own distinct value, which inflated the count and
    could suppress the patching. The leftover debug print, which indexed
    the first four columns unconditionally, has been removed.

    Args:
        cs: Configuration space to sample from.
        n_configs: Number of configurations to sample; must be >= 1.

    Returns:
        A list of ``Configuration`` objects built from the patched vectors.
    """
    n_choices_list = [
        len(hp.choices) if isinstance(hp, CategoricalHyperparameter) else 0
        for hp in cs.get_hyperparameters()
    ]
    samples = sample_configurations(cs, n_configs)
    vectors = np.array([sample.get_array() for sample in samples])

    for i, n_choices in enumerate(n_choices_list):
        if n_choices == 0:
            # Not a categorical hyperparameter.
            continue
        column = vectors[:, i]
        counter = Counter(column[~np.isnan(column)].tolist())
        if len(counter) >= n_choices:
            continue
        # Choice indices that no sampled configuration uses yet.
        missing = np.setdiff1d(np.arange(n_choices), list(counter))
        next_missing = 0
        for j in range(vectors.shape[0]):
            if next_missing >= len(missing):
                break
            current = vectors[j, i]
            # Only overwrite a value that still occurs elsewhere, so that no
            # choice already present is lost (NaN has count 0 and is kept).
            if counter[current] > 1:
                vectors[j, i] = missing[next_missing]
                next_missing += 1
                counter[current] -= 1

    return [Configuration(cs, vector=vectors[j, :]) for j in range(vectors.shape[0])]