diff options
Diffstat (limited to 'python/qpid/queue.py')
-rw-r--r-- | python/qpid/queue.py | 47 |
1 files changed, 31 insertions, 16 deletions
diff --git a/python/qpid/queue.py b/python/qpid/queue.py index af0565b6cc..00946a9156 100644 --- a/python/qpid/queue.py +++ b/python/qpid/queue.py @@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use. """ from Queue import Queue as BaseQueue, Empty, Full +from threading import Thread class Closed(Exception): pass class Queue(BaseQueue): END = object() + STOP = object() def __init__(self, *args, **kwargs): BaseQueue.__init__(self, *args, **kwargs) - self._real_put = self.put - self.listener = self._real_put + self.listener = None + self.thread = None def close(self): self.put(Queue.END) def get(self, block = True, timeout = None): - self.put = self._real_put - try: - result = BaseQueue.get(self, block, timeout) - if result == Queue.END: - # this guarantees that any other waiting threads or any future - # calls to get will also result in a Closed exception - self.put(Queue.END) - raise Closed() - else: - return result - finally: - self.put = self.listener - pass + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result def listen(self, listener): self.listener = listener - self.put = self.listener + if listener == None: + if self.thread != None: + self.put(Queue.STOP) + self.thread.join() + self.thread = None + else: + if self.thread == None: + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + try: + o = self.get() + if o == Queue.STOP: break + self.listener(o) + except Closed: + break |