diff options
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 74 |
1 files changed, 47 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index c4b67de141..a2c66e3790 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -15,8 +15,8 @@ * limitations under the License. * */ -#include <boost/program_options/value_semantic.hpp> +#include "ConnectionInterceptor.h" #include "qpid/broker/Broker.h" @@ -25,61 +25,81 @@ #include "qpid/Options.h" #include "qpid/shared_ptr.h" -#include <boost/optional.hpp> #include <boost/utility/in_place_factory.hpp> - namespace qpid { namespace cluster { using namespace std; -struct ClusterOptions : public Options { +struct ClusterValues { string name; string url; - ClusterOptions() : Options("Cluster Options") { + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +/** Note separating options from values to work around boost version differences. + * Old boost takes a reference to options objects, but new boost makes a copy. + * New boost allows a shared_ptr but that's not compatible with old boost. + */ +struct ClusterOptions : public Options { + ClusterValues& values; + + ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) { addOptions() - ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(url,"URL"), + ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(values.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") ; } - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } }; struct ClusterPlugin : public Plugin { - typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + ClusterValues values; ClusterOptions options; - boost::optional<Cluster> cluster; + boost::intrusive_ptr<Cluster> cluster; + + ClusterPlugin() : options(values) {} + + Options* getOptions() { return &options; } - template <class Chain> void init(Plugin::Target& t) { - Chain* c = dynamic_cast<Chain*>(&t); - if (c) cluster->initialize(*c); + void init(broker::Broker& b) { + if (values.name.empty()) return; // Only if --cluster-name option was specified. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process."); + cluster = new Cluster(values.name, values.getUrl(b.getPort()), b); + b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); + } + + template <class T> void init(T& t) { + if (cluster) cluster->initialize(t); + } + + template <class T> bool init(Plugin::Target& target) { + T* t = dynamic_cast<T*>(&target); + if (t) init(*t); + return t; } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker && !options.name.empty()) { - if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); - cluster = boost::in_place(options.name, - options.getUrl(broker->getPort()), - boost::ref(*broker)); - return; - } - if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. - init<ConnectionChain>(target); + if (init<broker::Broker>(target)) return; + if (!cluster) return; // Remaining plugins only valid if cluster initialized. + if (init<broker::Connection>(target)) return; } + + void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. + +// For test purposes. +boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; } }} // namespace qpid::cluster |