diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 128 |
1 files changed, 113 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index b6c59cfb3b..573a567da6 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -19,12 +19,16 @@ * */ #include "SessionState.h" -#include "SessionManager.h" -#include "SessionContext.h" -#include "ConnectionState.h" #include "Broker.h" +#include "ConnectionState.h" +#include "MessageDelivery.h" #include "SemanticHandler.h" +#include "SessionManager.h" +#include "SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/ServerInvoker.h" + +#include <boost/bind.hpp> namespace qpid { namespace broker { @@ -37,17 +41,17 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), - semanticHandler(new SemanticHandler(*this)) + semanticState(*this, *this), + adapter(semanticState), + msgBuilder(&broker.getStore(), broker.getStagingThreshold()), + ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) { - in.next = semanticHandler.get(); - out.next = &handler->out; - - getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.addOutputTask(&semanticState); Manageable* parent = broker.GetVhostObject (); @@ -76,7 +80,7 @@ SessionState::~SessionState() { mgmtObject->resourceDestroy (); } -SessionContext* SessionState::getHandler() { +SessionHandler* SessionState::getHandler() { return handler; } @@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() { } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); - handler = 0; out.next = 0; + handler = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); } } -void SessionState::attach(SessionContext& h) { +void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; - out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); @@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + h.getConnection().outputTasks.addOutputTask(&semanticState); } void SessionState::activateOutput() @@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } +void SessionState::handleCommand(framing::AMQMethodBody* method) +{ + SequenceNumber id = incoming.next(); + Invoker::Result invocation = invoke(adapter, *method); + incoming.complete(id); + + if (!invocation.wasHandled()) { + throw NotImplementedException("Not implemented"); + } else if (invocation.hasResult()) { + getProxy().getExecution().result(id.getValue(), invocation.getResult()); + } + if (method->isSync()) { + incoming.sync(id); + sendCompletion(); + } + //TODO: if window gets too large send unsolicited completion +} + +void SessionState::handleContent(AMQFrame& frame) +{ + intrusive_ptr<Message> msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(incoming.next()); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&getConnection()); + semanticState.handle(msg); + msgBuilder.end(); + incoming.track(msg); + if (msg->getFrames().getMethod()->isSync()) { + incoming.sync(msg->getCommandId()); + sendCompletion(); + } + } +} + +void SessionState::handle(AMQFrame& frame) +{ + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assmblies are non-content bearing and all content-bearing + //assmeblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod()); + } else { + handleContent(frame); + } + +} + +DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +{ + uint32_t maxFrameSize = getConnection().getFrameMax(); + MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); + return outgoing.hwm; +} + +void SessionState::sendCompletion() +{ + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); + getProxy().getExecution().complete(mark.getValue(), range); +} + +void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + semanticState.ackCumulative(mark.getValue()); + } + range.processRanges(ackOp); +} + +void SessionState::flush() +{ + incoming.flush(); + sendCompletion(); +} + +void SessionState::sync() +{ + incoming.sync(); + sendCompletion(); +} + +void SessionState::noop() +{ + incoming.noop(); +} + }} // namespace qpid::broker |