summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-25 20:09:10 +0000
committerAlan Conway <aconway@apache.org>2008-11-25 20:09:10 +0000
commitff5041af8560ca018d09c19236a2c4c142cdac82 (patch)
treef9155ddd9c946781428225167505de8bcb7ae3aa /cpp/src/tests/cluster_test.cpp
parentd0a9f65cbb18ee2e6eaabd6f2ae065e8ec37d411 (diff)
downloadqpid-python-ff5041af8560ca018d09c19236a2c4c142cdac82.tar.gz
cluster_test.cpp: Extended testUnacked to cover acquired, accepted but not completed case.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@720585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp26
1 files changed, 23 insertions, 3 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 152be4f82d..e1a18d883e 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -301,6 +301,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
Message m;
+ // Create unacked message: acquired but not accepted.
SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
c0.session.queueDeclare("q1");
c0.session.messageTransfer(arg::content=Message("11","q1"));
@@ -309,11 +310,11 @@ QPID_AUTO_TEST_CASE(testUnacked) {
BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+ // Create unacked message: not acquired, accepted or completeed.
SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
c0.session.queueDeclare("q2");
c0.session.messageTransfer(arg::content=Message("21","q2"));
c0.session.messageTransfer(arg::content=Message("22","q2"));
-
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
@@ -324,6 +325,17 @@ QPID_AUTO_TEST_CASE(testUnacked) {
BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+ // Create empty credit record: acquire and accept but don't complete.
+ SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
+ c0.session.queueDeclare("q3");
+ c0.session.messageTransfer(arg::content=Message("31", "q3"));
+ c0.session.messageTransfer(arg::content=Message("32", "q3"));
+ LocalQueue q3;
+ c0.subs.subscribe(q3, "q3", manualComplete);
+ Message m31=q3.get(TIME_SEC);
+ BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
+
// Add new member while there are unacked messages.
cluster.add();
Client c1(cluster[1], "c1");
@@ -331,8 +343,16 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Check queue counts
BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
-
- // Unacked messages should be requeued when session is closed.
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Complete the empty credit message, should unblock the message behind it.
+ BOOST_CHECK_THROW(q3.get(0), Exception);
+ C0.session.markCompleted(SequenceSet(m31.getId()), true);
+ BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
+
+ // Close the original session - unacked messages should be requeued.
c0.session.close();
BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);