summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/HandlerChain.h97
-rw-r--r--cpp/src/qpid/Plugin.cpp21
-rw-r--r--cpp/src/qpid/Plugin.h28
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp8
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h3
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--cpp/src/qpid/broker/Connection.h6
-rw-r--r--cpp/src/qpid/broker/ConnectionManager.cpp41
-rw-r--r--cpp/src/qpid/broker/ConnectionManager.h70
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp9
-rw-r--r--cpp/src/qpid/broker/SessionManager.h13
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp8
-rw-r--r--cpp/src/qpid/broker/SessionState.h14
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp33
-rw-r--r--cpp/src/qpid/cluster/Cluster.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp13
-rw-r--r--cpp/src/qpid/framing/Handler.h24
-rw-r--r--cpp/src/tests/cluster_test.cpp26
20 files changed, 208 insertions, 224 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 5b4a16429a..bfebd4ae88 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -248,8 +248,6 @@ libqpidbroker_la_SOURCES = \
qpid/amqp_0_10/Connection.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerSingleton.cpp \
- qpid/broker/ConnectionManager.h \
- qpid/broker/ConnectionManager.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
@@ -354,6 +352,7 @@ nobase_include_HEADERS = \
qpid/amqp_0_10/Exception.h \
qpid/Msg.h \
qpid/Options.h \
+ qpid/HandlerChain.h \
qpid/Plugin.h \
qpid/ptr_map.h \
qpid/RangeSet.h \
diff --git a/cpp/src/qpid/HandlerChain.h b/cpp/src/qpid/HandlerChain.h
new file mode 100644
index 0000000000..adeaa96536
--- /dev/null
+++ b/cpp/src/qpid/HandlerChain.h
@@ -0,0 +1,97 @@
+#ifndef QPID_HANDLERCHAIN_H
+#define QPID_HANDLERCHAIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Plugin.h>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
+
+namespace qpid {
+
+/**
+ * Chain-of-responsibility design pattern.
+ *
+ * Construct a chain of objects deriving from Base. Each implements
+ * Base::f by doing its own logic and then calling Base::f on the next
+ * handler (or not if it chooses not to.)
+ *
+ * HandlerChain acts as a smart pointer to the first object in the chain.
+ */
+template <class Base>
+class HandlerChain {
+ public:
+ /** Base class for chainable handlers */
+ class Handler : public Base {
+ public:
+ Handler() : next() {}
+ virtual ~Handler() {}
+ virtual void setNext(Base* next_) { next = next_; }
+
+ protected:
+ Base* next;
+ };
+
+ typedef std::auto_ptr<Handler> HandlerAutoPtr;
+
+ /**@param target is the object at the end of the chain. */
+ HandlerChain(Base& target) : first(&target) {}
+
+ /** HandlerChain owns the ChainableHandler. */
+ void push(HandlerAutoPtr h) {
+ handlers.push_back(h);
+ h->setNext(first);
+ first = h.get();
+ }
+
+ // Smart pointer functions
+ Base* operator*() { return first; }
+ const Base* operator*() const { return first; }
+ Base* operator->() { return first; }
+ const Base* operator->() const { return first; }
+ operator bool() const { return first; }
+
+ private:
+ boost::ptr_vector<Base> handlers;
+ Base* first;
+};
+
+/**
+ * A PluginHandlerChain calls Plugin::initAll(*this) on construction,
+ * allowing plugins to add handlers.
+ *
+ * @param Tag can be any class, use to distinguish different plugin
+ * chains with the same Base type.
+ */
+template <class Base, class Tag=void>
+struct PluginHandlerChain : public HandlerChain<Base>,
+ public Plugin::Target
+{
+ PluginHandlerChain(Base& target) : HandlerChain<Base>(target) {
+ Plugin::initAll(*this);
+ }
+};
+
+
+} // namespace qpid
+
+#endif /*!QPID_HANDLERCHAIN_H*/
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp
index 733d134334..b8206499ae 100644
--- a/cpp/src/qpid/Plugin.cpp
+++ b/cpp/src/qpid/Plugin.cpp
@@ -20,10 +20,13 @@
#include "Plugin.h"
#include "qpid/Options.h"
+#include <boost/bind.hpp>
+#include <algorithm>
namespace qpid {
namespace {
+
Plugin::Plugins& thePlugins() {
// This is a single threaded singleton implementation so
// it is important to be sure that the first use of this
@@ -31,8 +34,17 @@ Plugin::Plugins& thePlugins() {
static Plugin::Plugins plugins;
return plugins;
}
+
+void call(boost::function<void()> f) { f(); }
+
+} // namespace
+
+Plugin::Target::~Target() {
+ std::for_each(cleanup.begin(), cleanup.end(), &call);
}
+void Plugin::Target::addCleanup(const boost::function<void()>& f) { cleanup.push_back(f); }
+
Plugin::Plugin() {
// Register myself.
thePlugins().push_back(this);
@@ -44,6 +56,12 @@ Options* Plugin::getOptions() { return 0; }
const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); }
+namespace {
+template <class F> void each_plugin(const F& f) {
+ std::for_each(Plugin::getPlugins().begin(), Plugin::getPlugins().end(), f);
+}
+}
+
void Plugin::addOptions(Options& opts) {
for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) {
if ((*i)->getOptions())
@@ -51,4 +69,7 @@ void Plugin::addOptions(Options& opts) {
}
}
+void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); }
+void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); }
+
} // namespace qpid
diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h
index 3ead770129..a53d4e5d18 100644
--- a/cpp/src/qpid/Plugin.h
+++ b/cpp/src/qpid/Plugin.h
@@ -40,11 +40,17 @@ class Plugin : boost::noncopyable
public:
/**
* Base interface for targets that receive plug-ins.
- *
- * The Broker is a plug-in target, there might be others
- * in future.
+ * Plug-ins can register clean-up functions to execute when
+ * the target is destroyed.
*/
- struct Target { virtual ~Target() {} };
+ struct Target {
+ public:
+ virtual ~Target();
+ void addCleanup(const boost::function<void()>& cleanupFunction);
+
+ private:
+ std::vector<boost::function<void()> > cleanup;
+ };
typedef std::vector<Plugin*> Plugins;
@@ -69,7 +75,9 @@ class Plugin : boost::noncopyable
virtual Options* getOptions();
/**
- * Initialize Plugin functionality on a Target.
+ * Initialize Plugin functionality on a Target, called before
+ * initializing the target.
+ *
* Plugins should ignore targets they don't recognize.
*
* Called before the target itself is initialized.
@@ -77,7 +85,9 @@ class Plugin : boost::noncopyable
virtual void earlyInitialize(Target&) = 0;
/**
- * Initialize Plugin functionality on a Target.
+ * Initialize Plugin functionality on a Target. Called after
+ * initializing the target.
+ *
* Plugins should ignore targets they don't recognize.
*
* Called after the target is fully initialized.
@@ -89,6 +99,12 @@ class Plugin : boost::noncopyable
*/
static const Plugins& getPlugins();
+ /** Call earlyInitialize() on all registered plugins */
+ static void earlyInitAll(Target&);
+
+ /** Call initialize() on all registered plugins */
+ static void initAll(Target&);
+
/** For each registered plugin, add plugin.getOptions() to opts. */
static void addOptions(Options& opts);
};
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index 407fe5ebd8..ccd31c78a7 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -29,7 +29,7 @@ using sys::Mutex;
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
: frameQueueClosed(false), output(o),
- connection(broker.getConnectionManager().create(this, broker, id, _isClient)),
+ connection(this, broker, id, _isClient),
identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
@@ -46,13 +46,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection->received(frame);
+ connection.received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection->doOutput();
+ if (!frameQueueClosed) connection.doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -91,7 +91,7 @@ void Connection::close() {
}
void Connection::closed() {
- connection->closed();
+ connection.closed();
}
void Connection::send(framing::AMQFrame& f) {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index c08545df0f..a3a756cefb 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -33,7 +33,6 @@ namespace qpid {
namespace broker { class Broker; }
namespace amqp_0_10 {
-// FIXME aconway 2008-03-18: Update to 0-10.
class Connection : public sys::ConnectionCodec,
public sys::ConnectionOutputHandler
{
@@ -41,7 +40,7 @@ class Connection : public sys::ConnectionCodec,
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- std::auto_ptr<broker::Connection> connection; // FIXME aconway 2008-03-18:
+ broker::Connection connection;
std::string identifier;
bool initialized;
bool isClient;
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 9a9f502bf0..be59cef24c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -23,7 +23,6 @@
*/
#include "ConnectionFactory.h"
-#include "ConnectionManager.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
#include "DtxManager.h"
@@ -121,7 +120,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
- ConnectionManager& getConnectionManager() { return connectionManager; }
management::ManagementObject* GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -159,7 +157,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
- ConnectionManager connectionManager;
management::ManagementAgent* managementAgent;
management::Broker* mgmtObject;
Vhost::shared_ptr vhostObject;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index bb99c61cdd..61384638b3 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -88,7 +88,7 @@ Connection::~Connection()
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame){ inChain(frame); }
+void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); }
void Connection::receivedLast(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 717e1a6270..c911e88200 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -43,6 +43,7 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Connection.h"
+#include "qpid/HandlerChain.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -91,8 +92,6 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
- framing::FrameHandler::Chain& getInChain() { return inChain; }
-
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -110,8 +109,7 @@ class Connection : public sys::ConnectionInputHandler,
management::Connection* mgmtObject;
LinkRegistry& links;
framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
- framing::FrameHandler::Chain inChain;
-
+ PluginHandlerChain<framing::FrameHandler, Connection> inChain;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionManager.cpp b/cpp/src/qpid/broker/ConnectionManager.cpp
deleted file mode 100644
index 165de5220e..0000000000
--- a/cpp/src/qpid/broker/ConnectionManager.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ConnectionManager.h"
-#include "Connection.h"
-
-namespace qpid {
-namespace broker {
-
-std::auto_ptr<Connection>
-ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) {
- std::auto_ptr<Connection> c(new Connection(out, broker, mgmtId, isClient));
- sys::Mutex::ScopedLock l(lock);
- std::for_each(observers.begin(), observers.end(),
- boost::bind(&Observer::created, _1, boost::ref(*c)));
- return c;
-}
-
-void ConnectionManager::add(const boost::intrusive_ptr<Observer>& observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.push_back(observer);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionManager.h b/cpp/src/qpid/broker/ConnectionManager.h
deleted file mode 100644
index a999523d0d..0000000000
--- a/cpp/src/qpid/broker/ConnectionManager.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef QPID_BROKER_CONNECTIONMANAGER_H
-#define QPID_BROKER_CONNECTIONMANAGER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/RefCounted.h"
-#include "qpid/sys/Mutex.h"
-#include <boost/intrusive_ptr.hpp>
-#include <vector>
-#include <memory>
-
-namespace qpid {
-
-namespace sys {
-class ConnectionOutputHandler;
-}
-
-namespace broker {
-
-class Broker;
-class Connection;
-
-/**
- * Manages connections and observers.
- */
-class ConnectionManager {
- public:
-
- /**
- * Observer notified of ConnectionManager events.
- */
- struct Observer : public RefCounted {
- /** Called when a connection is attached. */
- virtual void created(Connection&) {}
- };
-
- /** Called to create a new Connection, applies observers. */
- std::auto_ptr<Connection> create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false);
-
- /** Add an observer */
- void add(const boost::intrusive_ptr<Observer>&);
-
- private:
- typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
-
- sys::Mutex lock;
- Observers observers;
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_CONNECTIONMANAGER_H*/
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index 69ef29c3eb..e7190fdae6 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -55,11 +55,8 @@ std::auto_ptr<SessionState> SessionManager::attach(SessionHandler& h, const Ses
throw SessionBusyException(QPID_MSG("Session already attached: " << id));
Detached::iterator i = std::find(detached.begin(), detached.end(), id);
std::auto_ptr<SessionState> state;
- if (i == detached.end()) {
+ if (i == detached.end())
state.reset(new SessionState(broker, h, id, config));
- for_each(observers.begin(), observers.end(),
- boost::bind(&Observer::opened, _1,boost::ref(*state)));
- }
else {
state.reset(detached.release(i).release());
state->attach(h);
@@ -99,8 +96,4 @@ void SessionManager::eraseExpired() {
}
}
-void SessionManager::add(const intrusive_ptr<Observer>& o) {
- observers.push_back(o);
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index 9a4142f613..db88e7ec10 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -46,14 +46,6 @@ class SessionHandler;
*/
class SessionManager : private boost::noncopyable {
public:
- /**
- * Observer notified of SessionManager events.
- */
- struct Observer : public RefCounted {
- /** Called when a stateless session is attached. */
- virtual void opened(SessionState&) {}
- };
-
SessionManager(const qpid::SessionState::Configuration&, Broker&);
~SessionManager();
@@ -67,9 +59,6 @@ class SessionManager : private boost::noncopyable {
/** Forget about an attached session. Called by SessionState destructor. */
void forget(const SessionId&);
- /** Add an Observer. */
- void add(const boost::intrusive_ptr<Observer>&);
-
Broker& getBroker() const { return broker; }
const qpid::SessionState::Configuration& getSessionConfig() const { return config; }
@@ -77,7 +66,6 @@ class SessionManager : private boost::noncopyable {
private:
typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order.
typedef std::set<SessionId> Attached;
- typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
void eraseExpired();
@@ -85,7 +73,6 @@ class SessionManager : private boost::noncopyable {
Detached detached;
Attached attached;
qpid::SessionState::Configuration config;
- Observers observers;
Broker& broker;
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 8a17a787a2..0a122fcae8 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -224,8 +224,8 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
-void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); }
-void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); }
+void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); }
+void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); }
void SessionState::handleInLast(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
@@ -291,8 +291,4 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
-framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; }
-
-framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; }
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 5d18ed161e..f6bf98d431 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -23,6 +23,7 @@
*/
#include "qpid/SessionState.h"
+#include "qpid/HandlerChain.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Mutex.h"
@@ -58,8 +59,8 @@ class SessionHandler;
class SessionManager;
/**
- * Broker-side session state includes sessions handler chains, which may
- * themselves have state.
+ * Broker-side session state includes session's handler chains, which
+ * may themselves have state.
*/
class SessionState : public qpid::SessionState,
public SessionContext,
@@ -101,8 +102,9 @@ class SessionState : public qpid::SessionState,
void readyToSend();
- framing::FrameHandler::Chain& getInChain();
- framing::FrameHandler::Chain& getOutChain();
+ // Tag types to identify PluginHandlerChains.
+ struct InTag {};
+ struct OutTag {};
private:
@@ -131,7 +133,9 @@ class SessionState : public qpid::SessionState,
management::Session* mgmtObject;
framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler;
framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler;
- framing::FrameHandler::Chain inChain, outChain;
+
+ qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain;
+ qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d97a840f82..4ea77e7fbf 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/memory.h"
#include <boost/bind.hpp>
#include <boost/scoped_array.hpp>
#include <algorithm>
@@ -36,25 +37,12 @@ using namespace qpid::sys;
using namespace std;
using broker::Connection;
-namespace {
-
-// FIXME aconway 2008-07-01: sending every frame to cluster,
-// serializing all processing in cluster deliver thread.
-// This will not perform at all, but provides a correct starting point.
-//
-// TODO:
-// - Fake "Connection" for cluster: owns shadow sessions.
-// - Maintain shadow sessions.
-// - Apply foreign frames to shadow sessions.
-//
-
-
// Beginning of inbound chain: send to cluster.
-struct ClusterSendHandler : public FrameHandler {
- Connection& connection;
+struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler {
+ Cluster::ConnectionChain& connection;
Cluster& cluster;
- ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
+ ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {}
void handle(AMQFrame& f) {
// FIXME aconway 2008-01-29: Refcount Connections to ensure
@@ -63,16 +51,8 @@ struct ClusterSendHandler : public FrameHandler {
}
};
-struct ConnectionObserver : public broker::ConnectionManager::Observer {
- Cluster& cluster;
- ConnectionObserver(Cluster& c) : cluster(c) {}
-
- void created(Connection& c) {
- // FIXME aconway 2008-06-16: clean up chaining and observers.
- ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
- c.getInChain().insert(sender);
- }
-};
+void Cluster::initialize(Cluster::ConnectionChain& cc) {
+ cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this)));
}
ostream& operator <<(ostream& out, const Cluster& cluster) {
@@ -95,7 +75,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpg(*this),
name(name_),
url(url_),
- observer(new ConnectionObserver(*this)),
self(cpg.self())
{
QPID_LOG(trace, "Joining cluster: " << name_);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 031baf914a..84b5ed072c 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -22,6 +22,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/ShadowConnectionOutputHandler.h"
+#include "qpid/HandlerChain.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
@@ -47,6 +48,8 @@ namespace cluster {
class Cluster : private sys::Runnable, private Cpg::Handler
{
public:
+ typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+
/** Details of a cluster member */
struct Member {
Member(const Url& url_=Url()) : url(url_) {}
@@ -62,11 +65,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler
*/
Cluster(const std::string& name, const Url& url, broker::Broker&);
+ // Add cluster handlers to broker chains.
+ void initialize(ConnectionChain&);
+
virtual ~Cluster();
- // FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
-
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -124,7 +127,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
Id self;
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 6d3dca84be..c4b67de141 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -54,24 +54,29 @@ struct ClusterOptions : public Options {
};
struct ClusterPlugin : public Plugin {
+ typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
ClusterOptions options;
boost::optional<Cluster> cluster;
- Options* getOptions() { return &options; }
+ template <class Chain> void init(Plugin::Target& t) {
+ Chain* c = dynamic_cast<Chain*>(&t);
+ if (c) cluster->initialize(*c);
+ }
void earlyInitialize(Plugin::Target&) {}
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- // Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.name.empty()) {
- assert(!cluster); // A process can only belong to one cluster.
+ if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process.");
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- broker->getConnectionManager().add(cluster->getObserver());
+ return;
}
+ if (!cluster) return; // Ignore chain handlers if we didn't init a cluster.
+ init<ConnectionChain>(target);
}
};
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index edd7f469b0..a2a8ee7bfa 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -28,7 +28,6 @@
namespace qpid {
namespace framing {
-/** Generic handler that can be linked into chains. */
template <class T>
struct Handler {
typedef T HandledType;
@@ -46,23 +45,6 @@ struct Handler {
/** Pointer to next handler in a linked list. */
Handler<T>* next;
- /** A Chain is a handler holding a linked list of sub-handlers.
- * Chain::next is invoked after the full chain, it is not itself part of the chain.
- * Handlers inserted into the chain are deleted by the Chain dtor.
- */
- class Chain : public Handler<T> {
- public:
- Chain(Handler<T>& next_) : Handler(&next_), first(&next_) {}
- ~Chain() { while (first != next) pop(); }
- void handle(T t) { first->handle(t); }
- void insert(Handler<T>* h) { h->next = first; first = h; }
- bool empty() { return first == next; }
-
- private:
- void pop() { Handler<T>* p=first; first=first->next; delete p; }
- Handler<T>* first;
- };
-
/** Adapt any void(T) functor as a Handler.
* Functor<F>(f) will copy f.
* Functor<F&>(f) will only take a reference to x.
@@ -84,7 +66,7 @@ struct Handler {
MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
void handle(T t) { (target->*F)(t); }
- /** Allow calling with -> syntax, compatible with Chains */
+ /** Allow calling with -> syntax, like a qpid::HandlerChain */
MemFunRef* operator->() { return this; }
private:
@@ -103,15 +85,13 @@ struct Handler {
};
/** Support for implementing an in-out handler pair as a single class.
- * Public interface is Handler<T>::Chains pair, but implementation
- * overrides handleIn, handleOut functions in a single class.
+ * Overrides handleIn, handleOut functions in a single class.
*/
struct InOutHandler : protected InOutHandlerInterface {
InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {}
MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in;
MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out;
};
-
};
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 65aa4d5a28..beab305f75 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -174,7 +174,7 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
}
}
-QPID_AUTO_TEST_CASE(testMessageReplication) {
+QPID_AUTO_TEST_CASE(testMessageEnqueue) {
// Enqueue on one broker, dequeue on another.
ClusterFixture cluster(2);
Client c0(cluster[0].getPort());
@@ -190,6 +190,28 @@ QPID_AUTO_TEST_CASE(testMessageReplication) {
BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
-// TODO aconway 2008-06-25: dequeue replication, failover.
+QPID_AUTO_TEST_CASE(testMessageDequeue) {
+ // Enqueue on one broker, dequeue on two others.
+ ClusterFixture cluster (3);
+ Client c0(cluster[0].getPort());
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+ c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.close();
+
+ Message msg;
+
+ Client c1(cluster[1].getPort());
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("foo", msg.getData());
+
+ Client c2(cluster[2].getPort());
+ BOOST_CHECK(c1.subs.get(msg, "q"));
+ BOOST_CHECK_EQUAL("bar", msg.getData());
+ QueueQueryResult r = c2.session.queueQuery("q");
+ BOOST_CHECK_EQUAL(0, r.getMessageCount());
+}
+
+// TODO aconway 2008-06-25: failover.
QPID_AUTO_TEST_SUITE_END()