diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster2_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster2_tests.py | 67 |
1 files changed, 63 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py index f17dfe2961..1cf749cdb4 100755 --- a/qpid/cpp/src/tests/cluster2_tests.py +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -33,8 +33,27 @@ log = getLogger("qpid.cluster_tests") class Cluster2Tests(BrokerTest): """Tests for new cluster code.""" - def verify_content(self, content, receiver): - for c in content: self.assertEqual(c, receiver.fetch(1).content) + def queue_exists(self, queue, connection): + s = connection.session() + try: + s.sender(queue) + return True + except qpid.messaging.exceptions.NotFound: + return False + + # FIXME aconway 2011-06-22: needed to compensate for + # async wiring in early cluster2 prototype + def wait_for_queue(self, queue, connections, timeout=10): + deadline = time.time() + timeout + for c in connections: + while not self.queue_exists(queue,c): + if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue) + time.sleep(0.01) + + # FIXME aconway 2011-05-17: remove, use assert_browse. + def verify_content(self, expect, receiver): + actual = [receiver.fetch(1).content for x in expect] + self.assertEqual(expect, actual) self.assertRaises(Empty, receiver.fetch, 0) def test_message_enqueue(self): @@ -74,12 +93,15 @@ class Cluster2Tests(BrokerTest): s0 = sn0.sender("q;{create:always,delete:always}") r0 = sn0.receiver("q") sn1 = cluster[1].connect().session() - r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring. + r1 = sn1.receiver("q;{create:always}") content = ["a","b","c"] for m in content: s0.send(Message(m)) - # Verify enqueued on cluster[1] + # Verify enqueued on members 0 and 1 + # FIXME aconway 2011-05-13: + self.verify_content(content, sn0.receiver("q;{mode:browse}")) self.verify_content(content, sn1.receiver("q;{mode:browse}")) + # Dequeue on cluster[0] self.assertEqual(r0.fetch(1).content, "a") sn0.acknowledge(sync=True) @@ -114,3 +136,40 @@ class Cluster2Tests(BrokerTest): self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex") # FIXME aconway 2010-10-29: test unbind, may need to use old API. + + def test_dequeue_mutex(self): + """Ensure that one and only one consumer receives each dequeued message.""" + class Receiver(Thread): + def __init__(self, session): + self.session = session + self.receiver = session.receiver("q") + self.messages = [] + Thread.__init__(self) + + def run(self): + try: + while True: + self.messages.append(self.receiver.fetch(1)) + self.session.acknowledge() + except Empty: pass + + cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t + connections = [ b.connect() for b in cluster] + sessions = [ c.session() for c in connections ] + sender = sessions[0].sender("q;{create:always}") + self.wait_for_queue("q", connections) + + receivers = [ Receiver(s) for s in sessions ] + for r in receivers: r.start() + + n = 0 + t = time.time() + 1 # Send for 1 second. + while time.time() < t: + sender.send(str(n)) + n += 1 + for r in receivers: r.join(); + print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17: + for r in receivers: assert len(r.messages) # At least one message to each + messages = [int(m.content) for r in receivers for m in r.messages ] + messages.sort() + self.assertEqual(range(n), messages) |