diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 507df6ea5a..59b2013f59 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1)); } +// Debug log expected exceptions on queue replicator, check incoming execution +// exceptions for "deleted on primary" conditions. class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) : queueReplicator(qr), logPrefix(qr->logPrefix) {} - void connectionException(framing::connection::CloseCode, const std::string&) {} - void channelException(framing::session::DetachCode, const std::string&) {} - void executionException(framing::execution::ErrorCode, const std::string&) {} - - void incomingExecutionException(ErrorCode e, const std::string& msg) { - queueReplicator->incomingExecutionException(e, msg); + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(ErrorCode code, const std::string& msg) { + if (!queueReplicator->deletedOnPrimary(code, msg)) + QPID_LOG(error, logPrefix << "Incoming " + << framing::createSessionException(code, msg).what()); } void detach() { QPID_LOG(debug, logPrefix << "Session detached"); @@ -197,20 +206,25 @@ void QueueReplicator::disconnect() { // Called from Queue::destroyed() void QueueReplicator::destroy() { + QPID_LOG(debug, logPrefix << "Destroyed"); boost::shared_ptr<Bridge> bridge2; // To call outside of lock { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed - QPID_LOG(debug, logPrefix << "Destroyed"); bridge2 = bridge; // call close outside the lock. - // Need to drop shared pointers to avoid pointer cycles keeping this in memory. - queue.reset(); - bridge.reset(); - getBroker()->getExchanges().destroy(getName()); + destroy(l); } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. } +void QueueReplicator::destroy(Mutex::ScopedLock&) { + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); +} + + // Called in a broker connection thread when the bridge is created. // Note: called with the Link lock held. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) { @@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { nextId = decodeStr<IdEvent>(data).id; } -void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { +bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) { if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { // If the queue is destroyed at the same time we are subscribing, we may // get a not-found or resource-deleted exception before the // BrokerReplicator gets the queue-delete event. Shut down the bridge by // calling destroy(), we can let the BrokerReplicator delete the queue // when the queue-delete arrives. - QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg); + QPID_LOG(debug, logPrefix << "Deleted on primary: " + << framing::createSessionException(e, msg).what()); destroy(); + return true; } - else - QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg); + return false; } // Unused Exchange methods. |