diff options
author | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
commit | 90f063a35251345f80616c898e1f6cc7a7d0c393 (patch) | |
tree | 8d17b607338f654fc5ba78da44134e7cebe5b9fb /qpid | |
parent | 53507c3ddb1936c7e64804ccf0b19d84f6da8fff (diff) | |
download | qpid-python-90f063a35251345f80616c898e1f6cc7a7d0c393.tar.gz |
* Summary:
- Improved plugin framework and HandlerUpdater interface.
- Cluster handlers for traffic to/from cluster.
- Cluster HandlerUpdater configures channel chains for cluster.
- Cluster PluginProvider registers cluster objects with broker.
* src/qpid/framing/AMQFrame.h: Made data members public. Handlers
need to be able to modify frame data, getters/setters are just a
nuisance here.
* src/tests/Cluster.cpp: Updated for cluster changes, using
handlers instead of friendship to hook test into Cluster code.
* src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and
CHANNEL_HIGH_BIT constants.
* src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer,
broke dependency on broker channel types.
* src/qpid/framing/Handler.h: Added constructors and nextHandler()
* src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel()
* src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster
plugins.
* src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out
plugin details to ClusterPluginProvider.
* src/qpid/cluster/ChannelManager.h: Insert cluster handlers
into channel chains, route frames between cluster and channels.
* src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX
constant.
* src/qpid/broker/Broker.cpp:
- Refactored for new plugin framework.
- Added getUrl().
* src/qpid/Url.h: Added constructor from Address.
* src/qpid/Plugin.h: Generalized plugin framework, broke
dependency on Broker interfaces. We may want to use plug-ins for
clients also at some point.
* src/tests/run_test: Fix bug when VALGRIND is not set.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
34 files changed, 651 insertions, 312 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index f0fbf7e672..b92f8d85e5 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -152,11 +152,14 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Proxy.cpp \ qpid/framing/Handler.h \ qpid/framing/FrameHandler.h \ + qpid/framing/HandlerUpdater.h \ gen/qpid/framing/AMQP_ClientProxy.cpp \ gen/qpid/framing/AMQP_HighestVersion.h \ gen/qpid/framing/AMQP_MethodVersionMap.cpp \ gen/qpid/framing/AMQP_ServerProxy.cpp \ qpid/Exception.cpp \ + qpid/Plugin.h \ + qpid/Plugin.cpp \ qpid/Url.h \ qpid/Url.cpp \ qpid/QpidError.cpp \ @@ -217,10 +220,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ qpid/broker/TxBuffer.cpp \ - qpid/broker/TxPublish.cpp \ - qpid/broker/Plugin.h \ - qpid/broker/Plugin.cpp \ - qpid/broker/ChannelInitializer.h + qpid/broker/TxPublish.cpp libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index f97e95c208..4eddf4ffe7 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -10,9 +10,12 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/Cluster.h \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ - qpid/cluster/Dispatchable.h + qpid/cluster/Dispatchable.h \ + qpid/cluster/ChannelManager.h \ + qpid/cluster/ChannelManager.cpp \ + qpid/cluster/ClusterPluginProvider.cpp -libqpidcluster_la_LIBADD= -lcpg libqpidcommon.la +libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la else # Empty stub library to satisfy rpm spec file. diff --git a/qpid/cpp/src/qpid/broker/Plugin.cpp b/qpid/cpp/src/qpid/Plugin.cpp index f018c4e152..6389bfd73d 100644 --- a/qpid/cpp/src/qpid/broker/Plugin.cpp +++ b/qpid/cpp/src/qpid/Plugin.cpp @@ -19,24 +19,24 @@ #include "Plugin.h" namespace qpid { -namespace broker { -std::vector<Plugin*> Plugin::plugins; +std::vector<PluginProvider*> PluginProvider::providers; -Plugin::Plugin() { +PluginProvider::PluginProvider() { // Register myself. - plugins.push_back(this); + providers.push_back(this); } -Plugin::~Plugin() {} +PluginProvider::~PluginProvider() {} -Options* Plugin::getOptions() { return 0; } +Options* PluginProvider::getOptions() { return 0; } -void Plugin::start(Broker&) {} - -void Plugin::finish(Broker&) {} +const std::vector<PluginProvider*>& PluginProvider::getProviders() { + return providers; +} +} // namespace qpid -const std::vector<Plugin*>& Plugin::getPlugins() { return plugins; } +// TODO aconway 2007-06-28: GNU lib has portable dlopen if we go that way. #ifdef USE_APR_PLATFORM @@ -44,24 +44,27 @@ const std::vector<Plugin*>& Plugin::getPlugins() { return plugins; } #include "qpid/sys/apr/APRPool.h" #include <apr_dso.h> -void Plugin::dlopen(const std::string& name) { +namespace qpid { +void dlopen(const char* name) { apr_dso_handle_t* handle; CHECK_APR_SUCCESS( - apr_dso_load(&handle, name.c_str(), sys::APRPool::get())); + apr_dso_load(&handle, name, sys::APRPool::get())); } +} // namespace qpid #else // Posix #include <dlfcn.h> -void Plugin::dlopen(const std::string& name) { +namespace qpid { +void dlopen(const char* name) { dlerror(); - dlopen(name.c_str(), RTLD_NOW); + dlopen(name, RTLD_NOW); const char* error = dlerror(); if (error) { THROW_QPID_ERROR(INTERNAL_ERROR, error); } } -#endif // USE_APR_PLATFORM +} // namespace qpidpp -}} // namespace qpid::broker +#endif // USE_APR_PLATFORM diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h new file mode 100644 index 0000000000..48fb16a541 --- /dev/null +++ b/qpid/cpp/src/qpid/Plugin.h @@ -0,0 +1,111 @@ +#ifndef QPID_PLUGIN_H +#define QPID_PLUGIN_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/shared_ptr.h" +#include <boost/noncopyable.hpp> +#include <vector> +#include <boost/function.hpp> + + +/**@file Generic plug-in framework. */ + +namespace qpid { +class Options; + +/** Generic base class to allow dynamic casting of generic Plugin objects + * to concrete types. + */ +struct Plugin : private boost::noncopyable { + virtual ~Plugin() {} +}; + +/** Generic interface for anything that uses plug-ins. */ +struct PluginUser : boost::noncopyable { + virtual ~PluginUser() {} + /** + * Called by a PluginProvider to provide a plugin. + * + * A concrete PluginUser will dynamic_pointer_cast plugin to a + * class it knows how to use. A PluginUser should ignore plugins + * it does not recognize. + * + * The user will release its shared_ptr when it is finished using + * plugin. + */ + virtual void use(const shared_ptr<Plugin>& plugin) = 0; +}; + + +/** + * Base for classes that provide plug-ins. + */ +class PluginProvider : boost::noncopyable +{ + public: + /** + * Register the provider to appear in getProviders() + * + * A concrete PluginProvider is instantiated as a global or static + * member variable in a library so it is registered during static + * initialization when the library is loaded. + */ + PluginProvider(); + + virtual ~PluginProvider(); + + /** + * Returns configuration options for the plugin. + * Then will be updated during option parsing by the host program. + * + * @return An options group or 0 for no options. Default returns 0. + * PluginProvider retains ownership of return value. + */ + virtual Options* getOptions(); + + /** Provide plugins to a PluginUser. + * + * The provider can dynamic_cast the user if it only provides + * plugins to certain types of user. Providers should ignore + * users they don't recognize. + */ + virtual void provide(PluginUser& user) = 0; + + /** Get the list of pointers to the registered providers. + * Caller must not delete the pointers. + */ + static const std::vector<PluginProvider*>& getProviders(); + + private: + static std::vector<PluginProvider*> providers; +}; + +/** Load a shared library, registering any PluginProvider it contains. + * + * This is just a convenient portable wrapper for normal shared + * library loading. A global PluginProvider instance loaded or + * linked in any way will get registered. + */ +void dlopen(const char* libname); + + +} // namespace qpid + +#endif /*!QPID_PLUGIN_H*/ diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp index b9c17ffd82..d52fc71870 100644 --- a/qpid/cpp/src/qpid/Url.cpp +++ b/qpid/cpp/src/qpid/Url.cpp @@ -100,14 +100,13 @@ struct UrlGrammar : public grammar<UrlGrammar> }; }; -Url::Url(const string& url) { - if (!parse(url.c_str(), UrlGrammar(*this)).full) - throw InvalidUrl("Invalid AMQP url: "+url); - // TODO aconway 2007-06-15: Better error handling? +void Url::parse(const char* url) { + if (!boost::spirit::parse(url, UrlGrammar(*this)).full) + throw InvalidUrl(string("Invalid AMQP url: ")+url); } -Url::Url(const string& url, const nothrow_t&) { - if (!parse(url.c_str(), UrlGrammar(*this)).full) +void Url::parseNoThrow(const char* url) { + if (!boost::spirit::parse(url, UrlGrammar(*this)).full) clear(); } diff --git a/qpid/cpp/src/qpid/Url.h b/qpid/cpp/src/qpid/Url.h index 93a53afde6..0cf54c493e 100644 --- a/qpid/cpp/src/qpid/Url.h +++ b/qpid/cpp/src/qpid/Url.h @@ -31,7 +31,7 @@ namespace qpid { /** TCP address of a broker - host:port */ struct TcpAddress { static const uint16_t DEFAULT_PORT=5672; - TcpAddress(const std::string& host_=std::string(), + explicit TcpAddress(const std::string& host_=std::string(), uint16_t port_=DEFAULT_PORT) : host(host_), port(port_) {} std::string host; @@ -62,17 +62,26 @@ struct Url : public std::vector<Address> { /** Empty URL. */ Url() {} - /** Parse an amqp URL string as defined in + /** URL containing a single address */ + explicit Url(const Address& addr) { push_back(addr); } + + /** Parse url, throw InvalidUrl if invalid. */ + explicit Url(const std::string& url) { parse(url.c_str()); } + + /** Parse url, throw InvalidUrl if invalid. */ + explicit Url(const char* url) { parse(url); } + + /** Replace contents with parsed URL as defined in * https://wiki.108.redhat.com/jira/browse/AMQP-95 *@exception InvalidUrl if the url is invalid. */ - Url(const std::string& url); + void parse(const char* url); - /** Parse an amqp URL string as defined in + /** Replace contesnts with parsed URL as defined in * https://wiki.108.redhat.com/jira/browse/AMQP-95 - * Results in empty URL if url is invalid. + * url.empty() will be true if url is invalid. */ - Url(const std::string& url, const std::nothrow_t&); + void parseNoThrow(const char* url); }; std::ostream& operator<<(std::ostream& os, const Url& url); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 4354f6d38a..65252f5415 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -22,6 +22,7 @@ #include "Broker.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/HandlerUpdater.h" #include "DirectExchange.h" #include "TopicExchange.h" #include "FanOutExchange.h" @@ -41,6 +42,7 @@ #include <memory> using qpid::sys::Acceptor; +using qpid::framing::HandlerUpdater; namespace qpid { namespace broker { @@ -100,16 +102,17 @@ Broker::Broker(const Broker::Options& conf) : } -Broker::shared_ptr Broker::create(int16_t port) +shared_ptr<Broker> Broker::create(int16_t port) { Options config; config.port=port; return create(config); } -Broker::shared_ptr Broker::create(const Options& config) { - return Broker::shared_ptr(new Broker(config)); -} +shared_ptr<Broker> Broker::create(const Options& opts) +{ + return shared_ptr<Broker>(new Broker(opts)); +} MessageStore* Broker::createStore(const Options& config) { if (config.store.empty()) @@ -134,6 +137,10 @@ Broker::~Broker() { int16_t Broker::getPort() const { return getAcceptor().getPort(); } +std::string Broker::getUrl() const { + return Url(TcpAddress(getAcceptor().getHost(), getPort())).str(); +} + Acceptor& Broker::getAcceptor() const { if (!acceptor) const_cast<Acceptor::shared_ptr&>(acceptor) = @@ -144,6 +151,14 @@ Acceptor& Broker::getAcceptor() const { return *acceptor; } +void Broker::use(const shared_ptr<Plugin>& plugin) { + shared_ptr<HandlerUpdater> updater= + dynamic_pointer_cast<HandlerUpdater>(plugin); + if (updater) { + QPID_LOG(critical, "HandlerUpdater plugins not implemented"); + // FIXME aconway 2007-06-28: hook into Connections. + } +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 4be3d9761e..58ba8589b0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -24,9 +24,9 @@ #include "ConnectionFactory.h" #include "qpid/Url.h" +#include "qpid/Plugin.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Acceptor.h" -#include "qpid/SharedObject.h" #include "MessageStore.h" #include "AutoDelete.h" #include "ExchangeRegistry.h" @@ -39,14 +39,17 @@ #include "qpid/Options.h" namespace qpid { + +namespace framing { +class HandlerUpdater; +} + namespace broker { -class ChannelInitializer; /** * A broker instance. */ -class Broker : public sys::Runnable, - public SharedObject<Broker> +class Broker : public sys::Runnable, public PluginUser { public: struct Options : public qpid::Options { @@ -62,16 +65,9 @@ class Broker : public sys::Runnable, virtual ~Broker(); - /** - * Create a broker. - * @param port Port to listen on or 0 to pick a port dynamically. - */ - static shared_ptr create(int16_t port = TcpAddress::DEFAULT_PORT); - - /** - * Create a broker with the options in config. - */ - static shared_ptr create(const Options& config); + Broker(const Options& configuration); + static shared_ptr<Broker> create(const Options& configuration); + static shared_ptr<Broker> create(int16_t port = TcpAddress::DEFAULT_PORT); /** * Return listening port. If called before bind this is @@ -80,19 +76,22 @@ class Broker : public sys::Runnable, * 0. */ virtual int16_t getPort() const; - + + /** Return the broker's URL. */ + virtual std::string getUrl() const; + /** * Run the broker. Implements Runnable::run() so the broker * can be run in a separate thread. */ virtual void run(); - /** Plug-in a channel initializer. */ - void plugin(const qpid::shared_ptr<ChannelInitializer>&); - /** Shut down the broker */ virtual void shutdown(); + /** Use a plugin */ + void use(const shared_ptr<Plugin>& plugin); + MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } @@ -102,7 +101,6 @@ class Broker : public sys::Runnable, DtxManager& getDtxManager() { return dtxManager; } private: - Broker(const Options& configuration); sys::Acceptor& getAcceptor() const; Options config; diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index c31f4d197d..b7a61ababe 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -58,7 +58,7 @@ void BrokerAdapter::ConnectionHandlerImpl::startOk( const string& /*response*/, const string& /*locale*/) { client.tune( - 100, connection.getFrameMax(), connection.getHeartbeat()); + CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); } void BrokerAdapter::ConnectionHandlerImpl::secureOk( diff --git a/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp b/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp index 4571764850..77200dd760 100644 --- a/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerSingleton.cpp @@ -24,13 +24,13 @@ namespace broker { BrokerSingleton::BrokerSingleton() { if (broker.get() == 0) broker = Broker::create(); - Broker::shared_ptr::operator=(broker); + shared_ptr<Broker>::operator=(broker); } BrokerSingleton::~BrokerSingleton() { broker->shutdown(); } -Broker::shared_ptr BrokerSingleton::broker; +shared_ptr<Broker> BrokerSingleton::broker; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/BrokerSingleton.h b/qpid/cpp/src/qpid/broker/BrokerSingleton.h index 139e02a5fd..14b932df36 100644 --- a/qpid/cpp/src/qpid/broker/BrokerSingleton.h +++ b/qpid/cpp/src/qpid/broker/BrokerSingleton.h @@ -32,17 +32,17 @@ namespace broker { * Useful for unit tests that want to share a broker between multiple * tests to reduce overhead of starting/stopping a broker for every test. * - * Tests that need a new broker can call Broker::create directly. + * Tests that need a new broker can create it directly. * * THREAD UNSAFE. */ -class BrokerSingleton : public Broker::shared_ptr +class BrokerSingleton : public shared_ptr<Broker> { public: BrokerSingleton(); ~BrokerSingleton(); private: - static Broker::shared_ptr broker; + static shared_ptr<Broker> broker; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Plugin.h b/qpid/cpp/src/qpid/broker/Plugin.h deleted file mode 100644 index 8c794b86a0..0000000000 --- a/qpid/cpp/src/qpid/broker/Plugin.h +++ /dev/null @@ -1,80 +0,0 @@ -#ifndef QPID_BROKER_PLUGIN_H -#define QPID_BROKER_PLUGIN_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/Options.h" -#include "qpid/broker/Broker.h" -#include <boost/noncopyable.hpp> -#include <vector> - -namespace qpid { -namespace broker { - -/** - * Inherit broker plug-ins from this class, override the virtual - * functions and create a global (or class static member) instance in - * a shared library. When the library is loaded your plug-in will be - * registered. - */ -class Plugin : boost::noncopyable -{ - public: - /** Constructor registers the plugin to appear in getPlugins(). - * Note: Plugin subclasses should only be constructed during - * static initialization, i.e. they should only be declared - * as global or static member variables. - */ - Plugin(); - - virtual ~Plugin(); - - /** - * Override if your plugin has configuration options. - * They will be included in options parsing prior to broker - * creation setup. - *@return An options group or 0. Default returns 0. - */ - virtual Options* getOptions(); - - /** Called immediately after broker creation to allow plug-ins - * to do whatever they do to the broker, e.g. add handler chain - * manipulators. - */ - virtual void start(Broker& b); - - /** Called just before broker shutdown. Default does nothing */ - virtual void finish(Broker& b); - - /** Get the list of registered plug-ins. */ - static const std::vector<Plugin*>& getPlugins(); - - /** Load a shared library (that contains plugins presumably!) */ - static void dlopen(const std::string& libname); - - private: - static std::vector<Plugin*> plugins; -}; - -}} // namespace qpid::broker - - - - -#endif /*!QPID_BROKER_PLUGIN_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ChannelManager.cpp b/qpid/cpp/src/qpid/cluster/ChannelManager.cpp new file mode 100644 index 0000000000..f573d78ca1 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ChannelManager.cpp @@ -0,0 +1,85 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "ChannelManager.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +/** Handler to multicast to the cluster */ +struct ClusterHandler : public FrameHandler { + + ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_) + : FrameHandler(next), bitmask(bitmask_) {} + + void handle(AMQFrame& frame) { + frame.channel |= bitmask; // Mark the frame + nextHandler(frame); + // TODO aconway 2007-06-28: Right now everything is backed up + // via multicast. When we have point-to-point backups this + // function must determine where each frame should be sent: to + // multicast or only to specific backup(s) via AMQP. + } + + ChannelId bitmask; +}; + +ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){} + +void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) { + // Store the original cluster chains for the channel. + channels[id] = chains; + + // Replace chains with multicast-to-cluster handlers that mark the + // high-bit of the channel ID on outgoing frames so we can tell + // them from incoming frames in handle() + // + // When handle() receives the frames from the cluster it + // will forward them to the original channel chains stored in + // channels map. + // + chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0)); + chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT)); +} + +void ChannelManager::handle(AMQFrame& frame) { + bool isOut = frame.channel | CHANNEL_HIGH_BIT; + frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit. + ChannelMap::iterator i = channels.find(frame.getChannel()); + if (i != channels.end()) { + Chain& chain = isOut ? i->second.out : i->second.in; + chain->handle(frame); + } + else + updateFailoverState(frame); +} + +void ChannelManager::updateFailoverState(AMQFrame& ) { + QPID_LOG(critical, "Failover is not implemented"); + // FIXME aconway 2007-06-28: + // If the channel is not in my map then I'm not primary so + // I don't pass the frame to the channel handler but I + // do need to update the session failover state. +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ChannelManager.h b/qpid/cpp/src/qpid/cluster/ChannelManager.h new file mode 100644 index 0000000000..59fce77957 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ChannelManager.h @@ -0,0 +1,66 @@ +#ifndef QPID_CLUSTER_CHANNELMANAGER_H +#define QPID_CLUSTER_CHANNELMANAGER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/HandlerUpdater.h" +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Manage channels and handler chains on channels for the cluster. + * + * As HandlerUpdater plugin, updates channel handler chains with + * cluster handlers. + * + * As a FrameHandler handles frames coming from the cluster and + * dispatches them to the appropriate channel handler. + * + */ +class ChannelManager : public framing::HandlerUpdater, + public framing::FrameHandler +{ + public: + /** Takes a handler to send frames to the cluster. */ + ChannelManager(framing::FrameHandler::Chain mcastOut); + + /** As FrameHandler handle frames from the cluster */ + void handle(framing::AMQFrame& frame); + + /** As ChannelUpdater update the handler chains. */ + void update(framing::ChannelId id, framing::FrameHandler::Chains& chains); + + private: + void updateFailoverState(framing::AMQFrame&); + + typedef std::map<framing::ChannelId, + framing::FrameHandler::Chains> ChannelMap; + + framing::FrameHandler::Chain mcastOut; + ChannelMap channels; +}; + + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 30073c4551..8d898eefa3 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -17,13 +17,13 @@ */ #include "Cluster.h" -#include "Cpg.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> #include <iterator> +#include <map> namespace qpid { namespace cluster { @@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "(" << cluster.self << ")"; } +namespace { +Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1]; +struct StatusMapInit { + StatusMapInit() { + statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN; + statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE; + statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN; + statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP; + statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN; + } +} instance; +} + +Cluster::Member::Member(const cpg_address& addr) + : status(statusMap[addr.reason]) {} + void Cluster::notify() { + ProtocolVersion version; // TODO aconway 2007-06-25: Use proxy here. AMQFrame frame(version, 0, make_shared_ptr(new ClusterNotifyBody(version, url))); handle(frame); } -Cluster::Cluster( - const std::string& name_, const std::string& url_, FrameHandler& next_, - ProtocolVersion ver) - : name(name_), url(url_), version(ver), - cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), - boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), - next(next_) -{ - self=Id(cpg->getLocalNoideId(), getpid()); +Cluster::Cluster(const std::string& name_, const std::string& url_) : + name(name_), + url(url_), + cpg(new Cpg( + boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + self(cpg->getLocalNoideId(), getpid()) +{} + +void Cluster::join(FrameHandler::Chain next) { QPID_LOG(trace, *this << " Joining cluster."); + next = next; + dispatcher=Thread(*this); cpg->join(name); notify(); - dispatcher=Thread(*this); } Cluster::~Cluster() { - try { - QPID_LOG(trace, *this << " Leaving cluster."); - cpg->leave(name); - cpg.reset(); - dispatcher.join(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << e.what()); + if (cpg) { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } } } void Cluster::handle(AMQFrame& frame) { + assert(cpg); QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -104,52 +126,59 @@ void Cluster::cpgDeliver( frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame); // TODO aconway 2007-06-20: use visitor pattern. - ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + ClusterNotifyBody* notifyIn= + dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); if (notifyIn) { - Mutex::ScopedLock l(lock); - members[from].reset(new Member(notifyIn->getUrl())); - lock.notifyAll(); + { + Mutex::ScopedLock l(lock); + assert(members[from]); + members[from]->url = notifyIn->getUrl(); + members[from]->status = Member::BROKER; + } + if (callback) + callback(); } else - next.handle(frame); + next->handle(frame); } void Cluster::cpgConfigChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, - struct cpg_address *ccMembers, int nMembers, + struct cpg_address *current, int nCurrent, struct cpg_address *left, int nLeft, struct cpg_address *joined, int nJoined ) { - QPID_LOG( - trace, - *this << " Configuration change. " << endl - << " Joined: " << make_pair(joined, nJoined) << endl - << " Left: " << make_pair(left, nLeft) << endl - << " Current: " << make_pair(ccMembers, nMembers)); - + QPID_LOG(trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(current, nCurrent)); + + bool needNotify=false; + MemberList updated; { Mutex::ScopedLock l(lock); - // Erase members that left. - for (int i = 0; i < nLeft; ++i) - members.erase(Id(left[i])); - lock.notifyAll(); - } - - // If there are new members (other than myself) then notify. - for (int i=0; i< nJoined; ++i) { - if (Id(joined[i]) != self) { - notify(); - break; + for (int i = 0; i < nJoined; ++i) { + Id id(current[i]); + members[id].reset(new Member(current[i])); + if (id != self) + needNotify = true; // Notify new members other than myself. } - } - - // Note: New members are be added to my map when cpgDeliver - // gets a cluster.notify frame. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(current[i])); + } // End of locked scope. + if (needNotify) + notify(); + if (callback) + callback(); } +void Cluster::setCallback(boost::function<void()> f) { callback=f; } + void Cluster::run() { + assert(cpg); cpg->dispatchBlocking(); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 1cbbb249f2..aff213b6c9 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -25,47 +25,74 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/shared_ptr.h" -#include "qpid/framing/ProtocolVersion.h" +#include <boost/function.hpp> #include <boost/scoped_ptr.hpp> #include <map> #include <vector> namespace qpid { + +namespace broker { +class HandlerUpdater; +} + namespace cluster { +class ChannelManager; + /** - * Represents a cluster. Creating an instance joins current process - * to the cluster. + * Represents a cluster, provides access to data about members. + * + * Implements a FrameHandler that multicasts frames to the cluster. + * + * Requires a handler for frames arriving from the cluster, + * normally a ChannelManager but other handlers could be interposed + * for testing, logging etc. */ class Cluster : public framing::FrameHandler, private sys::Runnable { public: /** Details of a cluster member */ struct Member { - Member(const std::string& url_) : url(url_) {} + typedef shared_ptr<const Member> Ptr; + /** Status of a cluster member. */ + enum Status { + JOIN, ///< Process joined the group. + LEAVE, ///< Process left the group cleanly. + NODEDOWN, ///< Process's node went down. + NODEUP, ///< Process's node joined the cluster. + PROCDOWN, ///< Process died without leaving. + BROKER ///< Broker details are available. + }; + + Member(const cpg_address&); std::string url; + Status status; }; - - typedef std::vector<shared_ptr<const Member> > MemberList; + + typedef std::vector<Member::Ptr> MemberList; /** - * Join a cluster. + * Create a cluster object but do not joing. * @param name of the cluster. * @param url of this broker, sent to the cluster. - * @param next handler receives the frame when it has been - * acknowledged by the cluster. */ - Cluster(const std::string& name, - const std::string& url, - framing::FrameHandler& next, - framing::ProtocolVersion); + Cluster(const std::string& name, const std::string& url); ~Cluster(); + + /** Join the cluster. + *@handler is the handler for frames arriving from the cluster. + */ + void join(framing::FrameHandler::Chain handler); /** Multicast a frame to the cluster. */ void handle(framing::AMQFrame&); /** Get the current cluster membership. */ MemberList getMembers() const; + + /** Called when membership changes. */ + void setCallback(boost::function<void()>); /** Number of members in the cluster. */ size_t size() const; @@ -76,7 +103,6 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { void run(); void notify(); - void cpgDeliver( cpg_handle_t /*handle*/, struct cpg_name *group, @@ -93,18 +119,14 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { struct cpg_address */*joined*/, int /*nJoined*/ ); - Id self; + mutable sys::Monitor lock; Cpg::Name name; std::string url; - framing::ProtocolVersion version; boost::scoped_ptr<Cpg> cpg; - framing::FrameHandler& next; + Id self; MemberMap members; sys::Thread dispatcher; - - protected: - // Allow access from ClusterTest subclass. - mutable sys::Monitor lock; + boost::function<void()> callback; friend std::ostream& operator <<(std::ostream&, const Cluster&); }; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp new file mode 100644 index 0000000000..3a09a66b81 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterPluginProvider.cpp @@ -0,0 +1,66 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/Broker.h" +#include "qpid/framing/HandlerUpdater.h" +#include "qpid/cluster/Cluster.h" +#include "qpid/cluster/ChannelManager.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" + +namespace qpid { +namespace cluster { + +using namespace std; + +struct ClusterPluginProvider : public PluginProvider { + + struct ClusterOptions : public Options { + string clusterName; + ClusterOptions() { + addOptions() + ("cluster", optValue(clusterName, "NAME"), + "Join the cluster named NAME"); + } + }; + + ClusterOptions options; + shared_ptr<Cluster> cluster; + + Options* getOptions() { + return &options; + } + + void provide(PluginUser& user) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&user); + // Only provide to a Broker, and only if the --cluster config is set. + if (broker && !options.clusterName.empty()) { + assert(!cluster); // A process can only belong to one cluster. + cluster.reset(new Cluster(options.clusterName, broker->getUrl())); + + // Channel manager is both the next handler for the cluster + // and the HandlerUpdater plugin for the broker. + shared_ptr<ChannelManager> manager(new ChannelManager(cluster)); + cluster->join(manager); + broker->use(manager); + } + } +}; + +static ClusterPluginProvider instance; // Static initialization. + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index 6b157301a7..e164ed1215 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -23,11 +23,10 @@ #include "qpid/cluster/Dispatchable.h" #include <boost/function.hpp> #include <cassert> -#ifdef CLUSTER extern "C" { #include <openais/cpg.h> } -#endif + namespace qpid { namespace cluster { diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index 6541a0e788..a528913fd9 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -39,20 +39,14 @@ version(_version) assert(version != ProtocolVersion(0,0)); } -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : -version(_version), channel(_channel), body(_body) -{} +AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {} AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) : -version(_version), channel(_channel), body(_body) + channel(_channel), body(_body), version(_version) {} AMQFrame::~AMQFrame() {} -uint16_t AMQFrame::getChannel(){ - return channel; -} - AMQBody::shared_ptr AMQFrame::getBody(){ return body; } diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index bef1b01df4..1a7b203ad7 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -48,7 +48,7 @@ class AMQFrame : public AMQDataBlock virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); virtual uint32_t size() const; - uint16_t getChannel(); + uint16_t getChannel() const { return channel; } AMQBody::shared_ptr getBody(); /** Convenience template to cast the body to an expected type */ @@ -60,18 +60,17 @@ class AMQFrame : public AMQDataBlock uint32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); - private: - static AMQP_MethodVersionMap versionMap; - ProtocolVersion version; - uint16_t channel; uint8_t type; AMQBody::shared_ptr body; - + ProtocolVersion version; - friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body); + private: + static AMQP_MethodVersionMap versionMap; }; +std::ostream& operator<<(std::ostream&, const AMQFrame&); + }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Handler.h b/qpid/cpp/src/qpid/framing/Handler.h index 56e150a66d..f6b59393d9 100644 --- a/qpid/cpp/src/qpid/framing/Handler.h +++ b/qpid/cpp/src/qpid/framing/Handler.h @@ -40,11 +40,21 @@ template <class T> struct Handler { Chain out; }; + Handler() {} + Handler(Chain next_) : next(next_) {} virtual ~Handler() {} + virtual void handle(T) = 0; + + /** Next handler. Public so chains can be modified by altering next. */ Chain next; -}; + protected: + /** Derived handle() implementations call nextHandler to invoke the + * next handler in the chain. */ + void nextHandler(T data) { if (next) next->handle(data); } + +}; }} diff --git a/qpid/cpp/src/qpid/broker/ChannelInitializer.h b/qpid/cpp/src/qpid/framing/HandlerUpdater.h index 84dea7a3d7..5cb1e87d6e 100644 --- a/qpid/cpp/src/qpid/broker/ChannelInitializer.h +++ b/qpid/cpp/src/qpid/framing/HandlerUpdater.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_CHANNELINITIALIZER_H -#define QPID_BROKER_CHANNELINITIALIZER_H +#ifndef QPID_FRAMING_HANDLERUPDATER_H +#define QPID_FRAMING_HANDLERUPDATER_H /* * @@ -19,25 +19,26 @@ * */ -#include <boost/noncopyable.hpp> +#include "qpid/Plugin.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/FrameHandler.h" namespace qpid { -namespace broker { +namespace framing { + +/** Plugin object that can update handler chains. */ +struct HandlerUpdater : public Plugin { + /** Update the handler chains. + *@param id Unique identifier for channel or session. + *@param chains Handler chains to be updated. + */ + virtual void update(ChannelId id, FrameHandler::Chains& chains) = 0; +}; -/** - * A ChannelInitializer is called each time a new Channel is created. - */ -class ChannelInitializer : boost::noncopyable -{ - public: - virtual ~ChannelInitializer() {} +}} // namespace qpid::framing - /** Called for each new channel */ - virtual initialize(Channe&) = 0; -}; -}} // namespace qpid::broker -#endif /*!QPID_BROKER_CHANNELINITIALIZER_H*/ +#endif /*!QPID_FRAMING_HANDLERUPDATER_H*/ diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h index f0bd0ce427..efb720f047 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types.h +++ b/qpid/cpp/src/qpid/framing/amqp_types.h @@ -53,5 +53,12 @@ typedef uint16_t ReplyCode; // Types represented by classes. class Content; class FieldTable; + +// Useful constants + +/** Maximum channel ID used by broker. Reserve high bit for internal use.*/ +const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1; +const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); + }} // namespace qpid::framing #endif diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index 28a6cf263e..e17deeee2e 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -74,7 +74,7 @@ struct QpiddOptions : public qpid::Options { }; // Globals -Broker::shared_ptr brokerPtr; +shared_ptr<Broker> brokerPtr; QpiddOptions options; void handle_signal(int /*signal*/){ @@ -112,7 +112,7 @@ void parent(Daemon& demon) { /** Code for forked child */ void child(Daemon& demon) { - brokerPtr=Broker::create(options.broker); + brokerPtr.reset(new Broker(options.broker)); uint16_t realPort=brokerPtr->getPort(); demon.ready(realPort); // Notify parent. brokerPtr->run(); @@ -161,7 +161,7 @@ int main(int argc, char* argv[]) demon.fork(parent, child); } else { // Non-daemon broker. - brokerPtr = Broker::create(options.broker); + brokerPtr.reset(new Broker(options.broker)); if (options.broker.port == 0) cout << uint16_t(brokerPtr->getPort()) << endl; brokerPtr->run(); diff --git a/qpid/cpp/src/tests/.valgrindrc-default b/qpid/cpp/src/tests/.valgrindrc-default index 41e484d04e..4aba7661de 100644 --- a/qpid/cpp/src/tests/.valgrindrc-default +++ b/qpid/cpp/src/tests/.valgrindrc-default @@ -3,5 +3,5 @@ --demangle=yes --suppressions=.valgrind.supp --num-callers=25 ---track-fds=yes +--trace-children=yes diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp index 9dee1fc862..29ed1ae230 100644 --- a/qpid/cpp/src/tests/BrokerChannelTest.cpp +++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp @@ -60,7 +60,7 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testFlow); CPPUNIT_TEST_SUITE_END(); - Broker::shared_ptr broker; + shared_ptr<Broker> broker; Connection connection; MockHandler handler; diff --git a/qpid/cpp/src/tests/Cluster.cpp b/qpid/cpp/src/tests/Cluster.cpp index ed50cc5d7b..008575140b 100644 --- a/qpid/cpp/src/tests/Cluster.cpp +++ b/qpid/cpp/src/tests/Cluster.cpp @@ -24,46 +24,44 @@ #include "qpid/framing/BasicGetOkBody.h" - static const ProtocolVersion VER; /** Verify membership ind a cluster with one member. */ BOOST_AUTO_TEST_CASE(clusterOne) { - VectorFrameHandler received; - Cluster cluster("Test", "amqp:one:1", received, VER); + Cluster cluster("Test", "amqp:one:1"); + TestClusterHandler handler(cluster); AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - cluster.handle(frame); - BOOST_REQUIRE(received.waitFor(1)); + BOOST_REQUIRE(handler.waitFrames(1)); BOOST_CHECK_EQUAL(1u, cluster.size()); Cluster::MemberList members = cluster.getMembers(); BOOST_CHECK_EQUAL(1u, members.size()); BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1"); - BOOST_CHECK_EQUAL(1u, received.size()); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody()); + BOOST_CHECK_EQUAL(1u, handler.size()); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody()); } /** Fork a process to verify membership in a cluster with two members */ BOOST_AUTO_TEST_CASE(clusterTwo) { - VectorFrameHandler received; pid_t pid=fork(); BOOST_REQUIRE(pid >= 0); - if (pid) { // Parent - TestCluster cluster("Test", "amqp::1", received, VER); - BOOST_REQUIRE(cluster.waitFor(2)); + if (pid) { // Parent see Cluster_child.cpp for child. + Cluster cluster("Test", "amqp::1"); + TestClusterHandler handler(cluster); + BOOST_REQUIRE(handler.waitMembers(2)); // Exchange frames with child. AMQFrame frame(VER, 1, new ChannelOkBody(VER)); cluster.handle(frame); - BOOST_REQUIRE(received.waitFor(2)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody()); - BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *received[1].getBody()); + BOOST_REQUIRE(handler.waitFrames(2)); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody()); + BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody()); // Wait for child to exit. int status; BOOST_CHECK_EQUAL(::wait(&status), pid); BOOST_CHECK_EQUAL(0, status); - BOOST_CHECK(cluster.waitFor(1)); + BOOST_CHECK(handler.waitMembers(1)); BOOST_CHECK_EQUAL(1u, cluster.size()); } else { // Child diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h index 7ca5445e10..edb1f1524f 100644 --- a/qpid/cpp/src/tests/Cluster.h +++ b/qpid/cpp/src/tests/Cluster.h @@ -24,6 +24,7 @@ #include "qpid/framing/ChannelOkBody.h" #include "qpid/framing/BasicGetOkBody.h" #include "qpid/log/Logger.h" +#include <boost/bind.hpp> #include <iostream> #include <vector> @@ -39,44 +40,44 @@ using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::sys; -struct TestCluster : public Cluster { - TestCluster(const std::string& name, - const std::string& url, - framing::FrameHandler& next, - framing::ProtocolVersion ver) : Cluster(name,url,next, ver) {} +void null_deleter(void*) {} - /** Wait for the cluster to be of expected size (exactly) */ - bool waitFor(size_t n) { - Mutex::ScopedLock l(lock); - AbsTime deadline(now(),2*TIME_SEC); - while(size() != n && lock.wait(deadline)) - ; - return size() == n; - } -}; - -struct VectorFrameHandler : +struct TestClusterHandler : public std::vector<AMQFrame>, public FrameHandler, public Monitor { + TestClusterHandler(Cluster& c) : cluster(c) { + cluster.join(make_shared_ptr(this, &null_deleter)); + cluster.setCallback(boost::bind(&Monitor::notify, this)); + } + void handle(AMQFrame& f) { ScopedLock l(*this); push_back(f); notifyAll(); } - /** Wait for vector to reach size n exactly */ - bool waitFor(size_t n) { + /** Wait for the vector to contain n frames. */ + bool waitFrames(size_t n) { ScopedLock l(*this); - AbsTime deadline(now(), 1*TIME_SEC); + AbsTime deadline(now(), TIME_SEC); while (size() != n && wait(deadline)) ; return size() == n; } -}; + /** Wait for the cluster to have n members */ + bool waitMembers(size_t n) { + ScopedLock l(*this); + AbsTime deadline(now(), TIME_SEC); + while (cluster.size() != n && wait(deadline)) + ; + return cluster.size() == n; + } + + Cluster& cluster; +}; -// namespace diff --git a/qpid/cpp/src/tests/Cluster_child.cpp b/qpid/cpp/src/tests/Cluster_child.cpp index 1540717f4a..a5ac3e9669 100644 --- a/qpid/cpp/src/tests/Cluster_child.cpp +++ b/qpid/cpp/src/tests/Cluster_child.cpp @@ -32,16 +32,14 @@ static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { - VectorFrameHandler received; - TestCluster cluster("Test", "amqp::2", received, VER); - BOOST_REQUIRE(cluster.waitFor(2)); - - BOOST_REQUIRE(received.waitFor(1)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody()); + Cluster cluster("Test", "amqp::2"); + TestClusterHandler handler(cluster); + BOOST_REQUIRE(handler.waitFrames(1)); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody()); AMQFrame frame(VER, 1, new BasicGetOkBody(VER)); cluster.handle(frame); - BOOST_REQUIRE(received.waitFor(2)); - BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *received[1].getBody()); + BOOST_REQUIRE(handler.waitFrames(2)); + BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody()); } int test_main(int, char**) { diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp index 528919de48..9c60af7866 100644 --- a/qpid/cpp/src/tests/FramingTest.cpp +++ b/qpid/cpp/src/tests/FramingTest.cpp @@ -402,8 +402,8 @@ class FramingTest : public CppUnit::TestCase broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h index 8628bde431..48ac80d30a 100644 --- a/qpid/cpp/src/tests/InProcessBroker.h +++ b/qpid/cpp/src/tests/InProcessBroker.h @@ -113,7 +113,7 @@ class InProcessBroker : public client::Connector { }; framing::ProtocolInitiation protocolInit; - Broker::shared_ptr broker; + shared_ptr<Broker> broker; OutputToInputHandler brokerOut; broker::Connection brokerConnection; OutputToInputHandler clientOut; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 004693b582..3303afa0be 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -83,7 +83,7 @@ testprogs = \ topic_publisher -check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner +check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test diff --git a/qpid/cpp/src/tests/Url.cpp b/qpid/cpp/src/tests/Url.cpp index 09aabb80b3..faf802ba1e 100644 --- a/qpid/cpp/src/tests/Url.cpp +++ b/qpid/cpp/src/tests/Url.cpp @@ -30,27 +30,33 @@ BOOST_AUTO_TEST_CASE(testUrl_str) { Url url; url.push_back(TcpAddress("foo.com")); url.push_back(TcpAddress("bar.com", 6789)); - - BOOST_CHECK_EQUAL( - url.str(), "amqp:tcp:foo.com:5672,tcp:bar.com:6789"); - BOOST_CHECK_EQUAL(Url().str(), "amqp:"); + BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:6789", url.str()); + BOOST_CHECK_EQUAL("amqp:", Url().str()); } -BOOST_AUTO_TEST_CASE(testUrl_ctor) { - BOOST_CHECK_EQUAL( - Url("amqp:foo.com,tcp:bar.com:1234").str(), - "amqp:tcp:foo.com:5672,tcp:bar.com:1234"); - BOOST_CHECK_EQUAL( - Url("amqp:foo/ignorethis").str(), - "amqp:tcp:foo:5672"); - BOOST_CHECK_EQUAL("amqp:tcp::5672", Url("amqp:").str()); - BOOST_CHECK_EQUAL(0u, Url("xxx", nothrow).size()); +BOOST_AUTO_TEST_CASE(testUrl_parse) { + Url url; + url.parse("amqp:foo.com,tcp:bar.com:1234"); + BOOST_CHECK_EQUAL(2u, url.size()); + BOOST_CHECK_EQUAL("foo.com", boost::get<TcpAddress>(url[0]).host); + BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:1234", url.str()); + + url.parse("amqp:foo/ignorethis"); + BOOST_CHECK_EQUAL("amqp:tcp:foo:5672", url.str()); + + url.parse("amqp:"); + BOOST_CHECK_EQUAL("amqp:tcp::5672", url.str()); + try { - Url invalid("xxx"); + url.parse("invalid url"); BOOST_FAIL("Expected InvalidUrl exception"); } catch (const Url::InvalidUrl&) {} + + url.parseNoThrow("invalid url"); + BOOST_CHECK(url.empty()); } + diff --git a/qpid/cpp/src/tests/run_test b/qpid/cpp/src/tests/run_test index deb22b4450..ec724fe727 100755 --- a/qpid/cpp/src/tests/run_test +++ b/qpid/cpp/src/tests/run_test @@ -47,7 +47,7 @@ if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then # This is a libtool "executable". Valgrind it if VALGRIND specified. test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file-exactly=$VG_LOG --" # Hide output unless there's an error. - libtool --mode=execute "$VALGRIND" "$@" >$TEST_LOG 2>&1 || { + libtool --mode=execute $VALGRIND "$@" >$TEST_LOG 2>&1 || { ERROR=$? cat $TEST_LOG } |