diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 5b9993bd90..6d30a5c10c 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -47,6 +47,7 @@ namespace ha { using namespace broker; using namespace framing; using namespace std; +using sys::Mutex; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); @@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver { void requeued(const Message&) {} void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} - void destroy() { queueReplicator->deactivate(); } + // Queue observer is destroyed when the queue is. + void destroy() { queueReplicator->destroy(); } private: boost::shared_ptr<QueueReplicator> queueReplicator; }; @@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb, // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( bridgeName, @@ -141,12 +144,14 @@ void QueueReplicator::activate() { queue->addObserver(observer); } -QueueReplicator::~QueueReplicator() { deactivate(); } +QueueReplicator::~QueueReplicator() {} -void QueueReplicator::deactivate() { - QPID_LOG(debug, logPrefix << "Deactivated"); - sys::Mutex::ScopedLock l(lock); - if (bridge) bridge->close(); +void QueueReplicator::destroy() { + // Called from Queue::destroyed() + Mutex::ScopedLock l(lock); + if (!bridge) return; + QPID_LOG(debug, logPrefix << "Destroyed."); + bridge->close(); // Need to drop shared pointers to avoid pointer cycles keeping this in memory. queue.reset(); link.reset(); @@ -156,7 +161,8 @@ void QueueReplicator::deactivate() { // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable settings; @@ -197,7 +203,13 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { +void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { + boost::shared_ptr<Queue> q; + { + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed + q = queue; + } // Thread safe: only calls thread safe Queue functions. queue->dequeueMessageAt(n); } @@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable& msg) { try { const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. |
