summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster2_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster2_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py67
1 files changed, 63 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index f17dfe2961..1cf749cdb4 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -33,8 +33,27 @@ log = getLogger("qpid.cluster_tests")
class Cluster2Tests(BrokerTest):
"""Tests for new cluster code."""
- def verify_content(self, content, receiver):
- for c in content: self.assertEqual(c, receiver.fetch(1).content)
+ def queue_exists(self, queue, connection):
+ s = connection.session()
+ try:
+ s.sender(queue)
+ return True
+ except qpid.messaging.exceptions.NotFound:
+ return False
+
+ # FIXME aconway 2011-06-22: needed to compensate for
+ # async wiring in early cluster2 prototype
+ def wait_for_queue(self, queue, connections, timeout=10):
+ deadline = time.time() + timeout
+ for c in connections:
+ while not self.queue_exists(queue,c):
+ if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue)
+ time.sleep(0.01)
+
+ # FIXME aconway 2011-05-17: remove, use assert_browse.
+ def verify_content(self, expect, receiver):
+ actual = [receiver.fetch(1).content for x in expect]
+ self.assertEqual(expect, actual)
self.assertRaises(Empty, receiver.fetch, 0)
def test_message_enqueue(self):
@@ -74,12 +93,15 @@ class Cluster2Tests(BrokerTest):
s0 = sn0.sender("q;{create:always,delete:always}")
r0 = sn0.receiver("q")
sn1 = cluster[1].connect().session()
- r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+ r1 = sn1.receiver("q;{create:always}")
content = ["a","b","c"]
for m in content: s0.send(Message(m))
- # Verify enqueued on cluster[1]
+ # Verify enqueued on members 0 and 1
+ # FIXME aconway 2011-05-13:
+ self.verify_content(content, sn0.receiver("q;{mode:browse}"))
self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+
# Dequeue on cluster[0]
self.assertEqual(r0.fetch(1).content, "a")
sn0.acknowledge(sync=True)
@@ -114,3 +136,40 @@ class Cluster2Tests(BrokerTest):
self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
# FIXME aconway 2010-10-29: test unbind, may need to use old API.
+
+ def test_dequeue_mutex(self):
+ """Ensure that one and only one consumer receives each dequeued message."""
+ class Receiver(Thread):
+ def __init__(self, session):
+ self.session = session
+ self.receiver = session.receiver("q")
+ self.messages = []
+ Thread.__init__(self)
+
+ def run(self):
+ try:
+ while True:
+ self.messages.append(self.receiver.fetch(1))
+ self.session.acknowledge()
+ except Empty: pass
+
+ cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t
+ connections = [ b.connect() for b in cluster]
+ sessions = [ c.session() for c in connections ]
+ sender = sessions[0].sender("q;{create:always}")
+ self.wait_for_queue("q", connections)
+
+ receivers = [ Receiver(s) for s in sessions ]
+ for r in receivers: r.start()
+
+ n = 0
+ t = time.time() + 1 # Send for 1 second.
+ while time.time() < t:
+ sender.send(str(n))
+ n += 1
+ for r in receivers: r.join();
+ print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17:
+ for r in receivers: assert len(r.messages) # At least one message to each
+ messages = [int(m.content) for r in receivers for m in r.messages ]
+ messages.sort()
+ self.assertEqual(range(n), messages)