summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp56
1 files changed, 51 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index eca1883bd9..957d5bd4d2 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -95,14 +95,13 @@ void SessionState::addManagementObject() {
}
SessionState::~SessionState() {
+ asyncCommandCompleter->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
if (flowControlTimer)
flowControlTimer->cancel();
-
- asyncCommandCompleter->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -428,6 +427,7 @@ void SessionState::addPendingExecutionSync()
if (receiverGetIncomplete().front() < syncCommandId) {
currentCommandComplete = false;
pendingExecutionSyncs.push(syncCommandId);
+ asyncCommandCompleter->flushPendingMessages();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
}
}
@@ -440,6 +440,17 @@ boost::intrusive_ptr<AsyncCompletion::Callback>
SessionState::IncompleteIngressMsgXfer::clone()
{
boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+
+ // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
+ // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
+ if (requiresSync)
+ msg->flush();
+ else {
+ // otherwise, we need to track this message in order to flush it if an execution.sync arrives
+ // before it has been completed (see flushPendingMessages())
+ pending = true;
+ completerContext->addPendingMessage(msg);
+ }
return cb;
}
@@ -450,17 +461,18 @@ SessionState::IncompleteIngressMsgXfer::clone()
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
+ if (pending) completerContext->deletePendingMessage(id);
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
* the scheduledCompleterContext, since *this has a shared pointer to it.
- * but not session or msg!
+ * but not session!
*/
- session = 0; msg = 0;
+ session = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
} else {
// this path runs directly from the ac->end() call in handleContent() above,
- // so *session and *msg are definately valid.
+ // so *session is definately valid.
if (session->isAttached()) {
QPID_LOG(debug, ": receive completed for msg seq=" << id);
session->completeRcvMsg(id, requiresAccept, requiresSync);
@@ -479,6 +491,40 @@ void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCom
}
+/** Track an ingress message that is pending completion */
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+ bool unique = pendingMsgs.insert(item).second;
+ assert(unique);
+}
+
+
+/** pending message has completed */
+void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ pendingMsgs.erase(id);
+}
+
+
+/** done when an execution.sync arrives */
+void SessionState::AsyncCommandCompleter::flushPendingMessages()
+{
+ std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
+ }
+ // drop lock, so it is safe to call "flush()"
+ for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ i != copy.end(); ++i) {
+ i->second->flush();
+ }
+}
+
+
/** mark an ingress Message.Transfer command as completed.
* This method must be thread safe - it may run on any thread.
*/