diff options
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 190 |
1 files changed, 100 insertions, 90 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 8fba108717..b87bbf4aa5 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -74,7 +74,7 @@ bool durableFlag = std::getenv("STORE_LIB") != 0; void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { ostringstream clusterLib; - clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so"); args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); if (durableFlag) args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; @@ -190,7 +190,7 @@ struct PoisonPill : public AMQBody { static bool match(const AMQBody& , const AMQBody& ) { return false; } virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; } }; - + QPID_AUTO_TEST_CASE(testBadClientData) { // Ensure that bad data on a client connection closes the // connection but does not stop the broker. @@ -227,7 +227,7 @@ QPID_AUTO_TEST_CASE(testAcl) { char cwd[1024]; BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); ostringstream aclLib; - aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so"; + aclLib << getLibPath("ACL_LIB", "../.libs/acl.so"); ClusterFixture::Args args; prepareArgs(args, durableFlag); args += "--acl-file", string(cwd) + "/cluster_test.acl", @@ -808,7 +808,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void run() { - try { + try { mgr.execute(*this); } catch (const std::exception& e) { @@ -854,7 +854,6 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) } QPID_AUTO_TEST_CASE(testPolicyUpdate) { - ScopedSuppressLogging allQuiet; //tests that the policys internal state is accurate on newly //joined nodes ClusterFixture::Args args; @@ -862,26 +861,28 @@ QPID_AUTO_TEST_CASE(testPolicyUpdate) { prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - QueueOptions options; - options.setSizePolicy(REJECT, 0, 2); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); - cluster.add(); - Client c2(cluster[1], "c2"); - c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); - - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); - - Message received; - BOOST_CHECK(c1.subs.get(received, "q")); - BOOST_CHECK_EQUAL(received.getData(), std::string("one")); - BOOST_CHECK(c1.subs.get(received, "q")); - BOOST_CHECK_EQUAL(received.getData(), std::string("two")); - BOOST_CHECK(!c1.subs.get(received, "q")); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(REJECT, 0, 2); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); + cluster.add(); + Client c2(cluster[1], "c2"); + c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); + + BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); + + Message received; + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("one")); + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("two")); + BOOST_CHECK(!c1.subs.get(received, "q")); + } } QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { - ScopedSuppressLogging allQuiet; //tests that exclusive queues are accurately replicated on newly //joined nodes ClusterFixture::Args args; @@ -889,19 +890,22 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); - cluster.add(); - Client c2(cluster[1], "c2"); - QueueQueryResult result = c2.session.queueQuery("q"); - BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); - BOOST_CHECK(result.getExclusive()); - BOOST_CHECK(result.getAutoDelete()); - BOOST_CHECK(!result.getDurable()); - BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); - BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); - c1.connection.close(); - c2.session = c2.connection.newSession(); - BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); + cluster.add(); + Client c2(cluster[1], "c2"); + QueueQueryResult result = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); + BOOST_CHECK(result.getExclusive()); + BOOST_CHECK(result.getAutoDelete()); + BOOST_CHECK(!result.getDurable()); + BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); + c1.connection.close(); + c2.session = c2.connection.newSession(); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); + } } /** @@ -940,7 +944,6 @@ void send(Client& client, const std::string& queue, int count, int start=1, cons } QPID_AUTO_TEST_CASE(testRingQueueUpdate) { - ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly //joined nodes ClusterFixture::Args args; @@ -948,24 +951,26 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) { prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - QueueOptions options; - options.setSizePolicy(RING, 0, 5); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - send(c1, "q", 5); - lockMessages(c1, "q", 1); - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - //send one more message - send(c1, "q", 1, 6); - //release locked message - c1.close(); - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //send one more message + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + } } QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { - ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly joined //nodes; just like testRingQueueUpdate, but new node joins after //the sixth message has been sent. @@ -974,24 +979,26 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - QueueOptions options; - options.setSizePolicy(RING, 0, 5); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - send(c1, "q", 5); - lockMessages(c1, "q", 1); - //send sixth message - send(c1, "q", 1, 6); - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - //release locked message - c1.close(); - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + } } QPID_AUTO_TEST_CASE(testRelease) { - ScopedSuppressLogging allQuiet; //tests that releasing a messages that was unacked when one node //joined works correctly ClusterFixture::Args args; @@ -999,31 +1006,34 @@ QPID_AUTO_TEST_CASE(testRelease) { prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - c1.session.queueDeclare("q", arg::durable=durableFlag); - for (int i = 0; i < 5; i++) { - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + Message received; + BOOST_CHECK(lq.get(received)); + BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); + + //add new node + cluster.add(); + + lqSub.release(lqSub.getUnaccepted()); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); } - //receive but don't ack a message - LocalQueue lq; - SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); - lqSettings.autoAck = 0; - Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); - c1.session.messageFlush("q"); - Message received; - BOOST_CHECK(lq.get(received)); - BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); - - //add new node - cluster.add(); - - lqSub.release(lqSub.getUnaccepted()); - - //check state of queue on both nodes - vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); - Client c3(cluster[0], "c3"); - BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); - Client c2(cluster[1], "c2"); - BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); } QPID_AUTO_TEST_SUITE_END() |