summaryrefslogtreecommitdiff
path: root/turbo_hipster/worker_manager.py
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2014-03-05 13:06:08 +1100
committerJoshua Hesketh <josh@nitrotech.org>2014-03-05 13:06:08 +1100
commit81b5fb85317e5b346876c5814d75d2a1fa6af7fe (patch)
tree9e2b396910a58c86a732585b9283d8e3ceb4ba37 /turbo_hipster/worker_manager.py
parent4900dcbafcceb14bee46d2b6ddbacc6e1cc18d48 (diff)
downloadturbo-hipster-81b5fb85317e5b346876c5814d75d2a1fa6af7fe.tar.gz
Improve gearman shutdown with extra thread lock checks
When quiting turbo-hipster make sure all threads stop properly. Previously when the gearman_client disconnects the waitForServer method would release and the client would then wait for a job it isn't going to receive because turbo-hipster is shutting down. Change-Id: If9c13e4858c2281c021ca4540ec9475c4d46c615
Diffstat (limited to 'turbo_hipster/worker_manager.py')
-rw-r--r--turbo_hipster/worker_manager.py35
1 files changed, 18 insertions, 17 deletions
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index 746e4f7..d011a03 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -47,7 +47,6 @@ class ZuulManager(threading.Thread):
self.config['zuul_server']['gearman_host'],
self.config['zuul_server']['gearman_port']
)
- self.gearman_worker.waitForServer()
self.gearman_worker.registerFunction(
'stop:turbo-hipster-manager-%s' % hostname)
@@ -55,24 +54,26 @@ class ZuulManager(threading.Thread):
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
- self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
- while True and not self.stopped():
+ while not self.stopped():
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
self.gearman_worker.waitForServer()
- logging.debug("Waiting for job")
- self.current_step = 0
- job = self.gearman_worker.getJob()
- self._handle_job(job)
+ if (not self.stopped() and self.gearman_worker.running and
+ self.gearman_worker.active_connections):
+ logging.debug("Waiting for job")
+ self.current_step = 0
+ job = self.gearman_worker.getJob()
+ self._handle_job(job)
except:
logging.exception('Exception retrieving log event.')
+ self.log.debug("Finished manager thread")
def _handle_job(self, job):
""" Handle the requested job """
@@ -104,7 +105,6 @@ class ZuulClient(threading.Thread):
self.functions = {}
self.job = None
- self.cancelled = False
self.setup_gearman()
@@ -119,7 +119,6 @@ class ZuulClient(threading.Thread):
def register_functions(self):
self.log.debug("Register functions with gearman")
for function_name, plugin in self.functions.items():
- self.gearman_worker.waitForServer()
self.gearman_worker.registerFunction(function_name)
self.log.debug(self.gearman_worker.functions)
@@ -131,26 +130,28 @@ class ZuulClient(threading.Thread):
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
- self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
- while True and not self.stopped():
+ while not self.stopped():
try:
- self.cancelled = False
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
self.gearman_worker.waitForServer()
- self.log.debug("Waiting for job")
- self.job = self.gearman_worker.getJob()
- self._handle_job()
+ if (not self.stopped() and self.gearman_worker.running and
+ self.gearman_worker.active_connections):
+ self.log.debug("Waiting for job")
+ self.job = self.gearman_worker.getJob()
+ self._handle_job()
except:
self.log.exception('Exception waiting for job.')
+ self.log.debug("Finished client thread")
def _handle_job(self):
""" We have a job, give it to the right plugin """
- self.log.debug("We have a job, we'll launch the task now.")
- self.functions[self.job.name].start_job(self.job)
+ if self.job:
+ self.log.debug("We have a job, we'll launch the task now.")
+ self.functions[self.job.name].start_job(self.job)