summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-03-25 13:01:10 +0000
committerRobert Gemmell <robbie@apache.org>2011-03-25 13:01:10 +0000
commitdd1621a65a5cbc02de76d96a5752d9023adf9fc8 (patch)
tree5a9929e3a0e85ad343fb173a330f530e46acdb04
parent19f781317e57fc3679ebba1e26176f2d7976e877 (diff)
downloadqpid-python-dd1621a65a5cbc02de76d96a5752d9023adf9fc8.tar.gz
QPID-3165: ensure all subscriptions are checked before making the decision on whether to stop delivering. Use a boolean instead of doing a 0/1 toggle and update variables to generally clarify logic. Use an int instead of a Long for the iteration decrementing
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1085354 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java93
1 files changed, 71 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 9edb97feec..4d0bef90ae 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1503,14 +1503,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ /**
+ * Used by queue Runners to asynchronously deliver messages to consumers.
+ *
+ * A queue Runner is started whenever a state change occurs, e.g when a new
+ * message arrives on the queue and cannot be immediately delivered to a
+ * subscription (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * capable of accepting/delivering all messages then these messages would
+ * otherwise remain on the queue.
+ *
+ * processQueue should be running while there are messages on the queue AND
+ * there are subscriptions that can deliver them. If there are no
+ * subscriptions capable of delivering the remaining messages on the queue
+ * then processQueue should stop to prevent spinning.
+ *
+ * Since processQueue is runs in a fixed size Executor, it should not run
+ * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
+ * incoming messages may not be able to be scheduled in the thread pool
+ * because all threads are working on clearing down large queues). To solve
+ * this problem, after an arbitrary number of message deliveries the
+ * processQueue job stops iterating, resubmits itself to the executor, and
+ * ends the current instance
+ *
+ * @param runner the Runner to schedule
+ * @throws AMQException
+ */
private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- int extraLoops = 1;
- Long iterations = new Long(MAX_ASYNC_DELIVERIES);
+ boolean lastLoop = false;
+ int iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1527,12 +1553,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (previousStateChangeCount != stateChangeCount)
{
- extraLoops = 1;
+ //further asynchronous delivery is required since the
+ //previous loop. keep going if iteration slicing allows.
+ lastLoop = false;
}
previousStateChangeCount = stateChangeCount;
- deliveryIncomplete = _subscriptionList.size() != 0;
- boolean done;
+ boolean allSubscriptionsDone = true;
+ boolean subscriptionDone;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1542,29 +1570,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.getSendLock();
try
{
- done = attemptDelivery(sub);
- if (done)
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub);
+ if (subscriptionDone)
{
- if (extraLoops == 0)
- {
- deliveryIncomplete = false;
- if (sub.isAutoClose())
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
- }
- else
+ //close autoClose subscriptions if we are not currently intent on continuing
+ if (lastLoop && sub.isAutoClose())
{
- extraLoops--;
+ unregisterSubscription(sub);
+
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
}
}
else
{
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
iterations--;
- extraLoops = 1;
}
}
finally
@@ -1572,10 +1597,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.releaseSendLock();
}
}
+
+ if(allSubscriptionsDone && lastLoop)
+ {
+ //We have done an extra loop already and there are again
+ //again no further delivery attempts possible, only
+ //keep going if state change demands it.
+ deliveryIncomplete = false;
+ }
+ else if(allSubscriptionsDone)
+ {
+ //All subscriptions reported being done, but we have to do
+ //an extra loop if the iterations are not exhausted and
+ //there is still any work to be done
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ lastLoop = true;
+ }
+ else
+ {
+ //some subscriptions can still accept more messages,
+ //keep going if iteration count allows.
+ lastLoop = false;
+ deliveryIncomplete = true;
+ }
+
_asynchronousRunner.set(null);
}
- // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+ // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{