summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java318
1 files changed, 159 insertions, 159 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 36dc04a5fc..78585997be 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -46,8 +46,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -56,8 +56,8 @@ import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueue implements AMQQueue,
- StateChangeListener<QueueSubscription, Subscription.State>,
- MessageGroupManager.SubscriptionResetHelper
+ StateChangeListener<QueueConsumer, Consumer.State>,
+ MessageGroupManager.ConsumerResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -92,9 +92,9 @@ public class SimpleAMQQueue implements AMQQueue,
private final QueueEntryList<QueueEntry> _entries;
- private final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final QueueConsumerList _consumerList = new QueueConsumerList();
- private volatile QueueSubscription _exclusiveSubscriber;
+ private volatile QueueConsumer _exclusiveSubscriber;
@@ -177,8 +177,8 @@ public class SimpleAMQQueue implements AMQQueue,
private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
- private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
- new ArrayList<SubscriptionRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -247,7 +247,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -378,15 +378,15 @@ public class SimpleAMQQueue implements AMQQueue,
return _name;
}
- // ------ Manage Subscriptions
+ // ------ Manage Consumers
@Override
- public QueueSubscription registerSubscription(final SubscriptionTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- EnumSet<Subscription.Option> optionSet) throws AMQException
+ public QueueConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Consumer.Option> optionSet) throws AMQException
{
// Access control
@@ -396,59 +396,59 @@ public class SimpleAMQQueue implements AMQQueue,
}
- if (hasExclusiveSubscriber())
+ if (hasExclusiveConsumer())
{
- throw new ExistingExclusiveSubscription();
+ throw new ExistingExclusiveConsumer();
}
- boolean exclusive = optionSet.contains(Subscription.Option.EXCLUSIVE);
- boolean isTransient = optionSet.contains(Subscription.Option.TRANSIENT);
+ boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
if (exclusive && !isTransient && getConsumerCount() != 0)
{
- throw new ExistingSubscriptionPreventsExclusive();
+ throw new ExistingConsumerPreventsExclusive();
}
- QueueSubscription subscription = new QueueSubscription(filters, messageClass,
- optionSet.contains(Subscription.Option.ACQUIRES),
- optionSet.contains(Subscription.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target);
- target.subscriptionRegistered(subscription);
+ QueueConsumer consumer = new QueueConsumer(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ target.consumerAdded(consumer);
if (exclusive && !isTransient)
{
- _exclusiveSubscriber = subscription;
+ _exclusiveSubscriber = consumer;
}
- if(subscription.isActive())
+ if(consumer.isActive())
{
_activeSubscriberCount.incrementAndGet();
}
- subscription.setStateListener(this);
- subscription.setQueueContext(new QueueContext(_entries.getHead()));
+ consumer.setStateListener(this);
+ consumer.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
- subscription.setQueue(this, exclusive);
+ consumer.setQueue(this, exclusive);
if(_nolocal)
{
- subscription.setNoLocal(_nolocal);
+ consumer.setNoLocal(_nolocal);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionRegistered(this, subscription);
+ listener.consumerAdded(this, consumer);
}
}
- _subscriptionList.add(subscription);
+ _consumerList.add(consumer);
- //Increment consumerCountHigh if necessary. (un)registerSubscription are both
+ //Increment consumerCountHigh if necessary. (un)registerConsumer are both
//synchronized methods so we don't need additional synchronization here
if(_consumerCountHigh.get() < getConsumerCount())
{
@@ -457,7 +457,7 @@ public class SimpleAMQQueue implements AMQQueue,
if (isDeleted())
{
- subscription.queueDeleted();
+ consumer.queueDeleted();
}
}
else
@@ -465,27 +465,27 @@ public class SimpleAMQQueue implements AMQQueue,
// TODO
}
- deliverAsync(subscription);
+ deliverAsync(consumer);
- return subscription;
+ return consumer;
}
- synchronized void unregisterSubscription(final QueueSubscription subscription) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
{
- if (subscription == null)
+ if (consumer == null)
{
- throw new NullPointerException("subscription argument is null");
+ throw new NullPointerException("consumer argument is null");
}
- boolean removed = _subscriptionList.remove(subscription);
+ boolean removed = _consumerList.remove(consumer);
if (removed)
{
- subscription.close();
+ consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- subscription.setQueueContext(null);
+ consumer.setQueueContext(null);
if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
{
@@ -494,20 +494,20 @@ public class SimpleAMQQueue implements AMQQueue,
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(subscription, true);
+ resetSubPointersForGroups(consumer, true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionUnregistered(this, subscription);
+ listener.consumerRemoved(this, consumer);
}
}
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -516,57 +516,57 @@ public class SimpleAMQQueue implements AMQQueue,
getVirtualHost().removeQueue(this);
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted();
+ // we need to manually fire the event to the removed consumer (which was the last one left for this
+ // queue. This is because the delete method uses the consumer set which has just been cleared
+ consumer.queueDeleted();
}
}
}
- public Collection<Subscription> getConsumers()
+ public Collection<Consumer> getConsumers()
{
- List<Subscription> consumers = new ArrayList<Subscription>();
- SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ List<Consumer> consumers = new ArrayList<Consumer>();
+ QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
- consumers.add(iter.getNode().getSubscription());
+ consumers.add(iter.getNode().getConsumer());
}
return consumers;
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.add(listener);
+ _consumerListeners.add(listener);
}
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.remove(listener);
+ _consumerListeners.remove(listener);
}
}
- public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
- _messageGroupManager.clearAssignments(subscription);
+ _messageGroupManager.clearAssignments(consumer);
}
if(entry != null)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- QueueSubscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -644,35 +644,35 @@ public class SimpleAMQQueue implements AMQQueue,
QueueEntry entry;
- final QueueSubscription exclusiveSub = _exclusiveSubscriber;
+ final QueueConsumer exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
+ node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
}
}
@@ -686,13 +686,13 @@ public class SimpleAMQQueue implements AMQQueue,
if (nextNode == null)
{
loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _consumerList.getHead();
}
else
{
- // if subscription at end, and active, offer
- QueueSubscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ // if consumer at end, and active, offer
+ QueueConsumer sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -702,7 +702,7 @@ public class SimpleAMQQueue implements AMQQueue,
if (entry.isAvailable())
{
- checkSubscriptionsNotAheadOfDelivery(entry);
+ checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
@@ -723,7 +723,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
- private void deliverToSubscription(final QueueSubscription sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
throws AMQException
{
@@ -732,14 +732,14 @@ public class SimpleAMQQueue implements AMQQueue,
try
{
if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ && consumerReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(entry);
}
else
@@ -755,7 +755,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private boolean assign(final QueueSubscription sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
{
@@ -769,17 +769,17 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private boolean mightAssign(final QueueSubscription sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- QueueSubscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -813,7 +813,7 @@ public class SimpleAMQQueue implements AMQQueue,
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueSubscription sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -824,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue,
sub.send(entry, batch);
}
- private boolean subscriptionReadyAndHasInterest(final QueueSubscription sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
if (subContext != null)
@@ -845,7 +845,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
@@ -865,11 +865,11 @@ public class SimpleAMQQueue implements AMQQueue,
public void requeue(QueueEntry entry)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- QueueSubscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -882,11 +882,11 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
+ if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
@@ -914,17 +914,17 @@ public class SimpleAMQQueue implements AMQQueue,
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
{
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
+ /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
+ entry to resend and move back the consumer pointer. */
- subscription.getSendLock();
+ consumer.getSendLock();
try
{
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
- deliverMessage((QueueSubscription)subscription, entry, false);
+ deliverMessage((QueueConsumer) consumer, entry, false);
return true;
}
else
@@ -934,7 +934,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
finally
{
- subscription.releaseSendLock();
+ consumer.releaseSendLock();
}
}
@@ -942,7 +942,7 @@ public class SimpleAMQQueue implements AMQQueue,
public int getConsumerCount()
{
- return _subscriptionList.size();
+ return _consumerList.size();
}
public int getConsumerCountHigh()
@@ -1025,16 +1025,16 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void stateChanged(QueueSubscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(QueueConsumer sub, Consumer.State oldState, Consumer.State newState)
{
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == Consumer.State.ACTIVE && newState != Consumer.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == Subscription.State.ACTIVE)
+ else if (newState == Consumer.State.ACTIVE)
{
- if (oldState != Subscription.State.ACTIVE)
+ if (oldState != Consumer.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1058,12 +1058,12 @@ public class SimpleAMQQueue implements AMQQueue,
return _atomicQueueSize;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueSubscription exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1079,9 +1079,9 @@ public class SimpleAMQQueue implements AMQQueue,
return _entries;
}
- protected SubscriptionList getSubscriptionList()
+ protected QueueConsumerList getConsumerList()
{
- return _subscriptionList;
+ return _consumerList;
}
@@ -1315,11 +1315,11 @@ public class SimpleAMQQueue implements AMQQueue,
b.getExchange().removeBinding(b);
}
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- QueueSubscription s = subscriptionIter.getNode().getSubscription();
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
s.queueDeleted();
@@ -1449,7 +1449,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1463,17 +1463,17 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void flushSubscription(Subscription sub) throws AMQException
+ public void flushConsumer(Consumer sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
throw new AMQSecurityException("Permission denied: " + getName());
}
- flushSubscription(sub, Long.MAX_VALUE);
+ flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ public boolean flushConsumer(Consumer sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1494,8 +1494,8 @@ public class SimpleAMQQueue implements AMQQueue,
sub.getSendLock();
}
- atTail = attemptDelivery((QueueSubscription)sub, true);
- if (atTail && getNextAvailableEntry((QueueSubscription)sub) == null)
+ atTail = attemptDelivery((QueueConsumer)sub, true);
+ if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
{
queueEmpty = true;
}
@@ -1529,21 +1529,21 @@ public class SimpleAMQQueue implements AMQQueue,
}
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!hasExclusiveSubscriber())
+ if (!hasExclusiveConsumer())
{
- advanceAllSubscriptions();
+ advanceAllConsumers();
}
return atTail;
}
/**
- * Attempt delivery for the given subscription.
+ * Attempt delivery for the given consumer.
*
- * Looks up the next node for the subscription and attempts to deliver it.
+ * Looks up the next node for the consumer and attempts to deliver it.
*
*
* @param sub
@@ -1551,7 +1551,7 @@ public class SimpleAMQQueue implements AMQQueue,
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(QueueSubscription sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1570,7 +1570,7 @@ public class SimpleAMQQueue implements AMQQueue,
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
@@ -1581,7 +1581,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
else // Not enough Credit for message and wouldSuspend
{
- //QPID-1187 - Treat the subscription as suspended for this message
+ //QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1594,13 +1594,13 @@ public class SimpleAMQQueue implements AMQQueue,
return atTail || !subActive;
}
- protected void advanceAllSubscriptions() throws AMQException
+ protected void advanceAllConsumers() throws AMQException
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
{
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- QueueSubscription sub = subNode.getSubscription();
+ QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+ QueueConsumer sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1612,7 +1612,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private QueueEntry getNextAvailableEntry(final QueueSubscription sub)
+ private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
throws AMQException
{
QueueContext context = sub.getQueueContext();
@@ -1653,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub)
+ public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
{
QueueContext context = sub.getQueueContext();
if(context != null)
@@ -1672,14 +1672,14 @@ public class SimpleAMQQueue implements AMQQueue,
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * consumer (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to consumers unsuspending) which are
* capable of accepting/delivering all messages then these messages would
* otherwise remain on the queue.
*
* processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
+ * there are consumers that can deliver them. If there are no
+ * consumers capable of delivering the remaining messages on the queue
* then processQueue should stop to prevent spinning.
*
* Since processQueue is runs in a fixed size Executor, it should not run
@@ -1703,7 +1703,7 @@ public class SimpleAMQQueue implements AMQQueue,
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- final int numSubs = _subscriptionList.size();
+ final int numSubs = _consumerList.size();
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
@@ -1714,8 +1714,8 @@ public class SimpleAMQQueue implements AMQQueue,
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // we want to have one extra loop after every consumer has reached the point where it cannot move
+ // further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
@@ -1727,14 +1727,14 @@ public class SimpleAMQQueue implements AMQQueue,
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ boolean allConsumersDone = true;
+ boolean consumerDone;
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- QueueSubscription sub = subscriptionIter.getNode().getSubscription();
+ QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1742,8 +1742,8 @@ public class SimpleAMQQueue implements AMQQueue,
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub, true);
- if (subscriptionDone)
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
@@ -1754,9 +1754,9 @@ public class SimpleAMQQueue implements AMQQueue,
}
else
{
- //this subscription can accept additional deliveries, so we must
+ //this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
+ allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
@@ -1775,24 +1775,24 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- if(allSubscriptionsDone && lastLoop)
+ if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//again no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
- else if(allSubscriptionsDone)
+ else if(allConsumersDone)
{
- //All subscriptions reported being done, but we have to do
+ //All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
+ deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
- //some subscriptions can still accept more messages,
+ //some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
@@ -1970,9 +1970,9 @@ public class SimpleAMQQueue implements AMQQueue,
private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
- private final QueueSubscription _sub;
+ private final QueueConsumer _sub;
- public QueueEntryListener(final QueueSubscription sub)
+ public QueueEntryListener(final QueueConsumer sub)
{
_sub = sub;
}