summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp80
1 files changed, 50 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f593d7e443..84f025824c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -112,7 +112,8 @@ Queue::Queue(const string& _name, bool _autodelete,
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0)
+ autoDeleteTimeout(0),
+ dispatching(boost::bind(&Queue::acquireStopped,this))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -231,29 +232,40 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-// Inform the cluster of an acquired message on exit from a function
-// that does the acquiring. ClusterAcquireOnExit is declared *before*
-// any locks are taken. The calling function sets qmsg to the acquired
-// message with a lock held, but the call to Cluster::acquire() will
-// be outside the lock.
-struct ClusterAcquireOnExit {
+/** Mark a scope that acquires a message.
+ *
+ * ClusterAcquireScope is declared before are taken. The calling
+ * function sets qmsg with the lock held, but the call to
+ * Cluster::acquire() will happen after the lock is released.
+ *
+ * Also marks a Stoppable as busy for the duration of the scope.
+ **/
+struct ClusterAcquireScope {
Broker* broker;
+ Queue& queue;
QueuedMessage qmsg;
- ClusterAcquireOnExit(Broker* b) : broker(b) {}
- ~ClusterAcquireOnExit() {
- if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+
+ ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {}
+
+ ~ClusterAcquireScope() {
+ if (broker) {
+ // FIXME aconway 2011-06-27: Move to QueueContext.
+ // Avoid the indirection via queuename.
+ if (qmsg.queue) broker->getCluster().acquire(qmsg);
+ else broker->getCluster().empty(queue);
+ }
}
};
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- ClusterAcquireOnExit willAcquire(broker); // Outside lock
+ ClusterAcquireScope acquireScope(*this); // Outside lock
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
if (messages->remove(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
- willAcquire.qmsg = message;
+ acquireScope.qmsg = message;
return true;
} else {
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -300,9 +312,15 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
- ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+ Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming
+ if (!stopper) {
+ QPID_LOG(trace, "Queue is stopped: " << name);
+ listeners.addListener(c);
+ return NO_MESSAGES;
+ }
+ ClusterAcquireScope acquireScope(*this); // Outside the lock
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
+ if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -317,7 +335,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- willAcquire.qmsg = msg;
+ acquireScope.qmsg = msg;
pop();
return CONSUMED;
} else {
@@ -374,18 +392,11 @@ void Queue::removeListener(Consumer::shared_ptr c)
bool Queue::dispatch(Consumer::shared_ptr c)
{
- Stoppable::Scope doDispatch(dispatching);
- if (doDispatch) {
- QueuedMessage msg(this);
- if (getNextMessage(msg, c)) {
- c->deliver(msg);
- return true;
- } else {
- return false;
- }
- } else { // Dispatching is stopped
- Mutex::ScopedLock locker(messageLock);
- listeners.addListener(c); // FIXME aconway 2011-05-05:
+ QueuedMessage msg(this);
+ if (getNextMessage(msg, c)) {
+ c->deliver(msg);
+ return true;
+ } else {
return false;
}
}
@@ -450,10 +461,10 @@ void Queue::cancel(Consumer::shared_ptr c){
}
QueuedMessage Queue::get(){
- ClusterAcquireOnExit willAcquire(broker); // Outside lock
+ ClusterAcquireScope acquireScope(*this); // Outside lock
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->pop(msg)) willAcquire.qmsg = msg;
+ if (messages->pop(msg)) acquireScope.qmsg = msg;
return msg;
}
@@ -704,7 +715,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
if (!isEnqueued(msg)) return false;
if (!ctxt) dequeued(msg);
}
+
if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
+
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -902,6 +915,10 @@ void Queue::notifyDeleted()
set.notifyAll();
}
+void Queue::acquireStopped() {
+ if (broker) broker->getCluster().stopped(*this);
+}
+
void Queue::bound(const string& exchange, const string& key,
const FieldTable& args)
{
@@ -1234,7 +1251,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
@@ -1268,10 +1285,13 @@ void Queue::UsageBarrier::destroy()
// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
void Queue::stop() {
+ // FIXME aconway 2011-05-25: rename dispatching - acquiring?
dispatching.stop();
}
void Queue::start() {
+ QPID_LOG(critical, "FIXME start context=" << clusterContext);
+ assert(clusterContext); // FIXME aconway 2011-06-08: XXX
dispatching.start();
notifyListener();
}