diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 84 |
1 files changed, 45 insertions, 39 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index dd986deec4..b248de8744 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" @@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) connector->setShutdownHandler(this); } -ConnectionImpl::~ConnectionImpl() { close(); } +ConnectionImpl::~ConnectionImpl() { + // Important to close the connector first, to ensure the + // connector thread does not call on us while the destructor + // is running. + connector->close(); +} void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()]; - if (s.lock()) - throw ChannelBusyException(); + if (s.lock()) throw ChannelBusyException(); s = session; } @@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port, handler.pwd = pwd; handler.vhost = vhost; + QPID_LOG(info, "Connecting to " << host << ":" << port); connector->connect(host, port); connector->init(); handler.waitForOpen(); } -bool ConnectionImpl::setClosing() -{ - Mutex::ScopedLock l(lock); - if (isClosing || isClosed) { - return false; - } - isClosing = true; - return true; -} - -void ConnectionImpl::close() -{ - if (setClosing()) { - handler.close(); - } -} - void ConnectionImpl::idleIn() { - connector->close(); + close(); } void ConnectionImpl::idleOut() @@ -114,35 +103,52 @@ void ConnectionImpl::idleOut() connector->send(frame); } -template <class F> -void ConnectionImpl::forChannels(F functor) { - for (SessionMap::iterator i = sessions.begin(); - i != sessions.end(); ++i) { - try { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) functor(*s); - } catch (...) { assert(0); } +void ConnectionImpl::close() +{ + Mutex::ScopedLock l(lock); + if (isClosing || isClosed) return; + isClosing = true; + { + Mutex::ScopedUnlock u(lock); + handler.close(); + } + closed(REPLY_SUCCESS, "Closed by client"); +} + +// Set closed flags and erase the sessions map, but keep the contents +// so sessions can be updated outside the lock. +ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + isClosed = true; + connector->close(); + SessionVector save; + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) save.push_back(s); } + sessions.clear(); + return save; } -void ConnectionImpl::shutdown() +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionBroke, _1, - INTERNAL_ERROR, "Unexpected socket closure.")); - sessions.clear(); - isClosed = true; + SessionVector save(closeInternal(l)); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); } -void ConnectionImpl::closed(uint16_t code, const std::string& text) +static const std::string CONN_CLOSED("Connection closed by broker"); + +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); - sessions.clear(); - isClosed = true; - connector->close(); + SessionVector save(closeInternal(l)); + handler.fail(CONN_CLOSED); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), + boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { |