summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ClusterPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterPlugin.cpp')
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp74
1 files changed, 47 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index c4b67de141..a2c66e3790 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -15,8 +15,8 @@
* limitations under the License.
*
*/
-#include <boost/program_options/value_semantic.hpp>
+#include "ConnectionInterceptor.h"
#include "qpid/broker/Broker.h"
@@ -25,61 +25,81 @@
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
-#include <boost/optional.hpp>
#include <boost/utility/in_place_factory.hpp>
-
namespace qpid {
namespace cluster {
using namespace std;
-struct ClusterOptions : public Options {
+struct ClusterValues {
string name;
string url;
- ClusterOptions() : Options("Cluster Options") {
+ Url getUrl(uint16_t port) const {
+ if (url.empty()) return Url::getIpAddressesUrl(port);
+ return Url(url);
+ }
+};
+
+/** Note separating options from values to work around boost version differences.
+ * Old boost takes a reference to options objects, but new boost makes a copy.
+ * New boost allows a shared_ptr but that's not compatible with old boost.
+ */
+struct ClusterOptions : public Options {
+ ClusterValues& values;
+
+ ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) {
addOptions()
- ("cluster-name", optValue(name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(url,"URL"),
+ ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join")
+ ("cluster-url", optValue(values.url,"URL"),
"URL of this broker, advertized to the cluster.\n"
"Defaults to a URL listing all the local IP addresses\n")
;
}
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
};
struct ClusterPlugin : public Plugin {
- typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+ ClusterValues values;
ClusterOptions options;
- boost::optional<Cluster> cluster;
+ boost::intrusive_ptr<Cluster> cluster;
+
+ ClusterPlugin() : options(values) {}
+
+ Options* getOptions() { return &options; }
- template <class Chain> void init(Plugin::Target& t) {
- Chain* c = dynamic_cast<Chain*>(&t);
- if (c) cluster->initialize(*c);
+ void init(broker::Broker& b) {
+ if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
+ cluster = new Cluster(values.name, values.getUrl(b.getPort()), b);
+ b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
+ }
+
+ template <class T> void init(T& t) {
+ if (cluster) cluster->initialize(t);
+ }
+
+ template <class T> bool init(Plugin::Target& target) {
+ T* t = dynamic_cast<T*>(&target);
+ if (t) init(*t);
+ return t;
}
void earlyInitialize(Plugin::Target&) {}
void initialize(Plugin::Target& target) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker && !options.name.empty()) {
- if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process.");
- cluster = boost::in_place(options.name,
- options.getUrl(broker->getPort()),
- boost::ref(*broker));
- return;
- }
- if (!cluster) return; // Ignore chain handlers if we didn't init a cluster.
- init<ConnectionChain>(target);
+ if (init<broker::Broker>(target)) return;
+ if (!cluster) return; // Remaining plugins only valid if cluster initialized.
+ if (init<broker::Connection>(target)) return;
}
+
+ void shutdown() { cluster = 0; }
};
static ClusterPlugin instance; // Static initialization.
+
+// For test purposes.
+boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
}} // namespace qpid::cluster