summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp55
1 files changed, 32 insertions, 23 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index dece9dd045..3580c49826 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -109,7 +109,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
- settings(hb.getSettings())
+ settings(hb.getSettings()), destroyed(false)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
@@ -119,10 +119,17 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
setArgs(args);
}
-// This must be separate from the constructor so we can call shared_from_this.
+// This must be called immediately after the constructor.
+// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
+
+ // Enable callback to route()
+ if (!getBroker()->getExchanges().registerExchange(shared_from_this()))
+ throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));
+
+ // Enable callback to initializeBridge
std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
bridgeName,
@@ -145,29 +152,37 @@ void QueueReplicator::activate() {
bridge = result.first;
bridge->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
- boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this()));
- queue->addObserver(observer);
+
+ // Enable callback to destroy()
+ queue->addObserver(
+ boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
QueueReplicator::~QueueReplicator() {}
+// Called from Queue::destroyed()
void QueueReplicator::destroy() {
- // Called from Queue::destroyed()
- Mutex::ScopedLock l(lock);
- if (!bridge) return;
- QPID_LOG(debug, logPrefix << "Destroyed.");
- bridge->close();
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- link.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ boost::shared_ptr<Bridge> bridge2; // To call outside of lock
+ {
+ Mutex::ScopedLock l(lock);
+ if (destroyed) return;
+ destroyed = true;
+ QPID_LOG(debug, logPrefix << "Destroyed.");
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ link.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+ bridge2 = bridge;
+ }
+ if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
// Called in a broker connection thread when the bridge is created.
+// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
+ if (destroyed) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
@@ -207,13 +222,7 @@ template <class T> T decodeContent(Message& m) {
}
void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
- boost::shared_ptr<Queue> q;
- {
- Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
- q = queue;
- }
- // Thread safe: only calls thread safe Queue functions.
+ if (destroyed) return;
queue->dequeueMessageAt(n);
}
@@ -234,7 +243,7 @@ void QueueReplicator::route(Deliverable& msg)
try {
const std::string& key = msg.getMessage().getRoutingKey();
Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
+ if (destroyed) return;
if (!isEventKey(key)) {
msg.deliverTo(queue);
// We are on a backup so the queue is not modified except via this.