diff options
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 86 |
1 files changed, 83 insertions, 3 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 9abc1b189e..d082d74367 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -23,6 +23,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/DumpClient.h" #include "qpid/framing/AMQBody.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" @@ -51,6 +52,7 @@ using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; +using qpid::sys::TIME_SEC; using qpid::broker::Broker; using boost::ptr_vector; using qpid::cluster::Cluster; @@ -133,6 +135,86 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } +QPID_AUTO_TEST_CASE(testDumpClient) { + BrokerFixture donor, receiver; + { + Client c(donor.getPort()); + FieldTable args; + args.setString("x", "y"); + c.session.queueDeclare("qa", arg::arguments=args); + c.session.queueDeclare("qb", arg::alternateExchange="amq.direct"); + + c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); + c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); + c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo")); + + c.session.exchangeDeclare("ext", arg::type="topic"); + c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); + c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); + c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar")); + + c.session.close(); + c.connection.close(); + } + qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort())); + dump.dump(*donor.broker); + { + Client r(receiver.getPort()); + // Verify exchanges + ExchangeQueryResult ex=r.session.exchangeQuery("exd"); + BOOST_CHECK_EQUAL(ex.getType(), "direct"); + BOOST_CHECK_EQUAL(ex.getDurable(), false); + BOOST_CHECK_EQUAL(ex.getNotFound(), false); + BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y"); + + ex = r.session.exchangeQuery("ext"); + BOOST_CHECK_EQUAL(ex.getType(), "topic"); + BOOST_CHECK_EQUAL(ex.getNotFound(), false); + + // Verify queues + QueueQueryResult qq = r.session.queueQuery("qa"); + BOOST_CHECK_EQUAL(qq.getQueue(), "qa"); + BOOST_CHECK_EQUAL(qq.getAlternateExchange(), ""); + BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), 1); + + qq = r.session.queueQuery("qb"); + BOOST_CHECK_EQUAL(qq.getQueue(), "qb"); + BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), 2); + + // Verify messages + Message m; + BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "one"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo"); + + BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "one"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); + + BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "two"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); + + // Verify bindings + r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo")); + BOOST_CHECK(r.subs.get(m, "qa")); + BOOST_CHECK_EQUAL(m.getData(), "xxx"); + + r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar")); + BOOST_CHECK(r.subs.get(m, "qb")); + BOOST_CHECK_EQUAL(m.getData(), "yyy"); + + r.session.close(); + r.connection.close(); + } +} + QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; @@ -146,8 +228,7 @@ QPID_AUTO_TEST_CASE(testSingletonCluster) { ClusterFixture cluster(1); Client c(cluster[0]); BOOST_CHECK(c.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); - // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate. + BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound()); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -234,5 +315,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } - QPID_AUTO_TEST_SUITE_END() |