summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-02-07 19:26:12 +0000
committerAlan Conway <aconway@apache.org>2013-02-07 19:26:12 +0000
commit88613594a4bfa64c5f244ce2d183d605449e2496 (patch)
tree4e475fbdd91e5eff2672dcc045709f3626fb2550 /qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
parent2f8518477686ce2382110d159349fe732db27a1a (diff)
downloadqpid-python-88613594a4bfa64c5f244ce2d183d605449e2496.tar.gz
QPID-4555: HA Primary sets explicit qpid.replicate in Queue and Exchange arguments.
Previously both Primary and Backup would calculate the qpid.replicate value independently, assuming the result would be the same. In the case of exclusive queues, the exclusivity can change over time so its possible that primary and backup won't agree. Now only Primary does the calculation with exclusive, auto-delete etc. and puts an explicity qpid.replicate in the queue or event arguments. Backup uses the value set by primary. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1443678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp59
1 files changed, 27 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0f4c5b2be8..410ebc3114 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -242,12 +242,14 @@ class BrokerReplicator::UpdateTracker {
/** Add an exchange name */
void addExchange(Exchange::shared_ptr ex) {
- if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName());
+ if (repTest.getLevel(*ex))
+ initial.insert(ex->getName());
}
/** Add a queue name. */
void addQueue(Queue::shared_ptr q) {
- if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName());
+ if (repTest.getLevel(*q))
+ initial.insert(q->getName());
}
/** Received an event for name */
@@ -279,7 +281,7 @@ class BrokerReplicator::UpdateTracker {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
+ logPrefix("Backup: "), replicationTest(NONE),
haBroker(hb), broker(hb.getBroker()),
exchanges(broker.getExchanges()), queues(broker.getQueues()),
link(l),
@@ -472,9 +474,7 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
- bool autoDel = values[AUTODEL].asBool();
- bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
@@ -488,7 +488,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
<< name);
deleteQueue(name);
}
- replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+ replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
values[ALTEX].asString());
}
}
@@ -506,7 +506,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = queues.find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+ if (queue && replicationTest.getLevel(*queue)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
if (queueTracker.get()) queueTracker->event(name);
deleteQueue(name);
@@ -515,8 +515,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
@@ -542,7 +541,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
- } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+ } else if (!replicationTest.getLevel(*exchange)) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
@@ -559,11 +558,12 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
queues.find(values[QNAME].asString());
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
- // We only replicate binds for a replicated queue to replicated
- // exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
- replicationTest.replicateLevel(args))
+ // We only replicate binds for a replicated queue to replicated exchange
+ // that both exist locally. Respect the replication level set in the
+ // bind arguments, but replicate by default.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
@@ -581,8 +581,8 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
@@ -630,12 +630,7 @@ Variant getHaUuid(const Variant::Map& map) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGUMENTS].asMap(),
- values[AUTODELETE].asBool(),
- values[EXCLUSIVE].asBool()))
- return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
@@ -664,7 +659,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.replicateLevel(argsMap)) return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name = values[NAME].asString();
if (!exchangeTracker.get())
throw Exception(QPID_MSG("Unexpected exchange response: " << values));
@@ -718,10 +713,11 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- // Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
- replicationTest.replicateLevel(args))
+ // Automatically replicate binding if queue and exchange exist and are replicated.
+ // Respect replicate setting in binding args but default to replicated.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
@@ -741,8 +737,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = replicationTest.replicateLevel(
- values[REPLICATE_DEFAULT].asString());
+ ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
@@ -759,7 +754,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+ if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
if (!exchanges.registerExchange(qr))