summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp24
1 files changed, 9 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index de7109e131..fc4f6d7bf8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -37,8 +37,7 @@ namespace cluster {
// FIXME aconway 2011-09-16: configurable timeout.
QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
- : ownership(UNSUBSCRIBED),
- timer(boost::bind(&QueueContext::timeout, this),
+ : timer(boost::bind(&QueueContext::timeout, this),
q.getBroker()->getTimer(),
100*sys::TIME_MSEC),
queue(q), mcast(m), consumers(0)
@@ -53,7 +52,8 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(QueueOwnership newOwnership) {
+void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) {
+ assert(before != after);
// Invariants for ownership:
// UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
@@ -61,16 +61,11 @@ void QueueContext::replicaState(QueueOwnership newOwnership) {
// SHARED_OWNER <=> timer started, queue started
sys::Mutex::ScopedLock l(lock);
- QueueOwnership before = ownership;
- QueueOwnership after = newOwnership;
- assert(before != after);
- ownership = newOwnership;
-
if (!isOwner(before) && isOwner(after)) { // Took ownership
queue.startConsumers();
if (after == SHARED_OWNER) timer.start();
}
- else if (isOwner(before) && isOwner(after) && before != after) {
+ else if (isOwner(before) && isOwner(after)) {
// Changed from shared to sole owner or vice versa
if (after == SOLE_OWNER) timer.stop();
else timer.start();
@@ -78,7 +73,8 @@ void QueueContext::replicaState(QueueOwnership newOwnership) {
// If we lost ownership then the queue and timer will already have
// been stopped by timeout()
}
-// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
+
+// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in connection threads when a consumer is added
void QueueContext::consume(size_t n) {
@@ -95,13 +91,12 @@ void QueueContext::cancel(size_t n) {
// When consuming threads are stopped, this->stopped will be called.
if (n == 0) {
timer.stop();
- queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
+ queue.stopConsumers();
}
}
// Called in timer thread.
void QueueContext::timeout() {
- QPID_LOG(debug, "FIXME QueueContext::timeout");
// When all threads have stopped, queue will call stopped()
queue.stopConsumers();
}
@@ -109,7 +104,6 @@ void QueueContext::timeout() {
// Callback set up by queue.stopConsumers() called in connection thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- QPID_LOG(debug, "FIXME QueueContext::stopped");
sys::Mutex::ScopedLock l(lock);
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
@@ -134,8 +128,8 @@ void QueueContext::acquire(const broker::QueuedMessage& qm) {
unacked.put(qm.position, qm);
}
-void QueueContext::dequeue(uint32_t position) {
- unacked.erase(position);
+broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
+ return unacked.pop(position);
}
boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {