diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-09-24 17:25:10 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-09-24 17:25:10 +0000 |
commit | c20ddb1ec5f95ee7f9d85db22ef469a0fa027621 (patch) | |
tree | ec9766250516e09b1d7e79973e5ba7cf848887de /python/qpid/messaging.py | |
parent | 9d23fd30e819f7176b9583fc2bd548f425e93831 (diff) | |
download | qpid-python-c20ddb1ec5f95ee7f9d85db22ef469a0fa027621.tar.gz |
added back exchange query on link establishment; added sender.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@818556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 84653a56ff..f9ca54fe9e 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -314,7 +314,7 @@ class Session: return self.connection._ewait(predicate, timeout, exc) @synchronized - def sender(self, target): + def sender(self, target, **options): """ Creates a L{Sender} that may be used to send L{Messages<Message>} to the specified target. @@ -324,7 +324,7 @@ class Session: @rtype: Sender @return: a new Sender for the specified target """ - sender = Sender(self, len(self.senders), target) + sender = Sender(self, len(self.senders), target, options) self.senders.append(sender) self._wakeup() # XXX: because of the lack of waiting here we can end up getting @@ -334,7 +334,7 @@ class Session: return sender @synchronized - def receiver(self, source, filter=None): + def receiver(self, source, **options): """ Creates a receiver that may be used to actively fetch or to listen for the arrival of L{Messages<Message>} from the specified source. @@ -344,7 +344,7 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, filter, + receiver = Receiver(self, len(self.receivers), source, options, self.started) self.receivers.append(receiver) self._wakeup() @@ -512,13 +512,15 @@ class Sender: Sends outgoing messages. """ - def __init__(self, session, index, target): + def __init__(self, session, index, target, options): self.session = session self.index = index self.target = target - self.capacity = UNLIMITED + self.options = options + self.capacity = options.get("capacity", UNLIMITED) self.queued = Serial(0) self.acked = Serial(0) + self.closing = False self.closed = False self._lock = self.session._lock @@ -580,15 +582,19 @@ class Sender: message._sender = self self.session.outgoing.append(message) self.queued += 1 - mno = self.queued self._wakeup() if sync: - self._ewait(lambda: self.acked >= mno) + self.sync() assert message not in self.session.outgoing @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized def close(self): """ Close the Sender. @@ -616,15 +622,15 @@ class Receiver: L{listen}. """ - def __init__(self, session, index, source, filter, started): + def __init__(self, session, index, source, options, started): self.session = session self.index = index self.destination = str(self.index) self.source = source - self.filter = filter + self.options = options self.started = started - self.capacity = UNLIMITED + self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) self.drain = False self.impending = Serial(0) |