diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 30 |
1 files changed, 10 insertions, 20 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a3709641f0..16d24e74ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -81,7 +81,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicBoolean _quiesced = new AtomicBoolean(false); - private final SubscriptionList _subscriptionList = new SubscriptionList(this); + protected final SubscriptionList _subscriptionList = new SubscriptionList(this); private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); private boolean _exclusiveSubscriber; @@ -389,13 +389,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { - deliverMessage(sub, entry); QueueEntry queueEntryNode = sub.getLastSeenEntry(); if(_entries.next(queueEntryNode) == entry) { sub.setLastSeenEntry(queueEntryNode,entry); } + deliverMessage(sub, entry); + } } } @@ -414,25 +415,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else if(!entry.isAcquired()) { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && !entry.isAcquired()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - QueueEntry subnode = subscription.getLastSeenEntry(); - while((entry.compareTo(subnode) < 0) && !entry.isAcquired()) - { - if(subscription.setLastSeenEntry(subnode,entry)) - { - break; - } - else - { - subnode = subscription.getLastSeenEntry(); - } - } + checkSubscriptionsNotAheadOfDelivery(entry); - } deliverAsync(); } @@ -451,6 +435,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + { + // This method is only required for queues which mess with ordering + } + private void incrementQueueSize(final AMQMessage message) { getAtomicQueueSize().addAndGet(message.getSize()); @@ -1204,6 +1193,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { QueueEntry node = sub.getLastSeenEntry(); + while(node.isAcquired() || node.isDeleted() || node.expired()) { if(!node.isAcquired() && !node.isDeleted() && node.expired()) |