summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-06-29 17:59:00 +0000
committerAlan Conway <aconway@apache.org>2007-06-29 17:59:00 +0000
commit90f063a35251345f80616c898e1f6cc7a7d0c393 (patch)
tree8d17b607338f654fc5ba78da44134e7cebe5b9fb /qpid
parent53507c3ddb1936c7e64804ccf0b19d84f6da8fff (diff)
downloadqpid-python-90f063a35251345f80616c898e1f6cc7a7d0c393.tar.gz
* Summary:
- Improved plugin framework and HandlerUpdater interface. - Cluster handlers for traffic to/from cluster. - Cluster HandlerUpdater configures channel chains for cluster. - Cluster PluginProvider registers cluster objects with broker. * src/qpid/framing/AMQFrame.h: Made data members public. Handlers need to be able to modify frame data, getters/setters are just a nuisance here. * src/tests/Cluster.cpp: Updated for cluster changes, using handlers instead of friendship to hook test into Cluster code. * src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and CHANNEL_HIGH_BIT constants. * src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer, broke dependency on broker channel types. * src/qpid/framing/Handler.h: Added constructors and nextHandler() * src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel() * src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster plugins. * src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out plugin details to ClusterPluginProvider. * src/qpid/cluster/ChannelManager.h: Insert cluster handlers into channel chains, route frames between cluster and channels. * src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX constant. * src/qpid/broker/Broker.cpp: - Refactored for new plugin framework. - Added getUrl(). * src/qpid/Url.h: Added constructor from Address. * src/qpid/Plugin.h: Generalized plugin framework, broke dependency on Broker interfaces. We may want to use plug-ins for clients also at some point. * src/tests/run_test: Fix bug when VALGRIND is not set. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/Makefile.am8
-rw-r--r--qpid/cpp/src/cluster.mk7
-rw-r--r--qpid/cpp/src/qpid/Plugin.cpp (renamed from qpid/cpp/src/qpid/broker/Plugin.cpp)35
-rw-r--r--qpid/cpp/src/qpid/Plugin.h111
-rw-r--r--qpid/cpp/src/qpid/Url.cpp11
-rw-r--r--qpid/cpp/src/qpid/Url.h21
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h36
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerSingleton.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerSingleton.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Plugin.h80
-rw-r--r--qpid/cpp/src/qpid/cluster/ChannelManager.cpp85
-rw-r--r--qpid/cpp/src/qpid/cluster/ChannelManager.h66
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp121
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h64
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp66
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.h3
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.cpp10
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h13
-rw-r--r--qpid/cpp/src/qpid/framing/Handler.h12
-rw-r--r--qpid/cpp/src/qpid/framing/HandlerUpdater.h (renamed from qpid/cpp/src/qpid/broker/ChannelInitializer.h)33
-rw-r--r--qpid/cpp/src/qpid/framing/amqp_types.h7
-rw-r--r--qpid/cpp/src/qpidd.cpp6
-rw-r--r--qpid/cpp/src/tests/.valgrindrc-default2
-rw-r--r--qpid/cpp/src/tests/BrokerChannelTest.cpp2
-rw-r--r--qpid/cpp/src/tests/Cluster.cpp28
-rw-r--r--qpid/cpp/src/tests/Cluster.h43
-rw-r--r--qpid/cpp/src/tests/Cluster_child.cpp14
-rw-r--r--qpid/cpp/src/tests/FramingTest.cpp4
-rw-r--r--qpid/cpp/src/tests/InProcessBroker.h2
-rw-r--r--qpid/cpp/src/tests/Makefile.am2
-rw-r--r--qpid/cpp/src/tests/Url.cpp34
-rwxr-xr-xqpid/cpp/src/tests/run_test2
34 files changed, 651 insertions, 312 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index f0fbf7e672..b92f8d85e5 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -152,11 +152,14 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Proxy.cpp \
qpid/framing/Handler.h \
qpid/framing/FrameHandler.h \
+ qpid/framing/HandlerUpdater.h \
gen/qpid/framing/AMQP_ClientProxy.cpp \
gen/qpid/framing/AMQP_HighestVersion.h \
gen/qpid/framing/AMQP_MethodVersionMap.cpp \
gen/qpid/framing/AMQP_ServerProxy.cpp \
qpid/Exception.cpp \
+ qpid/Plugin.h \
+ qpid/Plugin.cpp \
qpid/Url.h \
qpid/Url.cpp \
qpid/QpidError.cpp \
@@ -217,10 +220,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
- qpid/broker/TxPublish.cpp \
- qpid/broker/Plugin.h \
- qpid/broker/Plugin.cpp \
- qpid/broker/ChannelInitializer.h
+ qpid/broker/TxPublish.cpp
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index f97e95c208..4eddf4ffe7 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -10,9 +10,12 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/Cluster.h \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
- qpid/cluster/Dispatchable.h
+ qpid/cluster/Dispatchable.h \
+ qpid/cluster/ChannelManager.h \
+ qpid/cluster/ChannelManager.cpp \
+ qpid/cluster/ClusterPluginProvider.cpp
-libqpidcluster_la_LIBADD= -lcpg libqpidcommon.la
+libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
else
# Empty stub library to satisfy rpm spec file.
diff --git a/qpid/cpp/src/qpid/broker/Plugin.cpp b/qpid/cpp/src/qpid/Plugin.cpp
index f018c4e152..6389bfd73d 100644
--- a/qpid/cpp/src/qpid/broker/Plugin.cpp
+++ b/qpid/cpp/src/qpid/Plugin.cpp
@@ -19,24 +19,24 @@
#include "Plugin.h"
namespace qpid {
-namespace broker {
-std::vector<Plugin*> Plugin::plugins;
+std::vector<PluginProvider*> PluginProvider::providers;
-Plugin::Plugin() {
+PluginProvider::PluginProvider() {
// Register myself.
- plugins.push_back(this);
+ providers.push_back(this);
}
-Plugin::~Plugin() {}
+PluginProvider::~PluginProvider() {}
-Options* Plugin::getOptions() { return 0; }
+Options* PluginProvider::getOptions() { return 0; }
-void Plugin::start(Broker&) {}
-
-void Plugin::finish(Broker&) {}
+const std::vector<PluginProvider*>& PluginProvider::getProviders() {
+ return providers;
+}
+} // namespace qpid
-const std::vector<Plugin*>& Plugin::getPlugins() { return plugins; }
+// TODO aconway 2007-06-28: GNU lib has portable dlopen if we go that way.
#ifdef USE_APR_PLATFORM
@@ -44,24 +44,27 @@ const std::vector<Plugin*>& Plugin::getPlugins() { return plugins; }
#include "qpid/sys/apr/APRPool.h"
#include <apr_dso.h>
-void Plugin::dlopen(const std::string& name) {
+namespace qpid {
+void dlopen(const char* name) {
apr_dso_handle_t* handle;
CHECK_APR_SUCCESS(
- apr_dso_load(&handle, name.c_str(), sys::APRPool::get()));
+ apr_dso_load(&handle, name, sys::APRPool::get()));
}
+} // namespace qpid
#else // Posix
#include <dlfcn.h>
-void Plugin::dlopen(const std::string& name) {
+namespace qpid {
+void dlopen(const char* name) {
dlerror();
- dlopen(name.c_str(), RTLD_NOW);
+ dlopen(name, RTLD_NOW);
const char* error = dlerror();
if (error) {
THROW_QPID_ERROR(INTERNAL_ERROR, error);
}
}
-#endif // USE_APR_PLATFORM
+} // namespace qpidpp
-}} // namespace qpid::broker
+#endif // USE_APR_PLATFORM
diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h
new file mode 100644
index 0000000000..48fb16a541
--- /dev/null
+++ b/qpid/cpp/src/qpid/Plugin.h
@@ -0,0 +1,111 @@
+#ifndef QPID_PLUGIN_H
+#define QPID_PLUGIN_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/shared_ptr.h"
+#include <boost/noncopyable.hpp>
+#include <vector>
+#include <boost/function.hpp>
+
+
+/**@file Generic plug-in framework. */
+
+namespace qpid {
+class Options;
+
+/** Generic base class to allow dynamic casting of generic Plugin objects
+ * to concrete types.
+ */
+struct Plugin : private boost::noncopyable {
+ virtual ~Plugin() {}
+};
+
+/** Generic interface for anything that uses plug-ins. */
+struct PluginUser : boost::noncopyable {
+ virtual ~PluginUser() {}
+ /**
+ * Called by a PluginProvider to provide a plugin.
+ *
+ * A concrete PluginUser will dynamic_pointer_cast plugin to a
+ * class it knows how to use. A PluginUser should ignore plugins
+ * it does not recognize.
+ *
+ * The user will release its shared_ptr when it is finished using
+ * plugin.
+ */
+ virtual void use(const shared_ptr<Plugin>& plugin) = 0;
+};
+
+
+/**
+ * Base for classes that provide plug-ins.
+ */
+class PluginProvider : boost::noncopyable
+{
+ public:
+ /**
+ * Register the provider to appear in getProviders()
+ *
+ * A concrete PluginProvider is instantiated as a global or static
+ * member variable in a library so it is registered during static
+ * initialization when the library is loaded.
+ */
+ PluginProvider();
+
+ virtual ~PluginProvider();
+
+ /**
+ * Returns configuration options for the plugin.
+ * Then will be updated during option parsing by the host program.
+ *
+ * @return An options group or 0 for no options. Default returns 0.
+ * PluginProvider retains ownership of return value.
+ */
+ virtual Options* getOptions();
+
+ /** Provide plugins to a PluginUser.
+ *
+ * The provider can dynamic_cast the user if it only provides
+ * plugins to certain types of user. Providers should ignore
+ * users they don't recognize.
+ */
+ virtual void provide(PluginUser& user) = 0;
+
+ /** Get the list of pointers to the registered providers.
+ * Caller must not delete the pointers.
+ */
+ static const std::vector<PluginProvider*>& getProviders();
+
+ private:
+ static std::vector<PluginProvider*> providers;
+};
+
+/** Load a shared library, registering any PluginProvider it contains.
+ *
+ * This is just a convenient portable wrapper for normal shared
+ * library loading. A global PluginProvider instance loaded or
+ * linked in any way will get registered.
+ */
+void dlopen(const char* libname);
+
+
+} // namespace qpid
+
+#endif /*!QPID_PLUGIN_H*/
diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp
index b9c17ffd82..d52fc71870 100644
--- a/qpid/cpp/src/qpid/Url.cpp
+++ b/qpid/cpp/src/qpid/Url.cpp
@@ -100,14 +100,13 @@ struct UrlGrammar : public grammar<UrlGrammar>
};
};
-Url::Url(const string& url) {
- if (!parse(url.c_str(), UrlGrammar(*this)).full)
- throw InvalidUrl("Invalid AMQP url: "+url);
- // TODO aconway 2007-06-15: Better error handling?
+void Url::parse(const char* url) {
+ if (!boost::spirit::parse(url, UrlGrammar(*this)).full)
+ throw InvalidUrl(string("Invalid AMQP url: ")+url);
}
-Url::Url(const string& url, const nothrow_t&) {
- if (!parse(url.c_str(), UrlGrammar(*this)).full)
+void Url::parseNoThrow(const char* url) {
+ if (!boost::spirit::parse(url, UrlGrammar(*this)).full)
clear();
}
diff --git a/qpid/cpp/src/qpid/Url.h b/qpid/cpp/src/qpid/Url.h
index 93a53afde6..0cf54c493e 100644
--- a/qpid/cpp/src/qpid/Url.h
+++ b/qpid/cpp/src/qpid/Url.h
@@ -31,7 +31,7 @@ namespace qpid {
/** TCP address of a broker - host:port */
struct TcpAddress {
static const uint16_t DEFAULT_PORT=5672;
- TcpAddress(const std::string& host_=std::string(),
+ explicit TcpAddress(const std::string& host_=std::string(),
uint16_t port_=DEFAULT_PORT)
: host(host_), port(port_) {}
std::string host;
@@ -62,17 +62,26 @@ struct Url : public std::vector<Address> {
/** Empty URL. */
Url() {}
- /** Parse an amqp URL string as defined in
+ /** URL containing a single address */
+ explicit Url(const Address& addr) { push_back(addr); }
+
+ /** Parse url, throw InvalidUrl if invalid. */
+ explicit Url(const std::string& url) { parse(url.c_str()); }
+
+ /** Parse url, throw InvalidUrl if invalid. */
+ explicit Url(const char* url) { parse(url); }
+
+ /** Replace contents with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
*@exception InvalidUrl if the url is invalid.
*/
- Url(const std::string& url);
+ void parse(const char* url);
- /** Parse an amqp URL string as defined in
+ /** Replace contesnts with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
- * Results in empty URL if url is invalid.
+ * url.empty() will be true if url is invalid.
*/
- Url(const std::string& url, const std::nothrow_t&);
+ void parseNoThrow(const char* url);
};
std::ostream& operator<<(std::ostream& os, const Url& url);
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 4354f6d38a..65252f5415 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -22,6 +22,7 @@
#include "Broker.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/HandlerUpdater.h"
#include "DirectExchange.h"
#include "TopicExchange.h"
#include "FanOutExchange.h"
@@ -41,6 +42,7 @@
#include <memory>
using qpid::sys::Acceptor;
+using qpid::framing::HandlerUpdater;
namespace qpid {
namespace broker {
@@ -100,16 +102,17 @@ Broker::Broker(const Broker::Options& conf) :
}
-Broker::shared_ptr Broker::create(int16_t port)
+shared_ptr<Broker> Broker::create(int16_t port)
{
Options config;
config.port=port;
return create(config);
}
-Broker::shared_ptr Broker::create(const Options& config) {
- return Broker::shared_ptr(new Broker(config));
-}
+shared_ptr<Broker> Broker::create(const Options& opts)
+{
+ return shared_ptr<Broker>(new Broker(opts));
+}
MessageStore* Broker::createStore(const Options& config) {
if (config.store.empty())
@@ -134,6 +137,10 @@ Broker::~Broker() {
int16_t Broker::getPort() const { return getAcceptor().getPort(); }
+std::string Broker::getUrl() const {
+ return Url(TcpAddress(getAcceptor().getHost(), getPort())).str();
+}
+
Acceptor& Broker::getAcceptor() const {
if (!acceptor)
const_cast<Acceptor::shared_ptr&>(acceptor) =
@@ -144,6 +151,14 @@ Acceptor& Broker::getAcceptor() const {
return *acceptor;
}
+void Broker::use(const shared_ptr<Plugin>& plugin) {
+ shared_ptr<HandlerUpdater> updater=
+ dynamic_pointer_cast<HandlerUpdater>(plugin);
+ if (updater) {
+ QPID_LOG(critical, "HandlerUpdater plugins not implemented");
+ // FIXME aconway 2007-06-28: hook into Connections.
+ }
+}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 4be3d9761e..58ba8589b0 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -24,9 +24,9 @@
#include "ConnectionFactory.h"
#include "qpid/Url.h"
+#include "qpid/Plugin.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Acceptor.h"
-#include "qpid/SharedObject.h"
#include "MessageStore.h"
#include "AutoDelete.h"
#include "ExchangeRegistry.h"
@@ -39,14 +39,17 @@
#include "qpid/Options.h"
namespace qpid {
+
+namespace framing {
+class HandlerUpdater;
+}
+
namespace broker {
-class ChannelInitializer;
/**
* A broker instance.
*/
-class Broker : public sys::Runnable,
- public SharedObject<Broker>
+class Broker : public sys::Runnable, public PluginUser
{
public:
struct Options : public qpid::Options {
@@ -62,16 +65,9 @@ class Broker : public sys::Runnable,
virtual ~Broker();
- /**
- * Create a broker.
- * @param port Port to listen on or 0 to pick a port dynamically.
- */
- static shared_ptr create(int16_t port = TcpAddress::DEFAULT_PORT);
-
- /**
- * Create a broker with the options in config.
- */
- static shared_ptr create(const Options& config);
+ Broker(const Options& configuration);
+ static shared_ptr<Broker> create(const Options& configuration);
+ static shared_ptr<Broker> create(int16_t port = TcpAddress::DEFAULT_PORT);
/**
* Return listening port. If called before bind this is
@@ -80,19 +76,22 @@ class Broker : public sys::Runnable,
* 0.
*/
virtual int16_t getPort() const;
-
+
+ /** Return the broker's URL. */
+ virtual std::string getUrl() const;
+
/**
* Run the broker. Implements Runnable::run() so the broker
* can be run in a separate thread.
*/
virtual void run();
- /** Plug-in a channel initializer. */
- void plugin(const qpid::shared_ptr<ChannelInitializer>&);
-
/** Shut down the broker */
virtual void shutdown();
+ /** Use a plugin */
+ void use(const shared_ptr<Plugin>& plugin);
+
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
@@ -102,7 +101,6 @@ class Broker : public sys::Runnable,
DtxManager& getDtxManager() { return dtxManager; }
private:
- Broker(const Options& configuration);
sys::Acceptor& getAcceptor() const;
Options config;
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
index c31f4d197d..b7a61ababe 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -58,7 +58,7 @@ void BrokerAdapter::ConnectionHandlerImpl::startOk(
const string& /*response*/, const string& /*locale*/)
{
client.tune(
- 100, connection.getFrameMax(), connection.getHeartbeat());
+ CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
}
void BrokerAdapter::ConnectionHandlerImpl::secureOk(
diff --git a/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp b/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp
index 4571764850..77200dd760 100644
--- a/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp
@@ -24,13 +24,13 @@ namespace broker {
BrokerSingleton::BrokerSingleton() {
if (broker.get() == 0)
broker = Broker::create();
- Broker::shared_ptr::operator=(broker);
+ shared_ptr<Broker>::operator=(broker);
}
BrokerSingleton::~BrokerSingleton() {
broker->shutdown();
}
-Broker::shared_ptr BrokerSingleton::broker;
+shared_ptr<Broker> BrokerSingleton::broker;
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/BrokerSingleton.h b/qpid/cpp/src/qpid/broker/BrokerSingleton.h
index 139e02a5fd..14b932df36 100644
--- a/qpid/cpp/src/qpid/broker/BrokerSingleton.h
+++ b/qpid/cpp/src/qpid/broker/BrokerSingleton.h
@@ -32,17 +32,17 @@ namespace broker {
* Useful for unit tests that want to share a broker between multiple
* tests to reduce overhead of starting/stopping a broker for every test.
*
- * Tests that need a new broker can call Broker::create directly.
+ * Tests that need a new broker can create it directly.
*
* THREAD UNSAFE.
*/
-class BrokerSingleton : public Broker::shared_ptr
+class BrokerSingleton : public shared_ptr<Broker>
{
public:
BrokerSingleton();
~BrokerSingleton();
private:
- static Broker::shared_ptr broker;
+ static shared_ptr<Broker> broker;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Plugin.h b/qpid/cpp/src/qpid/broker/Plugin.h
deleted file mode 100644
index 8c794b86a0..0000000000
--- a/qpid/cpp/src/qpid/broker/Plugin.h
+++ /dev/null
@@ -1,80 +0,0 @@
-#ifndef QPID_BROKER_PLUGIN_H
-#define QPID_BROKER_PLUGIN_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/Options.h"
-#include "qpid/broker/Broker.h"
-#include <boost/noncopyable.hpp>
-#include <vector>
-
-namespace qpid {
-namespace broker {
-
-/**
- * Inherit broker plug-ins from this class, override the virtual
- * functions and create a global (or class static member) instance in
- * a shared library. When the library is loaded your plug-in will be
- * registered.
- */
-class Plugin : boost::noncopyable
-{
- public:
- /** Constructor registers the plugin to appear in getPlugins().
- * Note: Plugin subclasses should only be constructed during
- * static initialization, i.e. they should only be declared
- * as global or static member variables.
- */
- Plugin();
-
- virtual ~Plugin();
-
- /**
- * Override if your plugin has configuration options.
- * They will be included in options parsing prior to broker
- * creation setup.
- *@return An options group or 0. Default returns 0.
- */
- virtual Options* getOptions();
-
- /** Called immediately after broker creation to allow plug-ins
- * to do whatever they do to the broker, e.g. add handler chain
- * manipulators.
- */
- virtual void start(Broker& b);
-
- /** Called just before broker shutdown. Default does nothing */
- virtual void finish(Broker& b);
-
- /** Get the list of registered plug-ins. */
- static const std::vector<Plugin*>& getPlugins();
-
- /** Load a shared library (that contains plugins presumably!) */
- static void dlopen(const std::string& libname);
-
- private:
- static std::vector<Plugin*> plugins;
-};
-
-}} // namespace qpid::broker
-
-
-
-
-#endif /*!QPID_BROKER_PLUGIN_H*/
diff --git a/qpid/cpp/src/qpid/cluster/ChannelManager.cpp b/qpid/cpp/src/qpid/cluster/ChannelManager.cpp
new file mode 100644
index 0000000000..f573d78ca1
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ChannelManager.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "ChannelManager.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+/** Handler to multicast to the cluster */
+struct ClusterHandler : public FrameHandler {
+
+ ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_)
+ : FrameHandler(next), bitmask(bitmask_) {}
+
+ void handle(AMQFrame& frame) {
+ frame.channel |= bitmask; // Mark the frame
+ nextHandler(frame);
+ // TODO aconway 2007-06-28: Right now everything is backed up
+ // via multicast. When we have point-to-point backups this
+ // function must determine where each frame should be sent: to
+ // multicast or only to specific backup(s) via AMQP.
+ }
+
+ ChannelId bitmask;
+};
+
+ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){}
+
+void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) {
+ // Store the original cluster chains for the channel.
+ channels[id] = chains;
+
+ // Replace chains with multicast-to-cluster handlers that mark the
+ // high-bit of the channel ID on outgoing frames so we can tell
+ // them from incoming frames in handle()
+ //
+ // When handle() receives the frames from the cluster it
+ // will forward them to the original channel chains stored in
+ // channels map.
+ //
+ chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0));
+ chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT));
+}
+
+void ChannelManager::handle(AMQFrame& frame) {
+ bool isOut = frame.channel | CHANNEL_HIGH_BIT;
+ frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit.
+ ChannelMap::iterator i = channels.find(frame.getChannel());
+ if (i != channels.end()) {
+ Chain& chain = isOut ? i->second.out : i->second.in;
+ chain->handle(frame);
+ }
+ else
+ updateFailoverState(frame);
+}
+
+void ChannelManager::updateFailoverState(AMQFrame& ) {
+ QPID_LOG(critical, "Failover is not implemented");
+ // FIXME aconway 2007-06-28:
+ // If the channel is not in my map then I'm not primary so
+ // I don't pass the frame to the channel handler but I
+ // do need to update the session failover state.
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ChannelManager.h b/qpid/cpp/src/qpid/cluster/ChannelManager.h
new file mode 100644
index 0000000000..59fce77957
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ChannelManager.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLUSTER_CHANNELMANAGER_H
+#define QPID_CLUSTER_CHANNELMANAGER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/HandlerUpdater.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Manage channels and handler chains on channels for the cluster.
+ *
+ * As HandlerUpdater plugin, updates channel handler chains with
+ * cluster handlers.
+ *
+ * As a FrameHandler handles frames coming from the cluster and
+ * dispatches them to the appropriate channel handler.
+ *
+ */
+class ChannelManager : public framing::HandlerUpdater,
+ public framing::FrameHandler
+{
+ public:
+ /** Takes a handler to send frames to the cluster. */
+ ChannelManager(framing::FrameHandler::Chain mcastOut);
+
+ /** As FrameHandler handle frames from the cluster */
+ void handle(framing::AMQFrame& frame);
+
+ /** As ChannelUpdater update the handler chains. */
+ void update(framing::ChannelId id, framing::FrameHandler::Chains& chains);
+
+ private:
+ void updateFailoverState(framing::AMQFrame&);
+
+ typedef std::map<framing::ChannelId,
+ framing::FrameHandler::Chains> ChannelMap;
+
+ framing::FrameHandler::Chain mcastOut;
+ ChannelMap channels;
+};
+
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 30073c4551..8d898eefa3 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,13 +17,13 @@
*/
#include "Cluster.h"
-#include "Cpg.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <iterator>
+#include <map>
namespace qpid {
namespace cluster {
@@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "(" << cluster.self << ")";
}
+namespace {
+Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
+struct StatusMapInit {
+ StatusMapInit() {
+ statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
+ statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
+ statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
+ statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
+ statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
+ }
+} instance;
+}
+
+Cluster::Member::Member(const cpg_address& addr)
+ : status(statusMap[addr.reason]) {}
+
void Cluster::notify() {
+ ProtocolVersion version;
// TODO aconway 2007-06-25: Use proxy here.
AMQFrame frame(version, 0,
make_shared_ptr(new ClusterNotifyBody(version, url)));
handle(frame);
}
-Cluster::Cluster(
- const std::string& name_, const std::string& url_, FrameHandler& next_,
- ProtocolVersion ver)
- : name(name_), url(url_), version(ver),
- cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
- boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
- next(next_)
-{
- self=Id(cpg->getLocalNoideId(), getpid());
+Cluster::Cluster(const std::string& name_, const std::string& url_) :
+ name(name_),
+ url(url_),
+ cpg(new Cpg(
+ boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
+ boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
+ self(cpg->getLocalNoideId(), getpid())
+{}
+
+void Cluster::join(FrameHandler::Chain next) {
QPID_LOG(trace, *this << " Joining cluster.");
+ next = next;
+ dispatcher=Thread(*this);
cpg->join(name);
notify();
- dispatcher=Thread(*this);
}
Cluster::~Cluster() {
- try {
- QPID_LOG(trace, *this << " Leaving cluster.");
- cpg->leave(name);
- cpg.reset();
- dispatcher.join();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << e.what());
+ if (cpg) {
+ try {
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << e.what());
+ }
}
}
void Cluster::handle(AMQFrame& frame) {
+ assert(cpg);
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -104,52 +126,59 @@ void Cluster::cpgDeliver(
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame);
// TODO aconway 2007-06-20: use visitor pattern.
- ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
+ ClusterNotifyBody* notifyIn=
+ dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
if (notifyIn) {
- Mutex::ScopedLock l(lock);
- members[from].reset(new Member(notifyIn->getUrl()));
- lock.notifyAll();
+ {
+ Mutex::ScopedLock l(lock);
+ assert(members[from]);
+ members[from]->url = notifyIn->getUrl();
+ members[from]->status = Member::BROKER;
+ }
+ if (callback)
+ callback();
}
else
- next.handle(frame);
+ next->handle(frame);
}
void Cluster::cpgConfigChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
- struct cpg_address *ccMembers, int nMembers,
+ struct cpg_address *current, int nCurrent,
struct cpg_address *left, int nLeft,
struct cpg_address *joined, int nJoined
)
{
- QPID_LOG(
- trace,
- *this << " Configuration change. " << endl
- << " Joined: " << make_pair(joined, nJoined) << endl
- << " Left: " << make_pair(left, nLeft) << endl
- << " Current: " << make_pair(ccMembers, nMembers));
-
+ QPID_LOG(trace,
+ *this << " Configuration change. " << endl
+ << " Joined: " << make_pair(joined, nJoined) << endl
+ << " Left: " << make_pair(left, nLeft) << endl
+ << " Current: " << make_pair(current, nCurrent));
+
+ bool needNotify=false;
+ MemberList updated;
{
Mutex::ScopedLock l(lock);
- // Erase members that left.
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(left[i]));
- lock.notifyAll();
- }
-
- // If there are new members (other than myself) then notify.
- for (int i=0; i< nJoined; ++i) {
- if (Id(joined[i]) != self) {
- notify();
- break;
+ for (int i = 0; i < nJoined; ++i) {
+ Id id(current[i]);
+ members[id].reset(new Member(current[i]));
+ if (id != self)
+ needNotify = true; // Notify new members other than myself.
}
- }
-
- // Note: New members are be added to my map when cpgDeliver
- // gets a cluster.notify frame.
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(current[i]));
+ } // End of locked scope.
+ if (needNotify)
+ notify();
+ if (callback)
+ callback();
}
+void Cluster::setCallback(boost::function<void()> f) { callback=f; }
+
void Cluster::run() {
+ assert(cpg);
cpg->dispatchBlocking();
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 1cbbb249f2..aff213b6c9 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -25,47 +25,74 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/shared_ptr.h"
-#include "qpid/framing/ProtocolVersion.h"
+#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
#include <map>
#include <vector>
namespace qpid {
+
+namespace broker {
+class HandlerUpdater;
+}
+
namespace cluster {
+class ChannelManager;
+
/**
- * Represents a cluster. Creating an instance joins current process
- * to the cluster.
+ * Represents a cluster, provides access to data about members.
+ *
+ * Implements a FrameHandler that multicasts frames to the cluster.
+ *
+ * Requires a handler for frames arriving from the cluster,
+ * normally a ChannelManager but other handlers could be interposed
+ * for testing, logging etc.
*/
class Cluster : public framing::FrameHandler, private sys::Runnable {
public:
/** Details of a cluster member */
struct Member {
- Member(const std::string& url_) : url(url_) {}
+ typedef shared_ptr<const Member> Ptr;
+ /** Status of a cluster member. */
+ enum Status {
+ JOIN, ///< Process joined the group.
+ LEAVE, ///< Process left the group cleanly.
+ NODEDOWN, ///< Process's node went down.
+ NODEUP, ///< Process's node joined the cluster.
+ PROCDOWN, ///< Process died without leaving.
+ BROKER ///< Broker details are available.
+ };
+
+ Member(const cpg_address&);
std::string url;
+ Status status;
};
-
- typedef std::vector<shared_ptr<const Member> > MemberList;
+
+ typedef std::vector<Member::Ptr> MemberList;
/**
- * Join a cluster.
+ * Create a cluster object but do not joing.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
- * @param next handler receives the frame when it has been
- * acknowledged by the cluster.
*/
- Cluster(const std::string& name,
- const std::string& url,
- framing::FrameHandler& next,
- framing::ProtocolVersion);
+ Cluster(const std::string& name, const std::string& url);
~Cluster();
+
+ /** Join the cluster.
+ *@handler is the handler for frames arriving from the cluster.
+ */
+ void join(framing::FrameHandler::Chain handler);
/** Multicast a frame to the cluster. */
void handle(framing::AMQFrame&);
/** Get the current cluster membership. */
MemberList getMembers() const;
+
+ /** Called when membership changes. */
+ void setCallback(boost::function<void()>);
/** Number of members in the cluster. */
size_t size() const;
@@ -76,7 +103,6 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
void run();
void notify();
-
void cpgDeliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -93,18 +119,14 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
struct cpg_address */*joined*/, int /*nJoined*/
);
- Id self;
+ mutable sys::Monitor lock;
Cpg::Name name;
std::string url;
- framing::ProtocolVersion version;
boost::scoped_ptr<Cpg> cpg;
- framing::FrameHandler& next;
+ Id self;
MemberMap members;
sys::Thread dispatcher;
-
- protected:
- // Allow access from ClusterTest subclass.
- mutable sys::Monitor lock;
+ boost::function<void()> callback;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
};
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
new file mode 100644
index 0000000000..3a09a66b81
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/ChannelManager.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+
+struct ClusterPluginProvider : public PluginProvider {
+
+ struct ClusterOptions : public Options {
+ string clusterName;
+ ClusterOptions() {
+ addOptions()
+ ("cluster", optValue(clusterName, "NAME"),
+ "Join the cluster named NAME");
+ }
+ };
+
+ ClusterOptions options;
+ shared_ptr<Cluster> cluster;
+
+ Options* getOptions() {
+ return &options;
+ }
+
+ void provide(PluginUser& user) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&user);
+ // Only provide to a Broker, and only if the --cluster config is set.
+ if (broker && !options.clusterName.empty()) {
+ assert(!cluster); // A process can only belong to one cluster.
+ cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
+
+ // Channel manager is both the next handler for the cluster
+ // and the HandlerUpdater plugin for the broker.
+ shared_ptr<ChannelManager> manager(new ChannelManager(cluster));
+ cluster->join(manager);
+ broker->use(manager);
+ }
+ }
+};
+
+static ClusterPluginProvider instance; // Static initialization.
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h
index 6b157301a7..e164ed1215 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.h
+++ b/qpid/cpp/src/qpid/cluster/Cpg.h
@@ -23,11 +23,10 @@
#include "qpid/cluster/Dispatchable.h"
#include <boost/function.hpp>
#include <cassert>
-#ifdef CLUSTER
extern "C" {
#include <openais/cpg.h>
}
-#endif
+
namespace qpid {
namespace cluster {
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
index 6541a0e788..a528913fd9 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
@@ -39,20 +39,14 @@ version(_version)
assert(version != ProtocolVersion(0,0));
}
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) :
-version(_version), channel(_channel), body(_body)
-{}
+AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {}
AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) :
-version(_version), channel(_channel), body(_body)
+ channel(_channel), body(_body), version(_version)
{}
AMQFrame::~AMQFrame() {}
-uint16_t AMQFrame::getChannel(){
- return channel;
-}
-
AMQBody::shared_ptr AMQFrame::getBody(){
return body;
}
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index bef1b01df4..1a7b203ad7 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -48,7 +48,7 @@ class AMQFrame : public AMQDataBlock
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
virtual uint32_t size() const;
- uint16_t getChannel();
+ uint16_t getChannel() const { return channel; }
AMQBody::shared_ptr getBody();
/** Convenience template to cast the body to an expected type */
@@ -60,18 +60,17 @@ class AMQFrame : public AMQDataBlock
uint32_t decodeHead(Buffer& buffer);
void decodeBody(Buffer& buffer, uint32_t size);
- private:
- static AMQP_MethodVersionMap versionMap;
- ProtocolVersion version;
-
uint16_t channel;
uint8_t type;
AMQBody::shared_ptr body;
-
+ ProtocolVersion version;
- friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body);
+ private:
+ static AMQP_MethodVersionMap versionMap;
};
+std::ostream& operator<<(std::ostream&, const AMQFrame&);
+
}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/Handler.h b/qpid/cpp/src/qpid/framing/Handler.h
index 56e150a66d..f6b59393d9 100644
--- a/qpid/cpp/src/qpid/framing/Handler.h
+++ b/qpid/cpp/src/qpid/framing/Handler.h
@@ -40,11 +40,21 @@ template <class T> struct Handler {
Chain out;
};
+ Handler() {}
+ Handler(Chain next_) : next(next_) {}
virtual ~Handler() {}
+
virtual void handle(T) = 0;
+
+ /** Next handler. Public so chains can be modified by altering next. */
Chain next;
-};
+ protected:
+ /** Derived handle() implementations call nextHandler to invoke the
+ * next handler in the chain. */
+ void nextHandler(T data) { if (next) next->handle(data); }
+
+};
}}
diff --git a/qpid/cpp/src/qpid/broker/ChannelInitializer.h b/qpid/cpp/src/qpid/framing/HandlerUpdater.h
index 84dea7a3d7..5cb1e87d6e 100644
--- a/qpid/cpp/src/qpid/broker/ChannelInitializer.h
+++ b/qpid/cpp/src/qpid/framing/HandlerUpdater.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CHANNELINITIALIZER_H
-#define QPID_BROKER_CHANNELINITIALIZER_H
+#ifndef QPID_FRAMING_HANDLERUPDATER_H
+#define QPID_FRAMING_HANDLERUPDATER_H
/*
*
@@ -19,25 +19,26 @@
*
*/
-#include <boost/noncopyable.hpp>
+#include "qpid/Plugin.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/FrameHandler.h"
namespace qpid {
-namespace broker {
+namespace framing {
+
+/** Plugin object that can update handler chains. */
+struct HandlerUpdater : public Plugin {
+ /** Update the handler chains.
+ *@param id Unique identifier for channel or session.
+ *@param chains Handler chains to be updated.
+ */
+ virtual void update(ChannelId id, FrameHandler::Chains& chains) = 0;
+};
-/**
- * A ChannelInitializer is called each time a new Channel is created.
- */
-class ChannelInitializer : boost::noncopyable
-{
- public:
- virtual ~ChannelInitializer() {}
+}} // namespace qpid::framing
- /** Called for each new channel */
- virtual initialize(Channe&) = 0;
-};
-}} // namespace qpid::broker
-#endif /*!QPID_BROKER_CHANNELINITIALIZER_H*/
+#endif /*!QPID_FRAMING_HANDLERUPDATER_H*/
diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h
index f0bd0ce427..efb720f047 100644
--- a/qpid/cpp/src/qpid/framing/amqp_types.h
+++ b/qpid/cpp/src/qpid/framing/amqp_types.h
@@ -53,5 +53,12 @@ typedef uint16_t ReplyCode;
// Types represented by classes.
class Content;
class FieldTable;
+
+// Useful constants
+
+/** Maximum channel ID used by broker. Reserve high bit for internal use.*/
+const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1;
+const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX);
+
}} // namespace qpid::framing
#endif
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index 28a6cf263e..e17deeee2e 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -74,7 +74,7 @@ struct QpiddOptions : public qpid::Options {
};
// Globals
-Broker::shared_ptr brokerPtr;
+shared_ptr<Broker> brokerPtr;
QpiddOptions options;
void handle_signal(int /*signal*/){
@@ -112,7 +112,7 @@ void parent(Daemon& demon) {
/** Code for forked child */
void child(Daemon& demon) {
- brokerPtr=Broker::create(options.broker);
+ brokerPtr.reset(new Broker(options.broker));
uint16_t realPort=brokerPtr->getPort();
demon.ready(realPort); // Notify parent.
brokerPtr->run();
@@ -161,7 +161,7 @@ int main(int argc, char* argv[])
demon.fork(parent, child);
}
else { // Non-daemon broker.
- brokerPtr = Broker::create(options.broker);
+ brokerPtr.reset(new Broker(options.broker));
if (options.broker.port == 0)
cout << uint16_t(brokerPtr->getPort()) << endl;
brokerPtr->run();
diff --git a/qpid/cpp/src/tests/.valgrindrc-default b/qpid/cpp/src/tests/.valgrindrc-default
index 41e484d04e..4aba7661de 100644
--- a/qpid/cpp/src/tests/.valgrindrc-default
+++ b/qpid/cpp/src/tests/.valgrindrc-default
@@ -3,5 +3,5 @@
--demangle=yes
--suppressions=.valgrind.supp
--num-callers=25
---track-fds=yes
+--trace-children=yes
diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp
index 9dee1fc862..29ed1ae230 100644
--- a/qpid/cpp/src/tests/BrokerChannelTest.cpp
+++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp
@@ -60,7 +60,7 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testFlow);
CPPUNIT_TEST_SUITE_END();
- Broker::shared_ptr broker;
+ shared_ptr<Broker> broker;
Connection connection;
MockHandler handler;
diff --git a/qpid/cpp/src/tests/Cluster.cpp b/qpid/cpp/src/tests/Cluster.cpp
index ed50cc5d7b..008575140b 100644
--- a/qpid/cpp/src/tests/Cluster.cpp
+++ b/qpid/cpp/src/tests/Cluster.cpp
@@ -24,46 +24,44 @@
#include "qpid/framing/BasicGetOkBody.h"
-
static const ProtocolVersion VER;
/** Verify membership ind a cluster with one member. */
BOOST_AUTO_TEST_CASE(clusterOne) {
- VectorFrameHandler received;
- Cluster cluster("Test", "amqp:one:1", received, VER);
+ Cluster cluster("Test", "amqp:one:1");
+ TestClusterHandler handler(cluster);
AMQFrame frame(VER, 1, new ChannelOkBody(VER));
-
cluster.handle(frame);
- BOOST_REQUIRE(received.waitFor(1));
+ BOOST_REQUIRE(handler.waitFrames(1));
BOOST_CHECK_EQUAL(1u, cluster.size());
Cluster::MemberList members = cluster.getMembers();
BOOST_CHECK_EQUAL(1u, members.size());
BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1");
- BOOST_CHECK_EQUAL(1u, received.size());
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody());
+ BOOST_CHECK_EQUAL(1u, handler.size());
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
}
/** Fork a process to verify membership in a cluster with two members */
BOOST_AUTO_TEST_CASE(clusterTwo) {
- VectorFrameHandler received;
pid_t pid=fork();
BOOST_REQUIRE(pid >= 0);
- if (pid) { // Parent
- TestCluster cluster("Test", "amqp::1", received, VER);
- BOOST_REQUIRE(cluster.waitFor(2));
+ if (pid) { // Parent see Cluster_child.cpp for child.
+ Cluster cluster("Test", "amqp::1");
+ TestClusterHandler handler(cluster);
+ BOOST_REQUIRE(handler.waitMembers(2));
// Exchange frames with child.
AMQFrame frame(VER, 1, new ChannelOkBody(VER));
cluster.handle(frame);
- BOOST_REQUIRE(received.waitFor(2));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody());
- BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *received[1].getBody());
+ BOOST_REQUIRE(handler.waitFrames(2));
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
+ BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody());
// Wait for child to exit.
int status;
BOOST_CHECK_EQUAL(::wait(&status), pid);
BOOST_CHECK_EQUAL(0, status);
- BOOST_CHECK(cluster.waitFor(1));
+ BOOST_CHECK(handler.waitMembers(1));
BOOST_CHECK_EQUAL(1u, cluster.size());
}
else { // Child
diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h
index 7ca5445e10..edb1f1524f 100644
--- a/qpid/cpp/src/tests/Cluster.h
+++ b/qpid/cpp/src/tests/Cluster.h
@@ -24,6 +24,7 @@
#include "qpid/framing/ChannelOkBody.h"
#include "qpid/framing/BasicGetOkBody.h"
#include "qpid/log/Logger.h"
+#include <boost/bind.hpp>
#include <iostream>
#include <vector>
@@ -39,44 +40,44 @@ using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::sys;
-struct TestCluster : public Cluster {
- TestCluster(const std::string& name,
- const std::string& url,
- framing::FrameHandler& next,
- framing::ProtocolVersion ver) : Cluster(name,url,next, ver) {}
+void null_deleter(void*) {}
- /** Wait for the cluster to be of expected size (exactly) */
- bool waitFor(size_t n) {
- Mutex::ScopedLock l(lock);
- AbsTime deadline(now(),2*TIME_SEC);
- while(size() != n && lock.wait(deadline))
- ;
- return size() == n;
- }
-};
-
-struct VectorFrameHandler :
+struct TestClusterHandler :
public std::vector<AMQFrame>, public FrameHandler, public Monitor
{
+ TestClusterHandler(Cluster& c) : cluster(c) {
+ cluster.join(make_shared_ptr(this, &null_deleter));
+ cluster.setCallback(boost::bind(&Monitor::notify, this));
+ }
+
void handle(AMQFrame& f) {
ScopedLock l(*this);
push_back(f);
notifyAll();
}
- /** Wait for vector to reach size n exactly */
- bool waitFor(size_t n) {
+ /** Wait for the vector to contain n frames. */
+ bool waitFrames(size_t n) {
ScopedLock l(*this);
- AbsTime deadline(now(), 1*TIME_SEC);
+ AbsTime deadline(now(), TIME_SEC);
while (size() != n && wait(deadline))
;
return size() == n;
}
-};
+ /** Wait for the cluster to have n members */
+ bool waitMembers(size_t n) {
+ ScopedLock l(*this);
+ AbsTime deadline(now(), TIME_SEC);
+ while (cluster.size() != n && wait(deadline))
+ ;
+ return cluster.size() == n;
+ }
+
+ Cluster& cluster;
+};
-// namespace
diff --git a/qpid/cpp/src/tests/Cluster_child.cpp b/qpid/cpp/src/tests/Cluster_child.cpp
index 1540717f4a..a5ac3e9669 100644
--- a/qpid/cpp/src/tests/Cluster_child.cpp
+++ b/qpid/cpp/src/tests/Cluster_child.cpp
@@ -32,16 +32,14 @@ static const ProtocolVersion VER;
/** Chlid part of Cluster::clusterTwo test */
void clusterTwo() {
- VectorFrameHandler received;
- TestCluster cluster("Test", "amqp::2", received, VER);
- BOOST_REQUIRE(cluster.waitFor(2));
-
- BOOST_REQUIRE(received.waitFor(1));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody());
+ Cluster cluster("Test", "amqp::2");
+ TestClusterHandler handler(cluster);
+ BOOST_REQUIRE(handler.waitFrames(1));
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
AMQFrame frame(VER, 1, new BasicGetOkBody(VER));
cluster.handle(frame);
- BOOST_REQUIRE(received.waitFor(2));
- BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *received[1].getBody());
+ BOOST_REQUIRE(handler.waitFrames(2));
+ BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody());
}
int test_main(int, char**) {
diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp
index 528919de48..9c60af7866 100644
--- a/qpid/cpp/src/tests/FramingTest.cpp
+++ b/qpid/cpp/src/tests/FramingTest.cpp
@@ -402,8 +402,8 @@ class FramingTest : public CppUnit::TestCase
broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h
index 8628bde431..48ac80d30a 100644
--- a/qpid/cpp/src/tests/InProcessBroker.h
+++ b/qpid/cpp/src/tests/InProcessBroker.h
@@ -113,7 +113,7 @@ class InProcessBroker : public client::Connector {
};
framing::ProtocolInitiation protocolInit;
- Broker::shared_ptr broker;
+ shared_ptr<Broker> broker;
OutputToInputHandler brokerOut;
broker::Connection brokerConnection;
OutputToInputHandler clientOut;
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 004693b582..3303afa0be 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -83,7 +83,7 @@ testprogs = \
topic_publisher
-check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner
+check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
diff --git a/qpid/cpp/src/tests/Url.cpp b/qpid/cpp/src/tests/Url.cpp
index 09aabb80b3..faf802ba1e 100644
--- a/qpid/cpp/src/tests/Url.cpp
+++ b/qpid/cpp/src/tests/Url.cpp
@@ -30,27 +30,33 @@ BOOST_AUTO_TEST_CASE(testUrl_str) {
Url url;
url.push_back(TcpAddress("foo.com"));
url.push_back(TcpAddress("bar.com", 6789));
-
- BOOST_CHECK_EQUAL(
- url.str(), "amqp:tcp:foo.com:5672,tcp:bar.com:6789");
- BOOST_CHECK_EQUAL(Url().str(), "amqp:");
+ BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:6789", url.str());
+ BOOST_CHECK_EQUAL("amqp:", Url().str());
}
-BOOST_AUTO_TEST_CASE(testUrl_ctor) {
- BOOST_CHECK_EQUAL(
- Url("amqp:foo.com,tcp:bar.com:1234").str(),
- "amqp:tcp:foo.com:5672,tcp:bar.com:1234");
- BOOST_CHECK_EQUAL(
- Url("amqp:foo/ignorethis").str(),
- "amqp:tcp:foo:5672");
- BOOST_CHECK_EQUAL("amqp:tcp::5672", Url("amqp:").str());
- BOOST_CHECK_EQUAL(0u, Url("xxx", nothrow).size());
+BOOST_AUTO_TEST_CASE(testUrl_parse) {
+ Url url;
+ url.parse("amqp:foo.com,tcp:bar.com:1234");
+ BOOST_CHECK_EQUAL(2u, url.size());
+ BOOST_CHECK_EQUAL("foo.com", boost::get<TcpAddress>(url[0]).host);
+ BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:1234", url.str());
+
+ url.parse("amqp:foo/ignorethis");
+ BOOST_CHECK_EQUAL("amqp:tcp:foo:5672", url.str());
+
+ url.parse("amqp:");
+ BOOST_CHECK_EQUAL("amqp:tcp::5672", url.str());
+
try {
- Url invalid("xxx");
+ url.parse("invalid url");
BOOST_FAIL("Expected InvalidUrl exception");
}
catch (const Url::InvalidUrl&) {}
+
+ url.parseNoThrow("invalid url");
+ BOOST_CHECK(url.empty());
}
+
diff --git a/qpid/cpp/src/tests/run_test b/qpid/cpp/src/tests/run_test
index deb22b4450..ec724fe727 100755
--- a/qpid/cpp/src/tests/run_test
+++ b/qpid/cpp/src/tests/run_test
@@ -47,7 +47,7 @@ if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then
# This is a libtool "executable". Valgrind it if VALGRIND specified.
test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file-exactly=$VG_LOG --"
# Hide output unless there's an error.
- libtool --mode=execute "$VALGRIND" "$@" >$TEST_LOG 2>&1 || {
+ libtool --mode=execute $VALGRIND "$@" >$TEST_LOG 2>&1 || {
ERROR=$?
cat $TEST_LOG
}