diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/WiringReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 58cacb16f4..b86b7cec4a 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -260,7 +260,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); Variant::Map argsMap = values[ARGS].asMap(); if (values[DISP] == CREATED && replicateLevel(argsMap)) { - framing::FieldTable args; + framing::FieldTable args; amqp_0_10::translate(argsMap, args); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( @@ -287,21 +287,20 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { } void WiringReplicator::doEventQueueDelete(Variant::Map& values) { + // The remote queue has already been deleted so replicator + // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicateLevel(queue->getSettings())) { QPID_LOG(debug, "HA: Backup deleting queue: " << name); - broker.deleteQueue( - name, - values[USER].asString(), - values[RHOST].asString()); - // Delete the QueueReplicator exchange for this queue. - boost::shared_ptr<broker::Exchange> ex = - broker.getExchanges().find(QueueReplicator::replicatorName(name)); - boost::shared_ptr<QueueReplicator> qr = - boost::dynamic_pointer_cast<QueueReplicator>(ex); + string rname = QueueReplicator::replicatorName(name); + boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); if (qr) qr->deactivate(); - broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed, deleting the exhange + broker.getExchanges().destroy(rname); + broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); } } @@ -455,8 +454,8 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); - qr->activate(); broker.getExchanges().registerExchange(qr); + qr->activate(); } } |