summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJiangge Zhang <tonyseek@gmail.com>2017-10-31 12:55:10 +0800
committerJeff Widman <jeff@jeffwidman.com>2018-03-23 02:01:50 -0700
commitcbd02f5ccc6ea9e6dd6b5da01a0397ea649143a0 (patch)
tree3f3b0e718b9624102e14a774607db1751d8a26b7
parent9b0a793e1275b7b031392b66b805b702a1bc58be (diff)
downloadkazoo-cbd02f5ccc6ea9e6dd6b5da01a0397ea649143a0.tar.gz
refactor: Unify queue factory in various handlers
Then every handlers have queue_impl and queue_empty as their attributes.
-rw-r--r--kazoo/handlers/eventlet.py6
-rw-r--r--kazoo/handlers/gevent.py10
2 files changed, 9 insertions, 7 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py
index 1126298..c87898e 100644
--- a/kazoo/handlers/eventlet.py
+++ b/kazoo/handlers/eventlet.py
@@ -76,11 +76,13 @@ class SequentialEventletHandler(object):
"""
name = "sequential_eventlet_handler"
+ queue_impl = green_queue.LightQueue
+ queue_empty = green_queue.Empty
def __init__(self):
"""Create a :class:`SequentialEventletHandler` instance"""
- self.callback_queue = green_queue.LightQueue()
- self.completion_queue = green_queue.LightQueue()
+ self.callback_queue = self.queue_impl()
+ self.completion_queue = self.queue_impl()
self._workers = []
self._started = False
diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py
index 71a4237..551e6a0 100644
--- a/kazoo/handlers/gevent.py
+++ b/kazoo/handlers/gevent.py
@@ -7,8 +7,6 @@ import gevent
from gevent import socket
import gevent.event
import gevent.queue
-from gevent.queue import Empty
-from gevent.queue import Queue
import gevent.select
import gevent.thread
try:
@@ -50,11 +48,13 @@ class SequentialGeventHandler(object):
"""
name = "sequential_gevent_handler"
+ queue_impl = gevent.queue.Queue
+ queue_empty = gevent.queue.Empty
sleep_func = staticmethod(gevent.sleep)
def __init__(self):
"""Create a :class:`SequentialGeventHandler` instance"""
- self.callback_queue = Queue()
+ self.callback_queue = self.queue_impl()
self._running = False
self._async = None
self._state_change = Semaphore()
@@ -72,7 +72,7 @@ class SequentialGeventHandler(object):
if func is _STOP:
break
func()
- except Empty:
+ except self.queue_empty:
continue
except Exception as exc:
log.warning("Exception in worker greenlet")
@@ -110,7 +110,7 @@ class SequentialGeventHandler(object):
worker.join()
# Clear the queues
- self.callback_queue = Queue() # pragma: nocover
+ self.callback_queue = self.queue_impl() # pragma: nocover
python2atexit.unregister(self.stop)