diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/cluster.mk | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 66 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/PollableCondition.cpp (renamed from cpp/src/qpid/cluster/PollableCondition.cpp) | 12 | ||||
-rw-r--r-- | cpp/src/qpid/sys/PollableCondition.h (renamed from cpp/src/qpid/cluster/PollableCondition.h) | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h (renamed from cpp/src/qpid/cluster/PollableQueue.h) | 12 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 73 |
16 files changed, 151 insertions, 103 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 7c02516575..98dec2b12d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -264,6 +264,9 @@ libqpidcommon_la_SOURCES = \ qpid/sys/AggregateOutput.cpp \ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ + qpid/sys/PollableCondition.h \ + qpid/sys/PollableCondition.cpp \ + qpid/sys/PollableQueue.h \ qpid/sys/Runnable.cpp \ qpid/sys/SystemInfo.cpp \ qpid/sys/Shlib.cpp \ diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index bb9546f387..f02e5e1644 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -18,9 +18,6 @@ cluster_la_SOURCES = \ qpid/cluster/Connection.h \ qpid/cluster/Connection.cpp \ qpid/cluster/NoOpConnectionOutputHandler.h \ - qpid/cluster/PollableCondition.h \ - qpid/cluster/PollableCondition.cpp \ - qpid/cluster/PollableQueue.h \ qpid/cluster/WriteEstimate.h \ qpid/cluster/WriteEstimate.cpp \ qpid/cluster/OutputInterceptor.h \ diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 913188845f..027f8a212d 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -70,8 +70,6 @@ SessionState::SessionState( } SessionState::~SessionState() { - // Remove ID from active session list. - broker.getSessionManager().forget(getId()); if (mgmtObject != 0) mgmtObject->resourceDestroy (); } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index b736d116e1..7b1cacb640 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -649,7 +649,7 @@ void SessionImpl::checkOpen() const //call with lock held. { check(); if (state != ATTACHED) { - throw NotAttachedException("Session isn't attached"); + throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't attached")); } } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce156e85e4..07ed4596e0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(&b), + broker(b), poller(b.getPoller()), cpg(*this), name(name_), @@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ), deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { - broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); + QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + QPID_LOG(debug, "~Cluster()"); +} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); @@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +// FIXME aconway 2008-09-10: leave is currently not called, +// It should be called if we are shut down by a cluster admin command. +// Any other type of exit is caught in disconnect(). +// void Cluster::leave() { - Mutex::ScopedLock l(lock); - if (!broker) return; // Already left. - // Leave is called by from Broker destructor after the poller has - // been shut down. No dispatches can occur. - - QPID_LOG(notice, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. - while(broker) { - Mutex::ScopedUnlock u(lock); - cpg.dispatchAll(); - } - cpg.shutdown(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -177,6 +172,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); + QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { @@ -238,7 +234,7 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " << AddrList(joined, nJoined) << AddrList(left, nLeft)); if (nJoined) // Notfiy new members of my URL. @@ -246,13 +242,14 @@ void Cluster::configChange( AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); - + if (find(left, left+nLeft, self) != left+nLeft) { + // We have left the group, this is the final config change. + QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); + broker.shutdown(); + } Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - - if (find(left, left+nLeft, self) != left+nLeft) - broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) { h.rewatch(); } -void Cluster::disconnect(sys::DispatchHandle& h) { - h.stopWatch(); - QPID_LOG(critical, "Disconnected from cluster, shutting down"); - broker->shutdown(); +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + broker.shutdown(); } void Cluster::joining(const MemberId& m, const string& url) { - QPID_LOG(notice, "Cluster member " << m << " has URL " << url); + QPID_LOG(info, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } void Cluster::ready(const MemberId& ) { // FIXME aconway 2008-09-08: TODO } - -}} // namespace qpid::cluster +// Called from Broker::~Broker when broker is shut down. At this +// point we know the poller has stopped so no poller callbacks will be +// invoked. We must ensure that CPG has also shut down so no CPG +// callbacks will be invoked. +// +void Cluster::shutdown() { + QPID_LOG(notice, "Cluster member " << self << " shutting down."); + try { cpg.shutdown(); } + catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } + delete this; +} +broker::Broker& Cluster::getBroker(){ return broker; } +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a25b62ea12..3a254684ad 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,7 +21,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/Event.h" -#include "qpid/cluster/PollableQueue.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/cluster/NoOpConnectionOutputHandler.h" #include "qpid/broker/Broker.h" @@ -43,7 +43,7 @@ class Connection; * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : public RefCounted, private Cpg::Handler +class Cluster : private Cpg::Handler { public: @@ -78,17 +78,16 @@ class Cluster : public RefCounted, private Cpg::Handler void joining(const MemberId&, const std::string& url); void ready(const MemberId&); - broker::Broker& getBroker() { assert(broker); return *broker; } - MemberId getSelf() const { return self; } + void shutdown(); + + broker::Broker& getBroker(); + private: typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; - - /** Message sent over the cluster. */ - typedef std::pair<framing::AMQFrame, ConnectionId> Message; - typedef PollableQueue<Event> EventQueue; + typedef sys::PollableQueue<Event> EventQueue; boost::function<void()> shutdownNext; @@ -127,7 +126,7 @@ class Cluster : public RefCounted, private Cpg::Handler boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); mutable sys::Monitor lock; // Protect access to members. - broker::Broker* broker; + broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; Cpg::Name name; @@ -137,7 +136,7 @@ class Cluster : public RefCounted, private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableQueue<Event> deliverQueue; + EventQueue deliverQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 31447f2fd0..f4128634a6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -66,10 +66,10 @@ struct ClusterPlugin : public Plugin { ClusterValues values; ClusterOptions options; - boost::intrusive_ptr<Cluster> cluster; + Cluster* cluster; boost::scoped_ptr<ConnectionCodec::Factory> factory; - ClusterPlugin() : options(values) {} + ClusterPlugin() : options(values), cluster(0) {} Options* getOptions() { return &options; } @@ -78,20 +78,17 @@ struct ClusterPlugin : public Plugin { if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); - broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); } void earlyInitialize(Plugin::Target&) {} - - void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. // For test purposes. -boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; } +Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 506e982ffd..68d1b16dfa 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -106,5 +106,21 @@ void Connection::deliverBuffer(Buffer& buf) { deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. } + +void Connection::sessionState(const SequenceNumber& /*replayStart*/, + const SequenceSet& /*sentIncomplete*/, + const SequenceNumber& /*expected*/, + const SequenceNumber& /*received*/, + const SequenceSet& /*unknownCompleted*/, + const SequenceSet& /*receivedIncomplete*/) +{ + // FIXME aconway 2008-09-10: TODO +} + +void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) +{ + // FIXME aconway 2008-09-10: TODO +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index b3e151ce51..a30350585f 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -40,9 +40,7 @@ namespace framing { class AMQFrame; } namespace cluster { -/** - * Plug-in associated with broker::Connections, both local and shadow. - */ +/** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : public RefCounted, public sys::ConnectionInputHandler, @@ -90,16 +88,13 @@ class Connection : sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); // State dump methods. - virtual void sessionState(const framing::SequenceNumber& /*replayId*/, - const framing::SequenceNumber& /*sendId*/, - const framing::SequenceSet& /*sentIncomplete*/, - const framing::SequenceNumber& /*expectedId*/, - const framing::SequenceNumber& /*receivedId*/, - const framing::SequenceSet& /*unknownCompleted*/, - const framing::SequenceSet& /*receivedIncomplete*/) {} + virtual void sessionState(const SequenceNumber& replayStart, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - virtual void shadowReady(uint64_t /*clusterId*/, - const std::string& /*userId*/) {} + virtual void shadowReady(uint64_t memberId, uint64_t connectionId); private: void sendDoOutput(); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index f093a0cc1c..6179eab724 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -30,16 +30,16 @@ namespace cluster { sys::ConnectionCodec* ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == framing::ProtocolVersion(0, 10)) + if (v == framing::ProtocolVersion(0, 10)) return new ConnectionCodec(out, id, cluster); return 0; } +// FIXME aconway 2008-08-27: outbound connections need to be made +// with proper qpid::client code for failover, get rid of this +// broker-side hack. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - // FIXME aconway 2008-08-27: outbound connections need to be made - // with proper qpid::client code for failover, get rid of this - // broker-side hack. return next->create(out, id); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 59ce20d821..22d752d174 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -50,7 +50,8 @@ class ConnectionCodec : public sys::ConnectionCodec { struct Factory : public sys::ConnectionCodec::Factory { boost::shared_ptr<sys::ConnectionCodec::Factory> next; Cluster& cluster; - Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {} + Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) + : next(f), cluster(c) {} sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/sys/PollableCondition.cpp index eecf95ff8d..5a3bd583cf 100644 --- a/cpp/src/qpid/cluster/PollableCondition.cpp +++ b/cpp/src/qpid/sys/PollableCondition.cpp @@ -27,14 +27,14 @@ // #include "qpid/sys/posix/PrivatePosix.h" -#include "qpid/cluster/PollableCondition.h" +#include "qpid/sys/PollableCondition.h" #include "qpid/Exception.h" #include <unistd.h> #include <fcntl.h> namespace qpid { -namespace cluster { +namespace sys { PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { int fds[2]; @@ -67,13 +67,13 @@ void PollableCondition::set() { #if 0 // FIXME aconway 2008-08-12: More efficient Linux implementation using -// eventfd system call. Do a configure.ac test to enable this when -// eventfd is available. +// eventfd system call. Move to separate file & do configure.ac test +// to enable this when ::eventfd() is available. #include <sys/eventfd.h> namespace qpid { -namespace cluster { +namespace sys { PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { impl->fd = ::eventfd(0, 0); @@ -95,6 +95,6 @@ void PollableCondition::set() { #endif -}} // namespace qpid::cluster +}} // namespace qpid::sys #endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h index 6bfca6cabe..6f0e12a474 100644 --- a/cpp/src/qpid/cluster/PollableCondition.h +++ b/cpp/src/qpid/sys/PollableCondition.h @@ -29,7 +29,7 @@ // namespace qpid { -namespace cluster { +namespace sys { /** * A pollable condition to integrate in-process conditions with IO @@ -55,6 +55,6 @@ class PollableCondition : public sys::IOHandle { private: int writeFd; }; -}} // namespace qpid::cluster +}} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 1c7720f5c6..2e5d3a0d3d 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_POLLABLEQUEUE_H -#define QPID_CLUSTER_POLLABLEQUEUE_H +#ifndef QPID_SYS_POLLABLEQUEUE_H +#define QPID_SYS_POLLABLEQUEUE_H /* * @@ -22,7 +22,7 @@ * */ -#include "qpid/cluster/PollableCondition.h" +#include "qpid/sys/PollableCondition.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Mutex.h" #include <boost/function.hpp> @@ -34,7 +34,7 @@ namespace qpid { namespace sys { class Poller; } -namespace cluster { +namespace sys { // FIXME aconway 2008-08-11: this could be of more general interest, // move to common lib. @@ -108,6 +108,6 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { batch.clear(); } -}} // namespace qpid::cluster +}} // namespace qpid::sys -#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ +#endif /*!QPID_SYS_POLLABLEQUEUE_H*/ diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 09cca066ef..4e10f82809 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -92,7 +92,8 @@ struct ClientT { SessionType session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; - ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {} + ClientT(uint16_t port, const std::string& name=std::string()) + : connection(port), session(connection.newSession(name)), subs(session) {} ~ClientT() { connection.close(); } }; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d082d74367..871aa0c657 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -21,13 +21,14 @@ #include "ForkedBroker.h" #include "BrokerFixture.h" -#include "qpid/cluster/Cpg.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/Cpg.h" #include "qpid/cluster/DumpClient.h" #include "qpid/framing/AMQBody.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Logger.h" #include <boost/bind.hpp> @@ -41,7 +42,7 @@ namespace qpid { namespace cluster { -boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp +Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster @@ -81,11 +82,11 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { add(n); // Wait for all n members to join the cluster int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster()->size() != n) { + while (retry && getGlobalCluster().size() != n) { ::sleep(1); --retry; } - BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size()); + BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); } void ClusterFixture::add() { @@ -135,7 +136,37 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -QPID_AUTO_TEST_CASE(testDumpClient) { +#if 0 // FIXME aconway 2008-09-10: finish & enable +QPID_AUTO_TEST_CASE(testDumpConsumers) { + ClusterFixture cluster(1); + Client a(cluster[0]); + a.session.queueDeclare("q"); + a.subs.subscribe(a.lq, "q"); + + cluster.add(); + Client b(cluster[1]); + try { + b.connection.newSession(a.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); + } catch (const SessionBusyException&) {} + + // Transfer some messages to the subscription by client a. + Message m; + a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + + b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + + // Verify that the queue has been drained on both brokers. + // This proves that the consumer was replicated when the second broker joined. + BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); +} +#endif + +QPID_AUTO_TEST_CASE(testDumpClientSharedState) { BrokerFixture donor, receiver; { Client c(donor.getPort()); @@ -146,13 +177,13 @@ QPID_AUTO_TEST_CASE(testDumpClient) { c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); - c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo")); + c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo")); c.session.exchangeDeclare("ext", arg::type="topic"); c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); - c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar")); - c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar")); c.session.close(); c.connection.close(); @@ -202,11 +233,11 @@ QPID_AUTO_TEST_CASE(testDumpClient) { BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); // Verify bindings - r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo")); + r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo")); BOOST_CHECK(r.subs.get(m, "qa")); BOOST_CHECK_EQUAL(m.getData(), "xxx"); - r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar")); + r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar")); BOOST_CHECK(r.subs.get(m, "qb")); BOOST_CHECK_EQUAL(m.getData(), "yyy"); @@ -254,8 +285,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { ClusterFixture cluster(2); Client c0(cluster[0]); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.messageTransfer(arg::content=Message("foo", "q")); + c0.session.messageTransfer(arg::content=Message("bar", "q")); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -268,19 +299,19 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture cluster (3); - Client c0(cluster[0]); + Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.messageTransfer(arg::content=Message("foo", "q")); + c0.session.messageTransfer(arg::content=Message("bar", "q")); Message msg; // Dequeue on 2 others, ensure correct order. - Client c1(cluster[1]); + Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); - Client c2(cluster[2]); + Client c2(cluster[2], "c2"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); @@ -298,8 +329,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c1.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c1.session.messageTransfer(arg::content=Message("foo", "q")); + c1.session.messageTransfer(arg::content=Message("bar", "q")); // Check they arrived Message m; |