diff options
author | Gordon Sim <gsim@apache.org> | 2010-04-17 23:29:08 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-04-17 23:29:08 +0000 |
commit | 8064f03b9b1f4b106be1dc9b0f2a3e3f5b555e08 (patch) | |
tree | 3c6e852e4e7d761f128f8b38bb5b6e9afca329bf | |
parent | ef97372c04344fe0c0d7234263c25b31c4ed854a (diff) | |
download | qpid-python-8064f03b9b1f4b106be1dc9b0f2a3e3f5b555e08.tar.gz |
An alternative attempt at ensuring ConnectionImpl is not deleted before IO thread is finished with it.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935275 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 60 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 15 |
3 files changed, 41 insertions, 36 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index d8ffb6d9e1..6d2fd1d760 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -122,7 +122,7 @@ void Connection::open(const ConnectionSettings& settings) if (isOpen()) throw Exception(QPID_MSG("Connection::open() was already called")); - impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); + impl = ConnectionImpl::create(version, settings); impl->open(); if ( failureCallback ) impl->registerFailureCallback ( failureCallback ); diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 6b352e2e65..692afaae00 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -162,14 +162,20 @@ void ConnectionImpl::init() { (void) theIO(); } +boost::shared_ptr<ConnectionImpl> ConnectionImpl::create(framing::ProtocolVersion version, const ConnectionSettings& settings) +{ + boost::shared_ptr<ConnectionImpl> instance(new ConnectionImpl(version, settings), boost::bind(&ConnectionImpl::release, _1)); + return instance; +} + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), version(v), nextChannel(1), - shutdownComplete(false) + shutdownComplete(false), + released(false) { - QPID_LOG(debug, "ConnectionImpl created for " << version.toString()); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, @@ -182,12 +188,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max(); ConnectionImpl::~ConnectionImpl() { - if (connector) { - connector->close(); - //wait until we get the shutdown callback to ensure that the - //io threads will not call back on us after deletion - waitForShutdownComplete(); - } theIO().sub(); } @@ -249,6 +249,7 @@ void ConnectionImpl::open() connector->setShutdownHandler(this); try { connector->connect(host, port); + } catch (const std::exception& e) { QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); connector.reset(); @@ -340,16 +341,33 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { } void ConnectionImpl::shutdown() { - //May need to take a temporary reference to ourselves to prevent - //our destructor being called until after we have notified of - //shutdown completion; the destructor may be called as a result of - //the call to failedConnection(). - boost::shared_ptr<ConnectionImpl> temp; if (!handler.isClosed()) { - temp = shared_from_this(); failedConnection(); } - notifyShutdownComplete(); + bool canDelete; + { + Mutex::ScopedLock l(lock); + //association with IO thread is now ended + shutdownComplete = true; + //If we have already been released, we can now delete ourselves + canDelete = released; + } + if (canDelete) delete this; +} + +void ConnectionImpl::release() { + bool isActive; + { + Mutex::ScopedLock l(lock); + released = true; + isActive = connector && !shutdownComplete; + } + //If we are still active - i.e. associated with an IO thread - + //then we cannot delete ourselves yet, but must wait for the + //shutdown callback which we can trigger by calling + //connector.close() + if (isActive) connector->close(); + else delete this; } static const std::string CONN_CLOSED("Connection closed"); @@ -397,16 +415,4 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } -void ConnectionImpl::waitForShutdownComplete() -{ - Mutex::ScopedLock l(lock); - while(!shutdownComplete) lock.wait(); -} -void ConnectionImpl::notifyShutdownComplete() -{ - Mutex::ScopedLock l(lock); - shutdownComplete = true; - lock.notifyAll(); -} - }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index c2313e72bc..57d874b555 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -26,7 +26,7 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -48,9 +48,7 @@ class ConnectionImpl : public Bounds, public sys::TimeoutHandler, public sys::ShutdownHandler, public boost::enable_shared_from_this<ConnectionImpl> - { - friend class Connection; typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap; static const uint16_t NEXT_CHANNEL; @@ -60,27 +58,28 @@ class ConnectionImpl : public Bounds, boost::scoped_ptr<Connector> connector; framing::ProtocolVersion version; uint16_t nextChannel; - sys::Monitor lock; + sys::Mutex lock; bool shutdownComplete; + bool released; boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask; template <class F> void closeInternal(const F&); - static void init(); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); void idleIn(); void shutdown(); void failedConnection(); - void waitForShutdownComplete(); - void notifyShutdownComplete(); + void release(); + ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); boost::function<void ()> failureCallback; public: - ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); + static void init(); + static boost::shared_ptr<ConnectionImpl> create(framing::ProtocolVersion version, const ConnectionSettings& settings); ~ConnectionImpl(); void open(); |