summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/cluster_test.cpp75
-rw-r--r--cpp/src/tests/exception_test.cpp2
2 files changed, 68 insertions, 9 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index a20a3841a9..887a0716e7 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -220,31 +220,90 @@ class Sender {
uint16_t channel;
};
-// FIXME aconway 2008-10-20: dump Tx state.
+QPID_AUTO_TEST_CASE(testUnacked) {
+ // Verify replication of unacknowledged messages.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+
+ Message m;
+
+ SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+ c0.session.queueDeclare("q1");
+ c0.session.messageTransfer(arg::content=Message("11","q1"));
+ LocalQueue q1;
+ c0.subs.subscribe(q1, "q1", manualAccept);
+ BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+ SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+ c0.session.queueDeclare("q2");
+ c0.session.messageTransfer(arg::content=Message("21","q2"));
+ c0.session.messageTransfer(arg::content=Message("22","q2"));
+
+ LocalQueue q2;
+ c0.subs.subscribe(q2, "q2", manualAcquire);
+ m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+ c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+ BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+ // Add new member while there are unacked messages.
+ cluster.add();
+ cluster.waitFor(2);
+ Client c1(cluster[1], "c1");
+
+ // Check queue counts
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+
+ // Unacked messages should be requeued when session is closed.
+ c0.session.close();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+ BOOST_CHECK(c1.subs.get(m, "q1", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "11");
+ BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "22");
+}
+
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
// Verify that we dump transaction state correctly to new members.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
+
+ // Do work in a transaction.
c0.session.txSelect();
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("1","q"));
- c0.session.txCommit();
-
- c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(1));
+ c0.session.messageTransfer(arg::content=Message("2","q"));
Message m;
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "1");
- c0.session.messageTransfer(arg::content=Message("2","q"));
+ // New member, TX not comitted, c1 should see nothing.
cluster.add();
Client c1(cluster[1], "c1");
- // Not yet comitted, c1 should see nothing.
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+ // After commit c1 shoudl see results of tx.
c0.session.txCommit();
- // c1 shoudl see results of tx.
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "2");
+
+ // Another transaction with both members active.
+ c0.session.messageTransfer(arg::content=Message("3","q"));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "3");
}
QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index a73ea9e36b..f3f5435699 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -92,7 +92,7 @@ QPID_AUTO_TEST_CASE(DisconnectedPop) {
ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(fix.lq, "q");
- Catcher<ConnectionException> pop(bind(&LocalQueue::pop, boost::ref(fix.lq)));
+ Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
fix.connection.proxy.close();
BOOST_CHECK(pop.join());
}