Source code for ultraopt.async_comm.dispatcher

import queue
import threading
import time

import Pyro4

from ultraopt.structure import Job
from ultraopt.utils.logging_ import get_logger


class Worker(object):
    """Client-side handle for a remote worker registered on the Pyro4 nameserver.

    Wraps the worker's Pyro4 proxy and remembers which job (if any) is
    currently dispatched to it.
    """

    def __init__(self, name, uri):
        # name: the worker's nameserver entry; uri: Pyro4 URI used to connect
        self.name = name
        self.proxy = Pyro4.Proxy(uri)
        # id of the job currently running on this worker, None when idle
        self.runs_job = None

    def is_alive(self):
        """Return True if the worker still answers on its Pyro4 connection.

        A closed connection means the worker is gone and yields False; any
        other error is unexpected and simply propagates to the caller.
        (The original's trailing bare ``except: raise`` was a no-op and has
        been removed — behavior is unchanged.)
        """
        try:
            self.proxy._pyroReconnect(1)
        except Pyro4.errors.ConnectionClosedError:
            return False
        return True

    def shutdown(self):
        """Ask the remote worker process to shut itself down."""
        self.proxy.shutdown()

    def is_busy(self):
        """Return True if the remote worker reports it is computing a job."""
        return self.proxy.is_busy()

    def __repr__(self):
        return self.name
class Dispatcher(object):
    """
    The dispatcher is responsible for assigning tasks to free workers, report results back to the master and
    communicate to the nameserver.
    """

    def __init__(self, new_result_callback, run_id='0', ping_interval=10, nameserver='localhost',
                 nameserver_port=None, host=None, queue_callback=None):
        """
        Parameters
        ----------
        new_result_callback: function
            function that will be called with a `Job instance <ultraopt.async_comm.dispatcher.Job>`_ as argument.
            From the `Job` the result can be read and e.g. logged.
        run_id: str
            unique run_id associated with the HPB run
        ping_interval: int
            how often to ping for workers (in seconds)
        nameserver: str
            address of the Pyro4 nameserver
        nameserver_port: int
            port of Pyro4 nameserver
        host: str
            ip (or name that resolves to that) of the network interface to use
        queue_callback: function
            gets called with the number of workers in the pool on every update-cycle
        """
        self.new_result_callback = new_result_callback
        self.queue_callback = queue_callback
        self.run_id = run_id
        self.nameserver = nameserver
        self.nameserver_port = nameserver_port
        self.host = host
        self.ping_interval = int(ping_interval)
        # flag checked by the discover_workers and job_runner threads to exit
        self.shutdown_all_threads = False

        self.logger = get_logger(self)

        # name -> Worker handle for every known worker
        self.worker_pool = {}

        # jobs submitted but not yet dispatched
        self.waiting_jobs = queue.Queue()
        # job id -> Job for jobs currently running on some worker
        self.running_jobs = {}
        # names of workers that are currently free
        self.idle_workers = set()

        # both conditions share ONE lock, so runner and discovery logic
        # are mutually exclusive but can notify each other
        self.thread_lock = threading.Lock()
        self.runner_cond = threading.Condition(self.thread_lock)
        self.discover_cond = threading.Condition(self.thread_lock)

        # name under which this dispatcher registers itself on the nameserver
        self.pyro_id = "ultraopt.run_%s.dispatcher" % self.run_id
    def run(self):
        """Main loop of the dispatcher.

        Starts the worker-discovery and job-runner threads, registers this
        dispatcher with the Pyro4 nameserver, and serves remote requests
        until the daemon is shut down; then tears everything down.
        """
        with self.discover_cond:
            t1 = threading.Thread(target=self.discover_workers, name='discover_workers')
            t1.start()
            self.logger.debug('DISPATCHER: started the \'discover_worker\' thread')
            t2 = threading.Thread(target=self.job_runner, name='job_runner')
            t2.start()
            self.logger.debug('DISPATCHER: started the \'job_runner\' thread')

            self.pyro_daemon = Pyro4.core.Daemon(host=self.host)

            with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns:
                uri = self.pyro_daemon.register(self, self.pyro_id)
                ns.register(self.pyro_id, uri)

            self.logger.debug("DISPATCHER: Pyro daemon running on %s" % (self.pyro_daemon.locationStr))

        # blocks until self.pyro_daemon.shutdown() is called (see shutdown())
        self.pyro_daemon.requestLoop()

        with self.discover_cond:
            self.shutdown_all_threads = True
            self.logger.debug('DISPATCHER: Dispatcher shutting down')

            # wake both helper threads so they observe shutdown_all_threads
            self.runner_cond.notify_all()
            self.discover_cond.notify_all()

            with Pyro4.locateNS(self.nameserver, port=self.nameserver_port) as ns:
                ns.remove(self.pyro_id)

        t1.join()
        self.logger.debug('DISPATCHER: \'discover_worker\' thread exited')
        t2.join()
        self.logger.debug('DISPATCHER: \'job_runner\' thread exited')
        self.logger.debug('DISPATCHER: shut down complete')
[文档] def shutdown_all_workers(self, rediscover=False): with self.discover_cond: for worker in self.worker_pool.values(): worker.shutdown() if rediscover: time.sleep(1) self.discover_cond.notify()
[文档] def shutdown(self, shutdown_workers=False): if shutdown_workers: self.shutdown_all_workers() with self.runner_cond: self.pyro_daemon.shutdown()
    @Pyro4.expose
    @Pyro4.oneway
    def trigger_discover_worker(self):
        """Remote hook: a freshly started worker pokes this to request an
        immediate discovery pass instead of waiting for the next ping."""
        # time.sleep(1)
        self.logger.debug("DISPATCHER: A new worker triggered discover_worker")
        with self.discover_cond:
            self.discover_cond.notify()
    def discover_workers(self):
        """Background thread: periodically query the nameserver for workers.

        Adds newly registered workers to the pool, drops dead ones (rescheduling
        their crashed jobs), and notifies the job_runner thread when the set of
        idle workers may have grown.  Runs until ``shutdown_all_threads`` is set.
        """
        self.discover_cond.acquire()
        sleep_interval = 1

        while True:
            self.logger.debug('DISPATCHER: Starting worker discovery')
            update = False

            with Pyro4.locateNS(host=self.nameserver, port=self.nameserver_port) as ns:
                worker_names = ns.list(prefix="ultraopt.run_%s.worker." % self.run_id)
                self.logger.debug("DISPATCHER: Found %i potential workers, %i currently in the pool." % (
                    len(worker_names), len(self.worker_pool)))

            # add workers that registered since the last pass
            for wn, uri in worker_names.items():
                if not wn in self.worker_pool:
                    w = Worker(wn, uri)
                    if not w.is_alive():
                        self.logger.debug('DISPATCHER: skipping dead worker, %s' % wn)
                        continue
                    update = True
                    self.logger.debug('DISPATCHER: discovered new worker, %s' % wn)
                    self.worker_pool[wn] = w

            # check the current list of workers
            crashed_jobs = set()

            # iterate over a copy of the keys because entries are deleted below
            all_workers = list(self.worker_pool.keys())
            for wn in all_workers:
                # remove dead entries from the nameserver
                if not self.worker_pool[wn].is_alive():
                    self.logger.debug('DISPATCHER: removing dead worker, %s' % wn)
                    update = True
                    # todo check if there were jobs running on that that need to be rescheduled
                    current_job = self.worker_pool[wn].runs_job
                    if not current_job is None:
                        self.logger.debug('Job %s was not completed' % str(current_job))
                        crashed_jobs.add(current_job)
                    del self.worker_pool[wn]
                    self.idle_workers.discard(wn)
                    continue
                if not self.worker_pool[wn].is_busy():
                    self.idle_workers.add(wn)

            # try to submit more jobs if something changed
            if update:
                if not self.queue_callback is None:
                    # release the shared lock while calling user code to
                    # avoid dead-locks with callbacks that re-enter us
                    self.discover_cond.release()
                    self.queue_callback(len(self.worker_pool))
                    self.discover_cond.acquire()
                self.runner_cond.notify()

            for crashed_job in crashed_jobs:
                # register_result takes the same underlying lock, so drop it first
                self.discover_cond.release()
                self.register_result(crashed_job, {'result': None, 'exception': 'Worker died unexpectedly.'})
                self.discover_cond.acquire()

            self.logger.debug('DISPATCHER: Finished worker discovery')

            # if (len(self.worker_pool) == 0):  # ping for new workers if no workers are currently available
            #     self.logger.debug('No workers available! Keep pinging')
            #     self.discover_cond.wait(sleep_interval)
            #     sleep_interval *= 2
            # else:
            self.discover_cond.wait(self.ping_interval)

            if self.shutdown_all_threads:
                self.logger.debug('DISPATCHER: discover_workers shutting down')
                self.runner_cond.notify()
                self.discover_cond.release()
                return
[文档] def number_of_workers(self): with self.discover_cond: return (len(self.worker_pool))
    def job_runner(self):
        """Background thread: dispatch waiting jobs to idle workers.

        Blocks on ``runner_cond`` until there is at least one queued job AND
        one idle worker; exits when ``shutdown_all_threads`` is set.
        """
        self.runner_cond.acquire()
        while True:

            while self.waiting_jobs.empty() or len(self.idle_workers) == 0:
                self.logger.debug('DISPATCHER: jobs to submit = %i, number of idle workers = %i -> waiting!' % (self.waiting_jobs.qsize(), len(self.idle_workers)))
                self.runner_cond.wait()
                self.logger.debug('DISPATCHER: Trying to submit another job.')
                # shutdown flag is only (re-)checked after a wake-up
                if self.shutdown_all_threads:
                    self.logger.debug('DISPATCHER: job_runner shutting down')
                    self.discover_cond.notify()
                    self.runner_cond.release()
                    return

            job = self.waiting_jobs.get()
            wn = self.idle_workers.pop()
            worker = self.worker_pool[wn]
            self.logger.debug('DISPATCHER: starting job %s on %s' % (str(job.id), worker.name))

            job.time_it('started')
            worker.runs_job = job.id

            # fire-and-forget remote call; the worker reports back through register_result
            worker.proxy.start_computation(self, job.id, **job.kwargs)

            job.worker_name = wn
            self.running_jobs[job.id] = job
            self.logger.debug('DISPATCHER: job %s dispatched on %s' % (str(job.id), worker.name))
[文档] def submit_job(self, id, **kwargs): self.logger.debug('DISPATCHER: trying to submit job %s' % str(id)) with self.runner_cond: job = Job(id, **kwargs) job.time_it('submitted') self.waiting_jobs.put(job) self.logger.debug('DISPATCHER: trying to notify the job_runner thread.') self.runner_cond.notify()
    @Pyro4.expose
    @Pyro4.callback
    @Pyro4.oneway
    def register_result(self, id=None, result=None):
        """Remote callback: a worker reports a finished job.

        Fills in the Job's result/exception, marks the worker idle again and
        finally forwards the Job to the user's ``new_result_callback``.
        """
        self.logger.debug('DISPATCHER: job %s finished' % (str(id)))
        with self.runner_cond:
            self.logger.debug('DISPATCHER: register_result: lock acquired')
            # fill in missing information
            job = self.running_jobs[id]
            job.time_it('finished')
            job.result = result['result']
            job.exception = result['exception']
            job.config_info = None

            self.logger.debug('DISPATCHER: job %s on %s finished' % (str(job.id), job.worker_name))
            self.logger.debug(str(job))

            # delete job
            del self.running_jobs[id]

            # label worker as idle again
            try:
                self.worker_pool[job.worker_name].runs_job = None
                self.worker_pool[job.worker_name].proxy._pyroRelease()
                self.idle_workers.add(job.worker_name)
                # notify the job_runner to check for more jobs to run
                self.runner_cond.notify()
            except KeyError:
                # happens for crashed workers, but we can just continue
                pass
            except:
                # NOTE(review): this bare re-raise is a no-op kept for
                # byte-compatibility; any non-KeyError propagates anyway
                raise

        # call users callback function to register the result
        # needs to be with the condition released, as the master can call
        # submit_job quickly enough to cause a dead-lock
        self.new_result_callback(job)