diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 318 |
1 files changed, 159 insertions, 159 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 36dc04a5fc..78585997be 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -46,8 +46,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -56,8 +56,8 @@ import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; public class SimpleAMQQueue implements AMQQueue, - StateChangeListener<QueueSubscription, Subscription.State>, - MessageGroupManager.SubscriptionResetHelper + StateChangeListener<QueueConsumer, Consumer.State>, + MessageGroupManager.ConsumerResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -92,9 +92,9 @@ public class SimpleAMQQueue implements AMQQueue, private final QueueEntryList<QueueEntry> _entries; - private final SubscriptionList _subscriptionList = new SubscriptionList(); + private final QueueConsumerList _consumerList = new QueueConsumerList(); - private volatile QueueSubscription _exclusiveSubscriber; + private volatile QueueConsumer _exclusiveSubscriber; @@ -177,8 +177,8 @@ public class SimpleAMQQueue implements AMQQueue, private int _maximumDeliveryCount; private final MessageGroupManager _messageGroupManager; - private final Collection<SubscriptionRegistrationListener> _subscriptionListeners = - new ArrayList<SubscriptionRegistrationListener>(); + private final Collection<ConsumerRegistrationListener> _consumerListeners = + new ArrayList<ConsumerRegistrationListener>(); private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; @@ -247,7 +247,7 @@ public class SimpleAMQQueue implements AMQQueue, } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -378,15 +378,15 @@ public class SimpleAMQQueue implements AMQQueue, return _name; } - // ------ Manage Subscriptions + // ------ Manage Consumers @Override - public QueueSubscription registerSubscription(final SubscriptionTarget target, - final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final String consumerName, - EnumSet<Subscription.Option> optionSet) throws AMQException + public QueueConsumer addConsumer(final ConsumerTarget target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<Consumer.Option> optionSet) throws AMQException { // Access control @@ -396,59 +396,59 @@ public class SimpleAMQQueue implements AMQQueue, } - if (hasExclusiveSubscriber()) + if (hasExclusiveConsumer()) { - throw new ExistingExclusiveSubscription(); + throw new ExistingExclusiveConsumer(); } - boolean exclusive = optionSet.contains(Subscription.Option.EXCLUSIVE); - boolean isTransient = optionSet.contains(Subscription.Option.TRANSIENT); + boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); if (exclusive && !isTransient && getConsumerCount() != 0) { - throw new ExistingSubscriptionPreventsExclusive(); + throw new ExistingConsumerPreventsExclusive(); } - QueueSubscription subscription = new QueueSubscription(filters, messageClass, - optionSet.contains(Subscription.Option.ACQUIRES), - optionSet.contains(Subscription.Option.SEES_REQUEUES), - consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target); - target.subscriptionRegistered(subscription); + QueueConsumer consumer = new QueueConsumer(filters, messageClass, + optionSet.contains(Consumer.Option.ACQUIRES), + optionSet.contains(Consumer.Option.SEES_REQUEUES), + consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); + target.consumerAdded(consumer); if (exclusive && !isTransient) { - _exclusiveSubscriber = subscription; + _exclusiveSubscriber = consumer; } - if(subscription.isActive()) + if(consumer.isActive()) { _activeSubscriberCount.incrementAndGet(); } - subscription.setStateListener(this); - subscription.setQueueContext(new QueueContext(_entries.getHead())); + consumer.setStateListener(this); + consumer.setQueueContext(new QueueContext(_entries.getHead())); if (!isDeleted()) { - subscription.setQueue(this, exclusive); + consumer.setQueue(this, exclusive); if(_nolocal) { - subscription.setNoLocal(_nolocal); + consumer.setNoLocal(_nolocal); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener listener : _consumerListeners) { - listener.subscriptionRegistered(this, subscription); + listener.consumerAdded(this, consumer); } } - _subscriptionList.add(subscription); + _consumerList.add(consumer); - //Increment consumerCountHigh if necessary. (un)registerSubscription are both + //Increment consumerCountHigh if necessary. (un)registerConsumer are both //synchronized methods so we don't need additional synchronization here if(_consumerCountHigh.get() < getConsumerCount()) { @@ -457,7 +457,7 @@ public class SimpleAMQQueue implements AMQQueue, if (isDeleted()) { - subscription.queueDeleted(); + consumer.queueDeleted(); } } else @@ -465,27 +465,27 @@ public class SimpleAMQQueue implements AMQQueue, // TODO } - deliverAsync(subscription); + deliverAsync(consumer); - return subscription; + return consumer; } - synchronized void unregisterSubscription(final QueueSubscription subscription) throws AMQException + synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException { - if (subscription == null) + if (consumer == null) { - throw new NullPointerException("subscription argument is null"); + throw new NullPointerException("consumer argument is null"); } - boolean removed = _subscriptionList.remove(subscription); + boolean removed = _consumerList.remove(consumer); if (removed) { - subscription.close(); + consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); - subscription.setQueueContext(null); + consumer.setQueueContext(null); if(!isDeleted() && isExclusive() && getConsumerCount() == 0) { @@ -494,20 +494,20 @@ public class SimpleAMQQueue implements AMQQueue, if(_messageGroupManager != null) { - resetSubPointersForGroups(subscription, true); + resetSubPointersForGroups(consumer, true); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener listener : _consumerListeners) { - listener.subscriptionUnregistered(this, subscription); + listener.consumerRemoved(this, consumer); } } // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) + if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -516,57 +516,57 @@ public class SimpleAMQQueue implements AMQQueue, getVirtualHost().removeQueue(this); - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(); + // we need to manually fire the event to the removed consumer (which was the last one left for this + // queue. This is because the delete method uses the consumer set which has just been cleared + consumer.queueDeleted(); } } } - public Collection<Subscription> getConsumers() + public Collection<Consumer> getConsumers() { - List<Subscription> consumers = new ArrayList<Subscription>(); - SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator(); + List<Consumer> consumers = new ArrayList<Consumer>(); + QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); while(iter.advance()) { - consumers.add(iter.getNode().getSubscription()); + consumers.add(iter.getNode().getConsumer()); } return consumers; } - public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.add(listener); + _consumerListeners.add(listener); } } - public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.remove(listener); + _consumerListeners.remove(listener); } } - public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments) + public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); if(clearAssignments) { - _messageGroupManager.clearAssignments(subscription); + _messageGroupManager.clearAssignments(consumer); } if(entry != null) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance()) { - QueueSubscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -644,35 +644,35 @@ public class SimpleAMQQueue implements AMQQueue, QueueEntry entry; - final QueueSubscription exclusiveSub = _exclusiveSubscriber; + final QueueConsumer exclusiveSub = _exclusiveSubscriber; entry = _entries.add(message); if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { /* - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); + QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } while (nextNode != null) { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + if (_consumerList.updateMarkedNode(node, nextNode)) { break; } else { - node = _subscriptionList.getMarkedNode(); + node = _consumerList.getMarkedNode(); nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } } } @@ -686,13 +686,13 @@ public class SimpleAMQQueue implements AMQQueue, if (nextNode == null) { loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _consumerList.getHead(); } else { - // if subscription at end, and active, offer - QueueSubscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); + // if consumer at end, and active, offer + QueueConsumer sub = nextNode.getConsumer(); + deliverToConsumer(sub, entry); } nextNode = nextNode.findNext(); @@ -702,7 +702,7 @@ public class SimpleAMQQueue implements AMQQueue, if (entry.isAvailable()) { - checkSubscriptionsNotAheadOfDelivery(entry); + checkConsumersNotAheadOfDelivery(entry); if (exclusiveSub != null) { @@ -723,7 +723,7 @@ public class SimpleAMQQueue implements AMQQueue, } - private void deliverToSubscription(final QueueSubscription sub, final QueueEntry entry) + private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry) throws AMQException { @@ -732,14 +732,14 @@ public class SimpleAMQQueue implements AMQQueue, try { if (!sub.isSuspended() - && subscriptionReadyAndHasInterest(sub, entry) + && consumerReadyAndHasInterest(sub, entry) && mightAssign(sub, entry) && !sub.wouldSuspend(entry)) { if (sub.acquires() && !assign(sub, entry)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(entry); } else @@ -755,7 +755,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - private boolean assign(final QueueSubscription sub, final QueueEntry entry) + private boolean assign(final QueueConsumer sub, final QueueEntry entry) { if(_messageGroupManager == null) { @@ -769,17 +769,17 @@ public class SimpleAMQQueue implements AMQQueue, } } - private boolean mightAssign(final QueueSubscription sub, final QueueEntry entry) + private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry) { if(_messageGroupManager == null || !sub.acquires()) { return true; } - QueueSubscription assigned = _messageGroupManager.getAssignedSubscription(entry); + QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); return (assigned == null) || (assigned == sub); } - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering // Simple Queues don't :-) @@ -813,7 +813,7 @@ public class SimpleAMQQueue implements AMQQueue, getAtomicQueueCount().incrementAndGet(); } - private void deliverMessage(final QueueSubscription sub, final QueueEntry entry, boolean batch) + private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -824,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue, sub.send(entry, batch); } - private boolean subscriptionReadyAndHasInterest(final QueueSubscription sub, final QueueEntry entry) throws AMQException + private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException { return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); } - private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry) + private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry) { QueueContext subContext = sub.getQueueContext(); if (subContext != null) @@ -845,7 +845,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry) + private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry) { QueueContext subContext = sub.getQueueContext(); @@ -865,11 +865,11 @@ public class SimpleAMQQueue implements AMQQueue, public void requeue(QueueEntry entry) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance() && entry.isAvailable()) { - QueueSubscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -882,11 +882,11 @@ public class SimpleAMQQueue implements AMQQueue, } - public void dequeue(QueueEntry entry, Subscription sub) + public void dequeue(QueueEntry entry, Consumer sub) { decrementQueueCount(); decrementQueueSize(entry); - if (entry.acquiredBySubscription()) + if (entry.acquiredByConsumer()) { _deliveredMessages.decrementAndGet(); } @@ -914,17 +914,17 @@ public class SimpleAMQQueue implements AMQQueue, _dequeueCount.incrementAndGet(); } - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException + public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ + /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message + entry to resend and move back the consumer pointer. */ - subscription.getSendLock(); + consumer.getSendLock(); try { - if (!subscription.isClosed()) + if (!consumer.isClosed()) { - deliverMessage((QueueSubscription)subscription, entry, false); + deliverMessage((QueueConsumer) consumer, entry, false); return true; } else @@ -934,7 +934,7 @@ public class SimpleAMQQueue implements AMQQueue, } finally { - subscription.releaseSendLock(); + consumer.releaseSendLock(); } } @@ -942,7 +942,7 @@ public class SimpleAMQQueue implements AMQQueue, public int getConsumerCount() { - return _subscriptionList.size(); + return _consumerList.size(); } public int getConsumerCountHigh() @@ -1025,16 +1025,16 @@ public class SimpleAMQQueue implements AMQQueue, } - public void stateChanged(QueueSubscription sub, Subscription.State oldState, Subscription.State newState) + public void stateChanged(QueueConsumer sub, Consumer.State oldState, Consumer.State newState) { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) + if (oldState == Consumer.State.ACTIVE && newState != Consumer.State.ACTIVE) { _activeSubscriberCount.decrementAndGet(); } - else if (newState == Subscription.State.ACTIVE) + else if (newState == Consumer.State.ACTIVE) { - if (oldState != Subscription.State.ACTIVE) + if (oldState != Consumer.State.ACTIVE) { _activeSubscriberCount.incrementAndGet(); @@ -1058,12 +1058,12 @@ public class SimpleAMQQueue implements AMQQueue, return _atomicQueueSize; } - public boolean hasExclusiveSubscriber() + public boolean hasExclusiveConsumer() { return _exclusiveSubscriber != null; } - private void setExclusiveSubscriber(QueueSubscription exclusiveSubscriber) + private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber) { _exclusiveSubscriber = exclusiveSubscriber; } @@ -1079,9 +1079,9 @@ public class SimpleAMQQueue implements AMQQueue, return _entries; } - protected SubscriptionList getSubscriptionList() + protected QueueConsumerList getConsumerList() { - return _subscriptionList; + return _consumerList; } @@ -1315,11 +1315,11 @@ public class SimpleAMQQueue implements AMQQueue, b.getExchange().removeBinding(b); } - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - QueueSubscription s = subscriptionIter.getNode().getSubscription(); + QueueConsumer s = consumerNodeIterator.getNode().getConsumer(); if (s != null) { s.queueDeleted(); @@ -1449,7 +1449,7 @@ public class SimpleAMQQueue implements AMQQueue, } - public void deliverAsync(QueueSubscription sub) + public void deliverAsync(QueueConsumer sub) { if(_exclusiveSubscriber == null) { @@ -1463,17 +1463,17 @@ public class SimpleAMQQueue implements AMQQueue, } - public void flushSubscription(Subscription sub) throws AMQException + public void flushConsumer(Consumer sub) throws AMQException { // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) { throw new AMQSecurityException("Permission denied: " + getName()); } - flushSubscription(sub, Long.MAX_VALUE); + flushConsumer(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long iterations) throws AMQException + public boolean flushConsumer(Consumer sub, long iterations) throws AMQException { boolean atTail = false; final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; @@ -1494,8 +1494,8 @@ public class SimpleAMQQueue implements AMQQueue, sub.getSendLock(); } - atTail = attemptDelivery((QueueSubscription)sub, true); - if (atTail && getNextAvailableEntry((QueueSubscription)sub) == null) + atTail = attemptDelivery((QueueConsumer)sub, true); + if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null) { queueEmpty = true; } @@ -1529,21 +1529,21 @@ public class SimpleAMQQueue implements AMQQueue, } - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the + // if there's (potentially) more than one consumer the others will potentially not have been advanced to the // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". - if (!hasExclusiveSubscriber()) + if (!hasExclusiveConsumer()) { - advanceAllSubscriptions(); + advanceAllConsumers(); } return atTail; } /** - * Attempt delivery for the given subscription. + * Attempt delivery for the given consumer. * - * Looks up the next node for the subscription and attempts to deliver it. + * Looks up the next node for the consumer and attempts to deliver it. * * * @param sub @@ -1551,7 +1551,7 @@ public class SimpleAMQQueue implements AMQQueue, * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(QueueSubscription sub, boolean batch) throws AMQException + private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1570,7 +1570,7 @@ public class SimpleAMQQueue implements AMQQueue, if (sub.acquires() && !assign(sub, node)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(node); } else @@ -1581,7 +1581,7 @@ public class SimpleAMQQueue implements AMQQueue, } else // Not enough Credit for message and wouldSuspend { - //QPID-1187 - Treat the subscription as suspended for this message + //QPID-1187 - Treat the consumer as suspended for this message // and wait for the message to be removed to continue delivery. subActive = false; node.addStateChangeListener(new QueueEntryListener(sub)); @@ -1594,13 +1594,13 @@ public class SimpleAMQQueue implements AMQQueue, return atTail || !subActive; } - protected void advanceAllSubscriptions() throws AMQException + protected void advanceAllConsumers() throws AMQException { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + while (consumerNodeIterator.advance()) { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - QueueSubscription sub = subNode.getSubscription(); + QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode(); + QueueConsumer sub = subNode.getConsumer(); if(sub.acquires()) { getNextAvailableEntry(sub); @@ -1612,7 +1612,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - private QueueEntry getNextAvailableEntry(final QueueSubscription sub) + private QueueEntry getNextAvailableEntry(final QueueConsumer sub) throws AMQException { QueueContext context = sub.getQueueContext(); @@ -1653,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub) + public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub) { QueueContext context = sub.getQueueContext(); if(context != null) @@ -1672,14 +1672,14 @@ public class SimpleAMQQueue implements AMQQueue, * * A queue Runner is started whenever a state change occurs, e.g when a new * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are + * consumer (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to consumers unsuspending) which are * capable of accepting/delivering all messages then these messages would * otherwise remain on the queue. * * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue + * there are consumers that can deliver them. If there are no + * consumers capable of delivering the remaining messages on the queue * then processQueue should stop to prevent spinning. * * Since processQueue is runs in a fixed size Executor, it should not run @@ -1703,7 +1703,7 @@ public class SimpleAMQQueue implements AMQQueue, boolean lastLoop = false; int iterations = MAX_ASYNC_DELIVERIES; - final int numSubs = _subscriptionList.size(); + final int numSubs = _consumerList.size(); final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); @@ -1714,8 +1714,8 @@ public class SimpleAMQQueue implements AMQQueue, // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to + // we want to have one extra loop after every consumer has reached the point where it cannot move + // further, just in case the advance of one consumer in the last loop allows a different consumer to // move forward in the next iteration if (previousStateChangeCount != stateChangeCount) @@ -1727,14 +1727,14 @@ public class SimpleAMQQueue implements AMQQueue, } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + boolean allConsumersDone = true; + boolean consumerDone; - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - QueueSubscription sub = subscriptionIter.getNode().getSubscription(); + QueueConsumer sub = consumerNodeIterator.getNode().getConsumer(); sub.getSendLock(); try @@ -1742,8 +1742,8 @@ public class SimpleAMQQueue implements AMQQueue, for(int i = 0 ; i < perSub; i++) { //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub, true); - if (subscriptionDone) + consumerDone = attemptDelivery(sub, true); + if (consumerDone) { sub.flushBatched(); if (lastLoop && !sub.isSuspended()) @@ -1754,9 +1754,9 @@ public class SimpleAMQQueue implements AMQQueue, } else { - //this subscription can accept additional deliveries, so we must + //this consumer can accept additional deliveries, so we must //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; + allConsumersDone = false; lastLoop = false; if(--iterations == 0) { @@ -1775,24 +1775,24 @@ public class SimpleAMQQueue implements AMQQueue, } } - if(allSubscriptionsDone && lastLoop) + if(allConsumersDone && lastLoop) { //We have done an extra loop already and there are again //again no further delivery attempts possible, only //keep going if state change demands it. deliveryIncomplete = false; } - else if(allSubscriptionsDone) + else if(allConsumersDone) { - //All subscriptions reported being done, but we have to do + //All consumers reported being done, but we have to do //an extra loop if the iterations are not exhausted and //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; + deliveryIncomplete = _consumerList.size() != 0; lastLoop = true; } else { - //some subscriptions can still accept more messages, + //some consumers can still accept more messages, //keep going if iteration count allows. lastLoop = false; deliveryIncomplete = true; @@ -1970,9 +1970,9 @@ public class SimpleAMQQueue implements AMQQueue, private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State> { - private final QueueSubscription _sub; + private final QueueConsumer _sub; - public QueueEntryListener(final QueueSubscription sub) + public QueueEntryListener(final QueueConsumer sub) { _sub = sub; } |