diff options
Diffstat (limited to 'qpid/python/qpid/tests/messaging/endpoints.py')
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 226 |
1 files changed, 157 insertions, 69 deletions
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 6bc52d962d..5d4fc1646b 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -227,21 +227,60 @@ class SessionTests(Base): def testAcknowledgeAsyncAckCapUNLIMITED(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue, durable=self.durable()) - contents = [] + def testRelease(self): + msgs = [self.message("testRelease", i) for i in range(3)] + snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) + self.ssn.acknowledge(echos[2], Disposition(RELEASED)) + self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) + self.drain(rcv, expected=msgs[2:3]) + self.ssn.acknowledge() + + def testReject(self): + msgs = [self.message("testReject", i) for i in range(3)] + snd = self.ssn.sender(""" + test-reject-queue; { + create: always, + delete: always, + node: { + x-declare: { + alternate-exchange: 'amq.topic' + } + } + } +""") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + rej = self.ssn.receiver("amq.topic") + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(REJECTED)) + self.ssn.acknowledge(echos[2], + Disposition(REJECTED, code=3, text="test-reject")) + self.drain(rej, expected=msgs[1:]) + self.ssn.acknowledge() + + def send(self, ssn, target, base, count=1): + snd = ssn.sender(target, durable=self.durable()) + messages = [] for i in range(count): - c = self.content(base, i) + c = self.message(base, i) snd.send(c) - contents.append(c) + messages.append(c) snd.close() - return contents + return messages def txTest(self, commit): TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, TX_Q, "txTest", 3) + messages = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) @@ -255,10 +294,10 @@ class SessionTests(Base): if commit: txssn.commit() self.assertEmpty(rcv) - assert contents == self.drain(copy_rcv) + self.drain(copy_rcv, expected=messages) else: txssn.rollback() - assert contents == self.drain(rcv) + self.drain(rcv, expected=messages, redelivered=True) self.assertEmpty(copy_rcv) self.ssn.acknowledge() @@ -271,13 +310,13 @@ class SessionTests(Base): def txTestSend(self, commit): TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: txssn.commit() - assert contents == self.drain(rcv) + self.drain(rcv, expected=messages) self.ssn.acknowledge() else: txssn.rollback() @@ -297,18 +336,17 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - assert contents == self.drain(txrcv) + messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + self.drain(txrcv, expected=messages) if commit: txssn.acknowledge() else: txssn.rollback() - drained = self.drain(txrcv) - assert contents == drained, "expected %s, got %s" % (contents, drained) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.rollback() - assert contents == self.drain(txrcv) + self.drain(txrcv, expected=messages, redelivered=True) txssn.commit() # commit without ack self.assertEmpty(txrcv) @@ -316,7 +354,7 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) - assert contents == self.drain(txrcv) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.commit() rcv = self.ssn.receiver(TX_ACK_QD) @@ -477,7 +515,7 @@ class ReceiverTests(Base): snd = self.ssn.sender("""test-double-close; { create: always, delete: sender, - node-properties: { + node: { type: topic } } @@ -533,9 +571,9 @@ class AddressTests(Base): assert "error in options: %s" % error == str(e), e def testIllegalKey(self): - self.badOption("{create: always, node-properties: " + self.badOption("{create: always, node: " "{this-property-does-not-exist: 3}}", - "node-properties: this-property-does-not-exist: " + "node: this-property-does-not-exist: " "illegal key") def testWrongValue(self): @@ -543,23 +581,17 @@ class AddressTests(Base): "('always', 'sender', 'receiver', 'never')") def testWrongType1(self): - self.badOption("{node-properties: asdf}", - "node-properties: asdf is not a map") + self.badOption("{node: asdf}", + "node: asdf is not a map") def testWrongType2(self): - self.badOption("{node-properties: {durable: []}}", - "node-properties: durable: [] is not a bool") - - def testNonQueueBindings(self): - self.badOption("{node-properties: {type: topic, x-properties: " - "{bindings: []}}}", - "node-properties: x-properties: bindings: " - "bindings are only permitted on nodes of type queue") + self.badOption("{node: {durable: []}}", + "node: durable: [] is not a bool") def testCreateQueue(self): snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " - "node-properties: {type: queue, durable: False, " - "x-properties: {auto_delete: true}}}") + "node: {type: queue, durable: False, " + "x-declare: {auto_delete: true}}}") content = self.content("testCreateQueue") snd.send(content) rcv = self.ssn.receiver("test-create-queue") @@ -569,10 +601,10 @@ class AddressTests(Base): addr = """test-create-exchange; { create: always, delete: always, - node-properties: { + node: { type: topic, durable: False, - x-properties: {auto_delete: true, %s} + x-declare: {auto_delete: true, %s} } }""" % props snd = self.ssn.sender(addr) @@ -639,15 +671,15 @@ class AddressTests(Base): # XXX: need to figure out close after error self.conn._remove_session(self.ssn) - def testBindings(self): + def testNodeBindingsQueue(self): snd = self.ssn.sender(""" -test-bindings-queue; { +test-node-bindings-queue; { create: always, delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"] - } + node: { + x-bindings: [{exchange: "amq.topic", key: "a.#"}, + {exchange: "amq.direct", key: "b"}, + {exchange: "amq.topic", key: "c.*"}] } } """) @@ -658,49 +690,80 @@ test-bindings-queue; { snd_a.send("two") snd_b.send("three") snd_c.send("four") - rcv = self.ssn.receiver("test-bindings-queue") + rcv = self.ssn.receiver("test-node-bindings-queue") self.drain(rcv, expected=["one", "two", "three", "four"]) - def testBindingsAdditive(self): - m1 = self.content("testBindingsAdditive", 1) - m2 = self.content("testBindingsAdditive", 2) - m3 = self.content("testBindingsAdditive", 3) - m4 = self.content("testBindingsAdditive", 4) - + def testNodeBindingsTopic(self): + rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") + rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") + rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") + rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") snd = self.ssn.sender(""" -test-bindings-additive-queue; { +test-node-bindings-topic; { create: always, delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a"] - } + node: { + type: topic, + x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, + {queue: test-node-bindings-topic-queue-a, key: "a.#"}, + {queue: test-node-bindings-topic-queue-b, key: "b"}, + {queue: test-node-bindings-topic-queue-c, key: "c.*"}] } } """) + m1 = Message("one") + m2 = Message(subject="a.foo", content="two") + m3 = Message(subject="b", content="three") + m4 = Message(subject="c.bar", content="four") + snd.send(m1) + snd.send(m2) + snd.send(m3) + snd.send(m4) + self.drain(rcv, expected=[m1, m2, m3, m4]) + self.drain(rcv_a, expected=[m2]) + self.drain(rcv_b, expected=[m3]) + self.drain(rcv_c, expected=[m4]) + + def testLinkBindings(self): + m_a = self.message("testLinkBindings", 1, subject="a") + m_b = self.message("testLinkBindings", 2, subject="b") + + self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") + snd = self.ssn.sender("amq.topic") + + snd.send(m_a) + snd.send(m_b) + snd.close() - snd_a = self.ssn.sender("amq.topic/a") - snd_b = self.ssn.sender("amq.topic/b") + rcv = self.ssn.receiver("test-link-bindings-queue") + self.assertEmpty(rcv) + + snd = self.ssn.sender(""" +amq.topic; { + link: { + x-bindings: [{queue: test-link-bindings-queue, key: a}] + } +} +""") - snd_a.send(m1) - snd_b.send(m2) + snd.send(m_a) + snd.send(m_b) - rcv = self.ssn.receiver("test-bindings-additive-queue") - self.drain(rcv, expected=[m1]) + self.drain(rcv, expected=[m_a]) + rcv.close() - new_snd = self.ssn.sender(""" -test-bindings-additive-queue; { - node-properties: { - x-properties: { - bindings: ["amq.topic/b"] - } + rcv = self.ssn.receiver(""" +test-link-bindings-queue; { + link: { + x-bindings: [{exchange: "amq.topic", key: b}] } } """) - new_snd.send(m3) - snd_b.send(m4) - self.drain(rcv, expected=[m3, m4]) + snd.send(m_a) + snd.send(m_b) + + self.drain(rcv, expected=[m_a, m_b]) def testSubjectOverride(self): snd = self.ssn.sender("amq.topic/a") @@ -726,6 +789,32 @@ test-bindings-additive-queue; { assert e2.subject == "b", "subject: %s" % e2.subject self.assertEmpty(rcv) + def doReliabilityTest(self, reliability, messages, expected): + snd = self.ssn.sender("amq.topic") + rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) + for m in messages: + snd.send(m) + self.conn.disconnect() + self.conn.connect() + self.drain(rcv, expected=expected) + + def testReliabilityUnreliable(self): + msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] + self.doReliabilityTest("unreliable", msgs, []) + + def testReliabilityAtLeastOnce(self): + msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] + self.doReliabilityTest("at-least-once", msgs, msgs) + + def testLinkName(self): + msgs = [self.message("testLinkName", i) for i in range(3)] + snd = self.ssn.sender("amq.topic") + trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") + qrcv = self.ssn.receiver("test-link-name") + for m in msgs: + snd.send(m) + self.drain(qrcv, expected=msgs) + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" @@ -838,8 +927,7 @@ class SenderTests(Base): msgs = [self.content("asyncTest", i) for i in range(15)] for m in msgs: self.snd.send(m, sync=False) - drained = self.drain(self.rcv, timeout=self.delay()) - assert msgs == drained, "expected %s, got %s" % (msgs, drained) + self.drain(self.rcv, timeout=self.delay(), expected=msgs) self.ssn.acknowledge() def testSendAsyncCapacity0(self): |