diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-11-12 18:33:25 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-12 18:33:25 +0000 |
| commit | 48572ba0ee68195b39b932909fcb99a57c9cf826 (patch) | |
| tree | 8545375b67e8e43cd4db1b2badc7de04f7c7c3e9 /python | |
| parent | 2cb5195f485b3d2e312c8cc9cf8c9dab4cd980ed (diff) | |
| download | qpid-python-48572ba0ee68195b39b932909fcb99a57c9cf826.tar.gz | |
removed listeners in favor of next_receiver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835488 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/messaging.py | 55 | ||||
| -rw-r--r-- | python/qpid/tests/messaging.py | 45 |
2 files changed, 40 insertions, 60 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 2fe7d33ca9..ed0bd14f9c 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -295,10 +295,6 @@ class Session: self.closed = False self._lock = connection._lock - self.running = True - self.thread = Thread(target = self.run) - self.thread.setDaemon(True) - self.thread.start() def __repr__(self): return "<Session %s>" % self.name @@ -342,8 +338,8 @@ class Session: @synchronized def receiver(self, source, **options): """ - Creates a receiver that may be used to actively fetch or to listen - for the arrival of L{Messages<Message>} from the specified source. + Creates a receiver that may be used to fetch L{Messages<Message>} + from the specified source. @type source: str @param source: the source of L{Messages<Message>} @@ -392,6 +388,13 @@ class Session: return None @synchronized + def next_receiver(self, timeout=None): + if self._ewait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all @@ -465,29 +468,8 @@ class Session: """ for rcv in self.receivers: rcv.stop() - # TODO: think about stopping individual receivers in listen mode - self._wait(lambda: self._peek(self._pred) is None) self.started = False - def _pred(self, m): - return m._receiver.listener is not None - - @synchronized - def run(self): - self.running = True - try: - while True: - msg = self._get(self._pred) - if msg is None: - break; - else: - msg._receiver.listener(msg) - if self._peek(self._pred) is None: - self.connection._waiter.notifyAll() - finally: - self.running = False - self.connection._waiter.notifyAll() - @synchronized def close(self): """ @@ -498,10 +480,7 @@ class Session: self.closing = True self._wakeup() - self._ewait(lambda: self.closed and not self.running) - while self.thread.isAlive(): - self.thread.join(3) - self.thread = None + self._ewait(lambda: self.closed) # XXX: should be able to express this condition through API calls self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) @@ -636,8 +615,7 @@ class Receiver: """ Receives incoming messages from a remote source. Messages may be - actively fetched with L{fetch} or a listener may be installed with - L{listen}. + fetched with L{fetch}. """ def __init__(self, session, index, source, options, started): @@ -659,7 +637,6 @@ class Receiver: self.linked = False self.closing = False self.closed = False - self.listener = None self._lock = self.session._lock def _wakeup(self): @@ -694,16 +671,6 @@ class Receiver: else: return self.capacity - @synchronized - def listen(self, listener=None): - """ - Sets the message listener for this receiver. - - @type listener: callable - @param listener: a callable object to be notified on message arrival - """ - self.listener = listener - def _pred(self, msg): return msg._receiver == self diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 13638638fb..db9e15a01a 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -228,6 +228,35 @@ class SessionTests(Base): assert msg.content == content self.ssn.acknowledge(msg) + def testNextReceiver(self): + ADDR = 'test-next-rcv-queue {create: always}' + rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + + # XXX: this won't work if it is before the receiver creation + self.ssn.start() + + snd = self.ssn.sender(ADDR) + + msgs = [] + for i in range(10): + content = self.content("testNextReceiver", i) + snd.send(content) + msgs.append(content) + + fetched = [] + try: + while True: + rcv = self.ssn.next_receiver(timeout=self.delay()) + assert rcv in (rcv1, rcv2, rcv3) + assert rcv.pending() > 0 + fetched.append(rcv.fetch().content) + except Empty: + pass + assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) + self.ssn.acknowledge() + def testStart(self): START_Q = 'test-start-queue {create: always}' rcv = self.ssn.receiver(START_Q) @@ -437,22 +466,6 @@ class ReceiverTests(Base): self.snd.send(content) return content - def testListen(self): - msgs = Queue() - def listener(m): - msgs.put(m) - self.ssn.acknowledge(m) - self.rcv.listen(listener) - content = self.send("testListen") - try: - msg = msgs.get(timeout=self.delay()) - assert False, "did not expect message: %s" % msg - except QueueEmpty: - pass - self.rcv.start() - msg = msgs.get(timeout=self.delay()) - assert msg.content == content - def testFetch(self): try: msg = self.rcv.fetch(0) |
