summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-01-24 20:45:55 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-01-24 20:45:55 +0000
commite65c4741c7e80a3e2b0cf3f9ccff080312191785 (patch)
treeebea5b487a7d7ea80eb90e477acc3c6ebcd1be52
parent4d1cd18372191f905d7e3f7b373a0fe87433b856 (diff)
downloadqpid-python-e65c4741c7e80a3e2b0cf3f9ccff080312191785.tar.gz
QPID-2921: (partial) Async completion of message.transfer and execution.sync commands
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1062965 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h171
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h6
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h29
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp151
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h32
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/TxPublishTest.cpp4
13 files changed, 367 insertions, 103 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 9f97b94b8a..739424783a 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -548,8 +548,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HandlerImpl.h \
qpid/broker/HeadersExchange.cpp \
qpid/broker/HeadersExchange.h \
- qpid/broker/IncompleteMessageList.cpp \
- qpid/broker/IncompleteMessageList.h \
+ qpid/broker/AsyncCompletion.h \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
qpid/broker/LinkRegistry.cpp \
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
new file mode 100644
index 0000000000..382e2f7565
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -0,0 +1,171 @@
+#ifndef _Completion_
+#define _Completion_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * Class to implement asynchronous notification of completion.
+ *
+ * Use-case: An "initiator" needs to wait for a set of "completers" to
+ * finish a unit of work before an action can occur. This object
+ * tracks the progress of the set of completers, and allows the action
+ * to occur once all completers have signalled that they are done.
+ *
+ * The initiator and completers may be running in separate threads.
+ *
+ * The initiating thread is the thread that initiates the action,
+ * i.e. the connection read thread.
+ *
+ * A completing thread is any thread that contributes to completion,
+ * e.g. a store thread that does an async write.
+ * There may be zero or more completers.
+ *
+ * When the work is complete, a callback is invoked. The callback
+ * may be invoked in the Initiator thread, or one of the Completer
+ * threads. The callback is passed a flag indicating whether or not
+ * the callback is running under the context of the Initiator thread.
+ *
+ * Use model:
+ * 1) Initiator thread invokes begin()
+ * 2) After begin() has been invoked, zero or more Completers invoke
+ * startCompleter(). Completers may be running in the same or
+ * different thread as the Initiator, as long as they guarantee that
+ * startCompleter() is invoked at least once before the Initiator invokes end().
+ * 3) Completers may invoke finishCompleter() at any time, even after the
+ * initiator has invoked end(). finishCompleter() may be called from any
+ * thread.
+ * 4) startCompleter()/finishCompleter() calls "nest": for each call to
+ * startCompleter(), a corresponding call to finishCompleter() must be made.
+ * Once the last finishCompleter() is called, the Completer must no longer
+ * reference the completion object.
+ * 5) The Initiator invokes end() at the point where it has finished
+ * dispatching work to the Completers, and is prepared for the callback
+ * handler to be invoked. Note: if there are no outstanding Completers
+ * pending when the Initiator invokes end(), the callback will be invoked
+ * directly, and the sync parameter will be set true. This indicates to the
+ * Initiator that the callback is executing in the context of the end() call,
+ * and the Initiator is free to optimize the handling of the completion,
+ * assuming no need for synchronization with Completer threads.
+ */
+ class AsyncCompletion {
+ public:
+ // encapsulates the completion callback handler
+ class CompletionHandler {
+ public:
+ virtual void operator() (bool) { /* bool == true if called via end() */}
+ };
+
+ private:
+ mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
+ mutable qpid::sys::Monitor callbackLock;
+ bool inCallback;
+ void invokeCallback(bool sync) {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ inCallback = true;
+ if (handler) {
+ qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+ (*handler)(sync);
+ handler.reset();
+ }
+ inCallback = false;
+ callbackLock.notifyAll();
+ }
+
+ protected:
+ /** Invoked when all completers have signalled that they have completed
+ * (via calls to finishCompleter()).
+ */
+ boost::shared_ptr<CompletionHandler> handler;
+
+ public:
+ AsyncCompletion() : completionsNeeded(0), inCallback(false) {};
+ virtual ~AsyncCompletion() { /* @todo KAG - assert(completionsNeeded.get() == 0); */ };
+
+ /** True when all outstanding operations have compeleted
+ */
+ bool isDone()
+ {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ return !inCallback && completionsNeeded.get() == 0;
+ }
+
+ /** Called to signal the start of an asynchronous operation. The operation
+ * is considered pending until finishCompleter() is called.
+ * E.g. called when initiating an async store operation.
+ */
+ void startCompleter() { ++completionsNeeded; }
+
+ /** Called by completer to signal that it has finished the operation started
+ * when startCompleter() was invoked.
+ * e.g. called when async write complete.
+ */
+ void finishCompleter()
+ {
+ if (--completionsNeeded == 0) {
+ invokeCallback(false);
+ }
+ }
+
+ /** called by initiator before any calls to startCompleter can be done.
+ */
+ void begin() { startCompleter(); };
+
+ /** called by initiator after all potential completers have called
+ * startCompleter().
+ */
+ //void end(CompletionHandler::shared_ptr& _handler)
+ void end(boost::shared_ptr<CompletionHandler> _handler)
+ {
+ assert(completionsNeeded.get() > 0); // ensure begin() has been called!
+ handler = _handler;
+ if (--completionsNeeded == 0) {
+ invokeCallback(true);
+ }
+ }
+
+ /** may be called by Initiator to cancel the callback registered by end()
+ */
+ void cancel() {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
+ handler.reset();
+ }
+
+ /** may be called by Initiator after all completers have been added but
+ * prior to calling end(). Allows initiator to determine if it _really_
+ * needs to wait for pending Completers (e.g. count > 1).
+ */
+ uint32_t getPendingCompleters() { return completionsNeeded.get(); }
+ };
+
+ }} // qpid::broker::
+#endif /*!_Completion_*/
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 147b9e7a6a..a16180f3ae 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
- inCallback(false), requiredCredit(0) {}
+ expiration(FAR_FUTURE), dequeueCallback(0),
+ inCallback(false), requiredCredit(0)
+{}
Message::Message(const Message& original) :
PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
- inCallback(false), requiredCredit(0)
+ expiration(original.expiration), dequeueCallback(0),
+ inCallback(false), requiredCredit(0)
{
setExpiryPolicy(original.expiryPolicy);
}
@@ -431,30 +432,12 @@ struct ScopedSet {
};
}
-void Message::allEnqueuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = enqueueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
void Message::allDequeuesComplete() {
ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- enqueueCallback = &cb;
-}
-
-void Message::resetEnqueueCompleteCallback() {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- enqueueCallback = 0;
-}
-
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
while (inCallback) callbackLock.wait();
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index ee80657f39..e8a8a19d53 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -156,10 +156,6 @@ public:
boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
- /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
- void setEnqueueCompleteCallback(MessageCallback& cb);
- void resetEnqueueCompleteCallback();
-
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
@@ -170,7 +166,6 @@ public:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
MessageAdapter& getAdapter() const;
- void allEnqueuesComplete();
void allDequeuesComplete();
mutable sys::Mutex lock;
@@ -192,7 +187,6 @@ public:
mutable boost::intrusive_ptr<Message> empty;
sys::Monitor callbackLock;
- MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
bool inCallback;
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index e5fbb71cbd..7ba28eb293 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -34,7 +34,6 @@ class MessageStore;
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
- asyncEnqueueCounter(0),
asyncDequeueCounter(0),
store(0)
{}
@@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleased() const
return contentReleaseState.released;
}
-bool PersistableMessage::isEnqueueComplete() {
- sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- return asyncEnqueueCounter == 0;
-}
-
-void PersistableMessage::enqueueComplete() {
- bool notify = false;
- {
- sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- if (asyncEnqueueCounter > 0) {
- if (--asyncEnqueueCounter == 0) {
- notify = true;
- }
- }
- }
- if (notify)
- allEnqueuesComplete();
-}
bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
if (store && (queue->getPersistenceId()!=0)) {
@@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
addToSyncList(queue, _store);
- enqueueAsync();
-}
-
-void PersistableMessage::enqueueAsync() {
- sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- asyncEnqueueCounter++;
+ enqueueStart();
}
bool PersistableMessage::isDequeueComplete() {
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index 96fb922c1a..e22d0ec4b8 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -31,6 +31,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/AsyncCompletion.h"
namespace qpid {
namespace broker {
@@ -43,18 +44,18 @@ class MessageStore;
class PersistableMessage : public Persistable
{
typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Mutex asyncEnqueueLock;
sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
-
+
/**
- * Tracks the number of outstanding asynchronous enqueue
- * operations. When the message is enqueued asynchronously the
- * count is incremented; when that enqueue completes it is
- * decremented. Thus when it is 0, there are no outstanding
- * enqueues.
+ * Tracks the number of outstanding asynchronous operations that must
+ * complete before the message can be considered safely received by the
+ * broker. E.g. all enqueues have completed, the message has been written
+ * to store, credit has been replenished, etc. Once all outstanding
+ * operations have completed, the transfer of this message from the client
+ * may be considered complete.
*/
- int asyncEnqueueCounter;
+ AsyncCompletion receiveCompletion;
/**
* Tracks the number of outstanding asynchronous dequeue
@@ -65,7 +66,6 @@ class PersistableMessage : public Persistable
*/
int asyncDequeueCounter;
- void enqueueAsync();
void dequeueAsync();
syncList synclist;
@@ -80,8 +80,6 @@ class PersistableMessage : public Persistable
ContentReleaseState contentReleaseState;
protected:
- /** Called when all enqueues are complete for this message. */
- virtual void allEnqueuesComplete() = 0;
/** Called when all dequeues are complete for this message. */
virtual void allDequeuesComplete() = 0;
@@ -115,9 +113,9 @@ class PersistableMessage : public Persistable
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
- QPID_BROKER_EXTERN bool isEnqueueComplete();
-
- QPID_BROKER_EXTERN void enqueueComplete();
+ QPID_BROKER_EXTERN bool isReceiveComplete() { return receiveCompletion.isDone(); }
+ QPID_BROKER_EXTERN void enqueueStart() { receiveCompletion.startCompleter(); }
+ QPID_BROKER_EXTERN void enqueueComplete() { receiveCompletion.finishCompleter(); }
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
@@ -133,7 +131,8 @@ class PersistableMessage : public Persistable
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-
+
+ QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return receiveCompletion; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index e59857462c..24469ed0e4 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -157,13 +157,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
- // if no store then mark as enqueued
- if (!enqueue(0, msg)){
- push(msg);
- msg->enqueueComplete();
- }else {
- push(msg);
- }
+ enqueue(0, msg);
+ push(msg);
mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
@@ -688,7 +683,7 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
//NOTE: don't need to use checkLvqReplace() here as it
//is only relevant for LVQ which does not support persistence
//so the enqueueComplete check has no effect
- if ( i->payload->isEnqueueComplete() ) count ++;
+ if ( i->payload->isReceiveComplete() ) count ++;
}
return count;
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index d4f6b28ccb..3a98d52d15 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/broker/SessionState.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -586,7 +587,12 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st
-void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+ session.addPendingExecutionSync();
+ /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */
+
+}
void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
{
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index afbbb2cc22..253ce8dcf2 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -46,6 +46,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
+ virtual void addPendingExecutionSync() = 0;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 6f02399795..ca98ee1437 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -59,7 +59,6 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore()),
- enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
mgmtObject(0),
rateFlowcontrol(0)
{
@@ -94,6 +93,18 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
+
+ // clean up any outstanding incomplete receive messages
+
+ qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+ while (!incompleteRcvMsgs.empty()) {
+ boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front());
+ incompleteRcvMsgs.pop_front();
+ {
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
+ ref->cancel();
+ }
+ }
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -195,15 +206,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
}
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+ currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ if (currentCommandComplete) receiverCompleted(id);
+
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
- incomplete.process(enqueuedOp, true);
+
+ if (method->isSync() && currentCommandComplete) {
sendAcceptAndCompletion();
}
}
@@ -247,21 +260,24 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
+
+ msg->getReceiveCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
-
- if (msg->isEnqueueComplete()) {
- enqueued(msg);
- } else {
- incomplete.add(msg);
- }
-
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
- incomplete.process(enqueuedOp, true);
- sendAcceptAndCompletion();
+ if (msg->getReceiveCompletion().getPendingCompleters() == 1) {
+ // There are no other pending receive completers (just this SessionState).
+ // Mark the message as completed.
+ completeRcvMsg( msg );
} else {
- incomplete.process(enqueuedOp, false);
+ // There are outstanding receive completers. Save the message until
+ // they are all done.
+ QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId());
+ boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg));
+ {
+ qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+ incompleteRcvMsgs.push_back(pendingMsg);
+ }
+ msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete
}
}
@@ -312,11 +328,36 @@ void SessionState::sendAcceptAndCompletion()
sendCompletion();
}
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc). At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg)
{
+ bool callSendCompletion = false;
receiverCompleted(msg->getCommandId());
if (msg->requiresAccept())
+ // will cause msg's seq to appear in the next message.accept we send.
accepted.add(msg->getCommandId());
+
+ // Are there any outstanding Execution.Sync commands pending the
+ // completion of this msg? If so, complete them.
+ while (!pendingExecutionSyncs.empty() &&
+ receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+ const SequenceNumber& id = pendingExecutionSyncs.front();
+ pendingExecutionSyncs.pop();
+ QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
+ receiverCompleted(id);
+ callSendCompletion = true; // likely peer is pending for this completion.
+ }
+
+ // if the sender has requested immediate notification of the completion...
+ if (msg->getFrames().getMethod()->isSync()) {
+ sendAcceptAndCompletion();
+ } else if (callSendCompletion) {
+ sendCompletion();
+ }
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -389,4 +430,80 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
return handler->getClusterOrderProxy();
}
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+ SequenceNumber syncCommandId = receiverGetCurrent();
+ if (receiverGetIncomplete().front() < syncCommandId) {
+ currentCommandComplete = false;
+ pendingExecutionSyncs.push(syncCommandId);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+ }
+}
+
+
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion. May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
+ */
+void SessionState::IncompleteRcvMsg::operator() (bool sync)
+{
+ QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync);
+ boost::shared_ptr<IncompleteRcvMsg> tmp;
+ {
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
+ for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin();
+ i != session->incompleteRcvMsgs.end(); ++i) {
+ if (i->get() == this) {
+ tmp.swap(*i);
+ session->incompleteRcvMsgs.remove(*i);
+ break;
+ }
+ }
+ }
+
+ if (session->isAttached()) {
+ if (sync) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
+ session->completeRcvMsg(msg);
+ } else { // potentially called from a different thread
+ QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
+ session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp));
+ }
+ }
+}
+
+
+/** Scheduled from IncompleteRcvMsg callback, completes the message receive
+ * asynchronously
+ */
+void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg)
+{
+ QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
+ if (iMsg->session && iMsg->session->isAttached()) {
+ QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
+ iMsg->session->completeRcvMsg(iMsg->msg);
+ }
+}
+
+
+/** Cancels a pending incomplete receive message completion callback. Note
+ * well: will wait for the callback to finish if it is currently in progress
+ * on another thread.
+ */
+void SessionState::IncompleteRcvMsg::cancel()
+{
+ QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId());
+ // Cancel the message complete callback. On return, we are guaranteed there
+ // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync)
+ msg->getReceiveCompletion().cancel();
+ // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending,
+ // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg.
+ session = 0;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 3dcb0a62d4..b33181eee4 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -30,10 +30,11 @@
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -122,11 +123,15 @@ class SessionState : public qpid::SessionState,
const SessionId& getSessionId() const { return getId(); }
+ // Used by ExecutionHandler sync command processing. Notifies
+ // the SessionState of a received Execution.Sync command.
+ void addPendingExecutionSync();
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
- void enqueued(boost::intrusive_ptr<Message> msg);
+ void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -152,8 +157,6 @@ class SessionState : public qpid::SessionState,
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
- IncompleteMessageList incomplete;
- IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
@@ -162,7 +165,28 @@ class SessionState : public qpid::SessionState,
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
+ // sequence numbers for pending received Execution.Sync commands
+ std::queue<SequenceNumber> pendingExecutionSyncs;
+ bool currentCommandComplete;
+
+ class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
+ {
+ public:
+ IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg)
+ : session(&_session), msg(_msg) {}
+ virtual void operator() (bool sync);
+ void cancel(); // cancel pending incomplete callback [operator() above].
+
+ private:
+ SessionState *session;
+ boost::intrusive_ptr<Message> msg;
+ static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg );
+ };
+ std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs;
+ qpid::sys::Mutex incompleteRcvMsgsLock;
+
friend class SessionManager;
+ friend class IncompleteRcvMsg;
};
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 07405bcd8f..fc3f7c0854 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
InlineVector.cpp \
SequenceSet.cpp \
StringUtils.cpp \
- IncompleteMessageList.cpp \
RangeSet.cpp \
AtomicValue.cpp \
QueueTest.cpp \
diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp
index 6b44d95baa..ffb0125302 100644
--- a/qpid/cpp/src/tests/TxPublishTest.cpp
+++ b/qpid/cpp/src/tests/TxPublishTest.cpp
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
- BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete());
+ BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete());
}
QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit)
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
- BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
+ BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete());
BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());