summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
committerAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
commitcb566519d58ded6704507fa5530bf901e620edf6 (patch)
treeab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src
parent3f900af77d5f781431dc25e307974e0fc27aa561 (diff)
downloadqpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz
* Summary:
- Connect cluster handlers into broker handler chains. - Progress on wiring replication. * src/tests/cluster.mk: Temporarily disabled Cluster test. * src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs. * src/qpidd.cpp: - Load optional libs (cluster) - Include plugin config in options.parse. * src/qpid/cluster/SessionManager.h: - Create sessions, update handler chains (as HandlerUpdater) - Handle frames from cluster. * src/qpid/cluster/ClusterPlugin.h, .cpp: - renamed from ClusterPluginProvider - Create and connect Cluster and SessionManager. - Register SessionManager as HandlerUpdater. * src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler. * src/qpid/broker/Connection.cpp: Apply HandlerUpdaters. * src/qpid/broker/Broker.h, .cpp: - Initialize plugins - Apply HandlerUpdaters * src/qpid/Plugin.h, .cpp: Simplified plugin framework. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk6
-rw-r--r--cpp/src/qpid/Plugin.cpp14
-rw-r--r--cpp/src/qpid/Plugin.h66
-rw-r--r--cpp/src/qpid/broker/Broker.cpp25
-rw-r--r--cpp/src/qpid/broker/Broker.h34
-rw-r--r--cpp/src/qpid/broker/Connection.cpp5
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp62
-rw-r--r--cpp/src/qpid/cluster/Cluster.h43
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp (renamed from cpp/src/qpid/cluster/ClusterPluginProvider.cpp)19
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp103
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h68
-rw-r--r--cpp/src/qpid/framing/HandlerUpdater.h8
-rw-r--r--cpp/src/qpidd.cpp61
-rw-r--r--cpp/src/tests/Cluster.cpp10
-rw-r--r--cpp/src/tests/Cluster.h10
-rw-r--r--cpp/src/tests/Cluster_child.cpp4
-rw-r--r--cpp/src/tests/Makefile.am2
-rw-r--r--cpp/src/tests/cluster.mk9
18 files changed, 368 insertions, 181 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index bfcc4fd850..dcc70ca897 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -11,11 +11,13 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
- qpid/cluster/ClusterPluginProvider.cpp \
+ qpid/cluster/ClusterPlugin.cpp \
qpid/cluster/ClassifierHandler.h \
qpid/cluster/ClassifierHandler.cpp \
qpid/cluster/SessionFrame.h \
- qpid/cluster/SessionFrame.cpp
+ qpid/cluster/SessionFrame.cpp \
+ qpid/cluster/SessionManager.h \
+ qpid/cluster/SessionManager.cpp
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp
index c6513d33e8..d38b53a56e 100644
--- a/cpp/src/qpid/Plugin.cpp
+++ b/cpp/src/qpid/Plugin.cpp
@@ -22,19 +22,19 @@
namespace qpid {
-std::vector<PluginProvider*> PluginProvider::providers;
+Plugin::Plugins Plugin::plugins;
-PluginProvider::PluginProvider() {
+Plugin::Plugin() {
// Register myself.
- providers.push_back(this);
+ plugins.push_back(this);
}
-PluginProvider::~PluginProvider() {}
+Plugin::~Plugin() {}
-Options* PluginProvider::getOptions() { return 0; }
+Options* Plugin::getOptions() { return 0; }
-const std::vector<PluginProvider*>& PluginProvider::getProviders() {
- return providers;
+const Plugin::Plugins& Plugin::getPlugins() {
+ return plugins;
}
} // namespace qpid
diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h
index f50e97eb00..e684d238a3 100644
--- a/cpp/src/qpid/Plugin.h
+++ b/cpp/src/qpid/Plugin.h
@@ -32,70 +32,56 @@
namespace qpid {
class Options;
-/** Generic base class to allow dynamic casting of generic Plugin objects
- * to concrete types.
- */
-struct Plugin : private boost::noncopyable {
- virtual ~Plugin() {}
-};
-
-/** Generic interface for anything that uses plug-ins. */
-struct PluginUser : boost::noncopyable {
- virtual ~PluginUser() {}
- /**
- * Called by a PluginProvider to provide a plugin.
- *
- * A concrete PluginUser will dynamic_pointer_cast plugin to a
- * class it knows how to use. A PluginUser should ignore plugins
- * it does not recognize.
- *
- * The user will release its shared_ptr when it is finished using
- * plugin.
- */
- virtual void use(const shared_ptr<Plugin>& plugin) = 0;
-};
-
/**
- * Base for classes that provide plug-ins.
+ * Plug-in base class.
*/
-class PluginProvider : boost::noncopyable
+class Plugin : boost::noncopyable
{
public:
/**
- * Register the provider to appear in getProviders()
+ * Base interface for targets that receive plug-ins.
+ *
+ * The Broker is a plug-in target, there might be others
+ * in future.
+ */
+ struct Target { virtual ~Target() {} };
+
+ typedef std::vector<Plugin*> Plugins;
+
+ /**
+ * Construct registers the plug-in to appear in getPlugins().
*
- * A concrete PluginProvider is instantiated as a global or static
+ * A concrete Plugin is instantiated as a global or static
* member variable in a library so it is registered during static
* initialization when the library is loaded.
*/
- PluginProvider();
+ Plugin();
- virtual ~PluginProvider();
+ virtual ~Plugin();
/**
- * Returns configuration options for the plugin.
+ * Configuration options for the plugin.
* Then will be updated during option parsing by the host program.
*
* @return An options group or 0 for no options. Default returns 0.
- * PluginProvider retains ownership of return value.
+ * Plugin retains ownership of return value.
*/
virtual Options* getOptions();
- /** Provide plugins to a PluginUser.
+ /**
+ * Initialize Plugin functionality on a Target.
*
- * The provider can dynamic_cast the user if it only provides
- * plugins to certain types of user. Providers should ignore
- * users they don't recognize.
+ * Plugins should ignore targets they don't recognize.
*/
- virtual void provide(PluginUser& user) = 0;
+ virtual void initialize(Target&) = 0;
- /** Get the list of pointers to the registered providers.
- * Caller must not delete the pointers.
+ /** List of registered Plugin objects.
+ * Caller must not delete plugin pointers.
*/
- static const std::vector<PluginProvider*>& getProviders();
+ static const Plugins& getPlugins();
private:
- static std::vector<PluginProvider*> providers;
+ static Plugins plugins;
};
} // namespace qpid
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 9c8e98ec9a..86342b3c43 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -20,7 +20,6 @@
*/
#include "Broker.h"
-
#include "Connection.h"
#include "DirectExchange.h"
#include "FanOutExchange.h"
@@ -40,11 +39,14 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
+#include <boost/bind.hpp>
+
#include <iostream>
#include <memory>
using qpid::sys::Acceptor;
using qpid::framing::HandlerUpdater;
+using qpid::framing::FrameHandler;
namespace qpid {
namespace broker {
@@ -98,6 +100,12 @@ Broker::Broker(const Broker::Options& conf) :
store->recover(recoverer);
}
+ // Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->initialize(*this);
}
@@ -149,13 +157,14 @@ Acceptor& Broker::getAcceptor() const {
return *acceptor;
}
-void Broker::use(const shared_ptr<Plugin>& plugin) {
- shared_ptr<HandlerUpdater> updater=
- dynamic_pointer_cast<HandlerUpdater>(plugin);
- if (updater) {
- QPID_LOG(critical, "HandlerUpdater plugins not implemented");
- // FIXME aconway 2007-06-28: hook into Connections.
- }
+void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
+ QPID_LOG(debug, "Broker added HandlerUpdater");
+ handlerUpdaters.push_back(updater);
+}
+
+void Broker::update(FrameHandler::Chains& chains) {
+ for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
+ boost::bind(&HandlerUpdater::update, _1, chains));
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index a27bce1751..9f57a45e0c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -23,19 +23,23 @@
*/
#include "ConnectionFactory.h"
-#include "qpid/Url.h"
-#include "qpid/Plugin.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Acceptor.h"
-#include "MessageStore.h"
-#include "ExchangeRegistry.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
#include "DtxManager.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "ExchangeRegistry.h"
+#include "MessageStore.h"
#include "QueueRegistry.h"
#include "qpid/Options.h"
+#include "qpid/Plugin.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
namespace qpid {
@@ -48,7 +52,7 @@ namespace broker {
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public PluginUser
+class Broker : public sys::Runnable, public Plugin::Target
{
public:
struct Options : public qpid::Options {
@@ -88,26 +92,32 @@ class Broker : public sys::Runnable, public PluginUser
/** Shut down the broker */
virtual void shutdown();
- /** Use a plugin */
- void use(const shared_ptr<Plugin>& plugin);
+ /** Register a handler updater. */
+ void add(const shared_ptr<framing::HandlerUpdater>&);
+
+ /** Apply all handler updaters to a handler chain pair. */
+ void update(framing::FrameHandler::Chains&);
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
uint64_t getStagingThreshold() { return stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
-
+
private:
sys::Acceptor& getAcceptor() const;
Options config;
sys::Acceptor::shared_ptr acceptor;
const std::auto_ptr<MessageStore> store;
+ typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters;
+
QueueRegistry queues;
ExchangeRegistry exchanges;
uint64_t stagingThreshold;
ConnectionFactory factory;
DtxManager dtxManager;
+ HandlerUpdaters handlerUpdaters;
static MessageStore* createStore(const Options& config);
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 978228a364..7a987f28d2 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -104,7 +104,10 @@ void Connection::closeChannel(uint16_t id) {
FrameHandler::Chains& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out));
+ FrameHandler::Chains chains(
+ new SemanticHandler(id, *this),
+ new OutputHandlerFrameHandler(*out));
+ broker.update(chains);
i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
return i->second;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f2d1b75f3f..256378ccd5 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,6 +32,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
@@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-struct Cluster::IncomingHandler : public FrameHandler {
- IncomingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-
-struct Cluster::OutgoingHandler : public FrameHandler {
- OutgoingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-// TODO aconway 2007-06-28: Right now everything is backed up via
-// multicast. When we have point-to-point backups the
-// Incoming/Outgoing handlers must determine where each frame should
-// be sent: to multicast or only to specific backup(s) via AMQP.
-Cluster::Cluster(const std::string& name_, const std::string& url_) :
+Cluster::Cluster(
+ const std::string& name_, const std::string& url_,
+ const SessionFrameHandler::Chain& next
+) :
+ SessionFrameHandler(next),
cpg(new Cpg(*this)),
name(name_),
url(url_),
- self(cpg->getLocalNoideId(), getpid()),
- toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+ self(cpg->getLocalNoideId(), getpid())
{
- QPID_LOG(trace, *this << " Joining cluster.");
+ QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg->join(name);
notify();
dispatcher=Thread(*this);
@@ -102,7 +85,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::mcast(SessionFrame& frame) {
+void Cluster::handle(SessionFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) {
void Cluster::notify() {
SessionFrame sf;
sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
- mcast(sf);
+ handle(sf);
}
size_t Cluster::size() const {
@@ -122,11 +105,6 @@ size_t Cluster::size() const {
return members.size();
}
-void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
- Mutex::ScopedLock l(lock);
- receivedChain = chain;
-}
-
Cluster::MemberList Cluster::getMembers() const {
// TODO aconway 2007-07-04: use read/write lock?
Mutex::ScopedLock l(lock);
@@ -152,7 +130,7 @@ void Cluster::deliver(
if (frame.uuid.isNull())
handleClusterFrame(from, frame.frame);
else
- receivedChain->handle(frame);
+ next->handle(frame);
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
}
// Handle cluster control frame from the null session.
-bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
- if (notifyIn) {
+ assert(notifyIn);
MemberList list;
{
Mutex::ScopedLock l(lock);
- if (!members[from])
- members[from].reset(new Member(url));
+ shared_ptr<Member>& member=members[from];
+ if (!member)
+ member.reset(new Member(notifyIn->getUrl()));
else
- members[from]->url = notifyIn->getUrl();
- QPID_LOG(trace, *this << ": member update: " << members);
+ member->url = notifyIn->getUrl();
lock.notifyAll();
+ QPID_LOG(trace, *this << ": members joined: " << members);
}
- return true;
- }
- return false;
}
void Cluster::configChange(
@@ -207,7 +183,7 @@ void Cluster::configChange(
// We don't record members joining here, we record them when
// we get their ClusterNotify message.
}
- if (newMembers)
+ if (newMembers) // Notify new members of my presence.
notify();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 6ab4cb58df..f6afe14c62 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -37,11 +37,17 @@
namespace qpid { namespace cluster {
/**
- * Represents a cluster, provides access to data about members.
- * Implements HandlerUpdater to manage handlers that route frames to
- * and from the cluster.
+ * Connection to the cluster. Maintains cluster membership
+ * data.
+ *
+ * As SessionFrameHandler, handles frames by sending them to the
+ * cluster, sends frames received from the cluster to the next
+ * SessionFrameHandler.
+ *
+ *
*/
-class Cluster : private sys::Runnable, private Cpg::Handler
+class Cluster : public SessionFrameHandler,
+ private sys::Runnable, private Cpg::Handler
{
public:
/** Details of a cluster member */
@@ -57,8 +63,10 @@ class Cluster : private sys::Runnable, private Cpg::Handler
* Join a cluster.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
+ * @param handler for frames received from the cluster.
*/
- Cluster(const std::string& name, const std::string& url);
+ Cluster(const std::string& name, const std::string& url,
+ const SessionFrameHandler::Chain& next);
virtual ~Cluster();
@@ -70,14 +78,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool empty() const { return size() == 0; }
- /** Get handler chains to send incoming/outgoing frames to the cluster */
- framing::FrameHandler::Chains getSendChains() {
- return toChains;
- }
-
- /** Set handler for frames received from the cluster */
- void setReceivedChain(const SessionFrameHandler::Chain& chain);
-
/** Wait for predicate(*this) to be true, up to timeout.
*@return True if predicate became true, false if timed out.
*Note the predicate may not be true after wait returns,
@@ -86,13 +86,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool wait(boost::function<bool(const Cluster&)> predicate,
sys::Duration timeout=sys::TIME_INFINITE) const;
+ /** Send frame to the cluster */
+ void handle(SessionFrame&);
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, shared_ptr<Member> > MemberMap;
- typedef std::map<
- framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
- void mcast(SessionFrame&); ///< send frame by multicast.
void notify(); ///< Notify cluster of my details.
void deliver(
@@ -112,7 +112,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
);
void run();
- bool handleClusterFrame(Id from, framing::AMQFrame&);
+ void handleClusterFrame(Id from, framing::AMQFrame&);
mutable sys::Monitor lock;
boost::scoped_ptr<Cpg> cpg;
@@ -120,17 +120,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler
std::string url;
Id self;
MemberMap members;
- ChannelMap channels;
sys::Thread dispatcher;
boost::function<void()> callback;
- framing::FrameHandler::Chains toChains;
- SessionFrameHandler::Chain receivedChain;
-
- struct IncomingHandler;
- struct OutgoingHandler;
-
- friend struct IncomingHandler;
- friend struct OutgoingHandler;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index d48fbadf7b..10b1c44f40 100644
--- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -16,8 +16,8 @@
*
*/
#include "qpid/broker/Broker.h"
-#include "qpid/framing/HandlerUpdater.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/SessionManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
@@ -26,11 +26,11 @@ namespace cluster {
using namespace std;
-struct ClusterPluginProvider : public PluginProvider {
+struct ClusterPlugin : public Plugin {
struct ClusterOptions : public Options {
string clusterName;
- ClusterOptions() {
+ ClusterOptions() : Options("Cluster Options") {
addOptions()
("cluster", optValue(clusterName, "NAME"),
"Join the cluster named NAME");
@@ -39,22 +39,25 @@ struct ClusterPluginProvider : public PluginProvider {
ClusterOptions options;
shared_ptr<Cluster> cluster;
+ shared_ptr<SessionManager> sessions;
Options* getOptions() {
return &options;
}
- void provide(PluginUser& user) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&user);
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
- // FIXME aconway 2007-06-29: register HandlerUpdater.
+ sessions.reset(new SessionManager());
+ cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
+ sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10:
+ broker->add(sessions);
}
}
};
-static ClusterPluginProvider instance; // Static initialization.
+static ClusterPlugin instance; // Static initialization.
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
new file mode 100644
index 0000000000..24f201535d
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace sys;
+
+/** Wrap plain AMQFrames in SessionFrames */
+struct FrameWrapperHandler : public FrameHandler {
+
+ FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_)
+ : uuid(id), direction(dir), next(next_) {
+ assert(!uuid.isNull());
+ }
+
+ void handle(AMQFrame& frame) {
+ SessionFrame sf(uuid, frame, direction);
+ assert(next);
+ next->handle(sf);
+ }
+
+ Uuid uuid;
+ bool direction;
+ SessionFrameHandler::Chain next;
+};
+
+SessionManager::SessionManager() {}
+
+void SessionManager::update(FrameHandler::Chains& chains)
+{
+ Mutex::ScopedLock l(lock);
+ // Create a new local session, store local chains.
+ Uuid uuid(true);
+ sessions[uuid] = chains;
+
+ // Replace local incoming chain. Build from the back.
+ //
+ // TODO aconway 2007-07-05: Currently mcast wiring, bypass
+ // everythign else.
+ assert(clusterSend);
+ FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend));
+ FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
+ chains.in = classify;
+
+ // FIXME aconway 2007-07-05: Need to stop bypassed frames
+ // from overtaking mcast frames.
+ //
+
+ // Leave outgoing chain unmodified.
+ // TODO aconway 2007-07-05: Failover will require replication of
+ // outgoing frames to session replicas.
+
+}
+
+void SessionManager::handle(SessionFrame& frame) {
+ // Incoming from frame.
+ FrameHandler::Chains chains;
+ {
+ Mutex::ScopedLock l(lock);
+ SessionMap::iterator i = sessions.find(frame.uuid);
+ if (i == sessions.end()) {
+ QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
+ chains = nonLocal;
+ }
+ else {
+ QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
+ chains = i->second;
+ }
+ }
+ FrameHandler::Chain chain =
+ chain = frame.isIncoming ? chains.in : chains.out;
+ // TODO aconway 2007-07-11: Should this be assert(chain)
+ if (chain)
+ chain->handle(frame.frame);
+
+ // TODO aconway 2007-07-05: Here's where we should unblock frame
+ // dispatch for the channel.
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
new file mode 100644
index 0000000000..c23efde18e
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_SESSIONMANAGER_H
+#define QPID_CLUSTER_SESSIONMANAGER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/cluster/SessionFrame.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Manage sessions and handler chains for the cluster.
+ *
+ */
+class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler
+{
+ public:
+ SessionManager();
+
+ /** Set the handler to send to the cluster */
+ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
+
+ /** As ChannelUpdater update the handler chains. */
+ void update(framing::FrameHandler::Chains& chains);
+
+ /** As SessionFrameHandler handle frames received from the cluster */
+ void handle(SessionFrame&);
+
+ /** Get ChannelID for UUID. Return 0 if no mapping */
+ framing::ChannelId getChannelId(const framing::Uuid&) const;
+
+ private:
+ typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+
+ sys::Mutex lock;
+ SessionFrameHandler::Chain clusterSend;
+ SessionMap sessions;
+ framing::FrameHandler::Chains nonLocal;
+};
+
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/cpp/src/qpid/framing/HandlerUpdater.h b/cpp/src/qpid/framing/HandlerUpdater.h
index 5cb1e87d6e..b9497e4f12 100644
--- a/cpp/src/qpid/framing/HandlerUpdater.h
+++ b/cpp/src/qpid/framing/HandlerUpdater.h
@@ -26,13 +26,15 @@
namespace qpid {
namespace framing {
-/** Plugin object that can update handler chains. */
-struct HandlerUpdater : public Plugin {
+/** Interface for objects that can update handler chains. */
+struct HandlerUpdater {
+ virtual ~HandlerUpdater() {}
+
/** Update the handler chains.
*@param id Unique identifier for channel or session.
*@param chains Handler chains to be updated.
*/
- virtual void update(ChannelId id, FrameHandler::Chains& chains) = 0;
+ virtual void update(FrameHandler::Chains& chains) = 0;
};
}} // namespace qpid::framing
diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp
index 7ab730bf52..53a68fc815 100644
--- a/cpp/src/qpidd.cpp
+++ b/cpp/src/qpidd.cpp
@@ -24,6 +24,8 @@
#include "qpid/log/Statement.h"
#include "qpid/log/Options.h"
#include "qpid/log/Logger.h"
+#include "qpid/Plugin.h"
+#include "qpid/sys/Shlib.h"
#include "config.h"
#include <boost/filesystem/path.hpp>
#include <iostream>
@@ -54,10 +56,10 @@ struct DaemonOptions : public qpid::Options {
struct QpiddOptions : public qpid::Options {
- DaemonOptions daemon;
+ CommonOptions common;
Broker::Options broker;
+ DaemonOptions daemon;
qpid::log::Options log;
- CommonOptions common;
QpiddOptions() : qpid::Options("Options") {
common.config = "/etc/qpidd.conf";
@@ -65,6 +67,12 @@ struct QpiddOptions : public qpid::Options {
add(broker);
add(daemon);
add(log);
+ const Plugin::Plugins& plugins=
+ Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ ++i)
+ add(*(*i)->getOptions());
}
void usage() const {
@@ -74,7 +82,7 @@ struct QpiddOptions : public qpid::Options {
// Globals
shared_ptr<Broker> brokerPtr;
-QpiddOptions options;
+auto_ptr<QpiddOptions> options;
void shutdownHandler(int signal){
QPID_LOG(notice, "Shutting down on signal " << signal);
@@ -84,45 +92,60 @@ void shutdownHandler(int signal){
struct QpiddDaemon : public Daemon {
/** Code for parent process */
void parent() {
- uint16_t port = wait(options.daemon.wait);
- if (options.broker.port == 0)
+ uint16_t port = wait(options->daemon.wait);
+ if (options->broker.port == 0)
cout << port << endl;
}
/** Code for forked child process */
void child() {
- brokerPtr.reset(new Broker(options.broker));
+ brokerPtr.reset(new Broker(options->broker));
uint16_t port=brokerPtr->getPort();
ready(port); // Notify parent.
brokerPtr->run();
}
};
+void tryShlib(const char* libname) {
+ try {
+ Shlib shlib(libname);
+ }
+ catch (const exception& e) {
+ // TODO aconway 2007-07-09: Should log failures as INFO
+ // at least, but we try shlibs before logging is configured.
+ }
+}
+
int main(int argc, char* argv[])
{
try {
- options.parse(argc, argv, options.common.config);
- qpid::log::Logger::instance().configure(options.log, argv[0]);
+ // Load optional modules
+ tryShlib("libqpidcluster.so.0");
+
+ // Parse options
+ options.reset(new QpiddOptions());
+ options->parse(argc, argv, options->common.config);
+ qpid::log::Logger::instance().configure(options->log, argv[0]);
// Options that just print information.
- if(options.common.help || options.common.version) {
- if (options.common.version)
+ if(options->common.help || options->common.version) {
+ if (options->common.version)
cout << "qpidd (" << PACKAGE_NAME << ") version "
<< PACKAGE_VERSION << endl;
- else if (options.common.help)
- options.usage();
+ else if (options->common.help)
+ options->usage();
return 0;
}
// Options that affect a running daemon.
- if (options.daemon.check || options.daemon.quit) {
- pid_t pid = Daemon::getPid(options.broker.port);
+ if (options->daemon.check || options->daemon.quit) {
+ pid_t pid = Daemon::getPid(options->broker.port);
if (pid < 0)
return 1;
- if (options.daemon.check)
+ if (options->daemon.check)
cout << pid << endl;
- if (options.daemon.quit && kill(pid, SIGINT) < 0)
+ if (options->daemon.quit && kill(pid, SIGINT) < 0)
throw Exception("Failed to stop daemon: " + strError(errno));
return 0;
}
@@ -139,14 +162,14 @@ int main(int argc, char* argv[])
signal(SIGTTOU,SIG_IGN);
signal(SIGTTIN,SIG_IGN);
- if (options.daemon.daemon) {
+ if (options->daemon.daemon) {
// Fork the daemon
QpiddDaemon d;
d.fork();
}
else { // Non-daemon broker.
- brokerPtr.reset(new Broker(options.broker));
- if (options.broker.port == 0)
+ brokerPtr.reset(new Broker(options->broker));
+ if (options->broker.port == 0)
cout << uint16_t(brokerPtr->getPort()) << endl;
brokerPtr->run();
}
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index 2e6d9ecfff..56e17e06db 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -36,11 +36,14 @@ using namespace qpid::log;
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
AMQFrame frame(VER, 1, new ChannelPingBody(VER));
- cluster.getSendChains().in->handle(frame);
+ Uuid id(true);
+ SessionFrame send(id, frame, true);
+ cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitFor(1));
SessionFrame& sf=cluster.received[0];
BOOST_CHECK(sf.isIncoming);
+ BOOST_CHECK_EQUAL(id, sf.uuid);
BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(1u, cluster.size());
@@ -60,9 +63,12 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
// Exchange frames with child.
AMQFrame frame(VER, 1, new ChannelPingBody(VER));
- cluster.getSendChains().in->handle(frame);
+ Uuid id(true);
+ SessionFrame send(id, frame, true);
+ cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitFor(1));
SessionFrame& sf=cluster.received[0];
+ BOOST_CHECK_EQUAL(id, sf.uuid);
BOOST_CHECK(sf.isIncoming);
BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
BOOST_REQUIRE(cluster.received.waitFor(2));
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index 8fddd1d1f7..bf6d1c2a64 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -24,7 +24,10 @@
#include "qpid/framing/ChannelOkBody.h"
#include "qpid/framing/BasicGetOkBody.h"
#include "qpid/log/Logger.h"
+
#include <boost/bind.hpp>
+#include <boost/test/test_tools.hpp>
+
#include <iostream>
#include <vector>
#include <functional>
@@ -69,13 +72,12 @@ void nullDeleter(void*) {}
struct TestCluster : public Cluster
{
- TestCluster(string name, string url) : Cluster(name, url)
- {
- setReceivedChain(make_shared_ptr(&received, nullDeleter));
- }
+ TestCluster(string name, string url)
+ : Cluster(name, url, make_shared_ptr(&received, nullDeleter)) {}
/** Wait for cluster to be of size n. */
bool waitFor(size_t n) {
+ BOOST_CHECKPOINT("About to call Cluster::wait");
return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
}
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index 216afc7bca..9c119e5238 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -39,7 +39,9 @@ void clusterTwo() {
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- cluster.getSendChains().out->handle(frame);
+ Uuid id(true);
+ SessionFrame sf(id, frame, false);
+ cluster.handle(sf);
BOOST_REQUIRE(cluster.received.waitFor(2));
BOOST_CHECK(!cluster.received[1].isIncoming);
BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 93c3e80673..4f1e7e1ec3 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -170,7 +170,7 @@ all-am: .valgrind.supp .valgrindrc
# ltmain invocations, one may corrupt the temporaries of the other.
.NOTPARALLEL:
-CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp dummy_test $(unit_wrappers)
+CLEANFILES=valgrind.out *.log *.vglog .valgrindrc .valgrind.supp dummy_test $(unit_wrappers)
MAINTAINERCLEANFILES=gen.mk
interop_runner_SOURCES = \
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 7407565f62..506624569f 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -20,10 +20,11 @@ check_PROGRAMS+=Cpg
Cpg_SOURCES=Cpg.cpp
Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-TESTS+=Cluster
-check_PROGRAMS+=Cluster
-Cluster_SOURCES=Cluster.cpp Cluster.h
-Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
+# FIXME aconway 2007-07-19:
+# TESTS+=Cluster
+# check_PROGRAMS+=Cluster
+# Cluster_SOURCES=Cluster.cpp Cluster.h
+# Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
check_PROGRAMS+=Cluster_child
Cluster_child_SOURCES=Cluster_child.cpp Cluster.h