summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-12 21:19:48 +0000
committerAlan Conway <aconway@apache.org>2012-06-12 21:19:48 +0000
commitbf69fd2f69325dd660454e6b6c8399c51cacea2c (patch)
treecd17f5a4c546a4a26c8456a5a143fe7de776a7cd
parent379b80858ea24a3909657aa2b35bf61ff1322fee (diff)
downloadqpid-python-bf69fd2f69325dd660454e6b6c8399c51cacea2c.tar.gz
QPID-3603: Separate QueueGuard from ReplicatingSubscription.
QueueGuard: implements QueueObserver to delay completion of new messages. ReplicatingSubscription: Implements subscription, sends messages & events to backup. These were previously combined as one. QueueGuard is now separated out so that it can be created before the ReplicatingSubscription, in anticipation of an expected backup connecting. This is needed for 2 reasons: - new queues must be guarded until they are backuped up. - after a failover, all queues must be guarded until backups are ready. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349538 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt23
-rw-r--r--qpid/cpp/src/ha.mk23
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h4
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h (renamed from qpid/cpp/src/qpid/ha/LogPrefix.cpp)40
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h9
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp47
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp75
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h72
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp104
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h37
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp70
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h19
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp55
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h24
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h76
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp99
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h101
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp119
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h56
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp70
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h (renamed from qpid/cpp/src/qpid/ha/LogPrefix.h)47
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h5
-rw-r--r--qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp90
-rw-r--r--qpid/cpp/src/qpid/ha/UnreadyQueueSet.h88
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml7
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp (renamed from qpid/cpp/src/qpid/ha/Enum.cpp)27
-rw-r--r--qpid/cpp/src/qpid/ha/types.h (renamed from qpid/cpp/src/qpid/ha/Enum.h)12
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py102
37 files changed, 1100 insertions, 458 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index aee5fb6213..b83e8d9c6d 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -626,31 +626,38 @@ set (ha_default ON)
option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
+ qpid/ha/BackupConnectionExcluder.h
+ qpid/ha/BrokerInfo.cpp
+ qpid/ha/BrokerInfo.h
+ qpid/ha/HeldQueue.h
+ qpid/ha/QueueGuard.cpp
+ qpid/ha/QueueGuard.h
+ qpid/ha/ReplicationTest.cpp
+ qpid/ha/ReplicationTest.h
qpid/ha/Backup.cpp
qpid/ha/Backup.h
- qpid/ha/BrokerInfo.h
- qpid/ha/BrokerInfo.cpp
qpid/ha/BrokerReplicator.cpp
qpid/ha/BrokerReplicator.h
- qpid/ha/ConnectionExcluder.cpp
- qpid/ha/ConnectionExcluder.h
+ qpid/ha/ConnectionObserver.cpp
+ qpid/ha/ConnectionObserver.h
qpid/ha/Counter.h
- qpid/ha/Enum.cpp
- qpid/ha/Enum.h
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
- qpid/ha/LogPrefix.cpp
- qpid/ha/LogPrefix.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
qpid/ha/Primary.h
+ qpid/ha/PrimaryConnectionMonitor.h
qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h
qpid/ha/ReplicatingSubscription.cpp
qpid/ha/ReplicatingSubscription.h
qpid/ha/Settings.h
+ qpid/ha/Types.cpp
+ qpid/ha/Types.h
+ qpid/ha/UnreadyQueueSet.cpp
+ qpid/ha/UnreadyQueueSet.h
)
add_library (ha MODULE ${ha_SOURCES})
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index cab7d8c42b..dae924b6ea 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -25,29 +25,36 @@ dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
- qpid/ha/BrokerInfo.h \
+ qpid/ha/BackupConnectionExcluder.h \
qpid/ha/BrokerInfo.cpp \
+ qpid/ha/BrokerInfo.h \
qpid/ha/BrokerReplicator.cpp \
qpid/ha/BrokerReplicator.h \
- qpid/ha/ConnectionExcluder.cpp \
- qpid/ha/ConnectionExcluder.h \
+ qpid/ha/ConnectionObserver.cpp \
+ qpid/ha/ConnectionObserver.h \
qpid/ha/Counter.h \
- qpid/ha/Enum.cpp \
- qpid/ha/Enum.h \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
- qpid/ha/LogPrefix.cpp \
- qpid/ha/LogPrefix.h \
+ qpid/ha/HeldQueue.h \
qpid/ha/Membership.cpp \
qpid/ha/Membership.h \
qpid/ha/Primary.cpp \
qpid/ha/Primary.h \
+ qpid/ha/PrimaryConnectionMonitor. \
+ qpid/ha/QueueGuard.cpp \
+ qpid/ha/QueueGuard.h \
qpid/ha/QueueReplicator.cpp \
qpid/ha/QueueReplicator.h \
qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/ReplicatingSubscription.h \
- qpid/ha/Settings.h
+ qpid/ha/ReplicationTest.cpp \
+ qpid/ha/ReplicationTest.h \
+ qpid/ha/Settings.h \
+ qpid/ha/UnreadyQueueSet.cpp \
+ qpid/ha/UnreadyQueueSet.h \
+ qpid/ha/types.cpp \
+ qpid/ha/types.h
ha_la_LIBADD = libqpidbroker.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 44fb098e79..f2cc2a3454 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -45,7 +45,7 @@ using types::Variant;
using std::string;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix(hb), haBroker(hb), broker(hb.getBroker()), settings(s)
+ logPrefix("HA backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
{
// Empty brokerUrl means delay initialization until seBrokertUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
@@ -64,14 +64,14 @@ Url Backup::linkUrl(const Url& brokers) const {
for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (!isSelf(*i)) url.push_back(*i);
if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty");
- QPID_LOG(debug, logPrefix << "Backup failover URL (excluding self): " << url);
+ QPID_LOG(debug, logPrefix << " failover URL (excluding self): " << url);
return url;
*/
}
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Backup broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers);
sys::Mutex::ScopedLock l(lock);
Url url = linkUrl(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 92387ece60..ca3c2a02d0 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -22,7 +22,6 @@
*
*/
-#include "LogPrefix.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -57,7 +56,8 @@ class Backup
Url linkUrl(const Url&) const;
void initialize(const Url&);
- LogPrefix logPrefix;
+ std::string logPrefix;
+
sys::Mutex lock;
HaBroker& haBroker;
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index d80fe23458..9776fac016 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -1,3 +1,6 @@
+#ifndef QPID_HA_BACKUPCONNECTIONEXCLUDER_H
+#define QPID_HA_BACKUPCONNECTIONEXCLUDER_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,33 +21,26 @@
* under the License.
*
*/
-#include "LogPrefix.h"
-#include "HaBroker.h"
-#include <iostream>
+
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/broker/Connection.h"
namespace qpid {
namespace ha {
-LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), status(0) {
- if (msg.size()) setMessage(msg);
-}
-
-LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg)
- : haBroker(lp.haBroker), status(0)
+/**
+ * Exclude connections to a backup broker.
+ */
+class BackupConnectionExcluder : public broker::ConnectionObserver
{
- if (msg.size()) setMessage(msg);
-}
+ public:
+ void opened(broker::Connection& connection) {
+ throw Exception("HA backup rejected connection "+connection.getMgmtId());
+ }
-LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {}
-
-void LogPrefix::setMessage(const std::string& msg) {
- tail = " "+msg+":";
-}
-
-std::ostream& operator<<(std::ostream& o, const LogPrefix& l) {
- return o << "HA("
- << printable(l.status ? *l.status : l.haBroker->getStatus())
- << ")" << l.tail << " ";
-}
+ void closed(broker::Connection&) {}
+};
}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BACKUPCONNECTIONEXCLUDER_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 2673646646..0c5de9542f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -42,6 +42,9 @@ using types::Uuid;
using types::Variant;
using framing::FieldTable;
+BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
+ logId(id.str().substr(0,9)+"..."), hostName(host), port(port_), systemId(id) {}
+
FieldTable BrokerInfo::asFieldTable() const {
Variant::Map m = asMap();
FieldTable ft;
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index b0864e0402..55479df4b9 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -22,7 +22,7 @@
*
*/
-#include "Enum.h"
+#include "types.h"
#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/types/Uuid.h"
@@ -40,15 +40,15 @@ class BrokerInfo
{
public:
BrokerInfo() {}
- BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
- hostName(host), port(port_), systemId(id) {}
+ BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
BrokerInfo(const types::Variant::Map& m) { assign(m); }
types::Uuid getSystemId() const { return systemId; }
std::string getHostName() const { return hostName; }
BrokerStatus getStatus() const { return status; }
- uint16_t getPort() const { return port; }
+ uint16_t getPort() const { return port; }
+ std::string getLogId() const { return logId; }
void setStatus(BrokerStatus s) { status = s; }
@@ -59,6 +59,7 @@ class BrokerInfo
void assign(const types::Variant::Map&);
private:
+ std::string logId;
std::string hostName;
uint16_t port;
types::Uuid systemId;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0173495a00..81667e0437 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -170,7 +170,7 @@ Variant::Map asMapVoid(const Variant& value) {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix(hb),
+ logPrefix("HA backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l)
{}
@@ -291,10 +291,10 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
- if (!isReplicated(
+ if (!replicationTest.isReplicated(
values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool()))
return;
- if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
@@ -335,7 +335,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (!queue) {
QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name);
- } else if (!haBroker.replicateLevel(queue->getSettings())) {
+ } else if (!replicationTest.replicateLevel(queue->getSettings())) {
QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name);
} else {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
@@ -353,8 +353,8 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
+ if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
+ if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -383,7 +383,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
- } else if (!haBroker.replicateLevel(exchange->getArgs())) {
+ } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
@@ -401,8 +401,8 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
broker.getQueues().find(values[QNAME].asString());
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
- queue && haBroker.replicateLevel(queue->getSettings()))
+ if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
+ queue && replicationTest.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
@@ -421,8 +421,8 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
broker.getQueues().find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
- queue && haBroker.replicateLevel(queue->getSettings()))
+ if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
+ queue && replicationTest.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
@@ -441,7 +441,7 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!isReplicated(values[ARGUMENTS].asMap(),
+ if (!replicationTest.isReplicated(values[ARGUMENTS].asMap(),
values[AUTODELETE].asBool(),
values[EXCLUSIVE].asBool()))
return;
@@ -465,7 +465,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!haBroker.replicateLevel(argsMap)) return;
+ if (!replicationTest.replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
if (broker.createExchange(
@@ -512,8 +512,8 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
// Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
- queue && haBroker.replicateLevel(queue->getSettings()))
+ if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
+ queue && replicationTest.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
@@ -533,7 +533,7 @@ const string REPLICATE_DEFAULT="replicateDefault";
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = haBroker.replicateLevel(
+ ReplicateLevel primary = replicationTest.replicateLevel(
values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
@@ -545,22 +545,11 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
}
}
-namespace {
-const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
-}
-
-bool BrokerReplicator::isReplicated(
- const Variant::Map& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
- return haBroker.replicateLevel(args) && !ignore;
-}
-
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
- if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
+ if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker, queue, link));
+ new QueueReplicator(haBroker.getBrokerInfo(), queue, link));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index f7466d6406..677de31370 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -23,8 +23,8 @@
*/
#include "Counter.h"
-#include "Enum.h"
-#include "LogPrefix.h"
+#include "types.h"
+#include "ReplicationTest.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include <boost/shared_ptr.hpp>
@@ -97,7 +97,8 @@ class BrokerReplicator : public broker::Exchange,
bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
void ready();
- LogPrefix logPrefix;
+ std::string logPrefix;
+ ReplicationTest replicationTest;
HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index 9294f38ef3..61835b15d1 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -31,7 +31,7 @@ namespace qpid {
namespace ha {
ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix(hb), self(uuid) {}
+ : haBroker(hb), logPrefix("HA: "), self(uuid) {}
namespace {
bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
@@ -53,7 +53,7 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
}
BrokerStatus status = haBroker.getStatus();
if (isBackup(status)) reject(connection);
- BrokerInfo info; // Get info about a connecting backup.
+ BrokerInfo info; // Avoid self connections.
if (getBrokerInfo(connection, info)) {
if (info.getSystemId() == self) {
QPID_LOG(debug, logPrefix << "Rejected self connection");
@@ -65,8 +65,7 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
return;
}
}
- else // This is a client connection.
- if (status == RECOVERING) reject(connection); // FIXME aconway 2012-05-29: allow clients in recovery
+ // else: Primary node accepts connections.
}
void ConnectionExcluder::reject(broker::Connection& connection) {
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index 629fda7519..c24a138e2c 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -22,9 +22,9 @@
*
*/
-#include "LogPrefix.h"
+#include "types.h"
#include "qpid/broker/ConnectionObserver.h"
-#include "qpid/framing/Uuid.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
@@ -35,6 +35,7 @@ class Connection;
}
namespace ha {
+class HaBroker;
/**
* Exclude normal connections to a backup broker.
@@ -58,7 +59,7 @@ class ConnectionExcluder : public broker::ConnectionObserver
void reject(broker::Connection&);
HaBroker& haBroker;
- LogPrefix logPrefix;
+ std::string logPrefix;
types::Uuid self;
};
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
new file mode 100644
index 0000000000..694a253fc3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionObserver.h"
+#include "BrokerInfo.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+ConnectionObserver::ConnectionObserver(const types::Uuid& uuid)
+ : logPrefix("HA: "), self(uuid) {}
+
+bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+ framing::FieldTable ft;
+ if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
+ info = BrokerInfo(ft);
+ return true;
+ }
+ return false;
+}
+
+void ConnectionObserver::setObserver(const ObserverPtr& o){
+ sys::Mutex::ScopedLock l(lock);
+ observer = o;
+}
+
+ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
+ sys::Mutex::ScopedLock l(lock);
+ return observer;
+}
+
+void ConnectionObserver::opened(broker::Connection& connection) {
+ if (connection.isLink()) return; // Allow outgoing links.
+ if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ << connection.getMgmtId());
+ return; // No need to call observer, always allow admins.
+ }
+ BrokerInfo info; // Avoid self connections.
+ if (getBrokerInfo(connection, info) && info.getSystemId() == self)
+ throw Exception("HA rejected self connection");
+ ObserverPtr o(getObserver());
+ if (o) o->opened(connection);
+}
+
+void ConnectionObserver::closed(broker::Connection& connection) {
+ ObserverPtr o(getObserver());
+ if (o) o->closed(connection);
+}
+
+const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
+const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup";
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
new file mode 100644
index 0000000000..a950f41739
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -0,0 +1,72 @@
+#ifndef QPID_HA_CONNECTIONOBSERVER_H
+#define QPID_HA_CONNECTIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid {
+namespace ha {
+class BrokerInfo;
+
+/**
+ * Observes connections, delegates to another ConnectionObserver for
+ * actions specific to primary or backup.
+ *
+ * THREAD SAFE: called in arbitrary connection threads.
+ *
+ * Main role of this class is to provide a continuous observer object
+ * on the connection so we can't lose observations between removing
+ * one observer and adding another.
+ */
+class ConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ typedef boost::shared_ptr<broker::ConnectionObserver> ObserverPtr;
+
+ static const std::string ADMIN_TAG;
+ static const std::string BACKUP_TAG;
+
+ static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+
+ ConnectionObserver(const types::Uuid& self);
+
+ void setObserver(const ObserverPtr&);
+ ObserverPtr getObserver();
+
+ void opened(broker::Connection& connection);
+ void closed(broker::Connection& connection);
+
+ private:
+ sys::Mutex lock;
+ std::string logPrefix;
+ ObserverPtr observer;
+ types::Uuid self;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_CONNECTIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 6095837cd6..046800791d 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -19,11 +19,13 @@
*
*/
#include "Backup.h"
-#include "ConnectionExcluder.h"
+#include "BackupConnectionExcluder.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
#include "Primary.h"
-#include "Settings.h"
+#include "PrimaryConnectionMonitor.h"
#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -39,9 +41,9 @@
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
@@ -49,20 +51,22 @@ namespace ha {
namespace _qmf = ::qmf::org::apache::qpid::ha;
using namespace management;
using namespace std;
+using types::Variant;
+using types::Uuid;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix(status),
+ : logPrefix("HA: "),
broker(b),
systemId(broker.getSystem()->getSystemId().data()),
settings(s),
mgmtObject(0),
status(STANDALONE),
- excluder(new ConnectionExcluder(*this, systemId)),
+ observer(new ConnectionObserver(systemId)),
brokerInfo(broker.getSystem()->getNodeName(),
// TODO aconway 2012-05-24: other transports?
broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
- membership(logPrefix, boost::bind(&HaBroker::membershipUpdate, this, _1))
-
+ membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1, _2)),
+ replicationTest(s.replicateDefault.get())
{
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -77,13 +81,15 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
- new ReplicatingSubscription::Factory(*this)));
+ new ReplicatingSubscription::Factory()));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
status = JOINING;
+ observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
+ new BackupConnectionExcluder));
+ broker.getConnectionObservers().add(observer);
backup.reset(new Backup(*this, s));
- broker.getConnectionObservers().add(excluder);
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
}
@@ -99,57 +105,35 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
HaBroker::~HaBroker() {
QPID_LOG(debug, logPrefix << "Broker shut down: " << brokerInfo);
- broker.getConnectionObservers().remove(excluder);
+ broker.getConnectionObservers().remove(observer);
}
-void HaBroker::recover(sys::Mutex::ScopedLock&) {
+void HaBroker::recover(sys::Mutex::ScopedLock& l) {
setStatus(RECOVERING);
backup.reset(); // No longer replicating, close link.
+ IdSet backups = membership.otherBackups();
membership.reset(brokerInfo);
- primary.reset(new Primary(*this)); // Starts primary-ready check.
+ primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
+ observer->setObserver( // Allow connections
+ boost::shared_ptr<broker::ConnectionObserver>(
+ new PrimaryConnectionMonitor(*this)));
+ if (primary->isActive()) activate(l);
}
-// Called back from Primary ready check.
+// Called back from Primary active check.
void HaBroker::activate() {
sys::Mutex::ScopedLock l(lock);
activate(l);
}
-void HaBroker::activate(sys::Mutex::ScopedLock&) {
- BrokerStatus oldStatus = status;
- setStatus(ACTIVE);
- if (oldStatus != RECOVERING) // Already set membership
- membership.reset(brokerInfo);
- backup.reset(); // No longer replicating, close link.
-}
-
-ReplicateLevel HaBroker::replicateLevel(const std::string& str) {
- Enum<ReplicateLevel> rl;
- if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get());
- else return getSettings().replicateDefault.get();
-}
-
-ReplicateLevel HaBroker::replicateLevel(const framing::FieldTable& f) {
- if (f.isSet(QPID_REPLICATE))
- return replicateLevel(f.getAsString(QPID_REPLICATE));
- else
- return getSettings().replicateDefault.get();
-}
-
-ReplicateLevel HaBroker::replicateLevel(const types::Variant::Map& m) {
- types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- if (i != m.end())
- return replicateLevel(i->second.asString());
- else
- return getSettings().replicateDefault.get();
-}
+void HaBroker::activate(sys::Mutex::ScopedLock&) { setStatus(ACTIVE); }
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
sys::Mutex::ScopedLock l(lock);
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
switch (status) {
- case JOINING: activate(l); break;
+ case JOINING: recover(l); break;
case CATCHUP:
// FIXME aconway 2012-04-27: don't allow promotion in catch-up
// QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
@@ -171,20 +155,16 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url), l);
break;
}
- case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
- setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
- break;
- }
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "replicate individual queue "
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- types::Uuid uuid(true);
+ Uuid uuid(true);
std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
@@ -193,7 +173,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(*this, queue, link));
+ boost::shared_ptr<QueueReplicator> qr(
+ new QueueReplicator(brokerInfo, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
@@ -205,13 +186,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+void HaBroker::setClientUrl(const Url& url, sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
clientUrl = url;
updateClientUrl(l);
}
-void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
if (url.empty()) throw Url::Invalid("HA client URL is empty");
mgmtObject->set_publicUrl(url.str());
@@ -220,7 +201,7 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
}
-void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+void HaBroker::setBrokerUrl(const Url& url, sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
@@ -229,11 +210,6 @@ void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
if (clientUrl.empty()) updateClientUrl(l);
}
-void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) {
- expectedBackups = n;
- mgmtObject->set_expectedBackups(n);
-}
-
std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
@@ -259,7 +235,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
static const BrokerStatus TRANSITIONS[][2] = {
{ CATCHUP, RECOVERING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior
{ JOINING, CATCHUP }, // Connected to primary
- { JOINING, ACTIVE }, // Chosen as initial primary.
+ { JOINING, RECOVERING }, // Chosen as initial primary.
{ CATCHUP, READY }, // Caught up all queues, ready to take over.
{ READY, RECOVERING }, // Chosen as new primary
{ RECOVERING, ACTIVE }
@@ -293,10 +269,13 @@ void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) {
setLinkProperties(l);
}
-void HaBroker::membershipUpdate(const types::Variant::List& brokers) {
+void HaBroker::membershipUpdate(const Variant::List& brokers, const IdSet& otherBackups)
+{
QPID_LOG(debug, logPrefix << "Membership update: " << brokers);
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
+ sys::Mutex::ScopedLock l(lock);
+ if (primary.get()) primary->getUnreadyQueueSet().setExpectedBackups(otherBackups);
}
void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
@@ -305,13 +284,13 @@ void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
// If this is a backup then any links we make are backup links
// and need to be tagged.
QPID_LOG(debug, logPrefix << "Backup setting info for outgoing links: " << brokerInfo);
- linkProperties.setTable(ConnectionExcluder::BACKUP_TAG, brokerInfo.asFieldTable());
+ linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
}
else {
// If this is a primary then any links are federation links
// and should not be tagged.
QPID_LOG(debug, logPrefix << "Primary removing backup info for outgoing links");
- linkProperties.erase(ConnectionExcluder::BACKUP_TAG);
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
}
broker.setLinkClientProperties(linkProperties);
}
@@ -326,7 +305,8 @@ void HaBroker::deactivatedBackup(const std::string& queue) {
activeBackups.erase(queue);
}
-std::set<std::string> HaBroker::getActiveBackups() const {
+// FIXME aconway 2012-05-31: strip out.
+HaBroker::QueueNames HaBroker::getActiveBackups() const {
sys::Mutex::ScopedLock l(lock);
return activeBackups;
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 224a0923c5..80e8a6cc3d 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -24,8 +24,8 @@
#include "BrokerInfo.h"
#include "Membership.h"
-#include "Enum.h"
-#include "LogPrefix.h"
+#include "types.h"
+#include "ReplicationTest.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -37,8 +37,14 @@
#include <boost/shared_ptr.hpp>
namespace qpid {
+
+namespace types {
+class Variant;
+}
+
namespace broker {
class Broker;
+class Queue;
}
namespace framing {
class FieldTable;
@@ -46,7 +52,7 @@ class FieldTable;
namespace ha {
class Backup;
-class ConnectionExcluder;
+class ConnectionObserver;
class Primary;
/**
@@ -76,11 +82,7 @@ class HaBroker : public management::Manageable
void activate();
Backup* getBackup() { return backup.get(); }
-
- // Translate replicate levels.
- ReplicateLevel replicateLevel(const std::string& str);
- ReplicateLevel replicateLevel(const framing::FieldTable& f);
- ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ ReplicationTest getReplicationTest() const { return replicationTest; }
// Keep track of the set of actively replicated queues on a backup
// so that it can be transferred to the Primary on promotion.
@@ -89,19 +91,18 @@ class HaBroker : public management::Manageable
void deactivatedBackup(const std::string& queue);
QueueNames getActiveBackups() const;
- boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; }
+ boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
Membership& getMembership() { return membership; }
- void membershipUpdate(const types::Variant::List&);
+ void membershipUpdate(const types::Variant::List&, const IdSet&);
private:
- void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
- void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
- void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&);
- void updateClientUrl(const sys::Mutex::ScopedLock&);
+ void setClientUrl(const Url&, sys::Mutex::ScopedLock&);
+ void setBrokerUrl(const Url&, sys::Mutex::ScopedLock&);
+ void updateClientUrl(sys::Mutex::ScopedLock&);
- bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+ bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
void recover(sys::Mutex::ScopedLock&);
@@ -111,7 +112,7 @@ class HaBroker : public management::Manageable
std::vector<Url> getKnownBrokers() const;
- LogPrefix logPrefix;
+ std::string logPrefix;
broker::Broker& broker;
types::Uuid systemId;
const Settings settings;
@@ -122,12 +123,12 @@ class HaBroker : public management::Manageable
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
- size_t expectedBackups;
BrokerStatus status;
QueueNames activeBackups;
- boost::shared_ptr<ConnectionExcluder> excluder;
+ boost::shared_ptr<ConnectionObserver> observer;
BrokerInfo brokerInfo;
Membership membership;
+ ReplicationTest replicationTest;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index b6aa0d4a91..42758c4689 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -40,9 +40,6 @@ struct Options : public qpid::Options {
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
"Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
- // FIXME aconway 2012-04-30: required-backups? Also need timeout.
- ("ha-expected-backups", optValue(settings.expectedBackups, "N"),
- "Number of backups expected to be active in the HA cluster.")
("ha-username", optValue(settings.username, "USER"),
"Username for connections between HA brokers")
("ha-password", optValue(settings.password, "PASS"),
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 7d22a019d5..6c6961f094 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -25,31 +25,26 @@ namespace ha {
void Membership::reset(const BrokerInfo& b) {
- {
- sys::Mutex::ScopedLock l(lock);
- brokers.clear();
- brokers[b.getSystemId()] = b;
- }
- update();
+ sys::Mutex::ScopedLock l(lock);
+ brokers.clear();
+ brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::add(const BrokerInfo& b) {
- {
- sys::Mutex::ScopedLock l(lock);
- brokers[b.getSystemId()] = b;
- }
- update();
+ sys::Mutex::ScopedLock l(lock);
+ brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
- {
- sys::Mutex::ScopedLock l(lock);
- BrokerMap::iterator i = brokers.find(id);
- if (i != brokers.end())
- brokers.erase(i);
+ sys::Mutex::ScopedLock l(lock);
+ BrokerMap::iterator i = brokers.find(id);
+ if (i != brokers.end()) {
+ brokers.erase(i);
+ update(l);
}
- update();
}
bool Membership::contains(const types::Uuid& id) {
@@ -58,32 +53,47 @@ bool Membership::contains(const types::Uuid& id) {
}
void Membership::assign(const types::Variant::List& list) {
- {
- sys::Mutex::ScopedLock l(lock);
- brokers.clear();
- for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- BrokerInfo b(i->asMap());
- brokers[b.getSystemId()] = b;
- }
+ sys::Mutex::ScopedLock l(lock);
+ brokers.clear();
+ for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ BrokerInfo b(i->asMap());
+ brokers[b.getSystemId()] = b;
}
- update();
+ update(l);
}
types::Variant::List Membership::asList() const {
sys::Mutex::ScopedLock l(lock);
+ return asList(l);
+}
+
+types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
types::Variant::List list;
for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
return list;
}
-void Membership::update() {
+void Membership::update(sys::Mutex::ScopedLock& l) {
if (updateCallback) {
- types::Variant::List list;
- for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- list.push_back(i->second.asMap());
- updateCallback(list);
+ types::Variant::List list = asList(l);
+ IdSet ids = otherBackups(l);
+ sys::Mutex::ScopedUnlock u(lock);
+ updateCallback(list, ids);
}
}
+IdSet Membership::otherBackups() const {
+ sys::Mutex::ScopedLock l(lock);
+ return otherBackups(l);
+}
+
+IdSet Membership::otherBackups(sys::Mutex::ScopedLock&) const {
+ IdSet result;
+ for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
+ result.insert(i->second.getSystemId());
+ return result;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 8af03e0f40..bdb55425dc 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -23,7 +23,7 @@
*/
#include "BrokerInfo.h"
-#include "LogPrefix.h"
+#include "types.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
@@ -40,25 +40,32 @@ namespace ha {
class Membership
{
public:
- Membership(LogPrefix lp, boost::function<void(const types::Variant::List&)> updateFn)
- : logPrefix(lp), updateCallback(updateFn) {}
+ typedef boost::function<void (const types::Variant::List&,
+ const IdSet&) > UpdateCallback;
+
+ Membership(const types::Uuid& self_, UpdateCallback updateFn)
+ : self(self_), updateCallback(updateFn) {}
void reset(const BrokerInfo& b); ///< Reset to contain just one member.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
+ /** Return IDs of all backups other than self */
+ IdSet otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
private:
typedef std::map<types::Uuid, BrokerInfo> BrokerMap;
- void update();
+ IdSet otherBackups(sys::Mutex::ScopedLock&) const;
+ types::Variant::List asList(sys::Mutex::ScopedLock&) const;
+ void update(sys::Mutex::ScopedLock&);
mutable sys::Mutex lock;
- LogPrefix logPrefix;
+ types::Uuid self;
BrokerMap brokers;
- boost::function<void(const types::Variant::List&)> updateCallback;
+ UpdateCallback updateCallback;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index ae48e48c6f..63cba14484 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -22,9 +22,11 @@
#include "ConnectionExcluder.h"
#include "HaBroker.h"
#include "Primary.h"
+#include "ReplicatingSubscription.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -32,52 +34,29 @@ namespace ha {
Primary* Primary::instance = 0;
-Primary::Primary(HaBroker& b) :
- haBroker(b), logPrefix(b),
- expected(b.getSettings().expectedBackups),
- unready(0), activated(false)
+Primary::Primary(HaBroker& hb, const IdSet& backups) :
+ haBroker(hb), logPrefix("HA primary: "),
+ unready(0), activated(false),
+ queues(hb.getBroker(), hb.getReplicationTest(), backups)
{
+ assert(instance == 0);
instance = this; // Let queue replicators find us.
- if (expected == 0) // No need for ready check
- activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
+ if (backups.empty()) {
+ QPID_LOG(debug, logPrefix << "Not waiting for backups");
+ activated = true;
+ }
else {
- // Set up the primary-ready check: ready when all queues have
- // expected number of backups. Note clients are excluded at this point
- // so dont't have to worry about changes to the set of queues.
- HaBroker::QueueNames names = haBroker.getActiveBackups();
- for (HaBroker::QueueNames::const_iterator i = names.begin(); i != names.end(); ++i)
- {
- queues[*i] = 0;
- ++unready;
- QPID_LOG(debug, logPrefix << "Need backup of " << *i
- << ", " << unready << " unready queues");
- }
- if (queues.empty())
- activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
- else {
- QPID_LOG(debug, logPrefix << "Waiting for " << expected
- << " backups on " << queues.size() << " queues");
- }
+ QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups);
}
}
-void Primary::readyReplica(const std::string& q) {
+void Primary::readyReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
- if (!activated) {
- QueueCounts::iterator i = queues.find(q);
- if (i != queues.end()) {
- ++i->second;
- if (i->second == expected) --unready;
- QPID_LOG(debug, logPrefix << i->second << " backups caught up on " << q
- << ", " << unready << " unready queues");
- if (unready == 0) activate(l);
- }
+ if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) {
+ activated = true;
+ haBroker.activate();
+ QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe.");
}
}
-void Primary::activate(sys::Mutex::ScopedLock&) {
- activated = true;
- haBroker.activate();
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 7e347fdbe2..3a1a9be9e8 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -22,7 +22,8 @@
*
*/
-#include "LogPrefix.h"
+#include "UnreadyQueueSet.h"
+#include "types.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <map>
@@ -36,10 +37,11 @@ class Queue;
namespace ha {
class HaBroker;
+class ReplicatingSubscription;
/**
- * State associated with a primary broker. Tracks replicating
- * subscriptions to determine when primary is ready.
+ * State associated with a primary broker. Tracks replicating
+ * subscriptions to determine when primary is active.
*
* THREAD SAFE: readyReplica is called in arbitray threads.
*/
@@ -47,22 +49,22 @@ class Primary
{
public:
static Primary* get() { return instance; }
- Primary(HaBroker& b);
- void readyReplica(const std::string& q);
- void removeReplica(const std::string& q);
+ Primary(HaBroker& hb, const IdSet& expectedBackups);
- private:
- typedef std::map<std::string, size_t> QueueCounts;
+ void readyReplica(const ReplicatingSubscription&);
+ void removeReplica(const std::string& q);
- void activate(sys::Mutex::ScopedLock&);
+ UnreadyQueueSet& getUnreadyQueueSet() { return queues; }
+ bool isActive() { return activated; }
+ private:
sys::Mutex lock;
HaBroker& haBroker;
- LogPrefix logPrefix;
- QueueCounts queues;
+ std::string logPrefix;
size_t expected, unready;
bool activated;
+ UnreadyQueueSet queues;
static Primary* instance;
};
diff --git a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
new file mode 100644
index 0000000000..1aa61b2dea
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
@@ -0,0 +1,76 @@
+#ifndef QPID_HA_PRIMARYCONNECTIONOBSERVER_H
+#define QPID_HA_PRIMARYCONNECTIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "ConnectionObserver.h"
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace ha {
+class HaBroker;
+
+/**
+ * Monitor connections on a primary broker. Update membership and
+ * primary readiness.
+ *
+ * THREAD SAFE: has no state, just mediates between other thread-safe objects.
+ */
+class PrimaryConnectionMonitor : public broker::ConnectionObserver
+{
+ public:
+ PrimaryConnectionMonitor(HaBroker& hb) : haBroker(hb) {}
+
+ void opened(broker::Connection& connection) {
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ QPID_LOG(debug, "HA primary: Backup connected: " << info);
+ haBroker.getMembership().add(info);
+ // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
+ }
+ }
+
+ void closed(broker::Connection& connection) {
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ QPID_LOG(debug, "HA primary: Backup disconnected: " << info);
+ haBroker.getMembership().remove(info.getSystemId());
+ // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
+ }
+ }
+ private:
+ void reject(broker::Connection&);
+ HaBroker& haBroker;
+ };
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_PRIMARYCONNECTIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
new file mode 100644
index 0000000000..55dc6b0d50
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueGuard.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+using namespace broker;
+using sys::Mutex;
+
+QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
+ : queue(q), subscription(0)
+{
+ // Set a log prefix message that identifies the remote broker.
+ std::ostringstream os;
+ os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": ";
+ logPrefix = os.str();
+}
+
+void QueueGuard::initialize() {
+ Mutex::ScopedLock l(lock);
+ queue.addObserver(shared_from_this());
+}
+
+void QueueGuard::enqueued(const QueuedMessage& qm) {
+ // Delay completion
+ QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
+ qm.payload->getIngressCompletion().startCompleter();
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(!delayed.contains(qm.position));
+ delayed += qm.position;
+ }
+}
+
+// FIXME aconway 2012-06-05: ERROR, must call on ReplicatingSubscription
+
+void QueueGuard::dequeued(const QueuedMessage& qm) {
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+ ReplicatingSubscription* rs = 0;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ rs = subscription;
+ }
+ if (rs) rs->dequeued(qm);
+}
+
+void QueueGuard::cancel() {
+ queue.removeObserver(shared_from_this());
+ queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+}
+
+void QueueGuard::attach(ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ subscription = &rs;
+}
+
+void QueueGuard::complete(const QueuedMessage& qm, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << qm);
+ // The same message can be completed twice, by acknowledged and
+ // dequeued, remove it from the set so we only call
+ // finishCompleter() once
+ if (delayed.contains(qm.position)) {
+ qm.payload->getIngressCompletion().finishCompleter();
+ delayed -= qm.position;
+ }
+}
+
+void QueueGuard::complete(const QueuedMessage& qm) {
+ Mutex::ScopedLock l(lock);
+ complete(qm, l);
+}
+
+// FIXME aconway 2012-06-04: TODO support for timeout.
+
+}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
new file mode 100644
index 0000000000..739c1e0e13
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -0,0 +1,101 @@
+#ifndef QPID_HA_QUEUEGUARD_H
+#define QPID_HA_QUEUEGUARD_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace broker {
+class Queue;
+class QueuedMessage;
+class Message;
+}
+
+namespace ha {
+class BrokerInfo;
+class ReplicatingSubscription;
+
+/**
+ * A queue guard is a QueueObserver that delays completion of new
+ * messages arriving on a queue. It works as part of a
+ * ReplicatingSubscription to ensure messages are not acknowledged
+ * till they have been replicated.
+ *
+ * The guard is created before the ReplicatingSubscription to protect
+ * messages arriving before the creation of the subscription has not
+ * yet seen.
+ *
+ * THREAD SAFE: Called concurrently via QueueObserver::enqueued in
+ * arbitrary connection threads, and from ReplicatingSubscription
+ * in the subscriptions thread.
+ */
+class QueueGuard : public broker::QueueObserver,
+ public boost::enable_shared_from_this<QueueGuard>
+{
+ public:
+ QueueGuard(broker::Queue& q, const BrokerInfo&);
+
+ /** Must be called after ctor, requires a shared_ptr to this to exist.
+ * Must be called before ReplicatingSubscription::initialize(this)
+ */
+ void initialize();
+
+ /** QueueObserver override. Delay completion of the message. */
+ void enqueued(const broker::QueuedMessage&);
+
+ /** QueueObserver override: Complete a delayed message */
+ void dequeued(const broker::QueuedMessage&);
+
+ /** Complete a delayed message. */
+ void complete(const broker::QueuedMessage&);
+
+ /** Complete all delayed messages. */
+ void cancel();
+
+ void attach(ReplicatingSubscription&);
+
+ // Unused QueueObserver functions.
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ private:
+ sys::Mutex lock;
+ std::string logPrefix;
+ broker::Queue& queue;
+ framing::SequenceSet delayed;
+ ReplicatingSubscription* subscription;
+
+ void complete(const broker::QueuedMessage&, sys::Mutex::ScopedLock&);
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUEGUARD_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 58c5e452d7..4d12015008 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,7 +21,6 @@
#include "Counter.h"
#include "QueueReplicator.h"
-#include "HaBroker.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
@@ -60,15 +59,15 @@ bool QueueReplicator::isEventKey(const std::string key) {
return ret;
}
-QueueReplicator::QueueReplicator(HaBroker& hb,
+QueueReplicator::QueueReplicator(const BrokerInfo& info,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
- haBroker(hb), logPrefix(hb), queue(q), link(l)
+ logPrefix("HA backup of "+q->getName()+": "),
+ queue(q), link(l), brokerInfo(info)
{
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- logPrefix.setMessage(q->getName());
QPID_LOG(info, logPrefix << "Created");
}
@@ -119,7 +118,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
queue->getPosition());
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
- haBroker.getBrokerInfo().asFieldTable());
+ brokerInfo.asFieldTable());
SequenceNumber front;
if (ReplicatingSubscription::getFront(*queue, front))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);
@@ -143,7 +142,7 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
QueuedMessage message;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 1b221a8d28..2f55e6cc85 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -22,7 +22,7 @@
*
*/
-#include "LogPrefix.h"
+#include "BrokerInfo.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/SequenceSet.h"
#include <boost/enable_shared_from_this.hpp>
@@ -42,7 +42,6 @@ class Deliverable;
namespace ha {
class Counter;
-class HaBroker;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -63,7 +62,7 @@ class QueueReplicator : public broker::Exchange,
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
- QueueReplicator(HaBroker&,
+ QueueReplicator(const BrokerInfo&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
@@ -81,15 +80,15 @@ class QueueReplicator : public broker::Exchange,
private:
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
- void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
- HaBroker& haBroker;
- LogPrefix logPrefix;
+ std::string logPrefix;
std::string bridgeName;
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
+ BrokerInfo brokerInfo;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 4490e309aa..f7bfe6fda0 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,8 +19,8 @@
*
*/
+#include "QueueGuard.h"
#include "ReplicatingSubscription.h"
-#include "HaBroker.h"
#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
@@ -129,14 +129,11 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
- haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- queue->addObserver(rs);
- // NOTE: initialize must be called _after_ addObserver, so
- // messages can't be enqueued after setting readyPosition
- // but before registering the observer.
- rs->initialize();
+ boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo()));
+ guard->initialize(); // Must call before ReplicatingSubscription::initialize
+ rs->initialize(guard);
}
return rs;
}
@@ -177,7 +174,6 @@ ostream& operator<<(ostream& o, const QueueRange& qr) {
}
ReplicatingSubscription::ReplicatingSubscription(
- HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -190,24 +186,19 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
- haBroker(hb),
- logPrefix(hb),
dummy(new Queue(mask(name))),
ready(false)
{
try {
+ FieldTable ft;
+ if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw Exception("Replicating subscription does not have broker info");
+ info.assign(ft);
+
// Set a log prefix message that identifies the remote broker.
- // FIXME aconway 2012-05-24: use URL instead of host:port, include transport?
ostringstream os;
- os << queue->getName() << "@";
- FieldTable ft;
- if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) {
- BrokerInfo info(ft);
- os << info.getHostName() << ":" << info.getPort();
- }
- else
- os << parent->getSession().getConnection().getUrl();
- logPrefix.setMessage(os.str());
+ os << "HA subscription " << queue->getName() << "@" << info.getLogId() << ": ";
+ logPrefix = os.str();
QueueRange primary(*queue);
QueueRange backup(arguments);
@@ -251,17 +242,20 @@ ReplicatingSubscription::~ReplicatingSubscription() {
}
// Called in subscription's connection thread when the subscription is created.
-void ReplicatingSubscription::initialize() {
- sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently
+void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) {
+ sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
+ // Attach to the guard.
+ guard = g;
+ guard->attach(*this);
// Send initial dequeues and position to the backup.
- // There most be a shared_ptr(this) when sending.
+ // There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
sendPositionEvent(position, l);
backupPosition = position;
// Set the ready position. All messages after this position have
- // been seen by us as QueueObserver.
+ // been seen by the guard.
QueueRange range;
{
// Drop the lock, QueueRange will lock the queues message lock
@@ -320,82 +314,28 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
+void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) {
if (ready) return;
ready = true;
// Notify Primary that a subscription is ready.
{
sys::Mutex::ScopedUnlock u(lock);
QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
- if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
- }
-}
-
-// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
-
-// Mark a message completed. May be called by acknowledge or dequeued,
-// in arbitrary connection threads.
-void ReplicatingSubscription::complete(
- const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
-{
- // Handle completions for the subscribed queue, not the internal event queue.
- if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Completed " << qm);
- Delayed::iterator i= delayed.find(qm.position);
- // The same message can be completed twice, by acknowledged and
- // dequeued, remove it from the set so it only gets completed
- // once.
- if (i != delayed.end()) {
- assert(i->second.payload == qm.payload);
- qm.payload->getIngressCompletion().finishCompleter();
- delayed.erase(i);
- }
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
}
-// Called before we get notified of the message being available and
-// under the message lock in the queue.
-// Called in arbitrary connection thread *with the queue lock held*
-void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
- // Delay completion
- QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
- qm.payload->getIngressCompletion().startCompleter();
- {
- sys::Mutex::ScopedLock l(lock);
- assert(delayed.find(qm.position) == delayed.end());
- delayed[qm.position] = qm;
- }
-}
-
-// Function to complete a delayed message, called by cancel()
-void ReplicatingSubscription::cancelComplete(
- const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
-{
- QPID_LOG(trace, logPrefix << "Cancel completed " << v.second);
- v.second.payload->getIngressCompletion().finishCompleter();
-}
-
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- getQueue()->removeObserver(
- boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
- {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "Cancel backup subscription to "
- << getQueue()->getName());
- for_each(delayed.begin(), delayed.end(),
- boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
- delayed.clear();
- }
+ guard->cancel();
ConsumerImpl::cancel();
}
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
- sys::Mutex::ScopedLock l(lock);
// Finish completion of message, it has been acknowledged by the backup.
- complete(msg, l);
+ guard->complete(msg);
}
// Hide the "queue deleted" error for a ReplicatingSubscription when a
@@ -403,7 +343,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
@@ -418,24 +358,27 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
}
}
-// QueueObserver override. Called after the message has been removed
+// Called via QueueObserver::dequeued override on guard.
+// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ bool doComplete = false;
QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
- if (qm.position > position) complete(qm, l);
+ if (qm.position > position) doComplete = true;
}
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+ if (doComplete) guard->complete(qm);
notify(); // Ensure a call to doDispatch
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
QPID_LOG(trace, logPrefix << "Sending position " << pos
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index ab02949952..e69a2159e6 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -23,6 +23,7 @@
*/
#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
+#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/ConsumerFactory.h"
@@ -43,27 +44,25 @@ class Buffer;
}
namespace ha {
-class LogPrefix;
+class QueueGuard;
/**
- * A susbcription that represents a backup replicating a queue.
+ * A susbcription that replicates to a remote backup.
*
- * Runs on the primary. Delays completion of messages till the backup
- * has acknowledged, informs backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays
+ * completion of messages till the backup has acknowledged, informs
+ * backup of locally dequeued messages.
*
- * THREAD SAFE: Used as a consumer in subscription's connection
- * thread, and as a QueueObserver in arbitrary connection threads.
+ * THREAD SAFE: Called in subscription's connection thread but also
+ * in arbitrary connection threads via dequeued.
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
*/
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
- public broker::QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
struct Factory : public broker::ConsumerFactory {
- HaBroker& haBroker;
- Factory(HaBroker& hb) : haBroker(hb) {}
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
@@ -88,9 +87,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
*/
static bool getNext(broker::Queue&, framing::SequenceNumber from,
framing::SequenceNumber& result);
+ static bool isEmpty(broker::Queue&);
- ReplicatingSubscription(HaBroker&,
- broker::SemanticState* parent,
+ ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
@@ -98,11 +97,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
~ReplicatingSubscription();
- // QueueObserver overrides. NB called with queue lock held.
- void enqueued(const broker::QueuedMessage&);
- void dequeued(const broker::QueuedMessage&);
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ // Called via QueueGuard::dequeued
+ void dequeued(const broker::QueuedMessage& qm);
// Consumer overrides.
bool deliver(broker::QueuedMessage& msg);
@@ -111,30 +107,30 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool browseAcquired() const { return true; }
bool hideDeletedError();
+
/** Initialization that must be done after construction because it
- * requires a shared_ptr to this to exist.
+ * requires a shared_ptr to this to exist. Will attach to guard
*/
- void initialize();
+ void initialize(const boost::shared_ptr<QueueGuard>& guard);
+
+ BrokerInfo getBrokerInfo() const { return info; }
protected:
bool doDispatch();
- private:
- typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
- HaBroker& haBroker;
- LogPrefix logPrefix;
+ private:
+ std::string logPrefix;
boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
- Delayed delayed;
framing::SequenceSet dequeues;
- framing::SequenceNumber backupPosition;
framing::SequenceNumber readyPosition;
+ framing::SequenceNumber backupPosition;
bool ready;
+ BrokerInfo info;
+ boost::shared_ptr<QueueGuard> guard;
- void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
- void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
- void sendDequeueEvent(const sys::Mutex::ScopedLock&);
- void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
- void setReady(const sys::Mutex::ScopedLock&);
+ void sendDequeueEvent(sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+ void setReady(sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&);
friend struct Factory;
};
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
new file mode 100644
index 0000000000..1db101dc94
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicationTest.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace ha {
+
+using types::Variant;
+
+ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
+ Enum<ReplicateLevel> rl;
+ if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get());
+ else return replicateDefault;
+}
+
+ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE))
+ return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else
+ return replicateDefault;
+}
+
+ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end())
+ return replicateLevel(i->second.asString());
+ else
+ return replicateDefault;
+}
+
+namespace {
+const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+}
+
+bool ReplicationTest::isReplicated(const Variant::Map& args, bool autodelete, bool exclusive) {
+ bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
+ return replicateLevel(args) && !ignore;
+}
+
+bool ReplicationTest::isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive) {
+ bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT);
+ return replicateLevel(args) && !ignore;
+}
+
+bool ReplicationTest::isReplicated(const broker::Queue& q) {
+ return isReplicated(q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner());
+}
+
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index da01154a80..4851f34e84 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_LOGPREFIX_H
-#define QPID_HA_LOGPREFIX_H
+#ifndef QPID_HA_REPLICATIONTEST_H
+#define QPID_HA_REPLICATIONTEST_H
/*
*
@@ -22,38 +22,41 @@
*
*/
-#include "Enum.h"
-#include <iosfwd>
+#include "types.h"
+#include "qpid/types/Variant.h"
#include <string>
namespace qpid {
-namespace ha {
-class HaBroker;
+namespace broker {
+class Queue;
+}
+
+namespace framing {
+class FieldTable;
+}
+namespace ha {
/**
- * Standard information to prefix log messages.
+ * Test whether something is replicated, taking into account the
+ * default replication level.
*/
-class LogPrefix
+class ReplicationTest
{
public:
- /** For use by all classes other than HaBroker */
- LogPrefix(HaBroker& hb, const std::string& queue=std::string());
- LogPrefix(LogPrefix& lp, const std::string& queue);
- /** For use by the HaBroker itself. */
- LogPrefix(BrokerStatus&);
+ ReplicationTest(ReplicateLevel replicateDefault_) :
+ replicateDefault(replicateDefault_) {}
- void setMessage(const std::string&);
+ ReplicateLevel replicateLevel(const std::string& str);
+ ReplicateLevel replicateLevel(const framing::FieldTable& f);
+ ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
+ bool isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive);
+ bool isReplicated(const broker::Queue&);
private:
- HaBroker* haBroker;
- BrokerStatus* status;
- std::string tail;
- friend std::ostream& operator<<(std::ostream& o, const LogPrefix& l);
+ ReplicateLevel replicateDefault;
};
-
-std::ostream& operator<<(std::ostream& o, const LogPrefix& l);
-
}} // namespace qpid::ha
-#endif /*!QPID_HA_LOGPREFIX_H*/
+#endif /*!QPID_HA_REPLICATIONTEST_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 08d42471b8..213a5f64d5 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -22,7 +22,7 @@
*
*/
-#include "Enum.h"
+#include "types.h"
#include <string>
namespace qpid {
@@ -34,13 +34,12 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), expectedBackups(0), replicateDefault(NONE)
+ Settings() : cluster(false), replicateDefault(NONE)
{}
bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
- size_t expectedBackups;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
private:
diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp
new file mode 100644
index 0000000000..279eb2c0e1
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UnreadyQueueSet.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include <boost/bind.hpp>
+#include <iostream>
+#include <algorithm>
+
+namespace qpid {
+namespace ha {
+
+using sys::Mutex;
+
+UnreadyQueueSet::UnreadyQueueSet(broker::Broker& broker, ReplicationTest rt, const IdSet& ids) :
+ logPrefix("HA unsafe queues: "), replicationTest(rt), expected(ids),
+ initializing(true), initialQueues(0)
+{
+ if (!expected.empty()) {
+ QPID_LOG(debug, logPrefix << "Recovering, waiting for backups: " << expected);
+ broker.getQueues().eachQueue(boost::bind(&UnreadyQueueSet::queueCreate, this, _1));
+ initialQueues = queues.size();
+ }
+ initializing = false;
+}
+
+void UnreadyQueueSet::setExpectedBackups(const IdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ expected = ids;
+}
+
+bool UnreadyQueueSet::ready(const boost::shared_ptr<broker::Queue>& q, const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix << "Replicating subscription ready: " << id << " on "
+ << q->getName());
+ QueueMap::iterator i = queues.find(q);
+ if (i != queues.end()) {
+ // if (i->second.guard->ready(id)) {
+ // QPID_LOG(debug, logPrefix << "Releasing guard on " << q->getName());
+ // remove(i, l);
+ if (i->second.initial) --initialQueues;
+ // }
+ }
+ return initialQueues == 0;
+}
+
+void UnreadyQueueSet::queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+ Mutex::ScopedLock l(lock);
+ if (replicationTest.isReplicated(*q) && !expected.empty()) {
+ QPID_LOG(debug, logPrefix << "Guarding " << q->getName() << " for " << expected);
+ // GuardPtr guard(new QueueGuard(*q, expected));
+ // FIXME aconway 2012-06-05: q->addObserver(guard);
+ queues[q] = Entry(initializing);//, guard);
+ }
+}
+
+void UnreadyQueueSet::queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
+ Mutex::ScopedLock l(lock);
+ remove(queues.find(q), l);
+}
+
+void UnreadyQueueSet::remove(QueueMap::iterator i, sys::Mutex::ScopedLock&) {
+ if (i != queues.end()) {
+ QPID_LOG(debug, logPrefix << "Queue is safe: " << i->first->getName());
+ // FIXME aconway 2012-06-05: i->first->removeObserver(i->second.guard);
+ //i->second.guard->release();
+ queues.erase(i);
+ }
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h
new file mode 100644
index 0000000000..0731282c2b
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h
@@ -0,0 +1,88 @@
+#ifndef QPID_HA_BROKERGUARD_H
+#define QPID_HA_BROKERGUARD_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicationTest.h"
+#include "types.h"
+#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/sys/Mutex.h"
+#include <set>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace ha {
+class QueueGuard;
+
+/**
+ * ConfigurationObserver that sets a QueueGuard on all existing/new queues
+ * so they are safe until their ReplicatingSubscriptions catch up.
+ *
+ * THREAD SAFE: Called concurrently as a ConfigurationObserver and via ready()
+ */
+class UnreadyQueueSet : public qpid::broker::ConfigurationObserver
+{
+ public:
+ /** Caller should call broker.getConfigurationObservers().add(shared_ptr(this)) */
+ UnreadyQueueSet(broker::Broker&, ReplicationTest rt, const IdSet& expected);
+
+ void setExpectedBackups(const IdSet&);
+
+ /** Backup id is ready on queue.
+ *@return true if all initial queuse are now ready.
+ */
+ bool ready(const boost::shared_ptr<broker::Queue>&, const types::Uuid& id);
+
+ // ConfigurationObserver overrides
+ void queueCreate(const boost::shared_ptr<broker::Queue>&);
+ void queueDestroy(const boost::shared_ptr<broker::Queue>&);
+
+ // FIXME aconway 2012-05-31: handle IdSet changes.
+ private:
+ typedef boost::shared_ptr<QueueGuard> GuardPtr;
+ struct Entry {
+ bool initial;
+ // FIXME aconway 2012-06-05: GuardPtr guard;
+ // Entry(bool i=false, GuardPtr g=GuardPtr()) : initial(i), guard(g) {}
+ Entry(bool i=false) : initial(i) {}
+ };
+ typedef std::map<boost::shared_ptr<broker::Queue>, Entry> QueueMap;
+
+ void remove(QueueMap::iterator i, sys::Mutex::ScopedLock&);
+
+ sys::Mutex lock;
+ std::string logPrefix;
+ ReplicationTest replicationTest;
+ IdSet expected;
+ QueueMap queues;
+ bool initializing;
+ size_t initialQueues; // Count of initial queues still unready.
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKERGUARD_H*/
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index ce14032694..3da482e732 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -31,9 +31,6 @@
<property name="publicUrl" type="sstr"
desc="URL advertized to clients to connect to the cluster."/>
- <property name="expectedBackups" type="uint16"
- desc="Number of HA backup brokers expected."/>
-
<property name="replicateDefault" type="sstr"
desc="Replication for queues/exchanges with no qpid.replicate argument"/>
@@ -51,10 +48,6 @@
<arg name="url" type="sstr" dir="I"/>
</method>
- <method name="setExpectedBackups" desc="Set number of backups expected">
- <arg name="expectedBackups" type="uint16" dir="I"/>
- </method>
-
<method name="replicate" desc="Replicate individual queue from remote broker.">
<arg name="broker" type="sstr" dir="I"/>
<arg name="queue" type="sstr" dir="I"/>
diff --git a/qpid/cpp/src/qpid/ha/Enum.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index a5ee7ea51f..92acd76fca 100644
--- a/qpid/cpp/src/qpid/ha/Enum.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -19,30 +19,33 @@
*
*/
-#include "Enum.h"
+#include "types.h"
#include "qpid/Msg.h"
#include "qpid/Exception.h"
#include <algorithm>
#include <iostream>
+#include <iterator>
#include <assert.h>
namespace qpid {
namespace ha {
-const std::string QPID_REPLICATE("qpid.replicate");
+using namespace std;
-std::string EnumBase::str() const {
+const string QPID_REPLICATE("qpid.replicate");
+
+string EnumBase::str() const {
assert(value < count);
return names[value];
}
-void EnumBase::parse(const std::string& s) {
+void EnumBase::parse(const string& s) {
if (!parseNoThrow(s))
throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s));
}
-bool EnumBase::parseNoThrow(const std::string& s) {
- const char** i = std::find(names, names+count, s);
+bool EnumBase::parseNoThrow(const string& s) {
+ const char** i = find(names, names+count, s);
value = i - names;
return value < count;
}
@@ -58,15 +61,21 @@ template <> const char* Enum<BrokerStatus>::NAMES[] = {
};
template <> const size_t Enum<BrokerStatus>::N = 7;
-std::ostream& operator<<(std::ostream& o, EnumBase e) {
+ostream& operator<<(ostream& o, EnumBase e) {
return o << e.str();
}
-std::istream& operator>>(std::istream& i, EnumBase& e) {
- std::string s;
+istream& operator>>(istream& i, EnumBase& e) {
+ string s;
i >> s;
e.parse(s);
return i;
}
+ostream& operator<<(ostream& o, const IdSet& ids) {
+ ostream_iterator<qpid::types::Uuid> out(o, " ");
+ copy(ids.begin(), ids.end(), out);
+ return o;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Enum.h b/qpid/cpp/src/qpid/ha/types.h
index adad21d2d4..e12c039c79 100644
--- a/qpid/cpp/src/qpid/ha/Enum.h
+++ b/qpid/cpp/src/qpid/ha/types.h
@@ -23,7 +23,9 @@
*/
#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
#include <string>
+#include <set>
#include <iosfwd>
namespace qpid {
@@ -93,5 +95,15 @@ inline bool isPrimary(BrokerStatus s) {
inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
extern const std::string QPID_REPLICATE;
+
+// FIXME aconway 2012-06-04: rename types.h->types.h
+
+/**
+ * Define IdSet type, not a typedef so we can overload operator <<
+ */
+class IdSet : public std::set<types::Uuid> {};
+
+std::ostream& operator<<(std::ostream& o, const IdSet& ids);
+
}} // qpid::ha
#endif /*!QPID_HA_ENUM_H*/
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 1b93504b64..c32b7f2c96 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -248,7 +248,7 @@ class Broker(Popen):
self.log = "%s.log" % self.name
i = 1
while (os.path.exists(self.log)):
- self.log = "%s-%d.log" % (self.name, i)
+ self.log = "%s.%d.log" % (self.name, i)
i += 1
def get_log(self):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 6e270851f0..86679611c4 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,7 +24,7 @@ from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Co
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
from uuid import UUID
@@ -75,7 +75,10 @@ class HaBroker(Broker):
if not self._agent: self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self): self.agent().getHaBroker().status
+ def ha_status(self): return self.agent().getHaBroker().status
+
+ def wait_status(self, status):
+ assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -97,6 +100,14 @@ class HaBroker(Broker):
try: wait_address(bs, address)
finally: bs.connection.close()
+ def assert_browse(self, queue, expected, **kwargs):
+ """Verify queue contents by browsing."""
+ bs = self.connect().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
def assert_browse_backup(self, queue, expected, **kwargs):
"""Combines wait_backup and assert_browse_retry."""
bs = self.connect_admin().session()
@@ -157,9 +168,11 @@ class HaCluster(object):
if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
+ """Start a broker with the same name and data directory. It will get
+ a separate log file: foo.n.log"""
b = self._brokers[i]
self._brokers[i] = HaBroker(
- self.test, name=self.next_name(), port=b.port(), brokers_url=self.url,
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
**self.kwargs)
def bounce(self, i, promote_next=True):
@@ -347,7 +360,8 @@ class ReplicationTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
- self.assert_browse_retry(s, "q", ["foo"])
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
c.close()
def test_failover_cpp(self):
@@ -630,27 +644,6 @@ class ReplicationTests(BrokerTest):
assert valid_address(s, "ad")
assert valid_address(s, "time")
- def test_recovering(self):
- """Verify that the primary broker does not go active until expected
- backups have connected"""
- cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
- c = cluster[0].connect()
- for i in xrange(10):
- s = c.session().sender("q%s;{create:always}"%i)
- for j in xrange(100): s.send(str(j))
- cluster.kill(0) # Fail over to 1
- cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
- cluster.restart(0)
- c = retry(cluster[1].try_connect)
- self.assertTrue(c)
- cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
-
- # Verify in logs that all queue catch-up happened before the transition to active.
- log = open(cluster[1].log).read()
- i = log.find("Status change: recovering -> active")
- self.failIf(i < 0)
- self.assertEqual(log.find("caught up", i), -1)
-
def test_broker_info(self):
"""Check that broker information is correctly published via management"""
cluster = HaCluster(self, 3)
@@ -667,7 +660,7 @@ class ReplicationTests(BrokerTest):
# Check that all brokers have the same membership as the cluster
for broker in cluster:
qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
# Add a new broker, check it is updated everywhere
b = cluster.start()
cluster_ports.append(b.port())
@@ -720,12 +713,10 @@ class LongTests(BrokerTest):
def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
- # Start a cluster, all members will be killed during the test.
- # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed.
- brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"])
+ brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+ sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
receiver.start()
sender.start()
@@ -744,14 +735,14 @@ class LongTests(BrokerTest):
# otherwise we can lose messages. When we implement non-promotion
# of catchup brokers we can make this stronger: wait only for
# there to be at least one ready backup.
- assert retry(brokers[i%3].try_connect, 1)
+ brokers[i%3].wait_status("active")
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running
receiver.check() # Verify no exceptions
return receiver.received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
- assert retry(enough, 10)
+ assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
except:
traceback.print_exc()
raise
@@ -764,6 +755,53 @@ class LongTests(BrokerTest):
brokers.kill(i, False)
if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+ """Tests for recovery after a failure."""
+
+ def test_queue_hold(self):
+ """Verify that the broker holds queues without sufficient backup,
+ i.e. does not complete messages sent to those queues."""
+
+ cluster = HaCluster(self, 4);
+ # Wait for the primary to be ready
+ cluster[0].wait_status("active")
+
+ # Create a queue before the failure.
+ s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ for b in cluster: b.wait_backup("q1")
+ for i in xrange(100): s1.send(str(i))
+
+ # Kill primary and 2 backups
+ for i in [0,1,2]: cluster.kill(i, False)
+ cluster[3].promote() # New primary, backups will be 1 and 2
+ cluster[3].wait_status("recovering")
+
+ # Create a queue after the failure
+ s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+ # Verify that messages sent are not completed
+ for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 100)
+
+ # Verify we can receive even if sending is on hold:
+ cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+ # Restart backups, verify queues are released only when both backups are up
+ cluster.restart(1)
+ self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(cluster[3].ha_status(), "recovering")
+ cluster.restart(2)
+
+ def settled(sender): sender.sync(); return sender.unsettled() == 0;
+ assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
+ assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[3].wait_status("active"),
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")