diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 151 |
1 files changed, 134 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 6f02399795..ca98ee1437 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -59,7 +59,6 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), mgmtObject(0), rateFlowcontrol(0) { @@ -94,6 +93,18 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); + + // clean up any outstanding incomplete receive messages + + qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); + while (!incompleteRcvMsgs.empty()) { + boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front()); + incompleteRcvMsgs.pop_front(); + { + qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock); + ref->cancel(); + } + } } AMQP_ClientProxy& SessionState::getProxy() { @@ -195,15 +206,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, } void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { + currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + if (currentCommandComplete) receiverCompleted(id); + if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { - incomplete.process(enqueuedOp, true); + + if (method->isSync() && currentCommandComplete) { sendAcceptAndCompletion(); } } @@ -247,21 +260,24 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); + + msg->getReceiveCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - - if (msg->isEnqueueComplete()) { - enqueued(msg); - } else { - incomplete.add(msg); - } - - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { - incomplete.process(enqueuedOp, true); - sendAcceptAndCompletion(); + if (msg->getReceiveCompletion().getPendingCompleters() == 1) { + // There are no other pending receive completers (just this SessionState). + // Mark the message as completed. + completeRcvMsg( msg ); } else { - incomplete.process(enqueuedOp, false); + // There are outstanding receive completers. Save the message until + // they are all done. + QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId()); + boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg)); + { + qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); + incompleteRcvMsgs.push_back(pendingMsg); + } + msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete } } @@ -312,11 +328,36 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +/** Invoked when the given inbound message is finished being processed + * by all interested parties (eg. it is done being enqueued to all queues, + * its credit has been accounted for, etc). At this point, msg is considered + * by this receiver as 'completed' (as defined by AMQP 0_10) + */ +void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg) { + bool callSendCompletion = false; receiverCompleted(msg->getCommandId()); if (msg->requiresAccept()) + // will cause msg's seq to appear in the next message.accept we send. accepted.add(msg->getCommandId()); + + // Are there any outstanding Execution.Sync commands pending the + // completion of this msg? If so, complete them. + while (!pendingExecutionSyncs.empty() && + receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { + const SequenceNumber& id = pendingExecutionSyncs.front(); + pendingExecutionSyncs.pop(); + QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed."); + receiverCompleted(id); + callSendCompletion = true; // likely peer is pending for this completion. + } + + // if the sender has requested immediate notification of the completion... + if (msg->getFrames().getMethod()->isSync()) { + sendAcceptAndCompletion(); + } else if (callSendCompletion) { + sendCompletion(); + } } void SessionState::handleIn(AMQFrame& frame) { @@ -389,4 +430,80 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { return handler->getClusterOrderProxy(); } + +// Current received command is an execution.sync command. +// Complete this command only when all preceding commands have completed. +// (called via the invoker() in handleCommand() above) +void SessionState::addPendingExecutionSync() +{ + SequenceNumber syncCommandId = receiverGetCurrent(); + if (receiverGetIncomplete().front() < syncCommandId) { + currentCommandComplete = false; + pendingExecutionSyncs.push(syncCommandId); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + } +} + + +/** Invoked by the asynchronous completer associated with + * a received msg that is pending Completion. May be invoked + * by the SessionState directly (sync == true), or some external + * entity (!sync). + */ +void SessionState::IncompleteRcvMsg::operator() (bool sync) +{ + QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync); + boost::shared_ptr<IncompleteRcvMsg> tmp; + { + qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock); + for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin(); + i != session->incompleteRcvMsgs.end(); ++i) { + if (i->get() == this) { + tmp.swap(*i); + session->incompleteRcvMsgs.remove(*i); + break; + } + } + } + + if (session->isAttached()) { + if (sync) { + QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); + session->completeRcvMsg(msg); + } else { // potentially called from a different thread + QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); + session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp)); + } + } +} + + +/** Scheduled from IncompleteRcvMsg callback, completes the message receive + * asynchronously + */ +void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg) +{ + QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); + if (iMsg->session && iMsg->session->isAttached()) { + QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); + iMsg->session->completeRcvMsg(iMsg->msg); + } +} + + +/** Cancels a pending incomplete receive message completion callback. Note + * well: will wait for the callback to finish if it is currently in progress + * on another thread. + */ +void SessionState::IncompleteRcvMsg::cancel() +{ + QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId()); + // Cancel the message complete callback. On return, we are guaranteed there + // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync) + msg->getReceiveCompletion().cancel(); + // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending, + // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg. + session = 0; +} + }} // namespace qpid::broker |