summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp7
1 files changed, 5 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 122163ee7e..60b218da14 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -56,7 +56,7 @@ QueueContext::~QueueContext() {
// FIXME aconway 2011-07-27: revisit shutdown logic.
// timeout() could be called concurrently with destructor.
sys::Mutex::ScopedLock l(lock);
- timerTask->cancel();
+ if (timerTask) timerTask->cancel();
}
void QueueContext::replicaState(QueueOwnership state) {
@@ -84,6 +84,7 @@ void QueueContext::replicaState(QueueOwnership state) {
// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+// Called in connection threads when a consumer is added
void QueueContext::consume(size_t n) {
sys::Mutex::ScopedLock l(lock);
consumers = n;
@@ -91,6 +92,7 @@ void QueueContext::consume(size_t n) {
framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
}
+// Called in connection threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
sys::Mutex::ScopedLock l(lock);
consumers = n;
@@ -100,6 +102,7 @@ void QueueContext::cancel(size_t n) {
void QueueContext::timeout() {
QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
queue.stop();
+ // When all threads have stopped, queue will call stopped()
}
@@ -109,7 +112,7 @@ void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
// FIXME aconway 2011-07-28: review thread safety of state.
// Deffered call to stopped doesn't sit well.
- // queueActive is invaled while stop is in progress?
+ // queueActive is invalid while stop is in progress?
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));