diff options
author | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
commit | e65b0086a2924ff04640b1350393a816249d01b3 (patch) | |
tree | b372c5386cc44e3ad16c4ae585088ed038a629e4 | |
parent | e596837411d54a16dd3cb1e5de717664496c2bd0 (diff) | |
download | qpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz |
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd.
- Plugin::addFinalizer - more flexible way to shutdown plugins.
- Reworked cluster extension points using boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 538 insertions, 441 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 74aa504e90..f513ab8ee3 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -353,7 +353,6 @@ nobase_include_HEADERS = \ qpid/amqp_0_10/Exception.h \ qpid/Msg.h \ qpid/Options.h \ - qpid/HandlerChain.h \ qpid/Plugin.h \ qpid/ptr_map.h \ qpid/RangeSet.h \ diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 87a6d4cd54..718dffff38 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -12,6 +12,8 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/ClusterPlugin.cpp \ + qpid/cluster/ConnectionInterceptor.h \ + qpid/cluster/ConnectionInterceptor.cpp \ qpid/cluster/ClassifierHandler.h \ qpid/cluster/ClassifierHandler.cpp \ qpid/cluster/ShadowConnectionOutputHandler.h diff --git a/cpp/src/qpid/HandlerChain.h b/cpp/src/qpid/HandlerChain.h deleted file mode 100644 index e3746ec14b..0000000000 --- a/cpp/src/qpid/HandlerChain.h +++ /dev/null @@ -1,97 +0,0 @@ -#ifndef QPID_HANDLERCHAIN_H -#define QPID_HANDLERCHAIN_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/Plugin.h> -#include <boost/ptr_container/ptr_vector.hpp> -#include <memory> - -namespace qpid { - -/** - * Chain-of-responsibility design pattern. - * - * Construct a chain of objects deriving from Base. Each implements - * Base::f by doing its own logic and then calling Base::f on the next - * handler (or not if it chooses not to.) - * - * HandlerChain acts as a smart pointer to the first object in the chain. - */ -template <class Base> -class HandlerChain { - public: - /** Base class for chainable handlers */ - class Handler : public Base { - public: - Handler() : next() {} - virtual ~Handler() {} - virtual void setNext(Base* next_) { next = next_; } - - protected: - Base* next; - }; - - typedef std::auto_ptr<Handler> HandlerAutoPtr; - - /**@param target is the object at the end of the chain. */ - HandlerChain(Base& target) : first(&target) {} - - /** HandlerChain owns the ChainableHandler. */ - void push(HandlerAutoPtr h) { - handlers.push_back(h.release()); - h->setNext(first); - first = h.get(); - } - - // Smart pointer functions - Base* operator*() { return first; } - const Base* operator*() const { return first; } - Base* operator->() { return first; } - const Base* operator->() const { return first; } - operator bool() const { return first; } - - private: - boost::ptr_vector<Base> handlers; - Base* first; -}; - -/** - * A PluginHandlerChain calls Plugin::initAll(*this) on construction, - * allowing plugins to add handlers. - * - * @param Tag can be any class, use to distinguish different plugin - * chains with the same Base type. - */ -template <class Base, class Tag=void> -struct PluginHandlerChain : public HandlerChain<Base>, - public Plugin::Target -{ - PluginHandlerChain(Base& target) : HandlerChain<Base>(target) { - Plugin::initAll(*this); - } -}; - - -} // namespace qpid - -#endif /*!QPID_HANDLERCHAIN_H*/ diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp index 1deaca199f..e521b1220a 100644 --- a/cpp/src/qpid/Options.cpp +++ b/cpp/src/qpid/Options.cpp @@ -176,7 +176,7 @@ Options::Options(const string& name) : -void Options::parse(int argc, char** argv, const std::string& configFile, bool allowUnknown) +void Options::parse(int argc, char const* const* argv, const std::string& configFile, bool allowUnknown) { string defaultConfigFile = configFile; // May be changed by env/cmdline string parsing; diff --git a/cpp/src/qpid/Options.h b/cpp/src/qpid/Options.h index 35ce8f9d40..cb86d27241 100644 --- a/cpp/src/qpid/Options.h +++ b/cpp/src/qpid/Options.h @@ -209,7 +209,7 @@ struct Options : public po::options_description { * Note the filename argument can reference an options variable that * is updated by argc/argv or environment variable parsing. */ - void parse(int argc, char** argv, + void parse(int argc, char const* const* argv, const std::string& configfile=std::string(), bool allowUnknown = false); diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp index b8206499ae..e4b76db28a 100644 --- a/cpp/src/qpid/Plugin.cpp +++ b/cpp/src/qpid/Plugin.cpp @@ -35,15 +35,20 @@ Plugin::Plugins& thePlugins() { return plugins; } -void call(boost::function<void()> f) { f(); } +void invoke(boost::function<void()> f) { f(); } } // namespace -Plugin::Target::~Target() { - std::for_each(cleanup.begin(), cleanup.end(), &call); +Plugin::Target::~Target() { finalize(); } + +void Plugin::Target::finalize() { + for_each(finalizers.begin(), finalizers.end(), invoke); + finalizers.clear(); } -void Plugin::Target::addCleanup(const boost::function<void()>& f) { cleanup.push_back(f); } +void Plugin::Target::addFinalizer(const boost::function<void()>& f) { + finalizers.push_back(f); +} Plugin::Plugin() { // Register myself. @@ -69,7 +74,7 @@ void Plugin::addOptions(Options& opts) { } } -void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); } -void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); } +void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t))); } +void Plugin::initializeAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t))); } } // namespace qpid diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index a53d4e5d18..3c7c8031bb 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -21,11 +21,9 @@ * */ -#include "qpid/shared_ptr.h" #include <boost/noncopyable.hpp> -#include <vector> #include <boost/function.hpp> - +#include <vector> /**@file Generic plug-in framework. */ @@ -35,30 +33,36 @@ class Options; /** * Plug-in base class. */ -class Plugin : boost::noncopyable -{ +class Plugin : private boost::noncopyable { public: + typedef std::vector<Plugin*> Plugins; + /** - * Base interface for targets that receive plug-ins. - * Plug-ins can register clean-up functions to execute when - * the target is destroyed. + * Base interface for targets that can receive plug-ins. + * Also allows plug-ins to attach a a function to be called + * when the target is 'finalized'. */ - struct Target { + class Target : private boost::noncopyable + { public: + /** Calls finalize() if not already called. */ virtual ~Target(); - void addCleanup(const boost::function<void()>& cleanupFunction); + + /** Run all the finalizers */ + void finalize(); + + /** Add a function to run when finalize() is called */ + void addFinalizer(const boost::function<void()>&); private: - std::vector<boost::function<void()> > cleanup; + std::vector<boost::function<void()> > finalizers; }; - typedef std::vector<Plugin*> Plugins; - /** - * Construct registers the plug-in to appear in getPlugins(). + * Constructor registers the plug-in to appear in getPlugins(). * * A concrete Plugin is instantiated as a global or static - * member variable in a library so it is registered during static + * member variable in a library so it is registered during * initialization when the library is loaded. */ Plugin(); @@ -103,7 +107,7 @@ class Plugin : boost::noncopyable static void earlyInitAll(Target&); /** Call initialize() on all registered plugins */ - static void initAll(Target&); + static void initializeAll(Target&); /** For each registered plugin, add plugin.getOptions() to opts. */ static void addOptions(Options& opts); diff --git a/cpp/src/qpid/RefCounted.h b/cpp/src/qpid/RefCounted.h index d67f6c31db..10b5e4afcc 100644 --- a/cpp/src/qpid/RefCounted.h +++ b/cpp/src/qpid/RefCounted.h @@ -46,23 +46,6 @@ protected: virtual ~RefCounted() {}; }; -/** - * Reference-counted member of a reference-counted parent class. - * Delegates reference counts to the parent so that the parent is - * deleted only when there are no references to the parent or any of - * its children. - * TODO: Delete this class if it's unused as I don't think this class makes much sense: - */ -struct RefCountedChild { - RefCounted& parent; - -protected: - RefCountedChild(RefCounted& parent_) : parent(parent_) {} - -public: - void addRef() const { parent.addRef(); } - void release() const { parent.release(); } -}; } // namespace qpid @@ -70,8 +53,6 @@ public: namespace boost { inline void intrusive_ptr_add_ref(const qpid::RefCounted* p) { p->addRef(); } inline void intrusive_ptr_release(const qpid::RefCounted* p) { p->release(); } -inline void intrusive_ptr_add_ref(const qpid::RefCountedChild* p) { p->addRef(); } -inline void intrusive_ptr_release(const qpid::RefCountedChild* p) { p->release(); } } diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index ccd31c78a7..a3692911b2 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -29,7 +29,7 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), - connection(this, broker, id, _isClient), + connection(new broker::Connection(this, broker, id, _isClient)), identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { @@ -46,13 +46,13 @@ size_t Connection::decode(const char* buffer, size_t size) { framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection.received(frame); + connection->received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection.doOutput(); + if (!frameQueueClosed) connection->doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -91,7 +91,7 @@ void Connection::close() { } void Connection::closed() { - connection.closed(); + connection->closed(); } void Connection::send(framing::AMQFrame& f) { diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index a3a756cefb..b707031789 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_CONNECTION_H -#define QPID_BROKER_CONNECTION_H +#ifndef QPID_AMQP_0_10_CONNECTION_H +#define QPID_AMQP_0_10_CONNECTION_H /* * @@ -24,8 +24,8 @@ #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/Mutex.h" -#include "Connection.h" #include "qpid/broker/Connection.h" +#include <boost/intrusive_ptr.hpp> #include <queue> #include <memory> @@ -40,7 +40,7 @@ class Connection : public sys::ConnectionCodec, bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - broker::Connection connection; + boost::intrusive_ptr<broker::Connection> connection; std::string identifier; bool initialized; bool isClient; @@ -60,4 +60,4 @@ class Connection : public sys::ConnectionCodec, }} // namespace qpid::amqp_0_10 -#endif /*!QPID_BROKER_CONNECTION_H*/ +#endif /*!QPID_AMQP_0_10_CONNECTION_H*/ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index bffca94f95..b8204c9cf5 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -285,6 +285,7 @@ void Broker::shutdown() { // call any function that is not async-signal safe. // Any unsafe shutdown actions should be done in the destructor. poller->shutdown(); + finalize(); // Finalize any plugins. } Broker::~Broker() { diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5e85d3c89c..e77911bd10 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -49,14 +49,14 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), + receivedFn(boost::bind(&Connection::receivedImpl, this, _1)), + closedFn(boost::bind(&Connection::closedImpl, this)), adapter(*this, isLink_), isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), mgmtObject(0), - links(broker_.getLinks()), - lastInHandler(*this), - inChain(lastInHandler) + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject(); @@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); agent->addObject(mgmtObject); } + + Plugin::initializeAll(*this); // Let plug-ins update extension points. } void Connection::requestIOProcessing(boost::function0<void> callback) @@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback) out->activateOutput(); } - Connection::~Connection() { if (mgmtObject != 0) @@ -88,9 +89,9 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } - -void Connection::receivedLast(framing::AMQFrame& frame){ +void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); } + +void Connection::receivedImpl(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { @@ -170,10 +171,13 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ // Physically closed, suspend open sessions. +void Connection::closed() { closedFn(); } + +void Connection::closedImpl(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); + // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. exclusiveQueues.erase(exclusiveQueues.begin()); } } catch(std::exception& e) { - QPID_LOG(error, " Unhandled exception while closing session: " << - e.what()); + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); assert(0); } } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index c911e88200..0d646bab83 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -1,3 +1,6 @@ +#ifndef QPID_BROKER_CONNECTION_H +#define QPID_BROKER_CONNECTION_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,8 +21,6 @@ * under the License. * */ -#ifndef _Connection_ -#define _Connection_ #include <memory> #include <sstream> @@ -43,7 +44,8 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Connection.h" -#include "qpid/HandlerChain.h" +#include "qpid/Plugin.h" +#include "qpid/RefCounted.h" #include <boost/ptr_container/ptr_map.hpp> @@ -53,11 +55,11 @@ namespace broker { class LinkRegistry; class Connection : public sys::ConnectionInputHandler, - public ConnectionState + public ConnectionState, + public Plugin::Target, + public RefCounted { public: - typedef boost::shared_ptr<Connection> shared_ptr; - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); ~Connection (); @@ -74,8 +76,8 @@ class Connection : public sys::ConnectionInputHandler, void received(framing::AMQFrame& frame); void idleOut(); void idleIn(); - void closed(); bool doOutput(); + void closed(); void closeChannel(framing::ChannelId channel); @@ -92,12 +94,16 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); + // Extension points: allow plugins to insert additional functionality. + boost::function<void(framing::AMQFrame&)> receivedFn; + boost::function<void()> closedFn; + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - // End of the received handler chain. - void receivedLast(framing::AMQFrame& frame); + void receivedImpl(framing::AMQFrame& frame); + void closedImpl(); ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; @@ -108,10 +114,8 @@ class Connection : public sys::ConnectionInputHandler, boost::function0<void> ioCallback; management::Connection* mgmtObject; LinkRegistry& links; - framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler; - PluginHandlerChain<framing::FrameHandler, Connection> inChain; }; }} -#endif +#endif /*!QPID_BROKER_CONNECTION_H*/ diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 698f8123e8..c9cf6ece8d 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -70,6 +70,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable sys::ConnectionOutputHandler& getOutput() const { return *out; } framing::ProtocolVersion getVersion() const { return version; } + void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; } + protected: framing::ProtocolVersion version; sys::ConnectionOutputHandler* out; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 3cc509c904..aa6f6b7520 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -54,11 +54,7 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), - mgmtObject(0), - inLastHandler(*this), - outLastHandler(*this), - inChain(inLastHandler), - outChain(outLastHandler) + mgmtObject(0) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -75,9 +71,6 @@ SessionState::SessionState( SessionState::~SessionState() { // Remove ID from active session list. - // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge, - // they don't belong in the manager. For now rely on uniqueness of UUIDs. - // broker.getSessionManager().forget(getId()); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -126,7 +119,6 @@ void SessionState::activateOutput() { Mutex::ScopedLock l(lock); if (isAttached()) getConnection().outputTasks.activateOutput(); - // FIXME aconway 2008-05-22: should we hold the lock over activateOutput?? } ManagementObject* SessionState::GetManagementObject (void) const @@ -224,10 +216,7 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); } -void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); } - -void SessionState::handleInLast(AMQFrame& frame) { +void SessionState::handleIn(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether @@ -258,7 +247,7 @@ void SessionState::handleInLast(AMQFrame& frame) { } } -void SessionState::handleOutLast(AMQFrame& frame) { +void SessionState::handleOut(AMQFrame& frame) { assert(handler); handler->out(frame); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index f6bf98d431..96f2e8f512 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -23,7 +23,6 @@ */ #include "qpid/SessionState.h" -#include "qpid/HandlerChain.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" @@ -102,10 +101,6 @@ class SessionState : public qpid::SessionState, void readyToSend(); - // Tag types to identify PluginHandlerChains. - struct InTag {}; - struct OutTag {}; - private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); @@ -131,11 +126,6 @@ class SessionState : public qpid::SessionState, IncompleteMessageList incomplete; IncompleteMessageList::CompletionListener enqueuedOp; management::Session* mgmtObject; - framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler; - framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler; - - qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain; - qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain; friend class SessionManager; }; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 4ea77e7fbf..3b7f32e822 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,15 +17,19 @@ */ #include "Cluster.h" +#include "ConnectionInterceptor.h" + #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" +#include "qpid/shared_ptr.h" #include <boost/bind.hpp> -#include <boost/scoped_array.hpp> +#include <boost/cast.hpp> #include <algorithm> #include <iterator> #include <map> @@ -37,24 +41,6 @@ using namespace qpid::sys; using namespace std; using broker::Connection; -// Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler { - Cluster::ConnectionChain& connection; - Cluster& cluster; - - ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} - - void handle(AMQFrame& f) { - // FIXME aconway 2008-01-29: Refcount Connections to ensure - // Connection not destroyed till message is self delivered. - cluster.send(f, &connection, next); // Indirectly send to next via cluster. - } -}; - -void Cluster::initialize(Cluster::ConnectionChain& cc) { - cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); -} - ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "-" << cluster.self; } @@ -69,14 +55,14 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -// FIXME aconway 2008-07-02: create a Connection for the cluster. Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(b), cpg(*this), + broker(&b), name(name_), url(url_), self(cpg.self()) { + broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); cpg.join(name); notify(); @@ -90,15 +76,27 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : } Cluster::~Cluster() { - QPID_LOG(trace, *this << " Leaving cluster."); - try { - cpg.leave(name); - cpg.shutdown(); - dispatcher.join(); - } - catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << *this << ": " - << e.what()); + cpg.shutdown(); + dispatcher.join(); +} + +// local connection initializes plugins +void Cluster::initialize(broker::Connection& c) { + bool isLocal = &c.getOutput() != &shadowOut; + if (isLocal) + new ConnectionInterceptor(c, *this); +} + +void Cluster::leave() { + if (!broker.get()) return; // Already left + QPID_LOG(info, QPID_MSG("Leaving cluster " << *this)); + // Must not be called in the dispatch thread. + assert(Thread::current().id() != dispatcher.id()); + cpg.leave(name); + // Wait till final config-change is delivered and broker is released. + { + Mutex::ScopedLock l(lock); + while(broker.get()) lock.wait(); } } @@ -112,22 +110,20 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { buf.putLongLong(value); } -void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) { +void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - // TODO aconway 2008-07-03: More efficient buffer management. + // FIXME aconway 2008-07-03: More efficient buffer management. // Cache coded form of decoded frames for re-encoding? Buffer buf(buffer); - assert(frame.size() + 128 < sizeof(buffer)); + assert(frame.size() + 64 < sizeof(buffer)); frame.encode(buf); encodePtr(buf, connection); - encodePtr(buf, next); iovec iov = { buffer, buf.getPosition() }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { - AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); - send(frame, 0, 0); + send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0); } size_t Cluster::size() const { @@ -143,19 +139,17 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -boost::shared_ptr<broker::Connection> -Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) { - // FIXME aconway 2008-07-02: locking - called by deliver in - // cluster thread so no locks but may need to revisit as model - // changes. - ShadowConnectionId id(member, connectionPtr); - boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id]; - if (!ptr) { +ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { + ShadowConnectionId id(member, remotePtr); + ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); + if (i == shadowConnectionMap.end()) { // A new shadow connection. std::ostringstream os; - os << name << ":" << member << ":" << std::hex << connectionPtr; - ptr.reset(new broker::Connection(&shadowOut, broker, os.str())); + os << name << ":" << member << ":" << remotePtr; + broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str()); + ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id)); + i = shadowConnectionMap.insert(value).first; } - return ptr; + return i->second; } void Cluster::deliver( @@ -171,78 +165,75 @@ void Cluster::deliver( Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; frame.decode(buf); - void* connectionId; - decodePtr(buf, connectionId); + ConnectionInterceptor* connection; + decodePtr(buf, connection); + QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame); - - if (connectionId == 0) // A cluster control frame. - handleClusterFrame(from, frame); - else if (from == self) { // My own frame, carries a next pointer. - FrameHandler* next; - decodePtr(buf, next); - next->handle(frame); - } - else { // Foreign frame, forward to shadow connection. - // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr. - boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId); - shadow->received(frame); + if (!broker.get()) { + QPID_LOG(warning, "Ignoring late DLVR, already left the cluster."); + return; } + + if (connection && from != self) // Look up shadow for remote connections + connection = getShadowConnection(from, connection); + + if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) + handleMethod(from, connection, *frame.getMethod()); + else + connection->deliver(frame); } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(error, "Error handling frame from cluster " << e.what()); + QPID_LOG(critical, "Error in cluster delivery: " << e.what()); + assert(0); + throw; } } -bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, - Duration timeout) const -{ - AbsTime deadline(now(), timeout); - Mutex::ScopedLock l(lock); - while (!predicate(*this) && lock.wait(deadline)) - ; - return (predicate(*this)); -} - -// Handle cluster control frame . -void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { - // TODO aconway 2007-06-20: use visitor pattern here. - ClusterNotifyBody* notifyIn= - dynamic_cast<ClusterNotifyBody*>(frame.getBody()); - assert(notifyIn); - MemberList list; - { - Mutex::ScopedLock l(lock); - members[from].url=notifyIn->getUrl(); - lock.notifyAll(); - QPID_LOG(debug, "Cluster join: " << members); +// Handle cluster methods +// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism. +void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) { + assert(method.amqpClassId() == CLUSTER_CLASS_ID); + switch (method.amqpMethodId()) { + case CLUSTER_NOTIFY_METHOD_ID: { + ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method); + Mutex::ScopedLock l(lock); + members[from].url=notify.getUrl(); + lock.notifyAll(); + break; + } + case CLUSTER_CONNECTION_CLOSE_METHOD_ID: { + if (!connection->isLocal()) + shadowConnectionMap.erase(connection->getShadowId()); + connection->deliverClosed(); + break; + } + default: + assert(0); } } void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, - cpg_address */*current*/, int /*nCurrent*/, + cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int nJoined) { - bool newMembers=false; - MemberList updated; - { - Mutex::ScopedLock l(lock); - if (nLeft) { - for (int i = 0; i < nLeft; ++i) - members.erase(Id(left[i])); - QPID_LOG(debug, "Cluster leave: " << members); - lock.notifyAll(); - } - newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self); - // We don't record members joining here, we record them when - // we get their ClusterNotify message. + Mutex::ScopedLock l(lock); + for (int i = 0; i < nLeft; ++i) + members.erase(left[i]); + for(int j = 0; j < nCurrent; ++j) + members[current[j]].id = current[j]; + QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):" + << members); + assert(members.size() == size_t(nCurrent)); + if (members.find(self) == members.end()) { + QPID_LOG(debug, "Left cluster " << *this); + broker = 0; // Release broker reference. } - if (newMembers) // Notify new members of my presence. - notify(); + + lock.notifyAll(); // Threads waiting for membership changes. } void Cluster::run() { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 84b5ed072c..7147b1ac05 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -22,14 +22,14 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" -#include "qpid/HandlerChain.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/log/Logger.h" #include "qpid/Url.h" - +#include "qpid/RefCounted.h" #include <boost/optional.hpp> #include <boost/function.hpp> @@ -41,19 +41,21 @@ namespace qpid { namespace cluster { +class ConnectionInterceptor; + /** * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : private sys::Runnable, private Cpg::Handler +class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted { public: - typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; /** Details of a cluster member */ struct Member { - Member(const Url& url_=Url()) : url(url_) {} - Url url; ///< Broker address. + Cpg::Id id; + Url url; }; typedef std::vector<Member> MemberList; @@ -65,11 +67,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler */ Cluster(const std::string& name, const Url& url, broker::Broker&); - // Add cluster handlers to broker chains. - void initialize(ConnectionChain&); - virtual ~Cluster(); + /** Initialize interceptors for a new connection */ + void initialize(broker::Connection&); + /** Get the current cluster membership. */ MemberList getMembers() const; @@ -78,22 +80,22 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool empty() const { return size() == 0; } - /** Wait for predicate(*this) to be true, up to timeout. - *@return True if predicate became true, false if timed out. - *Note the predicate may not be true after wait returns, - *all the caller can say is it was true at some earlier point. - */ - bool wait(boost::function<bool(const Cluster&)> predicate, - sys::Duration timeout=sys::TIME_INFINITE) const; - /** Send frame to the cluster */ - void send(framing::AMQFrame&, void* connection, framing::FrameHandler*); + void send(const framing::AMQFrame&, ConnectionInterceptor*); + + /** Leave the cluster */ + void leave(); + + // Cluster frame handing functions + void notify(const std::string& url); + void connectionClose(); private: typedef Cpg::Id Id; typedef std::map<Id, Member> MemberMap; - typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; - typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap; + typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap; + + boost::function<void()> shutdownNext; void notify(); ///< Notify cluster of my details. @@ -114,19 +116,18 @@ class Cluster : private sys::Runnable, private Cpg::Handler ); void run(); - - void handleClusterFrame(Id from, framing::AMQFrame&); - boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*); + void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method); + + ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*); - mutable sys::Monitor lock; - broker::Broker& broker; + mutable sys::Monitor lock; // Protect access to members. Cpg cpg; + boost::intrusive_ptr<broker::Broker> broker; Cpg::Name name; Url url; MemberMap members; sys::Thread dispatcher; - boost::function<void()> callback; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index c4b67de141..a2c66e3790 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -15,8 +15,8 @@ * limitations under the License. * */ -#include <boost/program_options/value_semantic.hpp> +#include "ConnectionInterceptor.h" #include "qpid/broker/Broker.h" @@ -25,61 +25,81 @@ #include "qpid/Options.h" #include "qpid/shared_ptr.h" -#include <boost/optional.hpp> #include <boost/utility/in_place_factory.hpp> - namespace qpid { namespace cluster { using namespace std; -struct ClusterOptions : public Options { +struct ClusterValues { string name; string url; - ClusterOptions() : Options("Cluster Options") { + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +/** Note separating options from values to work around boost version differences. + * Old boost takes a reference to options objects, but new boost makes a copy. + * New boost allows a shared_ptr but that's not compatible with old boost. + */ +struct ClusterOptions : public Options { + ClusterValues& values; + + ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) { addOptions() - ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(url,"URL"), + ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(values.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") ; } - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } }; struct ClusterPlugin : public Plugin { - typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + ClusterValues values; ClusterOptions options; - boost::optional<Cluster> cluster; + boost::intrusive_ptr<Cluster> cluster; + + ClusterPlugin() : options(values) {} + + Options* getOptions() { return &options; } - template <class Chain> void init(Plugin::Target& t) { - Chain* c = dynamic_cast<Chain*>(&t); - if (c) cluster->initialize(*c); + void init(broker::Broker& b) { + if (values.name.empty()) return; // Only if --cluster-name option was specified. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process."); + cluster = new Cluster(values.name, values.getUrl(b.getPort()), b); + b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); + } + + template <class T> void init(T& t) { + if (cluster) cluster->initialize(t); + } + + template <class T> bool init(Plugin::Target& target) { + T* t = dynamic_cast<T*>(&target); + if (t) init(*t); + return t; } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker && !options.name.empty()) { - if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); - cluster = boost::in_place(options.name, - options.getUrl(broker->getPort()), - boost::ref(*broker)); - return; - } - if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. - init<ConnectionChain>(target); + if (init<broker::Broker>(target)) return; + if (!cluster) return; // Remaining plugins only valid if cluster initialized. + if (init<broker::Connection>(target)) return; } + + void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. + +// For test purposes. +boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp new file mode 100644 index 0000000000..5283ba9b1a --- /dev/null +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionInterceptor.h" +#include "qpid/framing/ClusterConnectionCloseBody.h" +#include "qpid/framing/AMQFrame.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; b = c; } + +ConnectionInterceptor::ConnectionInterceptor( + broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) +{ + connection->addFinalizer(boost::bind(operator delete, this)); + // Attach my functions to Connection extension points. + shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); + shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); +} + +ConnectionInterceptor::~ConnectionInterceptor() { + assert(isClosed); + assert(connection == 0); +} + +void ConnectionInterceptor::received(framing::AMQFrame& f) { + if (isClosed) return; + cluster.send(f, this); +} + +void ConnectionInterceptor::deliver(framing::AMQFrame& f) { + receivedNext(f); +} + +void ConnectionInterceptor::closed() { + if (isClosed) return; + try { + // Called when the local network connection is closed. We still + // need to process any outstanding cluster frames for this + // connection to ensure our sessions are up-to-date. We defer + // closing the Connection object till deliverClosed(), but replace + // its output handler with a null handler since the network output + // handler will be deleted. + // + connection->setOutputHandler(&discardHandler); + cluster.send(AMQFrame(in_place<ClusterConnectionCloseBody>()), this); + isClosed = true; + } + catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); + } +} + +void ConnectionInterceptor::deliverClosed() { + closedNext(); + // Drop reference so connection will be deleted, which in turn + // will delete this via finalizer added in ctor. + connection = 0; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h new file mode 100644 index 0000000000..d499acb832 --- /dev/null +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -0,0 +1,77 @@ +#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H +#define QPID_CLUSTER_CONNECTIONPLUGIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cluster.h" +#include "qpid/broker/Connection.h" +#include "qpid/sys/ConnectionOutputHandler.h" + +namespace qpid { +namespace framing { class AMQFrame; } +namespace cluster { + +/** + * Plug-in associated with broker::Connections, both local and shadow. + */ +class ConnectionInterceptor { + public: + ConnectionInterceptor(broker::Connection&, Cluster&, + Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0)); + ~ConnectionInterceptor(); + + // Called on self-delivery + void deliver(framing::AMQFrame& f); + + // Called on self-delivery of my own cluster.connection-close + void deliverClosed(); + + Cluster::ShadowConnectionId getShadowId() const { return shadowId; } + + bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); } + + private: + struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { + void close() {} + void send(framing::AMQFrame&) {} + void doOutput() {} + void activateOutput() {} + }; + + // Functions to add to Connection extension points. + void received(framing::AMQFrame&); + void closed(); + + boost::function<void(framing::AMQFrame&)> receivedNext; + boost::function<void()> closedNext; + + boost::intrusive_ptr<broker::Connection> connection; + Cluster& cluster; + NullConnectionHandler discardHandler; + bool isClosed; + Cluster::ShadowConnectionId shadowId; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/ + diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 3118e11e57..674781ac06 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -62,7 +62,7 @@ void Cpg::globalConfigChange( cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } -Cpg::Cpg(Handler& h) : handler(h) { +Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); check(cpg_context_set(handle, this), "Cannot set CPG context"); @@ -78,10 +78,10 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - if (handle) { - cpg_context_set(handle, 0); + if (!isShutdown) { + QPID_LOG(debug,"Shutting down CPG"); + isShutdown=true; check(cpg_finalize(handle), "Error in shutdown of CPG"); - handle = 0; } } diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index d3142efcb6..c89bf3e121 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -165,6 +165,7 @@ class Cpg : public Dispatchable { cpg_handle_t handle; Handler& handler; + bool isShutdown; }; std::ostream& operator <<(std::ostream& out, const cpg_name& name); diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index a2a8ee7bfa..e07a803e17 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -66,7 +66,7 @@ struct Handler { MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {} void handle(T t) { (target->*F)(t); } - /** Allow calling with -> syntax, like a qpid::HandlerChain */ + /** Allow calling with -> syntax */ MemFunRef* operator->() { return this; } private: diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index e01034c355..6c20330c28 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -1,5 +1,5 @@ #ifndef TESTS_FORKEDBROKER_H -#define TESTS_FORKEDBROKER_H + /* * @@ -23,16 +23,11 @@ */ #include "qpid/Exception.h" -#include "qpid/sys/Fork.h" -#include "qpid/log/Logger.h" +#include "qpid/log/Statement.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SignalHandler.h" - #include <boost/lexical_cast.hpp> - #include <string> - -#include <signal.h> +#include <stdio.h> #include <sys/wait.h> /** @@ -48,63 +43,66 @@ * process.) * */ -class ForkedBroker : public qpid::sys::ForkWithMessage { - pid_t pid; - uint16_t port; - qpid::broker::Broker::Options opts; - std::string prefix; - +class ForkedBroker { public: - struct ChildExit {}; // Thrown in child processes. + ForkedBroker(std::vector<const char*> argv) { init(argv); } - ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(), - const std::string& prefix_=std::string()) - : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } + ForkedBroker(int argc, const char* const argv[]) { + std::vector<const char*> args(argv, argv+argc); + init(args); + } ~ForkedBroker() { - try { stop(); } - catch(const std::exception& e) { - QPID_LOG(error, e.what()); + try { stop(); } catch(const std::exception& e) { + QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what())); } } void stop() { - if (pid > 0) { // I am the parent, clean up children. - if (::kill(pid, SIGINT) < 0) - throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno))); - int status = 0; - if (::waitpid(pid, &status, 0) < 0) - throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno))); - if (WEXITSTATUS(status) != 0) - throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status))); - } + using qpid::ErrnoException; + if (pid == 0) return; + if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed"); + int status; + if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for forked process failed"); + if (WEXITSTATUS(status) != 0) + throw qpid::Exception(QPID_MSG("forked broker exited with: " << WEXITSTATUS(status))); + pid = 0; } - void parent(pid_t pid_) { - pid = pid_; - qpid::log::Logger::instance().setPrefix("parent"); - std::string portStr = wait(5); - port = boost::lexical_cast<uint16_t>(portStr); - } + uint16_t getPort() { return port; } - void child() { - prefix += boost::lexical_cast<std::string>(long(getpid())); - qpid::log::Logger::instance().setPrefix(prefix); - opts.port = 0; - boost::intrusive_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts)); - qpid::broker::SignalHandler::setBroker(broker); - QPID_LOG(info, "ForkedBroker started on " << broker->getPort()); - ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent. - broker->run(); - QPID_LOG(notice, "ForkedBroker exiting."); + private: - // Force exit in the child process, otherwise we will try to - // carry with parent tests. - broker = 0; // Run broker dtor before we exit. - exit(0); + void init(const std::vector<const char*>& args) { + using qpid::ErrnoException; + pid = 0; + port = 0; + int pipeFds[2]; + if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe"); + pid = ::fork(); + if (pid < 0) throw ErrnoException("Fork failed"); + if (pid) { // parent + ::close(pipeFds[1]); + FILE* f = ::fdopen(pipeFds[0], "r"); + if (!f) throw ErrnoException("fopen failed"); + if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed"); + } + else { // child + ::close(pipeFds[0]); + int fd = ::dup2(pipeFds[1], 1); + if (fd < 0) throw ErrnoException("dup2 failed"); + const char* prog = "../qpidd"; + std::vector<const char*> args2(args); + args2.push_back("--port=0"); + args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems? + args2.push_back(0); + execv(prog, const_cast<char* const*>(&args2[0])); + throw ErrnoException("execv failed"); + } } - uint16_t getPort() { return port; } + pid_t pid; + int port; }; #endif /*!TESTS_FORKEDBROKER_H*/ diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index da5b4d6e90..9190eee4e5 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -10,9 +10,8 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la # -# FIXME aconway 2008-07-04: disabled till process leak is plugged. -# ais_check checks conditions for cluster tests and run them if ok. -#TESTS+=ais_check +# ais_check checks pre-requisites for cluster tests and runs them if ok. +TESTS+=ais_check EXTRA_DIST+=ais_check check_PROGRAMS+=cluster_test diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d361919f0b..cafac489d2 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -16,13 +16,13 @@ * */ - #include "test_tools.h" #include "unit_test.h" #include "ForkedBroker.h" #include "BrokerFixture.h" #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/Cluster.h" #include "qpid/framing/AMQBody.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" @@ -37,10 +37,13 @@ #include <vector> #include <algorithm> -#include <signal.h> +namespace qpid { +namespace cluster { +boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp +}} // namespace qpid::cluster -QPID_AUTO_TEST_SUITE(CpgTestSuite) +QPID_AUTO_TEST_SUITE(CpgTestSuite) using namespace std; using namespace qpid; @@ -49,27 +52,60 @@ using namespace qpid::framing; using namespace qpid::client; using qpid::broker::Broker; using boost::ptr_vector; +using qpid::cluster::Cluster; +using qpid::cluster::getGlobalCluster; -struct ClusterFixture : public ptr_vector<ForkedBroker> { +/** Cluster fixture is a vector of ports for the replicas. + * Replica 0 is in the current process, all others are forked as children. + */ +struct ClusterFixture : public vector<uint16_t> { string name; + Broker::Options opts; + std::auto_ptr<BrokerFixture> broker0; + boost::ptr_vector<ForkedBroker> forkedBrokers; - ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); } + ClusterFixture(size_t n); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); + void setup(); }; +ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { + add(n); + // Wait for all n members to join the cluster + int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. + while (retry && getGlobalCluster()->size() != n) { + ::sleep(1); + --retry; + } + BOOST_CHECK_EQUAL(n, getGlobalCluster()->size()); +} + void ClusterFixture::add() { - broker::Broker::Options opts; - Plugin::addOptions(opts); // For cluster options. + std::ostringstream os; + os << "broker" << size(); + std::string prefix = os.str(); + const char* argv[] = { - "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir" + "qpidd " __FILE__ , + "--load-module=../.libs/libqpidcluster.so", + "--cluster-name", name.c_str(), + "--auth=no", "--no-data-dir", + "--log-prefix", prefix.c_str(), }; - opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv)); - ostringstream prefix; - prefix << "b" << size() << "-"; - QPID_LOG(info, "ClusterFixture adding broker " << prefix.str()); - push_back(new ForkedBroker(opts, prefix.str())); - QPID_LOG(info, "ClusterFixture added broker " << prefix.str()); + size_t argc = sizeof(argv)/sizeof(argv[0]); + + if (size()) { // Not the first broker, fork. + forkedBrokers.push_back(new ForkedBroker(argc, argv)); + push_back(forkedBrokers.back().getPort()); + } + else { // First broker, run in this process. + Broker::Options opts; + Plugin::addOptions(opts); // Pick up cluster options. + opts.parse(argc, argv, "", true); // Allow-unknown for --load-module + broker0.reset(new BrokerFixture(opts)); + push_back(broker0->getPort()); + } } // For debugging: op << for CPG types. @@ -149,26 +185,25 @@ QPID_AUTO_TEST_CASE(CpgBasic) { QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. - Broker::Options opts; - opts.auth="no"; - opts.noDataDir=true; - ForkedBroker broker(opts); + const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; + ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv); Client c(broker.getPort()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); } QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers - Client c0(cluster[0].getPort()); + ClusterFixture cluster(3); + Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); c0.session.queueDeclare("q"); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); + c0.connection.close(); // Verify all brokers get wiring update. for (size_t i = 0; i < cluster.size(); ++i) { BOOST_MESSAGE("i == "<< i); - Client c(cluster[i].getPort()); + Client c(cluster[i]); BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); } @@ -177,12 +212,12 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture cluster(2); - Client c0(cluster[0].getPort()); + Client c0(cluster[0]); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); c0.session.close(); - Client c1(cluster[1].getPort()); + Client c1(cluster[1]); Message msg; BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); @@ -190,10 +225,14 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } +#if 0 + +// FIXME aconway 2008-07-16: Implement cluster dequeue notification, enable this test. + QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture cluster (3); - Client c0(cluster[0].getPort()); + Client c0(cluster[0]); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); @@ -201,11 +240,11 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { Message msg; - Client c1(cluster[1].getPort()); + Client c1(cluster[1]); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); - Client c2(cluster[2].getPort()); + Client c2(cluster[2]); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); QueueQueryResult r = c2.session.queueQuery("q"); @@ -214,4 +253,6 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { // TODO aconway 2008-06-25: failover. +#endif + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/run_test b/cpp/src/tests/run_test index 8fbaaaee07..4d0da15d4c 100755 --- a/cpp/src/tests/run_test +++ b/cpp/src/tests/run_test @@ -38,9 +38,11 @@ VALGRIND_OPTS=" --demangle=yes --suppressions=$srcdir/.valgrind.supp --num-callers=25 ---trace-children=yes --log-file=$VG_LOG -- " +# FIXME aconway 2008-07-16: removed --trace-children=yes, problems with cluster tests forking +# qpidd libtool script. Investigate & restore --trace-children if possible. + ERROR=0 if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then # This is a libtool "executable". Valgrind it if VALGRIND specified. diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index ce9303fef8..aac095a9a9 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -22,12 +22,14 @@ <amqp major="0" minor="10" port="5672"> - <class name = "cluster" code = "0x80" label="clustering extensions"> + <class name = "cluster" code = "0x80" label="Qpid clustering extensions."> <doc>Qpid extension class to allow clustered brokers to communicate.</doc> - <control name = "notify" code="0x1" label="notify cluster of a new member"> - <doc>Notify the cluster of a member URL</doc> - <!-- No chassis element, this is handled by separte cluster code for now.--> + + <control name = "notify" code="0x1"> <field name="url" type="str16" /> </control> + + <control name="connection-close" code="0x2"> + </control> </class> </amqp> |