diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 12:58:05 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 12:58:05 +0000 |
commit | 53a2ec220e568e12f4e4a640f87b02d22274a938 (patch) | |
tree | 40020f9d9dd2a95bf484b8471a844d251c1ac41a /python/qpid/messaging.py | |
parent | 5781e67cca8d07d481797946c217e738264f6d23 (diff) | |
download | qpid-python-53a2ec220e568e12f4e4a640f87b02d22274a938.tar.gz |
added sync flag to acknowledge and ack_capcity to Session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810495 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 4fd2900663..8e14072c59 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -339,6 +339,8 @@ class Session(Lockable): self.incoming = [] self.unacked = [] self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED self.closing = False self.closed = False @@ -437,13 +439,15 @@ class Session(Lockable): return None @synchronized - def acknowledge(self, message=None): + def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @type message: Message @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged """ if message is None: messages = self.unacked[:] @@ -451,12 +455,18 @@ class Session(Lockable): messages = [message] for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self.wakeup() + self.ewait(lambda: len(self.acked) < self.ack_capacity) self.unacked.remove(m) self.acked.append(m) self.wakeup() - self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked]) - self.check_error() + if sync: + self.ewait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -539,6 +549,8 @@ class Session(Lockable): while self.thread.isAlive(): self.thread.join(3) self.thread = None + # XXX: should be able to express this condition through API calls + self.ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) def parse_addr(address): @@ -1116,6 +1128,7 @@ class Driver(Lockable): # XXX: really need to make this async so that we don't give up the lock _ssn.sync() + # XXX: we're ignoring acks that get lost when disconnected for m in messages: ssn.acked.remove(m) if ssn.transactional: |