summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-09-24 17:25:10 +0000
committerRafael H. Schloming <rhs@apache.org>2009-09-24 17:25:10 +0000
commitc20ddb1ec5f95ee7f9d85db22ef469a0fa027621 (patch)
treeec9766250516e09b1d7e79973e5ba7cf848887de /python/qpid/messaging.py
parent9d23fd30e819f7176b9583fc2bd548f425e93831 (diff)
downloadqpid-python-c20ddb1ec5f95ee7f9d85db22ef469a0fa027621.tar.gz
added back exchange query on link establishment; added sender.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@818556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py28
1 files changed, 17 insertions, 11 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 84653a56ff..f9ca54fe9e 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -314,7 +314,7 @@ class Session:
return self.connection._ewait(predicate, timeout, exc)
@synchronized
- def sender(self, target):
+ def sender(self, target, **options):
"""
Creates a L{Sender} that may be used to send L{Messages<Message>}
to the specified target.
@@ -324,7 +324,7 @@ class Session:
@rtype: Sender
@return: a new Sender for the specified target
"""
- sender = Sender(self, len(self.senders), target)
+ sender = Sender(self, len(self.senders), target, options)
self.senders.append(sender)
self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
@@ -334,7 +334,7 @@ class Session:
return sender
@synchronized
- def receiver(self, source, filter=None):
+ def receiver(self, source, **options):
"""
Creates a receiver that may be used to actively fetch or to listen
for the arrival of L{Messages<Message>} from the specified source.
@@ -344,7 +344,7 @@ class Session:
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, filter,
+ receiver = Receiver(self, len(self.receivers), source, options,
self.started)
self.receivers.append(receiver)
self._wakeup()
@@ -512,13 +512,15 @@ class Sender:
Sends outgoing messages.
"""
- def __init__(self, session, index, target):
+ def __init__(self, session, index, target, options):
self.session = session
self.index = index
self.target = target
- self.capacity = UNLIMITED
+ self.options = options
+ self.capacity = options.get("capacity", UNLIMITED)
self.queued = Serial(0)
self.acked = Serial(0)
+ self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -580,15 +582,19 @@ class Sender:
message._sender = self
self.session.outgoing.append(message)
self.queued += 1
- mno = self.queued
self._wakeup()
if sync:
- self._ewait(lambda: self.acked >= mno)
+ self.sync()
assert message not in self.session.outgoing
@synchronized
+ def sync(self):
+ mno = self.queued
+ self._ewait(lambda: self.acked >= mno)
+
+ @synchronized
def close(self):
"""
Close the Sender.
@@ -616,15 +622,15 @@ class Receiver:
L{listen}.
"""
- def __init__(self, session, index, source, filter, started):
+ def __init__(self, session, index, source, options, started):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
- self.filter = filter
+ self.options = options
self.started = started
- self.capacity = UNLIMITED
+ self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
self.drain = False
self.impending = Serial(0)