summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java217
1 files changed, 116 insertions, 101 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a095ef47ea..b003152db6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/** null means shared */
private final AMQShortString _owner;
- private AuthorizationHolder _authorizationHolder;
+ private PrincipalHolder _prinicpalHolder;
private boolean _exclusive = false;
private AMQSessionModel _exclusiveOwner;
@@ -102,7 +102,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
protected final QueueEntryList _entries;
- protected final SubscriptionList _subscriptionList = new SubscriptionList();
+ protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+ private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
private volatile Subscription _exclusiveSubscriber;
@@ -371,14 +373,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _owner;
}
- public AuthorizationHolder getAuthorizationHolder()
+ public PrincipalHolder getPrincipalHolder()
{
- return _authorizationHolder;
+ return _prinicpalHolder;
}
- public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
+ public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
{
- _authorizationHolder = authorizationHolder;
+ _prinicpalHolder = prinicpalHolder;
}
@@ -600,25 +602,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _subscriptionList.getHead().getNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_lastSubscriptionNode.compareAndSet(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
- nextNode = node.findNext();
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _subscriptionList.getHead().getNext();
}
}
}
@@ -627,7 +629,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// this catches the case where we *just* miss an update
int loops = 2;
- while (entry.isAvailable() && loops != 0)
+ while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
{
if (nextNode == null)
{
@@ -640,13 +642,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
Subscription sub = nextNode.getSubscription();
deliverToSubscription(sub, entry);
}
- nextNode = nextNode.findNext();
+ nextNode = nextNode.getNext();
}
}
- if (entry.isAvailable())
+ if (!(entry.isAcquired() || entry.isDeleted()))
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -803,6 +805,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public void requeue(QueueEntryImpl entry, Subscription subscription)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+ }
+
public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
@@ -940,7 +960,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDispensed())
+ if (node != null && !node.isDeleted())
{
entryList.add(node);
}
@@ -1044,7 +1064,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDispensed() && filter.accept(node))
+ if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
}
@@ -1238,6 +1258,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
+ && !node.isDeleted()
&& node.acquire())
{
dequeueEntry(node);
@@ -1267,7 +1288,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
dequeueEntry(node);
noDeletes = false;
@@ -1297,7 +1318,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
dequeueEntry(node, txn);
if(++count == request)
@@ -1564,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void deliverAsync()
{
- QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1583,6 +1604,52 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(flusher);
}
+
+ private class Runner implements ReadWriteRunnable
+ {
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_logActor);
+
+ processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+ }
+
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1651,7 +1718,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && node.isAvailable())
+ if (node != null && !(node.isAcquired() || node.isDeleted()))
{
if (sub.hasInterest(node))
{
@@ -1712,7 +1779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
{
if (expired)
{
@@ -1741,40 +1808,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- /**
- * Used by queue Runners to asynchronously deliver messages to consumers.
- *
- * A queue Runner is started whenever a state change occurs, e.g when a new
- * message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
- * capable of accepting/delivering all messages then these messages would
- * otherwise remain on the queue.
- *
- * processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
- * then processQueue should stop to prevent spinning.
- *
- * Since processQueue is runs in a fixed size Executor, it should not run
- * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
- * incoming messages may not be able to be scheduled in the thread pool
- * because all threads are working on clearing down large queues). To solve
- * this problem, after an arbitrary number of message deliveries the
- * processQueue job stops iterating, resubmits itself to the executor, and
- * ends the current instance
- *
- * @param runner the Runner to schedule
- * @throws AMQException
- */
- public void processQueue(QueueRunner runner) throws AMQException
+ private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- boolean lastLoop = false;
- int iterations = MAX_ASYNC_DELIVERIES;
+ int extraLoops = 1;
+ long iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1791,14 +1832,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (previousStateChangeCount != stateChangeCount)
{
- //further asynchronous delivery is required since the
- //previous loop. keep going if iteration slicing allows.
- lastLoop = false;
+ extraLoops = 1;
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ boolean done;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1808,25 +1847,30 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.getSendLock();
try
{
- //attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub);
- if (subscriptionDone)
+
+ done = attemptDelivery(sub);
+
+ if (done)
{
- //close autoClose subscriptions if we are not currently intent on continuing
- if (lastLoop && sub.isAutoClose())
+ if (extraLoops == 0)
{
- unregisterSubscription(sub);
+ deliveryIncomplete = false;
+ if (sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
- sub.confirmAutoClose();
+ sub.confirmAutoClose();
+ }
+ }
+ else
+ {
+ extraLoops--;
}
}
else
{
- //this subscription can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
- lastLoop = false;
iterations--;
+ extraLoops = 1;
}
}
finally
@@ -1834,34 +1878,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.releaseSendLock();
}
}
-
- if(allSubscriptionsDone && lastLoop)
- {
- //We have done an extra loop already and there are again
- //again no further delivery attempts possible, only
- //keep going if state change demands it.
- deliveryIncomplete = false;
- }
- else if(allSubscriptionsDone)
- {
- //All subscriptions reported being done, but we have to do
- //an extra loop if the iterations are not exhausted and
- //there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
- lastLoop = true;
- }
- else
- {
- //some subscriptions can still accept more messages,
- //keep going if iteration count allows.
- lastLoop = false;
- deliveryIncomplete = true;
- }
-
_asynchronousRunner.set(null);
}
- // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
+ // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
@@ -1881,8 +1901,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- // Only process nodes that are not currently deleted and not dequeued
- if (!node.isDispensed())
+ // Only process nodes that are not currently deleted
+ if (!node.isDeleted())
{
// If the node has exired then aquire it
if (node.expired() && node.acquire())
@@ -2222,9 +2242,4 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
}
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
}