summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-17 00:03:50 +0000
committerAlan Conway <aconway@apache.org>2008-07-17 00:03:50 +0000
commite65b0086a2924ff04640b1350393a816249d01b3 (patch)
treeb372c5386cc44e3ad16c4ae585088ed038a629e4
parente596837411d54a16dd3cb1e5de717664496c2bd0 (diff)
downloadqpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd. - Plugin::addFinalizer - more flexible way to shutdown plugins. - Reworked cluster extension points using boost::function. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/HandlerChain.h97
-rw-r--r--cpp/src/qpid/Options.cpp2
-rw-r--r--cpp/src/qpid/Options.h2
-rw-r--r--cpp/src/qpid/Plugin.cpp17
-rw-r--r--cpp/src/qpid/Plugin.h36
-rw-r--r--cpp/src/qpid/RefCounted.h19
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp8
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h10
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Connection.cpp23
-rw-r--r--cpp/src/qpid/broker/Connection.h28
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionState.h10
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp195
-rw-r--r--cpp/src/qpid/cluster/Cluster.h53
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp74
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.cpp82
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.h77
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp8
-rw-r--r--cpp/src/qpid/cluster/Cpg.h1
-rw-r--r--cpp/src/qpid/framing/Handler.h2
-rw-r--r--cpp/src/tests/ForkedBroker.h100
-rw-r--r--cpp/src/tests/cluster.mk5
-rw-r--r--cpp/src/tests/cluster_test.cpp93
-rwxr-xr-xcpp/src/tests/run_test4
-rw-r--r--cpp/xml/cluster.xml10
29 files changed, 538 insertions, 441 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 74aa504e90..f513ab8ee3 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -353,7 +353,6 @@ nobase_include_HEADERS = \
qpid/amqp_0_10/Exception.h \
qpid/Msg.h \
qpid/Options.h \
- qpid/HandlerChain.h \
qpid/Plugin.h \
qpid/ptr_map.h \
qpid/RangeSet.h \
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 87a6d4cd54..718dffff38 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -12,6 +12,8 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/ClusterPlugin.cpp \
+ qpid/cluster/ConnectionInterceptor.h \
+ qpid/cluster/ConnectionInterceptor.cpp \
qpid/cluster/ClassifierHandler.h \
qpid/cluster/ClassifierHandler.cpp \
qpid/cluster/ShadowConnectionOutputHandler.h
diff --git a/cpp/src/qpid/HandlerChain.h b/cpp/src/qpid/HandlerChain.h
deleted file mode 100644
index e3746ec14b..0000000000
--- a/cpp/src/qpid/HandlerChain.h
+++ /dev/null
@@ -1,97 +0,0 @@
-#ifndef QPID_HANDLERCHAIN_H
-#define QPID_HANDLERCHAIN_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/Plugin.h>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <memory>
-
-namespace qpid {
-
-/**
- * Chain-of-responsibility design pattern.
- *
- * Construct a chain of objects deriving from Base. Each implements
- * Base::f by doing its own logic and then calling Base::f on the next
- * handler (or not if it chooses not to.)
- *
- * HandlerChain acts as a smart pointer to the first object in the chain.
- */
-template <class Base>
-class HandlerChain {
- public:
- /** Base class for chainable handlers */
- class Handler : public Base {
- public:
- Handler() : next() {}
- virtual ~Handler() {}
- virtual void setNext(Base* next_) { next = next_; }
-
- protected:
- Base* next;
- };
-
- typedef std::auto_ptr<Handler> HandlerAutoPtr;
-
- /**@param target is the object at the end of the chain. */
- HandlerChain(Base& target) : first(&target) {}
-
- /** HandlerChain owns the ChainableHandler. */
- void push(HandlerAutoPtr h) {
- handlers.push_back(h.release());
- h->setNext(first);
- first = h.get();
- }
-
- // Smart pointer functions
- Base* operator*() { return first; }
- const Base* operator*() const { return first; }
- Base* operator->() { return first; }
- const Base* operator->() const { return first; }
- operator bool() const { return first; }
-
- private:
- boost::ptr_vector<Base> handlers;
- Base* first;
-};
-
-/**
- * A PluginHandlerChain calls Plugin::initAll(*this) on construction,
- * allowing plugins to add handlers.
- *
- * @param Tag can be any class, use to distinguish different plugin
- * chains with the same Base type.
- */
-template <class Base, class Tag=void>
-struct PluginHandlerChain : public HandlerChain<Base>,
- public Plugin::Target
-{
- PluginHandlerChain(Base& target) : HandlerChain<Base>(target) {
- Plugin::initAll(*this);
- }
-};
-
-
-} // namespace qpid
-
-#endif /*!QPID_HANDLERCHAIN_H*/
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp
index 1deaca199f..e521b1220a 100644
--- a/cpp/src/qpid/Options.cpp
+++ b/cpp/src/qpid/Options.cpp
@@ -176,7 +176,7 @@ Options::Options(const string& name) :
-void Options::parse(int argc, char** argv, const std::string& configFile, bool allowUnknown)
+void Options::parse(int argc, char const* const* argv, const std::string& configFile, bool allowUnknown)
{
string defaultConfigFile = configFile; // May be changed by env/cmdline
string parsing;
diff --git a/cpp/src/qpid/Options.h b/cpp/src/qpid/Options.h
index 35ce8f9d40..cb86d27241 100644
--- a/cpp/src/qpid/Options.h
+++ b/cpp/src/qpid/Options.h
@@ -209,7 +209,7 @@ struct Options : public po::options_description {
* Note the filename argument can reference an options variable that
* is updated by argc/argv or environment variable parsing.
*/
- void parse(int argc, char** argv,
+ void parse(int argc, char const* const* argv,
const std::string& configfile=std::string(),
bool allowUnknown = false);
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp
index b8206499ae..e4b76db28a 100644
--- a/cpp/src/qpid/Plugin.cpp
+++ b/cpp/src/qpid/Plugin.cpp
@@ -35,15 +35,20 @@ Plugin::Plugins& thePlugins() {
return plugins;
}
-void call(boost::function<void()> f) { f(); }
+void invoke(boost::function<void()> f) { f(); }
} // namespace
-Plugin::Target::~Target() {
- std::for_each(cleanup.begin(), cleanup.end(), &call);
+Plugin::Target::~Target() { finalize(); }
+
+void Plugin::Target::finalize() {
+ for_each(finalizers.begin(), finalizers.end(), invoke);
+ finalizers.clear();
}
-void Plugin::Target::addCleanup(const boost::function<void()>& f) { cleanup.push_back(f); }
+void Plugin::Target::addFinalizer(const boost::function<void()>& f) {
+ finalizers.push_back(f);
+}
Plugin::Plugin() {
// Register myself.
@@ -69,7 +74,7 @@ void Plugin::addOptions(Options& opts) {
}
}
-void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); }
-void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); }
+void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t))); }
+void Plugin::initializeAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t))); }
} // namespace qpid
diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h
index a53d4e5d18..3c7c8031bb 100644
--- a/cpp/src/qpid/Plugin.h
+++ b/cpp/src/qpid/Plugin.h
@@ -21,11 +21,9 @@
*
*/
-#include "qpid/shared_ptr.h"
#include <boost/noncopyable.hpp>
-#include <vector>
#include <boost/function.hpp>
-
+#include <vector>
/**@file Generic plug-in framework. */
@@ -35,30 +33,36 @@ class Options;
/**
* Plug-in base class.
*/
-class Plugin : boost::noncopyable
-{
+class Plugin : private boost::noncopyable {
public:
+ typedef std::vector<Plugin*> Plugins;
+
/**
- * Base interface for targets that receive plug-ins.
- * Plug-ins can register clean-up functions to execute when
- * the target is destroyed.
+ * Base interface for targets that can receive plug-ins.
+ * Also allows plug-ins to attach a a function to be called
+ * when the target is 'finalized'.
*/
- struct Target {
+ class Target : private boost::noncopyable
+ {
public:
+ /** Calls finalize() if not already called. */
virtual ~Target();
- void addCleanup(const boost::function<void()>& cleanupFunction);
+
+ /** Run all the finalizers */
+ void finalize();
+
+ /** Add a function to run when finalize() is called */
+ void addFinalizer(const boost::function<void()>&);
private:
- std::vector<boost::function<void()> > cleanup;
+ std::vector<boost::function<void()> > finalizers;
};
- typedef std::vector<Plugin*> Plugins;
-
/**
- * Construct registers the plug-in to appear in getPlugins().
+ * Constructor registers the plug-in to appear in getPlugins().
*
* A concrete Plugin is instantiated as a global or static
- * member variable in a library so it is registered during static
+ * member variable in a library so it is registered during
* initialization when the library is loaded.
*/
Plugin();
@@ -103,7 +107,7 @@ class Plugin : boost::noncopyable
static void earlyInitAll(Target&);
/** Call initialize() on all registered plugins */
- static void initAll(Target&);
+ static void initializeAll(Target&);
/** For each registered plugin, add plugin.getOptions() to opts. */
static void addOptions(Options& opts);
diff --git a/cpp/src/qpid/RefCounted.h b/cpp/src/qpid/RefCounted.h
index d67f6c31db..10b5e4afcc 100644
--- a/cpp/src/qpid/RefCounted.h
+++ b/cpp/src/qpid/RefCounted.h
@@ -46,23 +46,6 @@ protected:
virtual ~RefCounted() {};
};
-/**
- * Reference-counted member of a reference-counted parent class.
- * Delegates reference counts to the parent so that the parent is
- * deleted only when there are no references to the parent or any of
- * its children.
- * TODO: Delete this class if it's unused as I don't think this class makes much sense:
- */
-struct RefCountedChild {
- RefCounted& parent;
-
-protected:
- RefCountedChild(RefCounted& parent_) : parent(parent_) {}
-
-public:
- void addRef() const { parent.addRef(); }
- void release() const { parent.release(); }
-};
} // namespace qpid
@@ -70,8 +53,6 @@ public:
namespace boost {
inline void intrusive_ptr_add_ref(const qpid::RefCounted* p) { p->addRef(); }
inline void intrusive_ptr_release(const qpid::RefCounted* p) { p->release(); }
-inline void intrusive_ptr_add_ref(const qpid::RefCountedChild* p) { p->addRef(); }
-inline void intrusive_ptr_release(const qpid::RefCountedChild* p) { p->release(); }
}
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index ccd31c78a7..a3692911b2 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -29,7 +29,7 @@ using sys::Mutex;
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
: frameQueueClosed(false), output(o),
- connection(this, broker, id, _isClient),
+ connection(new broker::Connection(this, broker, id, _isClient)),
identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
@@ -46,13 +46,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
+ connection->received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection.doOutput();
+ if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -91,7 +91,7 @@ void Connection::close() {
}
void Connection::closed() {
- connection.closed();
+ connection->closed();
}
void Connection::send(framing::AMQFrame& f) {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index a3a756cefb..b707031789 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTION_H
-#define QPID_BROKER_CONNECTION_H
+#ifndef QPID_AMQP_0_10_CONNECTION_H
+#define QPID_AMQP_0_10_CONNECTION_H
/*
*
@@ -24,8 +24,8 @@
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/Mutex.h"
-#include "Connection.h"
#include "qpid/broker/Connection.h"
+#include <boost/intrusive_ptr.hpp>
#include <queue>
#include <memory>
@@ -40,7 +40,7 @@ class Connection : public sys::ConnectionCodec,
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- broker::Connection connection;
+ boost::intrusive_ptr<broker::Connection> connection;
std::string identifier;
bool initialized;
bool isClient;
@@ -60,4 +60,4 @@ class Connection : public sys::ConnectionCodec,
}} // namespace qpid::amqp_0_10
-#endif /*!QPID_BROKER_CONNECTION_H*/
+#endif /*!QPID_AMQP_0_10_CONNECTION_H*/
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index bffca94f95..b8204c9cf5 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -285,6 +285,7 @@ void Broker::shutdown() {
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
poller->shutdown();
+ finalize(); // Finalize any plugins.
}
Broker::~Broker() {
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5e85d3c89c..e77911bd10 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -49,14 +49,14 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
+ receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
+ closedFn(boost::bind(&Connection::closedImpl, this)),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
mgmtObject(0),
- links(broker_.getLinks()),
- lastInHandler(*this),
- inChain(lastInHandler)
+ links(broker_.getLinks())
{
Manageable* parent = broker.GetVhostObject();
@@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
agent->addObject(mgmtObject);
}
+
+ Plugin::initializeAll(*this); // Let plug-ins update extension points.
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
out->activateOutput();
}
-
Connection::~Connection()
{
if (mgmtObject != 0)
@@ -88,9 +89,9 @@ Connection::~Connection()
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); }
-
-void Connection::receivedLast(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
+
+void Connection::receivedImpl(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
@@ -170,10 +171,13 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed(){ // Physically closed, suspend open sessions.
+void Connection::closed() { closedFn(); }
+
+void Connection::closedImpl(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
+ // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
exclusiveQueues.erase(exclusiveQueues.begin());
}
} catch(std::exception& e) {
- QPID_LOG(error, " Unhandled exception while closing session: " <<
- e.what());
+ QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
assert(0);
}
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index c911e88200..0d646bab83 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_CONNECTION_H
+#define QPID_BROKER_CONNECTION_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,8 +21,6 @@
* under the License.
*
*/
-#ifndef _Connection_
-#define _Connection_
#include <memory>
#include <sstream>
@@ -43,7 +44,8 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Connection.h"
-#include "qpid/HandlerChain.h"
+#include "qpid/Plugin.h"
+#include "qpid/RefCounted.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -53,11 +55,11 @@ namespace broker {
class LinkRegistry;
class Connection : public sys::ConnectionInputHandler,
- public ConnectionState
+ public ConnectionState,
+ public Plugin::Target,
+ public RefCounted
{
public:
- typedef boost::shared_ptr<Connection> shared_ptr;
-
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -74,8 +76,8 @@ class Connection : public sys::ConnectionInputHandler,
void received(framing::AMQFrame& frame);
void idleOut();
void idleIn();
- void closed();
bool doOutput();
+ void closed();
void closeChannel(framing::ChannelId channel);
@@ -92,12 +94,16 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
+ // Extension points: allow plugins to insert additional functionality.
+ boost::function<void(framing::AMQFrame&)> receivedFn;
+ boost::function<void()> closedFn;
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- // End of the received handler chain.
- void receivedLast(framing::AMQFrame& frame);
+ void receivedImpl(framing::AMQFrame& frame);
+ void closedImpl();
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
@@ -108,10 +114,8 @@ class Connection : public sys::ConnectionInputHandler,
boost::function0<void> ioCallback;
management::Connection* mgmtObject;
LinkRegistry& links;
- framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
- PluginHandlerChain<framing::FrameHandler, Connection> inChain;
};
}}
-#endif
+#endif /*!QPID_BROKER_CONNECTION_H*/
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 698f8123e8..c9cf6ece8d 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -70,6 +70,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
sys::ConnectionOutputHandler& getOutput() const { return *out; }
framing::ProtocolVersion getVersion() const { return version; }
+ void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; }
+
protected:
framing::ProtocolVersion version;
sys::ConnectionOutputHandler* out;
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 3cc509c904..aa6f6b7520 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -54,11 +54,7 @@ SessionState::SessionState(
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
- mgmtObject(0),
- inLastHandler(*this),
- outLastHandler(*this),
- inChain(inLastHandler),
- outChain(outLastHandler)
+ mgmtObject(0)
{
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
@@ -75,9 +71,6 @@ SessionState::SessionState(
SessionState::~SessionState() {
// Remove ID from active session list.
- // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge,
- // they don't belong in the manager. For now rely on uniqueness of UUIDs.
- //
broker.getSessionManager().forget(getId());
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -126,7 +119,6 @@ void SessionState::activateOutput() {
Mutex::ScopedLock l(lock);
if (isAttached())
getConnection().outputTasks.activateOutput();
- // FIXME aconway 2008-05-22: should we hold the lock over activateOutput??
}
ManagementObject* SessionState::GetManagementObject (void) const
@@ -224,10 +216,7 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
-void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); }
-void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); }
-
-void SessionState::handleInLast(AMQFrame& frame) {
+void SessionState::handleIn(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
try {
//TODO: make command handling more uniform, regardless of whether
@@ -258,7 +247,7 @@ void SessionState::handleInLast(AMQFrame& frame) {
}
}
-void SessionState::handleOutLast(AMQFrame& frame) {
+void SessionState::handleOut(AMQFrame& frame) {
assert(handler);
handler->out(frame);
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index f6bf98d431..96f2e8f512 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -23,7 +23,6 @@
*/
#include "qpid/SessionState.h"
-#include "qpid/HandlerChain.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Mutex.h"
@@ -102,10 +101,6 @@ class SessionState : public qpid::SessionState,
void readyToSend();
- // Tag types to identify PluginHandlerChains.
- struct InTag {};
- struct OutTag {};
-
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
@@ -131,11 +126,6 @@ class SessionState : public qpid::SessionState,
IncompleteMessageList incomplete;
IncompleteMessageList::CompletionListener enqueuedOp;
management::Session* mgmtObject;
- framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler;
- framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler;
-
- qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain;
- qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 4ea77e7fbf..3b7f32e822 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,15 +17,19 @@
*/
#include "Cluster.h"
+#include "ConnectionInterceptor.h"
+
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/framing/ClusterConnectionCloseBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
+#include "qpid/shared_ptr.h"
#include <boost/bind.hpp>
-#include <boost/scoped_array.hpp>
+#include <boost/cast.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -37,24 +41,6 @@ using namespace qpid::sys;
using namespace std;
using broker::Connection;
-// Beginning of inbound chain: send to cluster.
-struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler {
- Cluster::ConnectionChain& connection;
- Cluster& cluster;
-
- ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {}
-
- void handle(AMQFrame& f) {
- // FIXME aconway 2008-01-29: Refcount Connections to ensure
- // Connection not destroyed till message is self delivered.
- cluster.send(f, &connection, next); // Indirectly send to next via cluster.
- }
-};
-
-void Cluster::initialize(Cluster::ConnectionChain& cc) {
- cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this)));
-}
-
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "-" << cluster.self;
}
@@ -69,14 +55,14 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-// FIXME aconway 2008-07-02: create a Connection for the cluster.
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- broker(b),
cpg(*this),
+ broker(&b),
name(name_),
url(url_),
self(cpg.self())
{
+ broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
@@ -90,15 +76,27 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
}
Cluster::~Cluster() {
- QPID_LOG(trace, *this << " Leaving cluster.");
- try {
- cpg.leave(name);
- cpg.shutdown();
- dispatcher.join();
- }
- catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << *this << ": "
- << e.what());
+ cpg.shutdown();
+ dispatcher.join();
+}
+
+// local connection initializes plugins
+void Cluster::initialize(broker::Connection& c) {
+ bool isLocal = &c.getOutput() != &shadowOut;
+ if (isLocal)
+ new ConnectionInterceptor(c, *this);
+}
+
+void Cluster::leave() {
+ if (!broker.get()) return; // Already left
+ QPID_LOG(info, QPID_MSG("Leaving cluster " << *this));
+ // Must not be called in the dispatch thread.
+ assert(Thread::current().id() != dispatcher.id());
+ cpg.leave(name);
+ // Wait till final config-change is delivered and broker is released.
+ {
+ Mutex::ScopedLock l(lock);
+ while(broker.get()) lock.wait();
}
}
@@ -112,22 +110,20 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
buf.putLongLong(value);
}
-void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- // TODO aconway 2008-07-03: More efficient buffer management.
+ // FIXME aconway 2008-07-03: More efficient buffer management.
// Cache coded form of decoded frames for re-encoding?
Buffer buf(buffer);
- assert(frame.size() + 128 < sizeof(buffer));
+ assert(frame.size() + 64 < sizeof(buffer));
frame.encode(buf);
encodePtr(buf, connection);
- encodePtr(buf, next);
iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
- AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- send(frame, 0, 0);
+ send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
}
size_t Cluster::size() const {
@@ -143,19 +139,17 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-boost::shared_ptr<broker::Connection>
-Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
- // FIXME aconway 2008-07-02: locking - called by deliver in
- // cluster thread so no locks but may need to revisit as model
- // changes.
- ShadowConnectionId id(member, connectionPtr);
- boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
- if (!ptr) {
+ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
+ ShadowConnectionId id(member, remotePtr);
+ ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
+ if (i == shadowConnectionMap.end()) { // A new shadow connection.
std::ostringstream os;
- os << name << ":" << member << ":" << std::hex << connectionPtr;
- ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+ os << name << ":" << member << ":" << remotePtr;
+ broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
+ ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
+ i = shadowConnectionMap.insert(value).first;
}
- return ptr;
+ return i->second;
}
void Cluster::deliver(
@@ -171,78 +165,75 @@ void Cluster::deliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- void* connectionId;
- decodePtr(buf, connectionId);
+ ConnectionInterceptor* connection;
+ decodePtr(buf, connection);
+ QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
- QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
-
- if (connectionId == 0) // A cluster control frame.
- handleClusterFrame(from, frame);
- else if (from == self) { // My own frame, carries a next pointer.
- FrameHandler* next;
- decodePtr(buf, next);
- next->handle(frame);
- }
- else { // Foreign frame, forward to shadow connection.
- // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
- boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
- shadow->received(frame);
+ if (!broker.get()) {
+ QPID_LOG(warning, "Ignoring late DLVR, already left the cluster.");
+ return;
}
+
+ if (connection && from != self) // Look up shadow for remote connections
+ connection = getShadowConnection(from, connection);
+
+ if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
+ handleMethod(from, connection, *frame.getMethod());
+ else
+ connection->deliver(frame);
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(error, "Error handling frame from cluster " << e.what());
+ QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+ assert(0);
+ throw;
}
}
-bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
- Duration timeout) const
-{
- AbsTime deadline(now(), timeout);
- Mutex::ScopedLock l(lock);
- while (!predicate(*this) && lock.wait(deadline))
- ;
- return (predicate(*this));
-}
-
-// Handle cluster control frame .
-void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
- // TODO aconway 2007-06-20: use visitor pattern here.
- ClusterNotifyBody* notifyIn=
- dynamic_cast<ClusterNotifyBody*>(frame.getBody());
- assert(notifyIn);
- MemberList list;
- {
- Mutex::ScopedLock l(lock);
- members[from].url=notifyIn->getUrl();
- lock.notifyAll();
- QPID_LOG(debug, "Cluster join: " << members);
+// Handle cluster methods
+// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
+void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
+ assert(method.amqpClassId() == CLUSTER_CLASS_ID);
+ switch (method.amqpMethodId()) {
+ case CLUSTER_NOTIFY_METHOD_ID: {
+ ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
+ Mutex::ScopedLock l(lock);
+ members[from].url=notify.getUrl();
+ lock.notifyAll();
+ break;
+ }
+ case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
+ if (!connection->isLocal())
+ shadowConnectionMap.erase(connection->getShadowId());
+ connection->deliverClosed();
+ break;
+ }
+ default:
+ assert(0);
}
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int nJoined)
{
- bool newMembers=false;
- MemberList updated;
- {
- Mutex::ScopedLock l(lock);
- if (nLeft) {
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(left[i]));
- QPID_LOG(debug, "Cluster leave: " << members);
- lock.notifyAll();
- }
- newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
- // We don't record members joining here, we record them when
- // we get their ClusterNotify message.
+ Mutex::ScopedLock l(lock);
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(left[i]);
+ for(int j = 0; j < nCurrent; ++j)
+ members[current[j]].id = current[j];
+ QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
+ << members);
+ assert(members.size() == size_t(nCurrent));
+ if (members.find(self) == members.end()) {
+ QPID_LOG(debug, "Left cluster " << *this);
+ broker = 0; // Release broker reference.
}
- if (newMembers) // Notify new members of my presence.
- notify();
+
+ lock.notifyAll(); // Threads waiting for membership changes.
}
void Cluster::run() {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 84b5ed072c..7147b1ac05 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -22,14 +22,14 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/ShadowConnectionOutputHandler.h"
-#include "qpid/HandlerChain.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Logger.h"
#include "qpid/Url.h"
-
+#include "qpid/RefCounted.h"
#include <boost/optional.hpp>
#include <boost/function.hpp>
@@ -41,19 +41,21 @@
namespace qpid {
namespace cluster {
+class ConnectionInterceptor;
+
/**
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
{
public:
- typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+ typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
/** Details of a cluster member */
struct Member {
- Member(const Url& url_=Url()) : url(url_) {}
- Url url; ///< Broker address.
+ Cpg::Id id;
+ Url url;
};
typedef std::vector<Member> MemberList;
@@ -65,11 +67,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler
*/
Cluster(const std::string& name, const Url& url, broker::Broker&);
- // Add cluster handlers to broker chains.
- void initialize(ConnectionChain&);
-
virtual ~Cluster();
+ /** Initialize interceptors for a new connection */
+ void initialize(broker::Connection&);
+
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -78,22 +80,22 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool empty() const { return size() == 0; }
- /** Wait for predicate(*this) to be true, up to timeout.
- *@return True if predicate became true, false if timed out.
- *Note the predicate may not be true after wait returns,
- *all the caller can say is it was true at some earlier point.
- */
- bool wait(boost::function<bool(const Cluster&)> predicate,
- sys::Duration timeout=sys::TIME_INFINITE) const;
-
/** Send frame to the cluster */
- void send(framing::AMQFrame&, void* connection, framing::FrameHandler*);
+ void send(const framing::AMQFrame&, ConnectionInterceptor*);
+
+ /** Leave the cluster */
+ void leave();
+
+ // Cluster frame handing functions
+ void notify(const std::string& url);
+ void connectionClose();
private:
typedef Cpg::Id Id;
typedef std::map<Id, Member> MemberMap;
- typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
- typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap;
+ typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+
+ boost::function<void()> shutdownNext;
void notify(); ///< Notify cluster of my details.
@@ -114,19 +116,18 @@ class Cluster : private sys::Runnable, private Cpg::Handler
);
void run();
-
- void handleClusterFrame(Id from, framing::AMQFrame&);
- boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*);
+ void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
+
+ ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*);
- mutable sys::Monitor lock;
- broker::Broker& broker;
+ mutable sys::Monitor lock; // Protect access to members.
Cpg cpg;
+ boost::intrusive_ptr<broker::Broker> broker;
Cpg::Name name;
Url url;
MemberMap members;
sys::Thread dispatcher;
- boost::function<void()> callback;
Id self;
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index c4b67de141..a2c66e3790 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -15,8 +15,8 @@
* limitations under the License.
*
*/
-#include <boost/program_options/value_semantic.hpp>
+#include "ConnectionInterceptor.h"
#include "qpid/broker/Broker.h"
@@ -25,61 +25,81 @@
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
-#include <boost/optional.hpp>
#include <boost/utility/in_place_factory.hpp>
-
namespace qpid {
namespace cluster {
using namespace std;
-struct ClusterOptions : public Options {
+struct ClusterValues {
string name;
string url;
- ClusterOptions() : Options("Cluster Options") {
+ Url getUrl(uint16_t port) const {
+ if (url.empty()) return Url::getIpAddressesUrl(port);
+ return Url(url);
+ }
+};
+
+/** Note separating options from values to work around boost version differences.
+ * Old boost takes a reference to options objects, but new boost makes a copy.
+ * New boost allows a shared_ptr but that's not compatible with old boost.
+ */
+struct ClusterOptions : public Options {
+ ClusterValues& values;
+
+ ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) {
addOptions()
- ("cluster-name", optValue(name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(url,"URL"),
+ ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join")
+ ("cluster-url", optValue(values.url,"URL"),
"URL of this broker, advertized to the cluster.\n"
"Defaults to a URL listing all the local IP addresses\n")
;
}
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
};
struct ClusterPlugin : public Plugin {
- typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+ ClusterValues values;
ClusterOptions options;
- boost::optional<Cluster> cluster;
+ boost::intrusive_ptr<Cluster> cluster;
+
+ ClusterPlugin() : options(values) {}
+
+ Options* getOptions() { return &options; }
- template <class Chain> void init(Plugin::Target& t) {
- Chain* c = dynamic_cast<Chain*>(&t);
- if (c) cluster->initialize(*c);
+ void init(broker::Broker& b) {
+ if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
+ cluster = new Cluster(values.name, values.getUrl(b.getPort()), b);
+ b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
+ }
+
+ template <class T> void init(T& t) {
+ if (cluster) cluster->initialize(t);
+ }
+
+ template <class T> bool init(Plugin::Target& target) {
+ T* t = dynamic_cast<T*>(&target);
+ if (t) init(*t);
+ return t;
}
void earlyInitialize(Plugin::Target&) {}
void initialize(Plugin::Target& target) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker && !options.name.empty()) {
- if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process.");
- cluster = boost::in_place(options.name,
- options.getUrl(broker->getPort()),
- boost::ref(*broker));
- return;
- }
- if (!cluster) return; // Ignore chain handlers if we didn't init a cluster.
- init<ConnectionChain>(target);
+ if (init<broker::Broker>(target)) return;
+ if (!cluster) return; // Remaining plugins only valid if cluster initialized.
+ if (init<broker::Connection>(target)) return;
}
+
+ void shutdown() { cluster = 0; }
};
static ClusterPlugin instance; // Static initialization.
+
+// For test purposes.
+boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
new file mode 100644
index 0000000000..5283ba9b1a
--- /dev/null
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionInterceptor.h"
+#include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/AMQFrame.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; b = c; }
+
+ConnectionInterceptor::ConnectionInterceptor(
+ broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
+ : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+{
+ connection->addFinalizer(boost::bind(operator delete, this));
+ // Attach my functions to Connection extension points.
+ shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
+ shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
+}
+
+ConnectionInterceptor::~ConnectionInterceptor() {
+ assert(isClosed);
+ assert(connection == 0);
+}
+
+void ConnectionInterceptor::received(framing::AMQFrame& f) {
+ if (isClosed) return;
+ cluster.send(f, this);
+}
+
+void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
+ receivedNext(f);
+}
+
+void ConnectionInterceptor::closed() {
+ if (isClosed) return;
+ try {
+ // Called when the local network connection is closed. We still
+ // need to process any outstanding cluster frames for this
+ // connection to ensure our sessions are up-to-date. We defer
+ // closing the Connection object till deliverClosed(), but replace
+ // its output handler with a null handler since the network output
+ // handler will be deleted.
+ //
+ connection->setOutputHandler(&discardHandler);
+ cluster.send(AMQFrame(in_place<ClusterConnectionCloseBody>()), this);
+ isClosed = true;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+ }
+}
+
+void ConnectionInterceptor::deliverClosed() {
+ closedNext();
+ // Drop reference so connection will be deleted, which in turn
+ // will delete this via finalizer added in ctor.
+ connection = 0;
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h
new file mode 100644
index 0000000000..d499acb832
--- /dev/null
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h
@@ -0,0 +1,77 @@
+#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H
+#define QPID_CLUSTER_CONNECTIONPLUGIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+
+namespace qpid {
+namespace framing { class AMQFrame; }
+namespace cluster {
+
+/**
+ * Plug-in associated with broker::Connections, both local and shadow.
+ */
+class ConnectionInterceptor {
+ public:
+ ConnectionInterceptor(broker::Connection&, Cluster&,
+ Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0));
+ ~ConnectionInterceptor();
+
+ // Called on self-delivery
+ void deliver(framing::AMQFrame& f);
+
+ // Called on self-delivery of my own cluster.connection-close
+ void deliverClosed();
+
+ Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
+
+ bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
+
+ private:
+ struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
+ void close() {}
+ void send(framing::AMQFrame&) {}
+ void doOutput() {}
+ void activateOutput() {}
+ };
+
+ // Functions to add to Connection extension points.
+ void received(framing::AMQFrame&);
+ void closed();
+
+ boost::function<void(framing::AMQFrame&)> receivedNext;
+ boost::function<void()> closedNext;
+
+ boost::intrusive_ptr<broker::Connection> connection;
+ Cluster& cluster;
+ NullConnectionHandler discardHandler;
+ bool isClosed;
+ Cluster::ShadowConnectionId shadowId;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/
+
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 3118e11e57..674781ac06 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -62,7 +62,7 @@ void Cpg::globalConfigChange(
cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
-Cpg::Cpg(Handler& h) : handler(h) {
+Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
check(cpg_context_set(handle, this), "Cannot set CPG context");
@@ -78,10 +78,10 @@ Cpg::~Cpg() {
}
void Cpg::shutdown() {
- if (handle) {
- cpg_context_set(handle, 0);
+ if (!isShutdown) {
+ QPID_LOG(debug,"Shutting down CPG");
+ isShutdown=true;
check(cpg_finalize(handle), "Error in shutdown of CPG");
- handle = 0;
}
}
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index d3142efcb6..c89bf3e121 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -165,6 +165,7 @@ class Cpg : public Dispatchable {
cpg_handle_t handle;
Handler& handler;
+ bool isShutdown;
};
std::ostream& operator <<(std::ostream& out, const cpg_name& name);
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index a2a8ee7bfa..e07a803e17 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -66,7 +66,7 @@ struct Handler {
MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
void handle(T t) { (target->*F)(t); }
- /** Allow calling with -> syntax, like a qpid::HandlerChain */
+ /** Allow calling with -> syntax */
MemFunRef* operator->() { return this; }
private:
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h
index e01034c355..6c20330c28 100644
--- a/cpp/src/tests/ForkedBroker.h
+++ b/cpp/src/tests/ForkedBroker.h
@@ -1,5 +1,5 @@
#ifndef TESTS_FORKEDBROKER_H
-#define TESTS_FORKEDBROKER_H
+
/*
*
@@ -23,16 +23,11 @@
*/
#include "qpid/Exception.h"
-#include "qpid/sys/Fork.h"
-#include "qpid/log/Logger.h"
+#include "qpid/log/Statement.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SignalHandler.h"
-
#include <boost/lexical_cast.hpp>
-
#include <string>
-
-#include <signal.h>
+#include <stdio.h>
#include <sys/wait.h>
/**
@@ -48,63 +43,66 @@
* process.)
*
*/
-class ForkedBroker : public qpid::sys::ForkWithMessage {
- pid_t pid;
- uint16_t port;
- qpid::broker::Broker::Options opts;
- std::string prefix;
-
+class ForkedBroker {
public:
- struct ChildExit {}; // Thrown in child processes.
+ ForkedBroker(std::vector<const char*> argv) { init(argv); }
- ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(),
- const std::string& prefix_=std::string())
- : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); }
+ ForkedBroker(int argc, const char* const argv[]) {
+ std::vector<const char*> args(argv, argv+argc);
+ init(args);
+ }
~ForkedBroker() {
- try { stop(); }
- catch(const std::exception& e) {
- QPID_LOG(error, e.what());
+ try { stop(); } catch(const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what()));
}
}
void stop() {
- if (pid > 0) { // I am the parent, clean up children.
- if (::kill(pid, SIGINT) < 0)
- throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno)));
- int status = 0;
- if (::waitpid(pid, &status, 0) < 0)
- throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno)));
- if (WEXITSTATUS(status) != 0)
- throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status)));
- }
+ using qpid::ErrnoException;
+ if (pid == 0) return;
+ if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed");
+ int status;
+ if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for forked process failed");
+ if (WEXITSTATUS(status) != 0)
+ throw qpid::Exception(QPID_MSG("forked broker exited with: " << WEXITSTATUS(status)));
+ pid = 0;
}
- void parent(pid_t pid_) {
- pid = pid_;
- qpid::log::Logger::instance().setPrefix("parent");
- std::string portStr = wait(5);
- port = boost::lexical_cast<uint16_t>(portStr);
- }
+ uint16_t getPort() { return port; }
- void child() {
- prefix += boost::lexical_cast<std::string>(long(getpid()));
- qpid::log::Logger::instance().setPrefix(prefix);
- opts.port = 0;
- boost::intrusive_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts));
- qpid::broker::SignalHandler::setBroker(broker);
- QPID_LOG(info, "ForkedBroker started on " << broker->getPort());
- ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent.
- broker->run();
- QPID_LOG(notice, "ForkedBroker exiting.");
+ private:
- // Force exit in the child process, otherwise we will try to
- // carry with parent tests.
- broker = 0; // Run broker dtor before we exit.
- exit(0);
+ void init(const std::vector<const char*>& args) {
+ using qpid::ErrnoException;
+ pid = 0;
+ port = 0;
+ int pipeFds[2];
+ if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe");
+ pid = ::fork();
+ if (pid < 0) throw ErrnoException("Fork failed");
+ if (pid) { // parent
+ ::close(pipeFds[1]);
+ FILE* f = ::fdopen(pipeFds[0], "r");
+ if (!f) throw ErrnoException("fopen failed");
+ if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed");
+ }
+ else { // child
+ ::close(pipeFds[0]);
+ int fd = ::dup2(pipeFds[1], 1);
+ if (fd < 0) throw ErrnoException("dup2 failed");
+ const char* prog = "../qpidd";
+ std::vector<const char*> args2(args);
+ args2.push_back("--port=0");
+ args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems?
+ args2.push_back(0);
+ execv(prog, const_cast<char* const*>(&args2[0]));
+ throw ErrnoException("execv failed");
+ }
}
- uint16_t getPort() { return port; }
+ pid_t pid;
+ int port;
};
#endif /*!TESTS_FORKEDBROKER_H*/
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index da5b4d6e90..9190eee4e5 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -10,9 +10,8 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la
#
-# FIXME aconway 2008-07-04: disabled till process leak is plugged.
-# ais_check checks conditions for cluster tests and run them if ok.
-#TESTS+=ais_check
+# ais_check checks pre-requisites for cluster tests and runs them if ok.
+TESTS+=ais_check
EXTRA_DIST+=ais_check
check_PROGRAMS+=cluster_test
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index d361919f0b..cafac489d2 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -16,13 +16,13 @@
*
*/
-
#include "test_tools.h"
#include "unit_test.h"
#include "ForkedBroker.h"
#include "BrokerFixture.h"
#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
@@ -37,10 +37,13 @@
#include <vector>
#include <algorithm>
-#include <signal.h>
+namespace qpid {
+namespace cluster {
+boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
+}} // namespace qpid::cluster
-QPID_AUTO_TEST_SUITE(CpgTestSuite)
+QPID_AUTO_TEST_SUITE(CpgTestSuite)
using namespace std;
using namespace qpid;
@@ -49,27 +52,60 @@ using namespace qpid::framing;
using namespace qpid::client;
using qpid::broker::Broker;
using boost::ptr_vector;
+using qpid::cluster::Cluster;
+using qpid::cluster::getGlobalCluster;
-struct ClusterFixture : public ptr_vector<ForkedBroker> {
+/** Cluster fixture is a vector of ports for the replicas.
+ * Replica 0 is in the current process, all others are forked as children.
+ */
+struct ClusterFixture : public vector<uint16_t> {
string name;
+ Broker::Options opts;
+ std::auto_ptr<BrokerFixture> broker0;
+ boost::ptr_vector<ForkedBroker> forkedBrokers;
- ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); }
+ ClusterFixture(size_t n);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add();
+ void setup();
};
+ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
+ add(n);
+ // Wait for all n members to join the cluster
+ int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
+ while (retry && getGlobalCluster()->size() != n) {
+ ::sleep(1);
+ --retry;
+ }
+ BOOST_CHECK_EQUAL(n, getGlobalCluster()->size());
+}
+
void ClusterFixture::add() {
- broker::Broker::Options opts;
- Plugin::addOptions(opts); // For cluster options.
+ std::ostringstream os;
+ os << "broker" << size();
+ std::string prefix = os.str();
+
const char* argv[] = {
- "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir"
+ "qpidd " __FILE__ ,
+ "--load-module=../.libs/libqpidcluster.so",
+ "--cluster-name", name.c_str(),
+ "--auth=no", "--no-data-dir",
+ "--log-prefix", prefix.c_str(),
};
- opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
- ostringstream prefix;
- prefix << "b" << size() << "-";
- QPID_LOG(info, "ClusterFixture adding broker " << prefix.str());
- push_back(new ForkedBroker(opts, prefix.str()));
- QPID_LOG(info, "ClusterFixture added broker " << prefix.str());
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+
+ if (size()) { // Not the first broker, fork.
+ forkedBrokers.push_back(new ForkedBroker(argc, argv));
+ push_back(forkedBrokers.back().getPort());
+ }
+ else { // First broker, run in this process.
+ Broker::Options opts;
+ Plugin::addOptions(opts); // Pick up cluster options.
+ opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+ broker0.reset(new BrokerFixture(opts));
+ push_back(broker0->getPort());
+ }
}
// For debugging: op << for CPG types.
@@ -149,26 +185,25 @@ QPID_AUTO_TEST_CASE(CpgBasic) {
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
- Broker::Options opts;
- opts.auth="no";
- opts.noDataDir=true;
- ForkedBroker broker(opts);
+ const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
+ ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv);
Client c(broker.getPort());
BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
- ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers
- Client c0(cluster[0].getPort());
+ ClusterFixture cluster(3);
+ Client c0(cluster[0]);
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
c0.session.queueDeclare("q");
c0.session.exchangeDeclare("ex", arg::type="direct");
c0.session.close();
+ c0.connection.close();
// Verify all brokers get wiring update.
for (size_t i = 0; i < cluster.size(); ++i) {
BOOST_MESSAGE("i == "<< i);
- Client c(cluster[i].getPort());
+ Client c(cluster[i]);
BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
}
@@ -177,12 +212,12 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
QPID_AUTO_TEST_CASE(testMessageEnqueue) {
// Enqueue on one broker, dequeue on another.
ClusterFixture cluster(2);
- Client c0(cluster[0].getPort());
+ Client c0(cluster[0]);
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
c0.session.close();
- Client c1(cluster[1].getPort());
+ Client c1(cluster[1]);
Message msg;
BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
BOOST_CHECK_EQUAL(string("foo"), msg.getData());
@@ -190,10 +225,14 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
+#if 0
+
+// FIXME aconway 2008-07-16: Implement cluster dequeue notification, enable this test.
+
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
ClusterFixture cluster (3);
- Client c0(cluster[0].getPort());
+ Client c0(cluster[0]);
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
@@ -201,11 +240,11 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
Message msg;
- Client c1(cluster[1].getPort());
+ Client c1(cluster[1]);
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("foo", msg.getData());
- Client c2(cluster[2].getPort());
+ Client c2(cluster[2]);
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("bar", msg.getData());
QueueQueryResult r = c2.session.queueQuery("q");
@@ -214,4 +253,6 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
// TODO aconway 2008-06-25: failover.
+#endif
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/run_test b/cpp/src/tests/run_test
index 8fbaaaee07..4d0da15d4c 100755
--- a/cpp/src/tests/run_test
+++ b/cpp/src/tests/run_test
@@ -38,9 +38,11 @@ VALGRIND_OPTS="
--demangle=yes
--suppressions=$srcdir/.valgrind.supp
--num-callers=25
---trace-children=yes
--log-file=$VG_LOG --
"
+# FIXME aconway 2008-07-16: removed --trace-children=yes, problems with cluster tests forking
+# qpidd libtool script. Investigate & restore --trace-children if possible.
+
ERROR=0
if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then
# This is a libtool "executable". Valgrind it if VALGRIND specified.
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index ce9303fef8..aac095a9a9 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -22,12 +22,14 @@
<amqp major="0" minor="10" port="5672">
- <class name = "cluster" code = "0x80" label="clustering extensions">
+ <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name = "notify" code="0x1" label="notify cluster of a new member">
- <doc>Notify the cluster of a member URL</doc>
- <!-- No chassis element, this is handled by separte cluster code for now.-->
+
+ <control name = "notify" code="0x1">
<field name="url" type="str16" />
</control>
+
+ <control name="connection-close" code="0x2">
+ </control>
</class>
</amqp>