diff options
4 files changed, 160 insertions, 42 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 77997a3685..0e4c450047 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -37,6 +37,7 @@ <enablePooledAllocator>false</enablePooledAllocator> <enableDirectBuffers>false</enableDirectBuffers> <framesize>65535</framesize> + <compressBufferOnQueue>false</compressBufferOnQueue> </advanced> <security> <principal-databases> diff --git a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java index 64a54fe803..0966a92d4b 100644 --- a/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java @@ -17,28 +17,35 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize; -import java.util.LinkedList; -import java.util.Queue; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** * Manages delivery of messages on behalf of a queue - * */ -class DeliveryManager +public class DeliveryManager { private static final Logger _log = Logger.getLogger(DeliveryManager.class); + @Configured(path = "advanced.compressBufferOnQueue", + defaultValue = "false") + public boolean compressBufferOnQueue; /** * Holds any queued messages */ - private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>(); + private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>(); + //private int _messageCount; /** * Ensures that only one asynchronous task is running for this manager at * any time. @@ -50,72 +57,124 @@ class DeliveryManager private final SubscriptionManager _subscriptions; /** - * An indication of the mode we are in. If this is true then messages are - * being queued up in _messages for asynchronous delivery. If it is false - * then messages can be delivered directly as they come in. - */ - private boolean _queueing; - - /** * A reference to the queue we are delivering messages for. We need this to be able * to pass the code that handles acknowledgements a handle on the queue. */ private final AMQQueue _queue; + + private volatile int _queueSize = 0; + DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { + //Set values from configuration + Configurator.configure(this); + + if (compressBufferOnQueue) + { + _log.info("Compressing Buffers on queue."); + } + _subscriptions = subscriptions; _queue = queue; } - private synchronized boolean enqueue(AMQMessage msg) + /** + * @return boolean if we are queueing + */ + private boolean queueing() + { + return getMessageCount() != 0; + } + + + /** + * @param msg to enqueue + * @return true if we are queue this message + */ + private boolean enqueue(AMQMessage msg) { - if (_queueing) + if (msg.isImmediate()) { - if(msg.isImmediate()) + return false; + } + else + { + if (queueing()) { - //can't enqueue messages for whom immediate delivery is required - return false; + return addMessageToQueue(msg); } else { - _messages.offer(msg); - return true; + return false; } } - else + } + + private void startQueueing(AMQMessage msg) + { + if (!msg.isImmediate()) { - return false; + addMessageToQueue(msg); } } - private synchronized void startQueueing(AMQMessage msg) + private boolean addMessageToQueue(AMQMessage msg) { - _queueing = true; - enqueue(msg); + // Shrink the ContentBodies to their actual size to save memory. + // synchronize to ensure this msg is the next one to get added. + if (compressBufferOnQueue) + { + synchronized(_messages) + { + Iterator it = msg.getContentBodies().iterator(); + while (it.hasNext()) + { + ContentBody cb = (ContentBody) it.next(); + cb.reduceBufferToFit(); + } + + _messages.offer(msg); + _queueSize++; + } + } + else + { + _messages.offer(msg); + _queueSize++; + } + return true; } + /** * Determines whether there are queued messages. Sets _queueing to false if * there are no queued messages. This needs to be atomic. * * @return true if there are queued messages */ - private synchronized boolean hasQueuedMessages() + private boolean hasQueuedMessages() { - boolean empty = _messages.isEmpty(); - if (empty) - { - _queueing = false; - } - return !empty; + return getMessageCount() != 0; } - public synchronized int getQueueMessageCount() + public int getQueueMessageCount() + { + return getMessageCount(); + } + + /** + * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. + * + * @return int the number of messages in the delivery queue. + */ + + private int getMessageCount() { return _messages.size(); } + protected synchronized List<AMQMessage> getMessages() { return new ArrayList<AMQMessage>(_messages); @@ -151,7 +210,9 @@ class DeliveryManager boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); while (hasQueuedMessages() && hasSubscribers) { - Subscription next = _subscriptions.nextSubscriber(peek()); + // _log.debug("Have messages(" + _messages.size() + ") and subscribers"); + Subscription next = _subscriptions.nextSubscriber(peek()); + //We don't synchronize access to subscribers so need to re-check if (next != null) { @@ -169,20 +230,24 @@ class DeliveryManager } finally { + _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); _processing.set(false); } } - private synchronized AMQMessage peek() + private AMQMessage peek() { return _messages.peek(); } - private synchronized AMQMessage poll() + private AMQMessage poll() { + _queueSize--; return _messages.poll(); } + Runner asyncDelivery = new Runner(); + /** * Requests that the delivery manager start processing the queue asynchronously * if there is work that can be done (i.e. there are messages queued up and @@ -197,12 +262,16 @@ class DeliveryManager */ void processAsync(Executor executor) { + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get()); + if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) { //are we already running? if so, don't re-run if (_processing.compareAndSet(false, true)) { - executor.execute(new Runner()); + executor.execute(asyncDelivery); } } } @@ -214,8 +283,6 @@ class DeliveryManager * * @param name the name of the entity on whose behalf we are delivering the message * @param msg the message to deliver - * @throws NoConsumersException if there are no active subscribers to deliver - * the message to * @throws FailedDequeueException if the message could not be dequeued */ void deliver(String name, AMQMessage msg) throws FailedDequeueException @@ -224,11 +291,27 @@ class DeliveryManager if (!enqueue(msg)) { // not queueing so deliver message to 'next' subscriber - Subscription s = _subscriptions.nextSubscriber(msg); + Subscription s = _subscriptions.nextSubscriber(msg); if (s == null) { if (!msg.isImmediate()) { + if (_subscriptions instanceof SubscriptionSet) + { + if (_log.isDebugEnabled()) + { + _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers() + + " Size :" + ((SubscriptionSet) _subscriptions).size() + + " Empty :" + ((SubscriptionSet) _subscriptions).isEmpty()); + } + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers()); + } + } // no subscribers yet so enter 'queueing' mode and queue this message startQueueing(msg); } diff --git a/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java b/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java new file mode 100644 index 0000000000..628e03839a --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.util; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E> +{ + public int size() + { + if (isEmpty()) + { + return 0; + } + else + { + return 1; + } + } +} diff --git a/java/common/src/org/apache/qpid/framing/ContentBody.java b/java/common/src/org/apache/qpid/framing/ContentBody.java index d7b668534c..14344a3870 100644 --- a/java/common/src/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/org/apache/qpid/framing/ContentBody.java @@ -63,8 +63,7 @@ public class ContentBody extends AMQBody ByteBuffer newPayload = ByteBuffer.allocate(size); newPayload.put(payload); - newPayload.position(0); - newPayload.limit(size); + newPayload.flip(); //reduce reference count on payload payload.release(); |