diff options
author | Jiangge Zhang <tonyseek@gmail.com> | 2017-10-31 12:55:10 +0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-03-23 02:01:50 -0700 |
commit | cbd02f5ccc6ea9e6dd6b5da01a0397ea649143a0 (patch) | |
tree | 3f3b0e718b9624102e14a774607db1751d8a26b7 | |
parent | 9b0a793e1275b7b031392b66b805b702a1bc58be (diff) | |
download | kazoo-cbd02f5ccc6ea9e6dd6b5da01a0397ea649143a0.tar.gz |
refactor: Unify queue factory in various handlers
Then every handlers have queue_impl and queue_empty as their attributes.
-rw-r--r-- | kazoo/handlers/eventlet.py | 6 | ||||
-rw-r--r-- | kazoo/handlers/gevent.py | 10 |
2 files changed, 9 insertions, 7 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py index 1126298..c87898e 100644 --- a/kazoo/handlers/eventlet.py +++ b/kazoo/handlers/eventlet.py @@ -76,11 +76,13 @@ class SequentialEventletHandler(object): """ name = "sequential_eventlet_handler" + queue_impl = green_queue.LightQueue + queue_empty = green_queue.Empty def __init__(self): """Create a :class:`SequentialEventletHandler` instance""" - self.callback_queue = green_queue.LightQueue() - self.completion_queue = green_queue.LightQueue() + self.callback_queue = self.queue_impl() + self.completion_queue = self.queue_impl() self._workers = [] self._started = False diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py index 71a4237..551e6a0 100644 --- a/kazoo/handlers/gevent.py +++ b/kazoo/handlers/gevent.py @@ -7,8 +7,6 @@ import gevent from gevent import socket import gevent.event import gevent.queue -from gevent.queue import Empty -from gevent.queue import Queue import gevent.select import gevent.thread try: @@ -50,11 +48,13 @@ class SequentialGeventHandler(object): """ name = "sequential_gevent_handler" + queue_impl = gevent.queue.Queue + queue_empty = gevent.queue.Empty sleep_func = staticmethod(gevent.sleep) def __init__(self): """Create a :class:`SequentialGeventHandler` instance""" - self.callback_queue = Queue() + self.callback_queue = self.queue_impl() self._running = False self._async = None self._state_change = Semaphore() @@ -72,7 +72,7 @@ class SequentialGeventHandler(object): if func is _STOP: break func() - except Empty: + except self.queue_empty: continue except Exception as exc: log.warning("Exception in worker greenlet") @@ -110,7 +110,7 @@ class SequentialGeventHandler(object): worker.join() # Clear the queues - self.callback_queue = Queue() # pragma: nocover + self.callback_queue = self.queue_impl() # pragma: nocover python2atexit.unregister(self.stop) |