summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-02-25 21:29:55 +0000
committerRafael H. Schloming <rhs@apache.org>2008-02-25 21:29:55 +0000
commit4c7414d38b032fcb84dab463a408b050dd0c7ddf (patch)
tree5cf31931ba4bb6a93b15c67a21019355ccb38008
parenta3ecd35573fff8b40f88c284d109c9f7d3462802 (diff)
downloadqpid-python-4c7414d38b032fcb84dab463a408b050dd0c7ddf.tar.gz
put queue listeners in their own thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@631002 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/queue.py47
-rw-r--r--qpid/python/tests/queue.py41
2 files changed, 49 insertions, 39 deletions
diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py
index af0565b6cc..00946a9156 100644
--- a/qpid/python/qpid/queue.py
+++ b/qpid/python/qpid/queue.py
@@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use.
"""
from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
class Closed(Exception): pass
class Queue(BaseQueue):
END = object()
+ STOP = object()
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
- self._real_put = self.put
- self.listener = self._real_put
+ self.listener = None
+ self.thread = None
def close(self):
self.put(Queue.END)
def get(self, block = True, timeout = None):
- self.put = self._real_put
- try:
- result = BaseQueue.get(self, block, timeout)
- if result == Queue.END:
- # this guarantees that any other waiting threads or any future
- # calls to get will also result in a Closed exception
- self.put(Queue.END)
- raise Closed()
- else:
- return result
- finally:
- self.put = self.listener
- pass
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed()
+ else:
+ return result
def listen(self, listener):
self.listener = listener
- self.put = self.listener
+ if listener == None:
+ if self.thread != None:
+ self.put(Queue.STOP)
+ self.thread.join()
+ self.thread = None
+ else:
+ if self.thread == None:
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ while True:
+ try:
+ o = self.get()
+ if o == Queue.STOP: break
+ self.listener(o)
+ except Closed:
+ break
diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py
index d2e495d207..e12354eb43 100644
--- a/qpid/python/tests/queue.py
+++ b/qpid/python/tests/queue.py
@@ -30,37 +30,32 @@ class QueueTest (TestCase):
# all the queue functionality.
def test_listen(self):
- LISTEN = object()
- GET = object()
- EMPTY = object()
+ values = []
+ heard = threading.Event()
+ def listener(x):
+ values.append(x)
+ heard.set()
q = Queue(0)
- values = []
- q.listen(lambda x: values.append((LISTEN, x)))
+ q.listen(listener)
+ heard.clear()
q.put(1)
- assert values[-1] == (LISTEN, 1)
+ heard.wait()
+ assert values[-1] == 1
+ heard.clear()
q.put(2)
- assert values[-1] == (LISTEN, 2)
-
- class Getter(threading.Thread):
+ heard.wait()
+ assert values[-1] == 2
- def run(self):
- try:
- values.append((GET, q.get(timeout=10)))
- except Empty:
- values.append(EMPTY)
-
- g = Getter()
- g.start()
- # let the other thread reach the get
- time.sleep(2)
+ q.listen(None)
q.put(3)
- g.join()
-
- assert values[-1] == (GET, 3)
+ assert q.get(3) == 3
+ q.listen(listener)
+ heard.clear()
q.put(4)
- assert values[-1] == (LISTEN, 4)
+ heard.wait()
+ assert values[-1] == 4
def test_close(self):
q = Queue(0)