summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
committerAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
commitcb566519d58ded6704507fa5530bf901e620edf6 (patch)
treeab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid
parent3f900af77d5f781431dc25e307974e0fc27aa561 (diff)
downloadqpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz
* Summary:
- Connect cluster handlers into broker handler chains. - Progress on wiring replication. * src/tests/cluster.mk: Temporarily disabled Cluster test. * src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs. * src/qpidd.cpp: - Load optional libs (cluster) - Include plugin config in options.parse. * src/qpid/cluster/SessionManager.h: - Create sessions, update handler chains (as HandlerUpdater) - Handle frames from cluster. * src/qpid/cluster/ClusterPlugin.h, .cpp: - renamed from ClusterPluginProvider - Create and connect Cluster and SessionManager. - Register SessionManager as HandlerUpdater. * src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler. * src/qpid/broker/Connection.cpp: Apply HandlerUpdaters. * src/qpid/broker/Broker.h, .cpp: - Initialize plugins - Apply HandlerUpdaters * src/qpid/Plugin.h, .cpp: Simplified plugin framework. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/Plugin.cpp14
-rw-r--r--cpp/src/qpid/Plugin.h66
-rw-r--r--cpp/src/qpid/broker/Broker.cpp25
-rw-r--r--cpp/src/qpid/broker/Broker.h34
-rw-r--r--cpp/src/qpid/broker/Connection.cpp5
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp62
-rw-r--r--cpp/src/qpid/cluster/Cluster.h43
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp (renamed from cpp/src/qpid/cluster/ClusterPluginProvider.cpp)19
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp103
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h68
-rw-r--r--cpp/src/qpid/framing/HandlerUpdater.h8
11 files changed, 299 insertions, 148 deletions
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp
index c6513d33e8..d38b53a56e 100644
--- a/cpp/src/qpid/Plugin.cpp
+++ b/cpp/src/qpid/Plugin.cpp
@@ -22,19 +22,19 @@
namespace qpid {
-std::vector<PluginProvider*> PluginProvider::providers;
+Plugin::Plugins Plugin::plugins;
-PluginProvider::PluginProvider() {
+Plugin::Plugin() {
// Register myself.
- providers.push_back(this);
+ plugins.push_back(this);
}
-PluginProvider::~PluginProvider() {}
+Plugin::~Plugin() {}
-Options* PluginProvider::getOptions() { return 0; }
+Options* Plugin::getOptions() { return 0; }
-const std::vector<PluginProvider*>& PluginProvider::getProviders() {
- return providers;
+const Plugin::Plugins& Plugin::getPlugins() {
+ return plugins;
}
} // namespace qpid
diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h
index f50e97eb00..e684d238a3 100644
--- a/cpp/src/qpid/Plugin.h
+++ b/cpp/src/qpid/Plugin.h
@@ -32,70 +32,56 @@
namespace qpid {
class Options;
-/** Generic base class to allow dynamic casting of generic Plugin objects
- * to concrete types.
- */
-struct Plugin : private boost::noncopyable {
- virtual ~Plugin() {}
-};
-
-/** Generic interface for anything that uses plug-ins. */
-struct PluginUser : boost::noncopyable {
- virtual ~PluginUser() {}
- /**
- * Called by a PluginProvider to provide a plugin.
- *
- * A concrete PluginUser will dynamic_pointer_cast plugin to a
- * class it knows how to use. A PluginUser should ignore plugins
- * it does not recognize.
- *
- * The user will release its shared_ptr when it is finished using
- * plugin.
- */
- virtual void use(const shared_ptr<Plugin>& plugin) = 0;
-};
-
/**
- * Base for classes that provide plug-ins.
+ * Plug-in base class.
*/
-class PluginProvider : boost::noncopyable
+class Plugin : boost::noncopyable
{
public:
/**
- * Register the provider to appear in getProviders()
+ * Base interface for targets that receive plug-ins.
+ *
+ * The Broker is a plug-in target, there might be others
+ * in future.
+ */
+ struct Target { virtual ~Target() {} };
+
+ typedef std::vector<Plugin*> Plugins;
+
+ /**
+ * Construct registers the plug-in to appear in getPlugins().
*
- * A concrete PluginProvider is instantiated as a global or static
+ * A concrete Plugin is instantiated as a global or static
* member variable in a library so it is registered during static
* initialization when the library is loaded.
*/
- PluginProvider();
+ Plugin();
- virtual ~PluginProvider();
+ virtual ~Plugin();
/**
- * Returns configuration options for the plugin.
+ * Configuration options for the plugin.
* Then will be updated during option parsing by the host program.
*
* @return An options group or 0 for no options. Default returns 0.
- * PluginProvider retains ownership of return value.
+ * Plugin retains ownership of return value.
*/
virtual Options* getOptions();
- /** Provide plugins to a PluginUser.
+ /**
+ * Initialize Plugin functionality on a Target.
*
- * The provider can dynamic_cast the user if it only provides
- * plugins to certain types of user. Providers should ignore
- * users they don't recognize.
+ * Plugins should ignore targets they don't recognize.
*/
- virtual void provide(PluginUser& user) = 0;
+ virtual void initialize(Target&) = 0;
- /** Get the list of pointers to the registered providers.
- * Caller must not delete the pointers.
+ /** List of registered Plugin objects.
+ * Caller must not delete plugin pointers.
*/
- static const std::vector<PluginProvider*>& getProviders();
+ static const Plugins& getPlugins();
private:
- static std::vector<PluginProvider*> providers;
+ static Plugins plugins;
};
} // namespace qpid
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 9c8e98ec9a..86342b3c43 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -20,7 +20,6 @@
*/
#include "Broker.h"
-
#include "Connection.h"
#include "DirectExchange.h"
#include "FanOutExchange.h"
@@ -40,11 +39,14 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
+#include <boost/bind.hpp>
+
#include <iostream>
#include <memory>
using qpid::sys::Acceptor;
using qpid::framing::HandlerUpdater;
+using qpid::framing::FrameHandler;
namespace qpid {
namespace broker {
@@ -98,6 +100,12 @@ Broker::Broker(const Broker::Options& conf) :
store->recover(recoverer);
}
+ // Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->initialize(*this);
}
@@ -149,13 +157,14 @@ Acceptor& Broker::getAcceptor() const {
return *acceptor;
}
-void Broker::use(const shared_ptr<Plugin>& plugin) {
- shared_ptr<HandlerUpdater> updater=
- dynamic_pointer_cast<HandlerUpdater>(plugin);
- if (updater) {
- QPID_LOG(critical, "HandlerUpdater plugins not implemented");
- // FIXME aconway 2007-06-28: hook into Connections.
- }
+void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
+ QPID_LOG(debug, "Broker added HandlerUpdater");
+ handlerUpdaters.push_back(updater);
+}
+
+void Broker::update(FrameHandler::Chains& chains) {
+ for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
+ boost::bind(&HandlerUpdater::update, _1, chains));
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index a27bce1751..9f57a45e0c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -23,19 +23,23 @@
*/
#include "ConnectionFactory.h"
-#include "qpid/Url.h"
-#include "qpid/Plugin.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Acceptor.h"
-#include "MessageStore.h"
-#include "ExchangeRegistry.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
#include "DtxManager.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "ExchangeRegistry.h"
+#include "MessageStore.h"
#include "QueueRegistry.h"
#include "qpid/Options.h"
+#include "qpid/Plugin.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
namespace qpid {
@@ -48,7 +52,7 @@ namespace broker {
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public PluginUser
+class Broker : public sys::Runnable, public Plugin::Target
{
public:
struct Options : public qpid::Options {
@@ -88,26 +92,32 @@ class Broker : public sys::Runnable, public PluginUser
/** Shut down the broker */
virtual void shutdown();
- /** Use a plugin */
- void use(const shared_ptr<Plugin>& plugin);
+ /** Register a handler updater. */
+ void add(const shared_ptr<framing::HandlerUpdater>&);
+
+ /** Apply all handler updaters to a handler chain pair. */
+ void update(framing::FrameHandler::Chains&);
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
uint64_t getStagingThreshold() { return stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
-
+
private:
sys::Acceptor& getAcceptor() const;
Options config;
sys::Acceptor::shared_ptr acceptor;
const std::auto_ptr<MessageStore> store;
+ typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters;
+
QueueRegistry queues;
ExchangeRegistry exchanges;
uint64_t stagingThreshold;
ConnectionFactory factory;
DtxManager dtxManager;
+ HandlerUpdaters handlerUpdaters;
static MessageStore* createStore(const Options& config);
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 978228a364..7a987f28d2 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -104,7 +104,10 @@ void Connection::closeChannel(uint16_t id) {
FrameHandler::Chains& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out));
+ FrameHandler::Chains chains(
+ new SemanticHandler(id, *this),
+ new OutputHandlerFrameHandler(*out));
+ broker.update(chains);
i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
return i->second;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f2d1b75f3f..256378ccd5 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,6 +32,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
@@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-struct Cluster::IncomingHandler : public FrameHandler {
- IncomingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-
-struct Cluster::OutgoingHandler : public FrameHandler {
- OutgoingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-// TODO aconway 2007-06-28: Right now everything is backed up via
-// multicast. When we have point-to-point backups the
-// Incoming/Outgoing handlers must determine where each frame should
-// be sent: to multicast or only to specific backup(s) via AMQP.
-Cluster::Cluster(const std::string& name_, const std::string& url_) :
+Cluster::Cluster(
+ const std::string& name_, const std::string& url_,
+ const SessionFrameHandler::Chain& next
+) :
+ SessionFrameHandler(next),
cpg(new Cpg(*this)),
name(name_),
url(url_),
- self(cpg->getLocalNoideId(), getpid()),
- toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+ self(cpg->getLocalNoideId(), getpid())
{
- QPID_LOG(trace, *this << " Joining cluster.");
+ QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg->join(name);
notify();
dispatcher=Thread(*this);
@@ -102,7 +85,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::mcast(SessionFrame& frame) {
+void Cluster::handle(SessionFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) {
void Cluster::notify() {
SessionFrame sf;
sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
- mcast(sf);
+ handle(sf);
}
size_t Cluster::size() const {
@@ -122,11 +105,6 @@ size_t Cluster::size() const {
return members.size();
}
-void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
- Mutex::ScopedLock l(lock);
- receivedChain = chain;
-}
-
Cluster::MemberList Cluster::getMembers() const {
// TODO aconway 2007-07-04: use read/write lock?
Mutex::ScopedLock l(lock);
@@ -152,7 +130,7 @@ void Cluster::deliver(
if (frame.uuid.isNull())
handleClusterFrame(from, frame.frame);
else
- receivedChain->handle(frame);
+ next->handle(frame);
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
}
// Handle cluster control frame from the null session.
-bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
- if (notifyIn) {
+ assert(notifyIn);
MemberList list;
{
Mutex::ScopedLock l(lock);
- if (!members[from])
- members[from].reset(new Member(url));
+ shared_ptr<Member>& member=members[from];
+ if (!member)
+ member.reset(new Member(notifyIn->getUrl()));
else
- members[from]->url = notifyIn->getUrl();
- QPID_LOG(trace, *this << ": member update: " << members);
+ member->url = notifyIn->getUrl();
lock.notifyAll();
+ QPID_LOG(trace, *this << ": members joined: " << members);
}
- return true;
- }
- return false;
}
void Cluster::configChange(
@@ -207,7 +183,7 @@ void Cluster::configChange(
// We don't record members joining here, we record them when
// we get their ClusterNotify message.
}
- if (newMembers)
+ if (newMembers) // Notify new members of my presence.
notify();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 6ab4cb58df..f6afe14c62 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -37,11 +37,17 @@
namespace qpid { namespace cluster {
/**
- * Represents a cluster, provides access to data about members.
- * Implements HandlerUpdater to manage handlers that route frames to
- * and from the cluster.
+ * Connection to the cluster. Maintains cluster membership
+ * data.
+ *
+ * As SessionFrameHandler, handles frames by sending them to the
+ * cluster, sends frames received from the cluster to the next
+ * SessionFrameHandler.
+ *
+ *
*/
-class Cluster : private sys::Runnable, private Cpg::Handler
+class Cluster : public SessionFrameHandler,
+ private sys::Runnable, private Cpg::Handler
{
public:
/** Details of a cluster member */
@@ -57,8 +63,10 @@ class Cluster : private sys::Runnable, private Cpg::Handler
* Join a cluster.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
+ * @param handler for frames received from the cluster.
*/
- Cluster(const std::string& name, const std::string& url);
+ Cluster(const std::string& name, const std::string& url,
+ const SessionFrameHandler::Chain& next);
virtual ~Cluster();
@@ -70,14 +78,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool empty() const { return size() == 0; }
- /** Get handler chains to send incoming/outgoing frames to the cluster */
- framing::FrameHandler::Chains getSendChains() {
- return toChains;
- }
-
- /** Set handler for frames received from the cluster */
- void setReceivedChain(const SessionFrameHandler::Chain& chain);
-
/** Wait for predicate(*this) to be true, up to timeout.
*@return True if predicate became true, false if timed out.
*Note the predicate may not be true after wait returns,
@@ -86,13 +86,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool wait(boost::function<bool(const Cluster&)> predicate,
sys::Duration timeout=sys::TIME_INFINITE) const;
+ /** Send frame to the cluster */
+ void handle(SessionFrame&);
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, shared_ptr<Member> > MemberMap;
- typedef std::map<
- framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
- void mcast(SessionFrame&); ///< send frame by multicast.
void notify(); ///< Notify cluster of my details.
void deliver(
@@ -112,7 +112,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
);
void run();
- bool handleClusterFrame(Id from, framing::AMQFrame&);
+ void handleClusterFrame(Id from, framing::AMQFrame&);
mutable sys::Monitor lock;
boost::scoped_ptr<Cpg> cpg;
@@ -120,17 +120,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler
std::string url;
Id self;
MemberMap members;
- ChannelMap channels;
sys::Thread dispatcher;
boost::function<void()> callback;
- framing::FrameHandler::Chains toChains;
- SessionFrameHandler::Chain receivedChain;
-
- struct IncomingHandler;
- struct OutgoingHandler;
-
- friend struct IncomingHandler;
- friend struct OutgoingHandler;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index d48fbadf7b..10b1c44f40 100644
--- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -16,8 +16,8 @@
*
*/
#include "qpid/broker/Broker.h"
-#include "qpid/framing/HandlerUpdater.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/SessionManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
@@ -26,11 +26,11 @@ namespace cluster {
using namespace std;
-struct ClusterPluginProvider : public PluginProvider {
+struct ClusterPlugin : public Plugin {
struct ClusterOptions : public Options {
string clusterName;
- ClusterOptions() {
+ ClusterOptions() : Options("Cluster Options") {
addOptions()
("cluster", optValue(clusterName, "NAME"),
"Join the cluster named NAME");
@@ -39,22 +39,25 @@ struct ClusterPluginProvider : public PluginProvider {
ClusterOptions options;
shared_ptr<Cluster> cluster;
+ shared_ptr<SessionManager> sessions;
Options* getOptions() {
return &options;
}
- void provide(PluginUser& user) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&user);
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
- // FIXME aconway 2007-06-29: register HandlerUpdater.
+ sessions.reset(new SessionManager());
+ cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
+ sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10:
+ broker->add(sessions);
}
}
};
-static ClusterPluginProvider instance; // Static initialization.
+static ClusterPlugin instance; // Static initialization.
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
new file mode 100644
index 0000000000..24f201535d
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace sys;
+
+/** Wrap plain AMQFrames in SessionFrames */
+struct FrameWrapperHandler : public FrameHandler {
+
+ FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_)
+ : uuid(id), direction(dir), next(next_) {
+ assert(!uuid.isNull());
+ }
+
+ void handle(AMQFrame& frame) {
+ SessionFrame sf(uuid, frame, direction);
+ assert(next);
+ next->handle(sf);
+ }
+
+ Uuid uuid;
+ bool direction;
+ SessionFrameHandler::Chain next;
+};
+
+SessionManager::SessionManager() {}
+
+void SessionManager::update(FrameHandler::Chains& chains)
+{
+ Mutex::ScopedLock l(lock);
+ // Create a new local session, store local chains.
+ Uuid uuid(true);
+ sessions[uuid] = chains;
+
+ // Replace local incoming chain. Build from the back.
+ //
+ // TODO aconway 2007-07-05: Currently mcast wiring, bypass
+ // everythign else.
+ assert(clusterSend);
+ FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend));
+ FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
+ chains.in = classify;
+
+ // FIXME aconway 2007-07-05: Need to stop bypassed frames
+ // from overtaking mcast frames.
+ //
+
+ // Leave outgoing chain unmodified.
+ // TODO aconway 2007-07-05: Failover will require replication of
+ // outgoing frames to session replicas.
+
+}
+
+void SessionManager::handle(SessionFrame& frame) {
+ // Incoming from frame.
+ FrameHandler::Chains chains;
+ {
+ Mutex::ScopedLock l(lock);
+ SessionMap::iterator i = sessions.find(frame.uuid);
+ if (i == sessions.end()) {
+ QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
+ chains = nonLocal;
+ }
+ else {
+ QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
+ chains = i->second;
+ }
+ }
+ FrameHandler::Chain chain =
+ chain = frame.isIncoming ? chains.in : chains.out;
+ // TODO aconway 2007-07-11: Should this be assert(chain)
+ if (chain)
+ chain->handle(frame.frame);
+
+ // TODO aconway 2007-07-05: Here's where we should unblock frame
+ // dispatch for the channel.
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
new file mode 100644
index 0000000000..c23efde18e
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_SESSIONMANAGER_H
+#define QPID_CLUSTER_SESSIONMANAGER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/cluster/SessionFrame.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Manage sessions and handler chains for the cluster.
+ *
+ */
+class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler
+{
+ public:
+ SessionManager();
+
+ /** Set the handler to send to the cluster */
+ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
+
+ /** As ChannelUpdater update the handler chains. */
+ void update(framing::FrameHandler::Chains& chains);
+
+ /** As SessionFrameHandler handle frames received from the cluster */
+ void handle(SessionFrame&);
+
+ /** Get ChannelID for UUID. Return 0 if no mapping */
+ framing::ChannelId getChannelId(const framing::Uuid&) const;
+
+ private:
+ typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+
+ sys::Mutex lock;
+ SessionFrameHandler::Chain clusterSend;
+ SessionMap sessions;
+ framing::FrameHandler::Chains nonLocal;
+};
+
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/cpp/src/qpid/framing/HandlerUpdater.h b/cpp/src/qpid/framing/HandlerUpdater.h
index 5cb1e87d6e..b9497e4f12 100644
--- a/cpp/src/qpid/framing/HandlerUpdater.h
+++ b/cpp/src/qpid/framing/HandlerUpdater.h
@@ -26,13 +26,15 @@
namespace qpid {
namespace framing {
-/** Plugin object that can update handler chains. */
-struct HandlerUpdater : public Plugin {
+/** Interface for objects that can update handler chains. */
+struct HandlerUpdater {
+ virtual ~HandlerUpdater() {}
+
/** Update the handler chains.
*@param id Unique identifier for channel or session.
*@param chains Handler chains to be updated.
*/
- virtual void update(ChannelId id, FrameHandler::Chains& chains) = 0;
+ virtual void update(FrameHandler::Chains& chains) = 0;
};
}} // namespace qpid::framing