summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java510
1 files changed, 240 insertions, 270 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 87d11a892e..f4a9794fcd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,15 +18,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -43,27 +35,33 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
-import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
+ StateChangeListener<QueueConsumer, QueueConsumer.State>,
+ MessageGroupManager.ConsumerResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -98,9 +96,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final QueueEntryList<QueueEntry> _entries;
- private final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final QueueConsumerList _consumerList = new QueueConsumerList();
- private volatile Subscription _exclusiveSubscriber;
+ private volatile QueueConsumer _exclusiveSubscriber;
@@ -120,13 +118,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
- private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
- private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
- private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
- private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -165,7 +157,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
private LogSubject _logSubject;
@@ -187,8 +179,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
- private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
- new ArrayList<SubscriptionRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -257,7 +249,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -388,11 +380,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _name;
}
- // ------ Manage Subscriptions
+ // ------ Manage Consumers
- public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
- throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
+
+ @Override
+ public synchronized QueueConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Consumer.Option> optionSet) throws AMQException
{
+
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
@@ -400,58 +398,61 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- if (hasExclusiveSubscriber())
+ if (hasExclusiveConsumer())
{
- throw new ExistingExclusiveSubscription();
+ throw new ExistingExclusiveConsumer();
}
- if (exclusive && !subscription.isTransient())
+
+ boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
+
+ if (exclusive && !isTransient && getConsumerCount() != 0)
{
- if (getConsumerCount() != 0)
- {
- throw new ExistingSubscriptionPreventsExclusive();
- }
- else
- {
- _exclusiveSubscriber = subscription;
- }
+ throw new ExistingConsumerPreventsExclusive();
+ }
+
+ QueueConsumer consumer = new QueueConsumer(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ target.consumerAdded(consumer);
+
+
+ if (exclusive && !isTransient)
+ {
+ _exclusiveSubscriber = consumer;
}
- if(subscription.isActive())
+ if(consumer.isActive())
{
_activeSubscriberCount.incrementAndGet();
}
- subscription.setStateListener(this);
- subscription.setQueueContext(new QueueContext(_entries.getHead()));
+
+ consumer.setStateListener(this);
+ consumer.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
- subscription.setQueue(this, exclusive);
+ consumer.setQueue(this, exclusive);
if(_nolocal)
{
- subscription.setNoLocal(_nolocal);
+ consumer.setNoLocal(_nolocal);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionRegistered(this, subscription);
+ listener.consumerAdded(this, consumer);
}
}
- _subscriptionList.add(subscription);
-
- //Increment consumerCountHigh if necessary. (un)registerSubscription are both
- //synchronized methods so we don't need additional synchronization here
- if(_consumerCountHigh.get() < getConsumerCount())
- {
- _consumerCountHigh.incrementAndGet();
- }
+ _consumerList.add(consumer);
if (isDeleted())
{
- subscription.queueDeleted(this);
+ consumer.queueDeleted();
}
}
else
@@ -459,42 +460,49 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// TODO
}
- deliverAsync(subscription);
+ deliverAsync(consumer);
+
+ return consumer;
}
- public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
{
- if (subscription == null)
+ if (consumer == null)
{
- throw new NullPointerException("subscription argument is null");
+ throw new NullPointerException("consumer argument is null");
}
- boolean removed = _subscriptionList.remove(subscription);
+ boolean removed = _consumerList.remove(consumer);
if (removed)
{
- subscription.close();
+ consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- subscription.setQueueContext(null);
+ consumer.setQueueContext(null);
+
+ if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+ {
+ setAuthorizationHolder(null);
+ }
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(subscription, true);
+ resetSubPointersForGroups(consumer, true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionUnregistered(this, subscription);
+ listener.consumerRemoved(this, consumer);
}
}
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -503,57 +511,57 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getVirtualHost().removeQueue(this);
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
+ // we need to manually fire the event to the removed consumer (which was the last one left for this
+ // queue. This is because the delete method uses the consumer set which has just been cleared
+ consumer.queueDeleted();
}
}
}
- public Collection<Subscription> getConsumers()
+ public Collection<QueueConsumer> getConsumers()
{
- List<Subscription> consumers = new ArrayList<Subscription>();
- SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
+ QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
- consumers.add(iter.getNode().getSubscription());
+ consumers.add(iter.getNode().getConsumer());
}
return consumers;
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.add(listener);
+ _consumerListeners.add(listener);
}
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.remove(listener);
+ _consumerListeners.remove(listener);
}
}
- public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
- _messageGroupManager.clearAssignments(subscription);
+ _messageGroupManager.clearAssignments(consumer);
}
if(entry != null)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- Subscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -617,23 +625,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message) throws AMQException
- {
- enqueue(message, null);
- }
-
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
- {
- enqueue(message, false, action);
- }
- public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
{
-
- if(transactional)
- {
- incrementTxnEnqueueStats(message);
- }
incrementQueueCount();
incrementQueueSize(message);
@@ -641,35 +635,35 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
QueueEntry entry;
- final Subscription exclusiveSub = _exclusiveSubscriber;
+ final QueueConsumer exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
+ node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
}
}
@@ -683,13 +677,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (nextNode == null)
{
loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _consumerList.getHead();
}
else
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ // if consumer at end, and active, offer
+ QueueConsumer sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -699,7 +693,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (entry.isAvailable())
{
- checkSubscriptionsNotAheadOfDelivery(entry);
+ checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
@@ -715,12 +709,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(action != null)
{
- action.onEnqueue(entry);
+ action.performAction(entry);
}
}
- private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
throws AMQException
{
@@ -729,14 +723,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
try
{
if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ && consumerReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(entry);
}
else
@@ -752,7 +746,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private boolean assign(final Subscription sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
{
@@ -766,17 +760,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -810,19 +804,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getAtomicQueueCount().incrementAndGet();
}
- private void incrementTxnEnqueueStats(final ServerMessage message)
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
-
- private void incrementTxnDequeueStats(QueueEntry entry)
- {
- _msgTxnDequeues.incrementAndGet();
- _byteTxnDequeues.addAndGet(entry.getSize());
- }
-
- private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -833,15 +815,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
sub.send(entry, batch);
}
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
{
- QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueContext subContext = sub.getQueueContext();
if (subContext != null)
{
QueueEntry releasedEntry = subContext.getReleasedEntry();
@@ -854,10 +836,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
{
- QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueContext subContext = sub.getQueueContext();
if(subContext != null)
{
QueueEntry oldEntry;
@@ -874,11 +856,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void requeue(QueueEntry entry)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- Subscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -891,20 +873,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
+ if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
- if(sub != null && sub.isSessionTransactional())
- {
- incrementTxnDequeueStats(entry);
- }
-
checkCapacity();
}
@@ -928,17 +905,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
{
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
+ /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
+ entry to resend and move back the consumer pointer. */
- subscription.getSendLock();
+ consumer.getSendLock();
try
{
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
- deliverMessage(subscription, entry, false);
+ deliverMessage((QueueConsumer) consumer, entry, false);
return true;
}
else
@@ -948,7 +925,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
finally
{
- subscription.releaseSendLock();
+ consumer.releaseSendLock();
}
}
@@ -956,12 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public int getConsumerCount()
{
- return _subscriptionList.size();
- }
-
- public int getConsumerCountHigh()
- {
- return _consumerCountHigh.get();
+ return _consumerList.size();
}
public int getActiveConsumerCount()
@@ -1039,16 +1011,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState)
{
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == Subscription.State.ACTIVE)
+ else if (newState == QueueConsumer.State.ACTIVE)
{
- if (oldState != Subscription.State.ACTIVE)
+ if (oldState != QueueConsumer.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1072,12 +1044,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _atomicQueueSize;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1093,9 +1065,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _entries;
}
- protected SubscriptionList getSubscriptionList()
+ protected QueueConsumerList getConsumerList()
{
- return _subscriptionList;
+ return _consumerList;
}
@@ -1300,12 +1272,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
});
}
- public void addQueueDeleteTask(final Task task)
+ public void addQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.remove(task);
}
@@ -1322,19 +1294,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (!_deleted.getAndSet(true))
{
- for (Binding b : _bindings)
+ final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+ for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- Subscription s = subscriptionIter.getNode().getSubscription();
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
- s.queueDeleted(this);
+ s.queueDeleted();
}
}
@@ -1375,9 +1349,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- for (Task task : _deleteTaskList)
+ for (Action<AMQQueue> task : _deleteTaskList)
{
- task.doTask(this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1461,7 +1435,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1469,28 +1443,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else
{
- SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
- if(flusher == null)
- {
- flusher = new SubFlushRunner(sub);
- sub.set(SUB_FLUSH_RUNNER, flusher);
- }
+ SubFlushRunner flusher = sub.getRunner();
flusher.execute(_asyncDelivery);
}
}
- public void flushSubscription(Subscription sub) throws AMQException
+ void flushConsumer(QueueConsumer sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
throw new AMQSecurityException("Permission denied: " + getName());
}
- flushSubscription(sub, Long.MAX_VALUE);
+ flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1511,8 +1480,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
sub.getSendLock();
}
- atTail = attemptDelivery(sub, true);
- if (atTail && getNextAvailableEntry(sub) == null)
+ atTail = attemptDelivery((QueueConsumer)sub, true);
+ if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
{
queueEmpty = true;
}
@@ -1546,21 +1515,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!hasExclusiveSubscriber())
+ if (!hasExclusiveConsumer())
{
- advanceAllSubscriptions();
+ advanceAllConsumers();
}
return atTail;
}
/**
- * Attempt delivery for the given subscription.
+ * Attempt delivery for the given consumer.
*
- * Looks up the next node for the subscription and attempts to deliver it.
+ * Looks up the next node for the consumer and attempts to deliver it.
*
*
* @param sub
@@ -1568,7 +1537,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1587,7 +1556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
@@ -1598,7 +1567,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else // Not enough Credit for message and wouldSuspend
{
- //QPID-1187 - Treat the subscription as suspended for this message
+ //QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1611,13 +1580,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return atTail || !subActive;
}
- protected void advanceAllSubscriptions() throws AMQException
+ protected void advanceAllConsumers() throws AMQException
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
{
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- Subscription sub = subNode.getSubscription();
+ QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+ QueueConsumer sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1629,10 +1598,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- private QueueEntry getNextAvailableEntry(final Subscription sub)
+ private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
throws AMQException
{
- QueueContext context = (QueueContext) sub.getQueueContext();
+ QueueContext context = sub.getQueueContext();
if(context != null)
{
QueueEntry lastSeen = context.getLastSeenEntry();
@@ -1670,9 +1639,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
{
- QueueContext context = (QueueContext) sub.getQueueContext();
+ QueueContext context = sub.getQueueContext();
if(context != null)
{
QueueEntry releasedNode = context.getReleasedEntry();
@@ -1689,14 +1658,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * consumer (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to consumers unsuspending) which are
* capable of accepting/delivering all messages then these messages would
* otherwise remain on the queue.
*
* processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
+ * there are consumers that can deliver them. If there are no
+ * consumers capable of delivering the remaining messages on the queue
* then processQueue should stop to prevent spinning.
*
* Since processQueue is runs in a fixed size Executor, it should not run
@@ -1720,7 +1689,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- final int numSubs = _subscriptionList.size();
+ final int numSubs = _consumerList.size();
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
@@ -1731,8 +1700,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // we want to have one extra loop after every consumer has reached the point where it cannot move
+ // further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
@@ -1744,14 +1713,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ boolean allConsumersDone = true;
+ boolean consumerDone;
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- Subscription sub = subscriptionIter.getNode().getSubscription();
+ QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1759,8 +1728,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub, true);
- if (subscriptionDone)
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
@@ -1771,9 +1740,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else
{
- //this subscription can accept additional deliveries, so we must
+ //this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
+ allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
@@ -1792,24 +1761,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- if(allSubscriptionsDone && lastLoop)
+ if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//again no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
- else if(allSubscriptionsDone)
+ else if(allConsumersDone)
{
- //All subscriptions reported being done, but we have to do
+ //All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
+ deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
- //some subscriptions can still accept more messages,
+ //some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
@@ -1984,12 +1953,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _notificationChecks;
}
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
{
- private final Subscription _sub;
+ private final QueueConsumer _sub;
- public QueueEntryListener(final Subscription sub)
+ public QueueEntryListener(final QueueConsumer sub)
{
_sub = sub;
}
@@ -2005,7 +1974,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return System.identityHashCode(_sub);
}
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
@@ -2076,26 +2045,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _dequeueSize.get();
}
- public long getByteTxnEnqueues()
- {
- return _byteTxnEnqueues.get();
- }
-
- public long getByteTxnDequeues()
- {
- return _byteTxnDequeues.get();
- }
-
- public long getMsgTxnEnqueues()
- {
- return _msgTxnEnqueues.get();
- }
-
- public long getMsgTxnDequeues()
- {
- return _msgTxnDequeues.get();
- }
-
public long getPersistentByteEnqueues()
{
return _persistentMessageEnqueueSize.get();
@@ -2123,11 +2072,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return getName();
}
- public long getUnackedMessageCountHigh()
- {
- return _unackedMsgCountHigh.get();
- }
-
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
@@ -2146,17 +2090,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private void incrementUnackedMsgCount(QueueEntry entry)
{
- long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+ _unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
-
- long unackedMsgCountHigh;
- while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
- {
- if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
- {
- break;
- }
- }
}
public LogActor getLogActor()
@@ -2224,4 +2159,39 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return (String) _arguments.get(Queue.DESCRIPTION);
}
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ {
+ txn.enqueue(this,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ SimpleAMQQueue.this.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+
+ }
+
}