diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 54 |
1 files changed, 29 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 5bafb5eb0f..c244b57a2e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -24,10 +24,11 @@ #include <qpid/RefCounted.h> +#include "qpid/sys/Time.h" #include <qpid/sys/Mutex.h> +#include "qpid/cluster/types.h" #include <boost/intrusive_ptr.hpp> - // FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on // class broker::Cluster::Queue. This becomes the cluster context. @@ -35,55 +36,58 @@ namespace qpid { namespace broker { class Queue; } +namespace sys { +class Timer; +class TimerTask; +} + namespace cluster { class Multicaster; /** * Queue state that is not replicated to the cluster. - * Manages the local queue start/stop status + * Manages the local queue start/stop status. * - * Thread safe: Called by connection and dispatch threads. + * Thread safe: Called by connection, dispatch and timer threads. */ class QueueContext : public RefCounted { - // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr? public: QueueContext(broker::Queue& q, Multicaster& m); + ~QueueContext(); - /** Sharing ownership of queue, can acquire up to limit before releasing. - * Called in deliver thread. - */ - void sharedOwner(size_t limit); - - /** Sole owner of queue, no limits to acquiring */ - void soleOwner(); + /** Replica state has changed, called in deliver thread. */ + void replicaState(QueueOwnership); - /** - * Count an acquired message against the limit. - * Called from connection threads while consuming messages + /** Called when queue is stopped, no threads are dispatching. + * Connection or deliver thread. */ - void acquire(); - - /** Called if the queue becomes empty, from connection thread. */ - void empty(); - - /** Called when queue is stopped, connection or deliver thread. */ void stopped(); - /** Called when the last subscription to a queue is cancelled */ - void unsubscribed(); + /** Called when a consumer is added to the queue. + *@param n: nubmer of consumers after new one is added. + */ + void consume(size_t n); + + /** Called when a consumer is cancelled on the queue. + *@param n: nubmer of consumers after the cancel. + */ + void cancel(size_t n); /** Get the context for a broker queue. */ static boost::intrusive_ptr<QueueContext> get(broker::Queue&); + /** Called when the timer runs out: stop the queue. */ + void timeout(); + private: - void release(); + sys::Timer& timer; sys::Mutex lock; - enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner; - size_t count; // Count of dequeues remaining, 0 means no limit. broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? Multicaster& mcast; + boost::intrusive_ptr<sys::TimerTask> timerTask; + size_t consumers; // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing. }; |