summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java46
1 files changed, 18 insertions, 28 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
index 0966a92d4b..e56af3c7a8 100644
--- a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
@@ -19,10 +19,10 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
import java.util.ArrayList;
import java.util.Iterator;
@@ -31,6 +31,7 @@ import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Manages delivery of messages on behalf of a queue
*/
@@ -44,7 +45,7 @@ public class DeliveryManager
/**
* Holds any queued messages
*/
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>();
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
//private int _messageCount;
/**
* Ensures that only one asynchronous task is running for this manager at
@@ -63,8 +64,6 @@ public class DeliveryManager
private final AMQQueue _queue;
- private volatile int _queueSize = 0;
-
DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
//Set values from configuration
@@ -84,7 +83,7 @@ public class DeliveryManager
*/
private boolean queueing()
{
- return getMessageCount() != 0;
+ return hasQueuedMessages();
}
@@ -121,28 +120,19 @@ public class DeliveryManager
private boolean addMessageToQueue(AMQMessage msg)
{
- // Shrink the ContentBodies to their actual size to save memory.
- // synchronize to ensure this msg is the next one to get added.
+ // Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
- synchronized(_messages)
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
{
- Iterator it = msg.getContentBodies().iterator();
- while (it.hasNext())
- {
- ContentBody cb = (ContentBody) it.next();
- cb.reduceBufferToFit();
- }
-
- _messages.offer(msg);
- _queueSize++;
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
}
}
- else
- {
- _messages.offer(msg);
- _queueSize++;
- }
+
+ _messages.offer(msg);
+
return true;
}
@@ -153,9 +143,9 @@ public class DeliveryManager
*
* @return true if there are queued messages
*/
- private boolean hasQueuedMessages()
+ public boolean hasQueuedMessages()
{
- return getMessageCount() != 0;
+ return !_messages.isEmpty();
}
public int getQueueMessageCount()
@@ -216,7 +206,8 @@ public class DeliveryManager
//We don't synchronize access to subscribers so need to re-check
if (next != null)
{
- next.send(poll(), _queue);
+ next.send(peek(), _queue);
+ poll();
}
else
{
@@ -230,7 +221,7 @@ public class DeliveryManager
}
finally
{
- _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
+ _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
_processing.set(false);
}
}
@@ -242,7 +233,6 @@ public class DeliveryManager
private AMQMessage poll()
{
- _queueSize--;
return _messages.poll();
}
@@ -262,7 +252,7 @@ public class DeliveryManager
*/
void processAsync(Executor executor)
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" +
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
" Active:" + _subscriptions.hasActiveSubscribers() +
" Processing:" + _processing.get());