summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java1604
1 files changed, 0 insertions, 1604 deletions
diff --git a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
deleted file mode 100644
index 7e7e8b2114..0000000000
--- a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ /dev/null
@@ -1,1604 +0,0 @@
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.pool.ReadWriteRunnable;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.configuration.Configured;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import javax.management.JMException;
-import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
-{
- private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
-
- private final AMQShortString _name;
-
- /** null means shared */
- private final AMQShortString _owner;
-
- private final boolean _durable;
-
- /** If true, this queue is deleted when the last subscriber is removed */
- private final boolean _autoDelete;
-
- private final VirtualHost _virtualHost;
-
- /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- private final ExchangeBindings _bindings = new ExchangeBindings(this);
-
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
- private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
-
- protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
- private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
-
- private volatile Subscription _exclusiveSubscriber;
-
- protected final QueueEntryList _entries;
-
- private final AMQQueueMBean _managedObject;
- private final Executor _asyncDelivery;
- private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
- /** max allowed size(KB) of a single message */
- @Configured(path = "maximumMessageSize", defaultValue = "0")
- public long _maximumMessageSize;
-
- /** max allowed number of messages on a queue. */
- @Configured(path = "maximumMessageCount", defaultValue = "0")
- public long _maximumMessageCount;
-
- /** max queue depth for the queue */
- @Configured(path = "maximumQueueDepth", defaultValue = "0")
- public long _maximumQueueDepth;
-
- /** maximum message age before alerts occur */
- @Configured(path = "maximumMessageAge", defaultValue = "0")
- public long _maximumMessageAge;
-
- /** the minimum interval between sending out consequetive alerts of the same type */
- @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
- public long _minimumAlertRepeatGap;
-
- private static final int MAX_ASYNC_DELIVERIES = 10;
-
- private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
- private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
- private AtomicReference _asynchronousRunner = new AtomicReference(null);
- private AtomicInteger _deliveredMessages = new AtomicInteger();
- private AtomicBoolean _stopped = new AtomicBoolean(false);
-
- protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
- }
-
- protected SimpleAMQQueue(AMQShortString name,
- boolean durable,
- AMQShortString owner,
- boolean autoDelete,
- VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory)
- throws AMQException
- {
-
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
-
- if (virtualHost == null)
- {
- throw new IllegalArgumentException("Virtual Host must not be null");
- }
-
- _name = name;
- _durable = durable;
- _owner = owner;
- _autoDelete = autoDelete;
- _virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList(this);
-
- _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
-
- try
- {
- _managedObject = new AMQQueueMBean(this);
- _managedObject.register();
- }
- catch (JMException e)
- {
- throw new AMQException("AMQQueue MBean creation has failed ", e);
- }
-
- resetNotifications();
-
- }
-
- private void resetNotifications()
- {
- // This ensure that the notification checks for the configured alerts are created.
- setMaximumMessageAge(_maximumMessageAge);
- setMaximumMessageCount(_maximumMessageCount);
- setMaximumMessageSize(_maximumMessageSize);
- setMaximumQueueDepth(_maximumQueueDepth);
- }
-
- // ------ Getters and Setters
-
- public AMQShortString getName()
- {
- return _name;
- }
-
- public boolean isDurable()
- {
- return _durable;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public AMQShortString getOwner()
- {
- return _owner;
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- // ------ bind and unbind
-
- public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- exchange.registerQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
- }
-
- _bindings.addBinding(routingKey, arguments, exchange);
- }
-
- public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- exchange.deregisterQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
- }
-
- boolean removed = _bindings.remove(routingKey, arguments, exchange);
- if (!removed)
- {
- _logger.error("Mismatch between queue bindings and exchange record of bindings");
- }
- }
-
- public List<ExchangeBinding> getExchangeBindings()
- {
- return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
- }
-
- // ------ Manage Subscriptions
-
- public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
- {
-
- if (isExclusiveSubscriber())
- {
- throw new ExistingExclusiveSubscription();
- }
-
- if (exclusive)
- {
- if (getConsumerCount() != 0)
- {
- throw new ExistingSubscriptionPreventsExclusive();
- }
- else
- {
- _exclusiveSubscriber = subscription;
-
- }
- }
-
- _activeSubscriberCount.incrementAndGet();
- subscription.setStateListener(this);
- subscription.setLastSeenEntry(null, _entries.getHead());
-
- if (!isDeleted())
- {
- subscription.setQueue(this);
- _subscriptionList.add(subscription);
- if (isDeleted())
- {
- subscription.queueDeleted(this);
- }
- }
- else
- {
- // TODO
- }
-
- deliverAsync(subscription);
-
- }
-
- public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
- {
- if (subscription == null)
- {
- throw new NullPointerException("subscription argument is null");
- }
-
- boolean removed = _subscriptionList.remove(subscription);
-
- if (removed)
- {
- subscription.close();
- // No longer can the queue have an exclusive consumer
- setExclusiveSubscriber(null);
-
- QueueEntry lastSeen;
-
- while ((lastSeen = subscription.getLastSeenEntry()) != null)
- {
- subscription.setLastSeenEntry(lastSeen, null);
- }
-
- // auto-delete queues must be deleted if there are no remaining subscribers
-
- if (_autoDelete && getConsumerCount() == 0)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Auto-deleteing queue:" + this);
- }
-
- delete();
-
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
- }
- }
-
- }
-
- // ------ Enqueue / Dequeue
-
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
- {
-
- incrementQueueCount();
- incrementQueueSize(message);
-
- _totalMessagesReceived.incrementAndGet();
-
- QueueEntry entry;
- Subscription exclusiveSub = _exclusiveSubscriber;
-
- if (exclusiveSub != null)
- {
- exclusiveSub.getSendLock();
-
- try
- {
- entry = _entries.add(message);
-
- deliverToSubscription(exclusiveSub, entry);
-
- // where there is more than one producer there's a reasonable chance that even though there is
- // no "queueing" we do not deliver because we get an interleving of _entries.add and
- // deliverToSubscription between threads. Therefore have one more try.
- if (!(entry.isAcquired() || entry.isDeleted()))
- {
- deliverToSubscription(exclusiveSub, entry);
- }
- }
- finally
- {
- exclusiveSub.releaseSendLock();
- }
- }
- else
- {
- entry = _entries.add(message);
- /*
-
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
-
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- while (nextNode != null)
- {
- if (_lastSubscriptionNode.compareAndSet(node, nextNode))
- {
- break;
- }
- else
- {
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- }
- }
-
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
- {
- if (nextNode == null)
- {
- loops--;
- nextNode = _subscriptionList.getHead();
- }
- else
- {
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
- }
- nextNode = nextNode.getNext();
-
- }
- }
-
- if (entry.immediateAndNotDelivered())
- {
- dequeue(storeContext, entry);
- entry.dispose(storeContext);
- }
- else if (!(entry.isAcquired() || entry.isDeleted()))
- {
- checkSubscriptionsNotAheadOfDelivery(entry);
-
- deliverAsync();
- }
-
- try
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
- catch (JMException e)
- {
- throw new AMQException("Unable to get notification from manage queue: " + e, e);
- }
-
- return entry;
-
- }
-
- private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
- throws AMQException
- {
-
- sub.getSendLock();
- try
- {
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
- {
- if (!sub.wouldSuspend(entry))
- {
- if (!sub.isBrowser() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
- {
-
- deliverMessage(sub, entry);
-
- }
- }
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
-
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
- {
- // This method is only required for queues which mess with ordering
- // Simple Queues don't :-)
- }
-
- private void incrementQueueSize(final AMQMessage message)
- {
- getAtomicQueueSize().addAndGet(message.getSize());
- }
-
- private void incrementQueueCount()
- {
- getAtomicQueueCount().incrementAndGet();
- }
-
- private void deliverMessage(final Subscription sub, final QueueEntry entry)
- throws AMQException
- {
- _deliveredMessages.incrementAndGet();
- sub.send(entry);
-
- }
-
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
- {
-
- // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
- // interest in.
- QueueEntry node = sub.getLastSeenEntry();
- while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
- {
-
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- node = null;
- break;
- }
-
- }
-
- if (node == entry)
- {
- // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
- // good
- return true;
- }
- else
- {
- // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
- // no-one else has updated it to something furhter on in the list
- //TODO - check
- //updateLastSeenEntry(sub, entry);
- return false;
- }
-
- }
-
- private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
- {
- QueueEntry node = sub.getLastSeenEntry();
-
- if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
- {
- do
- {
- if (sub.setLastSeenEntry(node, entry))
- {
- return;
- }
- else
- {
- node = sub.getLastSeenEntry();
- }
- }
- while (node != null && entry.compareTo(node) < 0);
- }
-
- }
-
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
- {
-
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (!sub.isBrowser())
- {
- updateLastSeenEntry(sub, entry);
- }
- }
-
- deliverAsync();
-
- }
-
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
- {
- decrementQueueCount();
- decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
- {
- _deliveredMessages.decrementAndGet();
- }
-
- try
- {
- AMQMessage msg = entry.getMessage();
- if (msg.isPersistent())
- {
- _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
- }
- //entry.dispose(storeContext);
-
- }
- catch (MessageCleanupException e)
- {
- // Message was dequeued, but could not then be deleted
- // though it is no longer referenced. This should be very
- // rare and can be detected and cleaned up on recovery or
- // done through some form of manual intervention.
- _logger.error(e, e);
- }
- catch (AMQException e)
- {
- throw new FailedDequeueException(_name.toString(), e);
- }
-
- }
-
- private void decrementQueueSize(final QueueEntry entry)
- {
- getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
- }
-
- void decrementQueueCount()
- {
- getAtomicQueueCount().decrementAndGet();
- }
-
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
- {
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
-
- subscription.getSendLock();
- try
- {
- if (!subscription.isClosed())
- {
- deliverMessage(subscription, entry);
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- subscription.releaseSendLock();
- }
- }
-
- public int getConsumerCount()
- {
- return _subscriptionList.size();
- }
-
- public int getActiveConsumerCount()
- {
- return _activeSubscriberCount.get();
- }
-
- public boolean isUnused()
- {
- return getConsumerCount() == 0;
- }
-
- public boolean isEmpty()
- {
- return getMessageCount() == 0;
- }
-
- public int getMessageCount()
- {
- return getAtomicQueueCount().get();
- }
-
- public long getQueueDepth()
- {
- return getAtomicQueueSize().get();
- }
-
- public int getUndeliveredMessageCount()
- {
- int count = getMessageCount() - _deliveredMessages.get();
- if (count < 0)
- {
- return 0;
- }
- else
- {
- return count;
- }
- }
-
- public long getReceivedMessageCount()
- {
- return _totalMessagesReceived.get();
- }
-
- public long getOldestMessageArrivalTime()
- {
- QueueEntry entry = getOldestQueueEntry();
- return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
- }
-
- protected QueueEntry getOldestQueueEntry()
- {
- return _entries.next(_entries.getHead());
- }
-
- public boolean isDeleted()
- {
- return _deleted.get();
- }
-
- public List<QueueEntry> getMessagesOnTheQueue()
- {
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDeleted())
- {
- entryList.add(node);
- }
- }
- return entryList;
-
- }
-
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
- {
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
- {
- _activeSubscriberCount.decrementAndGet();
-
- }
- else if (newState == Subscription.State.ACTIVE)
- {
- if (oldState != Subscription.State.ACTIVE)
- {
- _activeSubscriberCount.incrementAndGet();
-
- }
- deliverAsync(sub);
- }
- }
-
- public int compareTo(final AMQQueue o)
- {
- return _name.compareTo(o.getName());
- }
-
- public AtomicInteger getAtomicQueueCount()
- {
- return _atomicQueueCount;
- }
-
- public AtomicLong getAtomicQueueSize()
- {
- return _atomicQueueSize;
- }
-
- private boolean isExclusiveSubscriber()
- {
- return _exclusiveSubscriber != null;
- }
-
- private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
- {
- _exclusiveSubscriber = exclusiveSubscriber;
- }
-
- public static interface QueueEntryFilter
- {
- public boolean accept(QueueEntry entry);
-
- public boolean filterComplete();
- }
-
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
- {
- return getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageId();
- return messageId >= fromMessageId && messageId <= toMessageId;
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
- }
-
- public QueueEntry getMessageOnTheQueue(final long messageId)
- {
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
- private boolean _complete;
-
- public boolean accept(QueueEntry entry)
- {
- _complete = entry.getMessage().getMessageId() == messageId;
- return _complete;
- }
-
- public boolean filterComplete()
- {
- return _complete;
- }
- });
- return entries.isEmpty() ? null : entries.get(0);
- }
-
- public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
- {
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
- while (queueListIterator.advance() && !filter.filterComplete())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && filter.accept(node))
- {
- entryList.add(node);
- }
- }
- return entryList;
-
- }
-
- public void moveMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String queueName,
- StoreContext storeContext)
- {
-
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageId();
- return (messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && entry.acquire();
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
- try
- {
- store.beginTran(storeContext);
-
- // Move the messages in on the message store.
- for (QueueEntry entry : entries)
- {
- AMQMessage message = entry.getMessage();
-
- if (message.isPersistent() && toQueue.isDurable())
- {
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
- }
- // dequeue does not decrement the refence count
- entry.dequeue(storeContext);
- }
-
- // Commit and flush the move transcations.
- try
- {
- store.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
- }
- catch (AMQException e)
- {
- try
- {
- store.abortTran(storeContext);
- }
- catch (AMQException rollbackEx)
- {
- _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
- }
- throw new RuntimeException(e);
- }
-
- try
- {
- for (QueueEntry entry : entries)
- {
- toQueue.enqueue(storeContext, entry.getMessage());
-
- }
- }
- catch (MessageCleanupException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void copyMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String queueName,
- final StoreContext storeContext)
- {
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageId();
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
- {
- if (!entry.isDeleted())
- {
- return entry.getMessage().incrementReference();
- }
- }
-
- return false;
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
- try
- {
- store.beginTran(storeContext);
-
- // Move the messages in on the message store.
- for (QueueEntry entry : entries)
- {
- AMQMessage message = entry.getMessage();
-
- if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
- {
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
- }
- }
-
- // Commit and flush the move transcations.
- try
- {
- store.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
- }
- catch (AMQException e)
- {
- try
- {
- store.abortTran(storeContext);
- }
- catch (AMQException rollbackEx)
- {
- _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
- }
- throw new RuntimeException(e);
- }
-
- try
- {
- for (QueueEntry entry : entries)
- {
- if (entry.getMessage().isReferenced())
- {
- toQueue.enqueue(storeContext, entry.getMessage());
- }
- }
- }
- catch (MessageCleanupException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
- {
-
- try
- {
- QueueEntryIterator queueListIterator = _entries.iterator();
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
-
- final long messageId = node.getMessage().getMessageId();
-
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && !node.isDeleted()
- && node.acquire())
- {
- node.discard(storeContext);
- }
-
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- // ------ Management functions
-
- public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
- {
- QueueEntryIterator queueListIterator = _entries.iterator();
- boolean noDeletes = true;
-
- while (noDeletes && queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
- {
- node.discard(storeContext);
- noDeletes = false;
- }
-
- }
- }
-
- public long clearQueue(StoreContext storeContext) throws AMQException
- {
-
- QueueEntryIterator queueListIterator = _entries.iterator();
- long count = 0;
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
- {
- node.discard(storeContext);
- count++;
- }
-
- }
- return count;
-
- }
-
- public void addQueueDeleteTask(final Task task)
- {
- _deleteTaskList.add(task);
- }
-
- public int delete() throws AMQException
- {
- if (!_deleted.getAndSet(true))
- {
-
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
-
- while (subscriptionIter.advance())
- {
- Subscription s = subscriptionIter.getNode().getSubscription();
- if (s != null)
- {
- s.queueDeleted(this);
- }
- }
-
- _bindings.deregister();
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
-
- _managedObject.unregister();
- for (Task task : _deleteTaskList)
- {
- task.doTask(this);
- }
-
- _deleteTaskList.clear();
- stop();
- }
- return getMessageCount();
-
- }
-
- public void stop()
- {
- if (!_stopped.getAndSet(true))
- {
- ReferenceCountingExecutorService.getInstance().releaseExecutorService();
- }
- }
-
- public void deliverAsync()
- {
- _stateChangeCount.incrementAndGet();
-
- Runner runner = new Runner();
-
- if (_asynchronousRunner.compareAndSet(null, runner))
- {
- _asyncDelivery.execute(runner);
- }
- }
-
- public void deliverAsync(Subscription sub)
- {
- _asyncDelivery.execute(new SubFlushRunner(sub));
- }
-
- private class Runner implements ReadWriteRunnable
- {
- public void run()
- {
- try
- {
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
-
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
- }
-
- private class SubFlushRunner implements ReadWriteRunnable
- {
- private final Subscription _sub;
-
- public SubFlushRunner(Subscription sub)
- {
- _sub = sub;
- }
-
- public void run()
- {
- boolean complete = false;
- try
- {
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
-
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
- }
-
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
- }
-
- public void flushSubscription(Subscription sub) throws AMQException
- {
- flushSubscription(sub, Long.MAX_VALUE);
- }
-
- public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
- {
- boolean atTail = false;
-
- while (!sub.isSuspended() && !atTail && iterations != 0)
- {
- try
- {
- sub.getSendLock();
- atTail = attemptDelivery(sub);
- if (atTail && sub.isAutoClose())
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
- else if (!atTail)
- {
- iterations--;
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
-
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
- // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
- // which would give us memory "leak".
-
- if (!isExclusiveSubscriber())
- {
- advanceAllSubscriptions();
- }
- return atTail;
- }
-
- private boolean attemptDelivery(Subscription sub) throws AMQException
- {
- boolean atTail = false;
- boolean advanced = false;
- boolean subActive = sub.isActive();
- if (subActive)
- {
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
- }
- else
- {
- deliverMessage(sub, node);
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- }
- }
-
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
- }
- }
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
-
- }
- atTail = (_entries.next(node) == null) && !advanced;
- }
- return atTail || !subActive;
- }
-
- protected void advanceAllSubscriptions() throws AMQException
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
- {
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- Subscription sub = subNode.getSubscription();
- moveSubscriptionToNextNode(sub);
- }
- }
-
- private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
- throws AMQException
- {
- QueueEntry node = sub.getLastSeenEntry();
-
- while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
- {
- if (!node.isAcquired() && !node.isDeleted() && node.expired())
- {
- if (node.acquire())
- {
- final StoreContext reapingStoreContext = new StoreContext();
- node.discard(reapingStoreContext);
- }
- }
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- break;
- }
-
- }
- return node;
- }
-
- private void processQueue(Runnable runner) throws AMQException
- {
- long stateChangeCount;
- long previousStateChangeCount = Long.MIN_VALUE;
- boolean deliveryIncomplete = true;
-
- int extraLoops = 1;
- Long iterations = new Long(MAX_ASYNC_DELIVERIES);
-
- _asynchronousRunner.compareAndSet(runner, null);
-
- while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
- {
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
- // move forward in the next iteration
-
- if (previousStateChangeCount != stateChangeCount)
- {
- extraLoops = 1;
- }
-
- previousStateChangeCount = stateChangeCount;
- deliveryIncomplete = _subscriptionList.size() != 0;
- boolean done = true;
-
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
- //iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
- {
- boolean closeConsumer = false;
- Subscription sub = subscriptionIter.getNode().getSubscription();
- sub.getSendLock();
- try
- {
- if (sub != null)
- {
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
- }
- if (done)
- {
- if (extraLoops == 0)
- {
- deliveryIncomplete = false;
- if (sub.isAutoClose())
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
- }
- else
- {
- extraLoops--;
- }
- }
- else
- {
- iterations--;
- extraLoops = 1;
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
- _asynchronousRunner.set(null);
- }
-
- // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
- // therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
- {
- _asyncDelivery.execute(runner);
- }
- }
-
- public void removeExpiredIfNoSubscribers() throws AMQException
- {
-
- final StoreContext storeContext = new StoreContext();
-
- QueueEntryIterator queueListIterator = _entries.iterator();
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.expired() && node.acquire())
- {
-
- node.discard(storeContext);
- }
-
- }
-
- }
-
- public long getMinimumAlertRepeatGap()
- {
- return _minimumAlertRepeatGap;
- }
-
- public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
- {
- _minimumAlertRepeatGap = minimumAlertRepeatGap;
- }
-
- public long getMaximumMessageAge()
- {
- return _maximumMessageAge;
- }
-
- public void setMaximumMessageAge(long maximumMessageAge)
- {
- _maximumMessageAge = maximumMessageAge;
- if (maximumMessageAge == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- }
-
- public long getMaximumMessageCount()
- {
- return _maximumMessageCount;
- }
-
- public void setMaximumMessageCount(final long maximumMessageCount)
- {
- _maximumMessageCount = maximumMessageCount;
- if (maximumMessageCount == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
-
- }
-
- public long getMaximumQueueDepth()
- {
- return _maximumQueueDepth;
- }
-
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(final long maximumQueueDepth)
- {
- _maximumQueueDepth = maximumQueueDepth;
- if (maximumQueueDepth == 0L)
- {
- _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
-
- }
-
- public long getMaximumMessageSize()
- {
- return _maximumMessageSize;
- }
-
- public void setMaximumMessageSize(final long maximumMessageSize)
- {
- _maximumMessageSize = maximumMessageSize;
- if (maximumMessageSize == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- }
-
- public Set<NotificationCheck> getNotificationChecks()
- {
- return _notificationChecks;
- }
-
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
-
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
- {
- private final QueueEntry _entry;
- private final Subscription _sub;
-
- public QueueEntryListener(final Subscription sub, final QueueEntry entry)
- {
- _entry = entry;
- _sub = sub;
- }
-
- public boolean equals(Object o)
- {
- return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub;
- }
-
- public int hashCode()
- {
- return System.identityHashCode(_entry) ^ System.identityHashCode(_sub);
- }
-
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
- {
- entry.removeStateChangeListener(this);
- deliverAsync(_sub);
- }
- }
-
- public List<Long> getMessagesOnTheQueue(int num)
- {
- return getMessagesOnTheQueue(num, 0);
- }
-
- public List<Long> getMessagesOnTheQueue(int num, int offset)
- {
- ArrayList<Long> ids = new ArrayList<Long>(num);
- QueueEntryIterator it = _entries.iterator();
- for (int i = 0; i < offset; i++)
- {
- it.advance();
- }
-
- for (int i = 0; i < num && !it.atTail(); i++)
- {
- it.advance();
- ids.add(it.getNode().getMessage().getMessageId());
- }
- return ids;
- }
-
- public void configure(Configuration queueConfiguration)
- {
- Configurator.configure(this, queueConfiguration);
- resetNotifications();
- }
-} \ No newline at end of file