diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java | 485 |
1 files changed, 485 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java new file mode 100644 index 0000000000..0c4b8a0b42 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -0,0 +1,485 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** This is an abstract base class to handle */ +public abstract class FlowableBaseQueueEntryList implements QueueEntryList +{ + protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); + + private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); + private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); + /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ + + protected long _memoryUsageMaximum = -1L; + + /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ + protected long _memoryUsageMinimum = 0; + private volatile AtomicBoolean _flowed; + private QueueBackingStore _backingStore; + protected AMQQueue _queue; + private Executor _inhaler; + private Executor _purger; + private AtomicBoolean _stopped; + private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null); + protected boolean _disabled; + private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); + private static final int BATCH_PROCESS_COUNT = 100; + + FlowableBaseQueueEntryList(AMQQueue queue) + { + _queue = queue; + _flowed = new AtomicBoolean(false); + VirtualHost vhost = queue.getVirtualHost(); + if (vhost != null) + { + _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue); + } + + _stopped = new AtomicBoolean(false); + _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + _disabled = true; + } + + public void setFlowed(boolean flowed) + { + if (_flowed.get() != flowed) + { + _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); + _flowed.set(flowed); + } + } + + protected void showUsage() + { + showUsage(""); + } + + protected void showUsage(String prefix) + { + if (_log.isDebugEnabled()) + { + _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed() + + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum() + + "/" + dataSize()); + } + } + + public boolean isFlowed() + { + return _flowed.get(); + } + + public int size() + { + return _atomicQueueCount.get(); + } + + public long dataSize() + { + return _atomicQueueSize.get(); + } + + public long memoryUsed() + { + return _atomicQueueInMemory.get(); + } + + public void setMemoryUsageMaximum(long maximumMemoryUsage) + { + _memoryUsageMaximum = maximumMemoryUsage; + + if (maximumMemoryUsage >= 0) + { + _disabled = false; + } + + // Don't attempt to start the inhaler/purger unless we have a minimum value specified. + if (_memoryUsageMaximum >= 0) + { + setMemoryUsageMinimum(_memoryUsageMaximum / 2); + + // if we have now have to much memory in use we need to purge. + if (_memoryUsageMaximum < _atomicQueueInMemory.get()) + { + setFlowed(true); + startPurger(); + } + } + else + { + if (_log.isInfoEnabled()) + { + _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); + } + _disabled = true; + } + } + + public long getMemoryUsageMaximum() + { + return _memoryUsageMaximum; + } + + public void setMemoryUsageMinimum(long minimumMemoryUsage) + { + _memoryUsageMinimum = minimumMemoryUsage; + + // Don't attempt to start the inhaler unless we have a minimum value specified. + if (_memoryUsageMinimum > 0) + { + checkAndStartInhaler(); + } + } + + private void checkAndStartInhaler() + { + // If we've increased the minimum memory above what we have in memory then + // we need to inhale more if there is more + if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) + { + startInhaler(); + } + } + + private void startInhaler() + { + MessageInhaler inhaler = new MessageInhaler(); + + if (_asynchronousInhaler.compareAndSet(null, inhaler)) + { + _inhaler.execute(inhaler); + } + } + + private void startPurger() + { + MessagePurger purger = new MessagePurger(); + + if (_asynchronousPurger.compareAndSet(null, purger)) + { + _purger.execute(purger); + } + } + + public long getMemoryUsageMinimum() + { + return _memoryUsageMinimum; + } + + /** + * Only to be called by the QueueEntry + * + * @param queueEntry the entry to unload + */ + public void entryUnloadedUpdateMemory(QueueEntry queueEntry) + { + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); + } + + checkAndStartInhaler(); + } + + /** + * Only to be called from the QueueEntry + * + * @param queueEntry the entry to load + */ + public void entryLoadedUpdateMemory(QueueEntry queueEntry) + { + if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + { + _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); + setFlowed(true); + startPurger(); + } + } + + public void stop() + { + if (!_stopped.getAndSet(true)) + { + // The SimpleAMQQueue keeps running when stopped so we should just release the services + // rather than actively shutdown our threads. + //Shutdown thread for inhaler. + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + + _backingStore.close(); + } + } + + protected void incrementCounters(final QueueEntryImpl queueEntry) + { + _atomicQueueCount.incrementAndGet(); + _atomicQueueSize.addAndGet(queueEntry.getSize()); + long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + + if (!_disabled && inUseMemory > _memoryUsageMaximum) + { + setFlowed(true); + queueEntry.unload(); + } + } + + protected void dequeued(QueueEntryImpl queueEntry) + { + _atomicQueueCount.decrementAndGet(); + _atomicQueueSize.addAndGet(-queueEntry.getSize()); + if (!queueEntry.isFlowed()) + { + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0 on dequeue."); + } + } + } + + public QueueBackingStore getBackingStore() + { + return _backingStore; + } + + private class MessageInhaler implements Runnable + { + public void run() + { + String threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); + try + { + inhaleList(this); + } + finally + { + Thread.currentThread().setName(threadName); + } + } + } + + private void inhaleList(MessageInhaler messageInhaler) + { + if (_log.isInfoEnabled()) + { + _log.info("Inhaler Running:" + _queue.getName()); + showUsage("Inhaler Running:" + _queue.getName()); + } + // If in memory count is at or over max then we can't inhale + if (_atomicQueueInMemory.get() >= _memoryUsageMaximum) + { + if (_log.isDebugEnabled()) + { + _log.debug("Unable to start inhaling as we are already over quota:" + + _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); + } + return; + } + + _asynchronousInhaler.compareAndSet(messageInhaler, null); + int inhaled = 1; + + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory + && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available + && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do + && (inhaled > 0) // ensure we could inhale something + && _asynchronousInhaler.compareAndSet(null, messageInhaler)) // Ensure we are the running inhaler + { + inhaled = 0; + QueueEntryIterator iterator = iterator(); + + // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread. + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) + && !iterator.getNode().isAvailable() && iterator.advance()) + { + //Find first AVAILABLE node + } + + // Because the above loop checks then moves on to the next entry a check for atTail will return true but + // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based + // on the return from advance() which returns true if it can advance. + boolean atEndofList = false; + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory + && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do + && !atEndofList) // We have reached end of list QueueEntries + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && entry.isFlowed()) + { + if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum) + { + // We don't have space for this message so we need to stop inhaling. + if (_log.isDebugEnabled()) + { + _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity()); + } + inhaled = BATCH_PROCESS_COUNT; + } + else + { + entry.load(); + inhaled++; + } + } + + atEndofList = !iterator.advance(); + } + + if (iterator.atTail()) + { + setFlowed(false); + } + + _asynchronousInhaler.set(null); + } + + if (_log.isInfoEnabled()) + { + _log.info("Inhaler Stopping:" + _queue.getName()); + showUsage("Inhaler Stopping:" + _queue.getName()); + } + + //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. + if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) + { + if (_log.isInfoEnabled()) + { + _log.info("Rescheduling Inhaler:" + _queue.getName()); + } + _inhaler.execute(messageInhaler); + } + + } + + private class MessagePurger implements Runnable + { + public void run() + { + String threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); + try + { + purgeList(this); + } + finally + { + Thread.currentThread().setName(threadName); + } + } + } + + private void purgeList(MessagePurger messagePurger) + { + // If in memory count is at or over max then we can't inhale + if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) + { + if (_log.isDebugEnabled()) + { + _log.debug("Unable to start purging as we are already below our minimum cache level:" + + _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum); + } + return; + } + + if (_log.isInfoEnabled()) + { + _log.info("Purger Running:" + _queue.getName()); + showUsage("Purger Running:" + _queue.getName()); + } + + _asynchronousPurger.compareAndSet(messagePurger, null); + int purged = 0; + + while ((_atomicQueueInMemory.get() > _memoryUsageMinimum) + && purged < BATCH_PROCESS_COUNT + && _asynchronousPurger.compareAndSet(null, messagePurger)) + { + QueueEntryIterator iterator = iterator(); + + //There are potentially AQUIRED messages that can be purged but we can't purge the last AQUIRED message + // as it may have just become AQUIRED and not yet delivered. + + //To be safe only purge available messages. This should be fine as long as we have a small prefetch. + while (!iterator.getNode().isAvailable() && iterator.advance()) + { + //Find first AVAILABLE node + } + + // Count up the memory usage to find our minimum point + long memoryUsage = 0; + boolean atTail = false; + while ((memoryUsage < _memoryUsageMaximum) && !atTail) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && !entry.isFlowed()) + { + memoryUsage += entry.getSize(); + } + + atTail = !iterator.advance(); + } + + //Purge remainging mesages on queue + while (!atTail && (purged < BATCH_PROCESS_COUNT)) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && !entry.isFlowed()) + { + entry.unload(); + purged++; + } + + atTail = !iterator.advance(); + } + + _asynchronousPurger.set(null); + } + + if (_log.isInfoEnabled()) + { + _log.info("Purger Stopping:" + _queue.getName()); + showUsage("Purger Stopping:" + _queue.getName()); + } + + //If we are still flowed and are over the minimum value then schedule to run again. + if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum) + { + _log.info("Rescheduling Purger:" + _queue.getName()); + _purger.execute(messagePurger); + } + } +} |