summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-11-12 18:33:25 +0000
committerRafael H. Schloming <rhs@apache.org>2009-11-12 18:33:25 +0000
commit48572ba0ee68195b39b932909fcb99a57c9cf826 (patch)
tree8545375b67e8e43cd4db1b2badc7de04f7c7c3e9 /python
parent2cb5195f485b3d2e312c8cc9cf8c9dab4cd980ed (diff)
downloadqpid-python-48572ba0ee68195b39b932909fcb99a57c9cf826.tar.gz
removed listeners in favor of next_receiver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835488 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging.py55
-rw-r--r--python/qpid/tests/messaging.py45
2 files changed, 40 insertions, 60 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 2fe7d33ca9..ed0bd14f9c 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -295,10 +295,6 @@ class Session:
self.closed = False
self._lock = connection._lock
- self.running = True
- self.thread = Thread(target = self.run)
- self.thread.setDaemon(True)
- self.thread.start()
def __repr__(self):
return "<Session %s>" % self.name
@@ -342,8 +338,8 @@ class Session:
@synchronized
def receiver(self, source, **options):
"""
- Creates a receiver that may be used to actively fetch or to listen
- for the arrival of L{Messages<Message>} from the specified source.
+ Creates a receiver that may be used to fetch L{Messages<Message>}
+ from the specified source.
@type source: str
@param source: the source of L{Messages<Message>}
@@ -392,6 +388,13 @@ class Session:
return None
@synchronized
+ def next_receiver(self, timeout=None):
+ if self._ewait(lambda: self.incoming, timeout):
+ return self.incoming[0]._receiver
+ else:
+ raise Empty
+
+ @synchronized
def acknowledge(self, message=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
@@ -465,29 +468,8 @@ class Session:
"""
for rcv in self.receivers:
rcv.stop()
- # TODO: think about stopping individual receivers in listen mode
- self._wait(lambda: self._peek(self._pred) is None)
self.started = False
- def _pred(self, m):
- return m._receiver.listener is not None
-
- @synchronized
- def run(self):
- self.running = True
- try:
- while True:
- msg = self._get(self._pred)
- if msg is None:
- break;
- else:
- msg._receiver.listener(msg)
- if self._peek(self._pred) is None:
- self.connection._waiter.notifyAll()
- finally:
- self.running = False
- self.connection._waiter.notifyAll()
-
@synchronized
def close(self):
"""
@@ -498,10 +480,7 @@ class Session:
self.closing = True
self._wakeup()
- self._ewait(lambda: self.closed and not self.running)
- while self.thread.isAlive():
- self.thread.join(3)
- self.thread = None
+ self._ewait(lambda: self.closed)
# XXX: should be able to express this condition through API calls
self._ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
@@ -636,8 +615,7 @@ class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
- actively fetched with L{fetch} or a listener may be installed with
- L{listen}.
+ fetched with L{fetch}.
"""
def __init__(self, session, index, source, options, started):
@@ -659,7 +637,6 @@ class Receiver:
self.linked = False
self.closing = False
self.closed = False
- self.listener = None
self._lock = self.session._lock
def _wakeup(self):
@@ -694,16 +671,6 @@ class Receiver:
else:
return self.capacity
- @synchronized
- def listen(self, listener=None):
- """
- Sets the message listener for this receiver.
-
- @type listener: callable
- @param listener: a callable object to be notified on message arrival
- """
- self.listener = listener
-
def _pred(self, msg):
return msg._receiver == self
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index 13638638fb..db9e15a01a 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -228,6 +228,35 @@ class SessionTests(Base):
assert msg.content == content
self.ssn.acknowledge(msg)
+ def testNextReceiver(self):
+ ADDR = 'test-next-rcv-queue {create: always}'
+ rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+
+ # XXX: this won't work if it is before the receiver creation
+ self.ssn.start()
+
+ snd = self.ssn.sender(ADDR)
+
+ msgs = []
+ for i in range(10):
+ content = self.content("testNextReceiver", i)
+ snd.send(content)
+ msgs.append(content)
+
+ fetched = []
+ try:
+ while True:
+ rcv = self.ssn.next_receiver(timeout=self.delay())
+ assert rcv in (rcv1, rcv2, rcv3)
+ assert rcv.pending() > 0
+ fetched.append(rcv.fetch().content)
+ except Empty:
+ pass
+ assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
+ self.ssn.acknowledge()
+
def testStart(self):
START_Q = 'test-start-queue {create: always}'
rcv = self.ssn.receiver(START_Q)
@@ -437,22 +466,6 @@ class ReceiverTests(Base):
self.snd.send(content)
return content
- def testListen(self):
- msgs = Queue()
- def listener(m):
- msgs.put(m)
- self.ssn.acknowledge(m)
- self.rcv.listen(listener)
- content = self.send("testListen")
- try:
- msg = msgs.get(timeout=self.delay())
- assert False, "did not expect message: %s" % msg
- except QueueEmpty:
- pass
- self.rcv.start()
- msg = msgs.get(timeout=self.delay())
- assert msg.content == content
-
def testFetch(self):
try:
msg = self.rcv.fetch(0)