diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 56 |
1 files changed, 51 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index eca1883bd9..957d5bd4d2 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -95,14 +95,13 @@ void SessionState::addManagementObject() { } SessionState::~SessionState() { + asyncCommandCompleter->cancel(); semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); if (flowControlTimer) flowControlTimer->cancel(); - - asyncCommandCompleter->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -428,6 +427,7 @@ void SessionState::addPendingExecutionSync() if (receiverGetIncomplete().front() < syncCommandId) { currentCommandComplete = false; pendingExecutionSyncs.push(syncCommandId); + asyncCommandCompleter->flushPendingMessages(); QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); } } @@ -440,6 +440,17 @@ boost::intrusive_ptr<AsyncCompletion::Callback> SessionState::IncompleteIngressMsgXfer::clone() { boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); + + // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. + // If the client is pending the message.transfer completion, flush now to force immediate write to journal. + if (requiresSync) + msg->flush(); + else { + // otherwise, we need to track this message in order to flush it if an execution.sync arrives + // before it has been completed (see flushPendingMessages()) + pending = true; + completerContext->addPendingMessage(msg); + } return cb; } @@ -450,17 +461,18 @@ SessionState::IncompleteIngressMsgXfer::clone() */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { + if (pending) completerContext->deletePendingMessage(id); if (!sync) { /** note well: this path may execute in any thread. It is safe to access * the scheduledCompleterContext, since *this has a shared pointer to it. - * but not session or msg! + * but not session! */ - session = 0; msg = 0; + session = 0; QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); } else { // this path runs directly from the ac->end() call in handleContent() above, - // so *session and *msg are definately valid. + // so *session is definately valid. if (session->isAttached()) { QPID_LOG(debug, ": receive completed for msg seq=" << id); session->completeRcvMsg(id, requiresAccept, requiresSync); @@ -479,6 +491,40 @@ void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCom } +/** Track an ingress message that is pending completion */ +void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg); + bool unique = pendingMsgs.insert(item).second; + assert(unique); +} + + +/** pending message has completed */ +void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + pendingMsgs.erase(id); +} + + +/** done when an execution.sync arrives */ +void SessionState::AsyncCommandCompleter::flushPendingMessages() +{ + std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. + } + // drop lock, so it is safe to call "flush()" + for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin(); + i != copy.end(); ++i) { + i->second->flush(); + } +} + + /** mark an ingress Message.Transfer command as completed. * This method must be thread safe - it may run on any thread. */ |