summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java291
1 files changed, 187 insertions, 104 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7effb1c0f8..08dab4e5fc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -155,11 +155,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
- static final int MAX_ASYNC_DELIVERIES = 10;
+ static final int MAX_ASYNC_DELIVERIES = 80;
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
- private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null);
+
private final Executor _asyncDelivery;
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
@@ -188,6 +188,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private ConfigurationPlugin _queueConfiguration;
private final boolean _isTopic;
+ /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
+ private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
@@ -358,6 +360,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_alternateExchange = exchange;
}
+ public void setAlternateExchange(String exchangeName)
+ {
+ if(exchangeName == null || exchangeName.equals(""))
+ {
+ _alternateExchange = null;
+ return;
+ }
+
+ Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
+ if (exchange == null)
+ {
+ throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
+ }
+ setAlternateExchange(exchange);
+ }
+
public Map<String, Object> getArguments()
{
return _arguments;
@@ -528,13 +546,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//Reconfigure the queue for to reflect this new binding.
ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
- }
-
if (config != null)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
+ }
// Reconfigure with new config.
configure(config);
}
@@ -575,40 +592,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
+ incrementTxnEnqueueStats(message);
+ incrementQueueCount();
+ incrementQueueSize(message);
+
_totalMessagesReceived.incrementAndGet();
- Subscription exclusiveSub = _exclusiveSubscriber;
+ final Subscription exclusiveSub = _exclusiveSubscriber;
if(!_isTopic || _subscriptionList.size()!=0)
{
- incrementTxnEnqueueStats(message);
- incrementQueueCount();
- incrementQueueSize(message);
-
- QueueEntry entry;
+ QueueEntry entry = _entries.add(message);
- if (exclusiveSub != null)
+ if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
- exclusiveSub.getSendLock();
-
- try
- {
- entry = _entries.add(message);
-
- deliverToSubscription(exclusiveSub, entry);
- }
- finally
- {
- exclusiveSub.releaseSendLock();
- }
- }
- else
- {
- entry = _entries.add(message);
/*
-
iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
-
*/
SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
SubscriptionList.SubscriptionNode nextNode = node.findNext();
@@ -654,12 +653,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
if (entry.isAvailable())
{
checkSubscriptionsNotAheadOfDelivery(entry);
- deliverAsync();
+ if (exclusiveSub != null)
+ {
+ deliverAsync(exclusiveSub);
+ }
+ else
+ {
+ deliverAsync();
+ }
}
if(_managedObject != null)
@@ -678,30 +685,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
- sub.getSendLock();
- try
+ if(sub.trySendLock())
{
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
+ try
{
- if (!sub.wouldSuspend(entry))
+ if (subscriptionReadyAndHasInterest(sub, entry)
+ && !sub.isSuspended())
{
- if (sub.acquires() && !entry.acquire(sub))
+ if (!sub.wouldSuspend(entry))
{
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.onDequeue(entry);
- }
- else
- {
- deliverMessage(sub, entry);
+ if (sub.acquires() && !entry.acquire(sub))
+ {
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
+ }
}
}
}
- }
- finally
- {
- sub.releaseSendLock();
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
}
@@ -745,7 +754,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_byteTxnDequeues.addAndGet(entry.getSize());
}
- private void deliverMessage(final Subscription sub, final QueueEntry entry)
+ private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -753,7 +762,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_deliveredMessages.incrementAndGet();
incrementUnackedMsgCount();
- sub.send(entry);
+ sub.send(entry, batch);
if(_isTopic)
{
@@ -893,7 +902,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!subscription.isClosed())
{
- deliverMessage(subscription, entry);
+ deliverMessage(subscription, entry, false);
return true;
}
else
@@ -1035,6 +1044,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_exclusiveSubscriber = exclusiveSubscriber;
}
+ long getStateChangeCount()
+ {
+ return _stateChangeCount.get();
+ }
+
+
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -1335,7 +1350,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
- ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
while (queueListIterator.advance())
{
@@ -1358,7 +1373,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private void dequeueEntry(final QueueEntry node)
{
- ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
dequeueEntry(node, txn);
}
@@ -1435,7 +1450,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
});
- ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
if(_alternateExchange != null)
{
@@ -1604,26 +1619,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ private QueueRunner _queueRunner = new QueueRunner(this);
public void deliverAsync()
{
- QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+ _stateChangeCount.incrementAndGet();
+
+ _queueRunner.execute(_asyncDelivery);
- if (_asynchronousRunner.compareAndSet(null, runner))
- {
- _asyncDelivery.execute(runner);
- }
}
public void deliverAsync(Subscription sub)
{
- SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
- if(flusher == null)
+ //_stateChangeCount.incrementAndGet();
+ if(_exclusiveSubscriber == null)
{
- flusher = new SubFlushRunner(sub);
- sub.set(SUB_FLUSH_RUNNER, flusher);
+ deliverAsync();
}
- _asyncDelivery.execute(flusher);
+ else
+ {
+ SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+ if(flusher == null)
+ {
+ flusher = new SubFlushRunner(sub);
+ sub.set(SUB_FLUSH_RUNNER, flusher);
+ }
+ flusher.execute(_asyncDelivery);
+ }
+
}
public void flushSubscription(Subscription sub) throws AMQException
@@ -1639,25 +1662,49 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
{
boolean atTail = false;
+ final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+ boolean queueEmpty = false;
- while (!sub.isSuspended() && !atTail && iterations != 0)
+ try
{
- boolean queueEmpty = false;
- try
+
+ if(keepSendLockHeld)
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
- if (atTail && getNextAvailableEntry(sub) == null)
+ }
+
+ while (!sub.isSuspended() && !atTail && iterations != 0)
+ {
+ try
{
- queueEmpty = true;
+ if(!keepSendLockHeld)
+ {
+ sub.getSendLock();
+ }
+
+ atTail = attemptDelivery(sub, true);
+ if (atTail && getNextAvailableEntry(sub) == null)
+ {
+ queueEmpty = true;
+ }
+ else if (!atTail)
+ {
+ iterations--;
+ }
}
- else if (!atTail)
+ finally
{
- iterations--;
+ if(!keepSendLockHeld)
+ {
+ sub.releaseSendLock();
+ }
}
}
- finally
+ }
+ finally
+ {
+ if(keepSendLockHeld)
{
sub.releaseSendLock();
}
@@ -1665,8 +1712,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
sub.queueEmpty();
}
+ sub.flushBatched();
+
}
+
// if there's (potentially) more than one subscription the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
@@ -1684,11 +1734,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
*
* Looks up the next node for the subscription and attempts to deliver it.
*
+ *
* @param sub
+ * @param batch
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(Subscription sub) throws AMQException
+ private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1706,11 +1758,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (sub.acquires() && !node.acquire(sub))
{
- sub.onDequeue(node);
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(node);
}
else
{
- deliverMessage(sub, node);
+ deliverMessage(sub, node, batch);
}
}
@@ -1814,23 +1868,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
* @param runner the Runner to schedule
* @throws AMQException
*/
- public void processQueue(QueueRunner runner) throws AMQException
+ public long processQueue(QueueRunner runner) throws AMQException
{
- long stateChangeCount;
+ long stateChangeCount = Long.MIN_VALUE;
long previousStateChangeCount = Long.MIN_VALUE;
+ long rVal = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- _asynchronousRunner.compareAndSet(runner, null);
+ final int numSubs = _subscriptionList.size();
+
+ final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
// For every message enqueue/requeue the we fire deliveryAsync() which
// increases _stateChangeCount. If _sCC changes whilst we are in our loop
// (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
// then we will continue to run for a maximum of iterations.
// So whilst delivery/rejection is going on a processQueue thread will be running
- while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
+ while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1841,6 +1898,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//further asynchronous delivery is required since the
//previous loop. keep going if iteration slicing allows.
lastLoop = false;
+ rVal = stateChangeCount;
}
previousStateChangeCount = stateChangeCount;
@@ -1853,30 +1911,43 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
- try
- {
- //attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub);
- if (subscriptionDone)
+
+ try
{
- if(lastLoop)
+ for(int i = 0 ; i < perSub; i++)
{
- sub.queueEmpty();
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub, true);
+ if (subscriptionDone)
+ {
+ sub.flushBatched();
+ //close autoClose subscriptions if we are not currently intent on continuing
+ if (lastLoop && !sub.isSuspended() )
+ {
+ sub.queueEmpty();
+ }
+ break;
+ }
+ else
+ {
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
+ if(--iterations == 0)
+ {
+ sub.flushBatched();
+ break;
+ }
+ }
}
+
+ sub.flushBatched();
}
- else
+ finally
{
- //this subscription can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
- lastLoop = false;
- iterations--;
+ sub.releaseSendLock();
}
- }
- finally
- {
- sub.releaseSendLock();
- }
}
if(allSubscriptionsDone && lastLoop)
@@ -1902,24 +1973,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
deliveryIncomplete = true;
}
- _asynchronousRunner.set(null);
}
// If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
+ if (iterations == 0)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Rescheduling runner:" + runner);
}
- _asyncDelivery.execute(runner);
+ return 0L;
}
+ return rVal;
+
}
public void checkMessageStatus() throws AMQException
{
-
QueueEntryIterator queueListIterator = _entries.iterator();
while (queueListIterator.advance())
@@ -2150,6 +2221,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
+ setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount());
_capacity = ((QueueConfiguration)config).getCapacity();
_flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
}
@@ -2271,4 +2343,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _logActor;
}
+
+ public int getMaximumDeliveryCount()
+ {
+ return _maximumDeliveryCount;
+ }
+
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount)
+ {
+ _maximumDeliveryCount = maximumDeliveryCount;
+ }
+
}