diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-01 20:22:37 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-01 20:22:37 +0000 |
commit | c578ed1e72954def2a5904990d7a97b9756c4f35 (patch) | |
tree | 43f422b49040771f5d0fb4fe8338758aac896612 | |
parent | 03cfb4f61881c47b7a77f4e75316bdedfd933f98 (diff) | |
download | qpid-python-c578ed1e72954def2a5904990d7a97b9756c4f35.tar.gz |
Initial design attempt.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1130291 13f79535-47bb-0310-9956-ffa450edef68
21 files changed, 465 insertions, 210 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index fef994438f..bfd6718dcd 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -23,6 +23,7 @@ */ #include <boost/intrusive_ptr.hpp> +#include <boost/noncopyable.hpp> #include "qpid/broker/BrokerImportExport.h" #include "qpid/sys/AtomicValue.h" @@ -77,7 +78,7 @@ namespace broker { * assuming no need for synchronization with Completer threads. */ -class AsyncCompletion +class AsyncCompletion : private boost::noncopyable { public: @@ -88,7 +89,7 @@ class AsyncCompletion * callback object will be used by the last completer thread, and * released when the callback returns. */ - class Callback : public RefCounted + class Callback : virtual public RefCounted { public: virtual void completed(bool) = 0; diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 58dcc6d7c7..635123d242 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -110,9 +110,10 @@ void DeliveryRecord::complete() { completed = true; } -bool DeliveryRecord::accept(TransactionContext* ctxt) { +/** Accept msg, and optionally notify caller when dequeue completes */ +bool DeliveryRecord::accept(TransactionContext* ctxt, Queue::DequeueDoneCallbackFactory *f) { if (acquired && !ended) { - queue->dequeue(ctxt, msg); + queue->dequeue(ctxt, msg, f); setEnded(); QPID_LOG(debug, "Accepted " << id); } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index d388ba94be..10491df94c 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -31,6 +31,7 @@ #include "qpid/broker/QueuedMessage.h" #include "qpid/broker/DeliveryId.h" #include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" namespace qpid { namespace broker { @@ -39,6 +40,7 @@ class TransactionContext; class SemanticState; struct AckRange; + /** * Record of a delivery for which an ack is outstanding. */ @@ -84,7 +86,7 @@ class DeliveryRecord void redeliver(SemanticState* const); void acquire(DeliveryIds& results); void complete(); - bool accept(TransactionContext* ctxt); // Returns isRedundant() + bool accept(TransactionContext*, Queue::DequeueDoneCallbackFactory *); // Returns isRedundant() bool setEnded(); // Returns isRedundant() void committed() const; diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 763dc55e40..4f64ae2db9 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -419,11 +419,6 @@ struct ScopedSet { }; } -void Message::allDequeuesComplete() { - ScopedSet ss(callbackLock, inCallback); - MessageCallback* cb = dequeueCallback; - if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); -} void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index d85ee434db..8c5d42bcde 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -163,7 +163,6 @@ public: void setIsManagementMessage(bool b); private: MessageAdapter& getAdapter() const; - void allDequeuesComplete(); mutable sys::Mutex lock; framing::FrameSet frames; diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index ab0225ef6b..e852edc86a 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -172,7 +172,7 @@ class MessageStore : public TransactionalStore, public Recoverable { */ virtual void dequeue(TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue& queue) = 0; + const boost::shared_ptr<PersistableQueue>& queue) = 0; /** * Flushes all async messages to disk for the specified queue diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index cd9fd4c933..7da8372704 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -27,6 +27,7 @@ #define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); } using boost::intrusive_ptr; +using boost::shared_ptr; using qpid::framing::FieldTable; using std::string; @@ -127,7 +128,7 @@ void MessageStoreModule::enqueue(TransactionContext* ctxt, void MessageStoreModule::dequeue(TransactionContext* ctxt, const intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue& queue) + const shared_ptr<PersistableQueue>& queue) { TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue)); } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index 56b5a3c1ae..21cd3ff9d3 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -72,7 +72,7 @@ class MessageStoreModule : public MessageStore const PersistableQueue& queue); void dequeue(TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue& queue); + const boost::shared_ptr<PersistableQueue>& queue); uint32_t outstandingQueueAIO(const PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); bool isNull() const; diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index 43f600eaf1..b779d23c08 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -28,6 +28,7 @@ #include <iostream> using boost::intrusive_ptr; +using boost::shared_ptr; namespace qpid{ namespace broker{ @@ -103,9 +104,9 @@ void NullMessageStore::enqueue(TransactionContext*, void NullMessageStore::dequeue(TransactionContext*, const intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue&) + const shared_ptr<PersistableQueue>& q) { - msg->dequeueComplete(); + q->dequeueComplete(msg); } void NullMessageStore::flush(const qpid::broker::PersistableQueue&) {} diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index c6f402662e..e8bc5d9e62 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -28,6 +28,7 @@ #include "qpid/sys/Mutex.h" #include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { @@ -84,7 +85,7 @@ class QPID_BROKER_CLASS_EXTERN NullMessageStore : public MessageStore const PersistableQueue& queue); QPID_BROKER_EXTERN virtual void dequeue(TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue& queue); + const boost::shared_ptr<PersistableQueue>& queue); QPID_BROKER_EXTERN virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue); QPID_BROKER_EXTERN virtual void flush(const qpid::broker::PersistableQueue& queue); ~NullMessageStore(){} diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index 7ba28eb293..0d5e32b40d 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -34,7 +34,6 @@ class MessageStore; PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : - asyncDequeueCounter(0), store(0) {} @@ -43,18 +42,18 @@ void PersistableMessage::flush() syncList copy; { sys::ScopedLock<sys::Mutex> l(storeLock); - if (store) { - copy = synclist; - } else { + if (store) { + copy = synclist; + } else { return;//early exit as nothing to do - } + } } for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) { - PersistableQueue::shared_ptr q(i->lock()); + PersistableQueue::shared_ptr q(i->second.lock()); if (q) { q->flush(); } - } + } } void PersistableMessage::setContentReleased() @@ -70,8 +69,9 @@ bool PersistableMessage::isContentReleased() const bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ if (store && (queue->getPersistenceId()!=0)) { + sys::ScopedLock<sys::Mutex> l(storeLock); for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { - PersistableQueue::shared_ptr q(i->lock()); + PersistableQueue::shared_ptr q(i->second.lock()); if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; } } @@ -84,7 +84,7 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; boost::weak_ptr<PersistableQueue> q(queue); - synclist.push_back(q); + synclist[queue->getName()] = q; } } @@ -93,37 +93,12 @@ void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, Messag enqueueStart(); } -bool PersistableMessage::isDequeueComplete() { - sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - return asyncDequeueCounter == 0; -} - -void PersistableMessage::dequeueComplete() { - bool notify = false; - { - sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - if (asyncDequeueCounter > 0) { - if (--asyncDequeueCounter == 0) { - notify = true; - } - } - } - if (notify) allDequeuesComplete(); -} - -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::dequeueComplete(PersistableQueue::shared_ptr queue, MessageStore* _store) +{ if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); - store = _store; - boost::weak_ptr<PersistableQueue> q(queue); - synclist.push_back(q); + synclist.erase(queue->getName()); } - dequeueAsync(); -} - -void PersistableMessage::dequeueAsync() { - sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - asyncDequeueCounter++; } PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index d29c2c45b4..76a44a4cf7 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -43,8 +43,7 @@ class MessageStore; */ class PersistableMessage : public Persistable { - typedef std::list< boost::weak_ptr<PersistableQueue> > syncList; - sys::Mutex asyncDequeueLock; + typedef std::map< std::string, boost::weak_ptr<PersistableQueue> > syncList; sys::Mutex storeLock; /** @@ -58,17 +57,6 @@ class PersistableMessage : public Persistable */ AsyncCompletion ingressCompletion; - /** - * Tracks the number of outstanding asynchronous dequeue - * operations. When the message is dequeued asynchronously the - * count is incremented; when that dequeue completes it is - * decremented. Thus when it is 0, there are no outstanding - * dequeues. - */ - int asyncDequeueCounter; - - void dequeueAsync(); - syncList synclist; struct ContentReleaseState { @@ -81,8 +69,6 @@ class PersistableMessage : public Persistable ContentReleaseState contentReleaseState; protected: - /** Called when all dequeues are complete for this message. */ - virtual void allDequeuesComplete() = 0; void setContentReleased(); @@ -124,13 +110,10 @@ class PersistableMessage : public Persistable QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); + QPID_BROKER_EXTERN void dequeueComplete(PersistableQueue::shared_ptr queue, + MessageStore* _store); - QPID_BROKER_EXTERN bool isDequeueComplete(); - - QPID_BROKER_EXTERN void dequeueComplete(); - - QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, - MessageStore* _store); + QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {} bool isStoredOnQueue(PersistableQueue::shared_ptr queue); diff --git a/qpid/cpp/src/qpid/broker/PersistableQueue.h b/qpid/cpp/src/qpid/broker/PersistableQueue.h index 655d26bc74..3514c7eb53 100644 --- a/qpid/cpp/src/qpid/broker/PersistableQueue.h +++ b/qpid/cpp/src/qpid/broker/PersistableQueue.h @@ -26,10 +26,12 @@ #include "qpid/broker/Persistable.h" #include "qpid/management/Manageable.h" #include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace broker { +class PersistableMessage; /** * Empty class to be used by any module that wanted to set an external per queue store into @@ -66,6 +68,9 @@ public: PersistableQueue():externalQueueStore(NULL){ }; + + /** the message has finished being dequeued from the store */ + virtual void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>&) = 0; protected: ExternalQueueStore* externalQueueStore; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 8efa8be3dc..04c496cbd5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -438,7 +438,8 @@ void Queue::purgeExpired() Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } - for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, + (DequeueDoneCallbackFactory*)0)); } } @@ -613,7 +614,8 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, + (DequeueDoneCallbackFactory*)0)); } if (inLastNodeFailure && persistLastNode){ @@ -652,8 +654,13 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) if (policy.get()) policy->enqueueAborted(msg); } -// return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) +/** + * returns false if the dequeue completed, otherwise the dequeue will complete + * asynchronously. If the caller needs to know when an asynchronous dequeue + * completes, it must provide a factory that will provide the callback. + */ +bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg, + DequeueDoneCallbackFactory *factory) { ScopedUse u(barrier); if (!u.acquired) return false; @@ -670,9 +677,14 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) bool fp = msg.payload->isForcedPersistent(); if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) { if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) { - msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + msg.payload->dequeueAsync(shared_from_this(), store); boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); - store->dequeue(ctxt, pmsg, *this); + if (factory) { + boost::shared_ptr<DequeueDoneCallback> callback((*factory)()); + Mutex::ScopedLock locker(messageLock); + pendingDequeueCallbacks[pmsg.get()] = callback; + } + store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done return true; } } @@ -1109,7 +1121,8 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) } } //process any pending dequeues - for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, + (DequeueDoneCallbackFactory*)0)); pendingDequeues.clear(); } @@ -1198,6 +1211,27 @@ const Broker* Queue::getBroker() } +/** invoked from the store thread when the asynchronous dequeueing of the + * message has completed. */ +void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg ) +{ + msg->dequeueComplete(shared_from_this(), store); + + boost::shared_ptr<DequeueDoneCallback> cb; + { + Mutex::ScopedLock locker(messageLock); + std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i; + i = pendingDequeueCallbacks.find(msg.get()); + if (i != pendingDequeueCallbacks.end()) { + cb = i->second; + pendingDequeueCallbacks.erase(i); + } + } + if (cb) (*cb)(); + QPID_LOG(error, "dequeueComplete:=" << cb); +} + + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index c4f1bcc07e..1c849e28fa 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -275,9 +275,30 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false); void enqueueAborted(boost::intrusive_ptr<Message> msg); /** - * dequeue from store (only done once messages is acknowledged) + * dequeue from store (only done once messages is acknowledged). Dequeue + * -may- complete asynchronously. This method returns 'false' if the + * dequeue has completed, else 'true' if the dequeue is asynchronous. If + * the caller is interested in receiving notification when the asynchronous + * dequeue completes, it may pass a pointer to a factory functor that + * returns a shareable DequeueDoneCallback object. If the dequeue is + * completed synchronously, this pointer is ignored. If the dequeue will + * complete asynchronously, the factory is called to obtain a + * DequeueDoneCallback. When the dequeue completes, the + * DequeueDoneCallback is invoked. The callback should be prepared to + * execute on any thread. */ - QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg); + class DequeueDoneCallback + { + public: + virtual void operator()() = 0; + }; + typedef boost::function<boost::shared_ptr<DequeueDoneCallback>()> DequeueDoneCallbackFactory; + QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage& msg, + DequeueDoneCallbackFactory *factory = 0); + + /** invoked by store to signal dequeue() has completed */ + QPID_BROKER_EXTERN void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg); + /** * Inform the queue that a previous transactional dequeue * committed. @@ -382,6 +403,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, void flush(); const Broker* getBroker(); + + private: + std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> > pendingDequeueCallbacks; }; } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index ce86253f4a..0f670ded83 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -761,7 +761,122 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) { return IsInSequenceSetAnd<Predicate>(s,p); } +/** Design notes for asychronous completion of Message.accept ** + * Message.Accept command cannot be considered "complete" until all messages + * identified by the sequence set that accompanys the command have been + * completely dequeued. A message dequeue may not be able to complet + * synchronously - specifically when the message is durable. Therefore, the + * Message.Accept command handling must be able to track asynchronous dequeues, + * and notify the Session when all dequeues are done (at which point the + * command completes). See QPID-3079. + */ +namespace { + + /** Manage a Message.accept that requires async completion of one or more + * message dequeue operations */ + class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext + { + mutable qpid::sys::Mutex lock; + std::set<DeliveryId> pending; // for dequeue to complete + bool ready; + SemanticState& state; + + public: + AsyncMessageAcceptCmd(SemanticState& _state) + : ready(false), state(_state) {} + + /** called from session to urge pending dequeues to complete ASAP */ + void flush() + { + std::set<DeliveryId> copy; + { + Mutex::ScopedLock l(lock); + copy = pending; + } + for (std::set<DeliveryId>::iterator i = copy.begin(); + i != copy.end(); ++i) { + for (DeliveryRecords::iterator r = state.getUnacked().begin(); + r != state.getUnacked().end(); ++r) { + if (r->getId() == *i) { + r->getQueue()->flush(); + break; + } + } + } + } + + /** add a pending dequeue to track */ + void add( const DeliveryId& id ) + { + Mutex::ScopedLock l(lock); + bool unique = pending.insert(id).second; + if (!unique) { + assert(false); + } + } + + /** signal this dequeue done. Note: may be run in *any* thread */ + void complete( const DeliveryId& id ) + { + Mutex::ScopedLock l(lock); + std::set<DeliveryId>::iterator i = pending.find(id); + assert(i != pending.end()); + pending.erase(i); + + if (ready && pending.empty()) { + framing::Invoker::Result r; + Mutex::ScopedUnlock ul(lock); + completed( r ); + } + } + + /** allow the Message.Accept to complete */ + void enable() + { + Mutex::ScopedLock l(lock); + if (pending.empty()) { + framing::Invoker::Result r; + Mutex::ScopedUnlock ul(lock); + completed( r ); + return; + } + ready = true; + } + }; + + + /** callback to indicate a single message has completed its dequeue. This + object is made available to the queue, which will invoke it when the + dequeue completes. */ + class DequeueDone : public Queue::DequeueDoneCallback + { + DeliveryId id; + boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; + public: + DequeueDone( const DeliveryId & _id, + boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) + : id(_id), cmd(_cmd) {} + void operator()() { cmd->complete( id ); } + }; + + + /** factory to create the above callback - passed to queue's dequeue + method, only called if dequeue is async! */ + boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state, + const DeliveryId& id, + boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd ) + { + if (!cmd) { // first async dequeue creates the context + cmd.reset(new AsyncMessageAcceptCmd(*state)); + } + cmd->add( id ); + boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) ); + return x; + } +} + void SemanticState::accepted(const SequenceSet& commands) { + QPID_LOG(error, "SemanticState::accepted (" << commands << ")"); assertClusterSafe(); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -785,12 +900,32 @@ void SemanticState::accepted(const SequenceSet& commands) { unacked.erase(removed, unacked.end()); } } else { - DeliveryRecords::iterator removed = - remove_if(unacked.begin(), unacked.end(), - isInSequenceSetAnd(commands, - bind(&DeliveryRecord::accept, _1, - (TransactionContext*) 0))); - unacked.erase(removed, unacked.end()); + /** @todo KAG - the following code removes the command from unacked + even if the dequeue has not completed. note that the command will + still not complete until all dequeues complete. I'm doing this to + avoid having to lock the unacked list, which would be necessary if + we remove when the dequeue completes. Is this ok? */ + boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; + DeliveryRecords::iterator i; + DeliveryRecords undone; + for (i = unacked.begin(); i < unacked.end(); ++i) { + if (i->coveredBy(&commands)) { + Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd); + if (i->accept((TransactionContext*) 0, &f) == false) { + undone.push_back(*i); + } + } + } + if (undone.empty()) + unacked.clear(); + else + unacked.swap(undone); + + if (cmd) { + boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd)); + session.registerAsyncCommand(pcmd); + cmd->enable(); + } } } diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index 253ce8dcf2..6f4a894f22 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -25,6 +25,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/Invoker.h" #include "qpid/sys/OutputControl.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/OwnershipToken.h" @@ -37,7 +38,12 @@ namespace broker { class SessionContext : public OwnershipToken, public sys::OutputControl { - public: + protected: + class AsyncCommandManager; + + public: + class AsyncCommandContext; + virtual ~SessionContext(){} virtual bool isLocal(const ConnectionToken* t) const = 0; virtual bool isAttached() const = 0; @@ -47,7 +53,52 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual uint16_t getChannel() const = 0; virtual const SessionId& getSessionId() const = 0; virtual void addPendingExecutionSync() = 0; -}; + // pass async command context to Session, completion must not occur + // until -after- this call returns. + virtual void registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>&) = 0; + + // class for commands that need to complete asynchronously + friend class AsyncCommandContext; + class AsyncCommandContext : virtual public RefCounted + { + private: + framing::SequenceNumber id; + bool requiresAccept; + bool syncBitSet; + boost::intrusive_ptr<SessionContext::AsyncCommandManager> manager; + + public: + AsyncCommandContext() : id(0), requiresAccept(false), syncBitSet(false) {} + virtual ~AsyncCommandContext() {} + + framing::SequenceNumber getId() { return id; } + void setId(const framing::SequenceNumber seq) { id = seq; } + bool getRequiresAccept() { return requiresAccept; } + void setRequiresAccept(const bool a) { requiresAccept = a; } + bool getSyncBitSet() { return syncBitSet; } + void setSyncBitSet(const bool s) { syncBitSet = s; } + void setManager(SessionContext::AsyncCommandManager *m) { manager.reset(m); } + + /** notify session that this command has completed */ + void completed(const framing::Invoker::Result& r) + { + boost::intrusive_ptr<AsyncCommandContext> context(this); + manager->completePendingCommand( context, r ); + manager.reset(0); + } + + // to force completion as fast as possible (like when Sync arrives) + virtual void flush() = 0; + }; + + protected: + class AsyncCommandManager : public RefCounted + { + public: + virtual void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, + const framing::Invoker::Result&) = 0; + }; + }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 957d5bd4d2..d84256b61b 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -62,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - asyncCommandCompleter(new AsyncCommandCompleter(this)) + asyncCommandManager(new AsyncCommandManager(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -95,7 +95,7 @@ void SessionState::addManagementObject() { } SessionState::~SessionState() { - asyncCommandCompleter->cancel(); + asyncCommandManager->cancel(); semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -126,7 +126,7 @@ bool SessionState::isLocal(const ConnectionToken* t) const void SessionState::detach() { QPID_LOG(debug, getId() << ": detached on broker."); - asyncCommandCompleter->detached(); + asyncCommandManager->detached(); disableOutput(); handler = 0; if (mgmtObject != 0) @@ -147,7 +147,7 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } - asyncCommandCompleter->attached(); + asyncCommandManager->attached(); } void SessionState::abort() { @@ -204,22 +204,22 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { +void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) +{ currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). + syncCurrentCommand = method->isSync(); + acceptRequired = false; Invoker::Result invocation = invoke(adapter, *method); - if (currentCommandComplete) receiverCompleted(id); - if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); - } else if (invocation.hasResult()) { - getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync() && currentCommandComplete) { - sendAcceptAndCompletion(); + if (currentCommandComplete) { + completeCommand(id, invocation, false, syncCurrentCommand); } } + struct ScheduledCreditTask : public sys::TimerTask { sys::Timer& timer; SessionState& sessionState; @@ -260,6 +260,9 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) } msg->setPublisher(&getConnection()); msg->getIngressCompletion().begin(); + currentCommandComplete = true; // assumed + syncCurrentCommand = msg->getFrames().getMethod()->isSync(); + acceptRequired = msg->requiresAccept(); semanticState.handle(msg); msgBuilder.end(); IncompleteIngressMsgXfer xfer(this, msg); @@ -313,17 +316,19 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -/** Invoked when the given inbound message is finished being processed - * by all interested parties (eg. it is done being enqueued to all queues, - * its credit has been accounted for, etc). At this point, msg is considered - * by this receiver as 'completed' (as defined by AMQP 0_10) - */ -void SessionState::completeRcvMsg(SequenceNumber id, - bool requiresAccept, - bool requiresSync) +/** Complete a received command */ +void SessionState::completeCommand(const SequenceNumber& id, + const framing::Invoker::Result& results, + bool requiresAccept, + bool syncBitSet) { bool callSendCompletion = false; receiverCompleted(id); + + if (results.hasResult()) { + getProxy().getExecution().result(id, results.getResult()); + } + if (requiresAccept) // will cause msg's seq to appear in the next message.accept we send. accepted.add(id); @@ -340,7 +345,7 @@ void SessionState::completeRcvMsg(SequenceNumber id, } // if the sender has requested immediate notification of the completion... - if (requiresSync) { + if (syncBitSet) { sendAcceptAndCompletion(); } else if (callSendCompletion) { sendCompletion(); @@ -427,12 +432,25 @@ void SessionState::addPendingExecutionSync() if (receiverGetIncomplete().front() < syncCommandId) { currentCommandComplete = false; pendingExecutionSyncs.push(syncCommandId); - asyncCommandCompleter->flushPendingMessages(); + asyncCommandManager->flushPendingCommands(); QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); } } +void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd) +{ + /** @todo KAG: ensure this is invoked during handleCommand() context! */ + currentCommandComplete = false; + asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand ); +} + + +void SessionState::cancelAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd) +{ + asyncCommandManager->cancelPendingCommand(aCmd); +} + /** factory for creating a reference-counted IncompleteIngressMsgXfer object * which will be attached to a message that will be completed asynchronously. */ @@ -441,15 +459,14 @@ SessionState::IncompleteIngressMsgXfer::clone() { boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); - // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. - // If the client is pending the message.transfer completion, flush now to force immediate write to journal. - if (requiresSync) + // this routine is *only* invoked when the message needs to be asynchronously completed. Otherwise, ::completed() + // will be invoked directly. + pending = true; + boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb)); + session->registerAsyncCommand(ctxt); + if (ctxt->getSyncBitSet()) { + // If the client is pending the message.transfer completion, flush now to force immediate write to journal. msg->flush(); - else { - // otherwise, we need to track this message in order to flush it if an execution.sync arrives - // before it has been completed (see flushPendingMessages()) - pending = true; - completerContext->addPendingMessage(msg); } return cb; } @@ -461,110 +478,129 @@ SessionState::IncompleteIngressMsgXfer::clone() */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - if (pending) completerContext->deletePendingMessage(id); if (!sync) { /** note well: this path may execute in any thread. It is safe to access * the scheduledCompleterContext, since *this has a shared pointer to it. * but not session! */ session = 0; - QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId()); + completed(framing::Invoker::Result()); } else { // this path runs directly from the ac->end() call in handleContent() above, // so *session is definately valid. if (session->isAttached()) { - QPID_LOG(debug, ": receive completed for msg seq=" << id); - session->completeRcvMsg(id, requiresAccept, requiresSync); + QPID_LOG(debug, ": receive completed for msg seq=" << getId()); + session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet()); + } + if (pending) { + boost::intrusive_ptr<AsyncCommandContext> p(this); + session->cancelAsyncCommand(p); } } - completerContext = boost::intrusive_ptr<AsyncCommandCompleter>(); +} + + +void SessionState::IncompleteIngressMsgXfer::flush() +{ + msg->flush(); } /** Scheduled from an asynchronous command's completed callback to run on * the IO thread. */ -void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) +void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncCommandManager> ctxt) { - ctxt->completeCommands(); + ctxt->processCompletedCommands(); } -/** Track an ingress message that is pending completion */ -void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg) +void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd, + framing::SequenceNumber seq, + bool acceptRequired, bool syncBitSet) { + cmd->setId(seq); + cmd->setRequiresAccept(acceptRequired); + cmd->setSyncBitSet(syncBitSet); + cmd->setManager(this); qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg); - bool unique = pendingMsgs.insert(item).second; - assert(unique); + std::pair<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > item(cmd->getId(), cmd); + bool unique = pendingCommands.insert(item).second; + if (!unique) assert(false); } -/** pending message has completed */ -void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) +void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - pendingMsgs.erase(id); + pendingCommands.erase(cmd->getId()); + cmd->setManager(0); } + /** done when an execution.sync arrives */ -void SessionState::AsyncCommandCompleter::flushPendingMessages() +void SessionState::AsyncCommandManager::flushPendingCommands() { - std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy; + std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > copy; { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. + copy = pendingCommands; } // drop lock, so it is safe to call "flush()" - for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin(); + for (std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> >::iterator i = copy.begin(); i != copy.end(); ++i) { i->second->flush(); } } -/** mark an ingress Message.Transfer command as completed. +/** mark a pending command as completed. * This method must be thread safe - it may run on any thread. */ -void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, - bool requiresAccept, - bool requiresSync) +void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd, + const framing::Invoker::Result& result) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - if (session && isAttached) { - MessageInfo msg(cmd, requiresAccept, requiresSync); - completedMsgs.push_back(msg); - if (completedMsgs.size() == 1) { + CommandInfo status(cmd->getId(), + result, + cmd->getRequiresAccept(), + cmd->getSyncBitSet()); + completedCommands.push_back(status); + if (completedCommands.size() == 1) { session->getConnection().requestIOProcessing(boost::bind(&schedule, - session->asyncCommandCompleter)); + session->asyncCommandManager)); } } + pendingCommands.erase(cmd->getId()); } /** Cause the session to complete all completed commands. * Executes on the IO thread. */ -void SessionState::AsyncCommandCompleter::completeCommands() +void SessionState::AsyncCommandManager::processCompletedCommands() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); // when session is destroyed, it clears the session pointer via cancel(). if (session && session->isAttached()) { - for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); - msg != completedMsgs.end(); ++msg) { - session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); + for (std::vector<CommandInfo>::iterator cmd = completedCommands.begin(); + cmd != completedCommands.end(); ++cmd) { + session->completeCommand(cmd->id, + cmd->results, + cmd->requiresAccept, + cmd->syncBitSet); } } - completedMsgs.clear(); + completedCommands.clear(); } /** cancel any pending calls to scheduleComplete */ -void SessionState::AsyncCommandCompleter::cancel() +void SessionState::AsyncCommandManager::cancel() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); session = 0; @@ -573,7 +609,7 @@ void SessionState::AsyncCommandCompleter::cancel() /** inform the completer that the session has attached, * allows command completion scheduling from any thread */ -void SessionState::AsyncCommandCompleter::attached() +void SessionState::AsyncCommandManager::attached() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); isAttached = true; @@ -582,7 +618,7 @@ void SessionState::AsyncCommandCompleter::attached() /** inform the completer that the session has detached, * disables command completion scheduling from any thread */ -void SessionState::AsyncCommandCompleter::detached() +void SessionState::AsyncCommandManager::detached() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); isAttached = false; diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index b43df0c0aa..6ee8a38f4e 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -25,6 +25,7 @@ #include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" +#include "qpid/framing/ServerInvoker.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Session.h" @@ -133,8 +134,17 @@ class SessionState : public qpid::SessionState, // belonging to inter-broker bridges void addManagementObject(); + // allows commands (dispatched via handleCommand()) to inform the session + // that they may complete asynchronously. + void registerAsyncCommand(boost::intrusive_ptr<SessionContext::AsyncCommandContext>&); + void cancelAsyncCommand(boost::intrusive_ptr<SessionContext::AsyncCommandContext>&); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); + /** finish command processing started in handleCommand() */ + void completeCommand(const framing::SequenceNumber&, + const framing::Invoker::Result&, bool requiresAccept, + bool syncBitSet); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); // indicate that the given ingress msg has been completely received by the @@ -173,99 +183,100 @@ class SessionState : public qpid::SessionState, boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; boost::intrusive_ptr<sys::TimerTask> flowControlTimer; - // sequence numbers for pending received Execution.Sync commands + // sequence numbers of received Execution.Sync commands that are pending completion. std::queue<SequenceNumber> pendingExecutionSyncs; + + // true if command completes during call to handleCommand() bool currentCommandComplete; + bool syncCurrentCommand; + bool acceptRequired; + protected: /** This class provides a context for completing asynchronous commands in a thread * safe manner. Asynchronous commands save their completion state in this class. - * This class then schedules the completeCommands() method in the IO thread. - * While running in the IO thread, completeCommands() may safely complete all + * This class then schedules the processCompletedCommands() method in the IO thread. + * While running in the IO thread, processCompletedCommands() may safely complete all * saved commands without the risk of colliding with other operations on this * SessionState. */ - class AsyncCommandCompleter : public RefCounted { + class AsyncCommandManager : public SessionContext::AsyncCommandManager { private: SessionState *session; bool isAttached; qpid::sys::Mutex completerLock; - // special-case message.transfer commands for optimization - struct MessageInfo { - SequenceNumber cmd; // message.transfer command id - bool requiresAccept; - bool requiresSync; - MessageInfo(SequenceNumber c, bool a, bool s) - : cmd(c), requiresAccept(a), requiresSync(s) {} + /** all commands pending completion */ + std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > pendingCommands; + + // Store information about completed commands that are pending the + // call to completeCommands() + struct CommandInfo { + SequenceNumber id; + framing::Invoker::Result results; + bool requiresAccept; // only if cmd==Message.transfer + bool syncBitSet; + CommandInfo(SequenceNumber c, const framing::Invoker::Result& r, bool a, bool s) + : id(c), results(r), requiresAccept(a), syncBitSet(s) {} }; - std::vector<MessageInfo> completedMsgs; - // If an ingress message does not require a Sync, we need to - // hold a reference to it in case an Execution.Sync command is received and we - // have to manually flush the message. - std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs; + std::vector<CommandInfo> completedCommands; + + /** finish processing all completed commands, runs in IO thread */ + void processCompletedCommands(); - /** complete all pending commands, runs in IO thread */ - void completeCommands(); + /** for scheduling a run of "processCompletedCommands()" on the IO thread */ + static void schedule(boost::intrusive_ptr<AsyncCommandManager>); - /** for scheduling a run of "completeCommands()" on the IO thread */ - static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>); public: - AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {}; - ~AsyncCommandCompleter() {}; + AsyncCommandManager(SessionState *s) : session(s), isAttached(s->isAttached()) {}; + ~AsyncCommandManager() {}; /** track a message pending ingress completion */ - void addPendingMessage(boost::intrusive_ptr<Message> m); - void deletePendingMessage(SequenceNumber id); - void flushPendingMessages(); + //void addPendingMessage(boost::intrusive_ptr<Message> m); + //void deletePendingMessage(SequenceNumber id); + //void flushPendingMessages(); /** schedule the processing of a completed ingress message.transfer command */ - void scheduleMsgCompletion(SequenceNumber cmd, - bool requiresAccept, - bool requiresSync); + //void scheduleMsgCompletion(SequenceNumber cmd, + // bool requiresAccept, + // bool requiresSync); void cancel(); // called by SessionState destructor. void attached(); // called by SessionState on attach() void detached(); // called by SessionState on detach() - }; - boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter; - /** Abstract class that represents a single asynchronous command that is - * pending completion. - */ - class AsyncCommandContext : public AsyncCompletion::Callback - { - public: - AsyncCommandContext( SessionState *ss, SequenceNumber _id ) - : id(_id), completerContext(ss->asyncCommandCompleter) {} - virtual ~AsyncCommandContext() {} - - protected: - SequenceNumber id; - boost::intrusive_ptr<AsyncCommandCompleter> completerContext; + /** called by async command handlers */ + void addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, + framing::SequenceNumber, bool, bool); + void cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&); + void flushPendingCommands(); + void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&); }; + boost::intrusive_ptr<AsyncCommandManager> asyncCommandManager; + + private: /** incomplete Message.transfer commands - inbound to broker from client */ - class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext + class IncompleteIngressMsgXfer : public AsyncCommandContext, + public AsyncCompletion::Callback { public: IncompleteIngressMsgXfer( SessionState *ss, boost::intrusive_ptr<Message> m ) - : AsyncCommandContext(ss, m->getCommandId()), - session(ss), + : session(ss), msg(m), - requiresAccept(m->requiresAccept()), - requiresSync(m->getFrames().getMethod()->isSync()), pending(false) {} virtual ~IncompleteIngressMsgXfer() {}; + // async completion calls virtual void completed(bool); virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone(); + // async cmd calls + virtual void flush(); + private: SessionState *session; // only valid if sync flag in callback is true boost::intrusive_ptr<Message> msg; - bool requiresAccept; - bool requiresSync; bool pending; // true if msg saved on pending list... }; diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 34e4592a15..a752e3afec 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -366,7 +366,7 @@ class TestMessageStoreOC : public MessageStore virtual void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& /*msg*/, - const PersistableQueue& /*queue*/) + const boost::shared_ptr<PersistableQueue>& /*queue*/) { if (error) throw Exception("Dequeue error test"); deqCnt++; diff --git a/qpid/cpp/src/tests/TestMessageStore.h b/qpid/cpp/src/tests/TestMessageStore.h index 20e0b755b2..7ef6364a8e 100644 --- a/qpid/cpp/src/tests/TestMessageStore.h +++ b/qpid/cpp/src/tests/TestMessageStore.h @@ -41,7 +41,7 @@ class TestMessageStore : public NullMessageStore void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue& /*queue*/) + const boost::shared_ptr<PersistableQueue>& /*queue*/) { dequeued.push_back(msg); } |