diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java | 37 |
1 files changed, 26 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index e14ed0f41d..51fbff76f4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -24,7 +24,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.AMQException; public class AMQPriorityQueue extends SimpleAMQQueue { @@ -34,11 +33,19 @@ public class AMQPriorityQueue extends SimpleAMQQueue final boolean autoDelete, final VirtualHost virtualHost, int priorities) - throws AMQException { super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities)); } + public AMQPriorityQueue(String queueName, + boolean durable, + String owner, + boolean autoDelete, + VirtualHost virtualHost, int priorities) + { + this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities); + } + public int getPriorities() { return ((PriorityQueueList) _entries).getPriorities(); @@ -52,20 +59,28 @@ public class AMQPriorityQueue extends SimpleAMQQueue while(subIter.advance() && !entry.isAcquired()) { final Subscription subscription = subIter.getNode().getSubscription(); - QueueEntry subnode = subscription.getLastSeenEntry(); - while((entry.compareTo(subnode) < 0) && !entry.isAcquired()) + if(!subscription.isClosed()) { - if(subscription.setLastSeenEntry(subnode,entry)) + QueueContext context = (QueueContext) subscription.getQueueContext(); + if(context != null) { - break; - } - else - { - subnode = subscription.getLastSeenEntry(); + QueueEntry subnode = context._lastSeenEntry; + QueueEntry released = context._releasedEntry; + while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0)) + { + if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) + { + break; + } + else + { + subnode = context._lastSeenEntry; + released = context._releasedEntry; + } + } } } } } - } |