diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 55 |
1 files changed, 32 insertions, 23 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index dece9dd045..3580c49826 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -109,7 +109,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), - settings(hb.getSettings()) + settings(hb.getSettings()), destroyed(false) { args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); @@ -119,10 +119,17 @@ QueueReplicator::QueueReplicator(HaBroker& hb, setArgs(args); } -// This must be separate from the constructor so we can call shared_from_this. +// This must be called immediately after the constructor. +// It has to be separate so we can call shared_from_this(). void QueueReplicator::activate() { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed + + // Enable callback to route() + if (!getBroker()->getExchanges().registerExchange(shared_from_this())) + throw Exception(QPID_MSG("Duplicate queue replicator " << getName())); + + // Enable callback to initializeBridge std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( bridgeName, @@ -145,29 +152,37 @@ void QueueReplicator::activate() { bridge = result.first; bridge->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); - boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this())); - queue->addObserver(observer); + + // Enable callback to destroy() + queue->addObserver( + boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this()))); } QueueReplicator::~QueueReplicator() {} +// Called from Queue::destroyed() void QueueReplicator::destroy() { - // Called from Queue::destroyed() - Mutex::ScopedLock l(lock); - if (!bridge) return; - QPID_LOG(debug, logPrefix << "Destroyed."); - bridge->close(); - // Need to drop shared pointers to avoid pointer cycles keeping this in memory. - queue.reset(); - link.reset(); - bridge.reset(); - getBroker()->getExchanges().destroy(getName()); + boost::shared_ptr<Bridge> bridge2; // To call outside of lock + { + Mutex::ScopedLock l(lock); + if (destroyed) return; + destroyed = true; + QPID_LOG(debug, logPrefix << "Destroyed."); + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + link.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); + bridge2 = bridge; + } + if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. } // Called in a broker connection thread when the bridge is created. +// Note: called with the Link lock held. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed + if (destroyed) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; @@ -207,13 +222,7 @@ template <class T> T decodeContent(Message& m) { } void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { - boost::shared_ptr<Queue> q; - { - Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed - q = queue; - } - // Thread safe: only calls thread safe Queue functions. + if (destroyed) return; queue->dequeueMessageAt(n); } @@ -234,7 +243,7 @@ void QueueReplicator::route(Deliverable& msg) try { const std::string& key = msg.getMessage().getRoutingKey(); Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed + if (destroyed) return; if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. |
