diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 1231 |
1 files changed, 1231 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp new file mode 100644 index 0000000000..f2ccd0ba84 --- /dev/null +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -0,0 +1,1231 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "ForkedBroker.h" +#include "BrokerFixture.h" +#include "ClusterFixture.h" + +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/Session.h" +#include "qpid/client/FailoverListener.h" +#include "qpid/client/FailoverManager.h" +#include "qpid/client/QueueOptions.h" +#include "qpid/cluster/Cluster.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/cluster/UpdateClient.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" + +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/assign.hpp> + +#include <string> +#include <iostream> +#include <fstream> +#include <iterator> +#include <vector> +#include <set> +#include <algorithm> +#include <iterator> + + +using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::client; +using namespace boost::assign; +using broker::Broker; +using boost::shared_ptr; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(cluster_test) + +bool durableFlag = std::getenv("STORE_LIB") != 0; + +void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { + ostringstream clusterLib; + clusterLib << getLibPath("CLUSTER_LIB"); + args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); + if (durableFlag) + args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; + else + args += "--no-data-dir"; +} + +ClusterFixture::Args prepareArgs(const bool durableFlag = false) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + return args; +} + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=2*sys::TIME_SEC; + + +ostream& operator<<(ostream& o, const cpg_name* n) { + return o << Cpg::str(*n); +} + +ostream& operator<<(ostream& o, const cpg_address& a) { + return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")"; +} + +template <class T> +ostream& operator<<(ostream& o, const pair<T*, int>& array) { + o << "{ "; + ostream_iterator<cpg_address> i(o, " "); + copy(array.first, array.first+array.second, i); + o << "}"; + return o; +} + +template <class C> set<int> makeSet(const C& c) { + set<int> s; + copy(c.begin(), c.end(), inserter(s, s.begin())); + return s; +} + +class Sender { + public: + Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {} + void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) { + AMQFrame f(body); + f.setChannel(channel); + f.setFirstSegment(firstSeg); + f.setLastSegment(lastSeg); + f.setFirstFrame(firstFrame); + f.setLastFrame(lastFrame); + connection->expand(f.encodedSize(), false); + connection->handle(f); + } + + private: + boost::shared_ptr<ConnectionImpl> connection; + uint16_t channel; +}; + +int64_t getMsgSequence(const Message& m) { + return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); +} + +Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) { + Message m(data, key); + m.getDeliveryProperties().setTtl(ttl); + if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + return m; +} + +Message makeMessage(const string& data, const string& key, bool durable = false) { + Message m(data, key); + if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + return m; +} + +vector<string> browse(Client& c, const string& q, int n) { + SubscriptionSettings browseSettings( + FlowControl::messageCredit(n), + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + c.subs.subscribe(lq, q, browseSettings); + c.session.messageFlush(q); + vector<string> result; + for (int i = 0; i < n; ++i) { + Message m; + if (!lq.get(m, TIMEOUT)) + break; + result.push_back(m.getData()); + } + c.subs.getSubscription(q).cancel(); + return result; +} + +ConnectionSettings aclSettings(int port, const std::string& id) { + ConnectionSettings settings; + settings.port = port; + settings.mechanism = "PLAIN"; + settings.username = id; + settings.password = id; + return settings; +} + +// An illegal frame body +struct PoisonPill : public AMQBody { + virtual uint8_t type() const { return 0xFF; } + virtual void encode(Buffer& ) const {} + virtual void decode(Buffer& , uint32_t=0) {} + virtual uint32_t encodedSize() const { return 0; } + + virtual void print(std::ostream&) const {}; + virtual void accept(AMQBodyConstVisitor&) const {}; + + virtual AMQMethodBody* getMethod() { return 0; } + virtual const AMQMethodBody* getMethod() const { return 0; } + + /** Match if same type and same class/method ID for methods */ + static bool match(const AMQBody& , const AMQBody& ) { return false; } + virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; } +}; + +QPID_AUTO_TEST_CASE(testBadClientData) { + // Ensure that bad data on a client connection closes the + // connection but does not stop the broker. + ClusterFixture::Args args; + prepareArgs(args, false); + args += "--log-enable=critical"; // Supress expected errors + ClusterFixture cluster(2, args, -1); + Client c0(cluster[0]); + Client c1(cluster[1]); + boost::shared_ptr<client::ConnectionImpl> ci = + client::ConnectionAccess::getImpl(c0.connection); + AMQFrame poison(boost::intrusive_ptr<AMQBody>(new PoisonPill)); + ci->expand(poison.encodedSize(), false); + ci->handle(poison); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception); + } + Client c00(cluster[0]); + BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), ""); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), ""); +} + +QPID_AUTO_TEST_CASE(testAcl) { + ofstream policyFile("cluster_test.acl"); + policyFile << "acl allow foo@QPID create queue name=foo" << endl + << "acl allow foo@QPID create queue name=foo2" << endl + << "acl deny foo@QPID create queue name=bar" << endl + << "acl allow all all" << endl; + policyFile.close(); + char cwd[1024]; + BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); + ostringstream aclLib; + aclLib << getLibPath("ACL_LIB"); + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + args += "--log-enable=critical"; // Supress expected errors + args += "--acl-file", string(cwd) + "/cluster_test.acl", + "--cluster-mechanism", "PLAIN", + "--cluster-username", "cluster", + "--cluster-password", "cluster", + "--load-module", aclLib.str(); + ClusterFixture cluster(2, args, -1); + + Client c0(aclSettings(cluster[0], "c0"), "c0"); + Client c1(aclSettings(cluster[1], "c1"), "c1"); + Client foo(aclSettings(cluster[1], "foo"), "foo"); + + foo.session.queueDeclare("foo", arg::durable=durableFlag); + BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); + + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); + } + BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); + BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); + + cluster.add(); + Client c2(aclSettings(cluster[2], "c2"), "c2"); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); + } + BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); +} + +QPID_AUTO_TEST_CASE(testMessageTimeToLive) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag)); + cluster.add(); + Client c2(cluster[1], "c2"); + + BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of<string>("x")); + BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of<string>("x")); + BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of<string>("x")); + + sys::usleep(200*1000); + BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b")); + BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b")); +} + +QPID_AUTO_TEST_CASE(testSequenceOptions) { + // Make sure the exchange qpid.msg_sequence property is properly replicated. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + FieldTable ftargs; + ftargs.setInt("qpid.msg_sequence", 1); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); + c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); + c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex"); + c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex"); + BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); + BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); + + cluster.add(); + Client c1(cluster[1]); + c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex"); + BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); +} + +QPID_AUTO_TEST_CASE(testTxTransaction) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag)); + + // Start a transaction that will commit. + Session commitSession = c0.connection.newSession("commit"); + SubscriptionManager commitSubs(commitSession); + commitSession.txSelect(); + commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag)); + commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); + BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); + + // Start a transaction that will roll back. + Session rollbackSession = c0.connection.newSession("rollback"); + SubscriptionManager rollbackSubs(rollbackSession); + rollbackSession.txSelect(); + rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); + Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); + BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); + + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); + // Add new member mid transaction. + cluster.add(); + Client c1(cluster[1], "c1"); + + // More transactional work + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); + commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag)); + rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag)); + + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + + // Commit/roll back. + commitSession.txCommit(); + rollbackSession.txRollback(); + rollbackSession.messageRelease(rollbackMessage.getId()); + + // Verify queue status: just the comitted messages and dequeues should remain. + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); + + commitSession.close(); + rollbackSession.close(); +} + +QPID_AUTO_TEST_CASE(testUnacked) { + // Verify replication of unacknowledged messages. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + Message m; + + // Create unacked message: acquired but not accepted. + SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); + c0.session.queueDeclare("q1", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag)); + LocalQueue q1; + c0.subs.subscribe(q1, "q1", manualAccept); + BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted + BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue + + // Create unacked message: not acquired, accepted or completeed. + SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); + c0.session.queueDeclare("q2", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag)); + LocalQueue q2; + c0.subs.subscribe(q2, "q2", manualAcquire); + m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(m.getData(), "21"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed + c0.subs.getSubscription("q2").acquire(m); // Acquire manually + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed + BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. + + // Create empty credit record: acquire and accept but don't complete. + SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); + c0.session.queueDeclare("q3", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag)); + LocalQueue q3; + c0.subs.subscribe(q3, "q3", manualComplete); + Message m31=q3.get(TIMEOUT); + BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. + BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); + + // Add new member while there are unacked messages. + cluster.add(); + Client c1(cluster[1], "c1"); + + // Check queue counts + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u); + + // Complete the empty credit message, should unblock the message behind it. + BOOST_CHECK_THROW(q3.get(0), Exception); + c0.session.markCompleted(SequenceSet(m31.getId()), true); + BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); + + // Close the original session - unacked messages should be requeued. + c0.session.close(); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); + + BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); +} + +// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. +void testUpdateTxState() { + // Verify that we update transaction state correctly to new members. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + // Do work in a transaction. + c0.session.txSelect(); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag)); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "1"); + + // New member, TX not comitted, c1 should see nothing. + cluster.add(); + Client c1(cluster[1], "c1"); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); + + // After commit c1 shoudl see results of tx. + c0.session.txCommit(); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "2"); + + // Another transaction with both members active. + c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag)); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); + c0.session.txCommit(); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "3"); +} + +QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { + // Verify that we update a partially recieved message to a new member. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q", arg::durable=durableFlag); + Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); + + // Send first 2 frames of message. + MessageTransferBody transfer( + ProtocolVersion(), string(), // default exchange. + framing::message::ACCEPT_MODE_NONE, + framing::message::ACQUIRE_MODE_PRE_ACQUIRED); + sender.send(transfer, true, false, true, true); + AMQHeaderBody header; + header.get<DeliveryProperties>(true)->setRoutingKey("q"); + if (durableFlag) + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); + else + header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); + sender.send(header, false, false, true, true); + + // No reliable way to ensure the partial message has arrived + // before we start the new broker, so we sleep. + sys::usleep(2500); + cluster.add(); + + // Send final 2 frames of message. + sender.send(AMQContentBody("ab"), false, true, true, false); + sender.send(AMQContentBody("cd"), false, true, false, true); + + // Verify message is enqued correctly on second member. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "abcd"); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size()); +} + +QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + set<int> kb0 = knownBrokerPorts(c0.connection, 1); + BOOST_CHECK_EQUAL(kb0.size(), 1u); + BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); + + cluster.add(); + Client c1(cluster[1], "c1"); + set<int> kb1 = knownBrokerPorts(c1.connection, 2); + kb0 = knownBrokerPorts(c0.connection, 2); + BOOST_CHECK_EQUAL(kb1.size(), 2u); + BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); + BOOST_CHECK_EQUAL(kb1,kb0); + + cluster.add(); + Client c2(cluster[2], "c2"); + set<int> kb2 = knownBrokerPorts(c2.connection, 3); + kb1 = knownBrokerPorts(c1.connection, 3); + kb0 = knownBrokerPorts(c0.connection, 3); + BOOST_CHECK_EQUAL(kb2.size(), 3u); + BOOST_CHECK_EQUAL(kb2, makeSet(cluster)); + BOOST_CHECK_EQUAL(kb2,kb0); + BOOST_CHECK_EQUAL(kb2,kb1); + + cluster.killWithSilencer(1,c1.connection,9); + kb0 = knownBrokerPorts(c0.connection, 2); + kb2 = knownBrokerPorts(c2.connection, 2); + BOOST_CHECK_EQUAL(kb0.size(), 2u); + BOOST_CHECK_EQUAL(kb0, kb2); +} + +QPID_AUTO_TEST_CASE(testUpdateConsumers) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); + LocalQueue lp; + c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); + c0.session.sync(); + + // Start new members + cluster.add(); // Local + Client c1(cluster[1], "c1"); + cluster.add(); + Client c2(cluster[2], "c2"); + + // Transfer messages + c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag)); + + c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag)); + + // Activate the subscription, ensure message removed on all queues. + c0.subs.setFlowControl("q", FlowControl::unlimited()); + Message m; + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); + + // Check second subscription's flow control: gets first message, not second. + BOOST_CHECK(lp.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); + + BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "ccc"); + + // Kill the subscribing member, ensure further messages are not removed. + cluster.killWithSilencer(0,c0.connection,9); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); + for (int i = 0; i < 10; ++i) { + c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag)); + BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); + BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); + } +} + +// Test that message data and delivery properties are updated properly. +QPID_AUTO_TEST_CASE(testUpdateMessages) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + // Create messages with different delivery properties + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q"); + c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag), + arg::destination="amq.fanout"); + + while (c0.session.queueQuery("q").getMessageCount() != 2) + sys::usleep(1000); // Wait for message to show up on broker 0. + + // Add a new broker, it will catch up. + cluster.add(); + + // Do some work post-add + c0.session.queueDeclare("p", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag)); + + // Do some work post-join + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); + c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag)); + + // Verify new brokers have state. + Message m; + + Client c1(cluster[1], "c1"); + + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + + // Add another broker, don't wait for join - should be stalled till ready. + cluster.add(); + Client c2(cluster[2], "c2"); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "pfoo"); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); + BOOST_CHECK_EQUAL(m.getData(), "pbar"); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); +} + +QPID_AUTO_TEST_CASE(testWiringReplication) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); + Client c0(cluster[0]); + BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); + BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.exchangeDeclare("ex", arg::type="direct"); + c0.session.close(); + c0.connection.close(); + // Verify all brokers get wiring update. + for (size_t i = 0; i < cluster.size(); ++i) { + BOOST_MESSAGE("i == "<< i); + Client c(cluster[i]); + BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); + BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); + } +} + +QPID_AUTO_TEST_CASE(testMessageEnqueue) { + // Enqueue on one broker, dequeue on another. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); + Client c0(cluster[0]); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); + c0.session.close(); + Client c1(cluster[1]); + Message msg; + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(string("foo"), msg.getData()); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); + BOOST_CHECK_EQUAL(string("bar"), msg.getData()); +} + +QPID_AUTO_TEST_CASE(testMessageDequeue) { + // Enqueue on one broker, dequeue on two others. + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); + + Message msg; + + // Dequeue on 2 others, ensure correct order. + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("foo", msg.getData()); + + Client c2(cluster[2], "c2"); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("bar", msg.getData()); + + // Queue should be empty on all cluster members. + BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); +} + +QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(3, args, -1); + Client c0(cluster[0]); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. + + // First start a subscription. + c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); + + // Now send messages + Client c1(cluster[1]); + c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); + c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); + + // Check they arrived + Message m; + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL("foo", m.getData()); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); + BOOST_CHECK_EQUAL("bar", m.getData()); + + // Queue should be empty on all cluster members. + Client c2(cluster[2]); + BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); + BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); +} + +QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) +{ + /* + Start with a single broker. + Set up two queues: one durable, and one not. + Add a new broker to the cluster. + Make sure it has one durable and one non-durable queue. + */ + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0]); + c0.session.queueDeclare("durable_queue", arg::durable=true); + c0.session.queueDeclare("non_durable_queue", arg::durable=false); + cluster.add(); + Client c1(cluster[1]); + QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" ); + QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); + BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); + BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); + + BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); + BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); +} + + +QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) +{ + + struct Sender : FailoverManager::Command + { + std::string queue; + std::string content; + + Sender(const std::string& q, const std::string& c) : queue(q), content(c) {} + + void execute(AsyncSession& session, bool) + { + session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag)); + } + }; + + struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable + { + FailoverManager& mgr; + std::string queue; + std::string expectedContent; + qpid::client::Subscription subscription; + qpid::sys::Monitor lock; + bool ready, failed; + + Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {} + + void received(Message& message) + { + BOOST_CHECK_EQUAL(expectedContent, message.getData()); + subscription.cancel(); + } + + void execute(AsyncSession& session, bool) + { + session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, queue); + session.sync(); + setReady(); + subs.run(); + //cleanup: + session.queueDelete(arg::queue=queue); + } + + void run() + { + try { + mgr.execute(*this); + } + catch (const std::exception& e) { + BOOST_MESSAGE("Exception in mgr.execute: " << e.what()); + failed = true; + } + } + + void waitForReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + while (!ready) { + lock.wait(); + } + } + + void setReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + ready = true; + lock.notify(); + } + }; + + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(2, args, -1); + ConnectionSettings settings; + settings.port = cluster[1]; + settings.heartbeat = 1; + FailoverManager fmgr(settings); + Sender sender("my-queue", "my-data"); + Receiver receiver(fmgr, "my-queue", "my-data"); + qpid::sys::Thread runner(receiver); + receiver.waitForReady(); + { + ScopedSuppressLogging allQuiet; // suppress connection closed messages + cluster.kill(1); + //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection: + ::usleep(2*1000*1000); + } + fmgr.execute(sender); + runner.join(); + BOOST_CHECK(!receiver.failed); + fmgr.close(); +} + +QPID_AUTO_TEST_CASE(testPolicyUpdate) { + //tests that the policys internal state is accurate on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(REJECT, 0, 2); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); + cluster.add(); + Client c2(cluster[1], "c2"); + c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); + + BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); + + Message received; + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("one")); + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("two")); + BOOST_CHECK(!c1.subs.get(received, "q")); + } +} + +QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { + //tests that exclusive queues are accurately replicated on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); + cluster.add(); + Client c2(cluster[1], "c2"); + QueueQueryResult result = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); + BOOST_CHECK(result.getExclusive()); + BOOST_CHECK(result.getAutoDelete()); + BOOST_CHECK(!result.getDurable()); + BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); + c1.session.close(); + c1.connection.close(); + c2.session = c2.connection.newSession(); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); + } +} + +/** + * Subscribes to specified queue and acquires up to the specified + * number of message but does not accept or release them. These + * message are therefore 'locked' by the clients session. + */ +Subscription lockMessages(Client& client, const std::string& queue, int count) +{ + LocalQueue q; + SubscriptionSettings settings(FlowControl::messageCredit(count)); + settings.autoAck = 0; + Subscription sub = client.subs.subscribe(q, queue, settings); + client.session.messageFlush(sub.getName()); + return sub; +} + +/** + * check that the specified queue contains the expected set of + * messages (matched on content) for all nodes in the cluster + */ +void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages) +{ + for (size_t i = 0; i < cluster.size(); i++) { + Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); + BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); + client.close(); + } +} + +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m", + const std::string& lvqKey="") +{ + for (int i = 0; i < count; i++) { + Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag); + if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey); + client.session.messageTransfer(arg::content=message); + } +} + +QPID_AUTO_TEST_CASE(testRingQueueUpdate) { + //tests that ring queues are accurately replicated on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //send one more message + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + } +} + +QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { + //tests that ring queues are accurately replicated on newly joined + //nodes; just like testRingQueueUpdate, but new node joins after + //the sixth message has been sent. + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + } +} + +QPID_AUTO_TEST_CASE(testLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 5, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1")); + } +} + + +QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + //if the lvq state has been affected by browsers + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 1, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")); + send(c1, "q", 4, 2, "a", "a"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3")); + } +} + +QPID_AUTO_TEST_CASE(testRelease) { + //tests that releasing a messages that was unacked when one node + //joined works correctly + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + Message received; + BOOST_CHECK(lq.get(received)); + BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); + + //add new node + cluster.add(); + + lqSub.release(lqSub.getUnaccepted()); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); + } +} + + +// Browse for 1 message with byte credit, return true if a message was +// received false if not. +bool browseByteCredit(Client& c, const string& q, int n, Message& m) { + SubscriptionSettings browseSettings( + FlowControl(1, n, false), // 1 message, n bytes credit, no window + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + Subscription s = c.subs.subscribe(lq, q, browseSettings); + c.session.messageFlush(arg::destination=q, arg::sync=true); + c.session.sync(); + c.subs.getSubscription(q).cancel(); + return lq.get(m, 0); // No timeout, flush should push message thru. +} + +// Ensure cluster update preserves exact message size, use byte credt as test. +QPID_AUTO_TEST_CASE(testExactByteCredit) { + ClusterFixture cluster(1, prepareArgs(), -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("MyMessage", "q")); + cluster.add(); + + int size=36; // Size of message on broker: headers+body + Client c1(cluster[1], "c1"); + Message m; + + // Ensure we get the message with exact credit. + BOOST_CHECK(browseByteCredit(c0, "q", size, m)); + BOOST_CHECK(browseByteCredit(c1, "q", size, m)); + // and not with one byte less. + BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m)); + BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m)); +} + +// Test that consumer positions are updated correctly. +// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927 +// +QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + c0.session.queueDeclare("q", arg::durable=durableFlag); + SubscriptionSettings settings; + settings.autoAck = 0; + // Set the acquire mode to 'not-acquired' the consumer moves along the queue + // but does not acquire (remove) messages. + settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED; + Subscription s = c0.subs.subscribe(c0.lq, "q", settings); + c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); + BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData()); + + // Add another member, send/receive another message and acquire + // the messages. With the bug, this creates an inconsistency + // because the browse position was not updated to the new member. + cluster.add(); + c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); + BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData()); + s.acquire(s.getUnacquired()); + s.accept(s.getUnaccepted()); + + // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1] + // Subscribing on cluster[1] provokes an error that shuts down cluster[0] + Client c1(cluster[1], "c1"); + Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1 + Message m; + BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10)); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); +} + +QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + FieldTable arguments; + arguments.setInt("x-qpid-priorities", 10); + arguments.setInt("x-qpid-fairshare", 5); + c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); + + //send messages of different priorities + for (int i = 0; i < 20; i++) { + Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); + msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); + c0.session.messageTransfer(arg::content=msg); + } + + //pull off a couple of the messages (first four should be the top priority messages + for (int i = 0; i < 4; i++) { + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); + } + + // Add another member + cluster.add(); + Client c1(cluster[1], "c1"); + + //pull off some more messages + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); + + //check queue has same content on both nodes + BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); +} + +QPID_AUTO_TEST_SUITE_END() +}} // namespace qpid::tests |