summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java2289
1 files changed, 2289 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
new file mode 100644
index 0000000000..fb36433799
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -0,0 +1,2289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.QueueActor;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
+import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
+import org.apache.qpid.server.subscription.MessageGroupManager;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+{
+
+ private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
+ public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
+ private static final String QPID_NO_GROUP = "qpid.no-group";
+ private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
+
+ // TODO - should make this configurable at the vhost / broker level
+ private static final int DEFAULT_MAX_GROUPS = 255;
+
+ private final VirtualHost _virtualHost;
+
+ private final String _name;
+
+ /** null means shared */
+ private final String _owner;
+
+ private AuthorizationHolder _authorizationHolder;
+
+ private boolean _exclusive = false;
+ private AMQSessionModel _exclusiveOwner;
+
+
+ private final boolean _durable;
+
+ /** If true, this queue is deleted when the last subscriber is removed */
+ private final boolean _autoDelete;
+
+ private Exchange _alternateExchange;
+
+
+ private final QueueEntryList<QueueEntry> _entries;
+
+ private final SubscriptionList _subscriptionList = new SubscriptionList();
+
+ private volatile Subscription _exclusiveSubscriber;
+
+
+
+ private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
+
+ private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
+
+ private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
+
+ private final AtomicLong _totalMessagesReceived = new AtomicLong();
+
+ private final AtomicLong _dequeueCount = new AtomicLong();
+ private final AtomicLong _dequeueSize = new AtomicLong();
+ private final AtomicLong _enqueueCount = new AtomicLong();
+ private final AtomicLong _enqueueSize = new AtomicLong();
+ private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
+ private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
+ private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
+ private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
+ private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
+ private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
+ private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
+ private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
+ private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
+ private final AtomicLong _unackedMsgCount = new AtomicLong(0);
+ private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
+ private final AtomicLong _unackedMsgBytes = new AtomicLong();
+
+ private final AtomicInteger _bindingCountHigh = new AtomicInteger();
+
+ /** max allowed size(KB) of a single message */
+ private long _maximumMessageSize;
+
+ /** max allowed number of messages on a queue. */
+ private long _maximumMessageCount;
+
+ /** max queue depth for the queue */
+ private long _maximumQueueDepth;
+
+ /** maximum message age before alerts occur */
+ private long _maximumMessageAge;
+
+ /** the minimum interval between sending out consecutive alerts of the same type */
+ private long _minimumAlertRepeatGap;
+
+ private long _capacity;
+
+ private long _flowResumeCapacity;
+
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
+
+ static final int MAX_ASYNC_DELIVERIES = 80;
+
+
+ private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
+
+ private final Executor _asyncDelivery;
+ private AtomicInteger _deliveredMessages = new AtomicInteger();
+ private AtomicBoolean _stopped = new AtomicBoolean(false);
+
+ private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+ private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
+
+ private LogSubject _logSubject;
+ private LogActor _logActor;
+
+ private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
+ private boolean _nolocal;
+
+ private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private boolean _deleteOnNoConsumers;
+ private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+ private UUID _id;
+ private final Map<String, Object> _arguments;
+
+ //TODO : persist creation time
+ private long _createTime = System.currentTimeMillis();
+
+ /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
+ private int _maximumDeliveryCount;
+ private final MessageGroupManager _messageGroupManager;
+
+ private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
+ new ArrayList<SubscriptionRegistrationListener>();
+
+ private AMQQueue.NotificationListener _notificationListener;
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
+
+ public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
+ {
+ this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
+ }
+
+ protected SimpleAMQQueue(UUID id,
+ String name,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ VirtualHost virtualHost,
+ QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
+ {
+
+ if (name == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
+ if (virtualHost == null)
+ {
+ throw new IllegalArgumentException("Virtual Host must not be null");
+ }
+
+ _name = name;
+ _durable = durable;
+ _owner = owner;
+ _autoDelete = autoDelete;
+ _exclusive = exclusive;
+ _virtualHost = virtualHost;
+ _entries = entryListFactory.createQueueEntryList(this);
+ _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
+
+ _id = id;
+ _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+ _logSubject = new QueueLogSubject(this);
+ _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
+
+ // Log the creation of this Queue.
+ // The priorities display is toggled on if we set priorities > 0
+ CurrentActor.get().message(_logSubject,
+ QueueMessages.CREATED(String.valueOf(_owner),
+ _entries.getPriorities(),
+ _owner != null,
+ autoDelete,
+ durable, !durable,
+ _entries.getPriorities() > 0));
+
+ if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
+ {
+ if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+ && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ {
+ Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+ _messageGroupManager =
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+ defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
+ this);
+ }
+ else
+ {
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
+ }
+ }
+ else
+ {
+ _messageGroupManager = null;
+ }
+
+ resetNotifications();
+
+ }
+
+ public void resetNotifications()
+ {
+ // This ensure that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+ }
+
+ // ------ Getters and Setters
+
+ public void execute(Runnable runnable)
+ {
+ try
+ {
+ _asyncDelivery.execute(runnable);
+ }
+ catch (RejectedExecutionException ree)
+ {
+ if (_stopped.get())
+ {
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ }
+ else
+ {
+ _logger.error("Unexpected rejected execution", ree);
+ throw ree;
+ }
+ }
+ }
+
+ public void setNoLocal(boolean nolocal)
+ {
+ _nolocal = nolocal;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
+ public Exchange getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
+ }
+ if(exchange != null)
+ {
+ exchange.addReference(this);
+ }
+ _alternateExchange = exchange;
+ }
+
+
+ @Override
+ public Collection<String> getAvailableAttributes()
+ {
+ return new ArrayList<String>(_arguments.keySet());
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
+ {
+ return _arguments.get(attrName);
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+ public String getOwner()
+ {
+ return _owner;
+ }
+
+ public AuthorizationHolder getAuthorizationHolder()
+ {
+ return _authorizationHolder;
+ }
+
+ public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
+ {
+ _authorizationHolder = authorizationHolder;
+ }
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ // ------ Manage Subscriptions
+
+ public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
+ throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
+ {
+ // Access control
+ if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
+ {
+ throw new AMQSecurityException("Permission denied");
+ }
+
+
+ if (hasExclusiveSubscriber())
+ {
+ throw new ExistingExclusiveSubscription();
+ }
+
+ if (exclusive && !subscription.isTransient())
+ {
+ if (getConsumerCount() != 0)
+ {
+ throw new ExistingSubscriptionPreventsExclusive();
+ }
+ else
+ {
+ _exclusiveSubscriber = subscription;
+ }
+ }
+
+ if(subscription.isActive())
+ {
+ _activeSubscriberCount.incrementAndGet();
+ }
+ subscription.setStateListener(this);
+ subscription.setQueueContext(new QueueContext(_entries.getHead()));
+
+ if (!isDeleted())
+ {
+ subscription.setQueue(this, exclusive);
+ if(_nolocal)
+ {
+ subscription.setNoLocal(_nolocal);
+ }
+
+ synchronized (_subscriptionListeners)
+ {
+ for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ {
+ listener.subscriptionRegistered(this, subscription);
+ }
+ }
+
+ _subscriptionList.add(subscription);
+
+ //Increment consumerCountHigh if necessary. (un)registerSubscription are both
+ //synchronized methods so we don't need additional synchronization here
+ if(_counsumerCountHigh.get() < getConsumerCount())
+ {
+ _counsumerCountHigh.incrementAndGet();
+ }
+
+ if (isDeleted())
+ {
+ subscription.queueDeleted(this);
+ }
+ }
+ else
+ {
+ // TODO
+ }
+
+ deliverAsync(subscription);
+
+ }
+
+ public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
+ {
+ if (subscription == null)
+ {
+ throw new NullPointerException("subscription argument is null");
+ }
+
+ boolean removed = _subscriptionList.remove(subscription);
+
+ if (removed)
+ {
+ subscription.close();
+ // No longer can the queue have an exclusive consumer
+ setExclusiveSubscriber(null);
+ subscription.setQueueContext(null);
+
+ if(_messageGroupManager != null)
+ {
+ resetSubPointersForGroups(subscription, true);
+ }
+
+ synchronized (_subscriptionListeners)
+ {
+ for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ {
+ listener.subscriptionUnregistered(this, subscription);
+ }
+ }
+
+ // auto-delete queues must be deleted if there are no remaining subscribers
+
+ if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Auto-deleteing queue:" + this);
+ }
+
+ getVirtualHost().removeQueue(this);
+
+ // we need to manually fire the event to the removed subscription (which was the last one left for this
+ // queue. This is because the delete method uses the subscription set which has just been cleared
+ subscription.queueDeleted(this);
+ }
+ }
+
+ }
+
+ public Collection<Subscription> getConsumers()
+ {
+ List<Subscription> consumers = new ArrayList<Subscription>();
+ SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ while(iter.advance())
+ {
+ consumers.add(iter.getNode().getSubscription());
+ }
+ return consumers;
+
+ }
+
+ public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ {
+ synchronized (_subscriptionListeners)
+ {
+ _subscriptionListeners.add(listener);
+ }
+ }
+
+ public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ {
+ synchronized (_subscriptionListeners)
+ {
+ _subscriptionListeners.remove(listener);
+ }
+ }
+
+ public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ {
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ if(clearAssignments)
+ {
+ _messageGroupManager.clearAssignments(subscription);
+ }
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+ }
+
+ public boolean getDeleteOnNoConsumers()
+ {
+ return _deleteOnNoConsumers;
+ }
+
+ public void setDeleteOnNoConsumers(boolean b)
+ {
+ _deleteOnNoConsumers = b;
+ }
+
+ public void addBinding(final Binding binding)
+ {
+ _bindings.add(binding);
+ int bindingCount = _bindings.size();
+ int bindingCountHigh;
+ while(bindingCount > (bindingCountHigh = _bindingCountHigh.get()))
+ {
+ if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount))
+ {
+ break;
+ }
+ }
+ }
+
+ public int getBindingCountHigh()
+ {
+ return _bindingCountHigh.get();
+ }
+
+ public void removeBinding(final Binding binding)
+ {
+ _bindings.remove(binding);
+ }
+
+ public List<Binding> getBindings()
+ {
+ return Collections.unmodifiableList(_bindings);
+ }
+
+ public int getBindingCount()
+ {
+ return getBindings().size();
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ // ------ Enqueue / Dequeue
+ public void enqueue(ServerMessage message) throws AMQException
+ {
+ enqueue(message, null);
+ }
+
+ public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ {
+ enqueue(message, false, action);
+ }
+
+ public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+ {
+
+ if(transactional)
+ {
+ incrementTxnEnqueueStats(message);
+ }
+ incrementQueueCount();
+ incrementQueueSize(message);
+
+ _totalMessagesReceived.incrementAndGet();
+
+
+ QueueEntry entry;
+ final Subscription exclusiveSub = _exclusiveSubscriber;
+ entry = _entries.add(message);
+
+ if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
+ {
+ /*
+
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+
+ */
+ SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
+ SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ if (nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().findNext();
+ }
+ while (nextNode != null)
+ {
+ if (_subscriptionList.updateMarkedNode(node, nextNode))
+ {
+ break;
+ }
+ else
+ {
+ node = _subscriptionList.getMarkedNode();
+ nextNode = node.findNext();
+ if (nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().findNext();
+ }
+ }
+ }
+
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
+
+ while (entry.isAvailable() && loops != 0)
+ {
+ if (nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
+ }
+ nextNode = nextNode.findNext();
+
+ }
+ }
+
+
+ if (entry.isAvailable())
+ {
+ checkSubscriptionsNotAheadOfDelivery(entry);
+
+ if (exclusiveSub != null)
+ {
+ deliverAsync(exclusiveSub);
+ }
+ else
+ {
+ deliverAsync();
+ }
+ }
+
+ checkForNotification(entry.getMessage());
+
+ if(action != null)
+ {
+ action.onEnqueue(entry);
+ }
+
+ }
+
+ private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ throws AMQException
+ {
+
+ if(sub.trySendLock())
+ {
+ try
+ {
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
+ && mightAssign(sub, entry)
+ && !sub.wouldSuspend(entry))
+ {
+ if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+ {
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
+ }
+ }
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+ }
+
+ private boolean assign(final Subscription sub, final QueueEntry entry)
+ {
+ return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+ }
+
+
+ private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ {
+ if(_messageGroupManager == null || !sub.acquires())
+ {
+ return true;
+ }
+ Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ return (assigned == null) || (assigned == sub);
+ }
+
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // This method is only required for queues which mess with ordering
+ // Simple Queues don't :-)
+ }
+
+ private void incrementQueueSize(final ServerMessage message)
+ {
+ long size = message.getSize();
+ getAtomicQueueSize().addAndGet(size);
+ _enqueueCount.incrementAndGet();
+ _enqueueSize.addAndGet(size);
+ if(message.isPersistent() && isDurable())
+ {
+ _persistentMessageEnqueueSize.addAndGet(size);
+ _persistentMessageEnqueueCount.incrementAndGet();
+ }
+ }
+
+ public long getTotalDequeueCount()
+ {
+ return _dequeueCount.get();
+ }
+
+ public long getTotalEnqueueCount()
+ {
+ return _enqueueCount.get();
+ }
+
+ private void incrementQueueCount()
+ {
+ getAtomicQueueCount().incrementAndGet();
+ }
+
+ private void incrementTxnEnqueueStats(final ServerMessage message)
+ {
+ _msgTxnEnqueues.incrementAndGet();
+ _byteTxnEnqueues.addAndGet(message.getSize());
+ }
+
+ private void incrementTxnDequeueStats(QueueEntry entry)
+ {
+ _msgTxnDequeues.incrementAndGet();
+ _byteTxnDequeues.addAndGet(entry.getSize());
+ }
+
+ private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
+ throws AMQException
+ {
+ setLastSeenEntry(sub, entry);
+
+ _deliveredMessages.incrementAndGet();
+ incrementUnackedMsgCount(entry);
+
+ sub.send(entry, batch);
+ }
+
+ private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
+ {
+ return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
+ }
+
+
+ private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ {
+ QueueContext subContext = (QueueContext) sub.getQueueContext();
+ if (subContext != null)
+ {
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+ QueueContext._lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
+ {
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ }
+ }
+ }
+
+ private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
+ {
+
+ QueueContext subContext = (QueueContext) sub.getQueueContext();
+ if(subContext != null)
+ {
+ QueueEntry oldEntry;
+
+ while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
+ {
+ if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ public void requeue(QueueEntry entry)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance() && entry.isAvailable())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+
+ public void dequeue(QueueEntry entry, Subscription sub)
+ {
+ decrementQueueCount();
+ decrementQueueSize(entry);
+ if (entry.acquiredBySubscription())
+ {
+ _deliveredMessages.decrementAndGet();
+ }
+
+ if(sub != null && sub.isSessionTransactional())
+ {
+ incrementTxnDequeueStats(entry);
+ }
+
+ checkCapacity();
+
+ }
+
+ private void decrementQueueSize(final QueueEntry entry)
+ {
+ final ServerMessage message = entry.getMessage();
+ long size = message.getSize();
+ getAtomicQueueSize().addAndGet(-size);
+ _dequeueSize.addAndGet(size);
+ if(message.isPersistent() && isDurable())
+ {
+ _persistentMessageDequeueSize.addAndGet(size);
+ _persistentMessageDequeueCount.incrementAndGet();
+ }
+ }
+
+ void decrementQueueCount()
+ {
+ getAtomicQueueCount().decrementAndGet();
+ _dequeueCount.incrementAndGet();
+ }
+
+ public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ {
+ /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
+ entry to resend and move back the subscription pointer. */
+
+ subscription.getSendLock();
+ try
+ {
+ if (!subscription.isClosed())
+ {
+ deliverMessage(subscription, entry, false);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ subscription.releaseSendLock();
+ }
+ }
+
+
+
+ public int getConsumerCount()
+ {
+ return _subscriptionList.size();
+ }
+
+ public int getConsumerCountHigh()
+ {
+ return _counsumerCountHigh.get();
+ }
+
+ public int getActiveConsumerCount()
+ {
+ return _activeSubscriberCount.get();
+ }
+
+ public boolean isUnused()
+ {
+ return getConsumerCount() == 0;
+ }
+
+ public boolean isEmpty()
+ {
+ return getMessageCount() == 0;
+ }
+
+ public int getMessageCount()
+ {
+ return getAtomicQueueCount().get();
+ }
+
+ public long getQueueDepth()
+ {
+ return getAtomicQueueSize().get();
+ }
+
+ public int getUndeliveredMessageCount()
+ {
+ int count = getMessageCount() - _deliveredMessages.get();
+ if (count < 0)
+ {
+ return 0;
+ }
+ else
+ {
+ return count;
+ }
+ }
+
+ public long getReceivedMessageCount()
+ {
+ return _totalMessagesReceived.get();
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ QueueEntry entry = getOldestQueueEntry();
+ return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
+ }
+
+ protected QueueEntry getOldestQueueEntry()
+ {
+ return _entries.next(_entries.getHead());
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted.get();
+ }
+
+ public List<QueueEntry> getMessagesOnTheQueue()
+ {
+ ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ QueueEntryIterator queueListIterator = _entries.iterator();
+ while (queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+ if (node != null && !node.isDispensed())
+ {
+ entryList.add(node);
+ }
+ }
+ return entryList;
+
+ }
+
+ public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ {
+ if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ {
+ _activeSubscriberCount.decrementAndGet();
+
+ }
+ else if (newState == Subscription.State.ACTIVE)
+ {
+ if (oldState != Subscription.State.ACTIVE)
+ {
+ _activeSubscriberCount.incrementAndGet();
+
+ }
+ deliverAsync(sub);
+ }
+ }
+
+ public int compareTo(final AMQQueue o)
+ {
+ return _name.compareTo(o.getName());
+ }
+
+ public AtomicInteger getAtomicQueueCount()
+ {
+ return _atomicQueueCount;
+ }
+
+ public AtomicLong getAtomicQueueSize()
+ {
+ return _atomicQueueSize;
+ }
+
+ public boolean hasExclusiveSubscriber()
+ {
+ return _exclusiveSubscriber != null;
+ }
+
+ private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ {
+ _exclusiveSubscriber = exclusiveSubscriber;
+ }
+
+ long getStateChangeCount()
+ {
+ return _stateChangeCount.get();
+ }
+
+ /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
+ protected QueueEntryList getEntries()
+ {
+ return _entries;
+ }
+
+ protected SubscriptionList getSubscriptionList()
+ {
+ return _subscriptionList;
+ }
+
+
+ public static interface QueueEntryFilter
+ {
+ public boolean accept(QueueEntry entry);
+
+ public boolean filterComplete();
+ }
+
+
+
+ public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+ {
+ return getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageNumber();
+ return messageId >= fromMessageId && messageId <= toMessageId;
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+ }
+
+ public QueueEntry getMessageOnTheQueue(final long messageId)
+ {
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+ private boolean _complete;
+
+ public boolean accept(QueueEntry entry)
+ {
+ _complete = entry.getMessage().getMessageNumber() == messageId;
+ return _complete;
+ }
+
+ public boolean filterComplete()
+ {
+ return _complete;
+ }
+ });
+ return entries.isEmpty() ? null : entries.get(0);
+ }
+
+ public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+ {
+ ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ QueueEntryIterator queueListIterator = _entries.iterator();
+ while (queueListIterator.advance() && !filter.filterComplete())
+ {
+ QueueEntry node = queueListIterator.getNode();
+ if (!node.isDispensed() && filter.accept(node))
+ {
+ entryList.add(node);
+ }
+ }
+ return entryList;
+
+ }
+
+ public void visit(final QueueEntryVisitor visitor)
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ if(!node.isDispensed())
+ {
+ if(visitor.visit(node))
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
+ *
+ * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
+ * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
+ * @param fromPosition
+ * @param toPosition
+ * @return
+ */
+ public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+ {
+ return getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+ private long position = 0;
+
+ public boolean accept(QueueEntry entry)
+ {
+ position++;
+ return (position >= fromPosition) && (position <= toPosition);
+ }
+
+ public boolean filterComplete()
+ {
+ return position >= toPosition;
+ }
+ });
+
+ }
+
+ public void purge(final long request) throws AMQException
+ {
+ clear(request);
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ // ------ Management functions
+
+ // TODO - now only used by the tests
+ public void deleteMessageFromTop()
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+ boolean noDeletes = true;
+
+ while (noDeletes && queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+ if (node.acquire())
+ {
+ dequeueEntry(node);
+ noDeletes = false;
+ }
+
+ }
+ }
+
+ public long clearQueue() throws AMQException
+ {
+ return clear(0l);
+ }
+
+ private long clear(final long request) throws AMQSecurityException
+ {
+ //Perform ACLs
+ if (!getVirtualHost().getSecurityManager().authorisePurge(this))
+ {
+ throw new AMQSecurityException("Permission denied: queue " + getName());
+ }
+
+ QueueEntryIterator queueListIterator = _entries.iterator();
+ long count = 0;
+
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+
+ while (queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+ if (node.acquire())
+ {
+ dequeueEntry(node, txn);
+ if(++count == request)
+ {
+ break;
+ }
+ }
+
+ }
+
+ txn.commit();
+
+ return count;
+ }
+
+ private void dequeueEntry(final QueueEntry node)
+ {
+ ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
+ dequeueEntry(node, txn);
+ }
+
+ private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+ {
+ txn.dequeue(this, node.getMessage(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ node.discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
+ public void addQueueDeleteTask(final Task task)
+ {
+ _deleteTaskList.add(task);
+ }
+
+ public void removeQueueDeleteTask(final Task task)
+ {
+ _deleteTaskList.remove(task);
+ }
+
+ // TODO list all thrown exceptions
+ public int delete() throws AMQSecurityException, AMQException
+ {
+ // Check access
+ if (!_virtualHost.getSecurityManager().authoriseDelete(this))
+ {
+ throw new AMQSecurityException("Permission denied: " + getName());
+ }
+
+ if (!_deleted.getAndSet(true))
+ {
+
+ for (Binding b : _bindings)
+ {
+ b.getExchange().removeBinding(b);
+ }
+
+ SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+
+ while (subscriptionIter.advance())
+ {
+ Subscription s = subscriptionIter.getNode().getSubscription();
+ if (s != null)
+ {
+ s.queueDeleted(this);
+ }
+ }
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ return entry.acquire();
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+
+ if(_alternateExchange != null)
+ {
+
+ InboundMessageAdapter adapter = new InboundMessageAdapter();
+ for(final QueueEntry entry : entries)
+ {
+ adapter.setEntry(entry);
+ List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+ if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
+ {
+ queues = _alternateExchange.getAlternateExchange().route(adapter);
+ }
+
+ final ServerMessage message = entry.getMessage();
+ if(queues != null && queues.size() != 0)
+ {
+ final List<? extends BaseQueue> rerouteQueues = queues;
+ txn.enqueue(rerouteQueues, entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ try
+ {
+ for(BaseQueue queue : rerouteQueues)
+ {
+ queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(this, entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+
+ }
+
+ _alternateExchange.removeReference(this);
+ }
+ else
+ {
+ // TODO log discard
+
+ for(final QueueEntry entry : entries)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ txn.dequeue(this, message,
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ }
+ }
+
+ txn.commit();
+
+ for (Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+
+ _deleteTaskList.clear();
+ stop();
+
+ //Log Queue Deletion
+ CurrentActor.get().message(_logSubject, QueueMessages.DELETED());
+
+ }
+ return getMessageCount();
+
+ }
+
+ public void stop()
+ {
+ if (!_stopped.getAndSet(true))
+ {
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ }
+ }
+
+ public void checkCapacity(AMQSessionModel channel)
+ {
+ if(_capacity != 0l)
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ _overfull.set(true);
+ //Overfull log message
+ _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
+
+ _blockedChannels.add(channel);
+
+ channel.block(this);
+
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+
+ private void checkCapacity()
+ {
+ if(_capacity != 0L)
+ {
+ if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ if(_overfull.compareAndSet(true,false))
+ {//Underfull log message
+ _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
+ }
+
+ for(final AMQSessionModel blockedChannel : _blockedChannels)
+ {
+ blockedChannel.unblock(this);
+ _blockedChannels.remove(blockedChannel);
+ }
+ }
+ }
+ }
+
+ private QueueRunner _queueRunner = new QueueRunner(this);
+
+ public void deliverAsync()
+ {
+ _stateChangeCount.incrementAndGet();
+
+ _queueRunner.execute(_asyncDelivery);
+
+ }
+
+ public void deliverAsync(Subscription sub)
+ {
+ if(_exclusiveSubscriber == null)
+ {
+ deliverAsync();
+ }
+ else
+ {
+ SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+ if(flusher == null)
+ {
+ flusher = new SubFlushRunner(sub);
+ sub.set(SUB_FLUSH_RUNNER, flusher);
+ }
+ flusher.execute(_asyncDelivery);
+ }
+
+ }
+
+ public void flushSubscription(Subscription sub) throws AMQException
+ {
+ // Access control
+ if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
+ {
+ throw new AMQSecurityException("Permission denied: " + getName());
+ }
+ flushSubscription(sub, Long.MAX_VALUE);
+ }
+
+ public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ {
+ boolean atTail = false;
+ final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+ boolean queueEmpty = false;
+
+ try
+ {
+ if(keepSendLockHeld)
+ {
+ sub.getSendLock();
+ }
+ while (!sub.isSuspended() && !atTail && iterations != 0)
+ {
+ try
+ {
+ if(!keepSendLockHeld)
+ {
+ sub.getSendLock();
+ }
+
+ atTail = attemptDelivery(sub, true);
+ if (atTail && getNextAvailableEntry(sub) == null)
+ {
+ queueEmpty = true;
+ }
+ else if (!atTail)
+ {
+ iterations--;
+ }
+ }
+ finally
+ {
+ if(!keepSendLockHeld)
+ {
+ sub.releaseSendLock();
+ }
+ }
+ }
+ }
+ finally
+ {
+ if(keepSendLockHeld)
+ {
+ sub.releaseSendLock();
+ }
+ if(queueEmpty)
+ {
+ sub.queueEmpty();
+ }
+
+ sub.flushBatched();
+
+ }
+
+
+ // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
+ // which would give us memory "leak".
+
+ if (!hasExclusiveSubscriber())
+ {
+ advanceAllSubscriptions();
+ }
+ return atTail;
+ }
+
+ /**
+ * Attempt delivery for the given subscription.
+ *
+ * Looks up the next node for the subscription and attempts to deliver it.
+ *
+ *
+ * @param sub
+ * @param batch
+ * @return true if we have completed all possible deliveries for this sub.
+ * @throws AMQException
+ */
+ private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
+ {
+ boolean atTail = false;
+
+ boolean subActive = sub.isActive() && !sub.isSuspended();
+ if (subActive)
+ {
+
+ QueueEntry node = getNextAvailableEntry(sub);
+
+ if (node != null && node.isAvailable())
+ {
+ if (sub.hasInterest(node) && mightAssign(sub, node))
+ {
+ if (!sub.wouldSuspend(node))
+ {
+ if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
+ {
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node, batch);
+ }
+
+ }
+ else // Not enough Credit for message and wouldSuspend
+ {
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub));
+ }
+ }
+
+ }
+ atTail = (node == null) || (_entries.next(node) == null);
+ }
+ return atTail || !subActive;
+ }
+
+ protected void advanceAllSubscriptions() throws AMQException
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ while (subscriberIter.advance())
+ {
+ SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
+ Subscription sub = subNode.getSubscription();
+ if(sub.acquires())
+ {
+ getNextAvailableEntry(sub);
+ }
+ else
+ {
+ // TODO
+ }
+ }
+ }
+
+ private QueueEntry getNextAvailableEntry(final Subscription sub)
+ throws AMQException
+ {
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry lastSeen = context.getLastSeenEntry();
+ QueueEntry releasedNode = context.getReleasedEntry();
+
+ QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+
+ boolean expired = false;
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
+ !mightAssign(sub,node)))
+ {
+ if (expired)
+ {
+ expired = false;
+ if (node.acquire())
+ {
+ dequeueEntry(node);
+ }
+ }
+
+ if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
+ {
+ QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
+ }
+
+ lastSeen = context.getLastSeenEntry();
+ releasedNode = context.getReleasedEntry();
+ node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
+ }
+ return node;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ {
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry releasedNode = context.getReleasedEntry();
+ return releasedNode != null && releasedNode.compareTo(entry) < 0;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /**
+ * Used by queue Runners to asynchronously deliver messages to consumers.
+ *
+ * A queue Runner is started whenever a state change occurs, e.g when a new
+ * message arrives on the queue and cannot be immediately delivered to a
+ * subscription (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * capable of accepting/delivering all messages then these messages would
+ * otherwise remain on the queue.
+ *
+ * processQueue should be running while there are messages on the queue AND
+ * there are subscriptions that can deliver them. If there are no
+ * subscriptions capable of delivering the remaining messages on the queue
+ * then processQueue should stop to prevent spinning.
+ *
+ * Since processQueue is runs in a fixed size Executor, it should not run
+ * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
+ * incoming messages may not be able to be scheduled in the thread pool
+ * because all threads are working on clearing down large queues). To solve
+ * this problem, after an arbitrary number of message deliveries the
+ * processQueue job stops iterating, resubmits itself to the executor, and
+ * ends the current instance
+ *
+ * @param runner the Runner to schedule
+ * @throws AMQException
+ */
+ public long processQueue(QueueRunner runner) throws AMQException
+ {
+ long stateChangeCount = Long.MIN_VALUE;
+ long previousStateChangeCount = Long.MIN_VALUE;
+ long rVal = Long.MIN_VALUE;
+ boolean deliveryIncomplete = true;
+
+ boolean lastLoop = false;
+ int iterations = MAX_ASYNC_DELIVERIES;
+
+ final int numSubs = _subscriptionList.size();
+
+ final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
+
+ // For every message enqueue/requeue the we fire deliveryAsync() which
+ // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+ // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+ // then we will continue to run for a maximum of iterations.
+ // So whilst delivery/rejection is going on a processQueue thread will be running
+ while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
+ {
+ // we want to have one extra loop after every subscription has reached the point where it cannot move
+ // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // move forward in the next iteration
+
+ if (previousStateChangeCount != stateChangeCount)
+ {
+ //further asynchronous delivery is required since the
+ //previous loop. keep going if iteration slicing allows.
+ lastLoop = false;
+ rVal = stateChangeCount;
+ }
+
+ previousStateChangeCount = stateChangeCount;
+ boolean allSubscriptionsDone = true;
+ boolean subscriptionDone;
+
+ SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ //iterate over the subscribers and try to advance their pointer
+ while (subscriptionIter.advance())
+ {
+ Subscription sub = subscriptionIter.getNode().getSubscription();
+ sub.getSendLock();
+
+ try
+ {
+ for(int i = 0 ; i < perSub; i++)
+ {
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub, true);
+ if (subscriptionDone)
+ {
+ sub.flushBatched();
+ if (lastLoop && !sub.isSuspended())
+ {
+ sub.queueEmpty();
+ }
+ break;
+ }
+ else
+ {
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
+ if(--iterations == 0)
+ {
+ sub.flushBatched();
+ break;
+ }
+ }
+
+ }
+
+ sub.flushBatched();
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+
+ if(allSubscriptionsDone && lastLoop)
+ {
+ //We have done an extra loop already and there are again
+ //again no further delivery attempts possible, only
+ //keep going if state change demands it.
+ deliveryIncomplete = false;
+ }
+ else if(allSubscriptionsDone)
+ {
+ //All subscriptions reported being done, but we have to do
+ //an extra loop if the iterations are not exhausted and
+ //there is still any work to be done
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ lastLoop = true;
+ }
+ else
+ {
+ //some subscriptions can still accept more messages,
+ //keep going if iteration count allows.
+ lastLoop = false;
+ deliveryIncomplete = true;
+ }
+
+ }
+
+ // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
+ // therefore we should schedule this runner again (unless someone beats us to it :-) ).
+ if (iterations == 0)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rescheduling runner:" + runner);
+ }
+ return 0L;
+ }
+ return rVal;
+
+ }
+
+ public void checkMessageStatus() throws AMQException
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while (queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+ // Only process nodes that are not currently deleted and not dequeued
+ if (!node.isDispensed())
+ {
+ // If the node has exired then acquire it
+ if (node.expired() && node.acquire())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing expired node " + node);
+ }
+ // Then dequeue it.
+ dequeueEntry(node);
+ }
+ else
+ {
+ // There is a chance that the node could be deleted by
+ // the time the check actually occurs. So verify we
+ // can actually get the message to perform the check.
+ ServerMessage msg = node.getMessage();
+ if (msg != null)
+ {
+ checkForNotification(msg);
+ }
+ }
+ }
+ }
+
+ }
+
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ if (maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ }
+
+ public long getMaximumMessageCount()
+ {
+ return _maximumMessageCount;
+ }
+
+ public void setMaximumMessageCount(final long maximumMessageCount)
+ {
+ _maximumMessageCount = maximumMessageCount;
+ if (maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+ }
+
+ public long getMaximumQueueDepth()
+ {
+ return _maximumQueueDepth;
+ }
+
+ // Sets the queue depth, the max queue size
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
+ {
+ _maximumQueueDepth = maximumQueueDepth;
+ if (maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
+ }
+
+ public long getMaximumMessageSize()
+ {
+ return _maximumMessageSize;
+ }
+
+ public void setMaximumMessageSize(final long maximumMessageSize)
+ {
+ _maximumMessageSize = maximumMessageSize;
+ if (maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ }
+
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+
+ checkCapacity();
+ }
+
+ public boolean isOverfull()
+ {
+ return _overfull.get();
+ }
+
+ public Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
+ }
+
+ private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ {
+
+ private final Subscription _sub;
+
+ public QueueEntryListener(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof SimpleAMQQueue.QueueEntryListener
+ && _sub == ((QueueEntryListener) o)._sub;
+ }
+
+ public int hashCode()
+ {
+ return System.identityHashCode(_sub);
+ }
+
+ public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ {
+ entry.removeStateChangeListener(this);
+ deliverAsync(_sub);
+ }
+ }
+
+ public List<Long> getMessagesOnTheQueue(int num)
+ {
+ return getMessagesOnTheQueue(num, 0);
+ }
+
+ public List<Long> getMessagesOnTheQueue(int num, int offset)
+ {
+ ArrayList<Long> ids = new ArrayList<Long>(num);
+ QueueEntryIterator it = _entries.iterator();
+ for (int i = 0; i < offset; i++)
+ {
+ it.advance();
+ }
+
+ for (int i = 0; i < num && !it.atTail(); i++)
+ {
+ it.advance();
+ ids.add(it.getNode().getMessage().getMessageNumber());
+ }
+ return ids;
+ }
+
+ public AMQSessionModel getExclusiveOwningSession()
+ {
+ return _exclusiveOwner;
+ }
+
+ public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
+ {
+ _exclusive = true;
+ _exclusiveOwner = exclusiveOwner;
+ }
+
+
+ public void configure(QueueConfiguration config)
+ {
+ if (config != null)
+ {
+ setMaximumMessageAge(config.getMaximumMessageAge());
+ setMaximumQueueDepth(config.getMaximumQueueDepth());
+ setMaximumMessageSize(config.getMaximumMessageSize());
+ setMaximumMessageCount(config.getMaximumMessageCount());
+ setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ setMaximumDeliveryCount(config.getMaxDeliveryCount());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
+ }
+ }
+
+ public long getMessageDequeueCount()
+ {
+ return _dequeueCount.get();
+ }
+
+ public long getTotalEnqueueSize()
+ {
+ return _enqueueSize.get();
+ }
+
+ public long getTotalDequeueSize()
+ {
+ return _dequeueSize.get();
+ }
+
+ public long getByteTxnEnqueues()
+ {
+ return _byteTxnEnqueues.get();
+ }
+
+ public long getByteTxnDequeues()
+ {
+ return _byteTxnDequeues.get();
+ }
+
+ public long getMsgTxnEnqueues()
+ {
+ return _msgTxnEnqueues.get();
+ }
+
+ public long getMsgTxnDequeues()
+ {
+ return _msgTxnDequeues.get();
+ }
+
+ public long getPersistentByteEnqueues()
+ {
+ return _persistentMessageEnqueueSize.get();
+ }
+
+ public long getPersistentByteDequeues()
+ {
+ return _persistentMessageDequeueSize.get();
+ }
+
+ public long getPersistentMsgEnqueues()
+ {
+ return _persistentMessageEnqueueCount.get();
+ }
+
+ public long getPersistentMsgDequeues()
+ {
+ return _persistentMessageDequeueCount.get();
+ }
+
+
+ @Override
+ public String toString()
+ {
+ return getName();
+ }
+
+ public long getUnackedMessageCountHigh()
+ {
+ return _unackedMsgCountHigh.get();
+ }
+
+ public long getUnackedMessageCount()
+ {
+ return _unackedMsgCount.get();
+ }
+
+ public long getUnackedMessageBytes()
+ {
+ return _unackedMsgBytes.get();
+ }
+
+ public void decrementUnackedMsgCount(QueueEntry queueEntry)
+ {
+ _unackedMsgCount.decrementAndGet();
+ _unackedMsgBytes.addAndGet(-queueEntry.getSize());
+ }
+
+ private void incrementUnackedMsgCount(QueueEntry entry)
+ {
+ long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+ _unackedMsgBytes.addAndGet(entry.getSize());
+
+ long unackedMsgCountHigh;
+ while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
+ {
+ if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
+ {
+ break;
+ }
+ }
+ }
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
+ public int getMaximumDeliveryCount()
+ {
+ return _maximumDeliveryCount;
+ }
+
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount)
+ {
+ _maximumDeliveryCount = maximumDeliveryCount;
+ }
+
+ /**
+ * Checks if there is any notification to send to the listeners
+ */
+ private void checkForNotification(ServerMessage<?> msg) throws AMQException
+ {
+ final Set<NotificationCheck> notificationChecks = getNotificationChecks();
+ final AMQQueue.NotificationListener listener = _notificationListener;
+
+ if(listener != null && !notificationChecks.isEmpty())
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
+ {
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ {
+ if (check.notifyIfNecessary(msg, this, listener))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
+ }
+ }
+ }
+
+ public void setNotificationListener(AMQQueue.NotificationListener listener)
+ {
+ _notificationListener = listener;
+ }
+
+ @Override
+ public void setDescription(String description)
+ {
+ if (description == null)
+ {
+ _arguments.remove(Queue.DESCRIPTION);
+ }
+ else
+ {
+ _arguments.put(Queue.DESCRIPTION, description);
+ }
+ }
+
+ @Override
+ public String getDescription()
+ {
+ return (String) _arguments.get(Queue.DESCRIPTION);
+ }
+
+}