diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 63 |
1 files changed, 39 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dada7567f9..d7089424a5 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -53,7 +53,11 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), + inLastHandler(*this), + outLastHandler(*this), + inChain(inLastHandler), + outChain(outLastHandler) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -102,20 +106,20 @@ void SessionState::detach() { handler = 0; if (mgmtObject.get() != 0) mgmtObject->set_attached (0); - } +} void SessionState::attach(SessionHandler& h) { // activateOutput can be called in a different thread, lock to protect attached status - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); QPID_LOG(debug, getId() << ": attached on broker."); - handler = &h; - if (mgmtObject.get() != 0) - { - mgmtObject->set_attached (1); - mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h.getChannel()); - } + handler = &h; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); } +} void SessionState::activateOutput() { // activateOutput can be called in a different thread, lock to protect attached status @@ -137,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, switch (methodId) { - case management::Session::METHOD_DETACH : + case management::Session::METHOD_DETACH : if (handler != 0) { handler->sendDetach(); @@ -145,18 +149,18 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; - case management::Session::METHOD_CLOSE : + case management::Session::METHOD_CLOSE : /* - if (handler != 0) - { - handler->getConnection().closeChannel(handler->getChannel()); - } - status = Manageable::STATUS_OK; - break; + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; */ - case management::Session::METHOD_SOLICITACK : - case management::Session::METHOD_RESETLIFESPAN : + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : status = Manageable::STATUS_NOT_IMPLEMENTED; break; } @@ -218,10 +222,12 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) receiverCompleted(msg->getCommandId()); if (msg->requiresAccept()) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); - } +} -void SessionState::handle(AMQFrame& frame) -{ +void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); } +void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); } + +void SessionState::handleInLast(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether @@ -252,6 +258,11 @@ void SessionState::handle(AMQFrame& frame) } } +void SessionState::handleOutLast(AMQFrame& frame) { + assert(handler); + handler->out(frame); +} + DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); @@ -267,7 +278,7 @@ void SessionState::sendCompletion() { handler->sendCompletion(); } void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++) - semanticState.completed(i->first(), i->last()); + semanticState.completed(i->first(), i->last()); } void SessionState::readyToSend() { @@ -280,4 +291,8 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } +framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; } + +framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; } + }} // namespace qpid::broker |