summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-10-24 11:50:09 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-10-24 11:50:09 +0000
commit74dbde147951a10f7d9d817eb86046845087a00f (patch)
treebea7a633e6bffd21f4a749f3adaa972f54f708e9 /java
parentc58c6dedd5f681f9b06b7bdd3a50dc671f995d93 (diff)
downloadqpid-python-74dbde147951a10f7d9d817eb86046845087a00f.tar.gz
QPID-48
Enabled buffer compression on delivery queue as a configuration option from the xml.(advanced.compressBufferOnQueue) Changed DeliveryManager.java to use ConcurrentLinkedQueueNoSize.java this is the standard ConcurrentLinkedQueue but where the size() method returns 0(empty) or 1(has content) as it is an expensive operation. Previously there was a race condition with the use of the _queueing boolean. Under load the consumer would sometimes stop getting messages. This was due to the messages being enqueued without starting the Worker thread. There is still an issue (QPID-54) on high receiving load messages rate received by a client drops. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@467313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/etc/config.xml1
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java163
-rw-r--r--java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java35
-rw-r--r--java/common/src/org/apache/qpid/framing/ContentBody.java3
4 files changed, 160 insertions, 42 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 77997a3685..0e4c450047 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -37,6 +37,7 @@
<enablePooledAllocator>false</enablePooledAllocator>
<enableDirectBuffers>false</enableDirectBuffers>
<framesize>65535</framesize>
+ <compressBufferOnQueue>false</compressBufferOnQueue>
</advanced>
<security>
<principal-databases>
diff --git a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
index 64a54fe803..0966a92d4b 100644
--- a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
@@ -17,28 +17,35 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
-import java.util.LinkedList;
-import java.util.Queue;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Manages delivery of messages on behalf of a queue
- *
*/
-class DeliveryManager
+public class DeliveryManager
{
private static final Logger _log = Logger.getLogger(DeliveryManager.class);
+ @Configured(path = "advanced.compressBufferOnQueue",
+ defaultValue = "false")
+ public boolean compressBufferOnQueue;
/**
* Holds any queued messages
*/
- private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>();
+ //private int _messageCount;
/**
* Ensures that only one asynchronous task is running for this manager at
* any time.
@@ -50,72 +57,124 @@ class DeliveryManager
private final SubscriptionManager _subscriptions;
/**
- * An indication of the mode we are in. If this is true then messages are
- * being queued up in _messages for asynchronous delivery. If it is false
- * then messages can be delivered directly as they come in.
- */
- private boolean _queueing;
-
- /**
* A reference to the queue we are delivering messages for. We need this to be able
* to pass the code that handles acknowledgements a handle on the queue.
*/
private final AMQQueue _queue;
+
+ private volatile int _queueSize = 0;
+
DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
+ //Set values from configuration
+ Configurator.configure(this);
+
+ if (compressBufferOnQueue)
+ {
+ _log.info("Compressing Buffers on queue.");
+ }
+
_subscriptions = subscriptions;
_queue = queue;
}
- private synchronized boolean enqueue(AMQMessage msg)
+ /**
+ * @return boolean if we are queueing
+ */
+ private boolean queueing()
+ {
+ return getMessageCount() != 0;
+ }
+
+
+ /**
+ * @param msg to enqueue
+ * @return true if we are queue this message
+ */
+ private boolean enqueue(AMQMessage msg)
{
- if (_queueing)
+ if (msg.isImmediate())
{
- if(msg.isImmediate())
+ return false;
+ }
+ else
+ {
+ if (queueing())
{
- //can't enqueue messages for whom immediate delivery is required
- return false;
+ return addMessageToQueue(msg);
}
else
{
- _messages.offer(msg);
- return true;
+ return false;
}
}
- else
+ }
+
+ private void startQueueing(AMQMessage msg)
+ {
+ if (!msg.isImmediate())
{
- return false;
+ addMessageToQueue(msg);
}
}
- private synchronized void startQueueing(AMQMessage msg)
+ private boolean addMessageToQueue(AMQMessage msg)
{
- _queueing = true;
- enqueue(msg);
+ // Shrink the ContentBodies to their actual size to save memory.
+ // synchronize to ensure this msg is the next one to get added.
+ if (compressBufferOnQueue)
+ {
+ synchronized(_messages)
+ {
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
+ }
+
+ _messages.offer(msg);
+ _queueSize++;
+ }
+ }
+ else
+ {
+ _messages.offer(msg);
+ _queueSize++;
+ }
+ return true;
}
+
/**
* Determines whether there are queued messages. Sets _queueing to false if
* there are no queued messages. This needs to be atomic.
*
* @return true if there are queued messages
*/
- private synchronized boolean hasQueuedMessages()
+ private boolean hasQueuedMessages()
{
- boolean empty = _messages.isEmpty();
- if (empty)
- {
- _queueing = false;
- }
- return !empty;
+ return getMessageCount() != 0;
}
- public synchronized int getQueueMessageCount()
+ public int getQueueMessageCount()
+ {
+ return getMessageCount();
+ }
+
+ /**
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+ *
+ * @return int the number of messages in the delivery queue.
+ */
+
+ private int getMessageCount()
{
return _messages.size();
}
+
protected synchronized List<AMQMessage> getMessages()
{
return new ArrayList<AMQMessage>(_messages);
@@ -151,7 +210,9 @@ class DeliveryManager
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
while (hasQueuedMessages() && hasSubscribers)
{
- Subscription next = _subscriptions.nextSubscriber(peek());
+ // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
+ Subscription next = _subscriptions.nextSubscriber(peek());
+
//We don't synchronize access to subscribers so need to re-check
if (next != null)
{
@@ -169,20 +230,24 @@ class DeliveryManager
}
finally
{
+ _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
_processing.set(false);
}
}
- private synchronized AMQMessage peek()
+ private AMQMessage peek()
{
return _messages.peek();
}
- private synchronized AMQMessage poll()
+ private AMQMessage poll()
{
+ _queueSize--;
return _messages.poll();
}
+ Runner asyncDelivery = new Runner();
+
/**
* Requests that the delivery manager start processing the queue asynchronously
* if there is work that can be done (i.e. there are messages queued up and
@@ -197,12 +262,16 @@ class DeliveryManager
*/
void processAsync(Executor executor)
{
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
- executor.execute(new Runner());
+ executor.execute(asyncDelivery);
}
}
}
@@ -214,8 +283,6 @@ class DeliveryManager
*
* @param name the name of the entity on whose behalf we are delivering the message
* @param msg the message to deliver
- * @throws NoConsumersException if there are no active subscribers to deliver
- * the message to
* @throws FailedDequeueException if the message could not be dequeued
*/
void deliver(String name, AMQMessage msg) throws FailedDequeueException
@@ -224,11 +291,27 @@ class DeliveryManager
if (!enqueue(msg))
{
// not queueing so deliver message to 'next' subscriber
- Subscription s = _subscriptions.nextSubscriber(msg);
+ Subscription s = _subscriptions.nextSubscriber(msg);
if (s == null)
{
if (!msg.isImmediate())
{
+ if (_subscriptions instanceof SubscriptionSet)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers()
+ + " Size :" + ((SubscriptionSet) _subscriptions).size()
+ + " Empty :" + ((SubscriptionSet) _subscriptions).isEmpty());
+ }
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers());
+ }
+ }
// no subscribers yet so enter 'queueing' mode and queue this message
startQueueing(msg);
}
diff --git a/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java b/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
new file mode 100644
index 0000000000..628e03839a
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E>
+{
+ public int size()
+ {
+ if (isEmpty())
+ {
+ return 0;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+}
diff --git a/java/common/src/org/apache/qpid/framing/ContentBody.java b/java/common/src/org/apache/qpid/framing/ContentBody.java
index d7b668534c..14344a3870 100644
--- a/java/common/src/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/org/apache/qpid/framing/ContentBody.java
@@ -63,8 +63,7 @@ public class ContentBody extends AMQBody
ByteBuffer newPayload = ByteBuffer.allocate(size);
newPayload.put(payload);
- newPayload.position(0);
- newPayload.limit(size);
+ newPayload.flip();
//reduce reference count on payload
payload.release();