diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 655 |
1 files changed, 307 insertions, 348 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 87d11a892e..4450a3ed0c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,15 +18,7 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -43,27 +35,33 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager; -import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager; -import org.apache.qpid.server.subscription.MessageGroupManager; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper +abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>, + StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>, + MessageGroupManager.ConsumerResetHelper<E,Q,L> { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -96,11 +94,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private Exchange _alternateExchange; - private final QueueEntryList<QueueEntry> _entries; + private final L _entries; - private final SubscriptionList _subscriptionList = new SubscriptionList(); + private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>(); - private volatile Subscription _exclusiveSubscriber; + private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber; @@ -120,13 +118,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); - private final AtomicInteger _consumerCountHigh = new AtomicInteger(0); - private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); - private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); - private final AtomicLong _msgTxnDequeues = new AtomicLong(0); - private final AtomicLong _byteTxnDequeues = new AtomicLong(0); private final AtomicLong _unackedMsgCount = new AtomicLong(0); - private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); private final AtomicLong _unackedMsgBytes = new AtomicLong(); private final AtomicInteger _bindingCountHigh = new AtomicInteger(); @@ -165,14 +157,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>(); private LogSubject _logSubject; private LogActor _logActor; - private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER"; - private boolean _nolocal; + private boolean _noLocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); private boolean _deleteOnNoConsumers; @@ -185,20 +176,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ private int _maximumDeliveryCount; - private final MessageGroupManager _messageGroupManager; + private final MessageGroupManager<E,Q,L> _messageGroupManager; - private final Collection<SubscriptionRegistrationListener> _subscriptionListeners = - new ArrayList<SubscriptionRegistrationListener>(); + private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners = + new ArrayList<ConsumerRegistrationListener<Q>>(); private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) - { - this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); - } - protected SimpleAMQQueue(UUID id, String name, boolean durable, @@ -206,7 +192,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, Map<String,Object> arguments) + QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments) { if (name == null) @@ -225,7 +211,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _autoDelete = autoDelete; _exclusive = exclusive; _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList(this); + _entries = entryListFactory.createQueueEntryList((Q)this); _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments)); _id = id; @@ -251,13 +237,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), + new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -289,21 +275,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } catch (RejectedExecutionException ree) { - if (_stopped.get()) - { - // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. - } - else + // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. + if(!_stopped.get()) { _logger.error("Unexpected rejected execution", ree); throw ree; + } + } } public void setNoLocal(boolean nolocal) { - _nolocal = nolocal; + _noLocal = nolocal; } public UUID getId() @@ -388,11 +373,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _name; } - // ------ Manage Subscriptions + // ------ Manage Consumers + - public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) - throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive + @Override + public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<Consumer.Option> optionSet) throws AMQException { + // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) { @@ -400,58 +391,61 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - if (hasExclusiveSubscriber()) + if (hasExclusiveConsumer()) { - throw new ExistingExclusiveSubscription(); + throw new ExistingExclusiveConsumer(); } - if (exclusive && !subscription.isTransient()) + + boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + + if (exclusive && !isTransient && getConsumerCount() != 0) { - if (getConsumerCount() != 0) - { - throw new ExistingSubscriptionPreventsExclusive(); - } - else - { - _exclusiveSubscriber = subscription; - } + throw new ExistingConsumerPreventsExclusive(); + } + + QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass, + optionSet.contains(Consumer.Option.ACQUIRES), + optionSet.contains(Consumer.Option.SEES_REQUEUES), + consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); + target.consumerAdded(consumer); + + + if (exclusive && !isTransient) + { + _exclusiveSubscriber = consumer; } - if(subscription.isActive()) + if(consumer.isActive()) { _activeSubscriberCount.incrementAndGet(); } - subscription.setStateListener(this); - subscription.setQueueContext(new QueueContext(_entries.getHead())); + + consumer.setStateListener(this); + consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead())); if (!isDeleted()) { - subscription.setQueue(this, exclusive); - if(_nolocal) + consumer.setQueue((Q)this, exclusive); + if(_noLocal) { - subscription.setNoLocal(_nolocal); + consumer.setNoLocal(true); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener<Q> listener : _consumerListeners) { - listener.subscriptionRegistered(this, subscription); + listener.consumerAdded((Q)this, consumer); } } - _subscriptionList.add(subscription); - - //Increment consumerCountHigh if necessary. (un)registerSubscription are both - //synchronized methods so we don't need additional synchronization here - if(_consumerCountHigh.get() < getConsumerCount()) - { - _consumerCountHigh.incrementAndGet(); - } + _consumerList.add(consumer); if (isDeleted()) { - subscription.queueDeleted(this); + consumer.queueDeleted(); } } else @@ -459,42 +453,49 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // TODO } - deliverAsync(subscription); + deliverAsync(consumer); + + return consumer; } - public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException + synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException { - if (subscription == null) + if (consumer == null) { - throw new NullPointerException("subscription argument is null"); + throw new NullPointerException("consumer argument is null"); } - boolean removed = _subscriptionList.remove(subscription); + boolean removed = _consumerList.remove(consumer); if (removed) { - subscription.close(); + consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); - subscription.setQueueContext(null); + consumer.setQueueContext(null); + + if(!isDeleted() && isExclusive() && getConsumerCount() == 0) + { + setAuthorizationHolder(null); + } if(_messageGroupManager != null) { - resetSubPointersForGroups(subscription, true); + resetSubPointersForGroups(consumer, true); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener<Q> listener : _consumerListeners) { - listener.subscriptionUnregistered(this, subscription); + listener.consumerRemoved((Q)this, consumer); } } // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) + if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -503,57 +504,57 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getVirtualHost().removeQueue(this); - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); + // we need to manually fire the event to the removed consumer (which was the last one left for this + // queue. This is because the delete method uses the consumer set which has just been cleared + consumer.queueDeleted(); } } } - public Collection<Subscription> getConsumers() + public Collection<QueueConsumer<?,E,Q,L>> getConsumers() { - List<Subscription> consumers = new ArrayList<Subscription>(); - SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator(); + List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator(); while(iter.advance()) { - consumers.add(iter.getNode().getSubscription()); + consumers.add(iter.getNode().getConsumer()); } return consumers; } - public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.add(listener); + _consumerListeners.add(listener); } } - public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.remove(listener); + _consumerListeners.remove(listener); } } - public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments) + public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); if(clearAssignments) { - _messageGroupManager.clearAssignments(subscription); + _messageGroupManager.clearAssignments(consumer); } if(entry != null) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance()) { - Subscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -591,11 +592,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - public int getBindingCountHigh() - { - return _bindingCountHigh.get(); - } - public void removeBinding(final Binding binding) { _bindings.remove(binding); @@ -617,59 +613,45 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message) throws AMQException - { - enqueue(message, null); - } - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException { - enqueue(message, false, action); - } - - public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException - { - - if(transactional) - { - incrementTxnEnqueueStats(message); - } incrementQueueCount(); incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); - QueueEntry entry; - final Subscription exclusiveSub = _exclusiveSubscriber; + E entry; + final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber; entry = _entries.add(message); if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { /* - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); + QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } while (nextNode != null) { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + if (_consumerList.updateMarkedNode(node, nextNode)) { break; } else { - node = _subscriptionList.getMarkedNode(); + node = _consumerList.getMarkedNode(); nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } } } @@ -683,13 +665,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (nextNode == null) { loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _consumerList.getHead(); } else { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); + // if consumer at end, and active, offer + QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer(); + deliverToConsumer(sub, entry); } nextNode = nextNode.findNext(); @@ -699,7 +681,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (entry.isAvailable()) { - checkSubscriptionsNotAheadOfDelivery(entry); + checkConsumersNotAheadOfDelivery(entry); if (exclusiveSub != null) { @@ -715,12 +697,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(action != null) { - action.onEnqueue(entry); + action.performAction(entry); } } - private void deliverToSubscription(final Subscription sub, final QueueEntry entry) + private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException { @@ -729,14 +711,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes try { if (!sub.isSuspended() - && subscriptionReadyAndHasInterest(sub, entry) + && consumerReadyAndHasInterest(sub, entry) && mightAssign(sub, entry) && !sub.wouldSuspend(entry)) { if (sub.acquires() && !assign(sub, entry)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(entry); } else @@ -752,7 +734,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private boolean assign(final Subscription sub, final QueueEntry entry) + private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry) { if(_messageGroupManager == null) { @@ -766,17 +748,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private boolean mightAssign(final Subscription sub, final QueueEntry entry) + private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry) { if(_messageGroupManager == null || !sub.acquires()) { return true; } - Subscription assigned = _messageGroupManager.getAssignedSubscription(entry); + QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); return (assigned == null) || (assigned == sub); } - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final E entry) { // This method is only required for queues which mess with ordering // Simple Queues don't :-) @@ -810,19 +792,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getAtomicQueueCount().incrementAndGet(); } - private void incrementTxnEnqueueStats(final ServerMessage message) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } - - private void incrementTxnDequeueStats(QueueEntry entry) - { - _msgTxnDequeues.incrementAndGet(); - _byteTxnDequeues.addAndGet(entry.getSize()); - } - - private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch) + private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -833,18 +803,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes sub.send(entry, batch); } - private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException + private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException { return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); } - private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) + private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry) { - QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueContext<E,Q,L> subContext = sub.getQueueContext(); if (subContext != null) { - QueueEntry releasedEntry = subContext.getReleasedEntry(); + E releasedEntry = subContext.getReleasedEntry(); QueueContext._lastSeenUpdater.set(subContext, entry); if(releasedEntry == entry) @@ -854,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry) + private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry) { - QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueContext<E,Q,L> subContext = sub.getQueueContext(); if(subContext != null) { - QueueEntry oldEntry; + E oldEntry; while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0) { @@ -872,13 +842,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - public void requeue(QueueEntry entry) + public void requeue(E entry) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance() && entry.isAvailable()) { - Subscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -891,25 +861,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void dequeue(QueueEntry entry, Subscription sub) + @Override + public void dequeue(E entry) { decrementQueueCount(); decrementQueueSize(entry); - if (entry.acquiredBySubscription()) + if (entry.acquiredByConsumer()) { _deliveredMessages.decrementAndGet(); } - if(sub != null && sub.isSessionTransactional()) - { - incrementTxnDequeueStats(entry); - } - checkCapacity(); } - private void decrementQueueSize(final QueueEntry entry) + private void decrementQueueSize(final E entry) { final ServerMessage message = entry.getMessage(); long size = message.getSize(); @@ -928,17 +894,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _dequeueCount.incrementAndGet(); } - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException + public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ + /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message + entry to resend and move back the consumer pointer. */ - subscription.getSendLock(); + consumer.getSendLock(); try { - if (!subscription.isClosed()) + if (!consumer.isClosed()) { - deliverMessage(subscription, entry, false); + deliverMessage(consumer, entry, false); return true; } else @@ -948,7 +914,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } finally { - subscription.releaseSendLock(); + consumer.releaseSendLock(); } } @@ -956,12 +922,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public int getConsumerCount() { - return _subscriptionList.size(); - } - - public int getConsumerCountHigh() - { - return _consumerCountHigh.get(); + return _consumerList.size(); } public int getActiveConsumerCount() @@ -1009,11 +970,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public long getOldestMessageArrivalTime() { - QueueEntry entry = getOldestQueueEntry(); + E entry = getOldestQueueEntry(); return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); } - protected QueueEntry getOldestQueueEntry() + protected E getOldestQueueEntry() { return _entries.next(_entries.getHead()); } @@ -1023,13 +984,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _deleted.get(); } - public List<QueueEntry> getMessagesOnTheQueue() + public List<E> getMessagesOnTheQueue() { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); + ArrayList<E> entryList = new ArrayList<E>(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while (queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (node != null && !node.isDeleted()) { entryList.add(node); @@ -1039,16 +1000,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) + public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState) { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) + if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE) { _activeSubscriberCount.decrementAndGet(); } - else if (newState == Subscription.State.ACTIVE) + else if (newState == QueueConsumer.State.ACTIVE) { - if (oldState != Subscription.State.ACTIVE) + if (oldState != QueueConsumer.State.ACTIVE) { _activeSubscriberCount.incrementAndGet(); @@ -1057,7 +1018,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - public int compareTo(final AMQQueue o) + public int compareTo(final Q o) { return _name.compareTo(o.getName()); } @@ -1072,12 +1033,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _atomicQueueSize; } - public boolean hasExclusiveSubscriber() + public boolean hasExclusiveConsumer() { return _exclusiveSubscriber != null; } - private void setExclusiveSubscriber(Subscription exclusiveSubscriber) + private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber) { _exclusiveSubscriber = exclusiveSubscriber; } @@ -1088,32 +1049,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - protected QueueEntryList getEntries() + protected L getEntries() { return _entries; } - protected SubscriptionList getSubscriptionList() + protected QueueConsumerList<E,Q,L> getConsumerList() { - return _subscriptionList; + return _consumerList; } - public static interface QueueEntryFilter + public static interface QueueEntryFilter<E extends QueueEntry> { - public boolean accept(QueueEntry entry); + public boolean accept(E entry); public boolean filterComplete(); } - public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) + public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) { - return getMessagesOnTheQueue(new QueueEntryFilter() + return getMessagesOnTheQueue(new QueueEntryFilter<E>() { - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { final long messageId = entry.getMessage().getMessageNumber(); return messageId >= fromMessageId && messageId <= toMessageId; @@ -1126,13 +1087,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); } - public QueueEntry getMessageOnTheQueue(final long messageId) + public E getMessageOnTheQueue(final long messageId) { - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>() { private boolean _complete; - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { _complete = entry.getMessage().getMessageNumber() == messageId; return _complete; @@ -1146,13 +1107,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return entries.isEmpty() ? null : entries.get(0); } - public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) + public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter) { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); + ArrayList<E> entryList = new ArrayList<E>(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while (queueListIterator.advance() && !filter.filterComplete()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); @@ -1162,13 +1123,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void visit(final QueueEntryVisitor visitor) + public void visit(final QueueEntryVisitor<E> visitor) { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while(queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if(!node.isDeleted()) { @@ -1185,17 +1146,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes * * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. - * @param fromPosition - * @param toPosition - * @return + * @param fromPosition first message position + * @param toPosition last message position + * @return list of messages */ - public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) + public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) { - return getMessagesOnTheQueue(new QueueEntryFilter() + return getMessagesOnTheQueue(new QueueEntryFilter<E>() { private long position = 0; - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { position++; return (position >= fromPosition) && (position <= toPosition); @@ -1224,12 +1185,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // TODO - now only used by the tests public void deleteMessageFromTop() { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); boolean noDeletes = true; while (noDeletes && queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (node.acquire()) { dequeueEntry(node); @@ -1252,14 +1213,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes throw new AMQSecurityException("Permission denied: queue " + getName()); } - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); long count = 0; ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); while (queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (node.acquire()) { dequeueEntry(node, txn); @@ -1276,13 +1237,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return count; } - private void dequeueEntry(final QueueEntry node) + private void dequeueEntry(final E node) { ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); dequeueEntry(node, txn); } - private void dequeueEntry(final QueueEntry node, ServerTransaction txn) + private void dequeueEntry(final E node, ServerTransaction txn) { txn.dequeue(this, node.getMessage(), new ServerTransaction.Action() @@ -1300,18 +1261,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); } - public void addQueueDeleteTask(final Task task) + public void addQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Task task) + public void removeQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.remove(task); } // TODO list all thrown exceptions - public int delete() throws AMQSecurityException, AMQException + public int delete() throws AMQException { // Check access if (!_virtualHost.getSecurityManager().authoriseDelete(this)) @@ -1322,27 +1283,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (!_deleted.getAndSet(true)) { - for (Binding b : _bindings) + final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings); + + for (Binding b : bindingCopy) { b.getExchange().removeBinding(b); } - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - Subscription s = subscriptionIter.getNode().getSubscription(); + QueueConsumer s = consumerNodeIterator.getNode().getConsumer(); if (s != null) { - s.queueDeleted(this); + s.queueDeleted(); } } - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>() { - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { return entry.acquire(); } @@ -1356,7 +1319,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - for(final QueueEntry entry : entries) + for(final E entry : entries) { // TODO log requeues with a post enqueue action int requeues = entry.routeToAlternate(null, txn); @@ -1375,9 +1338,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - for (Task task : _deleteTaskList) + for (Action<AMQQueue> task : _deleteTaskList) { - task.doTask(this); + task.performAction(this); } _deleteTaskList.clear(); @@ -1461,7 +1424,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void deliverAsync(Subscription sub) + public void deliverAsync(QueueConsumer<?,E,Q,L> sub) { if(_exclusiveSubscriber == null) { @@ -1469,28 +1432,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else { - SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); - if(flusher == null) - { - flusher = new SubFlushRunner(sub); - sub.set(SUB_FLUSH_RUNNER, flusher); - } + SubFlushRunner flusher = sub.getRunner(); flusher.execute(_asyncDelivery); } } - public void flushSubscription(Subscription sub) throws AMQException + void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException { // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) { throw new AMQSecurityException("Permission denied: " + getName()); } - flushSubscription(sub, Long.MAX_VALUE); + flushConsumer(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long iterations) throws AMQException + boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException { boolean atTail = false; final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; @@ -1546,29 +1504,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the + // if there's (potentially) more than one consumer the others will potentially not have been advanced to the // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". - if (!hasExclusiveSubscriber()) + if (!hasExclusiveConsumer()) { - advanceAllSubscriptions(); + advanceAllConsumers(); } return atTail; } /** - * Attempt delivery for the given subscription. + * Attempt delivery for the given consumer. * - * Looks up the next node for the subscription and attempts to deliver it. + * Looks up the next node for the consumer and attempts to deliver it. * * - * @param sub - * @param batch + * @param sub the consumer + * @param batch true if processing can be batched * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException + private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1576,7 +1534,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (subActive) { - QueueEntry node = getNextAvailableEntry(sub); + E node = getNextAvailableEntry(sub); if (node != null && node.isAvailable()) { @@ -1587,7 +1545,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (sub.acquires() && !assign(sub, node)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(node); } else @@ -1598,7 +1556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else // Not enough Credit for message and wouldSuspend { - //QPID-1187 - Treat the subscription as suspended for this message + //QPID-1187 - Treat the consumer as suspended for this message // and wait for the message to be removed to continue delivery. subActive = false; node.addStateChangeListener(new QueueEntryListener(sub)); @@ -1611,13 +1569,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return atTail || !subActive; } - protected void advanceAllSubscriptions() throws AMQException + protected void advanceAllConsumers() throws AMQException { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) + QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator(); + while (consumerNodeIterator.advance()) { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - Subscription sub = subNode.getSubscription(); + QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode(); + QueueConsumer<?,E,Q,L> sub = subNode.getConsumer(); if(sub.acquires()) { getNextAvailableEntry(sub); @@ -1629,16 +1587,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - private QueueEntry getNextAvailableEntry(final Subscription sub) + private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub) throws AMQException { - QueueContext context = (QueueContext) sub.getQueueContext(); + QueueContext<E,Q,L> context = sub.getQueueContext(); if(context != null) { - QueueEntry lastSeen = context.getLastSeenEntry(); - QueueEntry releasedNode = context.getReleasedEntry(); + E lastSeen = context.getLastSeenEntry(); + E releasedNode = context.getReleasedEntry(); - QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); + E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || @@ -1670,12 +1628,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub) + public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub) { - QueueContext context = (QueueContext) sub.getQueueContext(); + QueueContext<E,Q,L> context = sub.getQueueContext(); if(context != null) { - QueueEntry releasedNode = context.getReleasedEntry(); + E releasedNode = context.getReleasedEntry(); return releasedNode != null && releasedNode.compareTo(entry) < 0; } else @@ -1689,14 +1647,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes * * A queue Runner is started whenever a state change occurs, e.g when a new * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are + * consumer (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to consumers unsuspending) which are * capable of accepting/delivering all messages then these messages would * otherwise remain on the queue. * * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue + * there are consumers that can deliver them. If there are no + * consumers capable of delivering the remaining messages on the queue * then processQueue should stop to prevent spinning. * * Since processQueue is runs in a fixed size Executor, it should not run @@ -1712,7 +1670,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes */ public long processQueue(QueueRunner runner) throws AMQException { - long stateChangeCount = Long.MIN_VALUE; + long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; long rVal = Long.MIN_VALUE; boolean deliveryIncomplete = true; @@ -1720,7 +1678,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes boolean lastLoop = false; int iterations = MAX_ASYNC_DELIVERIES; - final int numSubs = _subscriptionList.size(); + final int numSubs = _consumerList.size(); final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); @@ -1731,8 +1689,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to + // we want to have one extra loop after every consumer has reached the point where it cannot move + // further, just in case the advance of one consumer in the last loop allows a different consumer to // move forward in the next iteration if (previousStateChangeCount != stateChangeCount) @@ -1744,14 +1702,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + boolean allConsumersDone = true; + boolean consumerDone; - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator(); //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - Subscription sub = subscriptionIter.getNode().getSubscription(); + QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer(); sub.getSendLock(); try @@ -1759,8 +1717,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes for(int i = 0 ; i < perSub; i++) { //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub, true); - if (subscriptionDone) + consumerDone = attemptDelivery(sub, true); + if (consumerDone) { sub.flushBatched(); if (lastLoop && !sub.isSuspended()) @@ -1771,9 +1729,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } else { - //this subscription can accept additional deliveries, so we must + //this consumer can accept additional deliveries, so we must //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; + allConsumersDone = false; lastLoop = false; if(--iterations == 0) { @@ -1792,24 +1750,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - if(allSubscriptionsDone && lastLoop) + if(allConsumersDone && lastLoop) { //We have done an extra loop already and there are again //again no further delivery attempts possible, only //keep going if state change demands it. deliveryIncomplete = false; } - else if(allSubscriptionsDone) + else if(allConsumersDone) { - //All subscriptions reported being done, but we have to do + //All consumers reported being done, but we have to do //an extra loop if the iterations are not exhausted and //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; + deliveryIncomplete = _consumerList.size() != 0; lastLoop = true; } else { - //some subscriptions can still accept more messages, + //some consumers can still accept more messages, //keep going if iteration count allows. lastLoop = false; deliveryIncomplete = true; @@ -1833,11 +1791,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void checkMessageStatus() throws AMQException { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while (queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); // Only process nodes that are not currently deleted and not dequeued if (!node.isDeleted()) { @@ -1984,12 +1942,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _notificationChecks; } - private final class QueueEntryListener implements QueueEntry.StateChangeListener + private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State> { - private final Subscription _sub; + private final QueueConsumer<?,E,Q,L> _sub; - public QueueEntryListener(final Subscription sub) + public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub) { _sub = sub; } @@ -2005,7 +1963,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return System.identityHashCode(_sub); } - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) + public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); deliverAsync(_sub); @@ -2076,26 +2034,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _dequeueSize.get(); } - public long getByteTxnEnqueues() - { - return _byteTxnEnqueues.get(); - } - - public long getByteTxnDequeues() - { - return _byteTxnDequeues.get(); - } - - public long getMsgTxnEnqueues() - { - return _msgTxnEnqueues.get(); - } - - public long getMsgTxnDequeues() - { - return _msgTxnDequeues.get(); - } - public long getPersistentByteEnqueues() { return _persistentMessageEnqueueSize.get(); @@ -2123,11 +2061,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return getName(); } - public long getUnackedMessageCountHigh() - { - return _unackedMsgCountHigh.get(); - } - public long getUnackedMessageCount() { return _unackedMsgCount.get(); @@ -2138,25 +2071,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _unackedMsgBytes.get(); } - public void decrementUnackedMsgCount(QueueEntry queueEntry) + public void decrementUnackedMsgCount(E queueEntry) { _unackedMsgCount.decrementAndGet(); _unackedMsgBytes.addAndGet(-queueEntry.getSize()); } - private void incrementUnackedMsgCount(QueueEntry entry) + private void incrementUnackedMsgCount(E entry) { - long unackedMsgCount = _unackedMsgCount.incrementAndGet(); + _unackedMsgCount.incrementAndGet(); _unackedMsgBytes.addAndGet(entry.getSize()); - - long unackedMsgCountHigh; - while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) - { - if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount)) - { - break; - } - } } public LogActor getLogActor() @@ -2224,4 +2148,39 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return (String) _arguments.get(Queue.DESCRIPTION); } + public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction) + { + txn.enqueue(this,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + SimpleAMQQueue.this.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + + } + } |