summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java655
1 files changed, 307 insertions, 348 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 87d11a892e..4450a3ed0c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,15 +18,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -43,27 +35,33 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
-import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
+ StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
+ MessageGroupManager.ConsumerResetHelper<E,Q,L>
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -96,11 +94,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private Exchange _alternateExchange;
- private final QueueEntryList<QueueEntry> _entries;
+ private final L _entries;
- private final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
- private volatile Subscription _exclusiveSubscriber;
+ private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
@@ -120,13 +118,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
- private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
- private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
- private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
- private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -165,14 +157,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
private LogSubject _logSubject;
private LogActor _logActor;
- private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
- private boolean _nolocal;
+ private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
private boolean _deleteOnNoConsumers;
@@ -185,20 +176,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount;
- private final MessageGroupManager _messageGroupManager;
+ private final MessageGroupManager<E,Q,L> _messageGroupManager;
- private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
- new ArrayList<SubscriptionRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener<Q>>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
- public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
- {
- this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
- }
-
protected SimpleAMQQueue(UUID id,
String name,
boolean durable,
@@ -206,7 +192,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
boolean autoDelete,
boolean exclusive,
VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
+ QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
{
if (name == null)
@@ -225,7 +211,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_autoDelete = autoDelete;
_exclusive = exclusive;
_virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList(this);
+ _entries = entryListFactory.createQueueEntryList((Q)this);
_arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
_id = id;
@@ -251,13 +237,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+ new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -289,21 +275,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
catch (RejectedExecutionException ree)
{
- if (_stopped.get())
- {
- // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
- }
- else
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ if(!_stopped.get())
{
_logger.error("Unexpected rejected execution", ree);
throw ree;
+
}
+
}
}
public void setNoLocal(boolean nolocal)
{
- _nolocal = nolocal;
+ _noLocal = nolocal;
}
public UUID getId()
@@ -388,11 +373,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _name;
}
- // ------ Manage Subscriptions
+ // ------ Manage Consumers
+
- public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
- throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
+ @Override
+ public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Consumer.Option> optionSet) throws AMQException
{
+
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
@@ -400,58 +391,61 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- if (hasExclusiveSubscriber())
+ if (hasExclusiveConsumer())
{
- throw new ExistingExclusiveSubscription();
+ throw new ExistingExclusiveConsumer();
}
- if (exclusive && !subscription.isTransient())
+
+ boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
+
+ if (exclusive && !isTransient && getConsumerCount() != 0)
{
- if (getConsumerCount() != 0)
- {
- throw new ExistingSubscriptionPreventsExclusive();
- }
- else
- {
- _exclusiveSubscriber = subscription;
- }
+ throw new ExistingConsumerPreventsExclusive();
+ }
+
+ QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ target.consumerAdded(consumer);
+
+
+ if (exclusive && !isTransient)
+ {
+ _exclusiveSubscriber = consumer;
}
- if(subscription.isActive())
+ if(consumer.isActive())
{
_activeSubscriberCount.incrementAndGet();
}
- subscription.setStateListener(this);
- subscription.setQueueContext(new QueueContext(_entries.getHead()));
+
+ consumer.setStateListener(this);
+ consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
if (!isDeleted())
{
- subscription.setQueue(this, exclusive);
- if(_nolocal)
+ consumer.setQueue((Q)this, exclusive);
+ if(_noLocal)
{
- subscription.setNoLocal(_nolocal);
+ consumer.setNoLocal(true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
{
- listener.subscriptionRegistered(this, subscription);
+ listener.consumerAdded((Q)this, consumer);
}
}
- _subscriptionList.add(subscription);
-
- //Increment consumerCountHigh if necessary. (un)registerSubscription are both
- //synchronized methods so we don't need additional synchronization here
- if(_consumerCountHigh.get() < getConsumerCount())
- {
- _consumerCountHigh.incrementAndGet();
- }
+ _consumerList.add(consumer);
if (isDeleted())
{
- subscription.queueDeleted(this);
+ consumer.queueDeleted();
}
}
else
@@ -459,42 +453,49 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// TODO
}
- deliverAsync(subscription);
+ deliverAsync(consumer);
+
+ return consumer;
}
- public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException
{
- if (subscription == null)
+ if (consumer == null)
{
- throw new NullPointerException("subscription argument is null");
+ throw new NullPointerException("consumer argument is null");
}
- boolean removed = _subscriptionList.remove(subscription);
+ boolean removed = _consumerList.remove(consumer);
if (removed)
{
- subscription.close();
+ consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- subscription.setQueueContext(null);
+ consumer.setQueueContext(null);
+
+ if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+ {
+ setAuthorizationHolder(null);
+ }
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(subscription, true);
+ resetSubPointersForGroups(consumer, true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
{
- listener.subscriptionUnregistered(this, subscription);
+ listener.consumerRemoved((Q)this, consumer);
}
}
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -503,57 +504,57 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getVirtualHost().removeQueue(this);
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
+ // we need to manually fire the event to the removed consumer (which was the last one left for this
+ // queue. This is because the delete method uses the consumer set which has just been cleared
+ consumer.queueDeleted();
}
}
}
- public Collection<Subscription> getConsumers()
+ public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
{
- List<Subscription> consumers = new ArrayList<Subscription>();
- SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
while(iter.advance())
{
- consumers.add(iter.getNode().getSubscription());
+ consumers.add(iter.getNode().getConsumer());
}
return consumers;
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.add(listener);
+ _consumerListeners.add(listener);
}
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.remove(listener);
+ _consumerListeners.remove(listener);
}
}
- public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
- _messageGroupManager.clearAssignments(subscription);
+ _messageGroupManager.clearAssignments(consumer);
}
if(entry != null)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- Subscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -591,11 +592,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- public int getBindingCountHigh()
- {
- return _bindingCountHigh.get();
- }
-
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
@@ -617,59 +613,45 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message) throws AMQException
- {
- enqueue(message, null);
- }
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException
{
- enqueue(message, false, action);
- }
-
- public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
- {
-
- if(transactional)
- {
- incrementTxnEnqueueStats(message);
- }
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry;
- final Subscription exclusiveSub = _exclusiveSubscriber;
+ E entry;
+ final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
+ node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
}
}
@@ -683,13 +665,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (nextNode == null)
{
loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _consumerList.getHead();
}
else
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ // if consumer at end, and active, offer
+ QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -699,7 +681,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (entry.isAvailable())
{
- checkSubscriptionsNotAheadOfDelivery(entry);
+ checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
@@ -715,12 +697,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(action != null)
{
- action.onEnqueue(entry);
+ action.performAction(entry);
}
}
- private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
throws AMQException
{
@@ -729,14 +711,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
try
{
if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ && consumerReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(entry);
}
else
@@ -752,7 +734,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private boolean assign(final Subscription sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
if(_messageGroupManager == null)
{
@@ -766,17 +748,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final E entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -810,19 +792,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getAtomicQueueCount().incrementAndGet();
}
- private void incrementTxnEnqueueStats(final ServerMessage message)
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
-
- private void incrementTxnDequeueStats(QueueEntry entry)
- {
- _msgTxnDequeues.incrementAndGet();
- _byteTxnDequeues.addAndGet(entry.getSize());
- }
-
- private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -833,18 +803,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
sub.send(entry, batch);
}
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueContext<E,Q,L> subContext = sub.getQueueContext();
if (subContext != null)
{
- QueueEntry releasedEntry = subContext.getReleasedEntry();
+ E releasedEntry = subContext.getReleasedEntry();
QueueContext._lastSeenUpdater.set(subContext, entry);
if(releasedEntry == entry)
@@ -854,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueContext<E,Q,L> subContext = sub.getQueueContext();
if(subContext != null)
{
- QueueEntry oldEntry;
+ E oldEntry;
while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
{
@@ -872,13 +842,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- public void requeue(QueueEntry entry)
+ public void requeue(E entry)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- Subscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -891,25 +861,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ @Override
+ public void dequeue(E entry)
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
+ if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
- if(sub != null && sub.isSessionTransactional())
- {
- incrementTxnDequeueStats(entry);
- }
-
checkCapacity();
}
- private void decrementQueueSize(final QueueEntry entry)
+ private void decrementQueueSize(final E entry)
{
final ServerMessage message = entry.getMessage();
long size = message.getSize();
@@ -928,17 +894,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException
{
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
+ /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
+ entry to resend and move back the consumer pointer. */
- subscription.getSendLock();
+ consumer.getSendLock();
try
{
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
- deliverMessage(subscription, entry, false);
+ deliverMessage(consumer, entry, false);
return true;
}
else
@@ -948,7 +914,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
finally
{
- subscription.releaseSendLock();
+ consumer.releaseSendLock();
}
}
@@ -956,12 +922,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public int getConsumerCount()
{
- return _subscriptionList.size();
- }
-
- public int getConsumerCountHigh()
- {
- return _consumerCountHigh.get();
+ return _consumerList.size();
}
public int getActiveConsumerCount()
@@ -1009,11 +970,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public long getOldestMessageArrivalTime()
{
- QueueEntry entry = getOldestQueueEntry();
+ E entry = getOldestQueueEntry();
return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- protected QueueEntry getOldestQueueEntry()
+ protected E getOldestQueueEntry()
{
return _entries.next(_entries.getHead());
}
@@ -1023,13 +984,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _deleted.get();
}
- public List<QueueEntry> getMessagesOnTheQueue()
+ public List<E> getMessagesOnTheQueue()
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ ArrayList<E> entryList = new ArrayList<E>();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node != null && !node.isDeleted())
{
entryList.add(node);
@@ -1039,16 +1000,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
{
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == Subscription.State.ACTIVE)
+ else if (newState == QueueConsumer.State.ACTIVE)
{
- if (oldState != Subscription.State.ACTIVE)
+ if (oldState != QueueConsumer.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1057,7 +1018,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- public int compareTo(final AMQQueue o)
+ public int compareTo(final Q o)
{
return _name.compareTo(o.getName());
}
@@ -1072,12 +1033,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _atomicQueueSize;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1088,32 +1049,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- protected QueueEntryList getEntries()
+ protected L getEntries()
{
return _entries;
}
- protected SubscriptionList getSubscriptionList()
+ protected QueueConsumerList<E,Q,L> getConsumerList()
{
- return _subscriptionList;
+ return _consumerList;
}
- public static interface QueueEntryFilter
+ public static interface QueueEntryFilter<E extends QueueEntry>
{
- public boolean accept(QueueEntry entry);
+ public boolean accept(E entry);
public boolean filterComplete();
}
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+ public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
- return getMessagesOnTheQueue(new QueueEntryFilter()
+ return getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
final long messageId = entry.getMessage().getMessageNumber();
return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1126,13 +1087,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
});
}
- public QueueEntry getMessageOnTheQueue(final long messageId)
+ public E getMessageOnTheQueue(final long messageId)
{
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
private boolean _complete;
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
_complete = entry.getMessage().getMessageNumber() == messageId;
return _complete;
@@ -1146,13 +1107,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return entries.isEmpty() ? null : entries.get(0);
}
- public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+ public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ ArrayList<E> entryList = new ArrayList<E>();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance() && !filter.filterComplete())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
@@ -1162,13 +1123,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void visit(final QueueEntryVisitor visitor)
+ public void visit(final QueueEntryVisitor<E> visitor)
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while(queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if(!node.isDeleted())
{
@@ -1185,17 +1146,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
*
* The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
- * @param fromPosition
- * @param toPosition
- * @return
+ * @param fromPosition first message position
+ * @param toPosition last message position
+ * @return list of messages
*/
- public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+ public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
{
- return getMessagesOnTheQueue(new QueueEntryFilter()
+ return getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
private long position = 0;
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
position++;
return (position >= fromPosition) && (position <= toPosition);
@@ -1224,12 +1185,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// TODO - now only used by the tests
public void deleteMessageFromTop()
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
boolean noDeletes = true;
while (noDeletes && queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node);
@@ -1252,14 +1213,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
throw new AMQSecurityException("Permission denied: queue " + getName());
}
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
long count = 0;
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node, txn);
@@ -1276,13 +1237,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return count;
}
- private void dequeueEntry(final QueueEntry node)
+ private void dequeueEntry(final E node)
{
ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
dequeueEntry(node, txn);
}
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+ private void dequeueEntry(final E node, ServerTransaction txn)
{
txn.dequeue(this, node.getMessage(),
new ServerTransaction.Action()
@@ -1300,18 +1261,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
});
}
- public void addQueueDeleteTask(final Task task)
+ public void addQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.remove(task);
}
// TODO list all thrown exceptions
- public int delete() throws AMQSecurityException, AMQException
+ public int delete() throws AMQException
{
// Check access
if (!_virtualHost.getSecurityManager().authoriseDelete(this))
@@ -1322,27 +1283,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (!_deleted.getAndSet(true))
{
- for (Binding b : _bindings)
+ final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+ for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- Subscription s = subscriptionIter.getNode().getSubscription();
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
- s.queueDeleted(this);
+ s.queueDeleted();
}
}
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
return entry.acquire();
}
@@ -1356,7 +1319,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- for(final QueueEntry entry : entries)
+ for(final E entry : entries)
{
// TODO log requeues with a post enqueue action
int requeues = entry.routeToAlternate(null, txn);
@@ -1375,9 +1338,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- for (Task task : _deleteTaskList)
+ for (Action<AMQQueue> task : _deleteTaskList)
{
- task.doTask(this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1461,7 +1424,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1469,28 +1432,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else
{
- SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
- if(flusher == null)
- {
- flusher = new SubFlushRunner(sub);
- sub.set(SUB_FLUSH_RUNNER, flusher);
- }
+ SubFlushRunner flusher = sub.getRunner();
flusher.execute(_asyncDelivery);
}
}
- public void flushSubscription(Subscription sub) throws AMQException
+ void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
throw new AMQSecurityException("Permission denied: " + getName());
}
- flushSubscription(sub, Long.MAX_VALUE);
+ flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1546,29 +1504,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!hasExclusiveSubscriber())
+ if (!hasExclusiveConsumer())
{
- advanceAllSubscriptions();
+ advanceAllConsumers();
}
return atTail;
}
/**
- * Attempt delivery for the given subscription.
+ * Attempt delivery for the given consumer.
*
- * Looks up the next node for the subscription and attempts to deliver it.
+ * Looks up the next node for the consumer and attempts to deliver it.
*
*
- * @param sub
- * @param batch
+ * @param sub the consumer
+ * @param batch true if processing can be batched
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1576,7 +1534,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (subActive)
{
- QueueEntry node = getNextAvailableEntry(sub);
+ E node = getNextAvailableEntry(sub);
if (node != null && node.isAvailable())
{
@@ -1587,7 +1545,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
@@ -1598,7 +1556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else // Not enough Credit for message and wouldSuspend
{
- //QPID-1187 - Treat the subscription as suspended for this message
+ //QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1611,13 +1569,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return atTail || !subActive;
}
- protected void advanceAllSubscriptions() throws AMQException
+ protected void advanceAllConsumers() throws AMQException
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
{
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- Subscription sub = subNode.getSubscription();
+ QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
+ QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1629,16 +1587,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private QueueEntry getNextAvailableEntry(final Subscription sub)
+ private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
throws AMQException
{
- QueueContext context = (QueueContext) sub.getQueueContext();
+ QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
- QueueEntry lastSeen = context.getLastSeenEntry();
- QueueEntry releasedNode = context.getReleasedEntry();
+ E lastSeen = context.getLastSeenEntry();
+ E releasedNode = context.getReleasedEntry();
- QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+ E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1670,12 +1628,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
{
- QueueContext context = (QueueContext) sub.getQueueContext();
+ QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
- QueueEntry releasedNode = context.getReleasedEntry();
+ E releasedNode = context.getReleasedEntry();
return releasedNode != null && releasedNode.compareTo(entry) < 0;
}
else
@@ -1689,14 +1647,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * consumer (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to consumers unsuspending) which are
* capable of accepting/delivering all messages then these messages would
* otherwise remain on the queue.
*
* processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
+ * there are consumers that can deliver them. If there are no
+ * consumers capable of delivering the remaining messages on the queue
* then processQueue should stop to prevent spinning.
*
* Since processQueue is runs in a fixed size Executor, it should not run
@@ -1712,7 +1670,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
*/
public long processQueue(QueueRunner runner) throws AMQException
{
- long stateChangeCount = Long.MIN_VALUE;
+ long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
long rVal = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
@@ -1720,7 +1678,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- final int numSubs = _subscriptionList.size();
+ final int numSubs = _consumerList.size();
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
@@ -1731,8 +1689,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // we want to have one extra loop after every consumer has reached the point where it cannot move
+ // further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
@@ -1744,14 +1702,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ boolean allConsumersDone = true;
+ boolean consumerDone;
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- Subscription sub = subscriptionIter.getNode().getSubscription();
+ QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1759,8 +1717,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub, true);
- if (subscriptionDone)
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
@@ -1771,9 +1729,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else
{
- //this subscription can accept additional deliveries, so we must
+ //this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
+ allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
@@ -1792,24 +1750,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- if(allSubscriptionsDone && lastLoop)
+ if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//again no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
- else if(allSubscriptionsDone)
+ else if(allConsumersDone)
{
- //All subscriptions reported being done, but we have to do
+ //All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
+ deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
- //some subscriptions can still accept more messages,
+ //some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
@@ -1833,11 +1791,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void checkMessageStatus() throws AMQException
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
// Only process nodes that are not currently deleted and not dequeued
if (!node.isDeleted())
{
@@ -1984,12 +1942,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _notificationChecks;
}
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
{
- private final Subscription _sub;
+ private final QueueConsumer<?,E,Q,L> _sub;
- public QueueEntryListener(final Subscription sub)
+ public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
{
_sub = sub;
}
@@ -2005,7 +1963,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return System.identityHashCode(_sub);
}
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
@@ -2076,26 +2034,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _dequeueSize.get();
}
- public long getByteTxnEnqueues()
- {
- return _byteTxnEnqueues.get();
- }
-
- public long getByteTxnDequeues()
- {
- return _byteTxnDequeues.get();
- }
-
- public long getMsgTxnEnqueues()
- {
- return _msgTxnEnqueues.get();
- }
-
- public long getMsgTxnDequeues()
- {
- return _msgTxnDequeues.get();
- }
-
public long getPersistentByteEnqueues()
{
return _persistentMessageEnqueueSize.get();
@@ -2123,11 +2061,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return getName();
}
- public long getUnackedMessageCountHigh()
- {
- return _unackedMsgCountHigh.get();
- }
-
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
@@ -2138,25 +2071,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _unackedMsgBytes.get();
}
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
+ public void decrementUnackedMsgCount(E queueEntry)
{
_unackedMsgCount.decrementAndGet();
_unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
- private void incrementUnackedMsgCount(QueueEntry entry)
+ private void incrementUnackedMsgCount(E entry)
{
- long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+ _unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
-
- long unackedMsgCountHigh;
- while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
- {
- if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
- {
- break;
- }
- }
}
public LogActor getLogActor()
@@ -2224,4 +2148,39 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return (String) _arguments.get(Queue.DESCRIPTION);
}
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+ {
+ txn.enqueue(this,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ SimpleAMQQueue.this.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+
+ }
+
}