summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp84
1 files changed, 45 insertions, 39 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index dd986deec4..b248de8744 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/log/Statement.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
@@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
connector->setShutdownHandler(this);
}
-ConnectionImpl::~ConnectionImpl() { close(); }
+ConnectionImpl::~ConnectionImpl() {
+ // Important to close the connector first, to ensure the
+ // connector thread does not call on us while the destructor
+ // is running.
+ connector->close();
+}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()];
- if (s.lock())
- throw ChannelBusyException();
+ if (s.lock()) throw ChannelBusyException();
s = session;
}
@@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port,
handler.pwd = pwd;
handler.vhost = vhost;
+ QPID_LOG(info, "Connecting to " << host << ":" << port);
connector->connect(host, port);
connector->init();
handler.waitForOpen();
}
-bool ConnectionImpl::setClosing()
-{
- Mutex::ScopedLock l(lock);
- if (isClosing || isClosed) {
- return false;
- }
- isClosing = true;
- return true;
-}
-
-void ConnectionImpl::close()
-{
- if (setClosing()) {
- handler.close();
- }
-}
-
void ConnectionImpl::idleIn()
{
- connector->close();
+ close();
}
void ConnectionImpl::idleOut()
@@ -114,35 +103,52 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
-template <class F>
-void ConnectionImpl::forChannels(F functor) {
- for (SessionMap::iterator i = sessions.begin();
- i != sessions.end(); ++i) {
- try {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s) functor(*s);
- } catch (...) { assert(0); }
+void ConnectionImpl::close()
+{
+ Mutex::ScopedLock l(lock);
+ if (isClosing || isClosed) return;
+ isClosing = true;
+ {
+ Mutex::ScopedUnlock u(lock);
+ handler.close();
+ }
+ closed(REPLY_SUCCESS, "Closed by client");
+}
+
+// Set closed flags and erase the sessions map, but keep the contents
+// so sessions can be updated outside the lock.
+ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
+ isClosed = true;
+ connector->close();
+ SessionVector save;
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) save.push_back(s);
}
+ sessions.clear();
+ return save;
}
-void ConnectionImpl::shutdown()
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
if (isClosed) return;
- forChannels(boost::bind(&SessionCore::connectionBroke, _1,
- INTERNAL_ERROR, "Unexpected socket closure."));
- sessions.clear();
- isClosed = true;
+ SessionVector save(closeInternal(l));
+ Mutex::ScopedUnlock u(lock);
+ std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text));
}
-void ConnectionImpl::closed(uint16_t code, const std::string& text)
+static const std::string CONN_CLOSED("Connection closed by broker");
+
+void ConnectionImpl::shutdown()
{
Mutex::ScopedLock l(lock);
if (isClosed) return;
- forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
- sessions.clear();
- isClosed = true;
- connector->close();
+ SessionVector save(closeInternal(l));
+ handler.fail(CONN_CLOSED);
+ Mutex::ScopedUnlock u(lock);
+ std::for_each(save.begin(), save.end(),
+ boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {