diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 12:58:05 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 12:58:05 +0000 |
commit | 53a2ec220e568e12f4e4a640f87b02d22274a938 (patch) | |
tree | 40020f9d9dd2a95bf484b8471a844d251c1ac41a /python | |
parent | 5781e67cca8d07d481797946c217e738264f6d23 (diff) | |
download | qpid-python-53a2ec220e568e12f4e4a640f87b02d22274a938.tar.gz |
added sync flag to acknowledge and ack_capcity to Session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810495 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/messaging.py | 19 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 44 |
2 files changed, 52 insertions, 11 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 4fd2900663..8e14072c59 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -339,6 +339,8 @@ class Session(Lockable): self.incoming = [] self.unacked = [] self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED self.closing = False self.closed = False @@ -437,13 +439,15 @@ class Session(Lockable): return None @synchronized - def acknowledge(self, message=None): + def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @type message: Message @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged """ if message is None: messages = self.unacked[:] @@ -451,12 +455,18 @@ class Session(Lockable): messages = [message] for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self.wakeup() + self.ewait(lambda: len(self.acked) < self.ack_capacity) self.unacked.remove(m) self.acked.append(m) self.wakeup() - self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked]) - self.check_error() + if sync: + self.ewait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -539,6 +549,8 @@ class Session(Lockable): while self.thread.isAlive(): self.thread.join(3) self.thread = None + # XXX: should be able to express this condition through API calls + self.ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) def parse_addr(address): @@ -1116,6 +1128,7 @@ class Driver(Lockable): # XXX: really need to make this async so that we don't give up the lock _ssn.sync() + # XXX: we're ignoring acks that get lost when disconnected for m in messages: ssn.acked.remove(m) if ssn.transactional: diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 0058f6199e..3ec8bf0749 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -72,13 +72,15 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None, timeout=0): + def drain(self, rcv, limit=None, timeout=0, expected=None): contents = [] try: while limit is None or len(contents) < limit: contents.append(rcv.fetch(timeout=timeout).content) except Empty: pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) return contents def assertEmpty(self, rcv): @@ -225,27 +227,27 @@ class SessionTests(Base): # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown - def testAcknowledge(self): + def ackTest(self, acker, ack_capacity=None): # send a bunch of messages snd = self.ssn.sender("test-ack-queue") - tid = "a" - contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)] + contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking rcv = self.ssn.receiver(snd.target) - assert contents == self.drain(rcv) + self.drain(rcv, expected=contents) self.ssn.close() # drain the queue again, verify that they are all the messages # were requeued, and ack this time before closing self.ssn = self.conn.session() + if ack_capacity is not None: + self.ssn.ack_capacity = ack_capacity rcv = self.ssn.receiver("test-ack-queue") - drained = self.drain(rcv) - assert contents == drained, "expected %s, got %s" % (contents, drained) - self.ssn.acknowledge() + self.drain(rcv, expected=contents) + acker(self.ssn) self.ssn.close() # drain the queue a final time and verify that the messages were @@ -254,6 +256,32 @@ class SessionTests(Base): rcv = self.ssn.receiver("test-ack-queue") self.assertEmpty(rcv) + def testAcknowledge(self): + self.ackTest(lambda ssn: ssn.acknowledge()) + + def testAcknowledgeAsync(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) + + def testAcknowledgeAsyncAckCap0(self): + try: + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) + assert False, "acknowledge shouldn't succeed with ack_capacity of zero" + except InsufficientCapacity: + pass + finally: + self.ssn.ack_capacity = UNLIMITED + self.drain(self.ssn.receiver("test-ack-queue")) + self.ssn.acknowledge() + + def testAcknowledgeAsyncAckCap1(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) + + def testAcknowledgeAsyncAckCap5(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) + + def testAcknowledgeAsyncAckCapUNLIMITED(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) + def send(self, ssn, queue, base, count=1): snd = ssn.sender(queue) contents = [] |