summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/WiringReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp23
1 files changed, 11 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 58cacb16f4..b86b7cec4a 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -260,7 +260,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
- framing::FieldTable args;
+ framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
@@ -287,21 +287,20 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
}
void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+ // The remote queue has already been deleted so replicator
+ // sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
QPID_LOG(debug, "HA: Backup deleting queue: " << name);
- broker.deleteQueue(
- name,
- values[USER].asString(),
- values[RHOST].asString());
- // Delete the QueueReplicator exchange for this queue.
- boost::shared_ptr<broker::Exchange> ex =
- broker.getExchanges().find(QueueReplicator::replicatorName(name));
- boost::shared_ptr<QueueReplicator> qr =
- boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ string rname = QueueReplicator::replicatorName(name);
+ boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
if (qr) qr->deactivate();
- broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed, deleting the exhange
+ broker.getExchanges().destroy(rname);
+ broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
}
}
@@ -455,8 +454,8 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
- qr->activate();
broker.getExchanges().registerExchange(qr);
+ qr->activate();
}
}