diff options
Diffstat (limited to 'broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java')
-rw-r--r-- | broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java | 348 |
1 files changed, 0 insertions, 348 deletions
diff --git a/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java deleted file mode 100644 index dde76e5ba8..0000000000 --- a/broker/src/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.server.configuration.Configurator; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicBoolean; - - -/** - * Manages delivery of messages on behalf of a queue - */ -public class ConcurrentDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); - - @Configured(path = "advanced.compressBufferOnQueue", - defaultValue = "false") - public boolean compressBufferOnQueue; - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - - /** - * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - * <p/> - * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. - */ - private ReentrantLock _lock = new ReentrantLock(); - - - ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - - //Set values from configuration - Configurator.configure(this); - - if (compressBufferOnQueue) - { - _log.info("Compressing Buffers on queue."); - } - - _subscriptions = subscriptions; - _queue = queue; - } - - /** - * @return boolean if we are queueing - */ - private boolean queueing() - { - return hasQueuedMessages(); - } - - - /** - * @param msg to enqueue - * @return true if we are queue this message - */ - private boolean enqueue(AMQMessage msg) - { - if (msg.isImmediate()) - { - return false; - } - else - { - _lock.lock(); - try - { - if (queueing()) - { - return addMessageToQueue(msg); - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - } - - private void startQueueing(AMQMessage msg) - { - if (!msg.isImmediate()) - { - addMessageToQueue(msg); - } - } - - private boolean addMessageToQueue(AMQMessage msg) - { - // Shrink the ContentBodies to their actual size to save memory. - if (compressBufferOnQueue) - { - Iterator it = msg.getContentBodies().iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - cb.reduceBufferToFit(); - } - } - - _messages.offer(msg); - - return true; - } - - - public boolean hasQueuedMessages() - { - - _lock.lock(); - try - { - return !_messages.isEmpty(); - } - finally - { - _lock.unlock(); - } - - - } - - public int getQueueMessageCount() - { - return getMessageCount(); - } - - /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. - * - * @return int the number of messages in the delivery queue. - */ - private int getMessageCount() - { - return _messages.size(); - } - - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public synchronized void removeAMessageFromTop() throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(_queue); - } - } - - public synchronized void clearAllMessages() throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(_queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - AMQMessage message = peek(); - - //While we have messages to send and subscribers to send them to. - while (message != null && hasSubscribers) - { - // _log.debug("Have messages(" + _messages.size() + ") and subscribers"); - Subscription next = _subscriptions.nextSubscriber(message); - //FIXME Is there still not the chance that this subscribe could be suspended between here and the send? - - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - next.send(message, _queue); - poll(); - message = peek(); - } - else - { - hasSubscribers = false; - } - } - } - catch (FailedDequeueException e) - { - _log.error("Unable to deliver message as dequeue failed: " + e, e); - } - finally - { - _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); - } - } - - private AMQMessage peek() - { - return _messages.peek(); - } - - private AMQMessage poll() - { - return _messages.poll(); - } - - Runner asyncDelivery = new Runner(); - - public void processAsync(Executor executor) - { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); - - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - executor.execute(asyncDelivery); - } - } - } - - public void deliver(String name, AMQMessage msg) throws FailedDequeueException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - // not queueing so deliver message to 'next' subscriber - _lock.lock(); - try - { - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - if (!msg.isImmediate()) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - } - else - { - s.send(msg, _queue); - msg.setDeliveredToConsumer(); - } - } - finally - { - _lock.unlock(); - } - } - } - - private class Runner implements Runnable - { - public void run() - { - boolean running = true; - while (running) - { - processQueue(); - - //Check that messages have not been added since we did our last peek(); - // Synchronize with the thread that adds to the queue. - // If the queue is still empty then we can exit - _lock.lock(); - try - { - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) - { - running = false; - _processing.set(false); - } - } - finally - { - _lock.unlock(); - } - } - } - } -} |