/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "test_tools.h" #include "unit_test.h" #include "ForkedBroker.h" #include "BrokerFixture.h" #include "ClusterFixture.h" #include "qpid/client/Connection.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/client/Session.h" #include "qpid/client/FailoverListener.h" #include "qpid/client/FailoverManager.h" #include "qpid/client/QueueOptions.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/UpdateClient.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/enum.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Logger.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include #include #include #include #include #include #include #include #include #include #include using namespace std; using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; using namespace boost::assign; using broker::Broker; using boost::shared_ptr; namespace qpid { namespace tests { QPID_AUTO_TEST_SUITE(cluster_test) bool durableFlag = std::getenv("STORE_LIB") != 0; void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { ostringstream clusterLib; clusterLib << getLibPath("CLUSTER_LIB"); args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); if (durableFlag) args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; else args += "--no-data-dir"; } ClusterFixture::Args prepareArgs(const bool durableFlag = false) { ClusterFixture::Args args; prepareArgs(args, durableFlag); return args; } // Timeout for tests that wait for messages const sys::Duration TIMEOUT=2*sys::TIME_SEC; ostream& operator<<(ostream& o, const cpg_name* n) { return o << Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { return o << "(" << a.nodeid <<","< ostream& operator<<(ostream& o, const pair& array) { o << "{ "; ostream_iterator i(o, " "); copy(array.first, array.first+array.second, i); o << "}"; return o; } template set makeSet(const C& c) { set s; copy(c.begin(), c.end(), inserter(s, s.begin())); return s; } class Sender { public: Sender(boost::shared_ptr ci, uint16_t ch) : connection(ci), channel(ch) {} void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) { AMQFrame f(body); f.setChannel(channel); f.setFirstSegment(firstSeg); f.setLastSegment(lastSeg); f.setFirstFrame(firstFrame); f.setLastFrame(lastFrame); connection->expand(f.encodedSize(), false); connection->handle(f); } private: boost::shared_ptr connection; uint16_t channel; }; int64_t getMsgSequence(const Message& m) { return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); } Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) { Message m(data, key); m.getDeliveryProperties().setTtl(ttl); if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); return m; } Message makeMessage(const string& data, const string& key, bool durable = false) { Message m(data, key); if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); return m; } vector browse(Client& c, const string& q, int n) { SubscriptionSettings browseSettings( FlowControl::messageCredit(n), ACCEPT_MODE_NONE, ACQUIRE_MODE_NOT_ACQUIRED, 0 // No auto-ack. ); LocalQueue lq; c.subs.subscribe(lq, q, browseSettings); c.session.messageFlush(q); vector result; for (int i = 0; i < n; ++i) { Message m; if (!lq.get(m, TIMEOUT)) break; result.push_back(m.getData()); } c.subs.getSubscription(q).cancel(); return result; } ConnectionSettings aclSettings(int port, const std::string& id) { ConnectionSettings settings; settings.port = port; settings.mechanism = "PLAIN"; settings.username = id; settings.password = id; return settings; } // An illegal frame body struct PoisonPill : public AMQBody { virtual uint8_t type() const { return 0xFF; } virtual void encode(Buffer& ) const {} virtual void decode(Buffer& , uint32_t=0) {} virtual uint32_t encodedSize() const { return 0; } virtual void print(std::ostream&) const {}; virtual void accept(AMQBodyConstVisitor&) const {}; virtual AMQMethodBody* getMethod() { return 0; } virtual const AMQMethodBody* getMethod() const { return 0; } /** Match if same type and same class/method ID for methods */ static bool match(const AMQBody& , const AMQBody& ) { return false; } virtual boost::intrusive_ptr clone() const { return new PoisonPill; } }; QPID_AUTO_TEST_CASE(testBadClientData) { // Ensure that bad data on a client connection closes the // connection but does not stop the broker. ClusterFixture::Args args; prepareArgs(args, false); args += "--log-enable=critical"; // Supress expected errors ClusterFixture cluster(2, args, -1); Client c0(cluster[0]); Client c1(cluster[1]); boost::shared_ptr ci = client::ConnectionAccess::getImpl(c0.connection); AMQFrame poison(boost::intrusive_ptr(new PoisonPill)); ci->expand(poison.encodedSize(), false); ci->handle(poison); { ScopedSuppressLogging sl; BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception); } Client c00(cluster[0]); BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), ""); BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), ""); } QPID_AUTO_TEST_CASE(testAcl) { ofstream policyFile("cluster_test.acl"); policyFile << "acl allow foo@QPID create queue name=foo" << endl << "acl allow foo@QPID create queue name=foo2" << endl << "acl deny foo@QPID create queue name=bar" << endl << "acl allow all all" << endl; policyFile.close(); char cwd[1024]; BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); ostringstream aclLib; aclLib << getLibPath("ACL_LIB"); ClusterFixture::Args args; prepareArgs(args, durableFlag); args += "--log-enable=critical"; // Supress expected errors args += "--acl-file", string(cwd) + "/cluster_test.acl", "--cluster-mechanism", "PLAIN", "--cluster-username", "cluster", "--cluster-password", "cluster", "--load-module", aclLib.str(); ClusterFixture cluster(2, args, -1); Client c0(aclSettings(cluster[0], "c0"), "c0"); Client c1(aclSettings(cluster[1], "c1"), "c1"); Client foo(aclSettings(cluster[1], "foo"), "foo"); foo.session.queueDeclare("foo", arg::durable=durableFlag); BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); { ScopedSuppressLogging sl; BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); } BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); cluster.add(); Client c2(aclSettings(cluster[2], "c2"), "c2"); { ScopedSuppressLogging sl; BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); } BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); } QPID_AUTO_TEST_CASE(testMessageTimeToLive) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); c0.session.queueDeclare("p", arg::durable=durableFlag); c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag)); cluster.add(); Client c2(cluster[1], "c2"); BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of("x")); BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of("x")); BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of("x")); sys::usleep(200*1000); BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of("b")); BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of("b")); BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of("b")); } QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); FieldTable ftargs; ftargs.setInt("qpid.msg_sequence", 1); c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex"); c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex"); BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex"); BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag)); // Start a transaction that will commit. Session commitSession = c0.connection.newSession("commit"); SubscriptionManager commitSubs(commitSession); commitSession.txSelect(); commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag)); commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); // Add new member mid transaction. cluster.add(); Client c1(cluster[1], "c1"); // More transactional work BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag)); rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag)); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Commit/roll back. commitSession.txCommit(); rollbackSession.txRollback(); rollbackSession.messageRelease(rollbackMessage.getId()); // Verify queue status: just the comitted messages and dequeues should remain. BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); commitSession.close(); rollbackSession.close(); } QPID_AUTO_TEST_CASE(testUnacked) { // Verify replication of unacknowledged messages. ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); Message m; // Create unacked message: acquired but not accepted. SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); c0.session.queueDeclare("q1", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag)); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue // Create unacked message: not acquired, accepted or completeed. SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); c0.session.queueDeclare("q2", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag)); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(m.getData(), "21"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed c0.subs.getSubscription("q2").acquire(m); // Acquire manually BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. // Create empty credit record: acquire and accept but don't complete. SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); c0.session.queueDeclare("q3", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag)); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); Message m31=q3.get(TIMEOUT); BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); // Add new member while there are unacked messages. cluster.add(); Client c1(cluster[1], "c1"); // Check queue counts BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u); // Complete the empty credit message, should unblock the message behind it. BOOST_CHECK_THROW(q3.get(0), Exception); c0.session.markCompleted(SequenceSet(m31.getId()), true); BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); // Close the original session - unacked messages should be requeued. c0.session.close(); BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); } // FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. void testUpdateTxState() { // Verify that we update transaction state correctly to new members. ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Do work in a transaction. c0.session.txSelect(); c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag)); Message m; BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); // New member, TX not comitted, c1 should see nothing. cluster.add(); Client c1(cluster[1], "c1"); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); // After commit c1 shoudl see results of tx. c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag)); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "3"); } QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Verify that we update a partially recieved message to a new member. ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q", arg::durable=durableFlag); Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); // Send first 2 frames of message. MessageTransferBody transfer( ProtocolVersion(), string(), // default exchange. framing::message::ACCEPT_MODE_NONE, framing::message::ACQUIRE_MODE_PRE_ACQUIRED); sender.send(transfer, true, false, true, true); AMQHeaderBody header; header.get(true)->setRoutingKey("q"); if (durableFlag) header.get(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); else header.get(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); sender.send(header, false, false, true, true); // No reliable way to ensure the partial message has arrived // before we start the new broker, so we sleep. sys::usleep(2500); cluster.add(); // Send final 2 frames of message. sender.send(AMQContentBody("ab"), false, true, true, false); sender.send(AMQContentBody("cd"), false, true, false, true); // Verify message is enqued correctly on second member. Message m; Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "abcd"); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size()); } QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); set kb0 = knownBrokerPorts(c0.connection, 1); BOOST_CHECK_EQUAL(kb0.size(), 1u); BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); cluster.add(); Client c1(cluster[1], "c1"); set kb1 = knownBrokerPorts(c1.connection, 2); kb0 = knownBrokerPorts(c0.connection, 2); BOOST_CHECK_EQUAL(kb1.size(), 2u); BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); BOOST_CHECK_EQUAL(kb1,kb0); cluster.add(); Client c2(cluster[2], "c2"); set kb2 = knownBrokerPorts(c2.connection, 3); kb1 = knownBrokerPorts(c1.connection, 3); kb0 = knownBrokerPorts(c0.connection, 3); BOOST_CHECK_EQUAL(kb2.size(), 3u); BOOST_CHECK_EQUAL(kb2, makeSet(cluster)); BOOST_CHECK_EQUAL(kb2,kb0); BOOST_CHECK_EQUAL(kb2,kb1); cluster.killWithSilencer(1,c1.connection,9); kb0 = knownBrokerPorts(c0.connection, 2); kb2 = knownBrokerPorts(c2.connection, 2); BOOST_CHECK_EQUAL(kb0.size(), 2u); BOOST_CHECK_EQUAL(kb0, kb2); } QPID_AUTO_TEST_CASE(testUpdateConsumers) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("p", arg::durable=durableFlag); c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); LocalQueue lp; c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); c0.session.sync(); // Start new members cluster.add(); // Local Client c1(cluster[1], "c1"); cluster.add(); Client c2(cluster[2], "c2"); // Transfer messages c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag)); // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); Message m; BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "aaa"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); // Check second subscription's flow control: gets first message, not second. BOOST_CHECK(lp.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bbb"); BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "ccc"); // Kill the subscribing member, ensure further messages are not removed. cluster.killWithSilencer(0,c0.connection,9); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag)); BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } // Test that message data and delivery properties are updated properly. QPID_AUTO_TEST_CASE(testUpdateMessages) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); // Create messages with different delivery properties c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q"); c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag), arg::destination="amq.fanout"); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. // Add a new broker, it will catch up. cluster.add(); // Do some work post-add c0.session.queueDeclare("p", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag)); // Do some work post-join BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag)); // Verify new brokers have state. Message m; Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); BOOST_CHECK(m.getDeliveryProperties().hasExchange()); BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK(m.getDeliveryProperties().hasExchange()); BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Add another broker, don't wait for join - should be stalled till ready. cluster.add(); Client c2(cluster[2], "c2"); BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testWiringReplication) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); c0.connection.close(); // Verify all brokers get wiring update. for (size_t i = 0; i < cluster.size(); ++i) { BOOST_MESSAGE("i == "<< i); Client c(cluster[i]); BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); } } QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(2, args, -1); Client c0(cluster[0]); c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); c0.session.close(); Client c1(cluster[1]); Message msg; BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(3, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q", arg::durable=durableFlag); c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); Message msg; // Dequeue on 2 others, ensure correct order. Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); Client c2(cluster[2], "c2"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); // Queue should be empty on all cluster members. BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(3, args, -1); Client c0(cluster[0]); BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. // First start a subscription. c0.session.queueDeclare("q", arg::durable=durableFlag); c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); // Check they arrived Message m; BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("foo", m.getData()); BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("bar", m.getData()); // Queue should be empty on all cluster members. Client c2(cluster[2]); BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) { /* Start with a single broker. Set up two queues: one durable, and one not. Add a new broker to the cluster. Make sure it has one durable and one non-durable queue. */ ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0]); c0.session.queueDeclare("durable_queue", arg::durable=true); c0.session.queueDeclare("non_durable_queue", arg::durable=false); cluster.add(); Client c1(cluster[1]); QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" ); QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); } QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) { struct Sender : FailoverManager::Command { std::string queue; std::string content; Sender(const std::string& q, const std::string& c) : queue(q), content(c) {} void execute(AsyncSession& session, bool) { session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag)); } }; struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable { FailoverManager& mgr; std::string queue; std::string expectedContent; qpid::client::Subscription subscription; qpid::sys::Monitor lock; bool ready, failed; Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {} void received(Message& message) { BOOST_CHECK_EQUAL(expectedContent, message.getData()); subscription.cancel(); } void execute(AsyncSession& session, bool) { session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); SubscriptionManager subs(session); subscription = subs.subscribe(*this, queue); session.sync(); setReady(); subs.run(); //cleanup: session.queueDelete(arg::queue=queue); } void run() { try { mgr.execute(*this); } catch (const std::exception& e) { BOOST_MESSAGE("Exception in mgr.execute: " << e.what()); failed = true; } } void waitForReady() { qpid::sys::Monitor::ScopedLock l(lock); while (!ready) { lock.wait(); } } void setReady() { qpid::sys::Monitor::ScopedLock l(lock); ready = true; lock.notify(); } }; ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(2, args, -1); ConnectionSettings settings; settings.port = cluster[1]; settings.heartbeat = 1; FailoverManager fmgr(settings); Sender sender("my-queue", "my-data"); Receiver receiver(fmgr, "my-queue", "my-data"); qpid::sys::Thread runner(receiver); receiver.waitForReady(); { ScopedSuppressLogging allQuiet; // suppress connection closed messages cluster.kill(1); //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection: ::usleep(2*1000*1000); } fmgr.execute(sender); runner.join(); BOOST_CHECK(!receiver.failed); fmgr.close(); } QPID_AUTO_TEST_CASE(testPolicyUpdate) { //tests that the policys internal state is accurate on newly //joined nodes ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; QueueOptions options; options.setSizePolicy(REJECT, 0, 2); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); cluster.add(); Client c2(cluster[1], "c2"); c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); Message received; BOOST_CHECK(c1.subs.get(received, "q")); BOOST_CHECK_EQUAL(received.getData(), std::string("one")); BOOST_CHECK(c1.subs.get(received, "q")); BOOST_CHECK_EQUAL(received.getData(), std::string("two")); BOOST_CHECK(!c1.subs.get(received, "q")); } } QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { //tests that exclusive queues are accurately replicated on newly //joined nodes ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); cluster.add(); Client c2(cluster[1], "c2"); QueueQueryResult result = c2.session.queueQuery("q"); BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); BOOST_CHECK(result.getExclusive()); BOOST_CHECK(result.getAutoDelete()); BOOST_CHECK(!result.getDurable()); BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); c1.session.close(); c1.connection.close(); c2.session = c2.connection.newSession(); BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); } } /** * Subscribes to specified queue and acquires up to the specified * number of message but does not accept or release them. These * message are therefore 'locked' by the clients session. */ Subscription lockMessages(Client& client, const std::string& queue, int count) { LocalQueue q; SubscriptionSettings settings(FlowControl::messageCredit(count)); settings.autoAck = 0; Subscription sub = client.subs.subscribe(q, queue, settings); client.session.messageFlush(sub.getName()); return sub; } /** * check that the specified queue contains the expected set of * messages (matched on content) for all nodes in the cluster */ void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector& messages) { for (size_t i = 0; i < cluster.size(); i++) { Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); client.close(); } } void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m", const std::string& lvqKey="") { for (int i = 0; i < count; i++) { Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag); if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey); client.session.messageTransfer(arg::content=message); } } QPID_AUTO_TEST_CASE(testRingQueueUpdate) { //tests that ring queues are accurately replicated on newly //joined nodes ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; QueueOptions options; options.setSizePolicy(RING, 0, 5); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); send(c1, "q", 5); lockMessages(c1, "q", 1); //add new node cluster.add(); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //send one more message send(c1, "q", 1, 6); //release locked message c1.close(); //check state of queue on both nodes checkQueue(cluster, "q", list_of("m_2")("m_3")("m_4")("m_5")("m_6")); } } QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { //tests that ring queues are accurately replicated on newly joined //nodes; just like testRingQueueUpdate, but new node joins after //the sixth message has been sent. ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; QueueOptions options; options.setSizePolicy(RING, 0, 5); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); send(c1, "q", 5); lockMessages(c1, "q", 1); //send sixth message send(c1, "q", 1, 6); //add new node cluster.add(); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //release locked message c1.close(); //check state of queue on both nodes checkQueue(cluster, "q", list_of("m_2")("m_3")("m_4")("m_5")("m_6")); } } QPID_AUTO_TEST_CASE(testLvqUpdate) { //tests that lvqs are accurately replicated on newly joined nodes ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; QueueOptions options; options.setOrdering(LVQ); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); send(c1, "q", 5, 1, "a", "a"); send(c1, "q", 2, 1, "b", "b"); send(c1, "q", 1, 1, "c", "c"); send(c1, "q", 1, 3, "b", "b"); //add new node cluster.add(); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //check state of queue on both nodes checkQueue(cluster, "q", list_of("a_5")("b_3")("c_1")); } } QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) { //tests that lvqs are accurately replicated on newly joined nodes //if the lvq state has been affected by browsers ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; QueueOptions options; options.setOrdering(LVQ); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); send(c1, "q", 1, 1, "a", "a"); send(c1, "q", 2, 1, "b", "b"); send(c1, "q", 1, 1, "c", "c"); checkQueue(cluster, "q", list_of("a_1")("b_2")("c_1")); send(c1, "q", 4, 2, "a", "a"); send(c1, "q", 1, 3, "b", "b"); //add new node cluster.add(); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //check state of queue on both nodes checkQueue(cluster, "q", list_of("a_1")("b_2")("c_1")("a_5")("b_3")); } } QPID_AUTO_TEST_CASE(testRelease) { //tests that releasing a messages that was unacked when one node //joined works correctly ClusterFixture::Args args; args += "--log-enable", "critical"; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); { ScopedSuppressLogging allQuiet; c1.session.queueDeclare("q", arg::durable=durableFlag); for (int i = 0; i < 5; i++) { c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); } //receive but don't ack a message LocalQueue lq; SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); lqSettings.autoAck = 0; Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); c1.session.messageFlush("q"); Message received; BOOST_CHECK(lq.get(received)); BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); //add new node cluster.add(); lqSub.release(lqSub.getUnaccepted()); //check state of queue on both nodes vector expected = list_of("m_1")("m_2")("m_3")("m_4")("m_5"); Client c3(cluster[0], "c3"); BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); Client c2(cluster[1], "c2"); BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); } } // Browse for 1 message with byte credit, return true if a message was // received false if not. bool browseByteCredit(Client& c, const string& q, int n, Message& m) { SubscriptionSettings browseSettings( FlowControl(1, n, false), // 1 message, n bytes credit, no window ACCEPT_MODE_NONE, ACQUIRE_MODE_NOT_ACQUIRED, 0 // No auto-ack. ); LocalQueue lq; Subscription s = c.subs.subscribe(lq, q, browseSettings); c.session.messageFlush(arg::destination=q, arg::sync=true); c.session.sync(); c.subs.getSubscription(q).cancel(); return lq.get(m, 0); // No timeout, flush should push message thru. } // Ensure cluster update preserves exact message size, use byte credt as test. QPID_AUTO_TEST_CASE(testExactByteCredit) { ClusterFixture cluster(1, prepareArgs(), -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("MyMessage", "q")); cluster.add(); int size=36; // Size of message on broker: headers+body Client c1(cluster[1], "c1"); Message m; // Ensure we get the message with exact credit. BOOST_CHECK(browseByteCredit(c0, "q", size, m)); BOOST_CHECK(browseByteCredit(c1, "q", size, m)); // and not with one byte less. BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m)); BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m)); } // Test that consumer positions are updated correctly. // Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927 // QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q", arg::durable=durableFlag); SubscriptionSettings settings; settings.autoAck = 0; // Set the acquire mode to 'not-acquired' the consumer moves along the queue // but does not acquire (remove) messages. settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED; Subscription s = c0.subs.subscribe(c0.lq, "q", settings); c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData()); // Add another member, send/receive another message and acquire // the messages. With the bug, this creates an inconsistency // because the browse position was not updated to the new member. cluster.add(); c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData()); s.acquire(s.getUnacquired()); s.accept(s.getUnaccepted()); // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1] // Subscribing on cluster[1] provokes an error that shuts down cluster[0] Client c1(cluster[1], "c1"); Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1 Message m; BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10)); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); FieldTable arguments; arguments.setInt("x-qpid-priorities", 10); arguments.setInt("x-qpid-fairshare", 5); c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); //send messages of different priorities for (int i = 0; i < 20; i++) { Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); c0.session.messageTransfer(arg::content=msg); } //pull off a couple of the messages (first four should be the top priority messages for (int i = 0; i < 4; i++) { BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); } // Add another member cluster.add(); Client c1(cluster[1], "c1"); //pull off some more messages BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); //check queue has same content on both nodes BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); } QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests