diff options
author | Alan Conway <aconway@apache.org> | 2011-11-04 20:27:13 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-11-04 20:27:13 +0000 |
commit | 29e3b04915ef30f7e0f769cc1ee3994d99711fef (patch) | |
tree | 02e49caec0e4e7699413d36eab177a3d5bbb732d /qpid/cpp/src/qpid/cluster/exp/QueueContext.h | |
parent | 561fe4dd6234c085dc55bbd430dcab7427d2db29 (diff) | |
download | qpid-python-29e3b04915ef30f7e0f769cc1ee3994d99711fef.tar.gz |
QPID-2920: Batch acquire/dequeue messages in cluster.qpid-2920-active
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1197749 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 47 |
1 files changed, 33 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 20c2aabc1d..d7079ab8a5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -25,11 +25,12 @@ #include "LockedMap.h" #include "Ticker.h" #include "qpid/RefCounted.h" +#include "qpid/broker/Context.h" +#include "qpid/cluster/types.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/sys/AtomicValue.h" -#include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" -#include "qpid/cluster/types.h" -#include <boost/intrusive_ptr.hpp> +#include "qpid/sys/Time.h" namespace qpid { namespace broker { @@ -43,12 +44,13 @@ class Multicaster; class Group; /** - * Queue state that is not replicated to the cluster. - * Manages the local queue start/stop status. + * Local Queue state, manage start/stop consuming on the queue. + * Destroyed when the queue is destroyed, it must erase itself + * from any cluster data structures in its destructor. * -* THREAD SAFE: Called by connection threads and Ticker dispatch threads. + * THREAD SAFE: Called by connection threads and Ticker dispatch threads. */ -class QueueContext : public Ticker::Tickable { +class QueueContext : public broker::Context, Ticker::Tickable { public: QueueContext(broker::Queue&, Group&, size_t consumeTicks); ~QueueContext(); @@ -80,33 +82,50 @@ class QueueContext : public Ticker::Tickable { /** Called by MessageHandler to requeue a message. */ void requeue(uint32_t position, bool redelivered); + /** Called by BrokerContext when a mesages is acquired locally. */ + void localAcquire(uint32_t position); + /** Called by MessageHandler when a mesages is acquired. */ - void acquire(const broker::QueuedMessage& qm); + void acquire(uint32_t position); /** Called by MesageHandler when a message is dequeued. */ - broker::QueuedMessage dequeue(uint32_t position); + void dequeue(uint32_t position); - size_t getHash() const { return hash; } + /** Called by BrokerContext when a message is dequeued locally. */ + void localDequeue(uint32_t position); + + /** Called in deliver thread, take note of another brokers acquires/dequeues. */ + void consumed(const MemberId&, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued); + size_t getHash() const { return hash; } + broker::Queue& getQueue() { return queue; } + /** Get the cluster context for a broker queue. */ - static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ; + static QueueContext* get(broker::Queue&); private: + void sendConsumed(const sys::Mutex::ScopedLock&); + sys::Mutex lock; - size_t consumers; // Number of local consumers - bool consuming; // True if we have the lock & local consumers are active + QueueOwnership ownership; // Ownership status. + size_t consumers; // Number of local consumers. + bool consuming; // True if we have the lock. size_t ticks; // Ticks since we got the lock. // Following members are immutable - broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? + broker::Queue& queue; Multicaster& mcast; size_t hash; size_t maxTicks; // Max ticks we are allowed. + framing::SequenceSet acquired, dequeued; // Track local acquires/dequeues. // Following members are safe to use without holding a lock typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; UnackedMap unacked; + Group& group; }; }} // namespace qpid::cluster |