summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp33
1 files changed, 23 insertions, 10 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 5b9993bd90..6d30a5c10c 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -47,6 +47,7 @@ namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using sys::Mutex;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
@@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver {
void requeued(const Message&) {}
void consumerAdded( const Consumer& ) {}
void consumerRemoved( const Consumer& ) {}
- void destroy() { queueReplicator->deactivate(); }
+ // Queue observer is destroyed when the queue is.
+ void destroy() { queueReplicator->destroy(); }
private:
boost::shared_ptr<QueueReplicator> queueReplicator;
};
@@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
bridgeName,
@@ -141,12 +144,14 @@ void QueueReplicator::activate() {
queue->addObserver(observer);
}
-QueueReplicator::~QueueReplicator() { deactivate(); }
+QueueReplicator::~QueueReplicator() {}
-void QueueReplicator::deactivate() {
- QPID_LOG(debug, logPrefix << "Deactivated");
- sys::Mutex::ScopedLock l(lock);
- if (bridge) bridge->close();
+void QueueReplicator::destroy() {
+ // Called from Queue::destroyed()
+ Mutex::ScopedLock l(lock);
+ if (!bridge) return;
+ QPID_LOG(debug, logPrefix << "Destroyed.");
+ bridge->close();
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
link.reset();
@@ -156,7 +161,8 @@ void QueueReplicator::deactivate() {
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable settings;
@@ -197,7 +203,13 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
+ boost::shared_ptr<Queue> q;
+ {
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
+ q = queue;
+ }
// Thread safe: only calls thread safe Queue functions.
queue->dequeueMessageAt(n);
}
@@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable& msg)
{
try {
const std::string& key = msg.getMessage().getRoutingKey();
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
if (!isEventKey(key)) {
msg.deliverTo(queue);
// We are on a backup so the queue is not modified except via this.