diff options
Diffstat (limited to 'cpp/src/qpid/ha/Backup.cpp')
-rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 5acbfb9d5f..3f3fa87a01 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -19,10 +19,11 @@ * */ #include "Backup.h" -#include "Settings.h" #include "BrokerReplicator.h" -#include "ReplicatingSubscription.h" #include "ConnectionExcluder.h" +#include "HaBroker.h" +#include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -43,37 +44,44 @@ using namespace broker; using types::Variant; using std::string; -Backup::Backup(broker::Broker& b, const Settings& s) : - broker(b), settings(s), excluder(new ConnectionExcluder()) +Backup::Backup(HaBroker& hb, const Settings& s) : + haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder()) { + // Exclude client connections before starting the link to avoid self-connection. + broker.getConnectionObservers().add(excluder); // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } void Backup::initialize(const Url& url) { - assert(!url.empty()); - QPID_LOG(notice, "Ha: Backup started: " << url); + if (url.empty()) throw Url::Invalid("HA broker URL is empty"); + QPID_LOG(notice, "HA: Backup initialized: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); - assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; link->setUrl(url); - - replicator.reset(new BrokerReplicator(link)); + replicator.reset(new BrokerReplicator(haBroker, link)); broker.getExchanges().registerExchange(replicator); - broker.getConnectionObservers().add(excluder); } +Backup::~Backup() { + if (link) link->close(); + if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + replicator.reset(); + broker.getConnectionObservers().remove(excluder); // This allows client connections. +} + + void Backup::setBrokerUrl(const Url& url) { // Ignore empty URLs seen during start-up for some tests. if (url.empty()) return; sys::Mutex::ScopedLock l(lock); if (link) { // URL changed after we initialized. - QPID_LOG(info, "HA: Backup failover URL set to " << url); + QPID_LOG(info, "HA: Backup broker URL set to " << url); link->setUrl(url); } else { @@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url) { } } -Backup::~Backup() { - if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); - broker.getConnectionObservers().remove(excluder); // This allows client connections. -} - }} // namespace qpid::ha |