summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp31
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h3
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp6
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp8
4 files changed, 30 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 0b1e744e25..8310980800 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -26,6 +26,7 @@
#include "qpid/framing/constants.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/framing/all_method_bodies.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -59,24 +60,32 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (!ignoring) {
- if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- session->handle(f);
- } else {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
- }
+ if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
+ return;
+ } else if (session.get()) {
+ session->handle(f);
+ } else if (!ignoring) {
+ throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached"));
}
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
+ }catch(const SessionException& e){
+ //execution.exception will have been sent already
+ ignoring = true;
+ //peerSession.requestTimeout(0);
+ session->setTimeout(0);
+ peerSession.detach(name);
+ localSuspend();
}catch(const std::exception& e){
connection.close(
framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
}
}
+bool SessionHandler::isValid(AMQMethodBody* m) {
+ return session.get() || m->isA<SessionAttachBody>();
+}
+
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
@@ -112,12 +121,14 @@ ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
//new methods:
-void SessionHandler::attach(const std::string& name, bool /*force*/)
+void SessionHandler::attach(const std::string& _name, bool /*force*/)
{
//TODO: need to revise session manager to support resume as well
assertClosed("attach");
std::auto_ptr<SessionState> state(
connection.broker.getSessionManager().open(*this, 0));
+ name = _name;//TODO: this should be used in conjunction with
+ //userid for connection as sessions identity
session.reset(state.release());
peerSession.attached(name);
peerSession.commandPoint(session->nextOut, 0);
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 4b031f2951..4010ce15d2 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -100,12 +100,15 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
+ bool isValid(framing::AMQMethodBody*);
+
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session010 peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
+ std::string name;//TODO: this should be part of the session state and replace the id
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 3c6bed4344..b96d7b5e3f 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -275,11 +275,7 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- timeout = 0;
- //The python client doesn't currently detach on receiving an exception
- //so the session state isn't destroyed. This is a temporary workaround
- //until that is addressed
- adapter.destroyExclusiveQueues();
+ throw e;
}
}
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 4e3f6cdd98..190141c411 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -426,9 +426,12 @@ void SessionImpl::attached(const std::string& _name)
setState(ATTACHED);
}
-void SessionImpl::detach(const std::string& /*name*/)
+void SessionImpl::detach(const std::string& _name)
{
- throw NotImplementedException("Client does not support detach");
+ Lock l(state);
+ if (name != _name) throw InternalErrorException("Incorrect session name");
+ setState(DETACHED);
+ QPID_LOG(info, "Session detached by peer: " << name);
}
void SessionImpl::detached(const std::string& _name, uint8_t _code)
@@ -561,7 +564,6 @@ void SessionImpl::exception(uint16_t errorCode,
//should we wait for the timeout response?
detachedLifetime = 0;
}
- detach();
}