diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 291 |
1 files changed, 187 insertions, 104 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7effb1c0f8..08dab4e5fc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -155,11 +155,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - static final int MAX_ASYNC_DELIVERIES = 10; + static final int MAX_ASYNC_DELIVERIES = 80; private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); - private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null); + private final Executor _asyncDelivery; private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); @@ -188,6 +188,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private ConfigurationPlugin _queueConfiguration; private final boolean _isTopic; + /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ + private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { @@ -358,6 +360,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _alternateExchange = exchange; } + public void setAlternateExchange(String exchangeName) + { + if(exchangeName == null || exchangeName.equals("")) + { + _alternateExchange = null; + return; + } + + Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName)); + if (exchange == null) + { + throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost."); + } + setAlternateExchange(exchange); + } + public Map<String, Object> getArguments() { return _arguments; @@ -528,13 +546,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //Reconfigure the queue for to reflect this new binding. ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); - if (_logger.isDebugEnabled()) - { - _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); - } - if (config != null) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); + } // Reconfigure with new config. configure(config); } @@ -575,40 +592,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { + incrementTxnEnqueueStats(message); + incrementQueueCount(); + incrementQueueSize(message); + _totalMessagesReceived.incrementAndGet(); - Subscription exclusiveSub = _exclusiveSubscriber; + final Subscription exclusiveSub = _exclusiveSubscriber; if(!_isTopic || _subscriptionList.size()!=0) { - incrementTxnEnqueueStats(message); - incrementQueueCount(); - incrementQueueSize(message); - - QueueEntry entry; + QueueEntry entry = _entries.add(message); - if (exclusiveSub != null) + if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { - exclusiveSub.getSendLock(); - - try - { - entry = _entries.add(message); - - deliverToSubscription(exclusiveSub, entry); - } - finally - { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); /* - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - */ SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); SubscriptionList.SubscriptionNode nextNode = node.findNext(); @@ -654,12 +653,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + if (entry.isAvailable()) { checkSubscriptionsNotAheadOfDelivery(entry); - deliverAsync(); + if (exclusiveSub != null) + { + deliverAsync(exclusiveSub); + } + else + { + deliverAsync(); + } } if(_managedObject != null) @@ -678,30 +685,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { - sub.getSendLock(); - try + if(sub.trySendLock()) { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) + try { - if (!sub.wouldSuspend(entry)) + if (subscriptionReadyAndHasInterest(sub, entry) + && !sub.isSuspended()) { - if (sub.acquires() && !entry.acquire(sub)) + if (!sub.wouldSuspend(entry)) { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.onDequeue(entry); - } - else - { - deliverMessage(sub, entry); + if (sub.acquires() && !entry.acquire(sub)) + { + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(entry); + } + else + { + deliverMessage(sub, entry, false); + } } } } - } - finally - { - sub.releaseSendLock(); + finally + { + sub.releaseSendLock(); + } } } @@ -745,7 +754,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _byteTxnDequeues.addAndGet(entry.getSize()); } - private void deliverMessage(final Subscription sub, final QueueEntry entry) + private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -753,7 +762,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _deliveredMessages.incrementAndGet(); incrementUnackedMsgCount(); - sub.send(entry); + sub.send(entry, batch); if(_isTopic) { @@ -893,7 +902,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!subscription.isClosed()) { - deliverMessage(subscription, entry); + deliverMessage(subscription, entry, false); return true; } else @@ -1035,6 +1044,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _exclusiveSubscriber = exclusiveSubscriber; } + long getStateChangeCount() + { + return _stateChangeCount.get(); + } + + public static interface QueueEntryFilter { public boolean accept(QueueEntry entry); @@ -1335,7 +1350,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); while (queueListIterator.advance()) { @@ -1358,7 +1373,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void dequeueEntry(final QueueEntry node) { - ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); dequeueEntry(node, txn); } @@ -1435,7 +1450,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } }); - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); if(_alternateExchange != null) { @@ -1604,26 +1619,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + private QueueRunner _queueRunner = new QueueRunner(this); public void deliverAsync() { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); + _stateChangeCount.incrementAndGet(); + + _queueRunner.execute(_asyncDelivery); - if (_asynchronousRunner.compareAndSet(null, runner)) - { - _asyncDelivery.execute(runner); - } } public void deliverAsync(Subscription sub) { - SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); - if(flusher == null) + //_stateChangeCount.incrementAndGet(); + if(_exclusiveSubscriber == null) { - flusher = new SubFlushRunner(sub); - sub.set(SUB_FLUSH_RUNNER, flusher); + deliverAsync(); } - _asyncDelivery.execute(flusher); + else + { + SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); + if(flusher == null) + { + flusher = new SubFlushRunner(sub); + sub.set(SUB_FLUSH_RUNNER, flusher); + } + flusher.execute(_asyncDelivery); + } + } public void flushSubscription(Subscription sub) throws AMQException @@ -1639,25 +1662,49 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean flushSubscription(Subscription sub, long iterations) throws AMQException { boolean atTail = false; + final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; + boolean queueEmpty = false; - while (!sub.isSuspended() && !atTail && iterations != 0) + try { - boolean queueEmpty = false; - try + + if(keepSendLockHeld) { sub.getSendLock(); - atTail = attemptDelivery(sub); - if (atTail && getNextAvailableEntry(sub) == null) + } + + while (!sub.isSuspended() && !atTail && iterations != 0) + { + try { - queueEmpty = true; + if(!keepSendLockHeld) + { + sub.getSendLock(); + } + + atTail = attemptDelivery(sub, true); + if (atTail && getNextAvailableEntry(sub) == null) + { + queueEmpty = true; + } + else if (!atTail) + { + iterations--; + } } - else if (!atTail) + finally { - iterations--; + if(!keepSendLockHeld) + { + sub.releaseSendLock(); + } } } - finally + } + finally + { + if(keepSendLockHeld) { sub.releaseSendLock(); } @@ -1665,8 +1712,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { sub.queueEmpty(); } + sub.flushBatched(); + } + // if there's (potentially) more than one subscription the others will potentially not have been advanced to the // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". @@ -1684,11 +1734,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * * Looks up the next node for the subscription and attempts to deliver it. * + * * @param sub + * @param batch * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(Subscription sub) throws AMQException + private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1706,11 +1758,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (sub.acquires() && !node.acquire(sub)) { - sub.onDequeue(node); + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(node); } else { - deliverMessage(sub, node); + deliverMessage(sub, node, batch); } } @@ -1814,23 +1868,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * @param runner the Runner to schedule * @throws AMQException */ - public void processQueue(QueueRunner runner) throws AMQException + public long processQueue(QueueRunner runner) throws AMQException { - long stateChangeCount; + long stateChangeCount = Long.MIN_VALUE; long previousStateChangeCount = Long.MIN_VALUE; + long rVal = Long.MIN_VALUE; boolean deliveryIncomplete = true; boolean lastLoop = false; int iterations = MAX_ASYNC_DELIVERIES; - _asynchronousRunner.compareAndSet(runner, null); + final int numSubs = _subscriptionList.size(); + + final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); // For every message enqueue/requeue the we fire deliveryAsync() which // increases _stateChangeCount. If _sCC changes whilst we are in our loop // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) // then we will continue to run for a maximum of iterations. // So whilst delivery/rejection is going on a processQueue thread will be running - while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) + while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) { // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to @@ -1841,6 +1898,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //further asynchronous delivery is required since the //previous loop. keep going if iteration slicing allows. lastLoop = false; + rVal = stateChangeCount; } previousStateChangeCount = stateChangeCount; @@ -1853,30 +1911,43 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); - try - { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) + + try { - if(lastLoop) + for(int i = 0 ; i < perSub; i++) { - sub.queueEmpty(); + //attempt delivery. returns true if no further delivery currently possible to this sub + subscriptionDone = attemptDelivery(sub, true); + if (subscriptionDone) + { + sub.flushBatched(); + //close autoClose subscriptions if we are not currently intent on continuing + if (lastLoop && !sub.isSuspended() ) + { + sub.queueEmpty(); + } + break; + } + else + { + //this subscription can accept additional deliveries, so we must + //keep going after this (if iteration slicing allows it) + allSubscriptionsDone = false; + lastLoop = false; + if(--iterations == 0) + { + sub.flushBatched(); + break; + } + } } + + sub.flushBatched(); } - else + finally { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; - iterations--; + sub.releaseSendLock(); } - } - finally - { - sub.releaseSendLock(); - } } if(allSubscriptionsDone && lastLoop) @@ -1902,24 +1973,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliveryIncomplete = true; } - _asynchronousRunner.set(null); } // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) + if (iterations == 0) { if (_logger.isDebugEnabled()) { _logger.debug("Rescheduling runner:" + runner); } - _asyncDelivery.execute(runner); + return 0L; } + return rVal; + } public void checkMessageStatus() throws AMQException { - QueueEntryIterator queueListIterator = _entries.iterator(); while (queueListIterator.advance()) @@ -2150,6 +2221,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); + setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount()); _capacity = ((QueueConfiguration)config).getCapacity(); _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); } @@ -2271,4 +2343,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _logActor; } + + public int getMaximumDeliveryCount() + { + return _maximumDeliveryCount; + } + + public void setMaximumDeliveryCount(final int maximumDeliveryCount) + { + _maximumDeliveryCount = maximumDeliveryCount; + } + } |