summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-06-05 17:39:07 +0000
committerGordon Sim <gsim@apache.org>2009-06-05 17:39:07 +0000
commit6efddffd3c04fc16d2e63ccfa9db6c6ad0798825 (patch)
tree7b9e3b3b7e02b058af5919649cc0831bee99605a /cpp/src/tests
parentdb28bbe4b54cef5457e5349f85a154393db70a8d (diff)
downloadqpid-python-6efddffd3c04fc16d2e63ccfa9db6c6ad0798825.tar.gz
Further fix to new cluster member state transfer to fix a case where unacked messages on ring policy queue cause inconsistencies in queue state between nodes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@782075 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BrokerFixture.h1
-rw-r--r--cpp/src/tests/cluster_test.cpp87
2 files changed, 68 insertions, 20 deletions
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index c691e351e7..397045d00b 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -123,6 +123,7 @@ struct ClientT {
: connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
~ClientT() { connection.close(); }
+ void close() { session.close(); connection.close(); }
};
typedef ClientT<> Client;
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 0c249bdf15..a903d192c2 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -856,6 +856,41 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
}
+/**
+ * Subscribes to specified queue and acquires up to the specified
+ * number of message but does not accept or release them. These
+ * message are therefore 'locked' by the clients session.
+ */
+Subscription lockMessages(Client& client, const std::string& queue, int count)
+{
+ LocalQueue q;
+ SubscriptionSettings settings(FlowControl::messageCredit(count));
+ settings.autoAck = 0;
+ Subscription sub = client.subs.subscribe(q, queue, settings);
+ client.session.messageFlush(sub.getName());
+ return sub;
+}
+
+/**
+ * check that the specified queue contains the expected set of
+ * messages (matched on content) for all nodes in the cluster
+ */
+void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages)
+{
+ for (size_t i = 0; i < cluster.size(); i++) {
+ Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str());
+ BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages);
+ client.close();
+ }
+}
+
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
+{
+ for (int i = 0; i < count; i++) {
+ client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag));
+ }
+}
+
QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
ScopedSuppressLogging allQuiet;
//tests that ring queues are accurately replicated on newly
@@ -868,31 +903,43 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
QueueOptions options;
options.setSizePolicy(RING, 0, 5);
c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- for (int i = 0; i < 5; i++) {
- c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
- }
- //receive but don't ack a message
- LocalQueue lq;
- SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
- lqSettings.autoAck = 0;
- Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
- c1.session.messageFlush("q");
-
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
//add new node
cluster.add();
-
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
//send one more message
- c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag));
-
- c1.session.close();
- c1.connection.close();
+ send(c1, "q", 1, 6);
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+}
+QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
+ ScopedSuppressLogging allQuiet;
+ //tests that ring queues are accurately replicated on newly joined
+ //nodes; just like testRingQueueUpdate, but new node joins after
+ //the sixth message has been sent.
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //send sixth message
+ send(c1, "q", 1, 6);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //release locked message
+ c1.close();
//check state of queue on both nodes
- vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6");
- Client c3(cluster[0], "c3");
- BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
- Client c2(cluster[1], "c2");
- BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
}
QPID_AUTO_TEST_CASE(testRelease) {