diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 168 |
1 files changed, 102 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 957d5bd4d2..d84256b61b 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -62,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - asyncCommandCompleter(new AsyncCommandCompleter(this)) + asyncCommandManager(new AsyncCommandManager(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -95,7 +95,7 @@ void SessionState::addManagementObject() { } SessionState::~SessionState() { - asyncCommandCompleter->cancel(); + asyncCommandManager->cancel(); semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -126,7 +126,7 @@ bool SessionState::isLocal(const ConnectionToken* t) const void SessionState::detach() { QPID_LOG(debug, getId() << ": detached on broker."); - asyncCommandCompleter->detached(); + asyncCommandManager->detached(); disableOutput(); handler = 0; if (mgmtObject != 0) @@ -147,7 +147,7 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } - asyncCommandCompleter->attached(); + asyncCommandManager->attached(); } void SessionState::abort() { @@ -204,22 +204,22 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { +void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) +{ currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). + syncCurrentCommand = method->isSync(); + acceptRequired = false; Invoker::Result invocation = invoke(adapter, *method); - if (currentCommandComplete) receiverCompleted(id); - if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); - } else if (invocation.hasResult()) { - getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync() && currentCommandComplete) { - sendAcceptAndCompletion(); + if (currentCommandComplete) { + completeCommand(id, invocation, false, syncCurrentCommand); } } + struct ScheduledCreditTask : public sys::TimerTask { sys::Timer& timer; SessionState& sessionState; @@ -260,6 +260,9 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) } msg->setPublisher(&getConnection()); msg->getIngressCompletion().begin(); + currentCommandComplete = true; // assumed + syncCurrentCommand = msg->getFrames().getMethod()->isSync(); + acceptRequired = msg->requiresAccept(); semanticState.handle(msg); msgBuilder.end(); IncompleteIngressMsgXfer xfer(this, msg); @@ -313,17 +316,19 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -/** Invoked when the given inbound message is finished being processed - * by all interested parties (eg. it is done being enqueued to all queues, - * its credit has been accounted for, etc). At this point, msg is considered - * by this receiver as 'completed' (as defined by AMQP 0_10) - */ -void SessionState::completeRcvMsg(SequenceNumber id, - bool requiresAccept, - bool requiresSync) +/** Complete a received command */ +void SessionState::completeCommand(const SequenceNumber& id, + const framing::Invoker::Result& results, + bool requiresAccept, + bool syncBitSet) { bool callSendCompletion = false; receiverCompleted(id); + + if (results.hasResult()) { + getProxy().getExecution().result(id, results.getResult()); + } + if (requiresAccept) // will cause msg's seq to appear in the next message.accept we send. accepted.add(id); @@ -340,7 +345,7 @@ void SessionState::completeRcvMsg(SequenceNumber id, } // if the sender has requested immediate notification of the completion... - if (requiresSync) { + if (syncBitSet) { sendAcceptAndCompletion(); } else if (callSendCompletion) { sendCompletion(); @@ -427,12 +432,25 @@ void SessionState::addPendingExecutionSync() if (receiverGetIncomplete().front() < syncCommandId) { currentCommandComplete = false; pendingExecutionSyncs.push(syncCommandId); - asyncCommandCompleter->flushPendingMessages(); + asyncCommandManager->flushPendingCommands(); QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); } } +void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd) +{ + /** @todo KAG: ensure this is invoked during handleCommand() context! */ + currentCommandComplete = false; + asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand ); +} + + +void SessionState::cancelAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd) +{ + asyncCommandManager->cancelPendingCommand(aCmd); +} + /** factory for creating a reference-counted IncompleteIngressMsgXfer object * which will be attached to a message that will be completed asynchronously. */ @@ -441,15 +459,14 @@ SessionState::IncompleteIngressMsgXfer::clone() { boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); - // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. - // If the client is pending the message.transfer completion, flush now to force immediate write to journal. - if (requiresSync) + // this routine is *only* invoked when the message needs to be asynchronously completed. Otherwise, ::completed() + // will be invoked directly. + pending = true; + boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb)); + session->registerAsyncCommand(ctxt); + if (ctxt->getSyncBitSet()) { + // If the client is pending the message.transfer completion, flush now to force immediate write to journal. msg->flush(); - else { - // otherwise, we need to track this message in order to flush it if an execution.sync arrives - // before it has been completed (see flushPendingMessages()) - pending = true; - completerContext->addPendingMessage(msg); } return cb; } @@ -461,110 +478,129 @@ SessionState::IncompleteIngressMsgXfer::clone() */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - if (pending) completerContext->deletePendingMessage(id); if (!sync) { /** note well: this path may execute in any thread. It is safe to access * the scheduledCompleterContext, since *this has a shared pointer to it. * but not session! */ session = 0; - QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId()); + completed(framing::Invoker::Result()); } else { // this path runs directly from the ac->end() call in handleContent() above, // so *session is definately valid. if (session->isAttached()) { - QPID_LOG(debug, ": receive completed for msg seq=" << id); - session->completeRcvMsg(id, requiresAccept, requiresSync); + QPID_LOG(debug, ": receive completed for msg seq=" << getId()); + session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet()); + } + if (pending) { + boost::intrusive_ptr<AsyncCommandContext> p(this); + session->cancelAsyncCommand(p); } } - completerContext = boost::intrusive_ptr<AsyncCommandCompleter>(); +} + + +void SessionState::IncompleteIngressMsgXfer::flush() +{ + msg->flush(); } /** Scheduled from an asynchronous command's completed callback to run on * the IO thread. */ -void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) +void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncCommandManager> ctxt) { - ctxt->completeCommands(); + ctxt->processCompletedCommands(); } -/** Track an ingress message that is pending completion */ -void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg) +void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd, + framing::SequenceNumber seq, + bool acceptRequired, bool syncBitSet) { + cmd->setId(seq); + cmd->setRequiresAccept(acceptRequired); + cmd->setSyncBitSet(syncBitSet); + cmd->setManager(this); qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg); - bool unique = pendingMsgs.insert(item).second; - assert(unique); + std::pair<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > item(cmd->getId(), cmd); + bool unique = pendingCommands.insert(item).second; + if (!unique) assert(false); } -/** pending message has completed */ -void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) +void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - pendingMsgs.erase(id); + pendingCommands.erase(cmd->getId()); + cmd->setManager(0); } + /** done when an execution.sync arrives */ -void SessionState::AsyncCommandCompleter::flushPendingMessages() +void SessionState::AsyncCommandManager::flushPendingCommands() { - std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy; + std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > copy; { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. + copy = pendingCommands; } // drop lock, so it is safe to call "flush()" - for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin(); + for (std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> >::iterator i = copy.begin(); i != copy.end(); ++i) { i->second->flush(); } } -/** mark an ingress Message.Transfer command as completed. +/** mark a pending command as completed. * This method must be thread safe - it may run on any thread. */ -void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, - bool requiresAccept, - bool requiresSync) +void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd, + const framing::Invoker::Result& result) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - if (session && isAttached) { - MessageInfo msg(cmd, requiresAccept, requiresSync); - completedMsgs.push_back(msg); - if (completedMsgs.size() == 1) { + CommandInfo status(cmd->getId(), + result, + cmd->getRequiresAccept(), + cmd->getSyncBitSet()); + completedCommands.push_back(status); + if (completedCommands.size() == 1) { session->getConnection().requestIOProcessing(boost::bind(&schedule, - session->asyncCommandCompleter)); + session->asyncCommandManager)); } } + pendingCommands.erase(cmd->getId()); } /** Cause the session to complete all completed commands. * Executes on the IO thread. */ -void SessionState::AsyncCommandCompleter::completeCommands() +void SessionState::AsyncCommandManager::processCompletedCommands() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); // when session is destroyed, it clears the session pointer via cancel(). if (session && session->isAttached()) { - for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); - msg != completedMsgs.end(); ++msg) { - session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); + for (std::vector<CommandInfo>::iterator cmd = completedCommands.begin(); + cmd != completedCommands.end(); ++cmd) { + session->completeCommand(cmd->id, + cmd->results, + cmd->requiresAccept, + cmd->syncBitSet); } } - completedMsgs.clear(); + completedCommands.clear(); } /** cancel any pending calls to scheduleComplete */ -void SessionState::AsyncCommandCompleter::cancel() +void SessionState::AsyncCommandManager::cancel() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); session = 0; @@ -573,7 +609,7 @@ void SessionState::AsyncCommandCompleter::cancel() /** inform the completer that the session has attached, * allows command completion scheduling from any thread */ -void SessionState::AsyncCommandCompleter::attached() +void SessionState::AsyncCommandManager::attached() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); isAttached = true; @@ -582,7 +618,7 @@ void SessionState::AsyncCommandCompleter::attached() /** inform the completer that the session has detached, * disables command completion scheduling from any thread */ -void SessionState::AsyncCommandCompleter::detached() +void SessionState::AsyncCommandManager::detached() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); isAttached = false; |