summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2016-06-22 20:41:43 +0000
committerAlan Conway <aconway@apache.org>2016-06-22 20:41:43 +0000
commitc3c3bc4016270dd75b4c6a1e6831408cd4a5d055 (patch)
treec6acf024eb57f9360ef50055bfbf3ce0fbd1b2d9
parent7ab8ebde50308f76428359c0120473c4d491b55a (diff)
downloadqpid-python-c3c3bc4016270dd75b4c6a1e6831408cd4a5d055.tar.gz
QPID-7306: Memory management error in Link/Bridge
qpid::broker Link and Bridge use Connection::requestIOProcessing() to register callbacks in the connection thread. They were binding a plain "this" pointer to the callback, but the classes are managed by boost::shared_ptr so if all the shared_ptr were released, the callback could happen on a dangling pointer. This fix uses boost::weak_ptr in the callbacks, so if all shared_ptr instances are released, we don't use the dead pointer. Link::destroy cannot be skipped, so use a shared_ptr for that. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749780 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h20
4 files changed, 43 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 06d3a0dd52..d6cd3e20e7 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -381,8 +381,11 @@ void Bridge::propagateBinding(const string& key, const string& tagList,
else
bindArgs.setString(qpidFedOrigin, origin);
- conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
- queueName, args.i_src, key, bindArgs));
+ conn->requestIOProcessing(
+ weakCallback<Bridge>(
+ boost::bind(&Bridge::ioThreadPropagateBinding, _1,
+ queueName, args.i_src, key, bindArgs),
+ this));
}
}
@@ -393,9 +396,13 @@ void Bridge::sendReorigin()
bindArgs.setString(qpidFedOp, fedOpReorigin);
bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
- conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
- queueName, args.i_src, args.i_key, bindArgs));
+ conn->requestIOProcessing(
+ weakCallback<Bridge>(
+ boost::bind(&Bridge::ioThreadPropagateBinding, _1,
+ queueName, args.i_src, args.i_key, bindArgs),
+ this));
}
+
bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(channel);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 9b85917251..037d1e16c6 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -250,7 +250,8 @@ void Link::established(qpid::broker::amqp_0_10::Connection* c)
currentInterval = 1;
visitCount = 0;
connection = c;
- c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ c->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
}
}
if (isClosing)
@@ -416,7 +417,8 @@ void Link::add(Bridge::shared_ptr bridge)
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
if (connection)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ connection->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
}
@@ -443,7 +445,8 @@ void Link::cancel(Bridge::shared_ptr bridge)
needIOProcessing = !cancellations.empty();
}
if (needIOProcessing && connection)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ connection->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
}
void Link::ioThreadProcessing()
@@ -507,7 +510,8 @@ void Link::maintenanceVisit ()
case STATE_OPERATIONAL:
if ((!active.empty() || !created.empty() || !cancellations.empty()) &&
connection && connection->isOpen())
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ connection->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
break;
default: // no-op for all other states
@@ -691,7 +695,7 @@ void Link::close() {
setStateLH(STATE_CLOSING);
if (connection) {
//connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ connection->requestIOProcessing(boost::bind(&Link::destroy, shared_from_this()));
} else if (old_state == STATE_CONNECTING) {
// cannot destroy Link now since a connection request is outstanding.
// destroy the link after we get a response (see Link::established,
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index ceee0186f3..c60632025c 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -23,6 +23,7 @@
*/
#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/PersistableConfig.h"
@@ -50,7 +51,9 @@ namespace amqp_0_10 {
class Connection;
}
-class Link : public PersistableConfig, public management::Manageable {
+class Link : public PersistableConfig, public management::Manageable,
+ public boost::enable_shared_from_this<Link>
+{
private:
mutable sys::Mutex lock;
const std::string name;
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
index 1bf6b2cee3..8fff5206ea 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
@@ -226,7 +226,25 @@ friend class OutboundFrameTracker;
qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
};
+}
+
+// See weakCallback below.
+template <class T> void callIfValid(boost::function1<void, T*> f, boost::weak_ptr<T> wp) {
+ boost::shared_ptr<T> sp = wp.lock();
+ if (sp) f(sp.get());
+}
+
+// Memory safety helper for requestIOProcessing with boost::shared_ptr.
+//
+// Makes a function that calls f(p) only if p is still valid. The returned
+// function is bound to a weak_ptr, and only calls f(p) if the weak pointer is
+// still valid. Note this does not prevent the object being deleted before the
+// IO callback, instead it skips the callback if the object is already deleted.
+template <class T>
+boost::function0<void> weakCallback(boost::function1<void, T*> f, T* p) {
+ return boost::bind(&callIfValid<T>, f, p->shared_from_this());
+}
-}}}
+}}
#endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/