summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java1174
1 files changed, 1174 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
new file mode 100644
index 0000000000..b028d60b19
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -0,0 +1,1174 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
+import org.apache.log4j.Logger;
+
+import javax.management.JMException;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
+{
+ private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
+ private final AMQShortString _name;
+
+ /** null means shared */
+ private final AMQShortString _owner;
+
+ private final boolean _durable;
+
+ /** If true, this queue is deleted when the last subscriber is removed */
+ private final boolean _autoDelete;
+
+ private final VirtualHost _virtualHost;
+
+ /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
+ private final ExchangeBindings _bindings = new ExchangeBindings(this);
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+ private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
+ private final AtomicInteger _messageCount = new AtomicInteger(0);
+
+ private final AtomicLong _depth = new AtomicLong(0L);
+
+
+ private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
+
+ private final AtomicBoolean _quiesced = new AtomicBoolean(false);
+
+ /**
+ * the _enqueueLock is used to control the entry of new messages onto the queue. In normal operation many threads
+ * may concurrently enqueue messages. However while certain operations are being carried out (e.g. clearing the
+ * queue), it is important to prevent new messages being added to the queue. To obtain this behaviour we use the
+ * readLock for shared "enqueue" access and the write lock for the exclusive access.
+ */
+ // private final ReadWriteLock _enqueueLock = new ReentrantReadWriteLock();
+
+
+ private final Lock _subscriberLock = new ReentrantLock();
+
+// private final List<Subscription> _subscriberList = new CopyOnWriteArrayList<Subscription>();
+
+
+ private final SubscriptionList _subscriptionList = new SubscriptionList(this);
+ private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+
+ private boolean _exclusiveSubscriber;
+
+
+ private final QueueEntryList _entries;
+
+
+ private final AMQQueueMBean _managedObject;
+ private final Executor _asyncDelivery;
+ private final AtomicLong _totalMessagesReceived = new AtomicLong();
+
+
+
+ /** max allowed size(KB) of a single message */
+ @Configured(path = "maximumMessageSize", defaultValue = "0")
+ public long _maximumMessageSize;
+
+ /** max allowed number of messages on a queue. */
+ @Configured(path = "maximumMessageCount", defaultValue = "0")
+ public long _maximumMessageCount;
+
+ /** max queue depth for the queue */
+ @Configured(path = "maximumQueueDepth", defaultValue = "0")
+ public long _maximumQueueDepth;
+
+ /** maximum message age before alerts occur */
+ @Configured(path = "maximumMessageAge", defaultValue = "0")
+ public long _maximumMessageAge;
+
+ /** the minimum interval between sending out consequetive alerts of the same type */
+ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+ public long _minimumAlertRepeatGap;
+
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
+
+ private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
+ private AtomicReference _asynchronousRunner = new AtomicReference(null);
+ private AtomicInteger _deliveredMessages = new AtomicInteger();
+
+
+ SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
+ throws AMQException
+ {
+
+ if (name == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
+ if (virtualHost == null)
+ {
+ throw new IllegalArgumentException("Virtual Host must not be null");
+ }
+
+ _name = name;
+ _durable = durable;
+ _owner = owner;
+ _autoDelete = autoDelete;
+ _virtualHost = virtualHost;
+ _entries = new QueueEntryList(this);
+
+ _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor();
+
+ try
+ {
+ _managedObject = new AMQQueueMBean(this);
+ _managedObject.register();
+ }
+ catch (JMException e)
+ {
+ throw new AMQException("AMQQueue MBean creation has failed ", e);
+ }
+
+
+ // This ensure that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+
+ }
+
+ // ------ Getters and Setters
+
+ public AMQShortString getName()
+ {
+ return _name;
+ }
+
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+ public AMQShortString getOwner()
+ {
+ return _owner;
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+
+ // ------ bind and unbind
+
+ public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
+ {
+ exchange.registerQueue(routingKey, this, arguments);
+ if (isDurable() && exchange.isDurable())
+ {
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+ }
+
+ _bindings.addBinding(routingKey, arguments, exchange);
+ }
+
+ public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
+ {
+ exchange.deregisterQueue(routingKey, this, arguments);
+ if (isDurable() && exchange.isDurable())
+ {
+ _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+ }
+
+ boolean removed = _bindings.remove(routingKey, arguments, exchange);
+ if(!removed)
+ {
+ _logger.error("Mismatch between queue bindings and exchange record of bindings");
+ }
+ }
+
+
+ // ------ Manage Subscriptions
+
+ public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
+ {
+
+
+ if(_exclusiveSubscriber)
+ {
+ throw new ExistingExclusiveSubscription();
+ }
+
+ if(exclusive && getConsumerCount() != 0)
+ {
+ throw new ExistingSubscriptionPreventsExclusive();
+ }
+
+ _exclusiveSubscriber = exclusive;
+
+ _activeSubscriberCount.incrementAndGet();
+ subscription.setStateListener(this);
+ subscription.setQueueContext(null,_entries.getHead());
+
+ if(!_deleted.get())
+ {
+ subscription.setQueue(this);
+ _subscriptionList.add(subscription);
+ if(_deleted.get())
+ {
+ subscription.queueDeleted(this);
+ }
+ }
+ else
+ {
+ // TODO
+ }
+
+
+ deliverAsync();
+
+ }
+
+ public void unregisterSubscription(final Subscription subscription) throws AMQException
+ {
+ if(subscription == null)
+ {
+ throw new NullPointerException("subscription argument is null");
+ }
+
+
+
+
+ boolean removed = _subscriptionList.remove(subscription);
+
+
+
+ if(removed)
+ {
+ subscription.close();
+ // No longer can the queue have an exclusive consumer
+ _exclusiveSubscriber = false;
+
+
+
+
+
+ // auto-delete queues must be deleted if there are no remaining subscribers
+
+ if (_autoDelete && getConsumerCount() == 0)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Auto-deleteing queue:" + this);
+ }
+
+ delete();
+
+ // we need to manually fire the event to the removed subscription (which was the last one left for this
+ // queue. This is because the delete method uses the subscription set which has just been cleared
+ subscription.queueDeleted(this);
+ }
+ }
+
+
+ }
+
+
+ // ------ Enqueue / Dequeue
+
+ public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ {
+ // need to get the enqueue lock
+
+
+ _messageCount.incrementAndGet();
+
+ _totalMessagesReceived.incrementAndGet();
+ _depth.addAndGet(message.getSize());
+
+ QueueEntry entry = _entries.add(message);
+
+ /*
+
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+
+ */
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
+ if(nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().getNext();
+ }
+ while(nextNode != null)
+ {
+ if(_lastSubscriptionNode.compareAndSet(node, nextNode))
+ {
+ break;
+ }
+ else
+ {
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
+ if(nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().getNext();
+ }
+ }
+ }
+
+
+
+ int loops = 2;
+
+ while(!entry.isAcquired() && loops != 0)
+ {
+ if(nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ synchronized(sub.getSendLock())
+ {
+ if(subscriptionReady(sub, entry)
+ && !sub.isSuspended()
+ && sub.isActive())
+ {
+
+ if( sub.hasInterest(entry) )
+ {
+ if( !sub.wouldSuspend(entry))
+ {
+ if(!sub.isBrowser() && entry.acquire(sub))
+ {
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry);
+ QueueEntryList.QueueEntryNode queueEntryNode = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ if(queueEntryNode.getNext() == entry)
+ {
+ sub.setQueueContext(queueEntryNode,entry);
+ }
+
+ }
+ }
+ }
+ else
+ {
+ QueueEntryList.QueueEntryNode queueEntryNode = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ if(queueEntryNode.getNext() == entry)
+ {
+ sub.setQueueContext(queueEntryNode,entry);
+ }
+ }
+ }
+ }
+ }
+ nextNode = nextNode.getNext();
+
+ }
+
+
+
+ if(entry.immediateAndNotDelivered())
+ {
+ dequeue(storeContext, entry);
+ }
+ else if(!entry.isAcquired())
+ {
+ deliverAsync();
+ }
+
+
+ try
+ {
+ _managedObject.checkForNotification(entry.getMessage());
+ }
+ catch (JMException e)
+ {
+ throw new AMQException("Unable to get notification from manage queue: " + e, e);
+ }
+
+
+ return entry;
+
+ }
+
+ private void deliverMessage(final Subscription sub, final QueueEntry entry)
+ throws AMQException
+ {
+ _deliveredMessages.incrementAndGet();
+ sub.send(entry);
+
+ }
+
+ private boolean subscriptionReady(final Subscription sub, final QueueEntry entry)
+ {
+
+ QueueEntryList.QueueEntryNode node = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ while(node.isAcquired() || node.isDeleted() || !sub.hasInterest(node) )
+ {
+
+ QueueEntryList.QueueEntryNode newNode = node.getNext();
+ if(newNode != null)
+ {
+ sub.setQueueContext(node, newNode);
+ node = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ }
+ else
+ {
+ break;
+ }
+
+ }
+
+ return node == entry;
+
+ }
+
+ public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+ {
+
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while(subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if(!sub.isBrowser())
+ {
+ QueueEntry subEntry = (QueueEntry) sub.getQueueContext();
+ while(entry.compareTo(subEntry)<0)
+ {
+ if(sub.setQueueContext(subEntry,entry))
+ {
+ break;
+ }
+ else
+ {
+ subEntry = (QueueEntry) sub.getQueueContext();
+ }
+ }
+ }
+ }
+
+
+ deliverAsync();
+
+
+ }
+
+ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+ {
+ _messageCount.decrementAndGet();
+ _depth.addAndGet(-entry.getMessage().getSize());
+ if(entry.acquiredBySubscription())
+ {
+ _deliveredMessages.decrementAndGet();
+ }
+
+ try
+ {
+ AMQMessage msg = entry.getMessage();
+ if(isDurable() && msg.isPersistent())
+ {
+ _virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId());
+ }
+ ((QueueEntryList.QueueEntryNode)entry).delete();
+
+ }
+ catch (MessageCleanupException e)
+ {
+ // Message was dequeued, but could not then be deleted
+ // though it is no longer referenced. This should be very
+ // rare and can be detected and cleaned up on recovery or
+ // done through some form of manual intervention.
+ _logger.error(e, e);
+ }
+ catch (AMQException e)
+ {
+ throw new FailedDequeueException(_name.toString(), e);
+ }
+
+
+ }
+
+ public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ {
+ /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
+ entry to resend and move back the subscription pointer. */
+
+ synchronized(subscription.getSendLock())
+ {
+ if(!subscription.isClosed())
+ {
+ deliverMessage(subscription, entry);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+
+
+
+
+
+ public int getConsumerCount()
+ {
+ return _subscriptionList.size();
+ }
+
+ public int getActiveConsumerCount()
+ {
+ return _activeSubscriberCount.get();
+ }
+
+ public boolean isUnused()
+ {
+ return getConsumerCount() == 0;
+ }
+
+ public boolean isEmpty()
+ {
+ return getMessageCount() == 0;
+ }
+
+ public int getMessageCount()
+ {
+ return _messageCount.get();
+ }
+
+ public long getQueueDepth()
+ {
+ return _depth.get();
+ }
+
+ public int getUndeliveredMessageCount()
+ {
+ int count = getMessageCount() - _deliveredMessages.get();
+ if(count < 0)
+ {
+ return 0;
+ }
+ else
+ {
+ return count;
+ }
+ }
+
+
+ public long getReceivedMessageCount()
+ {
+ return _totalMessagesReceived.get();
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted.get();
+ }
+
+
+
+ public List<QueueEntry> getMessagesOnTheQueue()
+ {
+ ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ QueueEntryList.QueueEntryIterator queueListIterator = _entries.iterator();
+ while(queueListIterator.advance())
+ {
+ QueueEntryList.QueueEntryNode node = queueListIterator.getNode();
+ if(node != null && !node.isDeleted())
+ {
+ entryList.add(node);
+ }
+ }
+ return entryList;
+
+ }
+
+ public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ {
+ if(oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ {
+ _activeSubscriberCount.decrementAndGet();
+
+ }
+ else if(newState == Subscription.State.ACTIVE && oldState != Subscription.State.ACTIVE)
+ {
+ _activeSubscriberCount.incrementAndGet();
+ deliverAsync();
+ }
+ }
+
+ public static interface QueueEntryFilter
+ {
+ public boolean accept(QueueEntry entry);
+
+ public boolean filterComplete();
+ }
+
+
+
+ public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+ {
+ return getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ return messageId >= fromMessageId && messageId <= toMessageId;
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+ }
+
+ public QueueEntry getMessageOnTheQueue(final long messageId)
+ {
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+ private boolean _complete;
+
+ public boolean accept(QueueEntry entry)
+ {
+ _complete = entry.getMessage().getMessageId() == messageId;
+ return _complete;
+ }
+
+ public boolean filterComplete()
+ {
+ return _complete;
+ }
+ });
+ return entries.isEmpty() ? null : entries.get(0);
+ }
+
+
+ public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+ {
+ ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ QueueEntryList.QueueEntryIterator queueListIterator = _entries.iterator();
+ while(queueListIterator.advance() && !filter.filterComplete())
+ {
+ QueueEntryList.QueueEntryNode node = queueListIterator.getNode();
+ if(!node.isDeleted() && filter.accept(node))
+ {
+ entryList.add(node);
+ }
+ }
+ return entryList;
+
+ }
+
+
+ public void moveMessagesToAnotherQueue(long fromMessageId,
+ long toMessageId,
+ String queueName,
+ StoreContext storeContext)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void copyMessagesToAnotherQueue(long fromMessageId,
+ long toMessageId,
+ String queueName,
+ StoreContext storeContext)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ public void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+
+
+ public void quiesce()
+ {
+ _quiesced.set(true);
+ }
+
+ public void start()
+ {
+ if(_quiesced.compareAndSet(true,false))
+ {
+ deliverAsync();
+ }
+ }
+
+
+
+
+ // ------ Management functions
+
+
+ public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+ {
+ QueueEntryList.QueueEntryIterator queueListIterator = _entries.iterator();
+ boolean noDeletes = true;
+
+ while(noDeletes && queueListIterator.advance() )
+ {
+ QueueEntryList.QueueEntryNode node = queueListIterator.getNode();
+ if(!node.isDeleted() && !node.acquire(null))
+ {
+ node.dequeue(storeContext);
+
+ node.dispose(storeContext);
+
+ noDeletes = false;
+ }
+
+ }
+ }
+
+ public long clearQueue(StoreContext storeContext) throws AMQException
+ {
+
+ QueueEntryList.QueueEntryIterator queueListIterator = _entries.iterator();
+ long count = 0;
+
+ while(queueListIterator.advance())
+ {
+ QueueEntryList.QueueEntryNode node = queueListIterator.getNode();
+ if(!node.isDeleted() && !node.acquire(null))
+ {
+ node.dequeue(storeContext);
+
+ node.dispose(storeContext);
+
+ count++;
+ }
+
+ }
+ return count;
+
+ }
+
+
+ public void addQueueDeleteTask(final Task task)
+ {
+ _deleteTaskList.add(task);
+ }
+
+ public int delete() throws AMQException
+ {
+ if (!_deleted.getAndSet(true))
+ {
+
+ SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+
+ while (subscriptionIter.advance())
+ {
+ Subscription s = subscriptionIter.getNode().getSubscription();
+ if(s != null)
+ {
+ s.queueDeleted(this);
+ }
+ }
+
+ _bindings.deregister();
+ _virtualHost.getQueueRegistry().unregisterQueue(_name);
+
+
+ _managedObject.unregister();
+ for (Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+
+ _deleteTaskList.clear();
+ }
+ return getMessageCount();
+
+ }
+
+
+ public void deliverAsync()
+ {
+ _stateChangeCount.incrementAndGet();
+
+ if(_asynchronousRunner.get() == null)
+ {
+ _asyncDelivery.execute(new Runner());
+ }
+ }
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("async delivery (Q: " + getName() + " VH: " + _virtualHost.getName() +")");
+ try
+ {
+ processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ Thread.currentThread().setName(startName);
+ }
+ }
+
+
+ private void processQueue(Runnable runner) throws AMQException
+ {
+ long stateChangeCount;
+ long previousStateChangeCount = Long.MIN_VALUE;
+ boolean deliveryIncomplete = true;
+
+ int extraLoops = 1;
+
+ while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+ {
+ // we want to have one extra loop after the every subscription has reached the point where it cannot move
+ // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // move forward in the next iteration
+
+ if(previousStateChangeCount != stateChangeCount)
+ {
+ extraLoops = 1;
+ }
+
+ previousStateChangeCount = stateChangeCount;
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ boolean done = true;
+
+
+ SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ //iterate over the subscribers and try to advance their pointer
+ while(subscriptionIter.advance())
+ {
+ Subscription sub = subscriptionIter.getNode().getSubscription();
+ if(sub != null)
+ {
+ synchronized(sub.getSendLock())
+ {
+ if(sub.isActive())
+ {
+ boolean advanced = false;
+
+ QueueEntryList.QueueEntryNode node = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ while(node.isAcquired() || node.isDeleted() || node.expired())
+ {
+ if(!node.isAcquired() && !node.isDeleted() && node.expired())
+ {
+ if(node.acquire(null))
+ {
+ final StoreContext reapingStoreContext = new StoreContext();
+ node.dequeue(reapingStoreContext);
+ node.dispose(reapingStoreContext);
+
+ }
+ }
+ QueueEntryList.QueueEntryNode newNode = node.getNext();
+ if(newNode != null)
+ {
+ sub.setQueueContext(node, newNode);
+ node = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ }
+ else
+ {
+ break;
+ }
+
+ }
+ if(!(node.isAcquired() || node.isDeleted()))
+ {
+ if(!sub.isSuspended())
+ {
+ if(sub.hasInterest(node))
+ {
+ if(!sub.wouldSuspend(node))
+ {
+ if(!sub.isBrowser() && node.acquire(sub))
+ {
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node);
+
+ if(sub.isBrowser())
+ {
+ QueueEntryList.QueueEntryNode newNode = node.getNext();
+
+ if(newNode != null)
+ {
+ sub.setQueueContext(node, newNode);
+ node = (QueueEntryList.QueueEntryNode) sub.getQueueContext();
+ advanced = true;
+ }
+
+
+ }
+ }
+ done = false;
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntryList.QueueEntryNode newNode = node.getNext();
+ if(newNode != null)
+ {
+ sub.setQueueContext(node, newNode);
+ }
+ }
+ }
+ }
+ final boolean atTail = (node.getNext() == null);
+
+ done = done && atTail;
+
+ if(atTail && !advanced && sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
+
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConumerTag());
+
+ }
+
+ }
+ }
+ }
+ if(done)
+ {
+ if(extraLoops == 0)
+ {
+ deliveryIncomplete = false;
+ }
+ else
+ {
+ extraLoops--;
+ }
+ }
+ else
+ {
+ extraLoops = 1;
+ }
+ }
+
+
+
+ _asynchronousRunner.set(null);
+ }
+
+
+ }
+
+ public boolean performGet(final AMQProtocolSession session, final AMQChannel channel, final boolean b)
+ throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ public void removeExpiredIfNoSubscribers() throws AMQException
+ {
+
+ final StoreContext storeContext = new StoreContext();
+
+ QueueEntryList.QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while(queueListIterator.advance())
+ {
+ QueueEntryList.QueueEntryNode node = queueListIterator.getNode();
+ if(!node.isDeleted() && node.expired() && node.acquire(null))
+ {
+
+ node.dequeue(storeContext);
+
+ node.dispose(storeContext);
+
+ }
+
+ }
+
+ }
+
+
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ if(maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ }
+
+ public long getMaximumMessageCount()
+ {
+ return _maximumMessageCount;
+ }
+
+ public void setMaximumMessageCount(final long maximumMessageCount)
+ {
+ _maximumMessageCount = maximumMessageCount;
+ if(maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+
+
+ }
+
+ public long getMaximumQueueDepth()
+ {
+ return _maximumQueueDepth;
+ }
+
+ // Sets the queue depth, the max queue size
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
+ {
+ _maximumQueueDepth = maximumQueueDepth;
+ if(maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
+ }
+
+ public long getMaximumMessageSize()
+ {
+ return _maximumMessageSize;
+ }
+
+ public void setMaximumMessageSize(final long maximumMessageSize)
+ {
+ _maximumMessageSize = maximumMessageSize;
+ if(maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ }
+
+
+ public Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
+ }
+
+
+
+
+
+ public ManagedObject getManagedObject()
+ {
+ return _managedObject;
+ }
+
+ public int compareTo(final Object o)
+ {
+ return _name.compareTo(((AMQQueue) o).getName());
+ }
+} \ No newline at end of file