diff options
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/HandlerChain.h | 97 | ||||
-rw-r--r-- | cpp/src/qpid/Plugin.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/Plugin.h | 28 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionManager.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionManager.h | 70 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Handler.h | 24 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 26 |
20 files changed, 208 insertions, 224 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 5b4a16429a..bfebd4ae88 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -248,8 +248,6 @@ libqpidbroker_la_SOURCES = \ qpid/amqp_0_10/Connection.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerSingleton.cpp \ - qpid/broker/ConnectionManager.h \ - qpid/broker/ConnectionManager.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ qpid/broker/PersistableMessage.cpp \ @@ -354,6 +352,7 @@ nobase_include_HEADERS = \ qpid/amqp_0_10/Exception.h \ qpid/Msg.h \ qpid/Options.h \ + qpid/HandlerChain.h \ qpid/Plugin.h \ qpid/ptr_map.h \ qpid/RangeSet.h \ diff --git a/cpp/src/qpid/HandlerChain.h b/cpp/src/qpid/HandlerChain.h new file mode 100644 index 0000000000..adeaa96536 --- /dev/null +++ b/cpp/src/qpid/HandlerChain.h @@ -0,0 +1,97 @@ +#ifndef QPID_HANDLERCHAIN_H +#define QPID_HANDLERCHAIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Plugin.h> +#include <boost/ptr_container/ptr_vector.hpp> +#include <memory> + +namespace qpid { + +/** + * Chain-of-responsibility design pattern. + * + * Construct a chain of objects deriving from Base. Each implements + * Base::f by doing its own logic and then calling Base::f on the next + * handler (or not if it chooses not to.) + * + * HandlerChain acts as a smart pointer to the first object in the chain. + */ +template <class Base> +class HandlerChain { + public: + /** Base class for chainable handlers */ + class Handler : public Base { + public: + Handler() : next() {} + virtual ~Handler() {} + virtual void setNext(Base* next_) { next = next_; } + + protected: + Base* next; + }; + + typedef std::auto_ptr<Handler> HandlerAutoPtr; + + /**@param target is the object at the end of the chain. */ + HandlerChain(Base& target) : first(&target) {} + + /** HandlerChain owns the ChainableHandler. */ + void push(HandlerAutoPtr h) { + handlers.push_back(h); + h->setNext(first); + first = h.get(); + } + + // Smart pointer functions + Base* operator*() { return first; } + const Base* operator*() const { return first; } + Base* operator->() { return first; } + const Base* operator->() const { return first; } + operator bool() const { return first; } + + private: + boost::ptr_vector<Base> handlers; + Base* first; +}; + +/** + * A PluginHandlerChain calls Plugin::initAll(*this) on construction, + * allowing plugins to add handlers. + * + * @param Tag can be any class, use to distinguish different plugin + * chains with the same Base type. + */ +template <class Base, class Tag=void> +struct PluginHandlerChain : public HandlerChain<Base>, + public Plugin::Target +{ + PluginHandlerChain(Base& target) : HandlerChain<Base>(target) { + Plugin::initAll(*this); + } +}; + + +} // namespace qpid + +#endif /*!QPID_HANDLERCHAIN_H*/ diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp index 733d134334..b8206499ae 100644 --- a/cpp/src/qpid/Plugin.cpp +++ b/cpp/src/qpid/Plugin.cpp @@ -20,10 +20,13 @@ #include "Plugin.h" #include "qpid/Options.h" +#include <boost/bind.hpp> +#include <algorithm> namespace qpid { namespace { + Plugin::Plugins& thePlugins() { // This is a single threaded singleton implementation so // it is important to be sure that the first use of this @@ -31,8 +34,17 @@ Plugin::Plugins& thePlugins() { static Plugin::Plugins plugins; return plugins; } + +void call(boost::function<void()> f) { f(); } + +} // namespace + +Plugin::Target::~Target() { + std::for_each(cleanup.begin(), cleanup.end(), &call); } +void Plugin::Target::addCleanup(const boost::function<void()>& f) { cleanup.push_back(f); } + Plugin::Plugin() { // Register myself. thePlugins().push_back(this); @@ -44,6 +56,12 @@ Options* Plugin::getOptions() { return 0; } const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); } +namespace { +template <class F> void each_plugin(const F& f) { + std::for_each(Plugin::getPlugins().begin(), Plugin::getPlugins().end(), f); +} +} + void Plugin::addOptions(Options& opts) { for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) { if ((*i)->getOptions()) @@ -51,4 +69,7 @@ void Plugin::addOptions(Options& opts) { } } +void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); } +void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); } + } // namespace qpid diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index 3ead770129..a53d4e5d18 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -40,11 +40,17 @@ class Plugin : boost::noncopyable public: /** * Base interface for targets that receive plug-ins. - * - * The Broker is a plug-in target, there might be others - * in future. + * Plug-ins can register clean-up functions to execute when + * the target is destroyed. */ - struct Target { virtual ~Target() {} }; + struct Target { + public: + virtual ~Target(); + void addCleanup(const boost::function<void()>& cleanupFunction); + + private: + std::vector<boost::function<void()> > cleanup; + }; typedef std::vector<Plugin*> Plugins; @@ -69,7 +75,9 @@ class Plugin : boost::noncopyable virtual Options* getOptions(); /** - * Initialize Plugin functionality on a Target. + * Initialize Plugin functionality on a Target, called before + * initializing the target. + * * Plugins should ignore targets they don't recognize. * * Called before the target itself is initialized. @@ -77,7 +85,9 @@ class Plugin : boost::noncopyable virtual void earlyInitialize(Target&) = 0; /** - * Initialize Plugin functionality on a Target. + * Initialize Plugin functionality on a Target. Called after + * initializing the target. + * * Plugins should ignore targets they don't recognize. * * Called after the target is fully initialized. @@ -89,6 +99,12 @@ class Plugin : boost::noncopyable */ static const Plugins& getPlugins(); + /** Call earlyInitialize() on all registered plugins */ + static void earlyInitAll(Target&); + + /** Call initialize() on all registered plugins */ + static void initAll(Target&); + /** For each registered plugin, add plugin.getOptions() to opts. */ static void addOptions(Options& opts); }; diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index 407fe5ebd8..ccd31c78a7 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -29,7 +29,7 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), - connection(broker.getConnectionManager().create(this, broker, id, _isClient)), + connection(this, broker, id, _isClient), identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { @@ -46,13 +46,13 @@ size_t Connection::decode(const char* buffer, size_t size) { framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection->received(frame); + connection.received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection->doOutput(); + if (!frameQueueClosed) connection.doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -91,7 +91,7 @@ void Connection::close() { } void Connection::closed() { - connection->closed(); + connection.closed(); } void Connection::send(framing::AMQFrame& f) { diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index c08545df0f..a3a756cefb 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -33,7 +33,6 @@ namespace qpid { namespace broker { class Broker; } namespace amqp_0_10 { -// FIXME aconway 2008-03-18: Update to 0-10. class Connection : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { @@ -41,7 +40,7 @@ class Connection : public sys::ConnectionCodec, bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - std::auto_ptr<broker::Connection> connection; // FIXME aconway 2008-03-18: + broker::Connection connection; std::string identifier; bool initialized; bool isClient; diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9a9f502bf0..be59cef24c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -23,7 +23,6 @@ */ #include "ConnectionFactory.h" -#include "ConnectionManager.h" #include "ConnectionToken.h" #include "DirectExchange.h" #include "DtxManager.h" @@ -121,7 +120,6 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } - ConnectionManager& getConnectionManager() { return connectionManager; } management::ManagementObject* GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -159,7 +157,6 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; - ConnectionManager connectionManager; management::ManagementAgent* managementAgent; management::Broker* mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index bb99c61cdd..61384638b3 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -88,7 +88,7 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain(frame); } +void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } void Connection::receivedLast(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 717e1a6270..c911e88200 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -43,6 +43,7 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Connection.h" +#include "qpid/HandlerChain.h" #include <boost/ptr_container/ptr_map.hpp> @@ -91,8 +92,6 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); - framing::FrameHandler::Chain& getInChain() { return inChain; } - private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; @@ -110,8 +109,7 @@ class Connection : public sys::ConnectionInputHandler, management::Connection* mgmtObject; LinkRegistry& links; framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler; - framing::FrameHandler::Chain inChain; - + PluginHandlerChain<framing::FrameHandler, Connection> inChain; }; }} diff --git a/cpp/src/qpid/broker/ConnectionManager.cpp b/cpp/src/qpid/broker/ConnectionManager.cpp deleted file mode 100644 index 165de5220e..0000000000 --- a/cpp/src/qpid/broker/ConnectionManager.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConnectionManager.h" -#include "Connection.h" - -namespace qpid { -namespace broker { - -std::auto_ptr<Connection> -ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) { - std::auto_ptr<Connection> c(new Connection(out, broker, mgmtId, isClient)); - sys::Mutex::ScopedLock l(lock); - std::for_each(observers.begin(), observers.end(), - boost::bind(&Observer::created, _1, boost::ref(*c))); - return c; -} - -void ConnectionManager::add(const boost::intrusive_ptr<Observer>& observer) { - sys::Mutex::ScopedLock l(lock); - observers.push_back(observer); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionManager.h b/cpp/src/qpid/broker/ConnectionManager.h deleted file mode 100644 index a999523d0d..0000000000 --- a/cpp/src/qpid/broker/ConnectionManager.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef QPID_BROKER_CONNECTIONMANAGER_H -#define QPID_BROKER_CONNECTIONMANAGER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/RefCounted.h" -#include "qpid/sys/Mutex.h" -#include <boost/intrusive_ptr.hpp> -#include <vector> -#include <memory> - -namespace qpid { - -namespace sys { -class ConnectionOutputHandler; -} - -namespace broker { - -class Broker; -class Connection; - -/** - * Manages connections and observers. - */ -class ConnectionManager { - public: - - /** - * Observer notified of ConnectionManager events. - */ - struct Observer : public RefCounted { - /** Called when a connection is attached. */ - virtual void created(Connection&) {} - }; - - /** Called to create a new Connection, applies observers. */ - std::auto_ptr<Connection> create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false); - - /** Add an observer */ - void add(const boost::intrusive_ptr<Observer>&); - - private: - typedef std::vector<boost::intrusive_ptr<Observer> > Observers; - - sys::Mutex lock; - Observers observers; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_CONNECTIONMANAGER_H*/ diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 69ef29c3eb..e7190fdae6 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -55,11 +55,8 @@ std::auto_ptr<SessionState> SessionManager::attach(SessionHandler& h, const Ses throw SessionBusyException(QPID_MSG("Session already attached: " << id)); Detached::iterator i = std::find(detached.begin(), detached.end(), id); std::auto_ptr<SessionState> state; - if (i == detached.end()) { + if (i == detached.end()) state.reset(new SessionState(broker, h, id, config)); - for_each(observers.begin(), observers.end(), - boost::bind(&Observer::opened, _1,boost::ref(*state))); - } else { state.reset(detached.release(i).release()); state->attach(h); @@ -99,8 +96,4 @@ void SessionManager::eraseExpired() { } } -void SessionManager::add(const intrusive_ptr<Observer>& o) { - observers.push_back(o); -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 9a4142f613..db88e7ec10 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -46,14 +46,6 @@ class SessionHandler; */ class SessionManager : private boost::noncopyable { public: - /** - * Observer notified of SessionManager events. - */ - struct Observer : public RefCounted { - /** Called when a stateless session is attached. */ - virtual void opened(SessionState&) {} - }; - SessionManager(const qpid::SessionState::Configuration&, Broker&); ~SessionManager(); @@ -67,9 +59,6 @@ class SessionManager : private boost::noncopyable { /** Forget about an attached session. Called by SessionState destructor. */ void forget(const SessionId&); - /** Add an Observer. */ - void add(const boost::intrusive_ptr<Observer>&); - Broker& getBroker() const { return broker; } const qpid::SessionState::Configuration& getSessionConfig() const { return config; } @@ -77,7 +66,6 @@ class SessionManager : private boost::noncopyable { private: typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order. typedef std::set<SessionId> Attached; - typedef std::vector<boost::intrusive_ptr<Observer> > Observers; void eraseExpired(); @@ -85,7 +73,6 @@ class SessionManager : private boost::noncopyable { Detached detached; Attached attached; qpid::SessionState::Configuration config; - Observers observers; Broker& broker; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 8a17a787a2..0a122fcae8 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -224,8 +224,8 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); } -void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); } +void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); } +void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); } void SessionState::handleInLast(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); @@ -291,8 +291,4 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } -framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; } - -framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; } - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 5d18ed161e..f6bf98d431 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -23,6 +23,7 @@ */ #include "qpid/SessionState.h" +#include "qpid/HandlerChain.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" @@ -58,8 +59,8 @@ class SessionHandler; class SessionManager; /** - * Broker-side session state includes sessions handler chains, which may - * themselves have state. + * Broker-side session state includes session's handler chains, which + * may themselves have state. */ class SessionState : public qpid::SessionState, public SessionContext, @@ -101,8 +102,9 @@ class SessionState : public qpid::SessionState, void readyToSend(); - framing::FrameHandler::Chain& getInChain(); - framing::FrameHandler::Chain& getOutChain(); + // Tag types to identify PluginHandlerChains. + struct InTag {}; + struct OutTag {}; private: @@ -131,7 +133,9 @@ class SessionState : public qpid::SessionState, management::Session* mgmtObject; framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler; framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler; - framing::FrameHandler::Chain inChain, outChain; + + qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain; + qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain; friend class SessionManager; }; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d97a840f82..4ea77e7fbf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" +#include "qpid/memory.h" #include <boost/bind.hpp> #include <boost/scoped_array.hpp> #include <algorithm> @@ -36,25 +37,12 @@ using namespace qpid::sys; using namespace std; using broker::Connection; -namespace { - -// FIXME aconway 2008-07-01: sending every frame to cluster, -// serializing all processing in cluster deliver thread. -// This will not perform at all, but provides a correct starting point. -// -// TODO: -// - Fake "Connection" for cluster: owns shadow sessions. -// - Maintain shadow sessions. -// - Apply foreign frames to shadow sessions. -// - - // Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public FrameHandler { - Connection& connection; +struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler { + Cluster::ConnectionChain& connection; Cluster& cluster; - ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} + ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} void handle(AMQFrame& f) { // FIXME aconway 2008-01-29: Refcount Connections to ensure @@ -63,16 +51,8 @@ struct ClusterSendHandler : public FrameHandler { } }; -struct ConnectionObserver : public broker::ConnectionManager::Observer { - Cluster& cluster; - ConnectionObserver(Cluster& c) : cluster(c) {} - - void created(Connection& c) { - // FIXME aconway 2008-06-16: clean up chaining and observers. - ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); - c.getInChain().insert(sender); - } -}; +void Cluster::initialize(Cluster::ConnectionChain& cc) { + cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); } ostream& operator <<(ostream& out, const Cluster& cluster) { @@ -95,7 +75,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg(*this), name(name_), url(url_), - observer(new ConnectionObserver(*this)), self(cpg.self()) { QPID_LOG(trace, "Joining cluster: " << name_); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 031baf914a..84b5ed072c 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -22,6 +22,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" +#include "qpid/HandlerChain.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" @@ -47,6 +48,8 @@ namespace cluster { class Cluster : private sys::Runnable, private Cpg::Handler { public: + typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + /** Details of a cluster member */ struct Member { Member(const Url& url_=Url()) : url(url_) {} @@ -62,11 +65,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler */ Cluster(const std::string& name, const Url& url, broker::Broker&); + // Add cluster handlers to broker chains. + void initialize(ConnectionChain&); + virtual ~Cluster(); - // FIXME aconway 2008-01-29: - boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; } - /** Get the current cluster membership. */ MemberList getMembers() const; @@ -124,7 +127,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - boost::intrusive_ptr<broker::ConnectionManager::Observer> observer; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 6d3dca84be..c4b67de141 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -54,24 +54,29 @@ struct ClusterOptions : public Options { }; struct ClusterPlugin : public Plugin { + typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; ClusterOptions options; boost::optional<Cluster> cluster; - Options* getOptions() { return &options; } + template <class Chain> void init(Plugin::Target& t) { + Chain* c = dynamic_cast<Chain*>(&t); + if (c) cluster->initialize(*c); + } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.name.empty()) { - assert(!cluster); // A process can only belong to one cluster. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getConnectionManager().add(cluster->getObserver()); + return; } + if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. + init<ConnectionChain>(target); } }; diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index edd7f469b0..a2a8ee7bfa 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -28,7 +28,6 @@ namespace qpid { namespace framing { -/** Generic handler that can be linked into chains. */ template <class T> struct Handler { typedef T HandledType; @@ -46,23 +45,6 @@ struct Handler { /** Pointer to next handler in a linked list. */ Handler<T>* next; - /** A Chain is a handler holding a linked list of sub-handlers. - * Chain::next is invoked after the full chain, it is not itself part of the chain. - * Handlers inserted into the chain are deleted by the Chain dtor. - */ - class Chain : public Handler<T> { - public: - Chain(Handler<T>& next_) : Handler(&next_), first(&next_) {} - ~Chain() { while (first != next) pop(); } - void handle(T t) { first->handle(t); } - void insert(Handler<T>* h) { h->next = first; first = h; } - bool empty() { return first == next; } - - private: - void pop() { Handler<T>* p=first; first=first->next; delete p; } - Handler<T>* first; - }; - /** Adapt any void(T) functor as a Handler. * Functor<F>(f) will copy f. * Functor<F&>(f) will only take a reference to x. @@ -84,7 +66,7 @@ struct Handler { MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {} void handle(T t) { (target->*F)(t); } - /** Allow calling with -> syntax, compatible with Chains */ + /** Allow calling with -> syntax, like a qpid::HandlerChain */ MemFunRef* operator->() { return this; } private: @@ -103,15 +85,13 @@ struct Handler { }; /** Support for implementing an in-out handler pair as a single class. - * Public interface is Handler<T>::Chains pair, but implementation - * overrides handleIn, handleOut functions in a single class. + * Overrides handleIn, handleOut functions in a single class. */ struct InOutHandler : protected InOutHandlerInterface { InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {} MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in; MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out; }; - }; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 65aa4d5a28..beab305f75 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -174,7 +174,7 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { } } -QPID_AUTO_TEST_CASE(testMessageReplication) { +QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture cluster(2); Client c0(cluster[0].getPort()); @@ -190,6 +190,28 @@ QPID_AUTO_TEST_CASE(testMessageReplication) { BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } -// TODO aconway 2008-06-25: dequeue replication, failover. +QPID_AUTO_TEST_CASE(testMessageDequeue) { + // Enqueue on one broker, dequeue on two others. + ClusterFixture cluster (3); + Client c0(cluster[0].getPort()); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); + c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.close(); + + Message msg; + + Client c1(cluster[1].getPort()); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("foo", msg.getData()); + + Client c2(cluster[2].getPort()); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("bar", msg.getData()); + QueueQueryResult r = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(0, r.getMessageCount()); +} + +// TODO aconway 2008-06-25: failover. QPID_AUTO_TEST_SUITE_END() |