summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py19
1 files changed, 16 insertions, 3 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 4fd2900663..8e14072c59 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -339,6 +339,8 @@ class Session(Lockable):
self.incoming = []
self.unacked = []
self.acked = []
+ # XXX: I hate this name.
+ self.ack_capacity = UNLIMITED
self.closing = False
self.closed = False
@@ -437,13 +439,15 @@ class Session(Lockable):
return None
@synchronized
- def acknowledge(self, message=None):
+ def acknowledge(self, message=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@type message: Message
@param message: the message to acknowledge or None
+ @type sync: boolean
+ @param sync: if true then block until the message(s) are acknowledged
"""
if message is None:
messages = self.unacked[:]
@@ -451,12 +455,18 @@ class Session(Lockable):
messages = [message]
for m in messages:
+ if self.ack_capacity is not UNLIMITED:
+ if self.ack_capacity <= 0:
+ # XXX: this is currently a SendError, maybe it should be a SessionError?
+ raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+ self.wakeup()
+ self.ewait(lambda: len(self.acked) < self.ack_capacity)
self.unacked.remove(m)
self.acked.append(m)
self.wakeup()
- self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked])
- self.check_error()
+ if sync:
+ self.ewait(lambda: not [m for m in messages if m in self.acked])
@synchronized
def commit(self):
@@ -539,6 +549,8 @@ class Session(Lockable):
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
+ # XXX: should be able to express this condition through API calls
+ self.ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
def parse_addr(address):
@@ -1116,6 +1128,7 @@ class Driver(Lockable):
# XXX: really need to make this async so that we don't give up the lock
_ssn.sync()
+ # XXX: we're ignoring acks that get lost when disconnected
for m in messages:
ssn.acked.remove(m)
if ssn.transactional: