summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp18
1 files changed, 11 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index b96d7b5e3f..50938de8ac 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -22,7 +22,6 @@
#include "Broker.h"
#include "ConnectionState.h"
#include "MessageDelivery.h"
-#include "SemanticHandler.h"
#include "SessionManager.h"
#include "SessionHandler.h"
#include "qpid/framing/AMQContentBody.h"
@@ -30,6 +29,7 @@
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -50,6 +50,7 @@ SessionState::SessionState(
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
+ ignoring(false),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
@@ -154,7 +155,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->detach();
+ handler->requestDetach();
}
status = Manageable::STATUS_OK;
break;
@@ -188,7 +189,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
nextOut++;//execution result is now a command, so the counter must be incremented
- getProxy().getExecution010().result(id, invocation.getResult());
+ getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
@@ -242,12 +243,13 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
completed.add(msg->getCommandId());
if (msg->requiresAccept()) {
nextOut++;//accept is a command, so the counter must be incremented
- getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
}
void SessionState::handle(AMQFrame& frame)
{
+ if (ignoring) return;
received(frame);
SequenceNumber commandId;
@@ -271,11 +273,13 @@ void SessionState::handle(AMQFrame& frame)
AMQMethodBody* m = frame.getMethod();
if (m) {
- getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
+ getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
} else {
- getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
+ getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- throw e;
+ timeout = 0;
+ ignoring = true;
+ handler->requestDetach();
}
}