summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp190
1 files changed, 100 insertions, 90 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 8fba108717..b87bbf4aa5 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -74,7 +74,7 @@ bool durableFlag = std::getenv("STORE_LIB") != 0;
void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
ostringstream clusterLib;
- clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
if (durableFlag)
args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
@@ -190,7 +190,7 @@ struct PoisonPill : public AMQBody {
static bool match(const AMQBody& , const AMQBody& ) { return false; }
virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; }
};
-
+
QPID_AUTO_TEST_CASE(testBadClientData) {
// Ensure that bad data on a client connection closes the
// connection but does not stop the broker.
@@ -227,7 +227,7 @@ QPID_AUTO_TEST_CASE(testAcl) {
char cwd[1024];
BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
ostringstream aclLib;
- aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+ aclLib << getLibPath("ACL_LIB", "../.libs/acl.so");
ClusterFixture::Args args;
prepareArgs(args, durableFlag);
args += "--acl-file", string(cwd) + "/cluster_test.acl",
@@ -808,7 +808,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void run()
{
- try {
+ try {
mgr.execute(*this);
}
catch (const std::exception& e) {
@@ -854,7 +854,6 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
}
QPID_AUTO_TEST_CASE(testPolicyUpdate) {
- ScopedSuppressLogging allQuiet;
//tests that the policys internal state is accurate on newly
//joined nodes
ClusterFixture::Args args;
@@ -862,26 +861,28 @@ QPID_AUTO_TEST_CASE(testPolicyUpdate) {
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- QueueOptions options;
- options.setSizePolicy(REJECT, 0, 2);
- c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
- cluster.add();
- Client c2(cluster[1], "c2");
- c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
-
- BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
-
- Message received;
- BOOST_CHECK(c1.subs.get(received, "q"));
- BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
- BOOST_CHECK(c1.subs.get(received, "q"));
- BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
- BOOST_CHECK(!c1.subs.get(received, "q"));
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(REJECT, 0, 2);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
+
+ Message received;
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+ BOOST_CHECK(!c1.subs.get(received, "q"));
+ }
}
QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
- ScopedSuppressLogging allQuiet;
//tests that exclusive queues are accurately replicated on newly
//joined nodes
ClusterFixture::Args args;
@@ -889,19 +890,22 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
- cluster.add();
- Client c2(cluster[1], "c2");
- QueueQueryResult result = c2.session.queueQuery("q");
- BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
- BOOST_CHECK(result.getExclusive());
- BOOST_CHECK(result.getAutoDelete());
- BOOST_CHECK(!result.getDurable());
- BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
- BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
- c1.connection.close();
- c2.session = c2.connection.newSession();
- BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ QueueQueryResult result = c2.session.queueQuery("q");
+ BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+ BOOST_CHECK(result.getExclusive());
+ BOOST_CHECK(result.getAutoDelete());
+ BOOST_CHECK(!result.getDurable());
+ BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+ c1.connection.close();
+ c2.session = c2.connection.newSession();
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+ }
}
/**
@@ -940,7 +944,6 @@ void send(Client& client, const std::string& queue, int count, int start=1, cons
}
QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
- ScopedSuppressLogging allQuiet;
//tests that ring queues are accurately replicated on newly
//joined nodes
ClusterFixture::Args args;
@@ -948,24 +951,26 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- QueueOptions options;
- options.setSizePolicy(RING, 0, 5);
- c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- send(c1, "q", 5);
- lockMessages(c1, "q", 1);
- //add new node
- cluster.add();
- BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
- //send one more message
- send(c1, "q", 1, 6);
- //release locked message
- c1.close();
- //check state of queue on both nodes
- checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //send one more message
+ send(c1, "q", 1, 6);
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
}
QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
- ScopedSuppressLogging allQuiet;
//tests that ring queues are accurately replicated on newly joined
//nodes; just like testRingQueueUpdate, but new node joins after
//the sixth message has been sent.
@@ -974,24 +979,26 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- QueueOptions options;
- options.setSizePolicy(RING, 0, 5);
- c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- send(c1, "q", 5);
- lockMessages(c1, "q", 1);
- //send sixth message
- send(c1, "q", 1, 6);
- //add new node
- cluster.add();
- BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
- //release locked message
- c1.close();
- //check state of queue on both nodes
- checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //send sixth message
+ send(c1, "q", 1, 6);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
}
QPID_AUTO_TEST_CASE(testRelease) {
- ScopedSuppressLogging allQuiet;
//tests that releasing a messages that was unacked when one node
//joined works correctly
ClusterFixture::Args args;
@@ -999,31 +1006,34 @@ QPID_AUTO_TEST_CASE(testRelease) {
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- c1.session.queueDeclare("q", arg::durable=durableFlag);
- for (int i = 0; i < 5; i++) {
- c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+ Message received;
+ BOOST_CHECK(lq.get(received));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+ //add new node
+ cluster.add();
+
+ lqSub.release(lqSub.getUnaccepted());
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
}
- //receive but don't ack a message
- LocalQueue lq;
- SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
- lqSettings.autoAck = 0;
- Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
- c1.session.messageFlush("q");
- Message received;
- BOOST_CHECK(lq.get(received));
- BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
-
- //add new node
- cluster.add();
-
- lqSub.release(lqSub.getUnaccepted());
-
- //check state of queue on both nodes
- vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
- Client c3(cluster[0], "c3");
- BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
- Client c2(cluster[1], "c2");
- BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
}
QPID_AUTO_TEST_SUITE_END()