diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 217 |
1 files changed, 116 insertions, 101 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a095ef47ea..b003152db6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -83,7 +83,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** null means shared */ private final AMQShortString _owner; - private AuthorizationHolder _authorizationHolder; + private PrincipalHolder _prinicpalHolder; private boolean _exclusive = false; private AMQSessionModel _exclusiveOwner; @@ -102,7 +102,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected final QueueEntryList _entries; - protected final SubscriptionList _subscriptionList = new SubscriptionList(); + protected final SubscriptionList _subscriptionList = new SubscriptionList(this); + + private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); private volatile Subscription _exclusiveSubscriber; @@ -371,14 +373,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _owner; } - public AuthorizationHolder getAuthorizationHolder() + public PrincipalHolder getPrincipalHolder() { - return _authorizationHolder; + return _prinicpalHolder; } - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) + public void setPrincipalHolder(PrincipalHolder prinicpalHolder) { - _authorizationHolder = authorizationHolder; + _prinicpalHolder = prinicpalHolder; } @@ -600,25 +602,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); + SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); + SubscriptionList.SubscriptionNode nextNode = node.getNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _subscriptionList.getHead().getNext(); } while (nextNode != null) { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + if (_lastSubscriptionNode.compareAndSet(node, nextNode)) { break; } else { - node = _subscriptionList.getMarkedNode(); - nextNode = node.findNext(); + node = _lastSubscriptionNode.get(); + nextNode = node.getNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _subscriptionList.getHead().getNext(); } } } @@ -627,7 +629,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // this catches the case where we *just* miss an update int loops = 2; - while (entry.isAvailable() && loops != 0) + while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) { if (nextNode == null) { @@ -640,13 +642,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener Subscription sub = nextNode.getSubscription(); deliverToSubscription(sub, entry); } - nextNode = nextNode.findNext(); + nextNode = nextNode.getNext(); } } - if (entry.isAvailable()) + if (!(entry.isAcquired() || entry.isDeleted())) { checkSubscriptionsNotAheadOfDelivery(entry); @@ -803,6 +805,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void requeue(QueueEntryImpl entry, Subscription subscription) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues() && (!sub.acquires() && sub == subscription)) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + } + public void dequeue(QueueEntry entry, Subscription sub) { decrementQueueCount(); @@ -940,7 +960,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDispensed()) + if (node != null && !node.isDeleted()) { entryList.add(node); } @@ -1044,7 +1064,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDispensed() && filter.accept(node)) + if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); } @@ -1238,6 +1258,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if ((messageId >= fromMessageId) && (messageId <= toMessageId) + && !node.isDeleted() && node.acquire()) { dequeueEntry(node); @@ -1267,7 +1288,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (noDeletes && queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node.acquire()) + if (!node.isDeleted() && node.acquire()) { dequeueEntry(node); noDeletes = false; @@ -1297,7 +1318,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node.acquire()) + if (!node.isDeleted() && node.acquire()) { dequeueEntry(node, txn); if(++count == request) @@ -1564,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1583,6 +1604,52 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(flusher); } + + private class Runner implements ReadWriteRunnable + { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + + public void run() + { + String originalName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName(_name); + CurrentActor.set(_logActor); + + processQueue(this); + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + Thread.currentThread().setName(originalName); + } + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + + public String toString() + { + return _name; + } + } + public void flushSubscription(Subscription sub) throws AMQException { // Access control @@ -1651,7 +1718,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = getNextAvailableEntry(sub); - if (node != null && node.isAvailable()) + if (node != null && !(node.isAcquired() || node.isDeleted())) { if (sub.hasInterest(node)) { @@ -1712,7 +1779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) { if (expired) { @@ -1741,40 +1808,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - /** - * Used by queue Runners to asynchronously deliver messages to consumers. - * - * A queue Runner is started whenever a state change occurs, e.g when a new - * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. - * - * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue - * then processQueue should stop to prevent spinning. - * - * Since processQueue is runs in a fixed size Executor, it should not run - * indefinitely to prevent starving other tasks of CPU (e.g jobs to process - * incoming messages may not be able to be scheduled in the thread pool - * because all threads are working on clearing down large queues). To solve - * this problem, after an arbitrary number of message deliveries the - * processQueue job stops iterating, resubmits itself to the executor, and - * ends the current instance - * - * @param runner the Runner to schedule - * @throws AMQException - */ - public void processQueue(QueueRunner runner) throws AMQException + private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; - boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; + int extraLoops = 1; + long iterations = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); @@ -1791,14 +1832,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (previousStateChangeCount != stateChangeCount) { - //further asynchronous delivery is required since the - //previous loop. keep going if iteration slicing allows. - lastLoop = false; + extraLoops = 1; } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + deliveryIncomplete = _subscriptionList.size() != 0; + boolean done; SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer @@ -1808,25 +1847,30 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) + + done = attemptDelivery(sub); + + if (done) { - //close autoClose subscriptions if we are not currently intent on continuing - if (lastLoop && sub.isAutoClose()) + if (extraLoops == 0) { - unregisterSubscription(sub); + deliveryIncomplete = false; + if (sub.isAutoClose()) + { + unregisterSubscription(sub); - sub.confirmAutoClose(); + sub.confirmAutoClose(); + } + } + else + { + extraLoops--; } } else { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; iterations--; + extraLoops = 1; } } finally @@ -1834,34 +1878,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.releaseSendLock(); } } - - if(allSubscriptionsDone && lastLoop) - { - //We have done an extra loop already and there are again - //again no further delivery attempts possible, only - //keep going if state change demands it. - deliveryIncomplete = false; - } - else if(allSubscriptionsDone) - { - //All subscriptions reported being done, but we have to do - //an extra loop if the iterations are not exhausted and - //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; - lastLoop = true; - } - else - { - //some subscriptions can still accept more messages, - //keep going if iteration count allows. - lastLoop = false; - deliveryIncomplete = true; - } - _asynchronousRunner.set(null); } - // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit + // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { @@ -1881,8 +1901,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted and not dequeued - if (!node.isDispensed()) + // Only process nodes that are not currently deleted + if (!node.isDeleted()) { // If the node has exired then aquire it if (node.expired() && node.acquire()) @@ -2222,9 +2242,4 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } - - public LogActor getLogActor() - { - return _logActor; - } } |