diff options
author | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
commit | cb566519d58ded6704507fa5530bf901e620edf6 (patch) | |
tree | ab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src | |
parent | 3f900af77d5f781431dc25e307974e0fc27aa561 (diff) | |
download | qpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz |
* Summary:
- Connect cluster handlers into broker handler chains.
- Progress on wiring replication.
* src/tests/cluster.mk: Temporarily disabled Cluster test.
* src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs.
* src/qpidd.cpp:
- Load optional libs (cluster)
- Include plugin config in options.parse.
* src/qpid/cluster/SessionManager.h:
- Create sessions, update handler chains (as HandlerUpdater)
- Handle frames from cluster.
* src/qpid/cluster/ClusterPlugin.h, .cpp:
- renamed from ClusterPluginProvider
- Create and connect Cluster and SessionManager.
- Register SessionManager as HandlerUpdater.
* src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler.
* src/qpid/broker/Connection.cpp: Apply HandlerUpdaters.
* src/qpid/broker/Broker.h, .cpp:
- Initialize plugins
- Apply HandlerUpdaters
* src/qpid/Plugin.h, .cpp: Simplified plugin framework.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 6 | ||||
-rw-r--r-- | cpp/src/qpid/Plugin.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/Plugin.h | 66 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 34 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 62 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 43 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp (renamed from cpp/src/qpid/cluster/ClusterPluginProvider.cpp) | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 103 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 68 | ||||
-rw-r--r-- | cpp/src/qpid/framing/HandlerUpdater.h | 8 | ||||
-rw-r--r-- | cpp/src/qpidd.cpp | 61 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.cpp | 10 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 9 |
18 files changed, 368 insertions, 181 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index bfcc4fd850..dcc70ca897 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -11,11 +11,13 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ - qpid/cluster/ClusterPluginProvider.cpp \ + qpid/cluster/ClusterPlugin.cpp \ qpid/cluster/ClassifierHandler.h \ qpid/cluster/ClassifierHandler.cpp \ qpid/cluster/SessionFrame.h \ - qpid/cluster/SessionFrame.cpp + qpid/cluster/SessionFrame.cpp \ + qpid/cluster/SessionManager.h \ + qpid/cluster/SessionManager.cpp libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp index c6513d33e8..d38b53a56e 100644 --- a/cpp/src/qpid/Plugin.cpp +++ b/cpp/src/qpid/Plugin.cpp @@ -22,19 +22,19 @@ namespace qpid { -std::vector<PluginProvider*> PluginProvider::providers; +Plugin::Plugins Plugin::plugins; -PluginProvider::PluginProvider() { +Plugin::Plugin() { // Register myself. - providers.push_back(this); + plugins.push_back(this); } -PluginProvider::~PluginProvider() {} +Plugin::~Plugin() {} -Options* PluginProvider::getOptions() { return 0; } +Options* Plugin::getOptions() { return 0; } -const std::vector<PluginProvider*>& PluginProvider::getProviders() { - return providers; +const Plugin::Plugins& Plugin::getPlugins() { + return plugins; } } // namespace qpid diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index f50e97eb00..e684d238a3 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -32,70 +32,56 @@ namespace qpid { class Options; -/** Generic base class to allow dynamic casting of generic Plugin objects - * to concrete types. - */ -struct Plugin : private boost::noncopyable { - virtual ~Plugin() {} -}; - -/** Generic interface for anything that uses plug-ins. */ -struct PluginUser : boost::noncopyable { - virtual ~PluginUser() {} - /** - * Called by a PluginProvider to provide a plugin. - * - * A concrete PluginUser will dynamic_pointer_cast plugin to a - * class it knows how to use. A PluginUser should ignore plugins - * it does not recognize. - * - * The user will release its shared_ptr when it is finished using - * plugin. - */ - virtual void use(const shared_ptr<Plugin>& plugin) = 0; -}; - /** - * Base for classes that provide plug-ins. + * Plug-in base class. */ -class PluginProvider : boost::noncopyable +class Plugin : boost::noncopyable { public: /** - * Register the provider to appear in getProviders() + * Base interface for targets that receive plug-ins. + * + * The Broker is a plug-in target, there might be others + * in future. + */ + struct Target { virtual ~Target() {} }; + + typedef std::vector<Plugin*> Plugins; + + /** + * Construct registers the plug-in to appear in getPlugins(). * - * A concrete PluginProvider is instantiated as a global or static + * A concrete Plugin is instantiated as a global or static * member variable in a library so it is registered during static * initialization when the library is loaded. */ - PluginProvider(); + Plugin(); - virtual ~PluginProvider(); + virtual ~Plugin(); /** - * Returns configuration options for the plugin. + * Configuration options for the plugin. * Then will be updated during option parsing by the host program. * * @return An options group or 0 for no options. Default returns 0. - * PluginProvider retains ownership of return value. + * Plugin retains ownership of return value. */ virtual Options* getOptions(); - /** Provide plugins to a PluginUser. + /** + * Initialize Plugin functionality on a Target. * - * The provider can dynamic_cast the user if it only provides - * plugins to certain types of user. Providers should ignore - * users they don't recognize. + * Plugins should ignore targets they don't recognize. */ - virtual void provide(PluginUser& user) = 0; + virtual void initialize(Target&) = 0; - /** Get the list of pointers to the registered providers. - * Caller must not delete the pointers. + /** List of registered Plugin objects. + * Caller must not delete plugin pointers. */ - static const std::vector<PluginProvider*>& getProviders(); + static const Plugins& getPlugins(); private: - static std::vector<PluginProvider*> providers; + static Plugins plugins; }; } // namespace qpid diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 9c8e98ec9a..86342b3c43 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -20,7 +20,6 @@ */ #include "Broker.h" - #include "Connection.h" #include "DirectExchange.h" #include "FanOutExchange.h" @@ -40,11 +39,14 @@ #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" +#include <boost/bind.hpp> + #include <iostream> #include <memory> using qpid::sys::Acceptor; using qpid::framing::HandlerUpdater; +using qpid::framing::FrameHandler; namespace qpid { namespace broker { @@ -98,6 +100,12 @@ Broker::Broker(const Broker::Options& conf) : store->recover(recoverer); } + // Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->initialize(*this); } @@ -149,13 +157,14 @@ Acceptor& Broker::getAcceptor() const { return *acceptor; } -void Broker::use(const shared_ptr<Plugin>& plugin) { - shared_ptr<HandlerUpdater> updater= - dynamic_pointer_cast<HandlerUpdater>(plugin); - if (updater) { - QPID_LOG(critical, "HandlerUpdater plugins not implemented"); - // FIXME aconway 2007-06-28: hook into Connections. - } +void Broker::add(const shared_ptr<HandlerUpdater>& updater) { + QPID_LOG(debug, "Broker added HandlerUpdater"); + handlerUpdaters.push_back(updater); +} + +void Broker::update(FrameHandler::Chains& chains) { + for_each(handlerUpdaters.begin(), handlerUpdaters.end(), + boost::bind(&HandlerUpdater::update, _1, chains)); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a27bce1751..9f57a45e0c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -23,19 +23,23 @@ */ #include "ConnectionFactory.h" -#include "qpid/Url.h" -#include "qpid/Plugin.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Acceptor.h" -#include "MessageStore.h" -#include "ExchangeRegistry.h" #include "ConnectionToken.h" #include "DirectExchange.h" #include "DtxManager.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/ProtocolInitiation.h" +#include "ExchangeRegistry.h" +#include "MessageStore.h" #include "QueueRegistry.h" #include "qpid/Options.h" +#include "qpid/Plugin.h" +#include "qpid/Url.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/Runnable.h" + +#include <vector> namespace qpid { @@ -48,7 +52,7 @@ namespace broker { /** * A broker instance. */ -class Broker : public sys::Runnable, public PluginUser +class Broker : public sys::Runnable, public Plugin::Target { public: struct Options : public qpid::Options { @@ -88,26 +92,32 @@ class Broker : public sys::Runnable, public PluginUser /** Shut down the broker */ virtual void shutdown(); - /** Use a plugin */ - void use(const shared_ptr<Plugin>& plugin); + /** Register a handler updater. */ + void add(const shared_ptr<framing::HandlerUpdater>&); + + /** Apply all handler updaters to a handler chain pair. */ + void update(framing::FrameHandler::Chains&); MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } uint64_t getStagingThreshold() { return stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } - + private: sys::Acceptor& getAcceptor() const; Options config; sys::Acceptor::shared_ptr acceptor; const std::auto_ptr<MessageStore> store; + typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters; + QueueRegistry queues; ExchangeRegistry exchanges; uint64_t stagingThreshold; ConnectionFactory factory; DtxManager dtxManager; + HandlerUpdaters handlerUpdaters; static MessageStore* createStore(const Options& config); }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 978228a364..7a987f28d2 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -104,7 +104,10 @@ void Connection::closeChannel(uint16_t id) { FrameHandler::Chains& Connection::getChannel(ChannelId id) { ChannelMap::iterator i = channels.find(id); if (i == channels.end()) { - FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out)); + FrameHandler::Chains chains( + new SemanticHandler(id, *this), + new OutputHandlerFrameHandler(*out)); + broker.update(chains); i = channels.insert(ChannelMap::value_type(id, chains)).first; } return i->second; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f2d1b75f3f..256378ccd5 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -32,6 +32,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; + ostream& operator <<(ostream& out, const Cluster& cluster) { return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; } @@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -struct Cluster::IncomingHandler : public FrameHandler { - IncomingHandler(Cluster& c) : cluster(c) {} - void handle(AMQFrame& frame) { - SessionFrame sf(Uuid(true), frame, SessionFrame::IN); - cluster.mcast(sf); - } - Cluster& cluster; -}; - -struct Cluster::OutgoingHandler : public FrameHandler { - OutgoingHandler(Cluster& c) : cluster(c) {} - void handle(AMQFrame& frame) { - SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); - cluster.mcast(sf); - } - Cluster& cluster; -}; -// TODO aconway 2007-06-28: Right now everything is backed up via -// multicast. When we have point-to-point backups the -// Incoming/Outgoing handlers must determine where each frame should -// be sent: to multicast or only to specific backup(s) via AMQP. -Cluster::Cluster(const std::string& name_, const std::string& url_) : +Cluster::Cluster( + const std::string& name_, const std::string& url_, + const SessionFrameHandler::Chain& next +) : + SessionFrameHandler(next), cpg(new Cpg(*this)), name(name_), url(url_), - self(cpg->getLocalNoideId(), getpid()), - toChains(new IncomingHandler(*this), new OutgoingHandler(*this)) + self(cpg->getLocalNoideId(), getpid()) { - QPID_LOG(trace, *this << " Joining cluster."); + QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg->join(name); notify(); dispatcher=Thread(*this); @@ -102,7 +85,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(SessionFrame& frame) { +void Cluster::handle(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) { void Cluster::notify() { SessionFrame sf; sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); - mcast(sf); + handle(sf); } size_t Cluster::size() const { @@ -122,11 +105,6 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { - Mutex::ScopedLock l(lock); - receivedChain = chain; -} - Cluster::MemberList Cluster::getMembers() const { // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); @@ -152,7 +130,7 @@ void Cluster::deliver( if (frame.uuid.isNull()) handleClusterFrame(from, frame.frame); else - receivedChain->handle(frame); + next->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, } // Handle cluster control frame from the null session. -bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { +void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); - if (notifyIn) { + assert(notifyIn); MemberList list; { Mutex::ScopedLock l(lock); - if (!members[from]) - members[from].reset(new Member(url)); + shared_ptr<Member>& member=members[from]; + if (!member) + member.reset(new Member(notifyIn->getUrl())); else - members[from]->url = notifyIn->getUrl(); - QPID_LOG(trace, *this << ": member update: " << members); + member->url = notifyIn->getUrl(); lock.notifyAll(); + QPID_LOG(trace, *this << ": members joined: " << members); } - return true; - } - return false; } void Cluster::configChange( @@ -207,7 +183,7 @@ void Cluster::configChange( // We don't record members joining here, we record them when // we get their ClusterNotify message. } - if (newMembers) + if (newMembers) // Notify new members of my presence. notify(); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 6ab4cb58df..f6afe14c62 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -37,11 +37,17 @@ namespace qpid { namespace cluster { /** - * Represents a cluster, provides access to data about members. - * Implements HandlerUpdater to manage handlers that route frames to - * and from the cluster. + * Connection to the cluster. Maintains cluster membership + * data. + * + * As SessionFrameHandler, handles frames by sending them to the + * cluster, sends frames received from the cluster to the next + * SessionFrameHandler. + * + * */ -class Cluster : private sys::Runnable, private Cpg::Handler +class Cluster : public SessionFrameHandler, + private sys::Runnable, private Cpg::Handler { public: /** Details of a cluster member */ @@ -57,8 +63,10 @@ class Cluster : private sys::Runnable, private Cpg::Handler * Join a cluster. * @param name of the cluster. * @param url of this broker, sent to the cluster. + * @param handler for frames received from the cluster. */ - Cluster(const std::string& name, const std::string& url); + Cluster(const std::string& name, const std::string& url, + const SessionFrameHandler::Chain& next); virtual ~Cluster(); @@ -70,14 +78,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool empty() const { return size() == 0; } - /** Get handler chains to send incoming/outgoing frames to the cluster */ - framing::FrameHandler::Chains getSendChains() { - return toChains; - } - - /** Set handler for frames received from the cluster */ - void setReceivedChain(const SessionFrameHandler::Chain& chain); - /** Wait for predicate(*this) to be true, up to timeout. *@return True if predicate became true, false if timed out. *Note the predicate may not be true after wait returns, @@ -86,13 +86,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool wait(boost::function<bool(const Cluster&)> predicate, sys::Duration timeout=sys::TIME_INFINITE) const; + /** Send frame to the cluster */ + void handle(SessionFrame&); + private: typedef Cpg::Id Id; typedef std::map<Id, shared_ptr<Member> > MemberMap; - typedef std::map< - framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; - void mcast(SessionFrame&); ///< send frame by multicast. void notify(); ///< Notify cluster of my details. void deliver( @@ -112,7 +112,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler ); void run(); - bool handleClusterFrame(Id from, framing::AMQFrame&); + void handleClusterFrame(Id from, framing::AMQFrame&); mutable sys::Monitor lock; boost::scoped_ptr<Cpg> cpg; @@ -120,17 +120,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler std::string url; Id self; MemberMap members; - ChannelMap channels; sys::Thread dispatcher; boost::function<void()> callback; - framing::FrameHandler::Chains toChains; - SessionFrameHandler::Chain receivedChain; - - struct IncomingHandler; - struct OutgoingHandler; - - friend struct IncomingHandler; - friend struct OutgoingHandler; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index d48fbadf7b..10b1c44f40 100644 --- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -16,8 +16,8 @@ * */ #include "qpid/broker/Broker.h" -#include "qpid/framing/HandlerUpdater.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/SessionManager.h" #include "qpid/Plugin.h" #include "qpid/Options.h" @@ -26,11 +26,11 @@ namespace cluster { using namespace std; -struct ClusterPluginProvider : public PluginProvider { +struct ClusterPlugin : public Plugin { struct ClusterOptions : public Options { string clusterName; - ClusterOptions() { + ClusterOptions() : Options("Cluster Options") { addOptions() ("cluster", optValue(clusterName, "NAME"), "Join the cluster named NAME"); @@ -39,22 +39,25 @@ struct ClusterPluginProvider : public PluginProvider { ClusterOptions options; shared_ptr<Cluster> cluster; + shared_ptr<SessionManager> sessions; Options* getOptions() { return &options; } - void provide(PluginUser& user) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&user); + void initialize(Plugin::Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. - cluster.reset(new Cluster(options.clusterName, broker->getUrl())); - // FIXME aconway 2007-06-29: register HandlerUpdater. + sessions.reset(new SessionManager()); + cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions)); + sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: + broker->add(sessions); } } }; -static ClusterPluginProvider instance; // Static initialization. +static ClusterPlugin instance; // Static initialization. }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp new file mode 100644 index 0000000000..24f201535d --- /dev/null +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "SessionManager.h" +#include "ClassifierHandler.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace sys; + +/** Wrap plain AMQFrames in SessionFrames */ +struct FrameWrapperHandler : public FrameHandler { + + FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_) + : uuid(id), direction(dir), next(next_) { + assert(!uuid.isNull()); + } + + void handle(AMQFrame& frame) { + SessionFrame sf(uuid, frame, direction); + assert(next); + next->handle(sf); + } + + Uuid uuid; + bool direction; + SessionFrameHandler::Chain next; +}; + +SessionManager::SessionManager() {} + +void SessionManager::update(FrameHandler::Chains& chains) +{ + Mutex::ScopedLock l(lock); + // Create a new local session, store local chains. + Uuid uuid(true); + sessions[uuid] = chains; + + // Replace local incoming chain. Build from the back. + // + // TODO aconway 2007-07-05: Currently mcast wiring, bypass + // everythign else. + assert(clusterSend); + FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend)); + FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); + chains.in = classify; + + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // + + // Leave outgoing chain unmodified. + // TODO aconway 2007-07-05: Failover will require replication of + // outgoing frames to session replicas. + +} + +void SessionManager::handle(SessionFrame& frame) { + // Incoming from frame. + FrameHandler::Chains chains; + { + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(frame.uuid); + if (i == sessions.end()) { + QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); + chains = nonLocal; + } + else { + QPID_LOG(trace, "Local frame from cluster: " << frame.frame); + chains = i->second; + } + } + FrameHandler::Chain chain = + chain = frame.isIncoming ? chains.in : chains.out; + // TODO aconway 2007-07-11: Should this be assert(chain) + if (chain) + chain->handle(frame.frame); + + // TODO aconway 2007-07-05: Here's where we should unblock frame + // dispatch for the channel. +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h new file mode 100644 index 0000000000..c23efde18e --- /dev/null +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_SESSIONMANAGER_H +#define QPID_CLUSTER_SESSIONMANAGER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/BrokerChannel.h" +#include "qpid/cluster/SessionFrame.h" +#include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" + +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Manage sessions and handler chains for the cluster. + * + */ +class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler +{ + public: + SessionManager(); + + /** Set the handler to send to the cluster */ + void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } + + /** As ChannelUpdater update the handler chains. */ + void update(framing::FrameHandler::Chains& chains); + + /** As SessionFrameHandler handle frames received from the cluster */ + void handle(SessionFrame&); + + /** Get ChannelID for UUID. Return 0 if no mapping */ + framing::ChannelId getChannelId(const framing::Uuid&) const; + + private: + typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap; + + sys::Mutex lock; + SessionFrameHandler::Chain clusterSend; + SessionMap sessions; + framing::FrameHandler::Chains nonLocal; +}; + + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/ diff --git a/cpp/src/qpid/framing/HandlerUpdater.h b/cpp/src/qpid/framing/HandlerUpdater.h index 5cb1e87d6e..b9497e4f12 100644 --- a/cpp/src/qpid/framing/HandlerUpdater.h +++ b/cpp/src/qpid/framing/HandlerUpdater.h @@ -26,13 +26,15 @@ namespace qpid { namespace framing { -/** Plugin object that can update handler chains. */ -struct HandlerUpdater : public Plugin { +/** Interface for objects that can update handler chains. */ +struct HandlerUpdater { + virtual ~HandlerUpdater() {} + /** Update the handler chains. *@param id Unique identifier for channel or session. *@param chains Handler chains to be updated. */ - virtual void update(ChannelId id, FrameHandler::Chains& chains) = 0; + virtual void update(FrameHandler::Chains& chains) = 0; }; }} // namespace qpid::framing diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp index 7ab730bf52..53a68fc815 100644 --- a/cpp/src/qpidd.cpp +++ b/cpp/src/qpidd.cpp @@ -24,6 +24,8 @@ #include "qpid/log/Statement.h" #include "qpid/log/Options.h" #include "qpid/log/Logger.h" +#include "qpid/Plugin.h" +#include "qpid/sys/Shlib.h" #include "config.h" #include <boost/filesystem/path.hpp> #include <iostream> @@ -54,10 +56,10 @@ struct DaemonOptions : public qpid::Options { struct QpiddOptions : public qpid::Options { - DaemonOptions daemon; + CommonOptions common; Broker::Options broker; + DaemonOptions daemon; qpid::log::Options log; - CommonOptions common; QpiddOptions() : qpid::Options("Options") { common.config = "/etc/qpidd.conf"; @@ -65,6 +67,12 @@ struct QpiddOptions : public qpid::Options { add(broker); add(daemon); add(log); + const Plugin::Plugins& plugins= + Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + ++i) + add(*(*i)->getOptions()); } void usage() const { @@ -74,7 +82,7 @@ struct QpiddOptions : public qpid::Options { // Globals shared_ptr<Broker> brokerPtr; -QpiddOptions options; +auto_ptr<QpiddOptions> options; void shutdownHandler(int signal){ QPID_LOG(notice, "Shutting down on signal " << signal); @@ -84,45 +92,60 @@ void shutdownHandler(int signal){ struct QpiddDaemon : public Daemon { /** Code for parent process */ void parent() { - uint16_t port = wait(options.daemon.wait); - if (options.broker.port == 0) + uint16_t port = wait(options->daemon.wait); + if (options->broker.port == 0) cout << port << endl; } /** Code for forked child process */ void child() { - brokerPtr.reset(new Broker(options.broker)); + brokerPtr.reset(new Broker(options->broker)); uint16_t port=brokerPtr->getPort(); ready(port); // Notify parent. brokerPtr->run(); } }; +void tryShlib(const char* libname) { + try { + Shlib shlib(libname); + } + catch (const exception& e) { + // TODO aconway 2007-07-09: Should log failures as INFO + // at least, but we try shlibs before logging is configured. + } +} + int main(int argc, char* argv[]) { try { - options.parse(argc, argv, options.common.config); - qpid::log::Logger::instance().configure(options.log, argv[0]); + // Load optional modules + tryShlib("libqpidcluster.so.0"); + + // Parse options + options.reset(new QpiddOptions()); + options->parse(argc, argv, options->common.config); + qpid::log::Logger::instance().configure(options->log, argv[0]); // Options that just print information. - if(options.common.help || options.common.version) { - if (options.common.version) + if(options->common.help || options->common.version) { + if (options->common.version) cout << "qpidd (" << PACKAGE_NAME << ") version " << PACKAGE_VERSION << endl; - else if (options.common.help) - options.usage(); + else if (options->common.help) + options->usage(); return 0; } // Options that affect a running daemon. - if (options.daemon.check || options.daemon.quit) { - pid_t pid = Daemon::getPid(options.broker.port); + if (options->daemon.check || options->daemon.quit) { + pid_t pid = Daemon::getPid(options->broker.port); if (pid < 0) return 1; - if (options.daemon.check) + if (options->daemon.check) cout << pid << endl; - if (options.daemon.quit && kill(pid, SIGINT) < 0) + if (options->daemon.quit && kill(pid, SIGINT) < 0) throw Exception("Failed to stop daemon: " + strError(errno)); return 0; } @@ -139,14 +162,14 @@ int main(int argc, char* argv[]) signal(SIGTTOU,SIG_IGN); signal(SIGTTIN,SIG_IGN); - if (options.daemon.daemon) { + if (options->daemon.daemon) { // Fork the daemon QpiddDaemon d; d.fork(); } else { // Non-daemon broker. - brokerPtr.reset(new Broker(options.broker)); - if (options.broker.port == 0) + brokerPtr.reset(new Broker(options->broker)); + if (options->broker.port == 0) cout << uint16_t(brokerPtr->getPort()) << endl; brokerPtr->run(); } diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index 2e6d9ecfff..56e17e06db 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -36,11 +36,14 @@ using namespace qpid::log; BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); AMQFrame frame(VER, 1, new ChannelPingBody(VER)); - cluster.getSendChains().in->handle(frame); + Uuid id(true); + SessionFrame send(id, frame, true); + cluster.handle(send); BOOST_REQUIRE(cluster.received.waitFor(1)); SessionFrame& sf=cluster.received[0]; BOOST_CHECK(sf.isIncoming); + BOOST_CHECK_EQUAL(id, sf.uuid); BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); BOOST_CHECK_EQUAL(1u, cluster.size()); @@ -60,9 +63,12 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { // Exchange frames with child. AMQFrame frame(VER, 1, new ChannelPingBody(VER)); - cluster.getSendChains().in->handle(frame); + Uuid id(true); + SessionFrame send(id, frame, true); + cluster.handle(send); BOOST_REQUIRE(cluster.received.waitFor(1)); SessionFrame& sf=cluster.received[0]; + BOOST_CHECK_EQUAL(id, sf.uuid); BOOST_CHECK(sf.isIncoming); BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); BOOST_REQUIRE(cluster.received.waitFor(2)); diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index 8fddd1d1f7..bf6d1c2a64 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -24,7 +24,10 @@ #include "qpid/framing/ChannelOkBody.h" #include "qpid/framing/BasicGetOkBody.h" #include "qpid/log/Logger.h" + #include <boost/bind.hpp> +#include <boost/test/test_tools.hpp> + #include <iostream> #include <vector> #include <functional> @@ -69,13 +72,12 @@ void nullDeleter(void*) {} struct TestCluster : public Cluster { - TestCluster(string name, string url) : Cluster(name, url) - { - setReceivedChain(make_shared_ptr(&received, nullDeleter)); - } + TestCluster(string name, string url) + : Cluster(name, url, make_shared_ptr(&received, nullDeleter)) {} /** Wait for cluster to be of size n. */ bool waitFor(size_t n) { + BOOST_CHECKPOINT("About to call Cluster::wait"); return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n)); } diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index 216afc7bca..9c119e5238 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -39,7 +39,9 @@ void clusterTwo() { BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - cluster.getSendChains().out->handle(frame); + Uuid id(true); + SessionFrame sf(id, frame, false); + cluster.handle(sf); BOOST_REQUIRE(cluster.received.waitFor(2)); BOOST_CHECK(!cluster.received[1].isIncoming); BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 93c3e80673..4f1e7e1ec3 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -170,7 +170,7 @@ all-am: .valgrind.supp .valgrindrc # ltmain invocations, one may corrupt the temporaries of the other. .NOTPARALLEL: -CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp dummy_test $(unit_wrappers) +CLEANFILES=valgrind.out *.log *.vglog .valgrindrc .valgrind.supp dummy_test $(unit_wrappers) MAINTAINERCLEANFILES=gen.mk interop_runner_SOURCES = \ diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 7407565f62..506624569f 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -20,10 +20,11 @@ check_PROGRAMS+=Cpg Cpg_SOURCES=Cpg.cpp Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework -TESTS+=Cluster -check_PROGRAMS+=Cluster -Cluster_SOURCES=Cluster.cpp Cluster.h -Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework +# FIXME aconway 2007-07-19: +# TESTS+=Cluster +# check_PROGRAMS+=Cluster +# Cluster_SOURCES=Cluster.cpp Cluster.h +# Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework check_PROGRAMS+=Cluster_child Cluster_child_SOURCES=Cluster_child.cpp Cluster.h |