summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp31
1 files changed, 21 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 0b1e744e25..8310980800 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -26,6 +26,7 @@
#include "qpid/framing/constants.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/framing/all_method_bodies.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -59,24 +60,32 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (!ignoring) {
- if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- session->handle(f);
- } else {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
- }
+ if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
+ return;
+ } else if (session.get()) {
+ session->handle(f);
+ } else if (!ignoring) {
+ throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached"));
}
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
+ }catch(const SessionException& e){
+ //execution.exception will have been sent already
+ ignoring = true;
+ //peerSession.requestTimeout(0);
+ session->setTimeout(0);
+ peerSession.detach(name);
+ localSuspend();
}catch(const std::exception& e){
connection.close(
framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
}
}
+bool SessionHandler::isValid(AMQMethodBody* m) {
+ return session.get() || m->isA<SessionAttachBody>();
+}
+
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
@@ -112,12 +121,14 @@ ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
//new methods:
-void SessionHandler::attach(const std::string& name, bool /*force*/)
+void SessionHandler::attach(const std::string& _name, bool /*force*/)
{
//TODO: need to revise session manager to support resume as well
assertClosed("attach");
std::auto_ptr<SessionState> state(
connection.broker.getSessionManager().open(*this, 0));
+ name = _name;//TODO: this should be used in conjunction with
+ //userid for connection as sessions identity
session.reset(state.release());
peerSession.attached(name);
peerSession.commandPoint(session->nextOut, 0);