summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/endpoints.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-16 22:15:14 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-16 22:15:14 +0000
commite0218aa1ff9071fd5b5c44e05f2a0a6902d989ad (patch)
tree30f36ecf97e946ac81ccedf54d72badfa26c6bf8 /python/qpid/messaging/endpoints.py
parenta6801aa6ac2c6d97b6747ef7bd7d2264be9c58ab (diff)
downloadqpid-python-e0218aa1ff9071fd5b5c44e05f2a0a6902d989ad.tar.gz
don't always set the sync bit on send
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@955414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r--python/qpid/messaging/endpoints.py25
1 files changed, 21 insertions, 4 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 707aee3ed6..58a654ef2a 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -677,12 +677,20 @@ class Session:
assert self.aborted
@synchronized
+ def sync(self):
+ """
+ Sync the session.
+ """
+ for snd in self.senders:
+ snd.sync()
+ self._ewait(lambda: not self.outgoing and not self.acked)
+
+ @synchronized
def close(self):
"""
Close the session.
"""
- # XXX: should be able to express this condition through API calls
- self._ewait(lambda: not self.outgoing and not self.acked)
+ self.sync()
for link in self.receivers + self.senders:
link.close()
@@ -704,8 +712,10 @@ class Sender:
self.target = target
self.options = options
self.capacity = options.get("capacity", UNLIMITED)
+ self.threshold = 0.5
self.durable = options.get("durable")
self.queued = Serial(0)
+ self.synced = Serial(0)
self.acked = Serial(0)
self.error = None
self.linked = False
@@ -792,18 +802,25 @@ class Sender:
# XXX: what if we send the same message to multiple senders?
message._sender = self
+ if self.capacity is not UNLIMITED:
+ message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
+ else:
+ message._sync = sync
self.session.outgoing.append(message)
self.queued += 1
- self._wakeup()
-
if sync:
self.sync()
assert message not in self.session.outgoing
+ else:
+ self._wakeup()
@synchronized
def sync(self):
mno = self.queued
+ if self.synced < mno:
+ self.synced = mno
+ self._wakeup()
self._ewait(lambda: self.acked >= mno)
@synchronized