summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-06-01 20:22:37 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-06-01 20:22:37 +0000
commitc578ed1e72954def2a5904990d7a97b9756c4f35 (patch)
tree43f422b49040771f5d0fb4fe8338758aac896612
parent03cfb4f61881c47b7a77f4e75316bdedfd933f98 (diff)
downloadqpid-python-c578ed1e72954def2a5904990d7a97b9756c4f35.tar.gz
Initial design attempt.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1130291 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h5
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStore.h2
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--qpid/cpp/src/qpid/broker/NullMessageStore.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/NullMessageStore.h3
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp49
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h25
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableQueue.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp48
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h28
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp147
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h55
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp168
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h111
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
-rw-r--r--qpid/cpp/src/tests/TestMessageStore.h2
21 files changed, 465 insertions, 210 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index fef994438f..bfd6718dcd 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -23,6 +23,7 @@
*/
#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/AtomicValue.h"
@@ -77,7 +78,7 @@ namespace broker {
* assuming no need for synchronization with Completer threads.
*/
-class AsyncCompletion
+class AsyncCompletion : private boost::noncopyable
{
public:
@@ -88,7 +89,7 @@ class AsyncCompletion
* callback object will be used by the last completer thread, and
* released when the callback returns.
*/
- class Callback : public RefCounted
+ class Callback : virtual public RefCounted
{
public:
virtual void completed(bool) = 0;
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 58dcc6d7c7..635123d242 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -110,9 +110,10 @@ void DeliveryRecord::complete() {
completed = true;
}
-bool DeliveryRecord::accept(TransactionContext* ctxt) {
+/** Accept msg, and optionally notify caller when dequeue completes */
+bool DeliveryRecord::accept(TransactionContext* ctxt, Queue::DequeueDoneCallbackFactory *f) {
if (acquired && !ended) {
- queue->dequeue(ctxt, msg);
+ queue->dequeue(ctxt, msg, f);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index d388ba94be..10491df94c 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -31,6 +31,7 @@
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/DeliveryId.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
@@ -39,6 +40,7 @@ class TransactionContext;
class SemanticState;
struct AckRange;
+
/**
* Record of a delivery for which an ack is outstanding.
*/
@@ -84,7 +86,7 @@ class DeliveryRecord
void redeliver(SemanticState* const);
void acquire(DeliveryIds& results);
void complete();
- bool accept(TransactionContext* ctxt); // Returns isRedundant()
+ bool accept(TransactionContext*, Queue::DequeueDoneCallbackFactory *); // Returns isRedundant()
bool setEnded(); // Returns isRedundant()
void committed() const;
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 763dc55e40..4f64ae2db9 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -419,11 +419,6 @@ struct ScopedSet {
};
}
-void Message::allDequeuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = dequeueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index d85ee434db..8c5d42bcde 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -163,7 +163,6 @@ public:
void setIsManagementMessage(bool b);
private:
MessageAdapter& getAdapter() const;
- void allDequeuesComplete();
mutable sys::Mutex lock;
framing::FrameSet frames;
diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h
index ab0225ef6b..e852edc86a 100644
--- a/qpid/cpp/src/qpid/broker/MessageStore.h
+++ b/qpid/cpp/src/qpid/broker/MessageStore.h
@@ -172,7 +172,7 @@ class MessageStore : public TransactionalStore, public Recoverable {
*/
virtual void dequeue(TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue) = 0;
+ const boost::shared_ptr<PersistableQueue>& queue) = 0;
/**
* Flushes all async messages to disk for the specified queue
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
index cd9fd4c933..7da8372704 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -27,6 +27,7 @@
#define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); }
using boost::intrusive_ptr;
+using boost::shared_ptr;
using qpid::framing::FieldTable;
using std::string;
@@ -127,7 +128,7 @@ void MessageStoreModule::enqueue(TransactionContext* ctxt,
void MessageStoreModule::dequeue(TransactionContext* ctxt,
const intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+ const shared_ptr<PersistableQueue>& queue)
{
TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
}
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
index 56b5a3c1ae..21cd3ff9d3 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
@@ -72,7 +72,7 @@ class MessageStoreModule : public MessageStore
const PersistableQueue& queue);
void dequeue(TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue);
+ const boost::shared_ptr<PersistableQueue>& queue);
uint32_t outstandingQueueAIO(const PersistableQueue& queue);
void flush(const qpid::broker::PersistableQueue& queue);
bool isNull() const;
diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
index 43f600eaf1..b779d23c08 100644
--- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -28,6 +28,7 @@
#include <iostream>
using boost::intrusive_ptr;
+using boost::shared_ptr;
namespace qpid{
namespace broker{
@@ -103,9 +104,9 @@ void NullMessageStore::enqueue(TransactionContext*,
void NullMessageStore::dequeue(TransactionContext*,
const intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue&)
+ const shared_ptr<PersistableQueue>& q)
{
- msg->dequeueComplete();
+ q->dequeueComplete(msg);
}
void NullMessageStore::flush(const qpid::broker::PersistableQueue&) {}
diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h
index c6f402662e..e8bc5d9e62 100644
--- a/qpid/cpp/src/qpid/broker/NullMessageStore.h
+++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h
@@ -28,6 +28,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
@@ -84,7 +85,7 @@ class QPID_BROKER_CLASS_EXTERN NullMessageStore : public MessageStore
const PersistableQueue& queue);
QPID_BROKER_EXTERN virtual void dequeue(TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue);
+ const boost::shared_ptr<PersistableQueue>& queue);
QPID_BROKER_EXTERN virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue);
QPID_BROKER_EXTERN virtual void flush(const qpid::broker::PersistableQueue& queue);
~NullMessageStore(){}
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index 7ba28eb293..0d5e32b40d 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -34,7 +34,6 @@ class MessageStore;
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
- asyncDequeueCounter(0),
store(0)
{}
@@ -43,18 +42,18 @@ void PersistableMessage::flush()
syncList copy;
{
sys::ScopedLock<sys::Mutex> l(storeLock);
- if (store) {
- copy = synclist;
- } else {
+ if (store) {
+ copy = synclist;
+ } else {
return;//early exit as nothing to do
- }
+ }
}
for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
- PersistableQueue::shared_ptr q(i->lock());
+ PersistableQueue::shared_ptr q(i->second.lock());
if (q) {
q->flush();
}
- }
+ }
}
void PersistableMessage::setContentReleased()
@@ -70,8 +69,9 @@ bool PersistableMessage::isContentReleased() const
bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
if (store && (queue->getPersistenceId()!=0)) {
+ sys::ScopedLock<sys::Mutex> l(storeLock);
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
- PersistableQueue::shared_ptr q(i->lock());
+ PersistableQueue::shared_ptr q(i->second.lock());
if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
}
}
@@ -84,7 +84,7 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
boost::weak_ptr<PersistableQueue> q(queue);
- synclist.push_back(q);
+ synclist[queue->getName()] = q;
}
}
@@ -93,37 +93,12 @@ void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, Messag
enqueueStart();
}
-bool PersistableMessage::isDequeueComplete() {
- sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- return asyncDequeueCounter == 0;
-}
-
-void PersistableMessage::dequeueComplete() {
- bool notify = false;
- {
- sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- if (asyncDequeueCounter > 0) {
- if (--asyncDequeueCounter == 0) {
- notify = true;
- }
- }
- }
- if (notify) allDequeuesComplete();
-}
-
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::dequeueComplete(PersistableQueue::shared_ptr queue, MessageStore* _store)
+{
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
- store = _store;
- boost::weak_ptr<PersistableQueue> q(queue);
- synclist.push_back(q);
+ synclist.erase(queue->getName());
}
- dequeueAsync();
-}
-
-void PersistableMessage::dequeueAsync() {
- sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- asyncDequeueCounter++;
}
PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index d29c2c45b4..76a44a4cf7 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -43,8 +43,7 @@ class MessageStore;
*/
class PersistableMessage : public Persistable
{
- typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Mutex asyncDequeueLock;
+ typedef std::map< std::string, boost::weak_ptr<PersistableQueue> > syncList;
sys::Mutex storeLock;
/**
@@ -58,17 +57,6 @@ class PersistableMessage : public Persistable
*/
AsyncCompletion ingressCompletion;
- /**
- * Tracks the number of outstanding asynchronous dequeue
- * operations. When the message is dequeued asynchronously the
- * count is incremented; when that dequeue completes it is
- * decremented. Thus when it is 0, there are no outstanding
- * dequeues.
- */
- int asyncDequeueCounter;
-
- void dequeueAsync();
-
syncList synclist;
struct ContentReleaseState
{
@@ -81,8 +69,6 @@ class PersistableMessage : public Persistable
ContentReleaseState contentReleaseState;
protected:
- /** Called when all dequeues are complete for this message. */
- virtual void allDequeuesComplete() = 0;
void setContentReleased();
@@ -124,13 +110,10 @@ class PersistableMessage : public Persistable
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
+ QPID_BROKER_EXTERN void dequeueComplete(PersistableQueue::shared_ptr queue,
+ MessageStore* _store);
- QPID_BROKER_EXTERN bool isDequeueComplete();
-
- QPID_BROKER_EXTERN void dequeueComplete();
-
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
- MessageStore* _store);
+ QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
diff --git a/qpid/cpp/src/qpid/broker/PersistableQueue.h b/qpid/cpp/src/qpid/broker/PersistableQueue.h
index 655d26bc74..3514c7eb53 100644
--- a/qpid/cpp/src/qpid/broker/PersistableQueue.h
+++ b/qpid/cpp/src/qpid/broker/PersistableQueue.h
@@ -26,10 +26,12 @@
#include "qpid/broker/Persistable.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace broker {
+class PersistableMessage;
/**
* Empty class to be used by any module that wanted to set an external per queue store into
@@ -66,6 +68,9 @@ public:
PersistableQueue():externalQueueStore(NULL){
};
+
+ /** the message has finished being dequeued from the store */
+ virtual void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>&) = 0;
protected:
ExternalQueueStore* externalQueueStore;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 8efa8be3dc..04c496cbd5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -438,7 +438,8 @@ void Queue::purgeExpired()
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
- for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
+ (DequeueDoneCallbackFactory*)0));
}
}
@@ -613,7 +614,8 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
+ (DequeueDoneCallbackFactory*)0));
}
if (inLastNodeFailure && persistLastNode){
@@ -652,8 +654,13 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
if (policy.get()) policy->enqueueAborted(msg);
}
-// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
+/**
+ * returns false if the dequeue completed, otherwise the dequeue will complete
+ * asynchronously. If the caller needs to know when an asynchronous dequeue
+ * completes, it must provide a factory that will provide the callback.
+ */
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
+ DequeueDoneCallbackFactory *factory)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
@@ -670,9 +677,14 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
bool fp = msg.payload->isForcedPersistent();
if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) {
if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
- msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ msg.payload->dequeueAsync(shared_from_this(), store);
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(ctxt, pmsg, *this);
+ if (factory) {
+ boost::shared_ptr<DequeueDoneCallback> callback((*factory)());
+ Mutex::ScopedLock locker(messageLock);
+ pendingDequeueCallbacks[pmsg.get()] = callback;
+ }
+ store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done
return true;
}
}
@@ -1109,7 +1121,8 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
}
}
//process any pending dequeues
- for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
+ (DequeueDoneCallbackFactory*)0));
pendingDequeues.clear();
}
@@ -1198,6 +1211,27 @@ const Broker* Queue::getBroker()
}
+/** invoked from the store thread when the asynchronous dequeueing of the
+ * message has completed. */
+void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg )
+{
+ msg->dequeueComplete(shared_from_this(), store);
+
+ boost::shared_ptr<DequeueDoneCallback> cb;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i;
+ i = pendingDequeueCallbacks.find(msg.get());
+ if (i != pendingDequeueCallbacks.end()) {
+ cb = i->second;
+ pendingDequeueCallbacks.erase(i);
+ }
+ }
+ if (cb) (*cb)();
+ QPID_LOG(error, "dequeueComplete:=" << cb);
+}
+
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index c4f1bcc07e..1c849e28fa 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -275,9 +275,30 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
- * dequeue from store (only done once messages is acknowledged)
+ * dequeue from store (only done once messages is acknowledged). Dequeue
+ * -may- complete asynchronously. This method returns 'false' if the
+ * dequeue has completed, else 'true' if the dequeue is asynchronous. If
+ * the caller is interested in receiving notification when the asynchronous
+ * dequeue completes, it may pass a pointer to a factory functor that
+ * returns a shareable DequeueDoneCallback object. If the dequeue is
+ * completed synchronously, this pointer is ignored. If the dequeue will
+ * complete asynchronously, the factory is called to obtain a
+ * DequeueDoneCallback. When the dequeue completes, the
+ * DequeueDoneCallback is invoked. The callback should be prepared to
+ * execute on any thread.
*/
- QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+ class DequeueDoneCallback
+ {
+ public:
+ virtual void operator()() = 0;
+ };
+ typedef boost::function<boost::shared_ptr<DequeueDoneCallback>()> DequeueDoneCallbackFactory;
+ QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
+ DequeueDoneCallbackFactory *factory = 0);
+
+ /** invoked by store to signal dequeue() has completed */
+ QPID_BROKER_EXTERN void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg);
+
/**
* Inform the queue that a previous transactional dequeue
* committed.
@@ -382,6 +403,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
const Broker* getBroker();
+
+ private:
+ std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> > pendingDequeueCallbacks;
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index ce86253f4a..0f670ded83 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -761,7 +761,122 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
return IsInSequenceSetAnd<Predicate>(s,p);
}
+/** Design notes for asychronous completion of Message.accept **
+ * Message.Accept command cannot be considered "complete" until all messages
+ * identified by the sequence set that accompanys the command have been
+ * completely dequeued. A message dequeue may not be able to complet
+ * synchronously - specifically when the message is durable. Therefore, the
+ * Message.Accept command handling must be able to track asynchronous dequeues,
+ * and notify the Session when all dequeues are done (at which point the
+ * command completes). See QPID-3079.
+ */
+namespace {
+
+ /** Manage a Message.accept that requires async completion of one or more
+ * message dequeue operations */
+ class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext
+ {
+ mutable qpid::sys::Mutex lock;
+ std::set<DeliveryId> pending; // for dequeue to complete
+ bool ready;
+ SemanticState& state;
+
+ public:
+ AsyncMessageAcceptCmd(SemanticState& _state)
+ : ready(false), state(_state) {}
+
+ /** called from session to urge pending dequeues to complete ASAP */
+ void flush()
+ {
+ std::set<DeliveryId> copy;
+ {
+ Mutex::ScopedLock l(lock);
+ copy = pending;
+ }
+ for (std::set<DeliveryId>::iterator i = copy.begin();
+ i != copy.end(); ++i) {
+ for (DeliveryRecords::iterator r = state.getUnacked().begin();
+ r != state.getUnacked().end(); ++r) {
+ if (r->getId() == *i) {
+ r->getQueue()->flush();
+ break;
+ }
+ }
+ }
+ }
+
+ /** add a pending dequeue to track */
+ void add( const DeliveryId& id )
+ {
+ Mutex::ScopedLock l(lock);
+ bool unique = pending.insert(id).second;
+ if (!unique) {
+ assert(false);
+ }
+ }
+
+ /** signal this dequeue done. Note: may be run in *any* thread */
+ void complete( const DeliveryId& id )
+ {
+ Mutex::ScopedLock l(lock);
+ std::set<DeliveryId>::iterator i = pending.find(id);
+ assert(i != pending.end());
+ pending.erase(i);
+
+ if (ready && pending.empty()) {
+ framing::Invoker::Result r;
+ Mutex::ScopedUnlock ul(lock);
+ completed( r );
+ }
+ }
+
+ /** allow the Message.Accept to complete */
+ void enable()
+ {
+ Mutex::ScopedLock l(lock);
+ if (pending.empty()) {
+ framing::Invoker::Result r;
+ Mutex::ScopedUnlock ul(lock);
+ completed( r );
+ return;
+ }
+ ready = true;
+ }
+ };
+
+
+ /** callback to indicate a single message has completed its dequeue. This
+ object is made available to the queue, which will invoke it when the
+ dequeue completes. */
+ class DequeueDone : public Queue::DequeueDoneCallback
+ {
+ DeliveryId id;
+ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
+ public:
+ DequeueDone( const DeliveryId & _id,
+ boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
+ : id(_id), cmd(_cmd) {}
+ void operator()() { cmd->complete( id ); }
+ };
+
+
+ /** factory to create the above callback - passed to queue's dequeue
+ method, only called if dequeue is async! */
+ boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
+ const DeliveryId& id,
+ boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd )
+ {
+ if (!cmd) { // first async dequeue creates the context
+ cmd.reset(new AsyncMessageAcceptCmd(*state));
+ }
+ cmd->add( id );
+ boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
+ return x;
+ }
+}
+
void SemanticState::accepted(const SequenceSet& commands) {
+ QPID_LOG(error, "SemanticState::accepted (" << commands << ")");
assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -785,12 +900,32 @@ void SemanticState::accepted(const SequenceSet& commands) {
unacked.erase(removed, unacked.end());
}
} else {
- DeliveryRecords::iterator removed =
- remove_if(unacked.begin(), unacked.end(),
- isInSequenceSetAnd(commands,
- bind(&DeliveryRecord::accept, _1,
- (TransactionContext*) 0)));
- unacked.erase(removed, unacked.end());
+ /** @todo KAG - the following code removes the command from unacked
+ even if the dequeue has not completed. note that the command will
+ still not complete until all dequeues complete. I'm doing this to
+ avoid having to lock the unacked list, which would be necessary if
+ we remove when the dequeue completes. Is this ok? */
+ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
+ DeliveryRecords::iterator i;
+ DeliveryRecords undone;
+ for (i = unacked.begin(); i < unacked.end(); ++i) {
+ if (i->coveredBy(&commands)) {
+ Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd);
+ if (i->accept((TransactionContext*) 0, &f) == false) {
+ undone.push_back(*i);
+ }
+ }
+ }
+ if (undone.empty())
+ unacked.clear();
+ else
+ unacked.swap(undone);
+
+ if (cmd) {
+ boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));
+ session.registerAsyncCommand(pcmd);
+ cmd->enable();
+ }
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 253ce8dcf2..6f4a894f22 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -25,6 +25,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/Invoker.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/OwnershipToken.h"
@@ -37,7 +38,12 @@ namespace broker {
class SessionContext : public OwnershipToken, public sys::OutputControl
{
- public:
+ protected:
+ class AsyncCommandManager;
+
+ public:
+ class AsyncCommandContext;
+
virtual ~SessionContext(){}
virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual bool isAttached() const = 0;
@@ -47,7 +53,52 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
virtual void addPendingExecutionSync() = 0;
-};
+ // pass async command context to Session, completion must not occur
+ // until -after- this call returns.
+ virtual void registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>&) = 0;
+
+ // class for commands that need to complete asynchronously
+ friend class AsyncCommandContext;
+ class AsyncCommandContext : virtual public RefCounted
+ {
+ private:
+ framing::SequenceNumber id;
+ bool requiresAccept;
+ bool syncBitSet;
+ boost::intrusive_ptr<SessionContext::AsyncCommandManager> manager;
+
+ public:
+ AsyncCommandContext() : id(0), requiresAccept(false), syncBitSet(false) {}
+ virtual ~AsyncCommandContext() {}
+
+ framing::SequenceNumber getId() { return id; }
+ void setId(const framing::SequenceNumber seq) { id = seq; }
+ bool getRequiresAccept() { return requiresAccept; }
+ void setRequiresAccept(const bool a) { requiresAccept = a; }
+ bool getSyncBitSet() { return syncBitSet; }
+ void setSyncBitSet(const bool s) { syncBitSet = s; }
+ void setManager(SessionContext::AsyncCommandManager *m) { manager.reset(m); }
+
+ /** notify session that this command has completed */
+ void completed(const framing::Invoker::Result& r)
+ {
+ boost::intrusive_ptr<AsyncCommandContext> context(this);
+ manager->completePendingCommand( context, r );
+ manager.reset(0);
+ }
+
+ // to force completion as fast as possible (like when Sync arrives)
+ virtual void flush() = 0;
+ };
+
+ protected:
+ class AsyncCommandManager : public RefCounted
+ {
+ public:
+ virtual void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
+ const framing::Invoker::Result&) = 0;
+ };
+ };
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 957d5bd4d2..d84256b61b 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -62,7 +62,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- asyncCommandCompleter(new AsyncCommandCompleter(this))
+ asyncCommandManager(new AsyncCommandManager(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -95,7 +95,7 @@ void SessionState::addManagementObject() {
}
SessionState::~SessionState() {
- asyncCommandCompleter->cancel();
+ asyncCommandManager->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -126,7 +126,7 @@ bool SessionState::isLocal(const ConnectionToken* t) const
void SessionState::detach() {
QPID_LOG(debug, getId() << ": detached on broker.");
- asyncCommandCompleter->detached();
+ asyncCommandManager->detached();
disableOutput();
handler = 0;
if (mgmtObject != 0)
@@ -147,7 +147,7 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
- asyncCommandCompleter->attached();
+ asyncCommandManager->attached();
}
void SessionState::abort() {
@@ -204,22 +204,22 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id)
+{
currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
+ syncCurrentCommand = method->isSync();
+ acceptRequired = false;
Invoker::Result invocation = invoke(adapter, *method);
- if (currentCommandComplete) receiverCompleted(id);
-
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
- } else if (invocation.hasResult()) {
- getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync() && currentCommandComplete) {
- sendAcceptAndCompletion();
+ if (currentCommandComplete) {
+ completeCommand(id, invocation, false, syncCurrentCommand);
}
}
+
struct ScheduledCreditTask : public sys::TimerTask {
sys::Timer& timer;
SessionState& sessionState;
@@ -260,6 +260,9 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
}
msg->setPublisher(&getConnection());
msg->getIngressCompletion().begin();
+ currentCommandComplete = true; // assumed
+ syncCurrentCommand = msg->getFrames().getMethod()->isSync();
+ acceptRequired = msg->requiresAccept();
semanticState.handle(msg);
msgBuilder.end();
IncompleteIngressMsgXfer xfer(this, msg);
@@ -313,17 +316,19 @@ void SessionState::sendAcceptAndCompletion()
sendCompletion();
}
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc). At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
- */
-void SessionState::completeRcvMsg(SequenceNumber id,
- bool requiresAccept,
- bool requiresSync)
+/** Complete a received command */
+void SessionState::completeCommand(const SequenceNumber& id,
+ const framing::Invoker::Result& results,
+ bool requiresAccept,
+ bool syncBitSet)
{
bool callSendCompletion = false;
receiverCompleted(id);
+
+ if (results.hasResult()) {
+ getProxy().getExecution().result(id, results.getResult());
+ }
+
if (requiresAccept)
// will cause msg's seq to appear in the next message.accept we send.
accepted.add(id);
@@ -340,7 +345,7 @@ void SessionState::completeRcvMsg(SequenceNumber id,
}
// if the sender has requested immediate notification of the completion...
- if (requiresSync) {
+ if (syncBitSet) {
sendAcceptAndCompletion();
} else if (callSendCompletion) {
sendCompletion();
@@ -427,12 +432,25 @@ void SessionState::addPendingExecutionSync()
if (receiverGetIncomplete().front() < syncCommandId) {
currentCommandComplete = false;
pendingExecutionSyncs.push(syncCommandId);
- asyncCommandCompleter->flushPendingMessages();
+ asyncCommandManager->flushPendingCommands();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
}
}
+void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
+{
+ /** @todo KAG: ensure this is invoked during handleCommand() context! */
+ currentCommandComplete = false;
+ asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand );
+}
+
+
+void SessionState::cancelAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
+{
+ asyncCommandManager->cancelPendingCommand(aCmd);
+}
+
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
* which will be attached to a message that will be completed asynchronously.
*/
@@ -441,15 +459,14 @@ SessionState::IncompleteIngressMsgXfer::clone()
{
boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
- // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
- // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
- if (requiresSync)
+ // this routine is *only* invoked when the message needs to be asynchronously completed. Otherwise, ::completed()
+ // will be invoked directly.
+ pending = true;
+ boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb));
+ session->registerAsyncCommand(ctxt);
+ if (ctxt->getSyncBitSet()) {
+ // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
msg->flush();
- else {
- // otherwise, we need to track this message in order to flush it if an execution.sync arrives
- // before it has been completed (see flushPendingMessages())
- pending = true;
- completerContext->addPendingMessage(msg);
}
return cb;
}
@@ -461,110 +478,129 @@ SessionState::IncompleteIngressMsgXfer::clone()
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
- if (pending) completerContext->deletePendingMessage(id);
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
* the scheduledCompleterContext, since *this has a shared pointer to it.
* but not session!
*/
session = 0;
- QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId());
+ completed(framing::Invoker::Result());
} else {
// this path runs directly from the ac->end() call in handleContent() above,
// so *session is definately valid.
if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << id);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
+ QPID_LOG(debug, ": receive completed for msg seq=" << getId());
+ session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet());
+ }
+ if (pending) {
+ boost::intrusive_ptr<AsyncCommandContext> p(this);
+ session->cancelAsyncCommand(p);
}
}
- completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
+}
+
+
+void SessionState::IncompleteIngressMsgXfer::flush()
+{
+ msg->flush();
}
/** Scheduled from an asynchronous command's completed callback to run on
* the IO thread.
*/
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
+void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncCommandManager> ctxt)
{
- ctxt->completeCommands();
+ ctxt->processCompletedCommands();
}
-/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+ framing::SequenceNumber seq,
+ bool acceptRequired, bool syncBitSet)
{
+ cmd->setId(seq);
+ cmd->setRequiresAccept(acceptRequired);
+ cmd->setSyncBitSet(syncBitSet);
+ cmd->setManager(this);
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
- bool unique = pendingMsgs.insert(item).second;
- assert(unique);
+ std::pair<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > item(cmd->getId(), cmd);
+ bool unique = pendingCommands.insert(item).second;
+ if (!unique) assert(false);
}
-/** pending message has completed */
-void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.erase(id);
+ pendingCommands.erase(cmd->getId());
+ cmd->setManager(0);
}
+
/** done when an execution.sync arrives */
-void SessionState::AsyncCommandCompleter::flushPendingMessages()
+void SessionState::AsyncCommandManager::flushPendingCommands()
{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > copy;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
+ copy = pendingCommands;
}
// drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ for (std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> >::iterator i = copy.begin();
i != copy.end(); ++i) {
i->second->flush();
}
}
-/** mark an ingress Message.Transfer command as completed.
+/** mark a pending command as completed.
* This method must be thread safe - it may run on any thread.
*/
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync)
+void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+ const framing::Invoker::Result& result)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
if (session && isAttached) {
- MessageInfo msg(cmd, requiresAccept, requiresSync);
- completedMsgs.push_back(msg);
- if (completedMsgs.size() == 1) {
+ CommandInfo status(cmd->getId(),
+ result,
+ cmd->getRequiresAccept(),
+ cmd->getSyncBitSet());
+ completedCommands.push_back(status);
+ if (completedCommands.size() == 1) {
session->getConnection().requestIOProcessing(boost::bind(&schedule,
- session->asyncCommandCompleter));
+ session->asyncCommandManager));
}
}
+ pendingCommands.erase(cmd->getId());
}
/** Cause the session to complete all completed commands.
* Executes on the IO thread.
*/
-void SessionState::AsyncCommandCompleter::completeCommands()
+void SessionState::AsyncCommandManager::processCompletedCommands()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
// when session is destroyed, it clears the session pointer via cancel().
if (session && session->isAttached()) {
- for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
- msg != completedMsgs.end(); ++msg) {
- session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+ for (std::vector<CommandInfo>::iterator cmd = completedCommands.begin();
+ cmd != completedCommands.end(); ++cmd) {
+ session->completeCommand(cmd->id,
+ cmd->results,
+ cmd->requiresAccept,
+ cmd->syncBitSet);
}
}
- completedMsgs.clear();
+ completedCommands.clear();
}
/** cancel any pending calls to scheduleComplete */
-void SessionState::AsyncCommandCompleter::cancel()
+void SessionState::AsyncCommandManager::cancel()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
session = 0;
@@ -573,7 +609,7 @@ void SessionState::AsyncCommandCompleter::cancel()
/** inform the completer that the session has attached,
* allows command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::attached()
+void SessionState::AsyncCommandManager::attached()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
isAttached = true;
@@ -582,7 +618,7 @@ void SessionState::AsyncCommandCompleter::attached()
/** inform the completer that the session has detached,
* disables command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::detached()
+void SessionState::AsyncCommandManager::detached()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
isAttached = false;
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index b43df0c0aa..6ee8a38f4e 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -25,6 +25,7 @@
#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/ServerInvoker.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Session.h"
@@ -133,8 +134,17 @@ class SessionState : public qpid::SessionState,
// belonging to inter-broker bridges
void addManagementObject();
+ // allows commands (dispatched via handleCommand()) to inform the session
+ // that they may complete asynchronously.
+ void registerAsyncCommand(boost::intrusive_ptr<SessionContext::AsyncCommandContext>&);
+ void cancelAsyncCommand(boost::intrusive_ptr<SessionContext::AsyncCommandContext>&);
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
+ /** finish command processing started in handleCommand() */
+ void completeCommand(const framing::SequenceNumber&,
+ const framing::Invoker::Result&, bool requiresAccept,
+ bool syncBitSet);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
// indicate that the given ingress msg has been completely received by the
@@ -173,99 +183,100 @@ class SessionState : public qpid::SessionState,
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
- // sequence numbers for pending received Execution.Sync commands
+ // sequence numbers of received Execution.Sync commands that are pending completion.
std::queue<SequenceNumber> pendingExecutionSyncs;
+
+ // true if command completes during call to handleCommand()
bool currentCommandComplete;
+ bool syncCurrentCommand;
+ bool acceptRequired;
+ protected:
/** This class provides a context for completing asynchronous commands in a thread
* safe manner. Asynchronous commands save their completion state in this class.
- * This class then schedules the completeCommands() method in the IO thread.
- * While running in the IO thread, completeCommands() may safely complete all
+ * This class then schedules the processCompletedCommands() method in the IO thread.
+ * While running in the IO thread, processCompletedCommands() may safely complete all
* saved commands without the risk of colliding with other operations on this
* SessionState.
*/
- class AsyncCommandCompleter : public RefCounted {
+ class AsyncCommandManager : public SessionContext::AsyncCommandManager {
private:
SessionState *session;
bool isAttached;
qpid::sys::Mutex completerLock;
- // special-case message.transfer commands for optimization
- struct MessageInfo {
- SequenceNumber cmd; // message.transfer command id
- bool requiresAccept;
- bool requiresSync;
- MessageInfo(SequenceNumber c, bool a, bool s)
- : cmd(c), requiresAccept(a), requiresSync(s) {}
+ /** all commands pending completion */
+ std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > pendingCommands;
+
+ // Store information about completed commands that are pending the
+ // call to completeCommands()
+ struct CommandInfo {
+ SequenceNumber id;
+ framing::Invoker::Result results;
+ bool requiresAccept; // only if cmd==Message.transfer
+ bool syncBitSet;
+ CommandInfo(SequenceNumber c, const framing::Invoker::Result& r, bool a, bool s)
+ : id(c), results(r), requiresAccept(a), syncBitSet(s) {}
};
- std::vector<MessageInfo> completedMsgs;
- // If an ingress message does not require a Sync, we need to
- // hold a reference to it in case an Execution.Sync command is received and we
- // have to manually flush the message.
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+ std::vector<CommandInfo> completedCommands;
+
+ /** finish processing all completed commands, runs in IO thread */
+ void processCompletedCommands();
- /** complete all pending commands, runs in IO thread */
- void completeCommands();
+ /** for scheduling a run of "processCompletedCommands()" on the IO thread */
+ static void schedule(boost::intrusive_ptr<AsyncCommandManager>);
- /** for scheduling a run of "completeCommands()" on the IO thread */
- static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
public:
- AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
- ~AsyncCommandCompleter() {};
+ AsyncCommandManager(SessionState *s) : session(s), isAttached(s->isAttached()) {};
+ ~AsyncCommandManager() {};
/** track a message pending ingress completion */
- void addPendingMessage(boost::intrusive_ptr<Message> m);
- void deletePendingMessage(SequenceNumber id);
- void flushPendingMessages();
+ //void addPendingMessage(boost::intrusive_ptr<Message> m);
+ //void deletePendingMessage(SequenceNumber id);
+ //void flushPendingMessages();
/** schedule the processing of a completed ingress message.transfer command */
- void scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync);
+ //void scheduleMsgCompletion(SequenceNumber cmd,
+ // bool requiresAccept,
+ // bool requiresSync);
void cancel(); // called by SessionState destructor.
void attached(); // called by SessionState on attach()
void detached(); // called by SessionState on detach()
- };
- boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
- /** Abstract class that represents a single asynchronous command that is
- * pending completion.
- */
- class AsyncCommandContext : public AsyncCompletion::Callback
- {
- public:
- AsyncCommandContext( SessionState *ss, SequenceNumber _id )
- : id(_id), completerContext(ss->asyncCommandCompleter) {}
- virtual ~AsyncCommandContext() {}
-
- protected:
- SequenceNumber id;
- boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
+ /** called by async command handlers */
+ void addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
+ framing::SequenceNumber, bool, bool);
+ void cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&);
+ void flushPendingCommands();
+ void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&);
};
+ boost::intrusive_ptr<AsyncCommandManager> asyncCommandManager;
+
+ private:
/** incomplete Message.transfer commands - inbound to broker from client
*/
- class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
+ class IncompleteIngressMsgXfer : public AsyncCommandContext,
+ public AsyncCompletion::Callback
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
boost::intrusive_ptr<Message> m )
- : AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
+ : session(ss),
msg(m),
- requiresAccept(m->requiresAccept()),
- requiresSync(m->getFrames().getMethod()->isSync()),
pending(false) {}
virtual ~IncompleteIngressMsgXfer() {};
+ // async completion calls
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
+ // async cmd calls
+ virtual void flush();
+
private:
SessionState *session; // only valid if sync flag in callback is true
boost::intrusive_ptr<Message> msg;
- bool requiresAccept;
- bool requiresSync;
bool pending; // true if msg saved on pending list...
};
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 34e4592a15..a752e3afec 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -366,7 +366,7 @@ class TestMessageStoreOC : public MessageStore
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
- const PersistableQueue& /*queue*/)
+ const boost::shared_ptr<PersistableQueue>& /*queue*/)
{
if (error) throw Exception("Dequeue error test");
deqCnt++;
diff --git a/qpid/cpp/src/tests/TestMessageStore.h b/qpid/cpp/src/tests/TestMessageStore.h
index 20e0b755b2..7ef6364a8e 100644
--- a/qpid/cpp/src/tests/TestMessageStore.h
+++ b/qpid/cpp/src/tests/TestMessageStore.h
@@ -41,7 +41,7 @@ class TestMessageStore : public NullMessageStore
void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& /*queue*/)
+ const boost::shared_ptr<PersistableQueue>& /*queue*/)
{
dequeued.push_back(msg);
}