summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp46
1 files changed, 19 insertions, 27 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 8ab60cff50..43576d2273 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,7 +18,11 @@
* under the License.
*
*/
+#include "qpid/framing/reply_exceptions.h"
+
#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -26,7 +30,8 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false)
+ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
+ : connector(c), isClosed(false)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
@@ -37,22 +42,13 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), i
connector->setShutdownHandler(this);
}
-void ConnectionImpl::allocated(SessionCore::shared_ptr session)
-{
- Mutex::ScopedLock l(lock);
- if (sessions.find(session->getId()) != sessions.end()) {
- throw Exception("Id already in use.");
- }
- sessions[session->getId()] = session;
-}
-
-void ConnectionImpl::released(SessionCore::shared_ptr session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
- SessionMap::iterator i = sessions.find(session->getId());
- if (i != sessions.end()) {
- sessions.erase(i);
- }
+ boost::shared_ptr<SessionCore>& s = sessions[session->getChannel()];
+ if (s)
+ throw ChannelBusyException();
+ s = session;
}
void ConnectionImpl::handle(framing::AMQFrame& frame)
@@ -62,7 +58,14 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- find(frame.getChannel())->handle(frame);
+ boost::shared_ptr<SessionCore> s;
+ {
+ Mutex::ScopedLock l(lock);
+ s = sessions[frame.getChannel()];
+ }
+ if (!s)
+ throw ChannelErrorException();
+ s->in(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -117,23 +120,12 @@ void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- Mutex::ScopedUnlock u(lock);
i->second->closed(code, text);
}
sessions.clear();
isClosed = true;
}
-SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
-{
- Mutex::ScopedLock l(lock);
- SessionMap::iterator i = sessions.find(id);
- if (i == sessions.end()) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- return i->second;
-}
-
void ConnectionImpl::assertNotClosed()
{
Mutex::ScopedLock l(lock);