diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 208 |
1 files changed, 185 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 1ca7b6dfc1..11f3e84b70 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -60,9 +60,9 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), mgmtObject(0), - rateFlowcontrol(0) + rateFlowcontrol(0), + scheduledCompleterContext(new ScheduledCompleterContext(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -101,6 +101,26 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); + + // clean up any outstanding incomplete commands + { + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); + incompleteCmds.clear(); + while (!copy.empty()) { + boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); + copy.erase(copy.begin()); + { + // note: need to drop lock, as callback may attempt to take it. + qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); + ref->cancel(); + } + } + } + + // At this point, we are guaranteed no further completion callbacks will be + // made. Cancel any outstanding scheduledCompleter calls... + scheduledCompleterContext->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -202,15 +222,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, } void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { + currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + if (currentCommandComplete) receiverCompleted(id); + if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { - incomplete.process(enqueuedOp, true); + + if (method->isSync() && currentCommandComplete) { sendAcceptAndCompletion(); } } @@ -254,22 +276,13 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); + + boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg))); + msg->setIngressCompletion( ac ); + ac->begin(); semanticState.handle(msg); msgBuilder.end(); - - if (msg->isEnqueueComplete()) { - enqueued(msg); - } else { - incomplete.add(msg); - } - - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { - incomplete.process(enqueuedOp, true); - sendAcceptAndCompletion(); - } else { - incomplete.process(enqueuedOp, false); - } + ac->end(); // allows msg to complete xfer } // Handle producer session flow control @@ -319,11 +332,38 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +/** Invoked when the given inbound message is finished being processed + * by all interested parties (eg. it is done being enqueued to all queues, + * its credit has been accounted for, etc). At this point, msg is considered + * by this receiver as 'completed' (as defined by AMQP 0_10) + */ +void SessionState::completeRcvMsg(SequenceNumber id, + bool requiresAccept, + bool requiresSync) { - receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - accepted.add(msg->getCommandId()); + bool callSendCompletion = false; + receiverCompleted(id); + if (requiresAccept) + // will cause msg's seq to appear in the next message.accept we send. + accepted.add(id); + + // Are there any outstanding Execution.Sync commands pending the + // completion of this msg? If so, complete them. + while (!pendingExecutionSyncs.empty() && + receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { + const SequenceNumber id = pendingExecutionSyncs.front(); + pendingExecutionSyncs.pop(); + QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed."); + receiverCompleted(id); + callSendCompletion = true; // likely peer is pending for this completion. + } + + // if the sender has requested immediate notification of the completion... + if (requiresSync) { + sendAcceptAndCompletion(); + } else if (callSendCompletion) { + sendCompletion(); + } } void SessionState::handleIn(AMQFrame& frame) { @@ -396,4 +436,126 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { return handler->getClusterOrderProxy(); } + +// Current received command is an execution.sync command. +// Complete this command only when all preceding commands have completed. +// (called via the invoker() in handleCommand() above) +void SessionState::addPendingExecutionSync() +{ + SequenceNumber syncCommandId = receiverGetCurrent(); + if (receiverGetIncomplete().front() < syncCommandId) { + currentCommandComplete = false; + pendingExecutionSyncs.push(syncCommandId); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + } +} + + +/** factory for creating IncompleteIngressMsgXfer objects which + * can be references from Messages as ingress AsyncCompletion objects. + */ +boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> +SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) +{ + SequenceNumber id = msg->getCommandId(); + boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg)); + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + incompleteCmds[id] = cmd; + return cmd; +} + + +/** Invoked by the asynchronous completer associated with + * a received msg that is pending Completion. May be invoked + * by the SessionState directly (sync == true), or some external + * entity (!sync). + */ +void SessionState::IncompleteIngressMsgXfer::completed(bool sync) +{ + if (!sync) { + /** note well: this path may execute in any thread. It is safe to access + * the session, as the SessionState destructor will cancel all outstanding + * callbacks before getting destroyed (so we'll never get here). + */ + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); + if (session->scheduledCompleterContext->scheduleCompletion(id)) + session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, + session->scheduledCompleterContext)); + } else { // command is being completed in IO thread. + // this path runs only on the IO thread. + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + cmd = session->incompleteCmds.find(id); + if (cmd != session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + session->incompleteCmds.erase(cmd); + + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); + session->completeRcvMsg(id, requiresAccept, requiresSync); + return; + } + } + } +} + + +/** Scheduled from incomplete command's completed callback, safely completes all + * completed commands in the IO Thread. Guaranteed not to be running at the same + * time as the message receive code. + */ +void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) +{ + ctxt->completeCommands(); +} + + +/** mark a command (sequence) as completed, return True if caller should + * schedule a call to completeCommands() + */ +bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + completedCmds.push_back(cmd); + return (completedCmds.size() == 1); +} + + +/** Cause the session to complete all completed commands */ +void SessionState::ScheduledCompleterContext::completeCommands() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + // when session is destroyed, it clears the session pointer via cancel(). + if (!session) return; + + while (!completedCmds.empty()) { + SequenceNumber id = completedCmds.front(); + completedCmds.pop_front(); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); + + cmd = session->incompleteCmds.find(id); + if (cmd !=session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + { + qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); + tmp->do_completion(); // retakes incompleteCmdslock + } + } + } + } +} + + +/** cancel any pending calls to scheduleComplete */ +void SessionState::ScheduledCompleterContext::cancel() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + session = 0; +} + }} // namespace qpid::broker |