diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 38 |
1 files changed, 30 insertions, 8 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 69c8a56873..5b9993bd90 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionHandler.h" @@ -55,6 +56,10 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } +bool QueueReplicator::isReplicatorName(const std::string& name) { + return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0; +} + bool QueueReplicator::isEventKey(const std::string key) { const std::string& prefix = QPID_HA_EVENT_PREFIX; bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; @@ -74,19 +79,33 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(error, logPrefix << "Execution error: " << msg); } void detach() { - QPID_LOG(error, logPrefix << "Unexpectedly detached."); + QPID_LOG(debug, logPrefix << "Session detached"); } private: std::string logPrefix; }; +class QueueReplicator::QueueObserver : public broker::QueueObserver { + public: + QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {} + void enqueued(const Message&) {} + void dequeued(const Message&) {} + void acquired(const Message&) {} + void requeued(const Message&) {} + void consumerAdded( const Consumer& ) {} + void consumerRemoved( const Consumer& ) {} + void destroy() { queueReplicator->deactivate(); } + private: + boost::shared_ptr<QueueReplicator> queueReplicator; +}; + QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()) + queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false) { args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); @@ -118,18 +137,21 @@ void QueueReplicator::activate() { bridge = result.first; bridge->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); + boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this())); + queue->addObserver(observer); } QueueReplicator::~QueueReplicator() { deactivate(); } void QueueReplicator::deactivate() { - // destroy the route + QPID_LOG(debug, logPrefix << "Deactivated"); sys::Mutex::ScopedLock l(lock); - if (bridge) { - bridge->close(); - bridge.reset(); - QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); - } + if (bridge) bridge->close(); + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + link.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); } // Called in a broker connection thread when the bridge is created. |