summaryrefslogtreecommitdiff
path: root/turbo_hipster/worker_manager.py
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2013-11-20 12:11:55 +1100
committerJoshua Hesketh <josh@nitrotech.org>2013-11-21 10:37:27 +1100
commit4343b956c5dc0ce5b68f9774cdda19fab6609a8c (patch)
tree52309f27767af0365fda5f5697b2c66510b8603c /turbo_hipster/worker_manager.py
parent527966b2e4270d64a637800e564e247bc78cde43 (diff)
downloadturbo-hipster-4343b956c5dc0ce5b68f9774cdda19fab6609a8c.tar.gz
Refactor th to have one gearman worker thread
Change-Id: I78f95a67b80ce0627b4a3bbb20578f3d16028714
Diffstat (limited to 'turbo_hipster/worker_manager.py')
-rw-r--r--turbo_hipster/worker_manager.py68
1 files changed, 66 insertions, 2 deletions
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index 72034fb..4b2e02f 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -20,7 +20,7 @@ import os
import threading
-class GearmanManager(threading.Thread):
+class ZuulManager(threading.Thread):
""" This thread manages all of the launched gearman workers.
As required by the zuul protocol it handles stopping builds when they
@@ -31,7 +31,7 @@ class GearmanManager(threading.Thread):
log = logging.getLogger("worker_manager.GearmanManager")
def __init__(self, config, tasks):
- super(GearmanManager, self).__init__()
+ super(ZuulManager, self).__init__()
self._stop = threading.Event()
self.config = config
self.tasks = tasks
@@ -81,3 +81,67 @@ class GearmanManager(threading.Thread):
except Exception as e:
self.log.exception('Exception handling log event.')
job.sendWorkException(str(e).encode('utf-8'))
+
+
+class ZuulClient(threading.Thread):
+
+ """ ..."""
+
+ log = logging.getLogger("worker_manager.ZuulClient")
+
+ def __init__(self, global_config, worker_name):
+ super(ZuulClient, self).__init__()
+ self._stop = threading.Event()
+ self.global_config = global_config
+
+ self.worker_name = worker_name
+
+ # Set up the runner worker
+ self.gearman_worker = None
+ self.functions = {}
+
+ self.job = None
+ self.cancelled = False
+
+ self.setup_gearman()
+
+ def setup_gearman(self):
+ self.log.debug("Set up gearman worker")
+ self.gearman_worker = gear.Worker(self.worker_name)
+ self.gearman_worker.addServer(
+ self.global_config['zuul_server']['gearman_host'],
+ self.global_config['zuul_server']['gearman_port']
+ )
+ self.register_functions()
+
+ def register_functions(self):
+ for function_name, plugin in self.functions.items():
+ self.gearman_worker.registerFunction(function_name)
+
+ def add_function(self, function_name, plugin):
+ self.functions[function_name] = plugin
+
+ def stop(self):
+ self._stop.set()
+ # Unblock gearman
+ self.log.debug("Telling gearman to stop waiting for jobs")
+ self.gearman_worker.stopWaitingForJobs()
+ self.gearman_worker.shutdown()
+
+ def stopped(self):
+ return self._stop.isSet()
+
+ def run(self):
+ while True and not self.stopped():
+ try:
+ self.cancelled = False
+ # gearman_worker.getJob() blocks until a job is available
+ self.log.debug("Waiting for job")
+ self.job = self.gearman_worker.getJob()
+ self._handle_job()
+ except:
+ self.log.exception('Exception retrieving log event.')
+
+ def _handle_job(self):
+ """ We have a job, give it to the right plugin """
+ self.functions[self.job.name].start_job(self.job)