diff options
-rw-r--r-- | java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java | 46 |
1 files changed, 18 insertions, 28 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java index 0966a92d4b..e56af3c7a8 100644 --- a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java @@ -19,10 +19,10 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize; import java.util.ArrayList; import java.util.Iterator; @@ -31,6 +31,7 @@ import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; + /** * Manages delivery of messages on behalf of a queue */ @@ -44,7 +45,7 @@ public class DeliveryManager /** * Holds any queued messages */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>(); + private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); //private int _messageCount; /** * Ensures that only one asynchronous task is running for this manager at @@ -63,8 +64,6 @@ public class DeliveryManager private final AMQQueue _queue; - private volatile int _queueSize = 0; - DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { //Set values from configuration @@ -84,7 +83,7 @@ public class DeliveryManager */ private boolean queueing() { - return getMessageCount() != 0; + return hasQueuedMessages(); } @@ -121,28 +120,19 @@ public class DeliveryManager private boolean addMessageToQueue(AMQMessage msg) { - // Shrink the ContentBodies to their actual size to save memory. - // synchronize to ensure this msg is the next one to get added. + // Shrink the ContentBodies to their actual size to save memory. if (compressBufferOnQueue) { - synchronized(_messages) + Iterator it = msg.getContentBodies().iterator(); + while (it.hasNext()) { - Iterator it = msg.getContentBodies().iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - cb.reduceBufferToFit(); - } - - _messages.offer(msg); - _queueSize++; + ContentBody cb = (ContentBody) it.next(); + cb.reduceBufferToFit(); } } - else - { - _messages.offer(msg); - _queueSize++; - } + + _messages.offer(msg); + return true; } @@ -153,9 +143,9 @@ public class DeliveryManager * * @return true if there are queued messages */ - private boolean hasQueuedMessages() + public boolean hasQueuedMessages() { - return getMessageCount() != 0; + return !_messages.isEmpty(); } public int getQueueMessageCount() @@ -216,7 +206,8 @@ public class DeliveryManager //We don't synchronize access to subscribers so need to re-check if (next != null) { - next.send(poll(), _queue); + next.send(peek(), _queue); + poll(); } else { @@ -230,7 +221,7 @@ public class DeliveryManager } finally { - _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); + _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); _processing.set(false); } } @@ -242,7 +233,6 @@ public class DeliveryManager private AMQMessage poll() { - _queueSize--; return _messages.poll(); } @@ -262,7 +252,7 @@ public class DeliveryManager */ void processAsync(Executor executor) { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" + + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + " Active:" + _subscriptions.hasActiveSubscribers() + " Processing:" + _processing.get()); |