summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java30
1 files changed, 10 insertions, 20 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a3709641f0..16d24e74ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -81,7 +81,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicBoolean _quiesced = new AtomicBoolean(false);
- private final SubscriptionList _subscriptionList = new SubscriptionList(this);
+ protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
private boolean _exclusiveSubscriber;
@@ -389,13 +389,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
- deliverMessage(sub, entry);
QueueEntry queueEntryNode = sub.getLastSeenEntry();
if(_entries.next(queueEntryNode) == entry)
{
sub.setLastSeenEntry(queueEntryNode,entry);
}
+ deliverMessage(sub, entry);
+
}
}
}
@@ -414,25 +415,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else if(!entry.isAcquired())
{
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && !entry.isAcquired())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
- {
- if(subscription.setLastSeenEntry(subnode,entry))
- {
- break;
- }
- else
- {
- subnode = subscription.getLastSeenEntry();
- }
- }
+ checkSubscriptionsNotAheadOfDelivery(entry);
- }
deliverAsync();
}
@@ -451,6 +435,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // This method is only required for queues which mess with ordering
+ }
+
private void incrementQueueSize(final AMQMessage message)
{
getAtomicQueueSize().addAndGet(message.getSize());
@@ -1204,6 +1193,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
QueueEntry node = sub.getLastSeenEntry();
+
while(node.isAcquired() || node.isDeleted() || node.expired())
{
if(!node.isAcquired() && !node.isDeleted() && node.expired())