From fd743d3aac6390aeb367e5601317a4397f682ce7 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 11 May 2011 13:02:32 +0000 Subject: QPID-3252: flush msgs when sync requested. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1101864 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/SessionState.cpp | 56 ++++++++++++++++++++++++++++--- qpid/cpp/src/qpid/broker/SessionState.h | 24 +++++++++---- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index eca1883bd9..957d5bd4d2 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -95,14 +95,13 @@ void SessionState::addManagementObject() { } SessionState::~SessionState() { + asyncCommandCompleter->cancel(); semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); if (flowControlTimer) flowControlTimer->cancel(); - - asyncCommandCompleter->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -428,6 +427,7 @@ void SessionState::addPendingExecutionSync() if (receiverGetIncomplete().front() < syncCommandId) { currentCommandComplete = false; pendingExecutionSyncs.push(syncCommandId); + asyncCommandCompleter->flushPendingMessages(); QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); } } @@ -440,6 +440,17 @@ boost::intrusive_ptr SessionState::IncompleteIngressMsgXfer::clone() { boost::intrusive_ptr cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); + + // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. + // If the client is pending the message.transfer completion, flush now to force immediate write to journal. + if (requiresSync) + msg->flush(); + else { + // otherwise, we need to track this message in order to flush it if an execution.sync arrives + // before it has been completed (see flushPendingMessages()) + pending = true; + completerContext->addPendingMessage(msg); + } return cb; } @@ -450,17 +461,18 @@ SessionState::IncompleteIngressMsgXfer::clone() */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { + if (pending) completerContext->deletePendingMessage(id); if (!sync) { /** note well: this path may execute in any thread. It is safe to access * the scheduledCompleterContext, since *this has a shared pointer to it. - * but not session or msg! + * but not session! */ - session = 0; msg = 0; + session = 0; QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); } else { // this path runs directly from the ac->end() call in handleContent() above, - // so *session and *msg are definately valid. + // so *session is definately valid. if (session->isAttached()) { QPID_LOG(debug, ": receive completed for msg seq=" << id); session->completeRcvMsg(id, requiresAccept, requiresSync); @@ -479,6 +491,40 @@ void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr msg) +{ + qpid::sys::ScopedLock l(completerLock); + std::pair > item(msg->getCommandId(), msg); + bool unique = pendingMsgs.insert(item).second; + assert(unique); +} + + +/** pending message has completed */ +void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) +{ + qpid::sys::ScopedLock l(completerLock); + pendingMsgs.erase(id); +} + + +/** done when an execution.sync arrives */ +void SessionState::AsyncCommandCompleter::flushPendingMessages() +{ + std::map > copy; + { + qpid::sys::ScopedLock l(completerLock); + pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. + } + // drop lock, so it is safe to call "flush()" + for (std::map >::iterator i = copy.begin(); + i != copy.end(); ++i) { + i->second->flush(); + } +} + + /** mark an ingress Message.Transfer command as completed. * This method must be thread safe - it may run on any thread. */ diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index e847b3fa04..b43df0c0aa 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -199,6 +199,10 @@ class SessionState : public qpid::SessionState, : cmd(c), requiresAccept(a), requiresSync(s) {} }; std::vector completedMsgs; + // If an ingress message does not require a Sync, we need to + // hold a reference to it in case an Execution.Sync command is received and we + // have to manually flush the message. + std::map > pendingMsgs; /** complete all pending commands, runs in IO thread */ void completeCommands(); @@ -210,7 +214,11 @@ class SessionState : public qpid::SessionState, AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {}; ~AsyncCommandCompleter() {}; - /** schedule the completion of an ingress message.transfer command */ + /** track a message pending ingress completion */ + void addPendingMessage(boost::intrusive_ptr m); + void deletePendingMessage(SequenceNumber id); + void flushPendingMessages(); + /** schedule the processing of a completed ingress message.transfer command */ void scheduleMsgCompletion(SequenceNumber cmd, bool requiresAccept, bool requiresSync); @@ -243,20 +251,22 @@ class SessionState : public qpid::SessionState, IncompleteIngressMsgXfer( SessionState *ss, boost::intrusive_ptr m ) : AsyncCommandContext(ss, m->getCommandId()), - session(ss), - msg(m.get()), - requiresAccept(msg->requiresAccept()), - requiresSync(msg->getFrames().getMethod()->isSync()) {}; + session(ss), + msg(m), + requiresAccept(m->requiresAccept()), + requiresSync(m->getFrames().getMethod()->isSync()), + pending(false) {} virtual ~IncompleteIngressMsgXfer() {}; virtual void completed(bool); virtual boost::intrusive_ptr clone(); private: - SessionState *session; // only valid if sync == true - Message *msg; // only valid if sync == true + SessionState *session; // only valid if sync flag in callback is true + boost::intrusive_ptr msg; bool requiresAccept; bool requiresSync; + bool pending; // true if msg saved on pending list... }; friend class SessionManager; -- cgit v1.2.1