summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.h')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h54
1 files changed, 29 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 5bafb5eb0f..c244b57a2e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -24,10 +24,11 @@
#include <qpid/RefCounted.h>
+#include "qpid/sys/Time.h"
#include <qpid/sys/Mutex.h>
+#include "qpid/cluster/types.h"
#include <boost/intrusive_ptr.hpp>
-
// FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on
// class broker::Cluster::Queue. This becomes the cluster context.
@@ -35,55 +36,58 @@ namespace qpid {
namespace broker {
class Queue;
}
+namespace sys {
+class Timer;
+class TimerTask;
+}
+
namespace cluster {
class Multicaster;
/**
* Queue state that is not replicated to the cluster.
- * Manages the local queue start/stop status
+ * Manages the local queue start/stop status.
*
- * Thread safe: Called by connection and dispatch threads.
+ * Thread safe: Called by connection, dispatch and timer threads.
*/
class QueueContext : public RefCounted {
- // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr?
public:
QueueContext(broker::Queue& q, Multicaster& m);
+ ~QueueContext();
- /** Sharing ownership of queue, can acquire up to limit before releasing.
- * Called in deliver thread.
- */
- void sharedOwner(size_t limit);
-
- /** Sole owner of queue, no limits to acquiring */
- void soleOwner();
+ /** Replica state has changed, called in deliver thread. */
+ void replicaState(QueueOwnership);
- /**
- * Count an acquired message against the limit.
- * Called from connection threads while consuming messages
+ /** Called when queue is stopped, no threads are dispatching.
+ * Connection or deliver thread.
*/
- void acquire();
-
- /** Called if the queue becomes empty, from connection thread. */
- void empty();
-
- /** Called when queue is stopped, connection or deliver thread. */
void stopped();
- /** Called when the last subscription to a queue is cancelled */
- void unsubscribed();
+ /** Called when a consumer is added to the queue.
+ *@param n: nubmer of consumers after new one is added.
+ */
+ void consume(size_t n);
+
+ /** Called when a consumer is cancelled on the queue.
+ *@param n: nubmer of consumers after the cancel.
+ */
+ void cancel(size_t n);
/** Get the context for a broker queue. */
static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
+ /** Called when the timer runs out: stop the queue. */
+ void timeout();
+
private:
- void release();
+ sys::Timer& timer;
sys::Mutex lock;
- enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner;
- size_t count; // Count of dequeues remaining, 0 means no limit.
broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
Multicaster& mcast;
+ boost::intrusive_ptr<sys::TimerTask> timerTask;
+ size_t consumers;
// FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
};