summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-29 11:28:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-29 11:28:41 +0000
commit7000de985ebcde725045642ab7e7fa57038170f7 (patch)
tree246e2771ea359f0ed22f47c45e721c4e402ee513
parent97eb48ea71077acb9ef2d104901035f69531c3f3 (diff)
downloadqpid-python-7000de985ebcde725045642ab7e7fa57038170f7.tar.gz
tidy up
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661296 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java99
1 files changed, 51 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 169da9275f..95cfa8d36c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -75,12 +75,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
- private final AtomicBoolean _quiesced = new AtomicBoolean(false);
-
-
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
@@ -124,7 +120,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -163,8 +158,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
- AsyncDeliveryConfig.getAsyncDeliveryExecutor();
-
try
{
_managedObject = new AMQQueueMBean(this);
@@ -278,7 +271,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- deliverAsync();
+ deliverAsync(subscription);
}
@@ -346,6 +339,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
+
if(exclusiveSub != null)
{
exclusiveSub.getSendLock();
@@ -353,6 +347,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
entry = _entries.add(message);
+
deliverToSubscription(exclusiveSub, entry);
@@ -405,7 +400,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// this catches the case where we *just* miss an update
int loops = 2;
- while(!entry.isAcquired() && loops != 0)
+ while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
{
if(nextNode == null)
{
@@ -454,6 +449,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
throws AMQException
{
+ // the send lock is a read/write lock that prevents the subscription from changing status while we are in this
+ // block
sub.getSendLock();
try
{
@@ -465,10 +462,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if(!sub.isBrowser() && !entry.acquire(sub))
{
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
sub.restoreCredit(entry);
}
else
{
+ // Update the last seen marker for this subscription, if some other process hasn't already
+ // updated it
QueueEntry queueEntryNode = sub.getLastSeenEntry();
if(_entries.next(queueEntryNode) == entry)
{
@@ -490,6 +491,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
+ // Simple Queues don't :-)
}
private void incrementQueueSize(final AMQMessage message)
@@ -513,6 +515,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
{
+ // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
+ // interest in.
QueueEntry node = sub.getLastSeenEntry();
while(node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)) )
{
@@ -531,28 +535,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+
if(node == entry)
{
+ // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
+ // good
return true;
}
else
{
- node = sub.getLastSeenEntry();
- if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+ // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
+ // no-one else has updated it to something furhter on in the list
+ updateLastSeenEntry(sub, entry);
+ return false;
+ }
+
+ }
+
+ private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ {
+ QueueEntry node = sub.getLastSeenEntry();
+
+ if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+ {
+ do
{
- do
+ if(sub.setLastSeenEntry(node,entry))
{
- if(sub.setLastSeenEntry(node,entry))
- {
- return true;
- }
- else
- {
- node = sub.getLastSeenEntry();
- }
- } while (node != null && entry.compareTo(node) < 0);
- }
- return false;
+ return;
+ }
+ else
+ {
+ node = sub.getLastSeenEntry();
+ }
+ } while (node != null && entry.compareTo(node) < 0);
}
}
@@ -569,18 +585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// we don't make browsers send the same stuff twice
if(!sub.isBrowser())
{
- QueueEntry subEntry = sub.getLastSeenEntry();
- while(subEntry != null && entry.compareTo(subEntry)<0)
- {
- if(sub.setLastSeenEntry(subEntry,entry))
- {
- break;
- }
- else
- {
- subEntry = sub.getLastSeenEntry();
- }
- }
+ updateLastSeenEntry(sub, entry);
}
}
@@ -1307,6 +1312,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
+ // which would give us memory "leak".
+
if(!isExclusiveSubscriber())
{
advanceAllSubscriptions();
@@ -1494,7 +1503,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asynchronousRunner.set(null);
}
-
+ // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+ // therefore we should schedule this runner again (unless someone beats us to it :-) ).
if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
{
_asyncDelivery.execute(runner);
@@ -1616,43 +1626,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _notificationChecks;
}
-
-
-
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public int N(final Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
-
private final class QueueEntryListener implements QueueEntry.StateChangeListener
{
private final QueueEntry _entry;
+ private final Subscription _sub;
public QueueEntryListener(final Subscription sub, final QueueEntry entry)
{
_entry = entry;
+ _sub = sub;
}
public boolean equals(Object o)
{
- return _entry == ((QueueEntryListener)o)._entry;
+ return _entry == ((QueueEntryListener)o)._entry && _sub == ((QueueEntryListener)o)._sub;
}
public int hashCode()
{
- return System.identityHashCode(_entry);
+ return System.identityHashCode(_entry) ^ System.identityHashCode(_sub);
}
public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync();
+ deliverAsync(_sub);
}
}
} \ No newline at end of file