summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:07:23 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:07:23 +0000
commit93305abfccb4d79634b5d8f74af3da9d29dd9dbd (patch)
tree0b2b6da6c33f2c3680e17d595addb244fc04feb7
parent9ba2fb5614a95263fc9a0b55b83a9f6f4f02e932 (diff)
downloadqpid-python-93305abfccb4d79634b5d8f74af3da9d29dd9dbd.tar.gz
QPID-3603: Lifecycle and locking fixes for QueueReplicator
Separate bridge de-activation from destruction in QueueReplicator: Only deactivate if destroyed by the WiringReplicator because of a queue delete. If destroyed for any other reason (e.g. broker destruction) don't de-activate the bridge as required resources may not exist. Added missing locks in QueueReplicator functions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233673 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h8
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp5
3 files changed, 16 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index f567d61078..d99f5126f5 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -82,14 +82,18 @@ void QueueReplicator::activate() {
);
}
-QueueReplicator::~QueueReplicator() {
+QueueReplicator::~QueueReplicator() {}
+
+void QueueReplicator::deactivate() {
+ sys::Mutex::ScopedLock l(lock);
queue->getBroker()->getLinks().destroy(
link->getHost(), link->getPort(), queue->getName(), getName(), string());
}
// Called in a broker connection thread when the bridge is created.
-// shared_ptr to self is just to ensure we are still in memory.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ sys::Mutex::ScopedLock l(lock);
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index ad820346d5..0a1453dc63 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -46,7 +46,7 @@ namespace ha {
* Creates a ReplicatingSubscription on the primary by passing special
* arguments to the consume command.
*
- * THREAD UNSAFE: Only called in the connection thread of the source queue.
+ * THREAD SAFE: Called in different connection threads.
*/
class QueueReplicator : public broker::Exchange,
public boost::enable_shared_from_this<QueueReplicator>
@@ -59,10 +59,12 @@ class QueueReplicator : public broker::Exchange,
QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
- void activate();
+ void activate(); // Call after ctor
+ void deactivate(); // Call before dtor
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool bind(boost::shared_ptr<broker::Queue
+ >, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 416f516f7b..58cacb16f4 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -296,6 +296,11 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
// Delete the QueueReplicator exchange for this queue.
+ boost::shared_ptr<broker::Exchange> ex =
+ broker.getExchanges().find(QueueReplicator::replicatorName(name));
+ boost::shared_ptr<QueueReplicator> qr =
+ boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) qr->deactivate();
broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
}
}