diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterHandler.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 32 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/MemberHandler.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/MemberHandler.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.h | 8 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 167 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 4 |
18 files changed, 270 insertions, 147 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index dd62c8e6e8..ac4ec81cb9 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/agent/ManagementAgent.h" +#include "qpid/framing/enum.h" #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> @@ -195,14 +196,14 @@ bool Connection::doOutput() { ioCallback = 0; if (mgmtClosing) - close(403, "Closed by Management Request", 0, 0); + close(execution::ERROR_CODE_UNAUTHORIZED_ACCESS, "Closed by Management Request", 0, 0); else //then do other output as needed: return outputTasks.doOutput(); }catch(ConnectionException& e){ close(e.code, e.getMessage(), 0, 0); }catch(std::exception& e){ - close(541/*internal error*/, e.what(), 0, 0); + close(execution::ERROR_CODE_INTERNAL_ERROR, e.what(), 0, 0); } return false; } diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e8ff2d8660..cf5b09b255 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -166,6 +166,9 @@ class Connection void resume(Session& session); bool isOpen() const; + + + friend class ConnectionAccess; ///<@internal }; }} // namespace qpid::client diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 53f0ccc08c..9549527416 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -89,7 +89,7 @@ Cluster::~Cluster() {} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); - connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c)); + handler->insert(c); } void Cluster::erase(ConnectionId id) { @@ -186,8 +186,10 @@ void Cluster::connectionEvent(const Event& e) { e.getConnection()->deliverBuffer(buf); else { // control AMQFrame frame; - while (frame.decode(buf)) + while (frame.decode(buf)) { + QPID_LOG(trace, "DLVR [" << self << "]: " << frame); e.getConnection()->received(frame); + } } } @@ -274,6 +276,7 @@ broker::Broker& Cluster::getBroker(){ return broker; } void Cluster::stall() { Mutex::ScopedLock l(lock); + QPID_LOG(debug, self << " stalling."); // Stop processing connection events. We still process config changes // and cluster controls in deliver() connectionEventQueue.stop(); @@ -357,6 +360,4 @@ void Cluster::updateMemberStats(void) } - - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 7c4e121a9b..aa077ef63c 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -91,6 +91,8 @@ class Cluster : private Cpg::Handler, public management::Manageable void shutdown(); broker::Broker& getBroker(); + + void setDumpComplete(); private: typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; diff --git a/cpp/src/qpid/cluster/ClusterHandler.h b/cpp/src/qpid/cluster/ClusterHandler.h index 5da5cf5b75..95106de016 100644 --- a/cpp/src/qpid/cluster/ClusterHandler.h +++ b/cpp/src/qpid/cluster/ClusterHandler.h @@ -24,6 +24,7 @@ #include "Cpg.h" #include "types.h" +#include <boost/intrusive_ptr.hpp> namespace qpid { @@ -31,6 +32,7 @@ namespace framing { class AMQFrame; } namespace cluster { +class Connection; class Cluster; class Event; @@ -44,6 +46,8 @@ class ClusterHandler ClusterHandler(Cluster& c); virtual ~ClusterHandler(); + bool invoke(const MemberId&, framing::AMQFrame& f); + virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0; virtual void dumpRequest(const MemberId&, const std::string& url) = 0; virtual void ready(const MemberId&, const std::string& url) = 0; @@ -54,7 +58,7 @@ class ClusterHandler cpg_address *left, int nLeft, cpg_address *joined, int nJoined) = 0; - bool invoke(const MemberId&, framing::AMQFrame& f); + virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0; protected: Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 51da5bef25..b225ba3568 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -34,29 +34,31 @@ using namespace framing; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp() {} Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId) + const std::string& wrappedId, MemberId myId, bool isCatchUp) : cluster(c), self(myId, this), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), + catchUp(isCatchUp), exCatchUp() {} Connection::~Connection() {} -bool Connection::doOutput() { return output.doOutput(); } +bool Connection::doOutput() { + return output.doOutput(); +} // Delivery of doOutput allows us to run the real connection doOutput() // which stocks up the write buffers with data. // void Connection::deliverDoOutput(uint32_t requested) { + assert(!catchUp); output.deliverDoOutput(requested); } -// Handle frames delivered from cluster. void Connection::received(framing::AMQFrame& f) { - QPID_LOG(trace, "DLVR [" << self << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); @@ -64,16 +66,28 @@ void Connection::received(framing::AMQFrame& f) { void Connection::closed() { try { - // Called when the local network connection is closed. We still - // need to process any outstanding cluster frames for this - // connection to ensure our sessions are up-to-date. We defer - // closing the Connection object till deliverClosed(), but replace - // its output handler with a null handler since the network output - // handler will be deleted. - // - connection.setOutputHandler(&discardHandler); - cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); - ++mcastSeq; + // Local network connection has closed. We need to keep the + // connection around but replace the output handler with a + // no-op handler as the network output handler will be + // deleted. + + // FIXME aconway 2008-09-18: output handler reset in right place? + // connection.setOutputHandler(&discardHandler); + output.setOutputHandler(discardHandler); + if (catchUp) { + // This was a catch-up connection, may be promoted to a + // shadow connection. + catchUp = false; + exCatchUp = true; + cluster.insert(boost::intrusive_ptr<Connection>(this)); + } + else { + // This was a local replicated connection. Multicast a deliver closed + // and process any outstanding frames from the cluster until + // self-delivery of deliver-closed. + cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); + ++mcastSeq; + } } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); @@ -81,17 +95,20 @@ void Connection::closed() { } void Connection::deliverClose () { + assert(!catchUp); connection.closed(); cluster.erase(self); } size_t Connection::decode(const char* buffer, size_t size) { + assert(!catchUp); ++mcastSeq; cluster.mcastBuffer(buffer, size, self); return size; } void Connection::deliverBuffer(Buffer& buf) { + assert(!catchUp); ++deliverSeq; while (decoder.decode(buf)) received(decoder.frame); @@ -108,10 +125,15 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/, // FIXME aconway 2008-09-10: TODO } -void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) -{ +void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) { // FIXME aconway 2008-09-10: TODO } +void Connection::dumpComplete() { + // FIXME aconway 2008-09-18: use or remove. +} + +bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index d17dc704ed..c664427ea1 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -51,21 +51,21 @@ class Connection : { public: /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId); + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp); /** Shadow connection */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); ~Connection(); ConnectionId getId() const { return self; } broker::Connection& getBrokerConnection() { return connection; } - bool isLocal() const { return self.second == this; } + bool isLocal() const; - Cluster& getCluster() { return cluster; } + /** True if the connection is in "catch-up" mode: building initial state */ + bool isCatchUp() const { return catchUp; } + bool isExCatchUp() const { return exCatchUp; } - // self-delivery of multicast data. - void deliverClose(); - void deliverDoOutput(uint32_t requested); - void deliverBuffer(framing::Buffer&); + + Cluster& getCluster() { return cluster; } // ConnectionOutputHandler methods void close() {} @@ -84,19 +84,27 @@ class Connection : // ConnectionCodec methods size_t decode(const char* buffer, size_t size); - // ConnectionInputHandlerFactory - sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); + // Called by cluster to deliver a buffer from CPG. + void deliverBuffer(framing::Buffer&); + + // ==== Used in catch-up mode to build initial state. + // // State dump methods. - virtual void sessionState(const SequenceNumber& replayStart, + void sessionState(const SequenceNumber& replayStart, const SequenceSet& sentIncomplete, const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - virtual void shadowReady(uint64_t memberId, uint64_t connectionId); + void shadowReady(uint64_t memberId, uint64_t connectionId); + + void dumpComplete(); private: + + void deliverClose(); + void deliverDoOutput(uint32_t requested); void sendDoOutput(); Cluster& cluster; @@ -108,6 +116,8 @@ class Connection : broker::Connection connection; framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; + bool catchUp; + bool exCatchUp; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 3dfd8ecc38..d95a321adf 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -32,7 +32,9 @@ namespace cluster { sys::ConnectionCodec* ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { if (v == framing::ProtocolVersion(0, 10)) - return new ConnectionCodec(out, id, cluster); + return new ConnectionCodec(out, id, cluster, false); + else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10)) + return new ConnectionCodec(out, id, cluster, true); // Catch-up connection return 0; } @@ -42,9 +44,9 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) return next->create(out, id); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster) +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp) : codec(out, id, false), - interceptor(new Connection(cluster, codec, id, cluster.getSelf())) + interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); @@ -55,7 +57,10 @@ ConnectionCodec::~ConnectionCodec() {} // ConnectionCodec functions delegate to the codecOutput size_t ConnectionCodec::decode(const char* buffer, size_t size) { - return interceptor->decode(buffer, size); + if (interceptor->isCatchUp()) + return codec.decode(buffer, size); + else + return interceptor->decode(buffer, size); } size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 22d752d174..a82569decd 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec { sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c); + ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp); ~ConnectionCodec(); // ConnectionCodec functions. diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index f76a55c0d3..43c30d3b07 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -27,12 +27,21 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/ClusterConnectionDumpCompleteBody.h" #include "qpid/framing/enum.h" +#include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> namespace qpid { + +namespace client { +struct ConnectionAccess { + static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } +}; +} // namespace client + namespace cluster { using broker::Broker; @@ -40,16 +49,18 @@ using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; +using namespace framing; using namespace framing::message; - using namespace client; + DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) : donor(b), done(ok), failed(fail) { - // FIXME aconway 2008-09-16: Identify as DumpClient connection. + // Special version identifies this as a catch-up connectionn. + client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 0x80 + 10)); connection.open(url); session = connection.newSession(); } @@ -65,9 +76,10 @@ void DumpClient::dump() { // Catch-up exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); + SessionBase_0_10Access sb(session); + // FIXME aconway 2008-09-18: inidicate successful end-of-dump. session.sync(); session.close(); - // FIXME aconway 2008-09-17: send dump complete indication. connection.close(); } diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index 3358e3404b..c188fe438e 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -30,7 +30,7 @@ namespace cluster { using namespace sys; using namespace framing; -JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {} +JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {} void JoiningHandler::configChange( cpg_address *current, int nCurrent, @@ -74,21 +74,17 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { else { // Start a new dump cluster.map.dumper = cluster.map.first(); if (dumpee == cluster.self) { // My turn - - state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump - - QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper); switch (state) { case START: case STALLED: assert(0); break; case DUMP_REQUESTED: + QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper); state = STALLED; cluster.stall(); break; - // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state. case DUMP_COMPLETE: cluster.ready(); break; @@ -102,5 +98,34 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) { checkDumpRequest(); } +void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { + if (c->isCatchUp()) { + ++catchUpConnections; + QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections."); + } + else if (c->isExCatchUp()) { + if (c->getId().getConnectionPtr() != c.get()) // become shadow connection + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining"); + if (--catchUpConnections == 0) + dumpComplete(); + } + else // Local connection, will be stalled till dump complete. + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); +} + +void JoiningHandler::dumpComplete() { + // FIXME aconway 2008-09-18: need to detect incomplete dump. + // + if (state == STALLED) { + QPID_LOG(debug, "Dump complete, unstalling."); + cluster.ready(); + } + else { + QPID_LOG(debug, "Dump complete, waiting for stall point."); + assert(state == DUMP_REQUESTED); + state = DUMP_COMPLETE; + } +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/JoiningHandler.h b/cpp/src/qpid/cluster/JoiningHandler.h index 07a48b8281..c2cdb2c504 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.h +++ b/cpp/src/qpid/cluster/JoiningHandler.h @@ -46,9 +46,14 @@ class JoiningHandler : public ClusterHandler void dumpRequest(const MemberId&, const std::string& url); void ready(const MemberId&, const std::string& url); + void insert(const boost::intrusive_ptr<Connection>& c); + private: - enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state; void checkDumpRequest(); + void dumpComplete(); + + enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state; + size_t catchUpConnections; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp index e82eaec458..1997ced9b0 100644 --- a/cpp/src/qpid/cluster/MemberHandler.cpp +++ b/cpp/src/qpid/cluster/MemberHandler.cpp @@ -23,6 +23,7 @@ #include "DumpClient.h" #include "qpid/log/Statement.h" #include "qpid/framing/ClusterUpdateBody.h" +#include "qpid/framing/enum.h" namespace qpid { namespace cluster { @@ -32,6 +33,10 @@ using namespace framing; MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {} +MemberHandler::~MemberHandler() { + if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. +} + void MemberHandler::configChange( cpg_address */*current*/, int /*nCurrent*/, cpg_address */*left*/, int /*nLeft*/, @@ -58,11 +63,10 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled. cluster.stall(); - cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump - (void)urlStr; -// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker, -// boost::bind(&MemberHandler::dumpDone, this), -// boost::bind(&MemberHandler::dumpError, this, _1))); + if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. + dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker, + boost::bind(&MemberHandler::dumpSent, this), + boost::bind(&MemberHandler::dumpError, this, _1))); } void MemberHandler::ready(const MemberId& id, const std::string& url) { @@ -70,14 +74,22 @@ void MemberHandler::ready(const MemberId& id, const std::string& url) { } -void MemberHandler::dumpDone() { - dumpThread.join(); // Clean up. +void MemberHandler::dumpSent() { + QPID_LOG(debug, "Finished sending state dump."); + Mutex::ScopedLock l(cluster.lock); cluster.ready(); } void MemberHandler::dumpError(const std::exception& e) { - QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what()); - dumpDone(); + QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what()); + dumpSent(); +} + +void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) { + if (c->isCatchUp()) // Not allowed in member mode + c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode."); + else + cluster.connections[c->getId()] = c; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MemberHandler.h b/cpp/src/qpid/cluster/MemberHandler.h index 630500a740..6657ea4f53 100644 --- a/cpp/src/qpid/cluster/MemberHandler.h +++ b/cpp/src/qpid/cluster/MemberHandler.h @@ -35,6 +35,7 @@ class MemberHandler : public ClusterHandler { public: MemberHandler(Cluster& c); + ~MemberHandler(); void configChange( struct cpg_address */*members*/, int /*nMembers*/, @@ -48,9 +49,11 @@ class MemberHandler : public ClusterHandler void dumpRequest(const MemberId&, const std::string& url); void ready(const MemberId&, const std::string& url); - void dumpDone(); + void dumpSent(); void dumpError(const std::exception&); + void insert(const boost::intrusive_ptr<Connection>& c); + public: sys::Thread dumpThread; }; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 4ff0a88b11..8718154d3e 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -33,12 +33,12 @@ namespace cluster { using namespace framing; OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), next(h), sent(), moreOutput(), doingOutput() + : parent(p), next(&h), sent(), moreOutput(), doingOutput() {} void OutputInterceptor::send(framing::AMQFrame& f) { Locker l(lock); - next.send(f); + next->send(f); sent += f.size(); } @@ -60,7 +60,7 @@ bool OutputInterceptor::doOutput() { // void OutputInterceptor::deliverDoOutput(size_t requested) { Locker l(lock); - size_t buf = next.getBuffered(); + size_t buf = next->getBuffered(); if (parent.isLocal()) writeEstimate.delivered(sent, buf); // Update the estimate. @@ -101,4 +101,9 @@ void OutputInterceptor::sendDoOutput() { QPID_LOG(trace, &parent << "Send doOutput request for " << request); } +void OutputInterceptor::setOutputHandler(sys::ConnectionOutputHandler& h) { + Locker l(lock); + next = &h; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h index 548ec32b5b..ad9d9952bf 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -43,14 +43,16 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { // sys::ConnectionOutputHandler functions void send(framing::AMQFrame& f); void activateOutput(); - void close() { Locker l(lock); next.close(); } - size_t getBuffered() const { Locker l(lock); return next.getBuffered(); } + void close() { Locker l(lock); next->close(); } + size_t getBuffered() const { Locker l(lock); return next->getBuffered(); } // Delivery point for doOutput requests. void deliverDoOutput(size_t requested); // Intercept doOutput requests on Connection. bool doOutput(); + void setOutputHandler(sys::ConnectionOutputHandler& h); + cluster::Connection& parent; private: @@ -60,7 +62,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { void sendDoOutput(); mutable sys::Mutex lock; - sys::ConnectionOutputHandler& next; + sys::ConnectionOutputHandler* next; size_t sent; WriteEstimate writeEstimate; bool moreOutput; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 8dec23a09b..1b44902054 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -74,10 +74,12 @@ struct ClusterFixture : public vector<uint16_t> { string name; std::auto_ptr<BrokerFixture> broker0; boost::ptr_vector<ForkedBroker> forkedBrokers; + bool init0; - ClusterFixture(size_t n); + ClusterFixture(size_t n, bool init0=true); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); + void add0(bool force); void setup(); void kill(size_t n) { if (n) forkedBrokers[n-1].kill(); @@ -85,8 +87,9 @@ struct ClusterFixture : public vector<uint16_t> { } }; -ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { +ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); + if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. // Wait for all n members to join the cluster int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. while (retry && getGlobalCluster().size() != n) { @@ -101,24 +104,42 @@ void ClusterFixture::add() { os << "fork" << size(); std::string prefix = os.str(); + if (size()) { // Not the first broker, fork. + + const char* argv[] = { + "qpidd " __FILE__ , + "--load-module=../.libs/cluster.so", + "--cluster-name", name.c_str(), + "--auth=no", "--no-data-dir", + "--log-prefix", prefix.c_str(), + }; + size_t argc = sizeof(argv)/sizeof(argv[0]); + + + forkedBrokers.push_back(new ForkedBroker(argc, argv)); + push_back(forkedBrokers.back().getPort()); + } + else { + add0(init0); // First broker, run in this process. + } +} + +void ClusterFixture::add0(bool init) { + if (!init) { + push_back(0); + return; + } const char* argv[] = { "qpidd " __FILE__ , "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), - "--auth=no", "--no-data-dir", - "--log-prefix", prefix.c_str(), + "--auth=no", "--no-data-dir" }; size_t argc = sizeof(argv)/sizeof(argv[0]); - if (size()) { // Not the first broker, fork. - forkedBrokers.push_back(new ForkedBroker(argc, argv)); - push_back(forkedBrokers.back().getPort()); - } - else { // First broker, run in this process. - qpid::log::Logger::instance().setPrefix("main"); - broker0.reset(new BrokerFixture(parseOpts(argc, argv))); - push_back(broker0->getPort()); - } + qpid::log::Logger::instance().setPrefix("main"); + broker0.reset(new BrokerFixture(parseOpts(argc, argv))); + push_back(broker0->getPort()); } // For debugging: op << for CPG types. @@ -140,60 +161,6 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) { - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - // Create some shared state. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo","q")); - while (c0.session.queueQuery("q").getMessageCount() != 1) - ::usleep(1000); // Wait for message to show up on broker 0. - - // Now join new broker, should catch up. - cluster.add(); - c0.session.messageTransfer(arg::content=Message("bar","q")); - c0.session.queueDeclare("p"); - c0.session.messageTransfer(arg::content=Message("poo","p")); - - // Verify new broker has all state. - Message m; - Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "poo"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), (unsigned)0); -} - -QPID_AUTO_TEST_CASE(testStall) { - ClusterFixture cluster(2); - Client c0(cluster[0], "c0"); - Client c1(cluster[1], "c1"); - - // Declare on all to avoid race condition. - c0.session.queueDeclare("q"); - c1.session.queueDeclare("q"); - - // Stall 0, verify it does not process deliverys while stalled. - getGlobalCluster().stall(); - c1.session.messageTransfer(arg::content=Message("foo","q")); - while (c1.session.queueQuery("q").getMessageCount() != 1) - ::usleep(1000); // Wait for message to show up on broker 1. - sleep(2); // FIXME aconway 2008-09-11: remove. - // But it should not be on broker 0. - boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); - BOOST_REQUIRE(q0); - BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0); - // Now unstall and we should get the message. - getGlobalCluster().ready(); - Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); -} - #if 0 // FIXME aconway 2008-09-10: finish & enable QPID_AUTO_TEST_CASE(testDumpConsumers) { ClusterFixture cluster(1); @@ -226,20 +193,36 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) { #endif -QPID_AUTO_TEST_CASE(testForkedBroker) { - // Verify the ForkedBroker works as expected. - const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; - ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv); - Client c(broker.getPort()); - BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); -} -QPID_AUTO_TEST_CASE(testSingletonCluster) { - // Test against a singleton cluster, verify basic operation. +QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); - Client c(cluster[0]); - BOOST_CHECK(c.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound()); + + Client c0(cluster[0], "c0"); + // Create some shared state. + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("foo","q")); + c0.session.messageTransfer(arg::content=Message("bar","q")); + while (c0.session.queueQuery("q").getMessageCount() != 2) + ::usleep(1000); // Wait for message to show up on broker 0. + + // FIXME aconway 2008-09-18: close session until we catchup session state also. + c0.session.close(); + c0.connection.close(); + + // Now join new broker, should catch up. + cluster.add(); + + // FIXME aconway 2008-09-18: when we do session state try adding + // further stuff from broker 0, and leaving a subscription active. + + // Verify new broker has all state. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -326,4 +309,30 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } +QPID_AUTO_TEST_CASE(testStall) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + // Declare on all to avoid race condition. + c0.session.queueDeclare("q"); + c1.session.queueDeclare("q"); + + // Stall 0, verify it does not process deliverys while stalled. + getGlobalCluster().stall(); + c1.session.messageTransfer(arg::content=Message("foo","q")); + while (c1.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 1. + sleep(2); // FIXME aconway 2008-09-11: remove. + // But it should not be on broker 0. + boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); + BOOST_REQUIRE(q0); + BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0); + // Now unstall and we should get the message. + getGlobalCluster().ready(); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); +} + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index ba4e50d21e..2acf0aea82 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -60,7 +60,8 @@ o<?xml version="1.0"?> - attach sessions, create consumers, set flow with normal AMQP cokmmands. - reset session state by sending session-state for each session. - frames following session-state are replay frames. - - send shadow-ready to mark end of dump. + - send shadow-ready to mark end of shadow dump. + - send dump-complete when entire dump is complete. --> <control name="session-state" code="0x4" label="Set session state during a brain dump."> <!-- Target session deduced from channel number. --> @@ -79,5 +80,6 @@ o<?xml version="1.0"?> <field name="connection-id" type="uint64"/> </control> + <control name="dump-complete" code="0x6" label="End of brain dump."/> </class> </amqp> |