diff options
author | Joshua Hesketh <josh@nitrotech.org> | 2013-11-20 12:11:55 +1100 |
---|---|---|
committer | Joshua Hesketh <josh@nitrotech.org> | 2013-11-21 10:37:27 +1100 |
commit | 4343b956c5dc0ce5b68f9774cdda19fab6609a8c (patch) | |
tree | 52309f27767af0365fda5f5697b2c66510b8603c /turbo_hipster/worker_manager.py | |
parent | 527966b2e4270d64a637800e564e247bc78cde43 (diff) | |
download | turbo-hipster-4343b956c5dc0ce5b68f9774cdda19fab6609a8c.tar.gz |
Refactor th to have one gearman worker thread
Change-Id: I78f95a67b80ce0627b4a3bbb20578f3d16028714
Diffstat (limited to 'turbo_hipster/worker_manager.py')
-rw-r--r-- | turbo_hipster/worker_manager.py | 68 |
1 files changed, 66 insertions, 2 deletions
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py index 72034fb..4b2e02f 100644 --- a/turbo_hipster/worker_manager.py +++ b/turbo_hipster/worker_manager.py @@ -20,7 +20,7 @@ import os import threading -class GearmanManager(threading.Thread): +class ZuulManager(threading.Thread): """ This thread manages all of the launched gearman workers. As required by the zuul protocol it handles stopping builds when they @@ -31,7 +31,7 @@ class GearmanManager(threading.Thread): log = logging.getLogger("worker_manager.GearmanManager") def __init__(self, config, tasks): - super(GearmanManager, self).__init__() + super(ZuulManager, self).__init__() self._stop = threading.Event() self.config = config self.tasks = tasks @@ -81,3 +81,67 @@ class GearmanManager(threading.Thread): except Exception as e: self.log.exception('Exception handling log event.') job.sendWorkException(str(e).encode('utf-8')) + + +class ZuulClient(threading.Thread): + + """ ...""" + + log = logging.getLogger("worker_manager.ZuulClient") + + def __init__(self, global_config, worker_name): + super(ZuulClient, self).__init__() + self._stop = threading.Event() + self.global_config = global_config + + self.worker_name = worker_name + + # Set up the runner worker + self.gearman_worker = None + self.functions = {} + + self.job = None + self.cancelled = False + + self.setup_gearman() + + def setup_gearman(self): + self.log.debug("Set up gearman worker") + self.gearman_worker = gear.Worker(self.worker_name) + self.gearman_worker.addServer( + self.global_config['zuul_server']['gearman_host'], + self.global_config['zuul_server']['gearman_port'] + ) + self.register_functions() + + def register_functions(self): + for function_name, plugin in self.functions.items(): + self.gearman_worker.registerFunction(function_name) + + def add_function(self, function_name, plugin): + self.functions[function_name] = plugin + + def stop(self): + self._stop.set() + # Unblock gearman + self.log.debug("Telling gearman to stop waiting for jobs") + self.gearman_worker.stopWaitingForJobs() + self.gearman_worker.shutdown() + + def stopped(self): + return self._stop.isSet() + + def run(self): + while True and not self.stopped(): + try: + self.cancelled = False + # gearman_worker.getJob() blocks until a job is available + self.log.debug("Waiting for job") + self.job = self.gearman_worker.getJob() + self._handle_job() + except: + self.log.exception('Exception retrieving log event.') + + def _handle_job(self): + """ We have a job, give it to the right plugin """ + self.functions[self.job.name].start_job(self.job) |