diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 59b2013f59..6881896f5e 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -39,6 +39,7 @@ #include "qpid/Msg.h" #include "qpid/assert.h" #include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> #include <boost/bind.hpp> @@ -90,7 +91,8 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); } void incomingExecutionException(ErrorCode code, const std::string& msg) { - if (!queueReplicator->deletedOnPrimary(code, msg)) + boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock(); + if (qr && !qr->deletedOnPrimary(code, msg)) QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } @@ -98,7 +100,7 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(debug, logPrefix << "Session detached"); } private: - boost::shared_ptr<QueueReplicator> queueReplicator; + boost::weak_ptr<QueueReplicator> queueReplicator; std::string logPrefix; }; @@ -112,9 +114,12 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver { void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} // Queue observer is destroyed when the queue is. - void destroy() { queueReplicator->destroy(); } + void destroy() { + boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock(); + if (qr) qr->destroy(); + } private: - boost::shared_ptr<QueueReplicator> queueReplicator; + boost::weak_ptr<QueueReplicator> queueReplicator; }; boost::shared_ptr<QueueReplicator> QueueReplicator::create( @@ -171,8 +176,7 @@ void QueueReplicator::initialize() { throw Exception(QPID_MSG("Duplicate queue replicator " << getName())); // Enable callback to initializeBridge - std::pair<Bridge::shared_ptr, bool> result = - queue->getBroker()->getLinks().declare( + boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare( bridgeName, *link, false, // durable @@ -189,10 +193,10 @@ void QueueReplicator::initialize() { // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) - ); - bridge = result.first; - bridge->setErrorListener( + ).first; + b->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this()))); + bridge = b; // bridge is a weak_ptr to avoid a cycle. // Enable callback to destroy() queue->getObservers().add( @@ -211,7 +215,7 @@ void QueueReplicator::destroy() { { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed - bridge2 = bridge; // call close outside the lock. + bridge2 = bridge.lock(); // !call close outside the lock. destroy(l); } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. |