diff options
author | laura-surcel <laurasurcel16@gmail.com> | 2019-01-15 21:16:05 +0200 |
---|---|---|
committer | Stephen SORRIAUX <stephen.sorriaux@gmail.com> | 2019-01-15 20:16:05 +0100 |
commit | d9e0e7208e56c31f0abec60a3701f8d6ec1e7d32 (patch) | |
tree | 6cca7bf62079b8e9db5439c2ba8a3f4349748174 | |
parent | 1452a48f3070fe9034314476a6fdb94ca206dede (diff) | |
download | kazoo-d9e0e7208e56c31f0abec60a3701f8d6ec1e7d32.tar.gz |
fix(handlers): make AsyncResult call all registered callbacks instantly if the handler has stopped running (#549)
This avoids zombie thread to appear when creating and closing the client right after. A new unit case is added.
-rw-r--r-- | kazoo/handlers/gevent.py | 4 | ||||
-rw-r--r-- | kazoo/handlers/threading.py | 4 | ||||
-rw-r--r-- | kazoo/handlers/utils.py | 33 | ||||
-rw-r--r-- | kazoo/tests/test_client.py | 24 |
4 files changed, 49 insertions, 16 deletions
diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py index 78d234d..96ee765 100644 --- a/kazoo/handlers/gevent.py +++ b/kazoo/handlers/gevent.py @@ -60,6 +60,10 @@ class SequentialGeventHandler(object): self._state_change = Semaphore() self._workers = [] + @property + def running(self): + return self._running + class timeout_exception(gevent.Timeout): def __init__(self, msg): gevent.Timeout.__init__(self, exception=msg) diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index afd05c5..1ab3349 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -113,6 +113,10 @@ class SequentialThreadingHandler(object): self._state_change = threading.Lock() self._workers = [] + @property + def running(self): + return self._running + def _create_thread_worker(self, queue): def _thread_worker(): # pragma: nocover while True: diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py index 2517390..bd1b92e 100644 --- a/kazoo/handlers/utils.py +++ b/kazoo/handlers/utils.py @@ -46,20 +46,14 @@ class AsyncResult(object): with self._condition: self.value = value self._exception = None - for callback in self._callbacks: - self._handler.completion_queue.put( - functools.partial(callback, self) - ) + self._do_callbacks() self._condition.notify_all() def set_exception(self, exception): """Store the exception. Wake up the waiters.""" with self._condition: self._exception = exception - for callback in self._callbacks: - self._handler.completion_queue.put( - functools.partial(callback, self) - ) + self._do_callbacks() self._condition.notify_all() def get(self, block=True, timeout=None): @@ -102,16 +96,13 @@ class AsyncResult(object): """Register a callback to call when a value or an exception is set""" with self._condition: - # Are we already set? Dispatch it now - if self.ready(): - self._handler.completion_queue.put( - functools.partial(callback, self) - ) - return - if callback not in self._callbacks: self._callbacks.append(callback) + # Are we already set? Dispatch it now + if self.ready(): + self._do_callbacks() + def unlink(self, callback): """Remove the callback set by :meth:`rawlink`""" with self._condition: @@ -122,6 +113,18 @@ class AsyncResult(object): if callback in self._callbacks: self._callbacks.remove(callback) + def _do_callbacks(self): + """Execute the callbacks that were registered by :meth:`rawlink`. + If the handler is in running state this method only schedules + the calls to be performed by the handler. If it's stopped, + the callbacks are called right away.""" + + for callback in self._callbacks: + if self._handler.running: + self._handler.completion_queue.put( + functools.partial(callback, self)) + else: + functools.partial(callback, self)() def _set_fd_cloexec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index e22261d..e988fdb 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -1154,7 +1154,7 @@ class TestClientTransactions(KazooTestCase): eq_(self.client.get('/smith')[0], b'32') -class TestCallbacks(unittest.TestCase): +class TestSessionCallbacks(unittest.TestCase): def test_session_callback_states(self): from kazoo.protocol.states import KazooState, KeeperState from kazoo.client import KazooClient @@ -1185,6 +1185,28 @@ class TestCallbacks(unittest.TestCase): eq_(client.state, KazooState.SUSPENDED) +class TestCallbacks(KazooTestCase): + def test_async_result_callbacks_are_always_called(self): + # create a callback object + callback_mock = mock.Mock() + + # simulate waiting for a response + async_result = self.client.handler.async_result() + async_result.rawlink(callback_mock) + + # begin the procedure to stop the client + self.client.stop() + + # the response has just been received; + # this should be on another thread, + # simultaneously with the stop procedure + async_result.set_exception( + Exception("Anything that throws an exception")) + + # with the fix the callback should be called + self.assertGreater(callback_mock.call_count, 0) + + class TestNonChrootClient(KazooTestCase): def test_create(self): |