diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 510 |
1 files changed, 240 insertions, 270 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 87d11a892e..f4a9794fcd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,15 +18,7 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -43,27 +35,33 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager; -import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager; -import org.apache.qpid.server.subscription.MessageGroupManager; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper +public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, + StateChangeListener<QueueConsumer, QueueConsumer.State>, + MessageGroupManager.ConsumerResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -98,9 +96,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final QueueEntryList<QueueEntry> _entries; - private final SubscriptionList _subscriptionList = new SubscriptionList(); + private final QueueConsumerList _consumerList = new QueueConsumerList(); - private volatile Subscription _exclusiveSubscriber; + private volatile QueueConsumer _exclusiveSubscriber; @@ -120,13 +118,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); - private final AtomicInteger _consumerCountHigh = new AtomicInteger(0); - private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); - private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); - private final AtomicLong _msgTxnDequeues = new AtomicLong(0); - private final AtomicLong _byteTxnDequeues = new AtomicLong(0); private final AtomicLong _unackedMsgCount = new AtomicLong(0); - private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); private final AtomicLong _unackedMsgBytes = new AtomicLong(); private final AtomicInteger _bindingCountHigh = new AtomicInteger(); @@ -165,7 +157,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>(); private LogSubject _logSubject; @@ -187,8 +179,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private int _maximumDeliveryCount; private final MessageGroupManager _messageGroupManager; - private final Collection<SubscriptionRegistrationListener> _subscriptionListeners = - new ArrayList<SubscriptionRegistrationListener>(); + private final Collection<ConsumerRegistrationListener> _consumerListeners = + new ArrayList<ConsumerRegistrationListener>(); private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; @@ -257,7 +249,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -388,11 +380,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _name; } - // ------ Manage Subscriptions + // ------ Manage Consumers - public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) - throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive + + @Override + public synchronized QueueConsumer addConsumer(final ConsumerTarget target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<Consumer.Option> optionSet) throws AMQException { + // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) { @@ -400,58 +398,61 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - if (hasExclusiveSubscriber()) + if (hasExclusiveConsumer()) { - throw new ExistingExclusiveSubscription(); + throw new ExistingExclusiveConsumer(); } - if (exclusive && !subscription.isTransient()) + + boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + + if (exclusive && !isTransient && getConsumerCount() != 0) { - if (getConsumerCount() != 0) - { - throw new ExistingSubscriptionPreventsExclusive(); - } - else - { - _exclusiveSubscriber = subscription; - } + throw new ExistingConsumerPreventsExclusive(); + } + + QueueConsumer consumer = new QueueConsumer(filters, messageClass, + optionSet.contains(Consumer.Option.ACQUIRES), + optionSet.contains(Consumer.Option.SEES_REQUEUES), + consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); + target.consumerAdded(consumer); + + + if (exclusive && !isTransient) + { + _exclusiveSubscriber = consumer; } - if(subscription.isActive()) + if(consumer.isActive()) { _activeSubscriberCount.incrementAndGet(); } - subscription.setStateListener(this); - subscription.setQueueContext(new QueueContext(_entries.getHead())); + + consumer.setStateListener(this); + consumer.setQueueContext(new QueueContext(_entries.getHead())); if (!isDeleted()) { - subscription.setQueue(this, exclusive); + consumer.setQueue(this, exclusive); if(_nolocal) { - subscription.setNoLocal(_nolocal); + consumer.setNoLocal(_nolocal); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener listener : _consumerListeners) { - listener.subscriptionRegistered(this, subscription); + listener.consumerAdded(this, consumer); } } - _subscriptionList.add(subscription); - - //Increment consumerCountHigh if necessary. (un)registerSubscription are both - //synchronized methods so we don't need additional synchronization here - if(_consumerCountHigh.get() < getConsumerCount()) - { - _consumerCountHigh.incrementAndGet(); - } + _consumerList.add(consumer); if (isDeleted()) { - subscription.queueDeleted(this); + consumer.queueDeleted(); } } else @@ -459,42 +460,49 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // TODO } - deliverAsync(subscription); + deliverAsync(consumer); + + return consumer; } - public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException + synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException { - if (subscription == null) + if (consumer == null) { - throw new NullPointerException("subscription argument is null"); + throw new NullPointerException("consumer argument is null"); } - boolean removed = _subscriptionList.remove(subscription); + boolean removed = _consumerList.remove(consumer); if (removed) { - subscription.close(); + consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); - subscription.setQueueContext(null); + consumer.setQueueContext(null); + + if(!isDeleted() && isExclusive() && getConsumerCount() == 0) + { + setAuthorizationHolder(null); + } if(_messageGroupManager != null) { - resetSubPointersForGroups(subscription, true); + resetSubPointersForGroups(consumer, true); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener listener : _consumerListeners) { - listener.subscriptionUnregistered(this, subscription); + listener.consumerRemoved(this, consumer); } } // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) + if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -503,57 +511,57 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getVirtualHost().removeQueue(this); - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); + // we need to manually fire the event to the removed consumer (which was the last one left for this + // queue. This is because the delete method uses the consumer set which has just been cleared + consumer.queueDeleted(); } } } - public Collection<Subscription> getConsumers() + public Collection<QueueConsumer> getConsumers() { - List<Subscription> consumers = new ArrayList<Subscription>(); - SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator(); + List<QueueConsumer> consumers = new ArrayList<QueueConsumer>(); + QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); while(iter.advance()) { - consumers.add(iter.getNode().getSubscription()); + consumers.add(iter.getNode().getConsumer()); } return consumers; } - public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.add(listener); + _consumerListeners.add(listener); } } - public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.remove(listener); + _consumerListeners.remove(listener); } } - public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments) + public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); if(clearAssignments) { - _messageGroupManager.clearAssignments(subscription); + _messageGroupManager.clearAssignments(consumer); } if(entry != null) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance()) { - Subscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -617,23 +625,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message) throws AMQException - { - enqueue(message, null); - } - - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException - { - enqueue(message, false, action); - } - public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException { - - if(transactional) - { - incrementTxnEnqueueStats(message); - } incrementQueueCount(); incrementQueueSize(message); @@ -641,35 +635,35 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes QueueEntry entry; - final Subscription exclusiveSub = _exclusiveSubscriber; + final QueueConsumer exclusiveSub = _exclusiveSubscriber; entry = _entries.add(message); if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { /* - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); + QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } while (nextNode != null) { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + if (_consumerList.updateMarkedNode(node, nextNode)) { break; } else { - node = _subscriptionList.getMarkedNode(); + node = _consumerList.getMarkedNode(); nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } } } @@ -683,13 +677,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (nextNode == null) { loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _consumerList.getHead(); } else { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); + // if consumer at end, and active, offer + QueueConsumer sub = nextNode.getConsumer(); + deliverToConsumer(sub, entry); } nextNode = nextNode.findNext(); @@ -699,7 +693,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (entry.isAvailable()) { - checkSubscriptionsNotAheadOfDelivery(entry); + checkConsumersNotAheadOfDelivery(entry); if (exclusiveSub != null) { @@ -715,12 +709,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(action != null) { - action.onEnqueue(entry); + action.performAction(entry); } } - private void deliverToSubscription(final Subscription sub, final QueueEntry entry) + private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry) throws AMQException { @@ -729,14 +723,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes try { if (!sub.isSuspended() - && subscriptionReadyAndHasInterest(sub, entry) + && consumerReadyAndHasInterest(sub, entry) && mightAssign(sub, entry) && !sub.wouldSuspend(entry)) { if (sub.acquires() && !assign(sub, entry)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(entry); } else @@ -752,7 +746,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private boolean assign(final Subscription sub, final QueueEntry entry) + private boolean assign(final QueueConsumer sub, final QueueEntry entry) { if(_messageGroupManager == null) { @@ -766,17 +760,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private boolean mightAssign(final Subscription sub, final QueueEntry entry) + private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry) { if(_messageGroupManager == null || !sub.acquires()) { return true; } - Subscription assigned = _messageGroupManager.getAssignedSubscription(entry); + QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); return (assigned == null) || (assigned == sub); } - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering // Simple Queues don't :-) @@ -810,19 +804,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getAtomicQueueCount().incrementAndGet(); } - private void incrementTxnEnqueueStats(final ServerMessage message) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } - - private void incrementTxnDequeueStats(QueueEntry entry) - { - _msgTxnDequeues.incrementAndGet(); - _byteTxnDequeues.addAndGet(entry.getSize()); - } - - private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch) + private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -833,15 +815,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes sub.send(entry, batch); } - private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException + private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException { return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); } - private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) + private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry) { - QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueContext subContext = sub.getQueueContext(); if (subContext != null) { QueueEntry releasedEntry = subContext.getReleasedEntry(); @@ -854,10 +836,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry) + private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry) { - QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueContext subContext = sub.getQueueContext(); if(subContext != null) { QueueEntry oldEntry; @@ -874,11 +856,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void requeue(QueueEntry entry) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance() && entry.isAvailable()) { - Subscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -891,20 +873,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void dequeue(QueueEntry entry, Subscription sub) + public void dequeue(QueueEntry entry, Consumer sub) { decrementQueueCount(); decrementQueueSize(entry); - if (entry.acquiredBySubscription()) + if (entry.acquiredByConsumer()) { _deliveredMessages.decrementAndGet(); } - if(sub != null && sub.isSessionTransactional()) - { - incrementTxnDequeueStats(entry); - } - checkCapacity(); } @@ -928,17 +905,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _dequeueCount.incrementAndGet(); } - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException + public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ + /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message + entry to resend and move back the consumer pointer. */ - subscription.getSendLock(); + consumer.getSendLock(); try { - if (!subscription.isClosed()) + if (!consumer.isClosed()) { - deliverMessage(subscription, entry, false); + deliverMessage((QueueConsumer) consumer, entry, false); return true; } else @@ -948,7 +925,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } finally { - subscription.releaseSendLock(); + consumer.releaseSendLock(); } } @@ -956,12 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public int getConsumerCount() { - return _subscriptionList.size(); - } - - public int getConsumerCountHigh() - { - return _consumerCountHigh.get(); + return _consumerList.size(); } public int getActiveConsumerCount() @@ -1039,16 +1011,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) + public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState) { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) + if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE) { _activeSubscriberCount.decrementAndGet(); } - else if (newState == Subscription.State.ACTIVE) + else if (newState == QueueConsumer.State.ACTIVE) { - if (oldState != Subscription.State.ACTIVE) + if (oldState != QueueConsumer.State.ACTIVE) { _activeSubscriberCount.incrementAndGet(); @@ -1072,12 +1044,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _atomicQueueSize; } - public boolean hasExclusiveSubscriber() + public boolean hasExclusiveConsumer() { return _exclusiveSubscriber != null; } - private void setExclusiveSubscriber(Subscription exclusiveSubscriber) + private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber) { _exclusiveSubscriber = exclusiveSubscriber; } @@ -1093,9 +1065,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _entries; } - protected SubscriptionList getSubscriptionList() + protected QueueConsumerList getConsumerList() { - return _subscriptionList; + return _consumerList; } @@ -1300,12 +1272,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); } - public void addQueueDeleteTask(final Task task) + public void addQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Task task) + public void removeQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.remove(task); } @@ -1322,19 +1294,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (!_deleted.getAndSet(true)) { - for (Binding b : _bindings) + final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings); + + for (Binding b : bindingCopy) { b.getExchange().removeBinding(b); } - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - Subscription s = subscriptionIter.getNode().getSubscription(); + QueueConsumer s = consumerNodeIterator.getNode().getConsumer(); if (s != null) { - s.queueDeleted(this); + s.queueDeleted(); } } @@ -1375,9 +1349,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - for (Task task : _deleteTaskList) + for (Action<AMQQueue> task : _deleteTaskList) { - task.doTask(this); + task.performAction(this); } _deleteTaskList.clear(); @@ -1461,7 +1435,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void deliverAsync(Subscription sub) + public void deliverAsync(QueueConsumer sub) { if(_exclusiveSubscriber == null) { @@ -1469,28 +1443,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else { - SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); - if(flusher == null) - { - flusher = new SubFlushRunner(sub); - sub.set(SUB_FLUSH_RUNNER, flusher); - } + SubFlushRunner flusher = sub.getRunner(); flusher.execute(_asyncDelivery); } } - public void flushSubscription(Subscription sub) throws AMQException + void flushConsumer(QueueConsumer sub) throws AMQException { // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) { throw new AMQSecurityException("Permission denied: " + getName()); } - flushSubscription(sub, Long.MAX_VALUE); + flushConsumer(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long iterations) throws AMQException + boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException { boolean atTail = false; final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; @@ -1511,8 +1480,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes sub.getSendLock(); } - atTail = attemptDelivery(sub, true); - if (atTail && getNextAvailableEntry(sub) == null) + atTail = attemptDelivery((QueueConsumer)sub, true); + if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null) { queueEmpty = true; } @@ -1546,21 +1515,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the + // if there's (potentially) more than one consumer the others will potentially not have been advanced to the // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". - if (!hasExclusiveSubscriber()) + if (!hasExclusiveConsumer()) { - advanceAllSubscriptions(); + advanceAllConsumers(); } return atTail; } /** - * Attempt delivery for the given subscription. + * Attempt delivery for the given consumer. * - * Looks up the next node for the subscription and attempts to deliver it. + * Looks up the next node for the consumer and attempts to deliver it. * * * @param sub @@ -1568,7 +1537,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException + private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1587,7 +1556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (sub.acquires() && !assign(sub, node)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(node); } else @@ -1598,7 +1567,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else // Not enough Credit for message and wouldSuspend { - //QPID-1187 - Treat the subscription as suspended for this message + //QPID-1187 - Treat the consumer as suspended for this message // and wait for the message to be removed to continue delivery. subActive = false; node.addStateChangeListener(new QueueEntryListener(sub)); @@ -1611,13 +1580,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return atTail || !subActive; } - protected void advanceAllSubscriptions() throws AMQException + protected void advanceAllConsumers() throws AMQException { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + while (consumerNodeIterator.advance()) { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - Subscription sub = subNode.getSubscription(); + QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode(); + QueueConsumer sub = subNode.getConsumer(); if(sub.acquires()) { getNextAvailableEntry(sub); @@ -1629,10 +1598,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private QueueEntry getNextAvailableEntry(final Subscription sub) + private QueueEntry getNextAvailableEntry(final QueueConsumer sub) throws AMQException { - QueueContext context = (QueueContext) sub.getQueueContext(); + QueueContext context = sub.getQueueContext(); if(context != null) { QueueEntry lastSeen = context.getLastSeenEntry(); @@ -1670,9 +1639,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub) + public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub) { - QueueContext context = (QueueContext) sub.getQueueContext(); + QueueContext context = sub.getQueueContext(); if(context != null) { QueueEntry releasedNode = context.getReleasedEntry(); @@ -1689,14 +1658,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes * * A queue Runner is started whenever a state change occurs, e.g when a new * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are + * consumer (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to consumers unsuspending) which are * capable of accepting/delivering all messages then these messages would * otherwise remain on the queue. * * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue + * there are consumers that can deliver them. If there are no + * consumers capable of delivering the remaining messages on the queue * then processQueue should stop to prevent spinning. * * Since processQueue is runs in a fixed size Executor, it should not run @@ -1720,7 +1689,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes boolean lastLoop = false; int iterations = MAX_ASYNC_DELIVERIES; - final int numSubs = _subscriptionList.size(); + final int numSubs = _consumerList.size(); final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); @@ -1731,8 +1700,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to + // we want to have one extra loop after every consumer has reached the point where it cannot move + // further, just in case the advance of one consumer in the last loop allows a different consumer to // move forward in the next iteration if (previousStateChangeCount != stateChangeCount) @@ -1744,14 +1713,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + boolean allConsumersDone = true; + boolean consumerDone; - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - Subscription sub = subscriptionIter.getNode().getSubscription(); + QueueConsumer sub = consumerNodeIterator.getNode().getConsumer(); sub.getSendLock(); try @@ -1759,8 +1728,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes for(int i = 0 ; i < perSub; i++) { //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub, true); - if (subscriptionDone) + consumerDone = attemptDelivery(sub, true); + if (consumerDone) { sub.flushBatched(); if (lastLoop && !sub.isSuspended()) @@ -1771,9 +1740,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else { - //this subscription can accept additional deliveries, so we must + //this consumer can accept additional deliveries, so we must //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; + allConsumersDone = false; lastLoop = false; if(--iterations == 0) { @@ -1792,24 +1761,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - if(allSubscriptionsDone && lastLoop) + if(allConsumersDone && lastLoop) { //We have done an extra loop already and there are again //again no further delivery attempts possible, only //keep going if state change demands it. deliveryIncomplete = false; } - else if(allSubscriptionsDone) + else if(allConsumersDone) { - //All subscriptions reported being done, but we have to do + //All consumers reported being done, but we have to do //an extra loop if the iterations are not exhausted and //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; + deliveryIncomplete = _consumerList.size() != 0; lastLoop = true; } else { - //some subscriptions can still accept more messages, + //some consumers can still accept more messages, //keep going if iteration count allows. lastLoop = false; deliveryIncomplete = true; @@ -1984,12 +1953,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _notificationChecks; } - private final class QueueEntryListener implements QueueEntry.StateChangeListener + private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State> { - private final Subscription _sub; + private final QueueConsumer _sub; - public QueueEntryListener(final Subscription sub) + public QueueEntryListener(final QueueConsumer sub) { _sub = sub; } @@ -2005,7 +1974,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return System.identityHashCode(_sub); } - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) + public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); deliverAsync(_sub); @@ -2076,26 +2045,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _dequeueSize.get(); } - public long getByteTxnEnqueues() - { - return _byteTxnEnqueues.get(); - } - - public long getByteTxnDequeues() - { - return _byteTxnDequeues.get(); - } - - public long getMsgTxnEnqueues() - { - return _msgTxnEnqueues.get(); - } - - public long getMsgTxnDequeues() - { - return _msgTxnDequeues.get(); - } - public long getPersistentByteEnqueues() { return _persistentMessageEnqueueSize.get(); @@ -2123,11 +2072,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return getName(); } - public long getUnackedMessageCountHigh() - { - return _unackedMsgCountHigh.get(); - } - public long getUnackedMessageCount() { return _unackedMsgCount.get(); @@ -2146,17 +2090,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private void incrementUnackedMsgCount(QueueEntry entry) { - long unackedMsgCount = _unackedMsgCount.incrementAndGet(); + _unackedMsgCount.incrementAndGet(); _unackedMsgBytes.addAndGet(entry.getSize()); - - long unackedMsgCountHigh; - while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) - { - if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount)) - { - break; - } - } } public LogActor getLogActor() @@ -2224,4 +2159,39 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return (String) _arguments.get(Queue.DESCRIPTION); } + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + { + txn.enqueue(this,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + SimpleAMQQueue.this.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + + } + } |