diff options
Diffstat (limited to 'python/qpid/tests/messaging.py')
-rw-r--r-- | python/qpid/tests/messaging.py | 136 |
1 files changed, 92 insertions, 44 deletions
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 8cadb2a8fe..474c9ab630 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -25,7 +25,8 @@ from qpid import compat from qpid.tests import Test from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4 + InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \ + UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -66,7 +67,7 @@ class Base(Test): return "%s[%s, %s]" % (base, count, self.test_id) def ping(self, ssn): - PING_Q = 'ping-queue; {create: always}' + PING_Q = 'ping-queue; {create: always, delete: always}' # send a message sender = ssn.sender(PING_Q, durable=self.durable()) content = self.content("ping") @@ -173,7 +174,8 @@ class ConnectionTests(Base): self.conn.close() assert not self.conn.connected() -ACK_Q = 'test-ack-queue; {create: always}' +ACK_QC = 'test-ack-queue; {create: always}' +ACK_QD = 'test-ack-queue; {delete: always}' class SessionTests(Base): @@ -185,7 +187,7 @@ class SessionTests(Base): return self.conn.session() def testSender(self): - snd = self.ssn.sender('test-snd-queue; {create: always}', + snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', durable=self.durable()) snd2 = self.ssn.sender(snd.target, durable=self.durable()) assert snd is not snd2 @@ -199,7 +201,7 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testReceiver(self): - rcv = self.ssn.receiver('test-rcv-queue; {create: always}') + rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}') rcv2 = self.ssn.receiver(rcv.source) assert rcv is not rcv2 rcv2.close() @@ -212,7 +214,7 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testNextReceiver(self): - ADDR = 'test-next-rcv-queue; {create: always}' + ADDR = 'test-next-rcv-queue; {create: always, delete: always}' rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) @@ -241,14 +243,14 @@ class SessionTests(Base): # empty on setup, and possibly also to drain queues on teardown def ackTest(self, acker, ack_capacity=None): # send a bunch of messages - snd = self.ssn.sender(ACK_Q, durable=self.durable()) + snd = self.ssn.sender(ACK_QC, durable=self.durable()) contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking - rcv = self.ssn.receiver(ACK_Q) + rcv = self.ssn.receiver(ACK_QC) self.drain(rcv, expected=contents) self.ssn.close() @@ -257,7 +259,7 @@ class SessionTests(Base): self.ssn = self.conn.session() if ack_capacity is not None: self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver(ACK_Q) + rcv = self.ssn.receiver(ACK_QC) self.drain(rcv, expected=contents) acker(self.ssn) self.ssn.close() @@ -265,7 +267,7 @@ class SessionTests(Base): # drain the queue a final time and verify that the messages were # dequeued self.ssn = self.conn.session() - rcv = self.ssn.receiver(ACK_Q) + rcv = self.ssn.receiver(ACK_QD) self.assertEmpty(rcv) def testAcknowledge(self): @@ -283,7 +285,7 @@ class SessionTests(Base): pass finally: self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver(ACK_Q)) + self.drain(self.ssn.receiver(ACK_QD)) self.ssn.acknowledge() def testAcknowledgeAsyncAckCap1(self): @@ -306,8 +308,8 @@ class SessionTests(Base): return contents def txTest(self, commit): - TX_Q = 'test-tx-queue; {create: always}' - TX_Q_COPY = 'test-tx-queue-copy; {create: always}' + TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' + TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) contents = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) @@ -337,7 +339,7 @@ class SessionTests(Base): self.txTest(False) def txTestSend(self, commit): - TX_SEND_Q = 'test-tx-send-queue; {create: always}' + TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) @@ -360,11 +362,12 @@ class SessionTests(Base): self.txTestSend(False) def txTestAck(self, commit): - TX_ACK_Q = 'test-tx-ack-queue; {create: always}' + TX_ACK_QC = 'test-tx-ack-queue; {create: always}' + TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_Q) + txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3) + contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) assert contents == self.drain(txrcv) if commit: @@ -382,11 +385,11 @@ class SessionTests(Base): txssn.close() txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_Q) + txrcv = txssn.receiver(TX_ACK_QC) assert contents == self.drain(txrcv) txssn.acknowledge() txssn.commit() - rcv = self.ssn.receiver(TX_ACK_Q) + rcv = self.ssn.receiver(TX_ACK_QD) self.assertEmpty(rcv) txssn.close() self.assertEmpty(rcv) @@ -405,7 +408,7 @@ class SessionTests(Base): except Disconnected: pass -RECEIVER_Q = 'test-receiver-queue; {create: always}' +RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' class ReceiverTests(Base): @@ -482,28 +485,6 @@ class ReceiverTests(Base): self.ssn.acknowledge() - def testPending(self): - self.rcv.capacity = UNLIMITED - assert self.rcv.pending() == 0 - - for i in range(3): - self.send("testPending", i) - self.sleep() - assert self.rcv.pending() == 3 - - for i in range(3, 10): - self.send("testPending", i) - self.sleep() - assert self.rcv.pending() == 10 - - self.drain(self.rcv, limit=3) - assert self.rcv.pending() == 7 - - self.drain(self.rcv) - assert self.rcv.pending() == 0 - - self.ssn.acknowledge() - def testCapacity(self): self.rcv.capacity = 5 self.assertPending(self.rcv, 0) @@ -537,8 +518,75 @@ class ReceiverTests(Base): self.ssn.acknowledge() + def testPending(self): + self.rcv.capacity = UNLIMITED + assert self.rcv.pending() == 0 + + for i in range(3): + self.send("testPending", i) + self.sleep() + assert self.rcv.pending() == 3 + + for i in range(3, 10): + self.send("testPending", i) + self.sleep() + assert self.rcv.pending() == 10 + + self.drain(self.rcv, limit=3) + assert self.rcv.pending() == 7 + + self.drain(self.rcv) + assert self.rcv.pending() == 0 + + self.ssn.acknowledge() + # XXX: need testClose +class AddressTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def testDeleteBySender(self): + snd = self.ssn.sender("test-delete; {create: always}") + snd.send("ping") + snd.close() + snd = self.ssn.sender("test-delete; {delete: always}") + snd.send("ping") + snd.close() + snd = self.ssn.sender("test-delete") + try: + snd.send("ping") + except SendError, e: + assert "no such queue" in str(e) + + def testDeleteByReceiver(self): + rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") + try: + rcv.fetch(0) + except Empty: + pass + rcv.close() + + try: + self.ssn.receiver("test-delete") + except SendError, e: + assert "no such queue" in str(e) + + def testDeleteSpecial(self): + snd = self.ssn.sender("amq.topic; {delete: always}") + snd.send("asdf") + try: + snd.close() + except SessionError, e: + assert "Cannot delete default exchange" in str(e) + # XXX: need to figure out close after error + self.conn._remove_session(self.ssn) + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" @@ -606,7 +654,7 @@ class AddressErrorTests(Base): self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError, lambda e: "unrecognized characters" in str(e)) -SENDER_Q = 'test-sender-q; {create: always}' +SENDER_Q = 'test-sender-q; {create: always, delete: always}' class SenderTests(Base): @@ -715,7 +763,7 @@ class MessageTests(Base): m.content = u"<html/>" assert m.content_type == "text/html; charset=utf8" -ECHO_Q = 'test-message-echo-queue; {create: always}' +ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' class MessageEchoTests(Base): |