diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 243 |
1 files changed, 142 insertions, 101 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index dfad9157c5..c37d0e2202 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -27,12 +27,16 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; + +import javax.management.JMException; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; @@ -99,13 +103,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private Exchange _alternateExchange; - /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ + private final QueueEntryList<QueueEntry> _entries; - - protected final QueueEntryList _entries; - - protected final SubscriptionList _subscriptionList = new SubscriptionList(); + private final SubscriptionList _subscriptionList = new SubscriptionList(); private volatile Subscription _exclusiveSubscriber; @@ -137,19 +138,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicInteger _bindingCountHigh = new AtomicInteger(); /** max allowed size(KB) of a single message */ - public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); + private long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); /** max allowed number of messages on a queue. */ - public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); + private long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); /** max queue depth for the queue */ - public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); + private long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); /** maximum message age before alerts occur */ - public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); + private long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); /** the minimum interval between sending out consecutive alerts of the same type */ - public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); + private long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); @@ -167,7 +168,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); - private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>(); + private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); @@ -455,7 +456,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - _activeSubscriberCount.incrementAndGet(); + if(subscription.isActive()) + { + _activeSubscriberCount.incrementAndGet(); + } subscription.setStateListener(this); subscription.setQueueContext(new QueueContext(_entries.getHead())); @@ -778,7 +782,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private boolean mightAssign(final Subscription sub, final QueueEntry entry) { if(_messageGroupManager == null || !sub.acquires()) + { return true; + } Subscription assigned = _messageGroupManager.getAssignedSubscription(entry); return (assigned == null) || (assigned == sub); } @@ -848,7 +854,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes QueueContext context = (QueueContext) subscription.getQueueContext(); if(context != null) { - QueueEntry subnode = context._lastSeenEntry; + QueueEntry subnode = context.getLastSeenEntry(); if(subnode.compareTo(entry)<0) { return false; @@ -872,7 +878,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) { QueueContext subContext = (QueueContext) sub.getQueueContext(); - QueueEntry releasedEntry = subContext._releasedEntry; + QueueEntry releasedEntry = subContext.getReleasedEntry(); QueueContext._lastSeenUpdater.set(subContext, entry); if(releasedEntry == entry) @@ -889,7 +895,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { QueueEntry oldEntry; - while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0) + while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0) { if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry)) { @@ -1113,6 +1119,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _stateChangeCount.get(); } + /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ + protected QueueEntryList getEntries() + { + return _entries; + } + + protected SubscriptionList getSubscriptionList() + { + return _subscriptionList; + } + public static interface QueueEntryFilter { @@ -1226,19 +1243,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, - String queueName, - ServerTransaction txn) throws IllegalArgumentException + String destinationQueueName) throws IllegalArgumentException { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } + final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1258,65 +1266,68 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); - - // Move the messages in on the message store. - for (final QueueEntry entry : entries) + final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); + boolean shouldRollback = true; + try { - final ServerMessage message = entry.getMessage(); - txn.enqueue(toQueue, message, - new ServerTransaction.Action() - { - - public void postCommit() + // Move the messages in on the message store. + for (final QueueEntry entry : entries) + { + final ServerMessage message = entry.getMessage(); + txn.enqueue(toQueue, message, + new ServerTransaction.Action() { - try + + public void postCommit() { - toQueue.enqueue(message); + try + { + toQueue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } } - catch (AMQException e) + + public void onRollback() { - throw new RuntimeException(e); + entry.release(); } - } - - public void onRollback() + }); + txn.dequeue(this, message, + new ServerTransaction.Action() { - entry.release(); - } - }); - txn.dequeue(this, message, - new ServerTransaction.Action() - { - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { + public void postCommit() + { + entry.discard(); + } - } - }); + public void onRollback() + { + } + }); + } + txn.commit(); + shouldRollback = false; + } + finally + { + if (shouldRollback) + { + txn.rollback(); + } } } public void copyMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, - String queueName, - final ServerTransaction txn) throws IllegalArgumentException + String destinationQueueName) throws IllegalArgumentException { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } + final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1334,36 +1345,63 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } }); - - // Move the messages in on the message store. - for (QueueEntry entry : entries) + final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore()); + boolean shouldRollback = true; + try { - final ServerMessage message = entry.getMessage(); - - txn.enqueue(toQueue, message, new ServerTransaction.Action() + // Copy the messages in on the message store. + for (QueueEntry entry : entries) { - public void postCommit() + final ServerMessage message = entry.getMessage(); + + txn.enqueue(toQueue, message, new ServerTransaction.Action() { - try + public void postCommit() { - toQueue.enqueue(message); + try + { + toQueue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } } - catch (AMQException e) + + public void onRollback() { - throw new RuntimeException(e); } - } - - public void onRollback() - { + }); - } - }); + } + txn.commit(); + shouldRollback = false; + } + finally + { + if (shouldRollback) + { + txn.rollback(); + } } } + private AMQQueue getValidatedDestinationQueue(String queueName) + { + final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + if (toQueue == null) + { + throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); + } + else if (toQueue == this) + { + throw new IllegalArgumentException("The destination queue can't be the same as the source queue"); + } + return toQueue; + } + public void removeMessagesFromQueue(long fromMessageId, long toMessageId) { @@ -1543,10 +1581,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes for(final QueueEntry entry : entries) { adapter.setEntry(entry); - final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter); + List<? extends BaseQueue> queues = _alternateExchange.route(adapter); + if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null) + { + queues = _alternateExchange.getAlternateExchange().route(adapter); + } + final ServerMessage message = entry.getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) + if(queues != null && queues.size() != 0) { + final List<? extends BaseQueue> rerouteQueues = queues; txn.enqueue(rerouteQueues, entry.getMessage(), new ServerTransaction.Action() { @@ -1659,7 +1703,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes //Overfull log message _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - _blockedChannels.putIfAbsent(channel, Boolean.TRUE); + _blockedChannels.add(channel); channel.block(this); @@ -1692,11 +1736,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); } - - for(AMQSessionModel c : _blockedChannels.keySet()) + for(final AMQSessionModel blockedChannel : _blockedChannels) { - c.unblock(this); - _blockedChannels.remove(c); + blockedChannel.unblock(this); + _blockedChannels.remove(blockedChannel); } } } @@ -1714,7 +1757,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void deliverAsync(Subscription sub) { - //_stateChangeCount.incrementAndGet(); if(_exclusiveSubscriber == null) { deliverAsync(); @@ -1890,8 +1932,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes QueueContext context = (QueueContext) sub.getQueueContext(); if(context != null) { - QueueEntry lastSeen = context._lastSeenEntry; - QueueEntry releasedNode = context._releasedEntry; + QueueEntry lastSeen = context.getLastSeenEntry(); + QueueEntry releasedNode = context.getReleasedEntry(); QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); @@ -1913,8 +1955,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null); } - lastSeen = context._lastSeenEntry; - releasedNode = context._releasedEntry; + lastSeen = context.getLastSeenEntry(); + releasedNode = context.getReleasedEntry(); node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); } return node; @@ -1930,8 +1972,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes QueueContext context = (QueueContext) sub.getQueueContext(); if(context != null) { - QueueEntry releasedNode = context._releasedEntry; - return releasedNode == null || releasedNode.compareTo(entry) < 0; + QueueEntry releasedNode = context.getReleasedEntry(); + return releasedNode != null && releasedNode.compareTo(entry) < 0; } else { @@ -2255,8 +2297,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public boolean equals(Object o) { - return o != null - && o instanceof SimpleAMQQueue.QueueEntryListener + return o instanceof SimpleAMQQueue.QueueEntryListener && _sub == ((QueueEntryListener) o)._sub; } |