diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-06-16 22:15:14 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-06-16 22:15:14 +0000 |
commit | e0218aa1ff9071fd5b5c44e05f2a0a6902d989ad (patch) | |
tree | 30f36ecf97e946ac81ccedf54d72badfa26c6bf8 /python/qpid/messaging/endpoints.py | |
parent | a6801aa6ac2c6d97b6747ef7bd7d2264be9c58ab (diff) | |
download | qpid-python-e0218aa1ff9071fd5b5c44e05f2a0a6902d989ad.tar.gz |
don't always set the sync bit on send
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@955414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r-- | python/qpid/messaging/endpoints.py | 25 |
1 files changed, 21 insertions, 4 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 707aee3ed6..58a654ef2a 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -677,12 +677,20 @@ class Session: assert self.aborted @synchronized + def sync(self): + """ + Sync the session. + """ + for snd in self.senders: + snd.sync() + self._ewait(lambda: not self.outgoing and not self.acked) + + @synchronized def close(self): """ Close the session. """ - # XXX: should be able to express this condition through API calls - self._ewait(lambda: not self.outgoing and not self.acked) + self.sync() for link in self.receivers + self.senders: link.close() @@ -704,8 +712,10 @@ class Sender: self.target = target self.options = options self.capacity = options.get("capacity", UNLIMITED) + self.threshold = 0.5 self.durable = options.get("durable") self.queued = Serial(0) + self.synced = Serial(0) self.acked = Serial(0) self.error = None self.linked = False @@ -792,18 +802,25 @@ class Sender: # XXX: what if we send the same message to multiple senders? message._sender = self + if self.capacity is not UNLIMITED: + message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) + else: + message._sync = sync self.session.outgoing.append(message) self.queued += 1 - self._wakeup() - if sync: self.sync() assert message not in self.session.outgoing + else: + self._wakeup() @synchronized def sync(self): mno = self.queued + if self.synced < mno: + self.synced = mno + self._wakeup() self._ewait(lambda: self.acked >= mno) @synchronized |