diff options
author | Alan Conway <aconway@apache.org> | 2012-02-14 16:05:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-14 16:05:30 +0000 |
commit | a7777adba03274f92ce8130ed8769cd77874693c (patch) | |
tree | 2de41b19b7a03265062ab644592df9e084892591 | |
parent | 60c152f146ca4c285fad8014ac0ff82c85afb5f9 (diff) | |
download | qpid-python-a7777adba03274f92ce8130ed8769cd77874693c.tar.gz |
QPID-3603: Lifecycle and locking fixes for QueueReplicator
Separate bridge de-activation from destruction in QueueReplicator:
Only deactivate if destroyed by the WiringReplicator because of a
queue delete. If destroyed for any other reason (e.g. broker
destruction) don't de-activate the bridge as required resources may
not exist.
Added missing locks in QueueReplicator functions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244059 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 5 |
3 files changed, 16 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index f567d61078..d99f5126f5 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -82,14 +82,18 @@ void QueueReplicator::activate() { ); } -QueueReplicator::~QueueReplicator() { +QueueReplicator::~QueueReplicator() {} + +void QueueReplicator::deactivate() { + sys::Mutex::ScopedLock l(lock); queue->getBroker()->getLinks().destroy( link->getHost(), link->getPort(), queue->getName(), getName(), string()); } // Called in a broker connection thread when the bridge is created. -// shared_ptr to self is just to ensure we are still in memory. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + sys::Mutex::ScopedLock l(lock); + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index ad820346d5..0a1453dc63 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -46,7 +46,7 @@ namespace ha { * Creates a ReplicatingSubscription on the primary by passing special * arguments to the consume command. * - * THREAD UNSAFE: Only called in the connection thread of the source queue. + * THREAD SAFE: Called in different connection threads. */ class QueueReplicator : public broker::Exchange, public boost::enable_shared_from_this<QueueReplicator> @@ -59,10 +59,12 @@ class QueueReplicator : public broker::Exchange, QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); - void activate(); + void activate(); // Call after ctor + void deactivate(); // Call before dtor std::string getType() const; - bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool bind(boost::shared_ptr<broker::Queue + >, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 416f516f7b..58cacb16f4 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -296,6 +296,11 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); // Delete the QueueReplicator exchange for this queue. + boost::shared_ptr<broker::Exchange> ex = + broker.getExchanges().find(QueueReplicator::replicatorName(name)); + boost::shared_ptr<QueueReplicator> qr = + boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) qr->deactivate(); broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); } } |