summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp86
1 files changed, 83 insertions, 3 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 9abc1b189e..d082d74367 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -23,6 +23,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
@@ -51,6 +52,7 @@ using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
+using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::ptr_vector;
using qpid::cluster::Cluster;
@@ -133,6 +135,86 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
+QPID_AUTO_TEST_CASE(testDumpClient) {
+ BrokerFixture donor, receiver;
+ {
+ Client c(donor.getPort());
+ FieldTable args;
+ args.setString("x", "y");
+ c.session.queueDeclare("qa", arg::arguments=args);
+ c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
+
+ c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
+ c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
+ c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo"));
+
+ c.session.exchangeDeclare("ext", arg::type="topic");
+ c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
+ c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
+ c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar"));
+
+ c.session.close();
+ c.connection.close();
+ }
+ qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort()));
+ dump.dump(*donor.broker);
+ {
+ Client r(receiver.getPort());
+ // Verify exchanges
+ ExchangeQueryResult ex=r.session.exchangeQuery("exd");
+ BOOST_CHECK_EQUAL(ex.getType(), "direct");
+ BOOST_CHECK_EQUAL(ex.getDurable(), false);
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+ BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
+
+ ex = r.session.exchangeQuery("ext");
+ BOOST_CHECK_EQUAL(ex.getType(), "topic");
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+
+ // Verify queues
+ QueueQueryResult qq = r.session.queueQuery("qa");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
+ BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 1);
+
+ qq = r.session.queueQuery("qb");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 2);
+
+ // Verify messages
+ Message m;
+ BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "two");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ // Verify bindings
+ r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo"));
+ BOOST_CHECK(r.subs.get(m, "qa"));
+ BOOST_CHECK_EQUAL(m.getData(), "xxx");
+
+ r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar"));
+ BOOST_CHECK(r.subs.get(m, "qb"));
+ BOOST_CHECK_EQUAL(m.getData(), "yyy");
+
+ r.session.close();
+ r.connection.close();
+ }
+}
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
@@ -146,8 +228,7 @@ QPID_AUTO_TEST_CASE(testSingletonCluster) {
ClusterFixture cluster(1);
Client c(cluster[0]);
BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
- // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
+ BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -234,5 +315,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-
QPID_AUTO_TEST_SUITE_END()