summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 59b2013f59..6881896f5e 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -39,6 +39,7 @@
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include <boost/bind.hpp>
@@ -90,7 +91,8 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
}
void incomingExecutionException(ErrorCode code, const std::string& msg) {
- if (!queueReplicator->deletedOnPrimary(code, msg))
+ boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+ if (qr && !qr->deletedOnPrimary(code, msg))
QPID_LOG(error, logPrefix << "Incoming "
<< framing::createSessionException(code, msg).what());
}
@@ -98,7 +100,7 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(debug, logPrefix << "Session detached");
}
private:
- boost::shared_ptr<QueueReplicator> queueReplicator;
+ boost::weak_ptr<QueueReplicator> queueReplicator;
std::string logPrefix;
};
@@ -112,9 +114,12 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver {
void consumerAdded( const Consumer& ) {}
void consumerRemoved( const Consumer& ) {}
// Queue observer is destroyed when the queue is.
- void destroy() { queueReplicator->destroy(); }
+ void destroy() {
+ boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+ if (qr) qr->destroy();
+ }
private:
- boost::shared_ptr<QueueReplicator> queueReplicator;
+ boost::weak_ptr<QueueReplicator> queueReplicator;
};
boost::shared_ptr<QueueReplicator> QueueReplicator::create(
@@ -171,8 +176,7 @@ void QueueReplicator::initialize() {
throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));
// Enable callback to initializeBridge
- std::pair<Bridge::shared_ptr, bool> result =
- queue->getBroker()->getLinks().declare(
+ boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare(
bridgeName,
*link,
false, // durable
@@ -189,10 +193,10 @@ void QueueReplicator::initialize() {
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
- );
- bridge = result.first;
- bridge->setErrorListener(
+ ).first;
+ b->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
+ bridge = b; // bridge is a weak_ptr to avoid a cycle.
// Enable callback to destroy()
queue->getObservers().add(
@@ -211,7 +215,7 @@ void QueueReplicator::destroy() {
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- bridge2 = bridge; // call close outside the lock.
+ bridge2 = bridge.lock(); // !call close outside the lock.
destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.