diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 2233 |
1 files changed, 0 insertions, 2233 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java deleted file mode 100644 index b02d03a1ad..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ /dev/null @@ -1,2233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.QueueConfigType; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.SessionConfig; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.QueueActor; -import org.apache.qpid.server.logging.messages.QueueMessages; -import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.PrincipalHolder; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import javax.management.JMException; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener -{ - private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - - - private final VirtualHost _virtualHost; - - private final AMQShortString _name; - private final String _resourceName; - - /** null means shared */ - private final AMQShortString _owner; - - private PrincipalHolder _prinicpalHolder; - - private boolean _exclusive = false; - private AMQSessionModel _exclusiveOwner; - - - private final boolean _durable; - - /** If true, this queue is deleted when the last subscriber is removed */ - private final boolean _autoDelete; - - private Exchange _alternateExchange; - - /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - - - - protected final QueueEntryList _entries; - - protected final SubscriptionList _subscriptionList = new SubscriptionList(this); - - private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); - - private volatile Subscription _exclusiveSubscriber; - - - - private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); - - private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); - - private final AtomicLong _totalMessagesReceived = new AtomicLong(); - - private final AtomicLong _dequeueCount = new AtomicLong(); - private final AtomicLong _dequeueSize = new AtomicLong(); - private final AtomicLong _enqueueSize = new AtomicLong(); - private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong(); - private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); - private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); - private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); - private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0); - private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); - private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); - private final AtomicLong _msgTxnDequeues = new AtomicLong(0); - private final AtomicLong _byteTxnDequeues = new AtomicLong(0); - private final AtomicLong _unackedMsgCount = new AtomicLong(0); - private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); - - private final AtomicInteger _bindingCountHigh = new AtomicInteger(); - - /** max allowed size(KB) of a single message */ - public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); - - /** max allowed number of messages on a queue. */ - public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); - - /** max queue depth for the queue */ - public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); - - /** maximum message age before alerts occur */ - public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); - - /** the minimum interval between sending out consecutive alerts of the same type */ - public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); - - private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); - - private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); - - private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - - - static final int MAX_ASYNC_DELIVERIES = 10; - - - private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); - private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null); - private final Executor _asyncDelivery; - private AtomicInteger _deliveredMessages = new AtomicInteger(); - private AtomicBoolean _stopped = new AtomicBoolean(false); - - private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); - - private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - - - private LogSubject _logSubject; - private LogActor _logActor; - - private AMQQueueMBean _managedObject; - private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER"; - private boolean _nolocal; - - private final AtomicBoolean _overfull = new AtomicBoolean(false); - private boolean _deleteOnNoConsumers; - private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); - private UUID _id; - private final Map<String, Object> _arguments; - - //TODO : persist creation time - private long _createTime = System.currentTimeMillis(); - private ConfigurationPlugin _queueConfiguration; - - - - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) - { - this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments); - } - - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) - { - this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); - } - - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) - { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); - } - - protected SimpleAMQQueue(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, - Map<String,Object> arguments) - { - - if (name == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } - - if (virtualHost == null) - { - throw new IllegalArgumentException("Virtual Host must not be null"); - } - - _name = name; - _resourceName = String.valueOf(name); - _durable = durable; - _owner = owner; - _autoDelete = autoDelete; - _exclusive = exclusive; - _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments; - - _id = virtualHost.getConfigStore().createId(); - - _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - - _logSubject = new QueueLogSubject(this); - _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); - - // Log the correct creation message - - // Extract the number of priorities for this Queue. - // Leave it as 0 if we are a SimpleQueueEntryList - int priorities = 0; - if (entryListFactory instanceof PriorityQueueList.Factory) - { - priorities = ((PriorityQueueList)_entries).getPriorities(); - } - - // Log the creation of this Queue. - // The priorities display is toggled on if we set priorities > 0 - CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(String.valueOf(_owner), - priorities, - _owner != null, - autoDelete, - durable, !durable, - priorities > 0)); - - getConfigStore().addConfiguredObject(this); - - try - { - _managedObject = new AMQQueueMBean(this); - _managedObject.register(); - } - catch (JMException e) - { - _logger.error("AMQQueue MBean creation has failed ", e); - } - - resetNotifications(); - - } - - public void resetNotifications() - { - // This ensure that the notification checks for the configured alerts are created. - setMaximumMessageAge(_maximumMessageAge); - setMaximumMessageCount(_maximumMessageCount); - setMaximumMessageSize(_maximumMessageSize); - setMaximumQueueDepth(_maximumQueueDepth); - } - - // ------ Getters and Setters - - public void execute(ReadWriteRunnable runnable) - { - _asyncDelivery.execute(runnable); - } - - public AMQShortString getNameShortString() - { - return _name; - } - - public void setNoLocal(boolean nolocal) - { - _nolocal = nolocal; - } - - public UUID getId() - { - return _id; - } - - public QueueConfigType getConfigType() - { - return QueueConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getVirtualHost(); - } - - public boolean isDurable() - { - return _durable; - } - - public boolean isExclusive() - { - return _exclusive; - } - - public void setExclusive(boolean exclusive) throws AMQException - { - _exclusive = exclusive; - - if(isDurable()) - { - getVirtualHost().getDurableConfigurationStore().updateQueue(this); - } - } - - public Exchange getAlternateExchange() - { - return _alternateExchange; - } - - public void setAlternateExchange(Exchange exchange) - { - if(_alternateExchange != null) - { - _alternateExchange.removeReference(this); - } - if(exchange != null) - { - exchange.addReference(this); - } - _alternateExchange = exchange; - } - - public Map<String, Object> getArguments() - { - return _arguments; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - - public AMQShortString getOwner() - { - return _owner; - } - - public PrincipalHolder getPrincipalHolder() - { - return _prinicpalHolder; - } - - public void setPrincipalHolder(PrincipalHolder prinicpalHolder) - { - _prinicpalHolder = prinicpalHolder; - } - - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public String getName() - { - return getNameShortString().toString(); - } - - // ------ Manage Subscriptions - - public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) - throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive - { - // Access control - if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) - { - throw new AMQSecurityException("Permission denied"); - } - - - if (hasExclusiveSubscriber()) - { - throw new ExistingExclusiveSubscription(); - } - - if (exclusive && !subscription.isTransient()) - { - if (getConsumerCount() != 0) - { - throw new ExistingSubscriptionPreventsExclusive(); - } - else - { - _exclusiveSubscriber = subscription; - } - } - - _activeSubscriberCount.incrementAndGet(); - subscription.setStateListener(this); - subscription.setQueueContext(new QueueContext(_entries.getHead())); - - if (!isDeleted()) - { - subscription.setQueue(this, exclusive); - if(_nolocal) - { - subscription.setNoLocal(_nolocal); - } - _subscriptionList.add(subscription); - - //Increment consumerCountHigh if necessary. (un)registerSubscription are both - //synchronized methods so we don't need additional synchronization here - if(_counsumerCountHigh.get() < getConsumerCount()) - { - _counsumerCountHigh.incrementAndGet(); - } - - if (isDeleted()) - { - subscription.queueDeleted(this); - } - } - else - { - // TODO - } - - deliverAsync(subscription); - - } - - public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException - { - if (subscription == null) - { - throw new NullPointerException("subscription argument is null"); - } - - boolean removed = _subscriptionList.remove(subscription); - - if (removed) - { - subscription.close(); - // No longer can the queue have an exclusive consumer - setExclusiveSubscriber(null); - subscription.setQueueContext(null); - - // auto-delete queues must be deleted if there are no remaining subscribers - - if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Auto-deleteing queue:" + this); - } - - delete(); - - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); - } - } - - } - - public boolean getDeleteOnNoConsumers() - { - return _deleteOnNoConsumers; - } - - public void setDeleteOnNoConsumers(boolean b) - { - _deleteOnNoConsumers = b; - } - - public void addBinding(final Binding binding) - { - _bindings.add(binding); - int bindingCount = _bindings.size(); - int bindingCountHigh; - while(bindingCount > (bindingCountHigh = _bindingCountHigh.get())) - { - if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount)) - { - break; - } - } - - reconfigure(); - } - - private void reconfigure() - { - //Reconfigure the queue for to reflect this new binding. - ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); - } - - if (config != null) - { - // Reconfigure with new config. - configure(config); - } - } - - public int getBindingCountHigh() - { - return _bindingCountHigh.get(); - } - - public void removeBinding(final Binding binding) - { - _bindings.remove(binding); - - reconfigure(); - } - - public List<Binding> getBindings() - { - return Collections.unmodifiableList(_bindings); - } - - public int getBindingCount() - { - return getBindings().size(); - } - - public LogSubject getLogSubject() - { - return _logSubject; - } - - // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message) throws AMQException - { - enqueue(message, null); - } - - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException - { - incrementTxnEnqueueStats(message); - incrementQueueCount(); - incrementQueueSize(message); - _totalMessagesReceived.incrementAndGet(); - - - QueueEntry entry; - Subscription exclusiveSub = _exclusiveSubscriber; - - if (exclusiveSub != null) - { - exclusiveSub.getSendLock(); - - try - { - entry = _entries.add(message); - - deliverToSubscription(exclusiveSub, entry); - } - finally - { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); - /* - - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - while (nextNode != null) - { - if (_lastSubscriptionNode.compareAndSet(node, nextNode)) - { - break; - } - else - { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - } - } - - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) - { - if (nextNode == null) - { - loops--; - nextNode = _subscriptionList.getHead(); - } - else - { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); - } - nextNode = nextNode.getNext(); - - } - } - - - if (!(entry.isAcquired() || entry.isDeleted())) - { - checkSubscriptionsNotAheadOfDelivery(entry); - - deliverAsync(); - } - - if(_managedObject != null) - { - _managedObject.checkForNotification(entry.getMessage()); - } - - if(action != null) - { - action.onEnqueue(entry); - } - - } - - private void deliverToSubscription(final Subscription sub, final QueueEntry entry) - throws AMQException - { - - sub.getSendLock(); - try - { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) - { - if (!sub.wouldSuspend(entry)) - { - if (sub.acquires() && !entry.acquire(sub)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.onDequeue(entry); - } - else - { - deliverMessage(sub, entry); - } - } - } - } - finally - { - sub.releaseSendLock(); - } - } - - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // This method is only required for queues which mess with ordering - // Simple Queues don't :-) - } - - private void incrementQueueSize(final ServerMessage message) - { - long size = message.getSize(); - getAtomicQueueSize().addAndGet(size); - _enqueueSize.addAndGet(size); - if(message.isPersistent() && isDurable()) - { - _persistentMessageEnqueueSize.addAndGet(size); - _persistentMessageEnqueueCount.incrementAndGet(); - } - } - - private void incrementQueueCount() - { - getAtomicQueueCount().incrementAndGet(); - } - - private void incrementTxnEnqueueStats(final ServerMessage message) - { - SessionConfig session = message.getSessionConfig(); - - if(session !=null && session.isTransactional()) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } - } - - private void incrementTxnDequeueStats(QueueEntry entry) - { - _msgTxnDequeues.incrementAndGet(); - _byteTxnDequeues.addAndGet(entry.getSize()); - } - - private void deliverMessage(final Subscription sub, final QueueEntry entry) - throws AMQException - { - setLastSeenEntry(sub, entry); - - _deliveredMessages.incrementAndGet(); - incrementUnackedMsgCount(); - - sub.send(entry); - } - - private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException - { - return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); - } - - - private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) - { - QueueContext subContext = (QueueContext) sub.getQueueContext(); - QueueEntry releasedEntry = subContext._releasedEntry; - - QueueContext._lastSeenUpdater.set(subContext, entry); - if(releasedEntry == entry) - { - QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); - } - } - - private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry) - { - - QueueContext subContext = (QueueContext) sub.getQueueContext(); - if(subContext != null) - { - QueueEntry oldEntry; - - while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0) - { - if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry)) - { - break; - } - } - } - } - - public void requeue(QueueEntry entry) - { - - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance() && entry.isAvailable()) - { - Subscription sub = subscriberIter.getNode().getSubscription(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues()) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - - } - - public void dequeue(QueueEntry entry, Subscription sub) - { - decrementQueueCount(); - decrementQueueSize(entry); - if (entry.acquiredBySubscription()) - { - _deliveredMessages.decrementAndGet(); - } - - if(sub != null && sub.isSessionTransactional()) - { - incrementTxnDequeueStats(entry); - } - - checkCapacity(); - - } - - private void decrementQueueSize(final QueueEntry entry) - { - final ServerMessage message = entry.getMessage(); - long size = message.getSize(); - getAtomicQueueSize().addAndGet(-size); - _dequeueSize.addAndGet(size); - if(message.isPersistent() && isDurable()) - { - _persistentMessageDequeueSize.addAndGet(size); - _persistentMessageDequeueCount.incrementAndGet(); - } - } - - void decrementQueueCount() - { - getAtomicQueueCount().decrementAndGet(); - _dequeueCount.incrementAndGet(); - } - - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException - { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ - - subscription.getSendLock(); - try - { - if (!subscription.isClosed()) - { - deliverMessage(subscription, entry); - return true; - } - else - { - return false; - } - } - finally - { - subscription.releaseSendLock(); - } - } - - public int getConsumerCount() - { - return _subscriptionList.size(); - } - - public int getConsumerCountHigh() - { - return _counsumerCountHigh.get(); - } - - public int getActiveConsumerCount() - { - return _activeSubscriberCount.get(); - } - - public boolean isUnused() - { - return getConsumerCount() == 0; - } - - public boolean isEmpty() - { - return getMessageCount() == 0; - } - - public int getMessageCount() - { - return getAtomicQueueCount().get(); - } - - public long getQueueDepth() - { - return getAtomicQueueSize().get(); - } - - public int getUndeliveredMessageCount() - { - int count = getMessageCount() - _deliveredMessages.get(); - if (count < 0) - { - return 0; - } - else - { - return count; - } - } - - public long getReceivedMessageCount() - { - return _totalMessagesReceived.get(); - } - - public long getOldestMessageArrivalTime() - { - QueueEntry entry = getOldestQueueEntry(); - return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); - } - - protected QueueEntry getOldestQueueEntry() - { - return _entries.next(_entries.getHead()); - } - - public boolean isDeleted() - { - return _deleted.get(); - } - - public List<QueueEntry> getMessagesOnTheQueue() - { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDeleted()) - { - entryList.add(node); - } - } - return entryList; - - } - - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) - { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) - { - _activeSubscriberCount.decrementAndGet(); - - } - else if (newState == Subscription.State.ACTIVE) - { - if (oldState != Subscription.State.ACTIVE) - { - _activeSubscriberCount.incrementAndGet(); - - } - deliverAsync(sub); - } - } - - public int compareTo(final AMQQueue o) - { - return _name.compareTo(o.getNameShortString()); - } - - public AtomicInteger getAtomicQueueCount() - { - return _atomicQueueCount; - } - - public AtomicLong getAtomicQueueSize() - { - return _atomicQueueSize; - } - - public boolean hasExclusiveSubscriber() - { - return _exclusiveSubscriber != null; - } - - private void setExclusiveSubscriber(Subscription exclusiveSubscriber) - { - _exclusiveSubscriber = exclusiveSubscriber; - } - - public static interface QueueEntryFilter - { - public boolean accept(QueueEntry entry); - - public boolean filterComplete(); - } - - public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return messageId >= fromMessageId && messageId <= toMessageId; - } - - public boolean filterComplete() - { - return false; - } - }); - } - - public QueueEntry getMessageOnTheQueue(final long messageId) - { - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - private boolean _complete; - - public boolean accept(QueueEntry entry) - { - _complete = entry.getMessage().getMessageNumber() == messageId; - return _complete; - } - - public boolean filterComplete() - { - return _complete; - } - }); - return entries.isEmpty() ? null : entries.get(0); - } - - public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) - { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); - while (queueListIterator.advance() && !filter.filterComplete()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && filter.accept(node)) - { - entryList.add(node); - } - } - return entryList; - - } - - /** - * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. - * - * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. - * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. - * @param fromPosition - * @param toPosition - * @return - */ - public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - private long position = 0; - - public boolean accept(QueueEntry entry) - { - position++; - return (position >= fromPosition) && (position <= toPosition); - } - - public boolean filterComplete() - { - return position >= toPosition; - } - }); - - } - - public void moveMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String queueName, - ServerTransaction txn) throws IllegalArgumentException - { - - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return (messageId >= fromMessageId) - && (messageId <= toMessageId) - && entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - - - // Move the messages in on the message store. - for (final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - txn.enqueue(toQueue, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - toQueue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - entry.release(); - } - }); - txn.dequeue(this, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - - } - }); - - } - - } - - public void copyMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String queueName, - final ServerTransaction txn) throws IllegalArgumentException - { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return ((messageId >= fromMessageId) - && (messageId <= toMessageId)); - } - - public boolean filterComplete() - { - return false; - } - }); - - - // Move the messages in on the message store. - for (QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - - txn.enqueue(toQueue, message, new ServerTransaction.Action() - { - public void postCommit() - { - try - { - toQueue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - - } - }); - - } - - } - - public void removeMessagesFromQueue(long fromMessageId, long toMessageId) - { - - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - - final ServerMessage message = node.getMessage(); - if(message != null) - { - final long messageId = message.getMessageNumber(); - - if ((messageId >= fromMessageId) - && (messageId <= toMessageId) - && !node.isDeleted() - && node.acquire()) - { - dequeueEntry(node); - } - } - } - - } - - public void purge(final long request) throws AMQException - { - clear(request); - } - - public long getCreateTime() - { - return _createTime; - } - - // ------ Management functions - - public void deleteMessageFromTop() - { - QueueEntryIterator queueListIterator = _entries.iterator(); - boolean noDeletes = true; - - while (noDeletes && queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) - { - dequeueEntry(node); - noDeletes = false; - } - - } - } - - public long clearQueue() throws AMQException - { - return clear(0l); - } - - private long clear(final long request) throws AMQSecurityException - { - //Perform ACLs - if (!getVirtualHost().getSecurityManager().authorisePurge(this)) - { - throw new AMQSecurityException("Permission denied: queue " + getName()); - } - - QueueEntryIterator queueListIterator = _entries.iterator(); - long count = 0; - - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) - { - dequeueEntry(node, txn); - if(++count == request) - { - break; - } - } - - } - - txn.commit(); - - return count; - } - - private void dequeueEntry(final QueueEntry node) - { - ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); - dequeueEntry(node, txn); - } - - private void dequeueEntry(final QueueEntry node, ServerTransaction txn) - { - txn.dequeue(this, node.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - node.discard(); - } - - public void onRollback() - { - - } - }); - } - - public void addQueueDeleteTask(final Task task) - { - _deleteTaskList.add(task); - } - - public void removeQueueDeleteTask(final Task task) - { - _deleteTaskList.remove(task); - } - - // TODO list all thrown exceptions - public int delete() throws AMQSecurityException, AMQException - { - // Check access - if (!_virtualHost.getSecurityManager().authoriseDelete(this)) - { - throw new AMQSecurityException("Permission denied: " + getName()); - } - - if (!_deleted.getAndSet(true)) - { - - for (Binding b : getBindings()) - { - _virtualHost.getBindingFactory().removeBinding(b); - } - - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); - - while (subscriptionIter.advance()) - { - Subscription s = subscriptionIter.getNode().getSubscription(); - if (s != null) - { - s.queueDeleted(this); - } - } - - _virtualHost.getQueueRegistry().unregisterQueue(_name); - getConfigStore().removeConfiguredObject(this); - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - return entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); - - if(_alternateExchange != null) - { - - InboundMessageAdapter adapter = new InboundMessageAdapter(); - for(final QueueEntry entry : entries) - { - adapter.setEntry(entry); - final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter); - final ServerMessage message = entry.getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) - { - txn.enqueue(rerouteQueues, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - - } - }); - txn.dequeue(this, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - } - }); - } - - } - - _alternateExchange.removeReference(this); - } - else - { - // TODO log discard - - for(final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - if(message != null) - { - txn.dequeue(this, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - } - }); - } - } - } - - txn.commit(); - - - if(_managedObject!=null) - { - _managedObject.unregister(); - } - - for (Task task : _deleteTaskList) - { - task.doTask(this); - } - - _deleteTaskList.clear(); - stop(); - - //Log Queue Deletion - CurrentActor.get().message(_logSubject, QueueMessages.DELETED()); - - } - return getMessageCount(); - - } - - public void stop() - { - if (!_stopped.getAndSet(true)) - { - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); - } - } - - public void checkCapacity(AMQChannel channel) - { - if(_capacity != 0l) - { - if(_atomicQueueSize.get() > _capacity) - { - _overfull.set(true); - //Overfull log message - _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - - if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) - { - channel.block(this); - } - - if(_atomicQueueSize.get() <= _flowResumeCapacity) - { - - //Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); - - channel.unblock(this); - _blockedChannels.remove(channel); - - } - - } - - - - } - } - - private void checkCapacity() - { - if(_capacity != 0L) - { - if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) - { - if(_overfull.compareAndSet(true,false)) - {//Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); - } - - - for(AMQChannel c : _blockedChannels.keySet()) - { - c.unblock(this); - _blockedChannels.remove(c); - } - } - } - } - - - public void deliverAsync() - { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); - - if (_asynchronousRunner.compareAndSet(null, runner)) - { - _asyncDelivery.execute(runner); - } - } - - public void deliverAsync(Subscription sub) - { - SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); - if(flusher == null) - { - flusher = new SubFlushRunner(sub); - sub.set(SUB_FLUSH_RUNNER, flusher); - } - _asyncDelivery.execute(flusher); - } - - public void flushSubscription(Subscription sub) throws AMQException - { - // Access control - if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) - { - throw new AMQSecurityException("Permission denied: " + getName()); - } - flushSubscription(sub, Long.MAX_VALUE); - } - - public boolean flushSubscription(Subscription sub, long iterations) throws AMQException - { - boolean atTail = false; - - while (!sub.isSuspended() && !atTail && iterations != 0) - { - try - { - sub.getSendLock(); - atTail = attemptDelivery(sub); - if (atTail && sub.isAutoClose()) - { - unregisterSubscription(sub); - - sub.confirmAutoClose(); - - } - else if (!atTail) - { - iterations--; - } - } - finally - { - sub.releaseSendLock(); - } - } - - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the - // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc - // which would give us memory "leak". - - if (!hasExclusiveSubscriber()) - { - advanceAllSubscriptions(); - } - return atTail; - } - - /** - * Attempt delivery for the given subscription. - * - * Looks up the next node for the subscription and attempts to deliver it. - * - * @param sub - * @return true if we have completed all possible deliveries for this sub. - * @throws AMQException - */ - private boolean attemptDelivery(Subscription sub) throws AMQException - { - boolean atTail = false; - - boolean subActive = sub.isActive() && !sub.isSuspended(); - if (subActive) - { - - QueueEntry node = getNextAvailableEntry(sub); - - if (node != null && !(node.isAcquired() || node.isDeleted())) - { - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (sub.acquires() && !node.acquire(sub)) - { - sub.onDequeue(node); - } - else - { - deliverMessage(sub, node); - } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub)); - } - } - - } - atTail = (node == null) || (_entries.next(node) == null); - } - return atTail || !subActive; - } - - protected void advanceAllSubscriptions() throws AMQException - { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) - { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - Subscription sub = subNode.getSubscription(); - if(sub.acquires()) - { - getNextAvailableEntry(sub); - } - else - { - // TODO - } - } - } - - private QueueEntry getNextAvailableEntry(final Subscription sub) - throws AMQException - { - QueueContext context = (QueueContext) sub.getQueueContext(); - if(context != null) - { - QueueEntry lastSeen = context._lastSeenEntry; - QueueEntry releasedNode = context._releasedEntry; - - QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); - - boolean expired = false; - while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) - { - if (expired) - { - expired = false; - if (node.acquire()) - { - dequeueEntry(node); - } - } - - if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node)) - { - QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null); - } - - lastSeen = context._lastSeenEntry; - releasedNode = context._releasedEntry; - node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); - } - return node; - } - else - { - return null; - } - } - - - /** - * Used by queue Runners to asynchronously deliver messages to consumers. - * - * A queue Runner is started whenever a state change occurs, e.g when a new - * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. - * - * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue - * then processQueue should stop to prevent spinning. - * - * Since processQueue is runs in a fixed size Executor, it should not run - * indefinitely to prevent starving other tasks of CPU (e.g jobs to process - * incoming messages may not be able to be scheduled in the thread pool - * because all threads are working on clearing down large queues). To solve - * this problem, after an arbitrary number of message deliveries the - * processQueue job stops iterating, resubmits itself to the executor, and - * ends the current instance - * - * @param runner the Runner to schedule - * @throws AMQException - */ - public void processQueue(QueueRunner runner) throws AMQException - { - long stateChangeCount; - long previousStateChangeCount = Long.MIN_VALUE; - boolean deliveryIncomplete = true; - - boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; - - _asynchronousRunner.compareAndSet(runner, null); - - // For every message enqueue/requeue the we fire deliveryAsync() which - // increases _stateChangeCount. If _sCC changes whilst we are in our loop - // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) - // then we will continue to run for a maximum of iterations. - // So whilst delivery/rejection is going on a processQueue thread will be running - while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) - { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to - // move forward in the next iteration - - if (previousStateChangeCount != stateChangeCount) - { - //further asynchronous delivery is required since the - //previous loop. keep going if iteration slicing allows. - lastLoop = false; - } - - previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; - - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); - //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) - { - Subscription sub = subscriptionIter.getNode().getSubscription(); - sub.getSendLock(); - try - { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) - { - //close autoClose subscriptions if we are not currently intent on continuing - if (lastLoop && sub.isAutoClose()) - { - unregisterSubscription(sub); - - sub.confirmAutoClose(); - } - } - else - { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; - iterations--; - } - } - finally - { - sub.releaseSendLock(); - } - } - - if(allSubscriptionsDone && lastLoop) - { - //We have done an extra loop already and there are again - //again no further delivery attempts possible, only - //keep going if state change demands it. - deliveryIncomplete = false; - } - else if(allSubscriptionsDone) - { - //All subscriptions reported being done, but we have to do - //an extra loop if the iterations are not exhausted and - //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; - lastLoop = true; - } - else - { - //some subscriptions can still accept more messages, - //keep going if iteration count allows. - lastLoop = false; - deliveryIncomplete = true; - } - - _asynchronousRunner.set(null); - } - - // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit - // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rescheduling runner:" + runner); - } - _asyncDelivery.execute(runner); - } - } - - public void checkMessageStatus() throws AMQException - { - - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted - if (!node.isDeleted()) - { - // If the node has exired then aquire it - if (node.expired() && node.acquire()) - { - // Then dequeue it. - dequeueEntry(node); - } - else - { - if (_managedObject != null) - { - // There is a chance that the node could be deleted by - // the time the check actually occurs. So verify we - // can actually get the message to perform the check. - ServerMessage msg = node.getMessage(); - if (msg != null) - { - _managedObject.checkForNotification(msg); - } - } - } - } - } - - } - - public long getMinimumAlertRepeatGap() - { - return _minimumAlertRepeatGap; - } - - public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) - { - _minimumAlertRepeatGap = minimumAlertRepeatGap; - } - - public long getMaximumMessageAge() - { - return _maximumMessageAge; - } - - public void setMaximumMessageAge(long maximumMessageAge) - { - _maximumMessageAge = maximumMessageAge; - if (maximumMessageAge == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); - } - } - - public long getMaximumMessageCount() - { - return _maximumMessageCount; - } - - public void setMaximumMessageCount(final long maximumMessageCount) - { - _maximumMessageCount = maximumMessageCount; - if (maximumMessageCount == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); - } - - } - - public long getMaximumQueueDepth() - { - return _maximumQueueDepth; - } - - // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(final long maximumQueueDepth) - { - _maximumQueueDepth = maximumQueueDepth; - if (maximumQueueDepth == 0L) - { - _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); - } - - } - - public long getMaximumMessageSize() - { - return _maximumMessageSize; - } - - public void setMaximumMessageSize(final long maximumMessageSize) - { - _maximumMessageSize = maximumMessageSize; - if (maximumMessageSize == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); - } - } - - public long getCapacity() - { - return _capacity; - } - - public void setCapacity(long capacity) - { - _capacity = capacity; - } - - public long getFlowResumeCapacity() - { - return _flowResumeCapacity; - } - - public void setFlowResumeCapacity(long flowResumeCapacity) - { - _flowResumeCapacity = flowResumeCapacity; - - checkCapacity(); - } - - public boolean isOverfull() - { - return _overfull.get(); - } - - public Set<NotificationCheck> getNotificationChecks() - { - return _notificationChecks; - } - - public ManagedObject getManagedObject() - { - return _managedObject; - } - - private final class QueueEntryListener implements QueueEntry.StateChangeListener - { - - private final Subscription _sub; - - public QueueEntryListener(final Subscription sub) - { - _sub = sub; - } - - public boolean equals(Object o) - { - assert o != null; - assert o instanceof QueueEntryListener; - return _sub == ((QueueEntryListener) o)._sub; - } - - public int hashCode() - { - return System.identityHashCode(_sub); - } - - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) - { - entry.removeStateChangeListener(this); - deliverAsync(_sub); - } - } - - public List<Long> getMessagesOnTheQueue(int num) - { - return getMessagesOnTheQueue(num, 0); - } - - public List<Long> getMessagesOnTheQueue(int num, int offset) - { - ArrayList<Long> ids = new ArrayList<Long>(num); - QueueEntryIterator it = _entries.iterator(); - for (int i = 0; i < offset; i++) - { - it.advance(); - } - - for (int i = 0; i < num && !it.atTail(); i++) - { - it.advance(); - ids.add(it.getNode().getMessage().getMessageNumber()); - } - return ids; - } - - public AMQSessionModel getExclusiveOwningSession() - { - return _exclusiveOwner; - } - - public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) - { - _exclusive = true; - _exclusiveOwner = exclusiveOwner; - } - - - public void configure(ConfigurationPlugin config) - { - if (config != null) - { - if (config instanceof QueueConfiguration) - { - - setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge()); - setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth()); - setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); - setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); - setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); - _capacity = ((QueueConfiguration)config).getCapacity(); - _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); - } - - _queueConfiguration = config; - - } - } - - - public ConfigurationPlugin getConfiguration() - { - return _queueConfiguration; - } - - public String getResourceName() - { - return _resourceName; - } - - - public ConfigStore getConfigStore() - { - return getVirtualHost().getConfigStore(); - } - - public long getMessageDequeueCount() - { - return _dequeueCount.get(); - } - - public long getTotalEnqueueSize() - { - return _enqueueSize.get(); - } - - public long getTotalDequeueSize() - { - return _dequeueSize.get(); - } - - public long getByteTxnEnqueues() - { - return _byteTxnEnqueues.get(); - } - - public long getByteTxnDequeues() - { - return _byteTxnDequeues.get(); - } - - public long getMsgTxnEnqueues() - { - return _msgTxnEnqueues.get(); - } - - public long getMsgTxnDequeues() - { - return _msgTxnDequeues.get(); - } - - public long getPersistentByteEnqueues() - { - return _persistentMessageEnqueueSize.get(); - } - - public long getPersistentByteDequeues() - { - return _persistentMessageDequeueSize.get(); - } - - public long getPersistentMsgEnqueues() - { - return _persistentMessageEnqueueCount.get(); - } - - public long getPersistentMsgDequeues() - { - return _persistentMessageDequeueCount.get(); - } - - - @Override - public String toString() - { - return String.valueOf(getNameShortString()); - } - - public long getUnackedMessageCountHigh() - { - return _unackedMsgCountHigh.get(); - } - - public long getUnackedMessageCount() - { - return _unackedMsgCount.get(); - } - - public void decrementUnackedMsgCount() - { - _unackedMsgCount.decrementAndGet(); - } - - private void incrementUnackedMsgCount() - { - long unackedMsgCount = _unackedMsgCount.incrementAndGet(); - - long unackedMsgCountHigh; - while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) - { - if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount)) - { - break; - } - } - } - - public LogActor getLogActor() - { - return _logActor; - } -} |