diff options
author | Alan Conway <aconway@apache.org> | 2016-06-22 20:41:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2016-06-22 20:41:43 +0000 |
commit | c3c3bc4016270dd75b4c6a1e6831408cd4a5d055 (patch) | |
tree | c6acf024eb57f9360ef50055bfbf3ce0fbd1b2d9 | |
parent | 7ab8ebde50308f76428359c0120473c4d491b55a (diff) | |
download | qpid-python-c3c3bc4016270dd75b4c6a1e6831408cd4a5d055.tar.gz |
QPID-7306: Memory management error in Link/Bridge
qpid::broker Link and Bridge use Connection::requestIOProcessing() to register
callbacks in the connection thread. They were binding a plain "this" pointer to
the callback, but the classes are managed by boost::shared_ptr so if all the
shared_ptr were released, the callback could happen on a dangling pointer.
This fix uses boost::weak_ptr in the callbacks, so if all shared_ptr instances
are released, we don't use the dead pointer.
Link::destroy cannot be skipped, so use a shared_ptr for that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749780 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h | 20 |
4 files changed, 43 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 06d3a0dd52..d6cd3e20e7 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -381,8 +381,11 @@ void Bridge::propagateBinding(const string& key, const string& tagList, else bindArgs.setString(qpidFedOrigin, origin); - conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, - queueName, args.i_src, key, bindArgs)); + conn->requestIOProcessing( + weakCallback<Bridge>( + boost::bind(&Bridge::ioThreadPropagateBinding, _1, + queueName, args.i_src, key, bindArgs), + this)); } } @@ -393,9 +396,13 @@ void Bridge::sendReorigin() bindArgs.setString(qpidFedOp, fedOpReorigin); bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); - conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, - queueName, args.i_src, args.i_key, bindArgs)); + conn->requestIOProcessing( + weakCallback<Bridge>( + boost::bind(&Bridge::ioThreadPropagateBinding, _1, + queueName, args.i_src, args.i_key, bindArgs), + this)); } + bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(channel); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9b85917251..037d1e16c6 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -250,7 +250,8 @@ void Link::established(qpid::broker::amqp_0_10::Connection* c) currentInterval = 1; visitCount = 0; connection = c; - c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + c->requestIOProcessing ( + weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this)); } } if (isClosing) @@ -416,7 +417,8 @@ void Link::add(Bridge::shared_ptr bridge) Mutex::ScopedLock mutex(lock); created.push_back (bridge); if (connection) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + connection->requestIOProcessing ( + weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this)); } @@ -443,7 +445,8 @@ void Link::cancel(Bridge::shared_ptr bridge) needIOProcessing = !cancellations.empty(); } if (needIOProcessing && connection) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + connection->requestIOProcessing ( + weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this)); } void Link::ioThreadProcessing() @@ -507,7 +510,8 @@ void Link::maintenanceVisit () case STATE_OPERATIONAL: if ((!active.empty() || !created.empty() || !cancellations.empty()) && connection && connection->isOpen()) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + connection->requestIOProcessing ( + weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this)); break; default: // no-op for all other states @@ -691,7 +695,7 @@ void Link::close() { setStateLH(STATE_CLOSING); if (connection) { //connection can only be closed on the connections own IO processing thread - connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + connection->requestIOProcessing(boost::bind(&Link::destroy, shared_from_this())); } else if (old_state == STATE_CONNECTING) { // cannot destroy Link now since a connection request is outstanding. // destroy the link after we get a response (see Link::established, diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index ceee0186f3..c60632025c 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -23,6 +23,7 @@ */ #include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> #include "qpid/Url.h" #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/PersistableConfig.h" @@ -50,7 +51,9 @@ namespace amqp_0_10 { class Connection; } -class Link : public PersistableConfig, public management::Manageable { +class Link : public PersistableConfig, public management::Manageable, + public boost::enable_shared_from_this<Link> +{ private: mutable sys::Mutex lock; const std::string name; diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h index 1bf6b2cee3..8fff5206ea 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h @@ -226,7 +226,25 @@ friend class OutboundFrameTracker; qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } }; +} + +// See weakCallback below. +template <class T> void callIfValid(boost::function1<void, T*> f, boost::weak_ptr<T> wp) { + boost::shared_ptr<T> sp = wp.lock(); + if (sp) f(sp.get()); +} + +// Memory safety helper for requestIOProcessing with boost::shared_ptr. +// +// Makes a function that calls f(p) only if p is still valid. The returned +// function is bound to a weak_ptr, and only calls f(p) if the weak pointer is +// still valid. Note this does not prevent the object being deleted before the +// IO callback, instead it skips the callback if the object is already deleted. +template <class T> +boost::function0<void> weakCallback(boost::function1<void, T*> f, T* p) { + return boost::bind(&callIfValid<T>, f, p->shared_from_this()); +} -}}} +}} #endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/ |