summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-05-11 13:02:32 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-05-11 13:02:32 +0000
commitfd743d3aac6390aeb367e5601317a4397f682ce7 (patch)
treed5383acbb4e24938376b76dc012a813534b04b41
parent0b6bc92b011c5cb809848da40a67b85c76b9594a (diff)
downloadqpid-python-fd743d3aac6390aeb367e5601317a4397f682ce7.tar.gz
QPID-3252: flush msgs when sync requested.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1101864 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp56
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h24
2 files changed, 68 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index eca1883bd9..957d5bd4d2 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -95,14 +95,13 @@ void SessionState::addManagementObject() {
}
SessionState::~SessionState() {
+ asyncCommandCompleter->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
if (flowControlTimer)
flowControlTimer->cancel();
-
- asyncCommandCompleter->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -428,6 +427,7 @@ void SessionState::addPendingExecutionSync()
if (receiverGetIncomplete().front() < syncCommandId) {
currentCommandComplete = false;
pendingExecutionSyncs.push(syncCommandId);
+ asyncCommandCompleter->flushPendingMessages();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
}
}
@@ -440,6 +440,17 @@ boost::intrusive_ptr<AsyncCompletion::Callback>
SessionState::IncompleteIngressMsgXfer::clone()
{
boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+
+ // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
+ // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
+ if (requiresSync)
+ msg->flush();
+ else {
+ // otherwise, we need to track this message in order to flush it if an execution.sync arrives
+ // before it has been completed (see flushPendingMessages())
+ pending = true;
+ completerContext->addPendingMessage(msg);
+ }
return cb;
}
@@ -450,17 +461,18 @@ SessionState::IncompleteIngressMsgXfer::clone()
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
+ if (pending) completerContext->deletePendingMessage(id);
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
* the scheduledCompleterContext, since *this has a shared pointer to it.
- * but not session or msg!
+ * but not session!
*/
- session = 0; msg = 0;
+ session = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
} else {
// this path runs directly from the ac->end() call in handleContent() above,
- // so *session and *msg are definately valid.
+ // so *session is definately valid.
if (session->isAttached()) {
QPID_LOG(debug, ": receive completed for msg seq=" << id);
session->completeRcvMsg(id, requiresAccept, requiresSync);
@@ -479,6 +491,40 @@ void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCom
}
+/** Track an ingress message that is pending completion */
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+ bool unique = pendingMsgs.insert(item).second;
+ assert(unique);
+}
+
+
+/** pending message has completed */
+void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ pendingMsgs.erase(id);
+}
+
+
+/** done when an execution.sync arrives */
+void SessionState::AsyncCommandCompleter::flushPendingMessages()
+{
+ std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
+ }
+ // drop lock, so it is safe to call "flush()"
+ for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ i != copy.end(); ++i) {
+ i->second->flush();
+ }
+}
+
+
/** mark an ingress Message.Transfer command as completed.
* This method must be thread safe - it may run on any thread.
*/
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index e847b3fa04..b43df0c0aa 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -199,6 +199,10 @@ class SessionState : public qpid::SessionState,
: cmd(c), requiresAccept(a), requiresSync(s) {}
};
std::vector<MessageInfo> completedMsgs;
+ // If an ingress message does not require a Sync, we need to
+ // hold a reference to it in case an Execution.Sync command is received and we
+ // have to manually flush the message.
+ std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
/** complete all pending commands, runs in IO thread */
void completeCommands();
@@ -210,7 +214,11 @@ class SessionState : public qpid::SessionState,
AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
~AsyncCommandCompleter() {};
- /** schedule the completion of an ingress message.transfer command */
+ /** track a message pending ingress completion */
+ void addPendingMessage(boost::intrusive_ptr<Message> m);
+ void deletePendingMessage(SequenceNumber id);
+ void flushPendingMessages();
+ /** schedule the processing of a completed ingress message.transfer command */
void scheduleMsgCompletion(SequenceNumber cmd,
bool requiresAccept,
bool requiresSync);
@@ -243,20 +251,22 @@ class SessionState : public qpid::SessionState,
IncompleteIngressMsgXfer( SessionState *ss,
boost::intrusive_ptr<Message> m )
: AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
- msg(m.get()),
- requiresAccept(msg->requiresAccept()),
- requiresSync(msg->getFrames().getMethod()->isSync()) {};
+ session(ss),
+ msg(m),
+ requiresAccept(m->requiresAccept()),
+ requiresSync(m->getFrames().getMethod()->isSync()),
+ pending(false) {}
virtual ~IncompleteIngressMsgXfer() {};
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
- SessionState *session; // only valid if sync == true
- Message *msg; // only valid if sync == true
+ SessionState *session; // only valid if sync flag in callback is true
+ boost::intrusive_ptr<Message> msg;
bool requiresAccept;
bool requiresSync;
+ bool pending; // true if msg saved on pending list...
};
friend class SessionManager;