summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/Backup.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/Backup.cpp')
-rw-r--r--cpp/src/qpid/ha/Backup.cpp36
1 files changed, 19 insertions, 17 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 5acbfb9d5f..3f3fa87a01 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -19,10 +19,11 @@
*
*/
#include "Backup.h"
-#include "Settings.h"
#include "BrokerReplicator.h"
-#include "ReplicatingSubscription.h"
#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -43,37 +44,44 @@ using namespace broker;
using types::Variant;
using std::string;
-Backup::Backup(broker::Broker& b, const Settings& s) :
- broker(b), settings(s), excluder(new ConnectionExcluder())
+Backup::Backup(HaBroker& hb, const Settings& s) :
+ haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
{
+ // Exclude client connections before starting the link to avoid self-connection.
+ broker.getConnectionObservers().add(excluder);
// Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
void Backup::initialize(const Url& url) {
- assert(!url.empty());
- QPID_LOG(notice, "Ha: Backup started: " << url);
+ if (url.empty()) throw Url::Invalid("HA broker URL is empty");
+ QPID_LOG(notice, "HA: Backup initialized: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
- assert(result.second); // FIXME aconway 2011-11-23: error handling
link = result.first;
link->setUrl(url);
-
- replicator.reset(new BrokerReplicator(link));
+ replicator.reset(new BrokerReplicator(haBroker, link));
broker.getExchanges().registerExchange(replicator);
- broker.getConnectionObservers().add(excluder);
}
+Backup::~Backup() {
+ if (link) link->close();
+ if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ replicator.reset();
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+
void Backup::setBrokerUrl(const Url& url) {
// Ignore empty URLs seen during start-up for some tests.
if (url.empty()) return;
sys::Mutex::ScopedLock l(lock);
if (link) { // URL changed after we initialized.
- QPID_LOG(info, "HA: Backup failover URL set to " << url);
+ QPID_LOG(info, "HA: Backup broker URL set to " << url);
link->setUrl(url);
}
else {
@@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url) {
}
}
-Backup::~Backup() {
- if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- broker.getConnectionObservers().remove(excluder); // This allows client connections.
-}
-
}} // namespace qpid::ha