Source code for autoflow.tuner

import inspect
import os
import re
from collections import OrderedDict
from copy import deepcopy
from itertools import product
from typing import Dict, Optional, Callable, Union, List, Any, Tuple

import numpy as np
from ConfigSpace import ConfigurationSpace, Configuration
from frozendict import frozendict

from autoflow.evaluation.train_evaluator import TrainEvaluator
from autoflow.hdl.hdl2shps import HDL2SHPS
from autoflow.data_manager import DataManager
from autoflow.resource_manager.base import ResourceManager
from autoflow.utils.concurrence import parse_n_jobs
from autoflow.utils.config_space import get_random_initial_configs, get_grid_initial_configs, replace_phps
from autoflow.utils.klass import StrSignatureMixin
from autoflow.utils.logging_ import get_logger
from autoflow.utils.ml_task import MLTask
from dsmac.facade.smac_hpo_facade import SMAC4HPO
from dsmac.scenario.scenario import Scenario


class Tuner(StrSignatureMixin):
    '''
    ``Tuner`` is a class that acts as the agent of an abstract search process.
    '''

    def __init__(
            self,
            evaluator: Union[Callable, str] = "TrainEvaluator",
            search_method: str = "smac",
            run_limit: int = 100,
            initial_runs: int = 20,
            search_method_params: dict = frozendict(),
            n_jobs: int = 1,
            exit_processes: Optional[int] = None,
            limit_resource: bool = True,
            per_run_time_limit: float = 60,
            per_run_memory_limit: float = 3072,
            time_left_for_this_task: float = None,
            n_jobs_in_algorithm=1,
            debug=False
    ):
        '''
        Parameters
        ----------
        evaluator: callable, str
            ``evaluator`` is a function, a callable class (implementing the magic method ``__call__``)
            or a string-indicator.

            ``evaluator`` receives a shp (SMAC Hyper-Parameter, :class:`ConfigSpace.ConfigurationSpace`)
            and returns a dict that contains the following keys:

                * ``loss``, which you can think of as a negative reward.

                * ``status``, a string: ``SUCCESS`` means the trial finished, ``FAILED`` means it crashed.

            By default, ``"TrainEvaluator"`` is the string-indicator of
            :class:`autoflow.evaluation.train_evaluator.TrainEvaluator`.

        search_method: str
            The search method; ``random``, ``smac`` and ``grid`` are available.

                * ``random`` Random Search,

                * ``grid`` Grid Search,

                * ``smac`` Bayesian search with the SMAC algorithm.

        run_limit: int
            Limit on the number of search steps.

        initial_runs: int
            If you choose the ``smac`` algorithm, note that SMAC has an initialization procedure:
            it needs enough initial runs to gather experience.

            This parameter is ignored if ``random`` or ``grid`` is selected.

        search_method_params: dict
            Configuration for the specific search method.

        n_jobs: int
            Number of search processes to start.

        exit_processes: int

        limit_resource: bool
            If ``limit_resource = True``, a search trial will be killed if it uses too much CPU time or memory.

        per_run_time_limit: float
            Only active if ``limit_resource = True``.

            A search trial will be killed if it uses more CPU time than ``per_run_time_limit``.

        per_run_memory_limit: float
            Only active if ``limit_resource = True``.

            A search trial will be killed if it uses more memory than ``per_run_memory_limit``.

        time_left_for_this_task: float
            Only active if ``limit_resource = True``.

            The whole search task will be killed if its total running time exceeds ``time_left_for_this_task``.

        n_jobs_in_algorithm: int
            ``n_jobs`` value written into the algorithms' hyper-parameters (see :meth:`set_hdl`).

        debug: bool
            Debug mode. Exceptions will be re-raised if ``debug = True``.
        '''
        self.n_jobs_in_algorithm = n_jobs_in_algorithm
        self.debug = debug
        self.per_run_memory_limit = per_run_memory_limit
        self.time_left_for_this_task = time_left_for_this_task
        self.per_run_time_limit = per_run_time_limit
        self.limit_resource = limit_resource
        self.logger = get_logger(self)
        if self.debug and self.limit_resource:
            self.logger.warning(
                "Tuner.debug and Tuner.limit_resource cannot both be True. "
                "Setting Tuner.limit_resource to False.")
            self.limit_resource = False
        search_method_params = dict(search_method_params)
        if isinstance(evaluator, str):
            if evaluator == "TrainEvaluator":
                evaluator = TrainEvaluator
            elif evaluator == "EnsembleEvaluator":
                # NOTE: EnsembleEvaluator is referenced but not imported in this listing;
                # the import is assumed to exist in the original module.
                evaluator = EnsembleEvaluator
            else:
                raise NotImplementedError
        assert callable(evaluator)
        self.evaluator_prototype = evaluator
        if inspect.isfunction(evaluator):
            self.evaluator = evaluator
        else:
            self.evaluator = evaluator()
        self.evaluator.debug = self.debug
        self.search_method_params = search_method_params
        assert search_method in ("smac", "grid", "random", "beam")
        if search_method in ("grid", "random"):
            initial_runs = 0
        self.initial_runs = initial_runs
        self.run_limit = run_limit
        self.search_method = search_method
        self.random_state = 0
        self.addition_info = {}
        self.resource_manager = None
        self.ml_task = None
        self.data_manager = None
        self.n_jobs = parse_n_jobs(n_jobs)
        if exit_processes is None:
            exit_processes = max(self.n_jobs // 3, 1)
        self.exit_processes = exit_processes
    def set_random_state(self, random_state):
        self.random_state = random_state
    # def set_inside_dict(self, addition_info):
    #     self.addition_info = addition_info
    def hdl2shps(self, hdl: Dict):
        hdl2shps = HDL2SHPS()
        hdl2shps.set_task(self.ml_task)
        return hdl2shps(hdl)
    def set_hdl(self, hdl: Dict):
        self.hdl = hdl
        self.shps: ConfigurationSpace = self.hdl2shps(hdl)
        self.shps.seed(self.random_state)
        replace_phps(self.shps, "n_jobs", int(self.n_jobs_in_algorithm))
    def set_resource_manager(self, resource_manager: ResourceManager):
        self.resource_manager = resource_manager
        # self.evaluator.set_resource_manager(resource_manager)
    def set_task(self, ml_task: MLTask):
        self.ml_task = ml_task
    def set_data_manager(self, data_manager: DataManager):
        self.data_manager = data_manager
        self.ml_task = data_manager.ml_task
    def design_initial_configs(self, n_jobs):
        if self.search_method == "smac":
            return get_random_initial_configs(self.shps, max(self.initial_runs, n_jobs), self.random_state)
        elif self.search_method == "grid":
            return get_grid_initial_configs(self.shps, self.run_limit, self.random_state)
        elif self.search_method == "random":
            return get_random_initial_configs(self.shps, self.run_limit, self.random_state)
        elif self.search_method == "beam":
            return get_random_initial_configs(self.shps, self.run_limit, self.random_state)
        else:
            raise NotImplementedError
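    # Added note (not in the original source): judging from the dependencies between the methods
    # above, a driver is expected to wire a ``Tuner`` roughly in this order before searching --
    # ``set_data_manager`` (or ``set_task``) so that ``hdl2shps`` knows the ML task, then
    # ``set_hdl`` to build ``self.shps``, then ``design_initial_configs`` and ``run``.
    # The variable names below are illustrative only.
    #
    #     tuner.set_data_manager(data_manager)
    #     tuner.set_resource_manager(resource_manager)
    #     tuner.set_hdl(hdl)
    #     initial_configs = tuner.design_initial_configs(n_jobs=1)
    #     tuner.run(initial_configs, evaluator_params=evaluator_params)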
    def get_run_limit(self):
        if self.search_method == "smac":
            return self.run_limit
        else:
            return 0
    def prepare_beam_search_configs(self, config_space: ConfigurationSpace, cs_keys: List[str],
                                    search_ranges: List[List[Any]]):
        configs = []
        for values in product(*search_ranges):
            # TODO: detect and handle out-of-range values?
            for cs_key, value in zip(cs_keys, values):
                config_space.get_hyperparameter(cs_key).default_value = value
            configs.append(config_space.get_default_configuration())
        return configs
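    # Added illustration (not in the original source): ``prepare_beam_search_configs`` walks the
    # Cartesian product of the given search ranges, sets each combination as the hyper-parameters'
    # default values, and collects one default configuration per combination. The hyper-parameter
    # names are made up, ``tuner`` is an already-constructed Tuner, and the snippet assumes the
    # same ConfigSpace version the module itself relies on (where ``default_value`` is assignable).
    #
    #     from ConfigSpace.hyperparameters import CategoricalHyperparameter
    #     cs = ConfigurationSpace()
    #     cs.add_hyperparameter(CategoricalHyperparameter("estimator:__choice__", ["lightgbm", "catboost"]))
    #     cs.add_hyperparameter(CategoricalHyperparameter("scaler:__choice__", ["standardize", "minmax"]))
    #     configs = tuner.prepare_beam_search_configs(
    #         cs,
    #         cs_keys=["estimator:__choice__", "scaler:__choice__"],
    #         search_ranges=[["lightgbm", "catboost"], ["standardize", "minmax"]])
    #     assert len(configs) == 4   # 2 x 2 combinations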
    def match_cs_key(self, step_name: str, config: Union[Configuration, ConfigurationSpace]):
        candidate_result = []
        if isinstance(config, Configuration):
            config_ = config.get_dictionary()
        elif isinstance(config, ConfigurationSpace):
            config_ = [x.name for x in config.get_hyperparameters()]
        else:
            raise NotImplementedError
        for key in config_:
            if re.match(rf".*{step_name}.*", key):
                candidate_result.append(key)
        assert len(candidate_result) > 0
        lens = [len(result) for result in candidate_result]
        result = candidate_result[int(np.argmin(lens))]
        self.logger.info(f"Original beam_search step name '{step_name}' is parsed as '{result}'")
        return result
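    # Added illustration (not in the original source): ``match_cs_key`` resolves a user-facing
    # step name to the shortest config-space key containing it. With hypothetical keys such as
    # "estimator:__choice__" and "estimator:lightgbm:n_estimators" present in ``tuner.shps``,
    #
    #     tuner.match_cs_key("estimator", tuner.shps)
    #
    # matches both keys via the regex ``.*estimator.*`` and returns the shorter one,
    # "estimator:__choice__".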
    def set_beam_search_result_to_cs_default(self, config_space: ConfigurationSpace, beam_result: dict):
        config_space = deepcopy(config_space)
        for cs_key, best_value in beam_result.items():
            hp = config_space.get_hyperparameter(cs_key)
            # if isinstance(hp, CategoricalHyperparameter):
            hp.default_value = best_value
        return config_space
    def run(
            self,
            initial_configs,
            evaluator_params=frozendict(),
            instance_id="",
            rh_db_type="sqlite",
            rh_db_params=frozendict(),
            rh_db_table_name="runhistory"
    ):
        # time.sleep(random.random())
        if not initial_configs:
            self.logger.warning("No initial_configs. Returning.")
            return
        self.evaluator.init_data(**evaluator_params)
        scenario_dict = {
            "run_obj": "quality",
            "runcount-limit": 1000,
            "cs": self.shps,  # configuration space
            "deterministic": "true",
            "instances": [[instance_id]],
            "cutoff_time": self.per_run_time_limit,
            "memory_limit": self.per_run_memory_limit
            # TODO: if the file system is local, store under `experiment`; for other file systems, don't write SMAC output
            # "output_dir": self.resource_manager.smac_output_dir,
        }
        self.scenario = Scenario(
            scenario_dict,
            initial_runs=0,
            db_type=rh_db_type,
            db_params=rh_db_params,
            db_table_name=rh_db_table_name,
            anneal_func=self.search_method_params.get("anneal_func"),
            use_pynisher=self.limit_resource
        )
        # TODO: pass file_system in, or add a `runtime` parameter to file_system
        if self.search_method == "beam":
            beam_steps: List[Dict[str, Any]] = self.search_method_params["beam_steps"]
            beam_result = OrderedDict()
            # for step_name, search_range in beam_steps.items():
            for step in beam_steps:
                # set the best configurations found so far as the defaults
                shps_ = self.set_beam_search_result_to_cs_default(self.shps, beam_result)
                default_config = shps_.get_default_configuration()
                sampled_configs = []
                cs_keys = []
                search_ranges = []
                for step_name, search_range in step.items():
                    cs_key = self.match_cs_key(step_name, default_config)
                    cs_keys.append(cs_key)
                    search_ranges.append(search_range)
                sampled_configs += self.prepare_beam_search_configs(shps_, cs_keys, search_ranges)
                smac = SMAC4HPO(
                    scenario=self.scenario,
                    rng=np.random.RandomState(self.random_state),
                    tae_runner=self.evaluator,
                    initial_configurations=sampled_configs
                )
                smac.solver.initial_configurations = sampled_configs
                incumbent = smac.solver.start_(warm_start=False)
                for cs_key in cs_keys:
                    best_value = incumbent.get(cs_key)
                    beam_result[cs_key] = best_value
        elif self.search_method == "random":
            specific_allocate: Dict[Tuple[str, str], int] = self.search_method_params.get("specific_allocate")
            if specific_allocate is not None:
                raw_key2cs_key = {}
                processed_specific_allocate = {}
                for raw_key, value in specific_allocate.keys():
                    if raw_key not in raw_key2cs_key:
                        cs_key = self.match_cs_key(raw_key, self.shps)
                        raw_key2cs_key[raw_key] = cs_key
                # TODO: merge these two for-loops
                for (raw_key, value), times in specific_allocate.items():
                    cs_key = raw_key2cs_key[raw_key]
                    processed_specific_allocate[(cs_key, value)] = times
                configs = []
                for sample in self.shps.sample_configuration(10000):
                    empty_bins = 0
                    for (cs_key, value), times in processed_specific_allocate.items():
                        if times > 0:
                            if sample.get(cs_key) == value:
                                configs.append(sample)
                                processed_specific_allocate[cs_key, value] -= 1
                                break
                        else:
                            empty_bins += 1
                    if empty_bins >= len(processed_specific_allocate):
                        break
                initial_configs = configs
            smac = SMAC4HPO(
                scenario=self.scenario,
                rng=np.random.RandomState(self.random_state),
                tae_runner=self.evaluator,
                initial_configurations=initial_configs
            )
            smac.solver.initial_configurations = initial_configs
            smac.solver.start_()
        else:
            smac = SMAC4HPO(
                scenario=self.scenario,
                rng=np.random.RandomState(self.random_state),
                tae_runner=self.evaluator,
                initial_configurations=initial_configs
            )
            self.logger.info(f"Starting {min(self.initial_runs, len(initial_configs))} initial runs for SMAC.")
            if self.initial_runs:
                smac.solver.initial_configurations = initial_configs
                smac.solver.start_()
            else:
                smac.solver.start_(only_timing=True)
            run_limit = self.get_run_limit()
            for i in range(run_limit):
                smac.solver.run_()
                should_continue = self.evaluator.resource_manager.delete_models()
                if not should_continue:
                    self.logger.info(f"PID = {os.getpid()} is exiting.")
                    break
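# Added illustration (not in the original source): the shapes of the ``search_method_params``
# entries consumed by ``run`` above, inferred from the type annotations and access patterns.
# All step names, keys and counts are hypothetical.
#
#     # "beam" search: a list of beam steps; each step maps a step name to the list of candidate
#     # values to sweep, and the best value found is frozen as the default before the next step.
#     beam_tuner = Tuner(
#         search_method="beam",
#         search_method_params={
#             "beam_steps": [
#                 {"estimator": ["lightgbm", "catboost"]},
#                 {"scaler": ["standardize", "minmax"]},
#             ]
#         })
#
#     # "random" search with ``specific_allocate``: (raw_key, value) -> how many of the sampled
#     # initial configurations should take that value.
#     random_tuner = Tuner(
#         search_method="random",
#         search_method_params={
#             "specific_allocate": {
#                 ("estimator", "lightgbm"): 30,
#                 ("estimator", "catboost"): 20,
#             }
#         })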