diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index de7109e131..fc4f6d7bf8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -37,8 +37,7 @@ namespace cluster { // FIXME aconway 2011-09-16: configurable timeout. QueueContext::QueueContext(broker::Queue& q, Multicaster& m) - : ownership(UNSUBSCRIBED), - timer(boost::bind(&QueueContext::timeout, this), + : timer(boost::bind(&QueueContext::timeout, this), q.getBroker()->getTimer(), 100*sys::TIME_MSEC), queue(q), mcast(m), consumers(0) @@ -53,7 +52,8 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } } // Called by QueueReplica in CPG deliver thread when state changes. -void QueueContext::replicaState(QueueOwnership newOwnership) { +void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) { + assert(before != after); // Invariants for ownership: // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped @@ -61,16 +61,11 @@ void QueueContext::replicaState(QueueOwnership newOwnership) { // SHARED_OWNER <=> timer started, queue started sys::Mutex::ScopedLock l(lock); - QueueOwnership before = ownership; - QueueOwnership after = newOwnership; - assert(before != after); - ownership = newOwnership; - if (!isOwner(before) && isOwner(after)) { // Took ownership queue.startConsumers(); if (after == SHARED_OWNER) timer.start(); } - else if (isOwner(before) && isOwner(after) && before != after) { + else if (isOwner(before) && isOwner(after)) { // Changed from shared to sole owner or vice versa if (after == SOLE_OWNER) timer.stop(); else timer.start(); @@ -78,7 +73,8 @@ void QueueContext::replicaState(QueueOwnership newOwnership) { // If we lost ownership then the queue and timer will already have // been stopped by timeout() } -// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue. + +// FIXME aconway 2011-07-27: Dont spin the token on an empty queue. // Called in connection threads when a consumer is added void QueueContext::consume(size_t n) { @@ -95,13 +91,12 @@ void QueueContext::cancel(size_t n) { // When consuming threads are stopped, this->stopped will be called. if (n == 0) { timer.stop(); - queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock? + queue.stopConsumers(); } } // Called in timer thread. void QueueContext::timeout() { - QPID_LOG(debug, "FIXME QueueContext::timeout"); // When all threads have stopped, queue will call stopped() queue.stopConsumers(); } @@ -109,7 +104,6 @@ void QueueContext::timeout() { // Callback set up by queue.stopConsumers() called in connection thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { - QPID_LOG(debug, "FIXME QueueContext::stopped"); sys::Mutex::ScopedLock l(lock); if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( @@ -134,8 +128,8 @@ void QueueContext::acquire(const broker::QueuedMessage& qm) { unacked.put(qm.position, qm); } -void QueueContext::dequeue(uint32_t position) { - unacked.erase(position); +broker::QueuedMessage QueueContext::dequeue(uint32_t position) { + return unacked.pop(position); } boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { |