summaryrefslogtreecommitdiff
path: root/turbo_hipster/worker_manager.py
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2013-07-26 13:57:28 +1000
committerJoshua Hesketh <josh@nitrotech.org>2013-07-26 13:57:28 +1000
commit1ab465f923ff805183be98049682db31ee6673aa (patch)
tree2d9d245c58657f0de136ff9af06c3c5e1c29e9ab /turbo_hipster/worker_manager.py
parent1377f6f6c60a0e92bc9361d65414cdc22a4cb39a (diff)
downloadturbo-hipster-1ab465f923ff805183be98049682db31ee6673aa.tar.gz
restructure project slightly
Diffstat (limited to 'turbo_hipster/worker_manager.py')
-rw-r--r--turbo_hipster/worker_manager.py69
1 files changed, 69 insertions, 0 deletions
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
new file mode 100644
index 0000000..f48da1b
--- /dev/null
+++ b/turbo_hipster/worker_manager.py
@@ -0,0 +1,69 @@
+# Copyright ...
+
+import gear
+import json
+import logging
+import os
+import threading
+
+
+class GearmanManager(threading.Thread):
+
+ """ This thread manages all of the launched gearman workers.
+ As required by the zuul protocol it handles stopping builds when they
+ are cancelled through stop:turbo-hipster-manager-%hostname.
+ To do this it implements its own gearman worker waiting for events on
+ that manager. """
+
+ log = logging.getLogger("worker_manager.GearmanManager")
+
+ def __init__(self, config, tasks):
+ super(GearmanManager, self).__init__()
+ self._stop = threading.Event()
+ self.config = config
+ self.tasks = tasks
+
+ self.gearman_worker = None
+ self.setup_gearman()
+
+ def setup_gearman(self):
+ hostname = os.uname()[1]
+ self.gearman_worker = gear.Worker('turbo-hipster-manager-%s'
+ % hostname)
+ self.gearman_worker.addServer(
+ self.config['zuul_server']['gearman_host'],
+ self.config['zuul_server']['gearman_port']
+ )
+ self.gearman_worker.registerFunction(
+ 'stop:turbo-hipster-manager-%s' % hostname)
+
+ def stop(self):
+ self._stop.set()
+ # Unblock gearman
+ self.log.debug("Telling gearman to stop waiting for jobs")
+ self.gearman_worker.stopWaitingForJobs()
+ self.gearman_worker.shutdown()
+
+ def stopped(self):
+ return self._stop.isSet()
+
+ def run(self):
+ while True and not self.stopped():
+ try:
+ # gearman_worker.getJob() blocks until a job is available
+ logging.debug("Waiting for job")
+ self.current_step = 0
+ job = self.gearman_worker.getJob()
+ self._handle_job(job)
+ except:
+ logging.exception('Exception retrieving log event.')
+
+ def _handle_job(self, job):
+ """ Handle the requested job """
+ try:
+ job_arguments = json.loads(job.arguments.decode('utf-8'))
+ self.tasks[job_arguments['name']].stop_worker(
+ job_arguments['number'])
+ except Exception as e:
+ self.log.exception('Exception handling log event.')
+ job.sendWorkException(str(e).encode('utf-8'))