diff options
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 4fd2900663..8e14072c59 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -339,6 +339,8 @@ class Session(Lockable): self.incoming = [] self.unacked = [] self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED self.closing = False self.closed = False @@ -437,13 +439,15 @@ class Session(Lockable): return None @synchronized - def acknowledge(self, message=None): + def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @type message: Message @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged """ if message is None: messages = self.unacked[:] @@ -451,12 +455,18 @@ class Session(Lockable): messages = [message] for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self.wakeup() + self.ewait(lambda: len(self.acked) < self.ack_capacity) self.unacked.remove(m) self.acked.append(m) self.wakeup() - self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked]) - self.check_error() + if sync: + self.ewait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -539,6 +549,8 @@ class Session(Lockable): while self.thread.isAlive(): self.thread.join(3) self.thread = None + # XXX: should be able to express this condition through API calls + self.ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) def parse_addr(address): @@ -1116,6 +1128,7 @@ class Driver(Lockable): # XXX: really need to make this async so that we don't give up the lock _ssn.sync() + # XXX: we're ignoring acks that get lost when disconnected for m in messages: ssn.acked.remove(m) if ssn.transactional: |