summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/Backup.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/Backup.cpp')
-rw-r--r--cpp/src/qpid/ha/Backup.cpp106
1 files changed, 70 insertions, 36 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 6852a58b0c..2affc12bf6 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -20,9 +20,12 @@
*/
#include "Backup.h"
#include "BrokerReplicator.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "Primary.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StatusCheck.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -44,28 +47,38 @@ using namespace framing;
using namespace broker;
using types::Variant;
using std::string;
+using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+ logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+ haBroker(hb), broker(hb.getBroker()), settings(s),
+ statusCheck(
+ new StatusCheck(
+ logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo()))
{
- // Empty brokerUrl means delay initialization until seBrokertUrl() is called.
- if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+ // Set link properties to tag outgoing links.
+ framing::FieldTable linkProperties = broker.getLinkClientProperties();
+ linkProperties.setTable(
+ ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
+ broker.setLinkClientProperties(linkProperties);
}
-void Backup::initialize(const Url& brokers) {
- if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
- types::Uuid uuid(true);
- // Declare the link
- std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
- broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- brokers[0].host, brokers[0].port, protocol,
- false, // durable
- settings.mechanism, settings.username, settings.password,
- false); // no amq.failover - don't want to use client URL.
- {
- sys::Mutex::ScopedLock l(lock);
+void Backup::setBrokerUrl(const Url& brokers) {
+ if (brokers.empty()) return;
+ Mutex::ScopedLock l(lock);
+ if (stopped) return;
+ if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
+ if (!link) { // Not yet initialized
+ QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
+ types::Uuid uuid(true);
+ std::pair<Link::shared_ptr, bool> result;
+ result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
+ brokers[0].host, brokers[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password,
+ false); // no amq.failover - don't want to use client URL.
link = result.first;
replicator.reset(new BrokerReplicator(haBroker, link));
replicator->initialize();
@@ -74,36 +87,57 @@ void Backup::initialize(const Url& brokers) {
link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
-Backup::~Backup() {
+void Backup::stop(Mutex::ScopedLock&) {
+ if (stopped) return;
+ stopped = true;
+ QPID_LOG(debug, logPrefix << "Leaving backup role.");
if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ if (replicator.get()) {
+ replicator->shutdown();
+ replicator.reset();
+ }
}
-// Called via management.
-void Backup::setBrokerUrl(const Url& url) {
- // Ignore empty URLs seen during start-up for some tests.
- if (url.empty()) return;
- bool linkSet = false;
+Role* Backup::recover(Mutex::ScopedLock&) {
+ BrokerInfo::Set backups;
{
- sys::Mutex::ScopedLock l(lock);
- linkSet = link;
+ Mutex::ScopedLock l(lock);
+ if (stopped) return 0;
+ stop(l); // Stop backup activity before starting primary.
+ QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo());
+ // Reset membership before allowing backups to connect.
+ backups = membership.otherBackups();
+ membership.clear();
+ return new Primary(haBroker, backups);
}
- if (linkSet)
- link->setUrl(url); // Outside lock, once set link doesn't change
- else
- initialize(url); // Deferred initialization
}
-void Backup::setStatus(BrokerStatus status) {
- switch (status) {
- case READY:
- QPID_LOG(notice, logPrefix << "Ready to become primary.");
+Role* Backup::promote() {
+ Mutex::ScopedLock l(lock);
+ if (stopped) return 0;
+ switch (haBroker.getStatus()) {
+ case JOINING:
+ if (statusCheck->canPromote()) return recover(l);
+ else {
+ QPID_LOG(error,
+ logPrefix << "Joining active cluster, cannot be promoted.");
+ throw Exception("Joining active cluster, cannot be promoted.");
+ }
break;
case CATCHUP:
- QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted.");
+ QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ throw Exception("Still catching up, cannot be promoted.");
+ break;
+ case READY: return recover(l); break;
default:
- assert(0);
+ assert(0); // Not a valid state for the Backup role..
}
+ return 0; // Keep compiler happy
+}
+
+Backup::~Backup() {
+ Mutex::ScopedLock l(lock);
+ stop(l);
}
}} // namespace qpid::ha