diff options
author | Joshua Hesketh <josh@nitrotech.org> | 2014-03-05 13:06:08 +1100 |
---|---|---|
committer | Joshua Hesketh <josh@nitrotech.org> | 2014-03-05 13:06:08 +1100 |
commit | 81b5fb85317e5b346876c5814d75d2a1fa6af7fe (patch) | |
tree | 9e2b396910a58c86a732585b9283d8e3ceb4ba37 /turbo_hipster/worker_manager.py | |
parent | 4900dcbafcceb14bee46d2b6ddbacc6e1cc18d48 (diff) | |
download | turbo-hipster-81b5fb85317e5b346876c5814d75d2a1fa6af7fe.tar.gz |
Improve gearman shutdown with extra thread lock checks
When quiting turbo-hipster make sure all threads stop properly.
Previously when the gearman_client disconnects the waitForServer
method would release and the client would then wait for a job
it isn't going to receive because turbo-hipster is shutting down.
Change-Id: If9c13e4858c2281c021ca4540ec9475c4d46c615
Diffstat (limited to 'turbo_hipster/worker_manager.py')
-rw-r--r-- | turbo_hipster/worker_manager.py | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py index 746e4f7..d011a03 100644 --- a/turbo_hipster/worker_manager.py +++ b/turbo_hipster/worker_manager.py @@ -47,7 +47,6 @@ class ZuulManager(threading.Thread): self.config['zuul_server']['gearman_host'], self.config['zuul_server']['gearman_port'] ) - self.gearman_worker.waitForServer() self.gearman_worker.registerFunction( 'stop:turbo-hipster-manager-%s' % hostname) @@ -55,24 +54,26 @@ class ZuulManager(threading.Thread): self._stop.set() # Unblock gearman self.log.debug("Telling gearman to stop waiting for jobs") - self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() def stopped(self): return self._stop.isSet() def run(self): - while True and not self.stopped(): + while not self.stopped(): try: # gearman_worker.getJob() blocks until a job is available self.log.debug("Waiting for server") self.gearman_worker.waitForServer() - logging.debug("Waiting for job") - self.current_step = 0 - job = self.gearman_worker.getJob() - self._handle_job(job) + if (not self.stopped() and self.gearman_worker.running and + self.gearman_worker.active_connections): + logging.debug("Waiting for job") + self.current_step = 0 + job = self.gearman_worker.getJob() + self._handle_job(job) except: logging.exception('Exception retrieving log event.') + self.log.debug("Finished manager thread") def _handle_job(self, job): """ Handle the requested job """ @@ -104,7 +105,6 @@ class ZuulClient(threading.Thread): self.functions = {} self.job = None - self.cancelled = False self.setup_gearman() @@ -119,7 +119,6 @@ class ZuulClient(threading.Thread): def register_functions(self): self.log.debug("Register functions with gearman") for function_name, plugin in self.functions.items(): - self.gearman_worker.waitForServer() self.gearman_worker.registerFunction(function_name) self.log.debug(self.gearman_worker.functions) @@ -131,26 +130,28 @@ class ZuulClient(threading.Thread): self._stop.set() # Unblock gearman self.log.debug("Telling gearman to stop waiting for jobs") - self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() def stopped(self): return self._stop.isSet() def run(self): - while True and not self.stopped(): + while not self.stopped(): try: - self.cancelled = False # gearman_worker.getJob() blocks until a job is available self.log.debug("Waiting for server") self.gearman_worker.waitForServer() - self.log.debug("Waiting for job") - self.job = self.gearman_worker.getJob() - self._handle_job() + if (not self.stopped() and self.gearman_worker.running and + self.gearman_worker.active_connections): + self.log.debug("Waiting for job") + self.job = self.gearman_worker.getJob() + self._handle_job() except: self.log.exception('Exception waiting for job.') + self.log.debug("Finished client thread") def _handle_job(self): """ We have a job, give it to the right plugin """ - self.log.debug("We have a job, we'll launch the task now.") - self.functions[self.job.name].start_job(self.job) + if self.job: + self.log.debug("We have a job, we'll launch the task now.") + self.functions[self.job.name].start_job(self.job) |