summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp1231
1 files changed, 1231 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
new file mode 100644
index 0000000000..f2ccd0ba84
--- /dev/null
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -0,0 +1,1231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ForkedBroker.h"
+#include "BrokerFixture.h"
+#include "ClusterFixture.h"
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/UpdateClient.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/assign.hpp>
+
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <iterator>
+#include <vector>
+#include <set>
+#include <algorithm>
+#include <iterator>
+
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(cluster_test)
+
+bool durableFlag = std::getenv("STORE_LIB") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+ ostringstream clusterLib;
+ clusterLib << getLibPath("CLUSTER_LIB");
+ args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+ if (durableFlag)
+ args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
+ else
+ args += "--no-data-dir";
+}
+
+ClusterFixture::Args prepareArgs(const bool durableFlag = false) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ return args;
+}
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=2*sys::TIME_SEC;
+
+
+ostream& operator<<(ostream& o, const cpg_name* n) {
+ return o << Cpg::str(*n);
+}
+
+ostream& operator<<(ostream& o, const cpg_address& a) {
+ return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
+}
+
+template <class T>
+ostream& operator<<(ostream& o, const pair<T*, int>& array) {
+ o << "{ ";
+ ostream_iterator<cpg_address> i(o, " ");
+ copy(array.first, array.first+array.second, i);
+ o << "}";
+ return o;
+}
+
+template <class C> set<int> makeSet(const C& c) {
+ set<int> s;
+ copy(c.begin(), c.end(), inserter(s, s.begin()));
+ return s;
+}
+
+class Sender {
+ public:
+ Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+ void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+ AMQFrame f(body);
+ f.setChannel(channel);
+ f.setFirstSegment(firstSeg);
+ f.setLastSegment(lastSeg);
+ f.setFirstFrame(firstFrame);
+ f.setLastFrame(lastFrame);
+ connection->expand(f.encodedSize(), false);
+ connection->handle(f);
+ }
+
+ private:
+ boost::shared_ptr<ConnectionImpl> connection;
+ uint16_t channel;
+};
+
+int64_t getMsgSequence(const Message& m) {
+ return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) {
+ Message m(data, key);
+ m.getDeliveryProperties().setTtl(ttl);
+ if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ return m;
+}
+
+Message makeMessage(const string& data, const string& key, bool durable = false) {
+ Message m(data, key);
+ if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ return m;
+}
+
+vector<string> browse(Client& c, const string& q, int n) {
+ SubscriptionSettings browseSettings(
+ FlowControl::messageCredit(n),
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ c.subs.subscribe(lq, q, browseSettings);
+ c.session.messageFlush(q);
+ vector<string> result;
+ for (int i = 0; i < n; ++i) {
+ Message m;
+ if (!lq.get(m, TIMEOUT))
+ break;
+ result.push_back(m.getData());
+ }
+ c.subs.getSubscription(q).cancel();
+ return result;
+}
+
+ConnectionSettings aclSettings(int port, const std::string& id) {
+ ConnectionSettings settings;
+ settings.port = port;
+ settings.mechanism = "PLAIN";
+ settings.username = id;
+ settings.password = id;
+ return settings;
+}
+
+// An illegal frame body
+struct PoisonPill : public AMQBody {
+ virtual uint8_t type() const { return 0xFF; }
+ virtual void encode(Buffer& ) const {}
+ virtual void decode(Buffer& , uint32_t=0) {}
+ virtual uint32_t encodedSize() const { return 0; }
+
+ virtual void print(std::ostream&) const {};
+ virtual void accept(AMQBodyConstVisitor&) const {};
+
+ virtual AMQMethodBody* getMethod() { return 0; }
+ virtual const AMQMethodBody* getMethod() const { return 0; }
+
+ /** Match if same type and same class/method ID for methods */
+ static bool match(const AMQBody& , const AMQBody& ) { return false; }
+ virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; }
+};
+
+QPID_AUTO_TEST_CASE(testBadClientData) {
+ // Ensure that bad data on a client connection closes the
+ // connection but does not stop the broker.
+ ClusterFixture::Args args;
+ prepareArgs(args, false);
+ args += "--log-enable=critical"; // Supress expected errors
+ ClusterFixture cluster(2, args, -1);
+ Client c0(cluster[0]);
+ Client c1(cluster[1]);
+ boost::shared_ptr<client::ConnectionImpl> ci =
+ client::ConnectionAccess::getImpl(c0.connection);
+ AMQFrame poison(boost::intrusive_ptr<AMQBody>(new PoisonPill));
+ ci->expand(poison.encodedSize(), false);
+ ci->handle(poison);
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception);
+ }
+ Client c00(cluster[0]);
+ BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), "");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), "");
+}
+
+QPID_AUTO_TEST_CASE(testAcl) {
+ ofstream policyFile("cluster_test.acl");
+ policyFile << "acl allow foo@QPID create queue name=foo" << endl
+ << "acl allow foo@QPID create queue name=foo2" << endl
+ << "acl deny foo@QPID create queue name=bar" << endl
+ << "acl allow all all" << endl;
+ policyFile.close();
+ char cwd[1024];
+ BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
+ ostringstream aclLib;
+ aclLib << getLibPath("ACL_LIB");
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ args += "--log-enable=critical"; // Supress expected errors
+ args += "--acl-file", string(cwd) + "/cluster_test.acl",
+ "--cluster-mechanism", "PLAIN",
+ "--cluster-username", "cluster",
+ "--cluster-password", "cluster",
+ "--load-module", aclLib.str();
+ ClusterFixture cluster(2, args, -1);
+
+ Client c0(aclSettings(cluster[0], "c0"), "c0");
+ Client c1(aclSettings(cluster[1], "c1"), "c1");
+ Client foo(aclSettings(cluster[1], "foo"), "foo");
+
+ foo.session.queueDeclare("foo", arg::durable=durableFlag);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
+
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException);
+ }
+ BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
+ BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
+
+ cluster.add();
+ Client c2(aclSettings(cluster[2], "c2"), "c2");
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException);
+ }
+ BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
+}
+
+QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+
+ BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of<string>("x"));
+ BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of<string>("x"));
+ BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of<string>("x"));
+
+ sys::usleep(200*1000);
+ BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b"));
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+ // Make sure the exchange qpid.msg_sequence property is properly replicated.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ FieldTable ftargs;
+ ftargs.setInt("qpid.msg_sequence", 1);
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
+ c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+ c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex");
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+
+ cluster.add();
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex");
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag));
+
+ // Start a transaction that will commit.
+ Session commitSession = c0.connection.newSession("commit");
+ SubscriptionManager commitSubs(commitSession);
+ commitSession.txSelect();
+ commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag));
+ commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+ BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
+
+ // Start a transaction that will roll back.
+ Session rollbackSession = c0.connection.newSession("rollback");
+ SubscriptionManager rollbackSubs(rollbackSession);
+ rollbackSession.txSelect();
+ rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+ Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
+ BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
+
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ // Add new member mid transaction.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // More transactional work
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+ commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag));
+ rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag));
+
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Commit/roll back.
+ commitSession.txCommit();
+ rollbackSession.txRollback();
+ rollbackSession.messageRelease(rollbackMessage.getId());
+
+ // Verify queue status: just the comitted messages and dequeues should remain.
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+
+ commitSession.close();
+ rollbackSession.close();
+}
+
+QPID_AUTO_TEST_CASE(testUnacked) {
+ // Verify replication of unacknowledged messages.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ Message m;
+
+ // Create unacked message: acquired but not accepted.
+ SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+ c0.session.queueDeclare("q1", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag));
+ LocalQueue q1;
+ c0.subs.subscribe(q1, "q1", manualAccept);
+ BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+ // Create unacked message: not acquired, accepted or completeed.
+ SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+ c0.session.queueDeclare("q2", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag));
+ LocalQueue q2;
+ c0.subs.subscribe(q2, "q2", manualAcquire);
+ m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+ c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+ BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+ // Create empty credit record: acquire and accept but don't complete.
+ SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
+ c0.session.queueDeclare("q3", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag));
+ LocalQueue q3;
+ c0.subs.subscribe(q3, "q3", manualComplete);
+ Message m31=q3.get(TIMEOUT);
+ BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Add new member while there are unacked messages.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // Check queue counts
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
+
+ // Complete the empty credit message, should unblock the message behind it.
+ BOOST_CHECK_THROW(q3.get(0), Exception);
+ c0.session.markCompleted(SequenceSet(m31.getId()), true);
+ BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
+
+ // Close the original session - unacked messages should be requeued.
+ c0.session.close();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+ BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
+}
+
+// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
+void testUpdateTxState() {
+ // Verify that we update transaction state correctly to new members.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ // Do work in a transaction.
+ c0.session.txSelect();
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag));
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "1");
+
+ // New member, TX not comitted, c1 should see nothing.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+ // After commit c1 shoudl see results of tx.
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "2");
+
+ // Another transaction with both members active.
+ c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "3");
+}
+
+QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
+ // Verify that we update a partially recieved message to a new member.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+ // Send first 2 frames of message.
+ MessageTransferBody transfer(
+ ProtocolVersion(), string(), // default exchange.
+ framing::message::ACCEPT_MODE_NONE,
+ framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+ sender.send(transfer, true, false, true, true);
+ AMQHeaderBody header;
+ header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ if (durableFlag)
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+ else
+ header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
+ sender.send(header, false, false, true, true);
+
+ // No reliable way to ensure the partial message has arrived
+ // before we start the new broker, so we sleep.
+ sys::usleep(2500);
+ cluster.add();
+
+ // Send final 2 frames of message.
+ sender.send(AMQContentBody("ab"), false, true, true, false);
+ sender.send(AMQContentBody("cd"), false, true, false, true);
+
+ // Verify message is enqued correctly on second member.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "abcd");
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+ set<int> kb0 = knownBrokerPorts(c0.connection, 1);
+ BOOST_CHECK_EQUAL(kb0.size(), 1u);
+ BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
+
+ cluster.add();
+ Client c1(cluster[1], "c1");
+ set<int> kb1 = knownBrokerPorts(c1.connection, 2);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ BOOST_CHECK_EQUAL(kb1.size(), 2u);
+ BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb1,kb0);
+
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ set<int> kb2 = knownBrokerPorts(c2.connection, 3);
+ kb1 = knownBrokerPorts(c1.connection, 3);
+ kb0 = knownBrokerPorts(c0.connection, 3);
+ BOOST_CHECK_EQUAL(kb2.size(), 3u);
+ BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
+ BOOST_CHECK_EQUAL(kb2,kb0);
+ BOOST_CHECK_EQUAL(kb2,kb1);
+
+ cluster.killWithSilencer(1,c1.connection,9);
+ kb0 = knownBrokerPorts(c0.connection, 2);
+ kb2 = knownBrokerPorts(c2.connection, 2);
+ BOOST_CHECK_EQUAL(kb0.size(), 2u);
+ BOOST_CHECK_EQUAL(kb0, kb2);
+}
+
+QPID_AUTO_TEST_CASE(testUpdateConsumers) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+ LocalQueue lp;
+ c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+ c0.session.sync();
+
+ // Start new members
+ cluster.add(); // Local
+ Client c1(cluster[1], "c1");
+ cluster.add();
+ Client c2(cluster[2], "c2");
+
+ // Transfer messages
+ c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag));
+
+ c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag));
+
+ // Activate the subscription, ensure message removed on all queues.
+ c0.subs.setFlowControl("q", FlowControl::unlimited());
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Check second subscription's flow control: gets first message, not second.
+ BOOST_CHECK(lp.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+ BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
+ // Kill the subscribing member, ensure further messages are not removed.
+ cluster.killWithSilencer(0,c0.connection,9);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
+ for (int i = 0; i < 10; ++i) {
+ c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag));
+ BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
+ }
+}
+
+// Test that message data and delivery properties are updated properly.
+QPID_AUTO_TEST_CASE(testUpdateMessages) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ // Create messages with different delivery properties
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q");
+ c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag),
+ arg::destination="amq.fanout");
+
+ while (c0.session.queueQuery("q").getMessageCount() != 2)
+ sys::usleep(1000); // Wait for message to show up on broker 0.
+
+ // Add a new broker, it will catch up.
+ cluster.add();
+
+ // Do some work post-add
+ c0.session.queueDeclare("p", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag));
+
+ // Do some work post-join
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
+ c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag));
+
+ // Verify new brokers have state.
+ Message m;
+
+ Client c1(cluster[1], "c1");
+
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ // Add another broker, don't wait for join - should be stalled till ready.
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+ BOOST_CHECK_EQUAL(m.getData(), "pbar");
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
+ Client c0(cluster[0]);
+ BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.exchangeDeclare("ex", arg::type="direct");
+ c0.session.close();
+ c0.connection.close();
+ // Verify all brokers get wiring update.
+ for (size_t i = 0; i < cluster.size(); ++i) {
+ BOOST_MESSAGE("i == "<< i);
+ Client c(cluster[i]);
+ BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testMessageEnqueue) {
+ // Enqueue on one broker, dequeue on another.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+ c0.session.close();
+ Client c1(cluster[1]);
+ Message msg;
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
+ BOOST_CHECK_EQUAL(string("bar"), msg.getData());
+}
+
+QPID_AUTO_TEST_CASE(testMessageDequeue) {
+ // Enqueue on one broker, dequeue on two others.
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+
+ Message msg;
+
+ // Dequeue on 2 others, ensure correct order.
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("foo", msg.getData());
+
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("bar", msg.getData());
+
+ // Queue should be empty on all cluster members.
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(3, args, -1);
+ Client c0(cluster[0]);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
+
+ // First start a subscription.
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+
+ // Now send messages
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+ c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+
+ // Check they arrived
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL("foo", m.getData());
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+ BOOST_CHECK_EQUAL("bar", m.getData());
+
+ // Queue should be empty on all cluster members.
+ Client c2(cluster[2]);
+ BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+ BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
+{
+ /*
+ Start with a single broker.
+ Set up two queues: one durable, and one not.
+ Add a new broker to the cluster.
+ Make sure it has one durable and one non-durable queue.
+ */
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("durable_queue", arg::durable=true);
+ c0.session.queueDeclare("non_durable_queue", arg::durable=false);
+ cluster.add();
+ Client c1(cluster[1]);
+ QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" );
+ QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
+ BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
+ BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
+
+ BOOST_CHECK_EQUAL ( durable_query.getDurable(), true );
+ BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
+}
+
+
+QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
+{
+
+ struct Sender : FailoverManager::Command
+ {
+ std::string queue;
+ std::string content;
+
+ Sender(const std::string& q, const std::string& c) : queue(q), content(c) {}
+
+ void execute(AsyncSession& session, bool)
+ {
+ session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag));
+ }
+ };
+
+ struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable
+ {
+ FailoverManager& mgr;
+ std::string queue;
+ std::string expectedContent;
+ qpid::client::Subscription subscription;
+ qpid::sys::Monitor lock;
+ bool ready, failed;
+
+ Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {}
+
+ void received(Message& message)
+ {
+ BOOST_CHECK_EQUAL(expectedContent, message.getData());
+ subscription.cancel();
+ }
+
+ void execute(AsyncSession& session, bool)
+ {
+ session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
+ SubscriptionManager subs(session);
+ subscription = subs.subscribe(*this, queue);
+ session.sync();
+ setReady();
+ subs.run();
+ //cleanup:
+ session.queueDelete(arg::queue=queue);
+ }
+
+ void run()
+ {
+ try {
+ mgr.execute(*this);
+ }
+ catch (const std::exception& e) {
+ BOOST_MESSAGE("Exception in mgr.execute: " << e.what());
+ failed = true;
+ }
+ }
+
+ void waitForReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ while (!ready) {
+ lock.wait();
+ }
+ }
+
+ void setReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ ready = true;
+ lock.notify();
+ }
+ };
+
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(2, args, -1);
+ ConnectionSettings settings;
+ settings.port = cluster[1];
+ settings.heartbeat = 1;
+ FailoverManager fmgr(settings);
+ Sender sender("my-queue", "my-data");
+ Receiver receiver(fmgr, "my-queue", "my-data");
+ qpid::sys::Thread runner(receiver);
+ receiver.waitForReady();
+ {
+ ScopedSuppressLogging allQuiet; // suppress connection closed messages
+ cluster.kill(1);
+ //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+ ::usleep(2*1000*1000);
+ }
+ fmgr.execute(sender);
+ runner.join();
+ BOOST_CHECK(!receiver.failed);
+ fmgr.close();
+}
+
+QPID_AUTO_TEST_CASE(testPolicyUpdate) {
+ //tests that the policys internal state is accurate on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(REJECT, 0, 2);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
+
+ Message received;
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+ BOOST_CHECK(!c1.subs.get(received, "q"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
+ //tests that exclusive queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ QueueQueryResult result = c2.session.queueQuery("q");
+ BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+ BOOST_CHECK(result.getExclusive());
+ BOOST_CHECK(result.getAutoDelete());
+ BOOST_CHECK(!result.getDurable());
+ BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+ c1.session.close();
+ c1.connection.close();
+ c2.session = c2.connection.newSession();
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+ }
+}
+
+/**
+ * Subscribes to specified queue and acquires up to the specified
+ * number of message but does not accept or release them. These
+ * message are therefore 'locked' by the clients session.
+ */
+Subscription lockMessages(Client& client, const std::string& queue, int count)
+{
+ LocalQueue q;
+ SubscriptionSettings settings(FlowControl::messageCredit(count));
+ settings.autoAck = 0;
+ Subscription sub = client.subs.subscribe(q, queue, settings);
+ client.session.messageFlush(sub.getName());
+ return sub;
+}
+
+/**
+ * check that the specified queue contains the expected set of
+ * messages (matched on content) for all nodes in the cluster
+ */
+void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages)
+{
+ for (size_t i = 0; i < cluster.size(); i++) {
+ Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str());
+ BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages);
+ client.close();
+ }
+}
+
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m",
+ const std::string& lvqKey="")
+{
+ for (int i = 0; i < count; i++) {
+ Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag);
+ if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey);
+ client.session.messageTransfer(arg::content=message);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
+ //tests that ring queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //send one more message
+ send(c1, "q", 1, 6);
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
+ //tests that ring queues are accurately replicated on newly joined
+ //nodes; just like testRingQueueUpdate, but new node joins after
+ //the sixth message has been sent.
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //send sixth message
+ send(c1, "q", 1, 6);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testLvqUpdate) {
+ //tests that lvqs are accurately replicated on newly joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setOrdering(LVQ);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+ send(c1, "q", 5, 1, "a", "a");
+ send(c1, "q", 2, 1, "b", "b");
+ send(c1, "q", 1, 1, "c", "c");
+ send(c1, "q", 1, 3, "b", "b");
+
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1"));
+ }
+}
+
+
+QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) {
+ //tests that lvqs are accurately replicated on newly joined nodes
+ //if the lvq state has been affected by browsers
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setOrdering(LVQ);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+ send(c1, "q", 1, 1, "a", "a");
+ send(c1, "q", 2, 1, "b", "b");
+ send(c1, "q", 1, 1, "c", "c");
+ checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1"));
+ send(c1, "q", 4, 2, "a", "a");
+ send(c1, "q", 1, 3, "b", "b");
+
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3"));
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRelease) {
+ //tests that releasing a messages that was unacked when one node
+ //joined works correctly
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+ Message received;
+ BOOST_CHECK(lq.get(received));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+ //add new node
+ cluster.add();
+
+ lqSub.release(lqSub.getUnaccepted());
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+ }
+}
+
+
+// Browse for 1 message with byte credit, return true if a message was
+// received false if not.
+bool browseByteCredit(Client& c, const string& q, int n, Message& m) {
+ SubscriptionSettings browseSettings(
+ FlowControl(1, n, false), // 1 message, n bytes credit, no window
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ Subscription s = c.subs.subscribe(lq, q, browseSettings);
+ c.session.messageFlush(arg::destination=q, arg::sync=true);
+ c.session.sync();
+ c.subs.getSubscription(q).cancel();
+ return lq.get(m, 0); // No timeout, flush should push message thru.
+}
+
+// Ensure cluster update preserves exact message size, use byte credt as test.
+QPID_AUTO_TEST_CASE(testExactByteCredit) {
+ ClusterFixture cluster(1, prepareArgs(), -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("MyMessage", "q"));
+ cluster.add();
+
+ int size=36; // Size of message on broker: headers+body
+ Client c1(cluster[1], "c1");
+ Message m;
+
+ // Ensure we get the message with exact credit.
+ BOOST_CHECK(browseByteCredit(c0, "q", size, m));
+ BOOST_CHECK(browseByteCredit(c1, "q", size, m));
+ // and not with one byte less.
+ BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m));
+ BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
+}
+
+// Test that consumer positions are updated correctly.
+// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927
+//
+QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+ // Set the acquire mode to 'not-acquired' the consumer moves along the queue
+ // but does not acquire (remove) messages.
+ settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
+ Subscription s = c0.subs.subscribe(c0.lq, "q", settings);
+ c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+ BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData());
+
+ // Add another member, send/receive another message and acquire
+ // the messages. With the bug, this creates an inconsistency
+ // because the browse position was not updated to the new member.
+ cluster.add();
+ c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+ BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData());
+ s.acquire(s.getUnacquired());
+ s.accept(s.getUnaccepted());
+
+ // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1]
+ // Subscribing on cluster[1] provokes an error that shuts down cluster[0]
+ Client c1(cluster[1], "c1");
+ Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1
+ Message m;
+ BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ FieldTable arguments;
+ arguments.setInt("x-qpid-priorities", 10);
+ arguments.setInt("x-qpid-fairshare", 5);
+ c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+ //send messages of different priorities
+ for (int i = 0; i < 20; i++) {
+ Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+ msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+ c0.session.messageTransfer(arg::content=msg);
+ }
+
+ //pull off a couple of the messages (first four should be the top priority messages
+ for (int i = 0; i < 4; i++) {
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+ }
+
+ // Add another member
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ //pull off some more messages
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+ //check queue has same content on both nodes
+ BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12));
+}
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests