summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java28
1 files changed, 17 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 2383d6e0be..37e8029796 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -61,19 +61,25 @@ public class AMQPriorityQueue extends SimpleAMQQueue
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
- QueueContext context = (QueueContext) subscription.getQueueContext();
- QueueEntry subnode = context._lastSeenEntry;
- QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ if(!subscription.isClosed())
{
- if(_releasedUpdater.compareAndSet(context,released,entry))
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
{
- break;
- }
- else
- {
- subnode = context._lastSeenEntry;
- released = context._releasedEntry;
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ {
+ if(_releasedUpdater.compareAndSet(context,released,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
+ }
+ }
}
}