summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-05-27 15:12:32 +0000
committerKim van der Riet <kpvdr@apache.org>2009-05-27 15:12:32 +0000
commitd8066a60abda29db997c738d8f8e9feac3c31a56 (patch)
tree7fb6d9244f4a8b1abfa097926a33f04823271c81 /qpid/cpp/src
parent1edaf052cf5a83a6605b6a75cb1866b96bcc6be1 (diff)
downloadqpid-python-d8066a60abda29db997c738d8f8e9feac3c31a56.tar.gz
Fixed erroneous use of arg::durable in messageTransfer()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@779205 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp93
1 files changed, 50 insertions, 43 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index be7d2f9158..64b3ea65f1 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -131,9 +131,16 @@ int64_t getMsgSequence(const Message& m) {
return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
}
-Message ttlMessage(const string& data, const string& key, uint64_t ttl) {
+Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) {
Message m(data, key);
m.getDeliveryProperties().setTtl(ttl);
+ if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ return m;
+}
+
+Message makeMessage(const string& data, const string& key, bool durable = false) {
+ Message m(data, key);
+ if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
return m;
}
@@ -220,10 +227,10 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
Client c1(cluster[1], "c1");
c0.session.queueDeclare("p", arg::durable=durableFlag);
c0.session.queueDeclare("q", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000, durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag));
cluster.add();
Client c2(cluster[1], "c2");
@@ -248,14 +255,14 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) {
c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
- c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex");
BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
cluster.add();
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex");
BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
}
@@ -265,22 +272,22 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag));
// Start a transaction that will commit.
Session commitSession = c0.connection.newSession("commit");
SubscriptionManager commitSubs(commitSession);
commitSession.txSelect();
- commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag);
- commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
+ commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag));
+ commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
- rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag);
+ rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
@@ -291,9 +298,9 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
// More transactional work
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag);
- commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag);
- rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag);
+ rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+ commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag));
+ rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag));
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
@@ -323,7 +330,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create unacked message: acquired but not accepted.
SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
c0.session.queueDeclare("q1", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag));
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
@@ -332,8 +339,8 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create unacked message: not acquired, accepted or completeed.
SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
c0.session.queueDeclare("q2", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag));
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
@@ -347,8 +354,8 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Create empty credit record: acquire and accept but don't complete.
SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
c0.session.queueDeclare("q3", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag));
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
Message m31=q3.get(TIMEOUT);
@@ -391,8 +398,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
// Do work in a transaction.
c0.session.txSelect();
c0.session.queueDeclare("q", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag));
Message m;
BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "1");
@@ -409,7 +416,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
- c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag));
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
@@ -511,10 +518,10 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
Client c2(cluster[2], "c2");
// Transfer messages
- c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag));
- c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag));
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
@@ -539,7 +546,7 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
cluster.killWithSilencer(0,c0.connection,9);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
- c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag));
BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
@@ -553,8 +560,8 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
// Create some shared state.
c0.session.queueDeclare("q", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag));
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
@@ -564,11 +571,11 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
// Do some work post-add
c0.session.queueDeclare("p", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag));
// Do some work post-join
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
- c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag));
// Verify new brokers have state.
Message m;
@@ -619,8 +626,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
ClusterFixture cluster(2, args, -1);
Client c0(cluster[0]);
c0.session.queueDeclare("q", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -637,8 +644,8 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
ClusterFixture cluster(3, args, -1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare("q", arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
- c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
Message msg;
@@ -670,8 +677,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
- c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
// Check they arrived
Message m;
@@ -698,7 +705,7 @@ QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
void execute(AsyncSession& session, bool)
{
- session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag);
+ session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag));
}
};
@@ -783,12 +790,12 @@ QPID_AUTO_TEST_CASE(testPolicyUpdate) {
QueueOptions options;
options.setSizePolicy(REJECT, 0, 2);
c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- c1.session.messageTransfer(arg::content=Message("one", "q"));
+ c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
cluster.add();
Client c2(cluster[1], "c2");
- c2.session.messageTransfer(arg::content=Message("two", "q"));
-
- BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=Message("three", "q")), framing::ResourceLimitExceededException);
+ c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
Message received;
BOOST_CHECK(c1.subs.get(received, "q"));