summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-05-01 19:17:59 +0000
committerKim van der Riet <kpvdr@apache.org>2009-05-01 19:17:59 +0000
commitc5791158d4aa1f53687142f2b9ad81c7b279d49d (patch)
tree9e9f4d6009c1f11a43b7e1e66d5b4709238d88f2 /qpid/cpp/src/tests/cluster_test.cpp
parentb4a8fbd276dff5d1f6681d96d696c90994b62276 (diff)
downloadqpid-python-c5791158d4aa1f53687142f2b9ad81c7b279d49d.tar.gz
Cluster test code now has a persistence switch controlled by the environment. When this switch set, all brokers start with the store module loaded, all queues are declared persistent and all messages are also made persistent. The absolute paths to module libs hardcoded into the test fixtures have been replaced by paths relative to environment variable QPID_LIB_DIR (which is set in Makefile.am). The cluster test, when run from qpid, will continue to run without persistence by default; the intention is to have the store test code run this test directly with the switch turned on.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@770796 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp214
1 files changed, 130 insertions, 84 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index d38d84025b..819bf4365e 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -69,6 +69,18 @@ using namespace boost::assign;
using broker::Broker;
using boost::shared_ptr;
+bool durableFlag = std::getenv("DURABLE_ENABLE") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+ ostringstream clusterLib;
+ clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+ if (durableFlag)
+ args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR";
+ else
+ args += "--no-data-dir";
+}
+
// Timeout for tests that wait for messages
const sys::Duration TIMEOUT=sys::TIME_SEC/4;
@@ -166,29 +178,31 @@ QPID_AUTO_TEST_CASE(testAcl) {
policyFile.close();
char cwd[1024];
BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
- ClusterFixture cluster(2,-1, list_of<string>
- ("--no-data-dir")
- ("--auth=no")
- ("--acl-file="+string(cwd)+"/cluster_test.acl")
- ("--cluster-mechanism=PLAIN")
- ("--cluster-username=cluster")
- ("--cluster-password=cluster")
- ("--load-module=../.libs/acl.so"));
+ ostringstream aclLib;
+ aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ args += "--acl-file", string(cwd) + "/cluster_test.acl",
+ "--cluster-mechanism", "PLAIN",
+ "--cluster-username", "cluster",
+ "--cluster-password", "cluster",
+ "--load-module", aclLib.str();
+ ClusterFixture cluster(2, args, -1);
Client c0(aclSettings(cluster[0], "c0"), "c0");
Client c1(aclSettings(cluster[1], "c1"), "c1");
Client foo(aclSettings(cluster[1], "foo"), "foo");
- foo.session.queueDeclare("foo");
+ foo.session.queueDeclare("foo", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
- BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
cluster.add();
Client c2(aclSettings(cluster[2], "c2"), "c2");
- BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
}
@@ -198,15 +212,17 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
// Note: this doesn't actually test for cluster race conditions around TTL,
// it just verifies that basic TTL functionality works.
//
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
- c0.session.queueDeclare("p");
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
- c0.session.messageTransfer(arg::content=Message("b", "q"));
- c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
- c0.session.messageTransfer(arg::content=Message("y", "p"));
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag);
cluster.add();
Client c2(cluster[1], "c2");
@@ -222,44 +238,48 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
QPID_AUTO_TEST_CASE(testSequenceOptions) {
// Make sure the exchange qpid.msg_sequence property is properly replicated.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- FieldTable args;
- args.setInt("qpid.msg_sequence", 1);
- c0.session.queueDeclare(arg::queue="q");
- c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+ FieldTable ftargs;
+ ftargs.setInt("qpid.msg_sequence", 1);
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
- c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
- c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
cluster.add();
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+ c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag);
BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
}
QPID_AUTO_TEST_CASE(testTxTransaction) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare(arg::queue="q");
- c0.session.messageTransfer(arg::content=Message("A", "q"));
- c0.session.messageTransfer(arg::content=Message("B", "q"));
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);
// Start a transaction that will commit.
Session commitSession = c0.connection.newSession("commit");
SubscriptionManager commitSubs(commitSession);
commitSession.txSelect();
- commitSession.messageTransfer(arg::content=Message("a", "q"));
- commitSession.messageTransfer(arg::content=Message("b", "q"));
+ commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
- rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag);
Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
@@ -270,9 +290,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
// More transactional work
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- rollbackSession.messageTransfer(arg::content=Message("2", "q"));
- commitSession.messageTransfer(arg::content=Message("c", "q"));
- rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag);
+ rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
@@ -292,15 +312,17 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
Message m;
// Create unacked message: acquired but not accepted.
SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
- c0.session.queueDeclare("q1");
- c0.session.messageTransfer(arg::content=Message("11","q1"));
+ c0.session.queueDeclare("q1", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag);
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
@@ -308,9 +330,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create unacked message: not acquired, accepted or completeed.
SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
- c0.session.queueDeclare("q2");
- c0.session.messageTransfer(arg::content=Message("21","q2"));
- c0.session.messageTransfer(arg::content=Message("22","q2"));
+ c0.session.queueDeclare("q2", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag);
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
@@ -323,9 +345,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create empty credit record: acquire and accept but don't complete.
SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
- c0.session.queueDeclare("q3");
- c0.session.messageTransfer(arg::content=Message("31", "q3"));
- c0.session.messageTransfer(arg::content=Message("32", "q3"));
+ c0.session.queueDeclare("q3", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag);
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
Message m31=q3.get(TIMEOUT);
@@ -360,14 +382,16 @@ QPID_AUTO_TEST_CASE(testUnacked) {
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
// Verify that we update transaction state correctly to new members.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
// Do work in a transaction.
c0.session.txSelect();
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("1","q"));
- c0.session.messageTransfer(arg::content=Message("2","q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag);
Message m;
BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "1");
@@ -384,7 +408,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
- c0.session.messageTransfer(arg::content=Message("3","q"));
+ c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag);
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
@@ -394,9 +418,11 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// Verify that we update a partially recieved message to a new member.
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
// Send first 2 frames of message.
@@ -407,6 +433,10 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
sender.send(transfer, true, false, true, true);
AMQHeaderBody header;
header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ if (durableFlag)
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+ else
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
sender.send(header, false, false, true, true);
// No reliable way to ensure the partial message has arrived
@@ -427,7 +457,9 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
}
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
set<int> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
@@ -459,11 +491,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
}
QPID_AUTO_TEST_CASE(testUpdateConsumers) {
- ClusterFixture cluster(1, 1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("p");
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
LocalQueue lp;
c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
@@ -476,10 +510,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
Client c2(cluster[2], "c2");
// Transfer messages
- c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+ c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("bbb", "p"));
- c0.session.messageTransfer(arg::content=Message("ccc", "p"));
+ c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag);
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
@@ -504,20 +538,22 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
cluster.killWithSilencer(0,c0.connection,9);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
- c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+ c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag);
BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
- ClusterFixture cluster(1);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
// Create some shared state.
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo","q"));
- c0.session.messageTransfer(arg::content=Message("bar","q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag);
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
@@ -526,12 +562,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
cluster.add();
// Do some work post-add
- c0.session.queueDeclare("p");
- c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag);
// Do some work post-join
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
- c0.session.messageTransfer(arg::content=Message("pbar","p"));
+ c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag);
// Verify new brokers have state.
Message m;
@@ -556,11 +592,13 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0]);
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.session.exchangeDeclare("ex", arg::type="direct");
c0.session.close();
c0.connection.close();
@@ -575,11 +613,13 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
QPID_AUTO_TEST_CASE(testMessageEnqueue) {
// Enqueue on one broker, dequeue on another.
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
Client c0(cluster[0]);
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo", "q"));
- c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -591,11 +631,13 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0], "c0");
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo", "q"));
- c0.session.messageTransfer(arg::content=Message("bar", "q"));
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
Message msg;
@@ -615,18 +657,20 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
}
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
- ClusterFixture cluster(3);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
Client c0(cluster[0]);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
// First start a subscription.
- c0.session.queueDeclare("q");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("foo", "q"));
- c1.session.messageTransfer(arg::content=Message("bar", "q"));
+ c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
// Check they arrived
Message m;
@@ -653,7 +697,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.messageTransfer(arg::content=Message(content, queue));
+ session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag);
}
};
@@ -676,7 +720,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.queueDeclare(arg::queue=queue);
+ session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
SubscriptionManager subs(session);
subscription = subs.subscribe(*this, queue);
session.sync();
@@ -707,7 +751,9 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
}
};
- ClusterFixture cluster(2);
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
ConnectionSettings settings;
settings.port = cluster[1];
settings.heartbeat = 1;