diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-05-27 15:12:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-05-27 15:12:32 +0000 |
commit | d8066a60abda29db997c738d8f8e9feac3c31a56 (patch) | |
tree | 7fb6d9244f4a8b1abfa097926a33f04823271c81 /qpid/cpp/src | |
parent | 1edaf052cf5a83a6605b6a75cb1866b96bcc6be1 (diff) | |
download | qpid-python-d8066a60abda29db997c738d8f8e9feac3c31a56.tar.gz |
Fixed erroneous use of arg::durable in messageTransfer()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@779205 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 93 |
1 files changed, 50 insertions, 43 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index be7d2f9158..64b3ea65f1 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -131,9 +131,16 @@ int64_t getMsgSequence(const Message& m) { return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); } -Message ttlMessage(const string& data, const string& key, uint64_t ttl) { +Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) { Message m(data, key); m.getDeliveryProperties().setTtl(ttl); + if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + return m; +} + +Message makeMessage(const string& data, const string& key, bool durable = false) { + Message m(data, key); + if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); return m; } @@ -220,10 +227,10 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { Client c1(cluster[1], "c1"); c0.session.queueDeclare("p", arg::durable=durableFlag); c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000, durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag)); cluster.add(); Client c2(cluster[1], "c2"); @@ -248,14 +255,14 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); - c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex"); + c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex"); BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex"); BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } @@ -265,22 +272,22 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag)); // Start a transaction that will commit. Session commitSession = c0.connection.newSession("commit"); SubscriptionManager commitSubs(commitSession); commitSession.txSelect(); - commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag); - commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag); + commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag)); + commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); - rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag); + rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); @@ -291,9 +298,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { // More transactional work BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag); - commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag); - rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag); + rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); + commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag)); + rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag)); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); @@ -323,7 +330,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create unacked message: acquired but not accepted. SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); c0.session.queueDeclare("q1", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag)); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted @@ -332,8 +339,8 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create unacked message: not acquired, accepted or completeed. SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); c0.session.queueDeclare("q2", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag)); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue @@ -347,8 +354,8 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Create empty credit record: acquire and accept but don't complete. SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); c0.session.queueDeclare("q3", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag)); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); Message m31=q3.get(TIMEOUT); @@ -391,8 +398,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { // Do work in a transaction. c0.session.txSelect(); c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag)); Message m; BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); @@ -409,7 +416,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. - c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag)); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); @@ -511,10 +518,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { Client c2(cluster[2], "c2"); // Transfer messages - c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag)); - c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag)); // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); @@ -539,7 +546,7 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { cluster.killWithSilencer(0,c0.connection,9); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { - c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag)); BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } @@ -553,8 +560,8 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { // Create some shared state. c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag)); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. @@ -564,11 +571,11 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { // Do some work post-add c0.session.queueDeclare("p", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag)); // Do some work post-join BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); - c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag)); // Verify new brokers have state. Message m; @@ -619,8 +626,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { ClusterFixture cluster(2, args, -1); Client c0(cluster[0]); c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -637,8 +644,8 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { ClusterFixture cluster(3, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); Message msg; @@ -670,8 +677,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag); - c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); // Check they arrived Message m; @@ -698,7 +705,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) void execute(AsyncSession& session, bool) { - session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag); + session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag)); } }; @@ -783,12 +790,12 @@ QPID_AUTO_TEST_CASE(testPolicyUpdate) { QueueOptions options; options.setSizePolicy(REJECT, 0, 2); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - c1.session.messageTransfer(arg::content=Message("one", "q")); + c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); cluster.add(); Client c2(cluster[1], "c2"); - c2.session.messageTransfer(arg::content=Message("two", "q")); - - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=Message("three", "q")), framing::ResourceLimitExceededException); + c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); + + BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); Message received; BOOST_CHECK(c1.subs.get(received, "q")); |