summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp38
1 files changed, 30 insertions, 8 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 69c8a56873..5b9993bd90 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
@@ -55,6 +56,10 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+}
+
bool QueueReplicator::isEventKey(const std::string key) {
const std::string& prefix = QPID_HA_EVENT_PREFIX;
bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
@@ -74,19 +79,33 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(error, logPrefix << "Execution error: " << msg);
}
void detach() {
- QPID_LOG(error, logPrefix << "Unexpectedly detached.");
+ QPID_LOG(debug, logPrefix << "Session detached");
}
private:
std::string logPrefix;
};
+class QueueReplicator::QueueObserver : public broker::QueueObserver {
+ public:
+ QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {}
+ void enqueued(const Message&) {}
+ void dequeued(const Message&) {}
+ void acquired(const Message&) {}
+ void requeued(const Message&) {}
+ void consumerAdded( const Consumer& ) {}
+ void consumerRemoved( const Consumer& ) {}
+ void destroy() { queueReplicator->deactivate(); }
+ private:
+ boost::shared_ptr<QueueReplicator> queueReplicator;
+};
+
QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo())
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
@@ -118,18 +137,21 @@ void QueueReplicator::activate() {
bridge = result.first;
bridge->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+ boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this()));
+ queue->addObserver(observer);
}
QueueReplicator::~QueueReplicator() { deactivate(); }
void QueueReplicator::deactivate() {
- // destroy the route
+ QPID_LOG(debug, logPrefix << "Deactivated");
sys::Mutex::ScopedLock l(lock);
- if (bridge) {
- bridge->close();
- bridge.reset();
- QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
- }
+ if (bridge) bridge->close();
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ link.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
}
// Called in a broker connection thread when the bridge is created.