diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-05-01 19:17:59 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-05-01 19:17:59 +0000 |
commit | f3cec6986a366d133a08647279a38393a9769718 (patch) | |
tree | a3388944e36fb8cde72f3f3b38254e88709493bd | |
parent | 38dc8e231d6136dd6ae0cfa28f4f9dcb90677c77 (diff) | |
download | qpid-python-f3cec6986a366d133a08647279a38393a9769718.tar.gz |
Cluster test code now has a persistence switch controlled by the environment. When this switch set, all brokers start with the store module loaded, all queues are declared persistent and all messages are also made persistent. The absolute paths to module libs hardcoded into the test fixtures have been replaced by paths relative to environment variable QPID_LIB_DIR (which is set in Makefile.am). The cluster test, when run from qpid, will continue to run without persistence by default; the intention is to have the store test code run this test directly with the switch turned on.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@770796 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/tests/ClusterFailover.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/ClusterFixture.cpp | 13 | ||||
-rw-r--r-- | cpp/src/tests/ClusterFixture.h | 8 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/src/tests/PartialFailure.cpp | 30 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 214 | ||||
-rw-r--r-- | cpp/src/tests/test_tools.h | 9 |
7 files changed, 175 insertions, 112 deletions
diff --git a/cpp/src/tests/ClusterFailover.cpp b/cpp/src/tests/ClusterFailover.cpp index db2392b296..7d49ed5cda 100644 --- a/cpp/src/tests/ClusterFailover.cpp +++ b/cpp/src/tests/ClusterFailover.cpp @@ -50,7 +50,10 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; // Test re-connecting with same session name after a failure. QPID_AUTO_TEST_CASE(testReconnectSameSessionName) { - ClusterFixture cluster(2, -1); + ostringstream clusterLib; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str()); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "foo"); cluster.kill(0, 9); Client c1(cluster[1], "foo"); // Using same name, should be cleaned up. diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp index 70d60b10b4..e12106c464 100644 --- a/cpp/src/tests/ClusterFixture.cpp +++ b/cpp/src/tests/ClusterFixture.cpp @@ -61,25 +61,20 @@ using boost::assign::list_of; #include "ClusterFixture.h" -ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_) - : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_) +ClusterFixture::ClusterFixture(size_t n, const Args& args_, int localIndex_) + : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_) { add(n); } -ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_) - : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_) +ClusterFixture::ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs_, int localIndex_) + : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_) { add(n); } -const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS = - list_of<string>("--auth=no")("--no-data-dir"); - ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) { Args args = list_of<string>("qpidd ") - ("--no-module-dir") - ("--load-module")(clusterLib) ("--cluster-name")(name) ("--log-prefix")(prefix); args.insert(args.end(), userArgs.begin(), userArgs.end()); diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h index 353ec0c88d..08b314499e 100644 --- a/cpp/src/tests/ClusterFixture.h +++ b/cpp/src/tests/ClusterFixture.h @@ -60,8 +60,6 @@ using qpid::broker::Broker; using boost::shared_ptr; using qpid::cluster::Cluster; -#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so" - /** Cluster fixture is a vector of ports for the replicas. * * At most one replica (by default replica 0) is in the current @@ -70,15 +68,14 @@ using qpid::cluster::Cluster; class ClusterFixture : public vector<uint16_t> { public: typedef std::vector<std::string> Args; - static const Args DEFAULT_ARGS; /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ - ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB); + ClusterFixture(size_t n, const Args& args, int localIndex=0); /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ - ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB); + ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. @@ -102,7 +99,6 @@ class ClusterFixture : public vector<uint16_t> { std::vector<shared_ptr<ForkedBroker> > forkedBrokers; Args userArgs; boost::function<void (Args&, size_t)> updateArgs; - string clusterLib; }; /** diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 161428fcad..98d101049b 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -211,7 +211,13 @@ check_PROGRAMS+=DispatcherTest DispatcherTest_SOURCES=DispatcherTest.cpp DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS) -TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test +TESTS_ENVIRONMENT = \ + VALGRIND=$(VALGRIND) \ + srcdir=$(srcdir) \ + QPID_DATA_DIR= \ + QPID_LIB_DIR=../.libs \ + BOOST_TEST_SHOW_PROGRESS=yes \ + $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test diff --git a/cpp/src/tests/PartialFailure.cpp b/cpp/src/tests/PartialFailure.cpp index 5137672e7d..91fa63e6e9 100644 --- a/cpp/src/tests/PartialFailure.cpp +++ b/cpp/src/tests/PartialFailure.cpp @@ -33,7 +33,7 @@ QPID_AUTO_TEST_SUITE(PartialFailureTestSuite) - using namespace std; +using namespace std; using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; @@ -49,11 +49,19 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); } void updateArgs(ClusterFixture::Args& args, size_t index) { - ostringstream os; - os << "--test-store-name=s" << index; - args.push_back(os.str()); - args.push_back("--load-module=.libs/test_store.so"); - args.push_back("--auth=no"); + ostringstream clusterLib, testStoreLib, storeName; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so"; + storeName << "s" << index; + args.push_back("--auth"); + args.push_back("no"); + args.push_back("--no-module-dir"); + args.push_back("--load-module"); + args.push_back(clusterLib.str()); + args.push_back("--load-module"); + args.push_back(testStoreLib.str()); + args.push_back("--test-store-name"); + args.push_back(storeName.str()); args.push_back("TMP_DATA_DIR"); // These tests generate errors deliberately, disable error logging unless a log env var is set. @@ -82,7 +90,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) { // Connection thread. ScopedSuppressLogging allQuiet; - ClusterFixture cluster(3, -1, updateArgs); + ClusterFixture cluster(3, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); @@ -113,7 +121,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) { QPID_AUTO_TEST_CASE(testErrorAfterJoin) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(1, -1, updateArgs); + ClusterFixture cluster(1, updateArgs, -1); Client c0(cluster[0]); c0.session.queueDeclare("q", durable=true); c0.session.messageTransfer(content=pMessage("a", "q")); @@ -138,7 +146,7 @@ QPID_AUTO_TEST_CASE(testErrorAfterJoin) { QPID_AUTO_TEST_CASE(testSinglePartialFailure) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(3, -1, updateArgs); + ClusterFixture cluster(3, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); @@ -166,7 +174,7 @@ QPID_AUTO_TEST_CASE(testSinglePartialFailure) { QPID_AUTO_TEST_CASE(testMultiPartialFailure) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(4, -1, updateArgs); + ClusterFixture cluster(4, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); @@ -195,7 +203,7 @@ QPID_AUTO_TEST_CASE(testMultiPartialFailure) { QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { ScopedSuppressLogging allQuiet; - ClusterFixture cluster(2, -1, updateArgs); + ClusterFixture cluster(2, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d38d84025b..819bf4365e 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -69,6 +69,18 @@ using namespace boost::assign; using broker::Broker; using boost::shared_ptr; +bool durableFlag = std::getenv("DURABLE_ENABLE") != 0; + +void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { + ostringstream clusterLib; + clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); + if (durableFlag) + args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR"; + else + args += "--no-data-dir"; +} + // Timeout for tests that wait for messages const sys::Duration TIMEOUT=sys::TIME_SEC/4; @@ -166,29 +178,31 @@ QPID_AUTO_TEST_CASE(testAcl) { policyFile.close(); char cwd[1024]; BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); - ClusterFixture cluster(2,-1, list_of<string> - ("--no-data-dir") - ("--auth=no") - ("--acl-file="+string(cwd)+"/cluster_test.acl") - ("--cluster-mechanism=PLAIN") - ("--cluster-username=cluster") - ("--cluster-password=cluster") - ("--load-module=../.libs/acl.so")); + ostringstream aclLib; + aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so"; + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + args += "--acl-file", string(cwd) + "/cluster_test.acl", + "--cluster-mechanism", "PLAIN", + "--cluster-username", "cluster", + "--cluster-password", "cluster", + "--load-module", aclLib.str(); + ClusterFixture cluster(2, args, -1); Client c0(aclSettings(cluster[0], "c0"), "c0"); Client c1(aclSettings(cluster[1], "c1"), "c1"); Client foo(aclSettings(cluster[1], "foo"), "foo"); - foo.session.queueDeclare("foo"); + foo.session.queueDeclare("foo", arg::durable=durableFlag); BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); - BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException); BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); cluster.add(); Client c2(aclSettings(cluster[2], "c2"), "c2"); - BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException); BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); } @@ -198,15 +212,17 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { // Note: this doesn't actually test for cluster race conditions around TTL, // it just verifies that basic TTL functionality works. // - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); - c0.session.messageTransfer(arg::content=Message("b", "q")); - c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000)); - c0.session.messageTransfer(arg::content=Message("y", "p")); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag); cluster.add(); Client c2(cluster[1], "c2"); @@ -222,44 +238,48 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - FieldTable args; - args.setInt("qpid.msg_sequence", 1); - c0.session.queueDeclare(arg::queue="q"); - c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args); + FieldTable ftargs; + ftargs.setInt("qpid.msg_sequence", 1); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); - c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); - c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); + c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag); BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); + c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag); BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare(arg::queue="q"); - c0.session.messageTransfer(arg::content=Message("A", "q")); - c0.session.messageTransfer(arg::content=Message("B", "q")); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag); // Start a transaction that will commit. Session commitSession = c0.connection.newSession("commit"); SubscriptionManager commitSubs(commitSession); commitSession.txSelect(); - commitSession.messageTransfer(arg::content=Message("a", "q")); - commitSession.messageTransfer(arg::content=Message("b", "q")); + commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); - rollbackSession.messageTransfer(arg::content=Message("1", "q")); + rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag); Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); @@ -270,9 +290,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { // More transactional work BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - rollbackSession.messageTransfer(arg::content=Message("2", "q")); - commitSession.messageTransfer(arg::content=Message("c", "q")); - rollbackSession.messageTransfer(arg::content=Message("3", "q")); + rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag); + rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); @@ -292,15 +312,17 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { QPID_AUTO_TEST_CASE(testUnacked) { // Verify replication of unacknowledged messages. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); Message m; // Create unacked message: acquired but not accepted. SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); - c0.session.queueDeclare("q1"); - c0.session.messageTransfer(arg::content=Message("11","q1")); + c0.session.queueDeclare("q1", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted @@ -308,9 +330,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create unacked message: not acquired, accepted or completeed. SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); - c0.session.queueDeclare("q2"); - c0.session.messageTransfer(arg::content=Message("21","q2")); - c0.session.messageTransfer(arg::content=Message("22","q2")); + c0.session.queueDeclare("q2", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue @@ -323,9 +345,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create empty credit record: acquire and accept but don't complete. SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); - c0.session.queueDeclare("q3"); - c0.session.messageTransfer(arg::content=Message("31", "q3")); - c0.session.messageTransfer(arg::content=Message("32", "q3")); + c0.session.queueDeclare("q3", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); Message m31=q3.get(TIMEOUT); @@ -360,14 +382,16 @@ QPID_AUTO_TEST_CASE(testUnacked) { QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { // Verify that we update transaction state correctly to new members. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Do work in a transaction. c0.session.txSelect(); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("1","q")); - c0.session.messageTransfer(arg::content=Message("2","q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag); Message m; BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); @@ -384,7 +408,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. - c0.session.messageTransfer(arg::content=Message("3","q")); + c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); @@ -394,9 +418,11 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Verify that we update a partially recieved message to a new member. - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); // Send first 2 frames of message. @@ -407,6 +433,10 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { sender.send(transfer, true, false, true, true); AMQHeaderBody header; header.get<DeliveryProperties>(true)->setRoutingKey("q"); + if (durableFlag) + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); + else + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); sender.send(header, false, false, true, true); // No reliable way to ensure the partial message has arrived @@ -427,7 +457,9 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { } QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); set<int> kb0 = knownBrokerPorts(c0.connection); BOOST_CHECK_EQUAL(kb0.size(), 1u); @@ -459,11 +491,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { } QPID_AUTO_TEST_CASE(testUpdateConsumers) { - ClusterFixture cluster(1, 1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); LocalQueue lp; c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); @@ -476,10 +510,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { Client c2(cluster[2], "c2"); // Transfer messages - c0.session.messageTransfer(arg::content=Message("aaa", "q")); + c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("bbb", "p")); - c0.session.messageTransfer(arg::content=Message("ccc", "p")); + c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag); // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); @@ -504,20 +538,22 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { cluster.killWithSilencer(0,c0.connection,9); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { - c1.session.messageTransfer(arg::content=Message("xxx", "q")); + c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag); BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } QPID_AUTO_TEST_CASE(testCatchupSharedState) { - ClusterFixture cluster(1); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Create some shared state. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo","q")); - c0.session.messageTransfer(arg::content=Message("bar","q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. @@ -526,12 +562,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { cluster.add(); // Do some work post-add - c0.session.queueDeclare("p"); - c0.session.messageTransfer(arg::content=Message("pfoo","p")); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag); // Do some work post-join BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); - c0.session.messageTransfer(arg::content=Message("pbar","p")); + c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag); // Verify new brokers have state. Message m; @@ -556,11 +592,13 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { } QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); c0.connection.close(); @@ -575,11 +613,13 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); Client c0(cluster[0]); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -591,11 +631,13 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); Message msg; @@ -615,18 +657,20 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { } QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { - ClusterFixture cluster(3); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. // First start a subscription. - c0.session.queueDeclare("q"); + c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("foo", "q")); - c1.session.messageTransfer(arg::content=Message("bar", "q")); + c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); // Check they arrived Message m; @@ -653,7 +697,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.messageTransfer(arg::content=Message(content, queue)); + session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag); } }; @@ -676,7 +720,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.queueDeclare(arg::queue=queue); + session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); SubscriptionManager subs(session); subscription = subs.subscribe(*this, queue); session.sync(); @@ -707,7 +751,9 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) } }; - ClusterFixture cluster(2); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); ConnectionSettings settings; settings.port = cluster[1]; settings.heartbeat = 1; diff --git a/cpp/src/tests/test_tools.h b/cpp/src/tests/test_tools.h index 37a6594f8a..54837d3e5b 100644 --- a/cpp/src/tests/test_tools.h +++ b/cpp/src/tests/test_tools.h @@ -89,6 +89,15 @@ struct ScopedSuppressLogging { qpid::log::Options opts; }; +inline std::string getLibPath(const char* envName, const char* defaultPath = 0) { + const char* p = std::getenv(envName); + if (p != 0) + return p; + if (defaultPath == 0) + BOOST_FAIL("Environment variable " << envName << " not set."); + return defaultPath; +} + #endif /*!TEST_TOOLS_H*/ |