summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-06-02 19:25:57 +0000
committerRafael H. Schloming <rhs@apache.org>2009-06-02 19:25:57 +0000
commitbc25543ddcb44c4389025d8470751c0c790a9bbe (patch)
tree1700c5884eea8bdd3edd782cca35504874160e28 /python/qpid/messaging.py
parentd89ee55566c10f576d08e2e9cf7d87ce10ba4b61 (diff)
downloadqpid-python-bc25543ddcb44c4389025d8470751c0c790a9bbe.tar.gz
modified start and stop to function independently of fetch vs listen, added Receiver.pending() and added tests for Receiver.start(), Receiver.stop(), and Receiver.pending()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 7063dfe684..931784024e 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -365,6 +365,14 @@ class Session(Lockable):
receiver._link()
return receiver
+ @synchronized
+ def _count(self, predicate):
+ result = 0
+ for msg in self.incoming:
+ if predicate(msg):
+ result += 1
+ return result
+
def _peek(self, predicate):
for msg in self.incoming:
if predicate(msg):
@@ -647,6 +655,10 @@ class Receiver(Lockable):
self._ssn = None
@synchronized
+ def pending(self):
+ return self.session._count(self._pred)
+
+ @synchronized
def listen(self, listener=None):
"""
Sets the message listener for this receiver.
@@ -655,13 +667,6 @@ class Receiver(Lockable):
@param listener: a callable object to be notified on message arrival
"""
self.listener = listener
- if self.listener is None:
- self._ssn.message_stop(self.destination)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL,
- sync=True)
- self._ssn.sync()
- else:
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
def _pred(self, msg):
return msg._receiver == self
@@ -692,8 +697,7 @@ class Receiver(Lockable):
def _start(self):
self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL)
- if self.listener is not None:
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
@synchronized
def start(self):