diff options
Diffstat (limited to 'M4-RCs/qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | M4-RCs/qpid/cpp/src/tests/cluster_test.cpp | 648 |
1 files changed, 0 insertions, 648 deletions
diff --git a/M4-RCs/qpid/cpp/src/tests/cluster_test.cpp b/M4-RCs/qpid/cpp/src/tests/cluster_test.cpp deleted file mode 100644 index f4a38ae861..0000000000 --- a/M4-RCs/qpid/cpp/src/tests/cluster_test.cpp +++ /dev/null @@ -1,648 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test_tools.h" -#include "unit_test.h" -#include "ForkedBroker.h" -#include "BrokerFixture.h" - -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/Session.h" -#include "qpid/client/FailoverListener.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/DumpClient.h" -#include "qpid/framing/AMQBody.h" -#include "qpid/framing/Uuid.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/enum.h" -#include "qpid/log/Logger.h" - -#include <boost/bind.hpp> -#include <boost/shared_ptr.hpp> - -#include <string> -#include <iostream> -#include <iterator> -#include <vector> -#include <set> -#include <algorithm> -#include <iterator> - -namespace std { // ostream operators in std:: namespace -template <class T> -ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } -} - - -QPID_AUTO_TEST_SUITE(cluster) - -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::client; -using qpid::sys::TIME_SEC; -using qpid::broker::Broker; -using boost::shared_ptr; -using qpid::cluster::Cluster; - -/** Parse broker & cluster options */ -Broker::Options parseOpts(size_t argc, const char* argv[]) { - Broker::Options opts; - Plugin::addOptions(opts); // Pick up cluster options. - opts.parse(argc, argv, "", true); // Allow-unknown for --load-module - return opts; -} - -/** Cluster fixture is a vector of ports for the replicas. - * - * At most one replica (by default replica 0) is in the current - * process, all others are forked as children. - */ -class ClusterFixture : public vector<uint16_t> { - string name; - std::auto_ptr<BrokerFixture> localBroker; - int localIndex; - std::vector<shared_ptr<ForkedBroker> > forkedBrokers; - - public: - /** @param localIndex can be -1 meaning don't automatically start a local broker. - * A local broker can be started with addLocal(). - */ - ClusterFixture(size_t n, int localIndex=0); - void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } - void add(); // Add a broker. - void addLocal(); // Add a local broker. - void setup(); - - bool hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); } - - /** Kill a forked broker with sig, or shutdown localBroker if n==0. */ - void kill(size_t n, int sig=SIGINT) { - if (n == size_t(localIndex)) - localBroker->broker->shutdown(); - else - forkedBrokers[n]->kill(sig); - } - - /** Kill a broker and suppress errors from connection. */ - void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT) { - ScopedSuppressLogging sl; - kill(n,sig); - try { c.close(); } catch(...) {} - } -}; - -ClusterFixture::ClusterFixture(size_t n, int localIndex_) : name(Uuid(true).str()), localIndex(localIndex_) { - add(n); -} - -void ClusterFixture::add() { - if (size() != size_t(localIndex)) { // fork a broker process. - std::ostringstream os; os << "fork" << size(); - std::string prefix = os.str(); - const char* argv[] = { - "qpidd " __FILE__ , - "--no-module-dir", - "--load-module=../.libs/cluster.so", - "--cluster-name", name.c_str(), - "--auth=no", "--no-data-dir", - "--log-prefix", prefix.c_str(), - }; - size_t argc = sizeof(argv)/sizeof(argv[0]); - forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(argc, argv))); - push_back(forkedBrokers.back()->getPort()); - } - else { // Run in this process - addLocal(); - } -} - -void ClusterFixture::addLocal() { - assert(int(size()) == localIndex || localIndex == -1); - localIndex = size(); - const char* argv[] = { - "qpidd " __FILE__ , - "--load-module=../.libs/cluster.so", - "--cluster-name", name.c_str(), - "--auth=no", "--no-data-dir" - }; - size_t argc = sizeof(argv)/sizeof(argv[0]); - ostringstream os; os << "local" << localIndex; - qpid::log::Logger::instance().setPrefix(os.str()); - localBroker.reset(new BrokerFixture(parseOpts(argc, argv))); - push_back(localBroker->getPort()); - forkedBrokers.push_back(shared_ptr<ForkedBroker>()); -} - -ostream& operator<<(ostream& o, const cpg_name* n) { - return o << qpid::cluster::Cpg::str(*n); -} - -ostream& operator<<(ostream& o, const cpg_address& a) { - return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")"; -} - -template <class T> -ostream& operator<<(ostream& o, const pair<T*, int>& array) { - o << "{ "; - ostream_iterator<cpg_address> i(o, " "); - copy(array.first, array.first+array.second, i); - o << "}"; - return o; -} - -template <class C> set<uint16_t> makeSet(const C& c) { - set<uint16_t> s; - std::copy(c.begin(), c.end(), std::inserter(s, s.begin())); - return s; -} - -template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) { - vector<Url> urls = source.getKnownBrokers(); - if (n >= 0 && unsigned(n) != urls.size()) { - BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); - // Retry up to 10 secs in .1 second intervals. - for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - ::usleep(1000*100); // 0.1 secs - urls = source.getKnownBrokers(); - } - } - BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls); - set<uint16_t> s; - for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) - s.insert((*i)[0].get<TcpAddress>()->port); - return s; -} - -class Sender { - public: - Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {} - void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) { - AMQFrame f(body); - f.setChannel(channel); - f.setFirstSegment(firstSeg); - f.setLastSegment(lastSeg); - f.setFirstFrame(firstFrame); - f.setLastFrame(lastFrame); - connection->handle(f); - } - - private: - boost::shared_ptr<ConnectionImpl> connection; - uint16_t channel; -}; - -int64_t getMsgSequence(const Message& m) { - return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); -} - -QPID_AUTO_TEST_CASE(testSequenceOptions) { - // Make sure the exchange qpid.msg_sequence property is properly replicated. - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - FieldTable args; - args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"?? - c0.session.queueDeclare(arg::queue="q"); - c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args); - c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); - c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); - c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC))); - BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC))); - - cluster.add(); - Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC))); -} - -QPID_AUTO_TEST_CASE(testUnsupported) { - ScopedSuppressLogging sl; - ClusterFixture cluster(1); - Client c1(cluster[0], "c1"); - BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException); - Client c2(cluster[0], "c2"); - Message m; - m.getDeliveryProperties().setTtl(1); - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception); -} - -QPID_AUTO_TEST_CASE(testTxTransaction) { - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare(arg::queue="q"); - c0.session.messageTransfer(arg::content=Message("A", "q")); - c0.session.messageTransfer(arg::content=Message("B", "q")); - - // Start a transaction that will commit. - Session commitSession = c0.connection.newSession("commit"); - SubscriptionManager commitSubs(commitSession); - commitSession.txSelect(); - commitSession.messageTransfer(arg::content=Message("a", "q")); - commitSession.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A"); - - // Start a transaction that will roll back. - Session rollbackSession = c0.connection.newSession("rollback"); - SubscriptionManager rollbackSubs(rollbackSession); - rollbackSession.txSelect(); - rollbackSession.messageTransfer(arg::content=Message("1", "q")); - Message rollbackMessage = rollbackSubs.get("q", TIME_SEC); - BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); - - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); - // Add new member mid transaction. - cluster.add(); - Client c1(cluster[1], "c1"); - - // More transactional work - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - rollbackSession.messageTransfer(arg::content=Message("2", "q")); - commitSession.messageTransfer(arg::content=Message("c", "q")); - rollbackSession.messageTransfer(arg::content=Message("3", "q")); - - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - - // Commit/roll back. - commitSession.txCommit(); - rollbackSession.txRollback(); - rollbackSession.messageRelease(rollbackMessage.getId()); - - - // Verify queue status: just the comitted messages and dequeues should remain. - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c"); -} - -QPID_AUTO_TEST_CASE(testUnacked) { - // Verify replication of unacknowledged messages. - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - - Message m; - - // Create unacked message: acquired but not accepted. - SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); - c0.session.queueDeclare("q1"); - c0.session.messageTransfer(arg::content=Message("11","q1")); - LocalQueue q1; - c0.subs.subscribe(q1, "q1", manualAccept); - BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted - BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue - - // Create unacked message: not acquired, accepted or completeed. - SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); - c0.session.queueDeclare("q2"); - c0.session.messageTransfer(arg::content=Message("21","q2")); - c0.session.messageTransfer(arg::content=Message("22","q2")); - LocalQueue q2; - c0.subs.subscribe(q2, "q2", manualAcquire); - m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue - BOOST_CHECK_EQUAL(m.getData(), "21"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed - c0.subs.getSubscription("q2").acquire(m); // Acquire manually - BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed - BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue - BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. - - // Create empty credit record: acquire and accept but don't complete. - SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); - c0.session.queueDeclare("q3"); - c0.session.messageTransfer(arg::content=Message("31", "q3")); - c0.session.messageTransfer(arg::content=Message("32", "q3")); - LocalQueue q3; - c0.subs.subscribe(q3, "q3", manualComplete); - Message m31=q3.get(TIME_SEC); - BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. - BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); - - // Add new member while there are unacked messages. - cluster.add(); - Client c1(cluster[1], "c1"); - - // Check queue counts - BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u); - - // Complete the empty credit message, should unblock the message behind it. - BOOST_CHECK_THROW(q3.get(0), Exception); - c0.session.markCompleted(SequenceSet(m31.getId()), true); - BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); - - // Close the original session - unacked messages should be requeued. - c0.session.close(); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); - - BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22"); -} - -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) { - // Verify that we dump transaction state correctly to new members. - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - - // Do work in a transaction. - c0.session.txSelect(); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("1","q")); - c0.session.messageTransfer(arg::content=Message("2","q")); - Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "1"); - - // New member, TX not comitted, c1 should see nothing. - cluster.add(); - Client c1(cluster[1], "c1"); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); - - // After commit c1 shoudl see results of tx. - c0.session.txCommit(); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "2"); - - // Another transaction with both members active. - c0.session.messageTransfer(arg::content=Message("3","q")); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); - c0.session.txCommit(); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "3"); -} - -QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { - // Verify that we dump a partially recieved message to a new member. - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); - - // Send first 2 frames of message. - MessageTransferBody transfer( - ProtocolVersion(), std::string(), // default exchange. - framing::message::ACCEPT_MODE_NONE, - framing::message::ACQUIRE_MODE_PRE_ACQUIRED); - sender.send(transfer, true, false, true, true); - AMQHeaderBody header; - header.get<DeliveryProperties>(true)->setRoutingKey("q"); - sender.send(header, false, false, true, true); - - // No reliable way to ensure the partial message has arrived - // before we start the new broker, so we sleep. - ::usleep(2500); - cluster.add(); - - // Send final 2 frames of message. - sender.send(AMQContentBody("ab"), false, true, true, false); - sender.send(AMQContentBody("cd"), false, true, false, true); - - // Verify message is enqued correctly on second member. - Message m; - Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "abcd"); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size()); -} - -QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - set<uint16_t> kb0 = knownBrokerPorts(c0.connection); - BOOST_CHECK_EQUAL(kb0.size(), 1u); - BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); - - cluster.add(); - Client c1(cluster[1], "c1"); - set<uint16_t> kb1 = knownBrokerPorts(c1.connection); - kb0 = knownBrokerPorts(c0.connection, 2); - BOOST_CHECK_EQUAL(kb1.size(), 2u); - BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); - BOOST_CHECK_EQUAL(kb1,kb0); - - cluster.add(); - Client c2(cluster[2], "c2"); - set<uint16_t> kb2 = knownBrokerPorts(c2.connection); - kb1 = knownBrokerPorts(c1.connection, 3); - kb0 = knownBrokerPorts(c0.connection, 3); - BOOST_CHECK_EQUAL(kb2.size(), 3u); - BOOST_CHECK_EQUAL(kb2, makeSet(cluster)); - BOOST_CHECK_EQUAL(kb2,kb0); - BOOST_CHECK_EQUAL(kb2,kb1); - - cluster.killWithSilencer(1,c1.connection,9); - kb0 = knownBrokerPorts(c0.connection, 2); - kb2 = knownBrokerPorts(c2.connection, 2); - BOOST_CHECK_EQUAL(kb0.size(), 2u); - BOOST_CHECK_EQUAL(kb0, kb2); -} - -QPID_AUTO_TEST_CASE(DumpConsumers) { - ClusterFixture cluster(1, 1); - - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("p"); - c0.session.queueDeclare("q"); - c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); - LocalQueue lp; - c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); - c0.session.sync(); - - // Start new members - cluster.add(); // Local - Client c1(cluster[1], "c1"); - cluster.add(); - Client c2(cluster[2], "c2"); - - // Transfer messages - c0.session.messageTransfer(arg::content=Message("aaa", "q")); - - c0.session.messageTransfer(arg::content=Message("bbb", "p")); - c0.session.messageTransfer(arg::content=Message("ccc", "p")); - - // Activate the subscription, ensure message removed on all queues. - c0.subs.setFlowControl("q", FlowControl::unlimited()); - Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); - - // Check second subscription's flow control: gets first message, not second. - BOOST_CHECK(lp.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); - - BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "ccc"); - - // Kill the subscribing member, ensure further messages are not removed. - cluster.killWithSilencer(0,c0.connection,9); - BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); - for (int i = 0; i < 10; ++i) { - c1.session.messageTransfer(arg::content=Message("xxx", "q")); - BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC)); - BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); - } -} - -QPID_AUTO_TEST_CASE(testCatchupSharedState) { - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - - // Create some shared state. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo","q")); - c0.session.messageTransfer(arg::content=Message("bar","q")); - while (c0.session.queueQuery("q").getMessageCount() != 2) - ::usleep(1000); // Wait for message to show up on broker 0. - - // Add a new broker, it should catch up. - cluster.add(); - - // Do some work post-add - c0.session.queueDeclare("p"); - c0.session.messageTransfer(arg::content=Message("pfoo","p")); - - // Do some work post-join - BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); - c0.session.messageTransfer(arg::content=Message("pbar","p")); - - // Verify new brokers have state. - Message m; - - Client c1(cluster[1], "c1"); - - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - - // Add another broker, don't wait for join - should be stalled till ready. - cluster.add(); - Client c2(cluster[2], "c2"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "pbar"); - BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); -} - -QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(3); - Client c0(cluster[0]); - BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); - c0.session.queueDeclare("q"); - c0.session.exchangeDeclare("ex", arg::type="direct"); - c0.session.close(); - c0.connection.close(); - // Verify all brokers get wiring update. - for (size_t i = 0; i < cluster.size(); ++i) { - BOOST_MESSAGE("i == "<< i); - Client c(cluster[i]); - BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); - BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); - } -} - -QPID_AUTO_TEST_CASE(testMessageEnqueue) { - // Enqueue on one broker, dequeue on another. - ClusterFixture cluster(2); - Client c0(cluster[0]); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); - c0.session.close(); - Client c1(cluster[1]); - Message msg; - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(string("foo"), msg.getData()); - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(string("bar"), msg.getData()); -} - -QPID_AUTO_TEST_CASE(testMessageDequeue) { - // Enqueue on one broker, dequeue on two others. - ClusterFixture cluster(3); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo", "q")); - c0.session.messageTransfer(arg::content=Message("bar", "q")); - - Message msg; - - // Dequeue on 2 others, ensure correct order. - Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(msg, "q")); - BOOST_CHECK_EQUAL("foo", msg.getData()); - - Client c2(cluster[2], "c2"); - BOOST_CHECK(c1.subs.get(msg, "q")); - BOOST_CHECK_EQUAL("bar", msg.getData()); - - // Queue should be empty on all cluster members. - BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); -} - -QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { - ClusterFixture cluster(3); - Client c0(cluster[0]); - BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. - - // First start a subscription. - c0.session.queueDeclare("q"); - c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); - - // Now send messages - Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=Message("foo", "q")); - c1.session.messageTransfer(arg::content=Message("bar", "q")); - - // Check they arrived - Message m; - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); - BOOST_CHECK_EQUAL("foo", m.getData()); - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); - BOOST_CHECK_EQUAL("bar", m.getData()); - - // Queue should be empty on all cluster members. - Client c2(cluster[2]); - BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); -} - -QPID_AUTO_TEST_SUITE_END() |