summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-10 18:15:25 +0000
committerAlan Conway <aconway@apache.org>2008-09-10 18:15:25 +0000
commited35819acfafa18730802532ca581c51d1be3854 (patch)
tree47e22004858bc19eaade16d585f1e054f76ac8a7 /qpid/cpp/src/tests/cluster_test.cpp
parent26acdc039853786717e5392f567fc84dcf9db82c (diff)
downloadqpid-python-ed35819acfafa18730802532ca581c51d1be3854.tar.gz
Cluster support for copying shared broker state to new members.
cluster/DumpClient: Copies broker shared state to a new broker via AMQP. broker/*Registry, Queue, QueueBindings: Added iteration functions for DumpClient broker/SemanticState.cpp: Allow DumpClient to sidestep setting of delivery-properties.exchange. client/Connection.h: Added Connection::open(Url) overload. client/SessionImpl: Added send(AMQBody, FrameSet) overload for forwarding broker messages. tests/cluster_test.cpp: Added test for DumpClient copying shared state between brokers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@693918 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp86
1 files changed, 83 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 9abc1b189e..d082d74367 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -23,6 +23,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
@@ -51,6 +52,7 @@ using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
+using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::ptr_vector;
using qpid::cluster::Cluster;
@@ -133,6 +135,86 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
+QPID_AUTO_TEST_CASE(testDumpClient) {
+ BrokerFixture donor, receiver;
+ {
+ Client c(donor.getPort());
+ FieldTable args;
+ args.setString("x", "y");
+ c.session.queueDeclare("qa", arg::arguments=args);
+ c.session.queueDeclare("qb", arg::alternateExchange="amq.direct");
+
+ c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
+ c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
+ c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo"));
+
+ c.session.exchangeDeclare("ext", arg::type="topic");
+ c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
+ c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
+ c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar"));
+
+ c.session.close();
+ c.connection.close();
+ }
+ qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort()));
+ dump.dump(*donor.broker);
+ {
+ Client r(receiver.getPort());
+ // Verify exchanges
+ ExchangeQueryResult ex=r.session.exchangeQuery("exd");
+ BOOST_CHECK_EQUAL(ex.getType(), "direct");
+ BOOST_CHECK_EQUAL(ex.getDurable(), false);
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+ BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y");
+
+ ex = r.session.exchangeQuery("ext");
+ BOOST_CHECK_EQUAL(ex.getType(), "topic");
+ BOOST_CHECK_EQUAL(ex.getNotFound(), false);
+
+ // Verify queues
+ QueueQueryResult qq = r.session.queueQuery("qa");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qa");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "");
+ BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 1);
+
+ qq = r.session.queueQuery("qb");
+ BOOST_CHECK_EQUAL(qq.getQueue(), "qb");
+ BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 2);
+
+ // Verify messages
+ Message m;
+ BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "one");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "two");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
+
+ // Verify bindings
+ r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo"));
+ BOOST_CHECK(r.subs.get(m, "qa"));
+ BOOST_CHECK_EQUAL(m.getData(), "xxx");
+
+ r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar"));
+ BOOST_CHECK(r.subs.get(m, "qb"));
+ BOOST_CHECK_EQUAL(m.getData(), "yyy");
+
+ r.session.close();
+ r.connection.close();
+ }
+}
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
@@ -146,8 +228,7 @@ QPID_AUTO_TEST_CASE(testSingletonCluster) {
ClusterFixture cluster(1);
Client c(cluster[0]);
BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
- // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
+ BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -234,5 +315,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-
QPID_AUTO_TEST_SUITE_END()