summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/tests/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/tests/messaging/endpoints.py')
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py226
1 files changed, 157 insertions, 69 deletions
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 6bc52d962d..5d4fc1646b 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -227,21 +227,60 @@ class SessionTests(Base):
def testAcknowledgeAsyncAckCapUNLIMITED(self):
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
- def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue, durable=self.durable())
- contents = []
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node: {
+ x-declare: {
+ alternate-exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
for i in range(count):
- c = self.content(base, i)
+ c = self.message(base, i)
snd.send(c)
- contents.append(c)
+ messages.append(c)
snd.close()
- return contents
+ return messages
def txTest(self, commit):
TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
if commit:
txssn.commit()
self.assertEmpty(rcv)
- assert contents == self.drain(copy_rcv)
+ self.drain(copy_rcv, expected=messages)
else:
txssn.rollback()
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=messages, redelivered=True)
self.assertEmpty(copy_rcv)
self.ssn.acknowledge()
@@ -271,13 +310,13 @@ class SessionTests(Base):
def txTestSend(self, commit):
TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
txssn.commit()
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=messages)
self.ssn.acknowledge()
else:
txssn.rollback()
@@ -297,18 +336,17 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- assert contents == self.drain(txrcv)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
if commit:
txssn.acknowledge()
else:
txssn.rollback()
- drained = self.drain(txrcv)
- assert contents == drained, "expected %s, got %s" % (contents, drained)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.rollback()
- assert contents == self.drain(txrcv)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.commit() # commit without ack
self.assertEmpty(txrcv)
@@ -316,7 +354,7 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
- assert contents == self.drain(txrcv)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.commit()
rcv = self.ssn.receiver(TX_ACK_QD)
@@ -477,7 +515,7 @@ class ReceiverTests(Base):
snd = self.ssn.sender("""test-double-close; {
create: always,
delete: sender,
- node-properties: {
+ node: {
type: topic
}
}
@@ -533,9 +571,9 @@ class AddressTests(Base):
assert "error in options: %s" % error == str(e), e
def testIllegalKey(self):
- self.badOption("{create: always, node-properties: "
+ self.badOption("{create: always, node: "
"{this-property-does-not-exist: 3}}",
- "node-properties: this-property-does-not-exist: "
+ "node: this-property-does-not-exist: "
"illegal key")
def testWrongValue(self):
@@ -543,23 +581,17 @@ class AddressTests(Base):
"('always', 'sender', 'receiver', 'never')")
def testWrongType1(self):
- self.badOption("{node-properties: asdf}",
- "node-properties: asdf is not a map")
+ self.badOption("{node: asdf}",
+ "node: asdf is not a map")
def testWrongType2(self):
- self.badOption("{node-properties: {durable: []}}",
- "node-properties: durable: [] is not a bool")
-
- def testNonQueueBindings(self):
- self.badOption("{node-properties: {type: topic, x-properties: "
- "{bindings: []}}}",
- "node-properties: x-properties: bindings: "
- "bindings are only permitted on nodes of type queue")
+ self.badOption("{node: {durable: []}}",
+ "node: durable: [] is not a bool")
def testCreateQueue(self):
snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
- "node-properties: {type: queue, durable: False, "
- "x-properties: {auto_delete: true}}}")
+ "node: {type: queue, durable: False, "
+ "x-declare: {auto_delete: true}}}")
content = self.content("testCreateQueue")
snd.send(content)
rcv = self.ssn.receiver("test-create-queue")
@@ -569,10 +601,10 @@ class AddressTests(Base):
addr = """test-create-exchange; {
create: always,
delete: always,
- node-properties: {
+ node: {
type: topic,
durable: False,
- x-properties: {auto_delete: true, %s}
+ x-declare: {auto_delete: true, %s}
}
}""" % props
snd = self.ssn.sender(addr)
@@ -639,15 +671,15 @@ class AddressTests(Base):
# XXX: need to figure out close after error
self.conn._remove_session(self.ssn)
- def testBindings(self):
+ def testNodeBindingsQueue(self):
snd = self.ssn.sender("""
-test-bindings-queue; {
+test-node-bindings-queue; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
- }
+ node: {
+ x-bindings: [{exchange: "amq.topic", key: "a.#"},
+ {exchange: "amq.direct", key: "b"},
+ {exchange: "amq.topic", key: "c.*"}]
}
}
""")
@@ -658,49 +690,80 @@ test-bindings-queue; {
snd_a.send("two")
snd_b.send("three")
snd_c.send("four")
- rcv = self.ssn.receiver("test-bindings-queue")
+ rcv = self.ssn.receiver("test-node-bindings-queue")
self.drain(rcv, expected=["one", "two", "three", "four"])
- def testBindingsAdditive(self):
- m1 = self.content("testBindingsAdditive", 1)
- m2 = self.content("testBindingsAdditive", 2)
- m3 = self.content("testBindingsAdditive", 3)
- m4 = self.content("testBindingsAdditive", 4)
-
+ def testNodeBindingsTopic(self):
+ rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+ rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+ rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+ rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
snd = self.ssn.sender("""
-test-bindings-additive-queue; {
+test-node-bindings-topic; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a"]
- }
+ node: {
+ type: topic,
+ x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+ {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+ {queue: test-node-bindings-topic-queue-b, key: "b"},
+ {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
}
}
""")
+ m1 = Message("one")
+ m2 = Message(subject="a.foo", content="two")
+ m3 = Message(subject="b", content="three")
+ m4 = Message(subject="c.bar", content="four")
+ snd.send(m1)
+ snd.send(m2)
+ snd.send(m3)
+ snd.send(m4)
+ self.drain(rcv, expected=[m1, m2, m3, m4])
+ self.drain(rcv_a, expected=[m2])
+ self.drain(rcv_b, expected=[m3])
+ self.drain(rcv_c, expected=[m4])
+
+ def testLinkBindings(self):
+ m_a = self.message("testLinkBindings", 1, subject="a")
+ m_b = self.message("testLinkBindings", 2, subject="b")
+
+ self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+ snd = self.ssn.sender("amq.topic")
+
+ snd.send(m_a)
+ snd.send(m_b)
+ snd.close()
- snd_a = self.ssn.sender("amq.topic/a")
- snd_b = self.ssn.sender("amq.topic/b")
+ rcv = self.ssn.receiver("test-link-bindings-queue")
+ self.assertEmpty(rcv)
+
+ snd = self.ssn.sender("""
+amq.topic; {
+ link: {
+ x-bindings: [{queue: test-link-bindings-queue, key: a}]
+ }
+}
+""")
- snd_a.send(m1)
- snd_b.send(m2)
+ snd.send(m_a)
+ snd.send(m_b)
- rcv = self.ssn.receiver("test-bindings-additive-queue")
- self.drain(rcv, expected=[m1])
+ self.drain(rcv, expected=[m_a])
+ rcv.close()
- new_snd = self.ssn.sender("""
-test-bindings-additive-queue; {
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/b"]
- }
+ rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+ link: {
+ x-bindings: [{exchange: "amq.topic", key: b}]
}
}
""")
- new_snd.send(m3)
- snd_b.send(m4)
- self.drain(rcv, expected=[m3, m4])
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a, m_b])
def testSubjectOverride(self):
snd = self.ssn.sender("amq.topic/a")
@@ -726,6 +789,32 @@ test-bindings-additive-queue; {
assert e2.subject == "b", "subject: %s" % e2.subject
self.assertEmpty(rcv)
+ def doReliabilityTest(self, reliability, messages, expected):
+ snd = self.ssn.sender("amq.topic")
+ rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+ for m in messages:
+ snd.send(m)
+ self.conn.disconnect()
+ self.conn.connect()
+ self.drain(rcv, expected=expected)
+
+ def testReliabilityUnreliable(self):
+ msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+ self.doReliabilityTest("unreliable", msgs, [])
+
+ def testReliabilityAtLeastOnce(self):
+ msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+ self.doReliabilityTest("at-least-once", msgs, msgs)
+
+ def testLinkName(self):
+ msgs = [self.message("testLinkName", i) for i in range(3)]
+ snd = self.ssn.sender("amq.topic")
+ trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+ qrcv = self.ssn.receiver("test-link-name")
+ for m in msgs:
+ snd.send(m)
+ self.drain(qrcv, expected=msgs)
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
@@ -838,8 +927,7 @@ class SenderTests(Base):
msgs = [self.content("asyncTest", i) for i in range(15)]
for m in msgs:
self.snd.send(m, sync=False)
- drained = self.drain(self.rcv, timeout=self.delay())
- assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+ self.drain(self.rcv, timeout=self.delay(), expected=msgs)
self.ssn.acknowledge()
def testSendAsyncCapacity0(self):