summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp58
1 files changed, 31 insertions, 27 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index fae93e8294..f9273bc165 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
#include "ConnectionImpl.h"
@@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
- handler.onClose = boost::bind(&ConnectionImpl::closed, this);
- handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2);
+ handler.onClose = boost::bind(&ConnectionImpl::closed, this,
+ REPLY_SUCCESS, std::string());
+ handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
@@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s = sessions[frame.getChannel()].lock();
}
if (!s)
- throw ChannelErrorException();
+ throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
s->in(frame);
}
@@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port,
void ConnectionImpl::close()
{
- assertNotClosed();
- handler.close();
-}
-
-void ConnectionImpl::closed()
-{
- closedByPeer(200, "OK");
-}
-
-void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
-{
- signalClose(code, text);
- connector->close();
+ if (!isClosed)
+ handler.close();
}
void ConnectionImpl::idleIn()
@@ -110,26 +101,39 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
+template <class F>
+void ConnectionImpl::forChannels(F functor) {
+ for (SessionMap::iterator i = sessions.begin();
+ i != sessions.end(); ++i) {
+ try {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) functor(*s);
+ } catch (...) { assert(0); }
+ }
+}
+
void ConnectionImpl::shutdown()
{
- //this indicates that the socket to the server has closed
- signalClose(0, "Unexpected socket closure.");
+ Mutex::ScopedLock l(lock);
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionBroke, _1,
+ INTERNAL_ERROR, "Unexpected socket closure."));
+ sessions.clear();
+ isClosed = true;
}
-void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s)
- s->closed(code, text);
- }
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
sessions.clear();
isClosed = true;
+ connector->close();
}
-void ConnectionImpl::assertNotClosed()
-{
+void ConnectionImpl::erase(uint16_t ch) {
Mutex::ScopedLock l(lock);
- if (isClosed) throw Exception("Connection has been closed");
+ sessions.erase(ch);
}
+