diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-02-25 21:29:55 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-02-25 21:29:55 +0000 |
commit | 4c7414d38b032fcb84dab463a408b050dd0c7ddf (patch) | |
tree | 5cf31931ba4bb6a93b15c67a21019355ccb38008 | |
parent | a3ecd35573fff8b40f88c284d109c9f7d3462802 (diff) | |
download | qpid-python-4c7414d38b032fcb84dab463a408b050dd0c7ddf.tar.gz |
put queue listeners in their own thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@631002 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/queue.py | 47 | ||||
-rw-r--r-- | qpid/python/tests/queue.py | 41 |
2 files changed, 49 insertions, 39 deletions
diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py index af0565b6cc..00946a9156 100644 --- a/qpid/python/qpid/queue.py +++ b/qpid/python/qpid/queue.py @@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use. """ from Queue import Queue as BaseQueue, Empty, Full +from threading import Thread class Closed(Exception): pass class Queue(BaseQueue): END = object() + STOP = object() def __init__(self, *args, **kwargs): BaseQueue.__init__(self, *args, **kwargs) - self._real_put = self.put - self.listener = self._real_put + self.listener = None + self.thread = None def close(self): self.put(Queue.END) def get(self, block = True, timeout = None): - self.put = self._real_put - try: - result = BaseQueue.get(self, block, timeout) - if result == Queue.END: - # this guarantees that any other waiting threads or any future - # calls to get will also result in a Closed exception - self.put(Queue.END) - raise Closed() - else: - return result - finally: - self.put = self.listener - pass + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result def listen(self, listener): self.listener = listener - self.put = self.listener + if listener == None: + if self.thread != None: + self.put(Queue.STOP) + self.thread.join() + self.thread = None + else: + if self.thread == None: + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + try: + o = self.get() + if o == Queue.STOP: break + self.listener(o) + except Closed: + break diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py index d2e495d207..e12354eb43 100644 --- a/qpid/python/tests/queue.py +++ b/qpid/python/tests/queue.py @@ -30,37 +30,32 @@ class QueueTest (TestCase): # all the queue functionality. def test_listen(self): - LISTEN = object() - GET = object() - EMPTY = object() + values = [] + heard = threading.Event() + def listener(x): + values.append(x) + heard.set() q = Queue(0) - values = [] - q.listen(lambda x: values.append((LISTEN, x))) + q.listen(listener) + heard.clear() q.put(1) - assert values[-1] == (LISTEN, 1) + heard.wait() + assert values[-1] == 1 + heard.clear() q.put(2) - assert values[-1] == (LISTEN, 2) - - class Getter(threading.Thread): + heard.wait() + assert values[-1] == 2 - def run(self): - try: - values.append((GET, q.get(timeout=10))) - except Empty: - values.append(EMPTY) - - g = Getter() - g.start() - # let the other thread reach the get - time.sleep(2) + q.listen(None) q.put(3) - g.join() - - assert values[-1] == (GET, 3) + assert q.get(3) == 3 + q.listen(listener) + heard.clear() q.put(4) - assert values[-1] == (LISTEN, 4) + heard.wait() + assert values[-1] == 4 def test_close(self): q = Queue(0) |