ultraopt.async_comm.master 源代码

import copy
import os
import threading
import time
from collections import defaultdict
from typing import Dict

import numpy as np

from ultraopt.facade.utils import get_wanted
from ultraopt.multi_fidelity.iter import WarmStartIteration
from ultraopt.multi_fidelity.iter_gen.base_gen import BaseIterGenerator
from ultraopt.optimizer.base_opt import BaseOptimizer
from ultraopt.utils.logging_ import get_logger
from ultraopt.utils.progress import no_progress_callback
from .dispatcher import Dispatcher
from ..result import Result
from ..utils.misc import print_incumbent_trajectory, dump_checkpoint

[文档]class Master(object): def __init__(self, run_id, optimizer: BaseOptimizer, iter_generator: BaseIterGenerator, progress_callback=no_progress_callback, checkpoint_file=None, checkpoint_freq=10, working_directory='.', ping_interval=60, time_left_for_this_task=np.inf, nameserver='', nameserver_port=None, host=None, shutdown_workers=True, job_queue_sizes=(-1, 0), dynamic_queue_size=True, result_logger=None, previous_result=None, incumbents: Dict[float, dict] = None, incumbent_performances: Dict[float, float] = None ): """The Master class is responsible for the book keeping and to decide what to run next. Optimizers are instantiations of Master, that handle the important steps of deciding what configurations to run on what budget when. Parameters ---------- run_id : string A unique identifier of that Hyperband run. Use, for example, the cluster's JobID when running multiple concurrent runs to separate them optimizer: ultraopt.optimizer.base_opt.BaseOptimizer object An object that can generate new configurations and registers results of executed runs working_directory: string The top level working directory accessible to all compute nodes(shared filesystem). eta : float In each iteration, a complete run of sequential halving is executed. In it, after evaluating each configuration on the same subset size, only a fraction of 1/eta of them 'advances' to the next round. Must be greater or equal to 2. min_budget : float The smallest budget to consider. Needs to be positive! max_budget : float the largest budget to consider. Needs to be larger than min_budget! The budgets will be geometrically distributed :math:`\sim \eta^k` for :math:`k\in [0, 1, ... , num\_subsets - 1]`. ping_interval: int number of seconds between pings to discover new nodes. Default is 60 seconds. nameserver: str address of the Pyro4 nameserver nameserver_port: int port of Pyro4 nameserver host: str ip (or name that resolves to that) of the network interface to use shutdown_workers: bool flag to control whether the workers are shutdown after the computation is done job_queue_size: tuple of ints min and max size of the job queue. During the run, when the number of jobs in the queue reaches the min value, it will be filled up to the max size. Default: (0,1) dynamic_queue_size: bool Whether or not to change the queue size based on the number of workers available. If true (default), the job_queue_sizes are relative to the current number of workers. logger: logging.logger like object the logger to output some (more or less meaningful) information result_logger: a result logger that writes live results to disk previous_result: previous run to warmstart the run """ self.checkpoint_freq = checkpoint_freq self.checkpoint_file = checkpoint_file self.progress_callback = progress_callback iter_generator.initialize(optimizer) self.iter_generator = iter_generator self.time_left_for_this_task = time_left_for_this_task self.working_directory = working_directory os.makedirs(self.working_directory, exist_ok=True) self.logger = get_logger(self) self.result_logger = result_logger self.optimizer = optimizer self.time_ref = None self.iterations = [] self.jobs = [] self.num_running_jobs = 0 self.job_queue_sizes = job_queue_sizes self.user_job_queue_sizes = job_queue_sizes self.dynamic_queue_size = dynamic_queue_size if job_queue_sizes[0] >= job_queue_sizes[1]: raise ValueError("The queue size range needs to be (min, max) with min<max!") if previous_result is None: self.warmstart_iteration = [] else: self.warmstart_iteration = [WarmStartIteration(previous_result, self.optimizer)] # condition to synchronize the job_callback and the queue self.thread_cond = threading.Condition() self.config = { 'time_ref': self.time_ref } self.dispatcher = Dispatcher( self.job_callback, queue_callback=self.adjust_queue_size, run_id=run_id, ping_interval=ping_interval, nameserver=nameserver, nameserver_port=nameserver_port, host=host ) self.incumbents = defaultdict(dict) self.incumbent_performances = defaultdict(lambda: np.inf) if incumbents is not None: self.incumbents.update(incumbents) if incumbent_performances is not None: self.incumbent_performances.update(incumbent_performances) self.dispatcher_thread = threading.Thread(target=self.dispatcher.run) self.dispatcher_thread.start()
[文档] def shutdown(self, shutdown_workers=False): self.logger.info('HBMASTER: shutdown initiated, shutdown_workers = %s' % (str(shutdown_workers))) self.dispatcher.shutdown(shutdown_workers) self.dispatcher_thread.join()
[文档] def wait_for_workers(self, min_n_workers=1): """ helper function to hold execution until some workers are active Parameters ---------- min_n_workers: int minimum number of workers present before the run starts """ self.logger.debug('wait_for_workers trying to get the condition') with self.thread_cond: while (self.dispatcher.number_of_workers() < min_n_workers): self.logger.debug('HBMASTER: only %i worker(s) available, waiting for at least %i.' % ( self.dispatcher.number_of_workers(), min_n_workers)) self.thread_cond.wait(1) self.dispatcher.trigger_discover_worker() self.logger.debug('Enough workers to start this run!')
[文档] def get_next_iteration(self, iteration, iteration_kwargs): """ instantiates the next iteration Overwrite this to change the multi_fidelity for different optimizers Parameters ---------- iteration: int the index of the iteration to be instantiated iteration_kwargs: dict additional kwargs for the iteration class Returns ------- HB_iteration: a valid HB iteration object """ return self.iter_generator.get_next_iteration(iteration, **iteration_kwargs)
[文档] def run(self, n_iterations=1, min_n_workers=1, iteration_kwargs={}, ): """ run n_iterations of RankReductionIteration Parameters ---------- n_iterations: int number of multi_fidelity to be performed in this run min_n_workers: int minimum number of workers before starting the run """ self.all_n_iterations = self.iter_generator.num_all_configs(n_iterations) self.progress_bar = self.progress_callback(0, self.all_n_iterations) self.iter_cnt = 0 self.wait_for_workers(min_n_workers) iteration_kwargs.update({'result_logger': self.result_logger}) if self.time_ref is None: self.time_ref = time.time() self.config['time_ref'] = self.time_ref self.logger.debug('HBMASTER: starting run at %s' % (str(self.time_ref))) self.thread_cond.acquire() start_time = time.time() with self.progress_bar as self.progress_ctx: while True: self._queue_wait() cost_time = time.time() - start_time if cost_time > self.time_left_for_this_task: self.logger.warning(f"cost_time = {cost_time:.2f}, " f"exceed time_left_for_this_task = {self.time_left_for_this_task}") break next_run = None # find a new run to schedule for i in self.active_iterations(): # 对self.iterations的过滤 next_run = self.iterations[i].get_next_run() if not next_run is None: break # 取出一个配置成功了,返回。 if not next_run is None: self.logger.debug('HBMASTER: schedule new run for iteration %i' % i) self._submit_job(*next_run) continue else: if n_iterations > 0: # we might be able to start the next iteration # multi_fidelity 对象其实是 type: List[RankReductionIteration] self.iterations.append(self.get_next_iteration(len(self.iterations), iteration_kwargs)) n_iterations -= 1 continue cost_time = time.time() - start_time if cost_time > self.time_left_for_this_task: self.logger.warning(f"cost_time = {cost_time:.2f}, " f"exceed time_left_for_this_task = {self.time_left_for_this_task}") break # at this point there is no immediate run that can be scheduled, # so wait for some job to finish if there are active multi_fidelity if self.active_iterations(): self.thread_cond.wait() else: break self.thread_cond.release() for i in self.warmstart_iteration: i.fix_timestamps(self.time_ref) ws_data = [i.data for i in self.warmstart_iteration] return Result([copy.deepcopy(i.data) for i in self.iterations] + ws_data, self.config)
[文档] def adjust_queue_size(self, number_of_workers=None): self.logger.debug('HBMASTER: number of workers changed to %s' % str(number_of_workers)) with self.thread_cond: self.logger.debug('adjust_queue_size: lock accquired') if self.dynamic_queue_size: nw = self.dispatcher.number_of_workers() if number_of_workers is None else number_of_workers self.job_queue_sizes = (self.user_job_queue_sizes[0] + nw, self.user_job_queue_sizes[1] + nw) self.logger.debug('HBMASTER: adjusted queue size to %s' % str(self.job_queue_sizes)) self.thread_cond.notify_all()
[文档] def job_callback(self, job): """ method to be called when a job has finished this will do some book keeping and call the user defined new_result_callback if one was specified """ self.logger.debug('job_callback for %s started' % str(job.id)) with self.thread_cond: self.logger.debug('job_callback for %s got condition' % str(job.id)) self.num_running_jobs -= 1 if job.result is not None: budget = job.kwargs["budget"] challenger = job.kwargs["config"] challenger_performance = job.result["loss"] incumbent_performance = self.incumbent_performances[budget] incumbent = self.incumbents[budget] if challenger_performance < incumbent_performance: if np.isfinite(self.incumbent_performances[budget]): print_incumbent_trajectory( challenger_performance, incumbent_performance, challenger, incumbent, budget ) self.incumbent_performances[budget] = challenger_performance self.incumbents[budget] = challenger if not self.result_logger is None: self.result_logger(job) self.iterations[job.id[0]].register_result(job) self.optimizer.new_result(job) # 更新进度条等操作 max_budget, best_loss, _ = get_wanted(self.optimizer) self.progress_ctx.postfix = f"max budget: {max_budget}, best loss: {best_loss:.3f}" self.progress_ctx.update(1) self.iter_cnt += 1 if self.checkpoint_file is not None: if (self.iter_cnt - 1) % self.checkpoint_freq == 0 or self.iter_cnt == self.all_n_iterations: dump_checkpoint(self.optimizer, self.checkpoint_file) if self.num_running_jobs <= self.job_queue_sizes[0]: self.logger.debug("HBMASTER: Trying to run another job!") self.thread_cond.notify() self.logger.debug('job_callback for %s finished' % str(job.id))
def _queue_wait(self): """ helper function to wait for the queue to not overflow/underload it """ if self.num_running_jobs >= self.job_queue_sizes[1]: while (self.num_running_jobs > self.job_queue_sizes[0]): self.logger.debug('HBMASTER: running jobs: %i, queue sizes: %s -> wait' % ( self.num_running_jobs, str(self.job_queue_sizes))) self.thread_cond.wait() def _submit_job(self, config_id, config, config_info, budget): """ hidden function to submit a new job to the dispatcher This function handles the actual submission in a (hopefully) thread save way """ self.logger.debug('HBMASTER: trying submitting job %s to dispatcher' % str(config_id)) with self.thread_cond: self.logger.debug('HBMASTER: submitting job %s to dispatcher' % str(config_id)) self.dispatcher.submit_job(config_id, config=config, config_info=config_info, budget=budget, working_directory=self.working_directory) self.num_running_jobs += 1 # shouldn't the next line be executed while holding the condition? self.logger.debug("HBMASTER: job %s submitted to dispatcher" % str(config_id))
[文档] def active_iterations(self): """ function to find active (not marked as finished) multi_fidelity Returns ------- list: all active iteration objects (empty if there are none) """ l = list(filter(lambda idx: not self.iterations[idx].is_finished, range(len(self.iterations)))) return (l)
def __del__(self): # todo: kill server pass