summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster2_tests.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-11-04 20:27:13 +0000
committerAlan Conway <aconway@apache.org>2011-11-04 20:27:13 +0000
commit29e3b04915ef30f7e0f769cc1ee3994d99711fef (patch)
tree02e49caec0e4e7699413d36eab177a3d5bbb732d /qpid/cpp/src/tests/cluster2_tests.py
parent561fe4dd6234c085dc55bbd430dcab7427d2db29 (diff)
downloadqpid-python-29e3b04915ef30f7e0f769cc1ee3994d99711fef.tar.gz
QPID-2920: Batch acquire/dequeue messages in cluster.qpid-2920-active
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1197749 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster2_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py31
1 files changed, 12 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index d5c9ffb61d..c5f6157f34 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -50,12 +50,6 @@ class Cluster2Tests(BrokerTest):
if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue)
time.sleep(0.01)
- # FIXME aconway 2011-05-17: remove, use assert_browse.
- def verify_content(self, expect, receiver):
- actual = [receiver.fetch(1).content for x in expect]
- self.assertEqual(expect, actual)
- self.assertRaises(Empty, receiver.fetch, 0)
-
def test_message_enqueue(self):
"""Test basic replication of enqueued messages.
Verify that fanout messages are replicated correctly.
@@ -64,13 +58,12 @@ class Cluster2Tests(BrokerTest):
cluster = self.cluster(2, cluster2=True)
sn0 = cluster[0].connect().session()
- r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
- r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
s0 = sn0.sender("amq.fanout");
-
sn1 = cluster[1].connect().session()
- r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
- r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+ # Bind queues to amq.fanout
+ sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
# Send messages on member 0
@@ -78,10 +71,10 @@ class Cluster2Tests(BrokerTest):
for m in content: s0.send(Message(m))
# Browse on both members.
- self.verify_content(content, r0p)
- self.verify_content(content, r0q)
- self.verify_content(content, r1p)
- self.verify_content(content, r1q)
+ self.assert_browse(sn0, "p", content)
+ self.assert_browse(sn0, "q", content)
+ self.assert_browse(sn1, "p", content)
+ self.assert_browse(sn1, "q", content)
sn1.connection.close()
sn0.connection.close()
@@ -98,16 +91,16 @@ class Cluster2Tests(BrokerTest):
content = ["a","b","c"]
for m in content: s0.send(Message(m))
# Verify enqueued on members 0 and 1
- self.verify_content(content, sn0.receiver("q;{mode:browse}"))
- self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+ self.assert_browse(sn0, "q", content)
+ self.assert_browse(sn1, "q", content)
# Dequeue on cluster[0]
self.assertEqual(r0.fetch(1).content, "a")
sn0.acknowledge(sync=True)
# Verify dequeued on cluster[0] and cluster[1]
- self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}"))
- self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}"))
+ self.assert_browse(sn0, "q", ["b", "c"])
+ self.assert_browse(sn1, "q", ["b", "c"])
def test_wiring(self):
"""Test replication of wiring"""