diff options
author | Alan Conway <aconway@apache.org> | 2013-11-21 16:27:53 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-11-21 16:27:53 +0000 |
commit | ca56df8136bb82f074fae29f877e856a834120fc (patch) | |
tree | 7852ac435ffeeb820e0e030f70a7c3494c0d25af | |
parent | be7491cfc7b25c78687198dd85ad9dc9451a2f2d (diff) | |
download | qpid-python-ca56df8136bb82f074fae29f877e856a834120fc.tar.gz |
QPID-5366: qpid segfaults in qpid::ha::BrokerReplicator::disconnected
Fix for a race condition: previously, BrokerReplicator created a separate
ConnectionObserver object to forward connection events to it. However the
Observers locking is such that it is possible for an event to arrive *after*
calling Observers::remove (Observers copies the pointers and delivers events
outside its lock.) This meant that it was possible for a call to
BrokerReplicator::disconnect to be made after the BrokerReplicator was deleted.
The fix is to combine BrokerReplicator and BrokerReplicator::ConnectionObserver
into a single object with one lifetime that will last until it is removed from
both the ExchangeRegistry and the ConnectionObservers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1544246 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 35 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 18 |
2 files changed, 19 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index d27d5e84b3..5e8da17a1b 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -24,7 +24,6 @@ #include "TxReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/amqp_0_10/Connection.h" -#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Link.h" @@ -203,22 +202,6 @@ class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { BrokerReplicator& brokerReplicator; }; -class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver -{ - public: - ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {} - virtual void connection(Connection&) {} - virtual void opened(Connection&) {} - - virtual void closed(Connection& c) { - if (brokerReplicator.link && &c == brokerReplicator.connection) - brokerReplicator.disconnected(); - } - virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); } - private: - BrokerReplicator& brokerReplicator; -}; - /** Keep track of queues or exchanges during the update process to solve 2 * problems. * @@ -300,10 +283,8 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& link(l), initialized(false), alternates(hb.getBroker().getExchanges()), - connection(0), - connectionObserver(new ConnectionObserver(*this)) + connect(0) { - broker.getConnectionObservers().add(connectionObserver); framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); @@ -343,9 +324,10 @@ void BrokerReplicator::initialize() { assert(result.second); result.first->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); + broker.getConnectionObservers().add(shared_from_this()); } -BrokerReplicator::~BrokerReplicator() { shutdown(); } +BrokerReplicator::~BrokerReplicator() {} namespace { void collectQueueReplicators( @@ -363,10 +345,7 @@ void BrokerReplicator::shutdown() { // it only calls thread safe functions objects belonging to the Broker. // Unregister with broker objects: - if (connectionObserver) { - broker.getConnectionObservers().remove(connectionObserver); - connectionObserver.reset(); - } + broker.getConnectionObservers().remove(shared_from_this()); broker.getExchanges().destroy(getName()); } @@ -376,8 +355,8 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) // exchanges etc. We know link->getConnection() is non-zero because we are // being called in the connections thread context. // - connection = link->getConnection(); - assert(connection); + connect = link->getConnection(); + assert(connect); userId = link->getConnection()->getUserId(); remoteHost = link->getConnection()->getMgmtId(); @@ -922,7 +901,7 @@ namespace { // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); - connection = 0; + connect = 0; // Make copy of exchanges so we can work outside the registry lock. ExchangeVector exs; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 07b992df6a..e319ab1219 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -27,6 +27,7 @@ #include "AlternateExchangeSetter.h" #include "qpid/Address.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/types/Variant.h" #include "qpid/management/ManagementObject.h" #include "qpid/sys/unordered_map.h" @@ -68,7 +69,8 @@ class QueueReplicator; * */ class BrokerReplicator : public broker::Exchange, - public boost::enable_shared_from_this<BrokerReplicator> + public boost::enable_shared_from_this<BrokerReplicator>, + public broker::ConnectionObserver { public: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; @@ -76,7 +78,8 @@ class BrokerReplicator : public broker::Exchange, BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&); ~BrokerReplicator(); - void initialize(); + void initialize(); // Must be called immediately after constructor. + void shutdown(); // Exchange methods std::string getType() const; @@ -85,7 +88,12 @@ class BrokerReplicator : public broker::Exchange, void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); bool hasBindings(); - void shutdown(); + + // ConnectionObserver methods + void connection(broker::Connection&) {} + void opened(broker::Connection&) {} + void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); } + void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); } QueueReplicatorPtr findQueueReplicator(const std::string& qname); @@ -100,7 +108,6 @@ class BrokerReplicator : public broker::Exchange, class UpdateTracker; class ErrorListener; - class ConnectionObserver; void connected(broker::Bridge&, broker::SessionHandler&); void existingQueue(const boost::shared_ptr<broker::Queue>&); @@ -157,11 +164,10 @@ class BrokerReplicator : public broker::Exchange, bool initialized; AlternateExchangeSetter alternates; qpid::Address primary; - broker::Connection* connection; + broker::Connection* connect; EventDispatchMap dispatch; std::auto_ptr<UpdateTracker> queueTracker; std::auto_ptr<UpdateTracker> exchangeTracker; - boost::shared_ptr<ConnectionObserver> connectionObserver; }; }} // namespace qpid::broker |