summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java37
1 files changed, 26 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index e14ed0f41d..51fbff76f4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -24,7 +24,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.AMQException;
public class AMQPriorityQueue extends SimpleAMQQueue
{
@@ -34,11 +33,19 @@ public class AMQPriorityQueue extends SimpleAMQQueue
final boolean autoDelete,
final VirtualHost virtualHost,
int priorities)
- throws AMQException
{
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
+ public AMQPriorityQueue(String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, int priorities)
+ {
+ this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
+ }
+
public int getPriorities()
{
return ((PriorityQueueList) _entries).getPriorities();
@@ -52,20 +59,28 @@ public class AMQPriorityQueue extends SimpleAMQQueue
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
+ if(!subscription.isClosed())
{
- if(subscription.setLastSeenEntry(subnode,entry))
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
{
- break;
- }
- else
- {
- subnode = subscription.getLastSeenEntry();
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ {
+ if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
+ }
+ }
}
}
}
}
-
}