diff options
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 73 |
1 files changed, 52 insertions, 21 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d082d74367..871aa0c657 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -21,13 +21,14 @@ #include "ForkedBroker.h" #include "BrokerFixture.h" -#include "qpid/cluster/Cpg.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/Cpg.h" #include "qpid/cluster/DumpClient.h" #include "qpid/framing/AMQBody.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Logger.h" #include <boost/bind.hpp> @@ -41,7 +42,7 @@ namespace qpid { namespace cluster { -boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp +Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster @@ -81,11 +82,11 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { add(n); // Wait for all n members to join the cluster int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster()->size() != n) { + while (retry && getGlobalCluster().size() != n) { ::sleep(1); --retry; } - BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size()); + BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); } void ClusterFixture::add() { @@ -135,7 +136,37 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -QPID_AUTO_TEST_CASE(testDumpClient) { +#if 0 // FIXME aconway 2008-09-10: finish & enable +QPID_AUTO_TEST_CASE(testDumpConsumers) { + ClusterFixture cluster(1); + Client a(cluster[0]); + a.session.queueDeclare("q"); + a.subs.subscribe(a.lq, "q"); + + cluster.add(); + Client b(cluster[1]); + try { + b.connection.newSession(a.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); + } catch (const SessionBusyException&) {} + + // Transfer some messages to the subscription by client a. + Message m; + a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + + b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + + // Verify that the queue has been drained on both brokers. + // This proves that the consumer was replicated when the second broker joined. + BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); +} +#endif + +QPID_AUTO_TEST_CASE(testDumpClientSharedState) { BrokerFixture donor, receiver; { Client c(donor.getPort()); @@ -146,13 +177,13 @@ QPID_AUTO_TEST_CASE(testDumpClient) { c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); - c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo")); + c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo")); c.session.exchangeDeclare("ext", arg::type="topic"); c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); - c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar")); - c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar")); c.session.close(); c.connection.close(); @@ -202,11 +233,11 @@ QPID_AUTO_TEST_CASE(testDumpClient) { BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); // Verify bindings - r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo")); + r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo")); BOOST_CHECK(r.subs.get(m, "qa")); BOOST_CHECK_EQUAL(m.getData(), "xxx"); - r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar")); + r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar")); BOOST_CHECK(r.subs.get(m, "qb")); BOOST_CHECK_EQUAL(m.getData(), "yyy"); @@ -254,8 +285,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { ClusterFixture cluster(2); Client c0(cluster[0]); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.messageTransfer(arg::content=Message("foo", "q")); + c0.session.messageTransfer(arg::content=Message("bar", "q")); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -268,19 +299,19 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture cluster (3); - Client c0(cluster[0]); + Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.messageTransfer(arg::content=Message("foo", "q")); + c0.session.messageTransfer(arg::content=Message("bar", "q")); Message msg; // Dequeue on 2 others, ensure correct order. - Client c1(cluster[1]); + Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); - Client c2(cluster[2]); + Client c2(cluster[2], "c2"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); @@ -298,8 +329,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c1.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c1.session.messageTransfer(arg::content=Message("foo", "q")); + c1.session.messageTransfer(arg::content=Message("bar", "q")); // Check they arrived Message m; |