summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java26
1 files changed, 26 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 8ef28fbcd2..ba6b392d13 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.AMQException;
public class AMQPriorityQueue extends SimpleAMQQueue
@@ -37,5 +39,29 @@ public class AMQPriorityQueue extends SimpleAMQQueue
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
+ @Override
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // check that all subscriptions are not in advance of the entry
+ SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ while(subIter.advance() && !entry.isAcquired())
+ {
+ final Subscription subscription = subIter.getNode().getSubscription();
+ QueueEntry subnode = subscription.getLastSeenEntry();
+ while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
+ {
+ if(subscription.setLastSeenEntry(subnode,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = subscription.getLastSeenEntry();
+ }
+ }
+
+ }
+ }
+
}