diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 496 |
1 files changed, 203 insertions, 293 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d7eb304c92..d42bd6cf03 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -19,8 +19,10 @@ package org.apache.qpid.server.queue; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,12 +30,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import javax.management.JMException; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; @@ -52,7 +53,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -70,6 +70,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); + private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group"; @@ -77,11 +78,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // TODO - should make this configurable at the vhost / broker level private static final int DEFAULT_MAX_GROUPS = 255; - private final VirtualHost _virtualHost; private final AMQShortString _name; - private final String _resourceName; /** null means shared */ private final AMQShortString _owner; @@ -118,6 +117,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicLong _dequeueCount = new AtomicLong(); private final AtomicLong _dequeueSize = new AtomicLong(); + private final AtomicLong _enqueueCount = new AtomicLong(); private final AtomicLong _enqueueSize = new AtomicLong(); private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong(); private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); @@ -130,6 +130,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicLong _byteTxnDequeues = new AtomicLong(0); private final AtomicLong _unackedMsgCount = new AtomicLong(0); private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); + private final AtomicLong _unackedMsgBytes = new AtomicLong(); private final AtomicInteger _bindingCountHigh = new AtomicInteger(); @@ -173,7 +174,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private LogSubject _logSubject; private LogActor _logActor; - private AMQQueueMBean _managedObject; private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER"; private boolean _nolocal; @@ -185,12 +185,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes //TODO : persist creation time private long _createTime = System.currentTimeMillis(); + private UUID _qmfId; private ConfigurationPlugin _queueConfiguration; /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); private final MessageGroupManager _messageGroupManager; + private final Collection<SubscriptionRegistrationListener> _subscriptionListeners = + new ArrayList<SubscriptionRegistrationListener>(); + + private AMQQueue.NotificationListener _notificationListener; + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments); @@ -227,17 +234,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } _name = name; - _resourceName = String.valueOf(name); _durable = durable; _owner = owner; _autoDelete = autoDelete; _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments; + _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments); _id = id; - + _qmfId = getConfigStore().createId(); _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); @@ -255,16 +261,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getConfigStore().addConfiguredObject(this); - try - { - _managedObject = new AMQQueueMBean(this); - _managedObject.register(); - } - catch (JMException e) - { - _logger.error("AMQQueue MBean creation has failed ", e); - } - if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) { if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1")) @@ -302,7 +298,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void execute(Runnable runnable) { - _asyncDelivery.execute(runnable); + try + { + _asyncDelivery.execute(runnable); + } + catch (RejectedExecutionException ree) + { + if (_stopped.get()) + { + // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. + } + else + { + _logger.error("Unexpected rejected execution", ree); + throw ree; + } + } } public AMQShortString getNameShortString() @@ -320,6 +331,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _id; } + @Override + public UUID getQMFId() + { + return _qmfId; + } + public QueueConfigType getConfigType() { return QueueConfigType.getInstance(); @@ -339,15 +356,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { return _exclusive; } - - public void setExclusive(boolean exclusive) throws AMQException + + public void setExclusive(boolean exclusive) { _exclusive = exclusive; - - if(isDurable()) - { - getVirtualHost().getMessageStore().updateQueue(this); - } } public Exchange getAlternateExchange() @@ -368,22 +380,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _alternateExchange = exchange; } - public void setAlternateExchange(String exchangeName) - { - if(exchangeName == null || exchangeName.equals("")) - { - _alternateExchange = null; - return; - } - - Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName)); - if (exchange == null) - { - throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost."); - } - setAlternateExchange(exchange); - } - + /** + * Arguments used to create this queue. The caller is assured + * that null will never be returned. + */ public Map<String, Object> getArguments() { return _arguments; @@ -430,8 +430,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { throw new AMQSecurityException("Permission denied"); } - - + + if (hasExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); @@ -463,15 +463,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { subscription.setNoLocal(_nolocal); } + + synchronized (_subscriptionListeners) + { + for(SubscriptionRegistrationListener listener : _subscriptionListeners) + { + listener.subscriptionRegistered(this, subscription); + } + } + _subscriptionList.add(subscription); - + //Increment consumerCountHigh if necessary. (un)registerSubscription are both //synchronized methods so we don't need additional synchronization here if(_counsumerCountHigh.get() < getConsumerCount()) { _counsumerCountHigh.incrementAndGet(); } - + if (isDeleted()) { subscription.queueDeleted(this); @@ -507,6 +516,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes resetSubPointersForGroups(subscription, true); } + synchronized (_subscriptionListeners) + { + for(SubscriptionRegistrationListener listener : _subscriptionListeners) + { + listener.subscriptionUnregistered(this, subscription); + } + } + // auto-delete queues must be deleted if there are no remaining subscribers if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) @@ -526,6 +543,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } + public Collection<Subscription> getConsumers() + { + List<Subscription> consumers = new ArrayList<Subscription>(); + SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator(); + while(iter.advance()) + { + consumers.add(iter.getNode().getSubscription()); + } + return consumers; + + } + + public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + { + synchronized (_subscriptionListeners) + { + _subscriptionListeners.add(listener); + } + } + + public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + { + synchronized (_subscriptionListeners) + { + _subscriptionListeners.remove(listener); + } + } + public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments) { QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); @@ -576,10 +621,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes break; } } - + reconfigure(); } - + private void reconfigure() { //Reconfigure the queue for to reflect this new binding. @@ -604,7 +649,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void removeBinding(final Binding binding) { _bindings.remove(binding); - + reconfigure(); } @@ -718,10 +763,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - if(_managedObject != null) - { - _managedObject.checkForNotification(entry.getMessage()); - } + checkForNotification(entry.getMessage()); if(action != null) { @@ -738,8 +780,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { try { - if (!sub.isSuspended() - && subscriptionReadyAndHasInterest(sub, entry) + if (!sub.isSuspended() + && subscriptionReadyAndHasInterest(sub, entry) && mightAssign(sub, entry) && !sub.wouldSuspend(entry)) { @@ -788,6 +830,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { long size = message.getSize(); getAtomicQueueSize().addAndGet(size); + _enqueueCount.incrementAndGet(); _enqueueSize.addAndGet(size); if(message.isPersistent() && isDurable()) { @@ -796,19 +839,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } + public long getTotalDequeueCount() + { + return _dequeueCount.get(); + } + + public long getTotalEnqueueCount() + { + return _enqueueCount.get(); + } + private void incrementQueueCount() { getAtomicQueueCount().incrementAndGet(); } - + private void incrementTxnEnqueueStats(final ServerMessage message) { _msgTxnEnqueues.incrementAndGet(); _byteTxnEnqueues.addAndGet(message.getSize()); } - + private void incrementTxnDequeueStats(QueueEntry entry) - { + { _msgTxnDequeues.incrementAndGet(); _byteTxnDequeues.addAndGet(entry.getSize()); } @@ -819,7 +872,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes setLastSeenEntry(sub, entry); _deliveredMessages.incrementAndGet(); - incrementUnackedMsgCount(); + incrementUnackedMsgCount(entry); sub.send(entry, batch); } @@ -833,12 +886,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) { QueueContext subContext = (QueueContext) sub.getQueueContext(); - QueueEntry releasedEntry = subContext.getReleasedEntry(); - - QueueContext._lastSeenUpdater.set(subContext, entry); - if(releasedEntry == entry) + if (subContext != null) { - QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); + QueueEntry releasedEntry = subContext.getReleasedEntry(); + + QueueContext._lastSeenUpdater.set(subContext, entry); + if(releasedEntry == entry) + { + QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); + } } } @@ -887,7 +943,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { _deliveredMessages.decrementAndGet(); } - + if(sub != null && sub.isSessionTransactional()) { incrementTxnDequeueStats(entry); @@ -940,11 +996,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } + + public int getConsumerCount() { return _subscriptionList.size(); } - + public int getConsumerCountHigh() { return _counsumerCountHigh.get(); @@ -1148,7 +1206,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void visit(final Visitor visitor) + public void visit(final QueueEntryVisitor visitor) { QueueEntryIterator queueListIterator = _entries.iterator(); @@ -1195,192 +1253,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void moveMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String destinationQueueName) throws IllegalArgumentException - { - - final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName); - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return (messageId >= fromMessageId) - && (messageId <= toMessageId) - && entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - - final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - boolean shouldRollback = true; - try - { - // Move the messages in on the message store. - for (final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - txn.enqueue(toQueue, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - toQueue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - entry.release(); - } - }); - txn.dequeue(this, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - - } - }); - } - txn.commit(); - shouldRollback = false; - } - finally - { - if (shouldRollback) - { - txn.rollback(); - } - } - - } - - public void copyMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String destinationQueueName) throws IllegalArgumentException - { - final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName); - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return ((messageId >= fromMessageId) - && (messageId <= toMessageId)); - } - - public boolean filterComplete() - { - return false; - } - }); - - final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore()); - boolean shouldRollback = true; - try - { - // Copy the messages in on the message store. - for (QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - - txn.enqueue(toQueue, message, new ServerTransaction.Action() - { - public void postCommit() - { - try - { - toQueue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - } - }); - - } - - txn.commit(); - shouldRollback = false; - } - finally - { - if (shouldRollback) - { - txn.rollback(); - } - } - - } - - private AMQQueue getValidatedDestinationQueue(String queueName) - { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue can't be the same as the source queue"); - } - return toQueue; - } - - public void removeMessagesFromQueue(long fromMessageId, long toMessageId) - { - - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - - final ServerMessage message = node.getMessage(); - if(message != null) - { - final long messageId = message.getMessageNumber(); - - if ((messageId >= fromMessageId) - && (messageId <= toMessageId) - && node.acquire()) - { - dequeueEntry(node); - } - } - } - - } - public void purge(final long request) throws AMQException { clear(request); @@ -1393,6 +1265,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // ------ Management functions + // TODO - now only used by the tests public void deleteMessageFromTop() { QueueEntryIterator queueListIterator = _entries.iterator(); @@ -1411,7 +1284,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } public long clearQueue() throws AMQException - { + { return clear(0l); } @@ -1422,7 +1295,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { throw new AMQSecurityException("Permission denied: queue " + getName()); } - + QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; @@ -1489,7 +1362,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { throw new AMQSecurityException("Permission denied: " + getName()); } - + if (!_deleted.getAndSet(true)) { @@ -1617,12 +1490,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes txn.commit(); - - if(_managedObject!=null) - { - _managedObject.unregister(); - } - for (Task task : _deleteTaskList) { task.doTask(this); @@ -2101,16 +1968,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else { - if (_managedObject != null) + // There is a chance that the node could be deleted by + // the time the check actually occurs. So verify we + // can actually get the message to perform the check. + ServerMessage msg = node.getMessage(); + if (msg != null) { - // There is a chance that the node could be deleted by - // the time the check actually occurs. So verify we - // can actually get the message to perform the check. - ServerMessage msg = node.getMessage(); - if (msg != null) - { - _managedObject.checkForNotification(msg); - } + checkForNotification(msg); } } } @@ -2235,11 +2099,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _notificationChecks; } - public ManagedObject getManagedObject() - { - return _managedObject; - } - private final class QueueEntryListener implements QueueEntry.StateChangeListener { @@ -2330,12 +2189,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _queueConfiguration; } - public String getResourceName() - { - return _resourceName; - } - - public ConfigStore getConfigStore() { return getVirtualHost().getConfigStore(); @@ -2355,22 +2208,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { return _dequeueSize.get(); } - + public long getByteTxnEnqueues() { return _byteTxnEnqueues.get(); } - + public long getByteTxnDequeues() { return _byteTxnDequeues.get(); } - + public long getMsgTxnEnqueues() { return _msgTxnEnqueues.get(); } - + public long getMsgTxnDequeues() { return _msgTxnDequeues.get(); @@ -2407,21 +2260,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { return _unackedMsgCountHigh.get(); } - + public long getUnackedMessageCount() { return _unackedMsgCount.get(); } - - public void decrementUnackedMsgCount() + + public long getUnackedMessageBytes() + { + return _unackedMsgBytes.get(); + } + + public void decrementUnackedMsgCount(QueueEntry queueEntry) { _unackedMsgCount.decrementAndGet(); + _unackedMsgBytes.addAndGet(-queueEntry.getSize()); } - - private void incrementUnackedMsgCount() + + private void incrementUnackedMsgCount(QueueEntry entry) { long unackedMsgCount = _unackedMsgCount.incrementAndGet(); - + _unackedMsgBytes.addAndGet(entry.getSize()); + long unackedMsgCountHigh; while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) { @@ -2447,4 +2307,54 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _maximumDeliveryCount = maximumDeliveryCount; } + /** + * Checks if there is any notification to send to the listeners + */ + private void checkForNotification(ServerMessage<?> msg) throws AMQException + { + final Set<NotificationCheck> notificationChecks = getNotificationChecks(); + final AMQQueue.NotificationListener listener = _notificationListener; + + if(listener != null && !notificationChecks.isEmpty()) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) + { + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + { + if (check.notifyIfNecessary(msg, this, listener)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } + } + } + } + + public void setNotificationListener(AMQQueue.NotificationListener listener) + { + _notificationListener = listener; + } + + @Override + public void setDescription(String description) + { + if (description == null) + { + _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION); + } + else + { + _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description); + } + } + + @Override + public String getDescription() + { + return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION); + } + } |