diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index b96d7b5e3f..50938de8ac 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -22,7 +22,6 @@ #include "Broker.h" #include "ConnectionState.h" #include "MessageDelivery.h" -#include "SemanticHandler.h" #include "SessionManager.h" #include "SessionHandler.h" #include "qpid/framing/AMQContentBody.h" @@ -30,6 +29,7 @@ #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -50,6 +50,7 @@ SessionState::SessionState( factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), + ignoring(false), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), @@ -154,7 +155,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, case management::Session::METHOD_DETACH : if (handler != 0) { - handler->detach(); + handler->requestDetach(); } status = Manageable::STATUS_OK; break; @@ -188,7 +189,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { nextOut++;//execution result is now a command, so the counter must be incremented - getProxy().getExecution010().result(id, invocation.getResult()); + getProxy().getExecution().result(id, invocation.getResult()); } if (method->isSync()) { incomplete.process(enqueuedOp, true); @@ -242,12 +243,13 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) completed.add(msg->getCommandId()); if (msg->requiresAccept()) { nextOut++;//accept is a command, so the counter must be incremented - getProxy().getMessage010().accept(SequenceSet(msg->getCommandId())); + getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } } void SessionState::handle(AMQFrame& frame) { + if (ignoring) return; received(frame); SequenceNumber commandId; @@ -271,11 +273,13 @@ void SessionState::handle(AMQFrame& frame) AMQMethodBody* m = frame.getMethod(); if (m) { - getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); + getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); } else { - getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); + getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - throw e; + timeout = 0; + ignoring = true; + handler->requestDetach(); } } |