diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 11:28:41 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 11:28:41 +0000 |
commit | 7000de985ebcde725045642ab7e7fa57038170f7 (patch) | |
tree | 246e2771ea359f0ed22f47c45e721c4e402ee513 | |
parent | 97eb48ea71077acb9ef2d104901035f69531c3f3 (diff) | |
download | qpid-python-7000de985ebcde725045642ab7e7fa57038170f7.tar.gz |
tidy up
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661296 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 99 |
1 files changed, 51 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 169da9275f..95cfa8d36c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -75,12 +75,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); - private final AtomicBoolean _quiesced = new AtomicBoolean(false); - - protected final SubscriptionList _subscriptionList = new SubscriptionList(this); private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); @@ -124,7 +120,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); @@ -163,8 +158,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - AsyncDeliveryConfig.getAsyncDeliveryExecutor(); - try { _managedObject = new AMQQueueMBean(this); @@ -278,7 +271,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - deliverAsync(); + deliverAsync(subscription); } @@ -346,6 +339,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry entry; Subscription exclusiveSub = _exclusiveSubscriber; + if(exclusiveSub != null) { exclusiveSub.getSendLock(); @@ -353,6 +347,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { entry = _entries.add(message); + deliverToSubscription(exclusiveSub, entry); @@ -405,7 +400,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // this catches the case where we *just* miss an update int loops = 2; - while(!entry.isAcquired() && loops != 0) + while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0) { if(nextNode == null) { @@ -454,6 +449,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void deliverToSubscription(final Subscription sub, final QueueEntry entry) throws AMQException { + // the send lock is a read/write lock that prevents the subscription from changing status while we are in this + // block sub.getSendLock(); try { @@ -465,10 +462,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if(!sub.isBrowser() && !entry.acquire(sub)) { + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription sub.restoreCredit(entry); } else { + // Update the last seen marker for this subscription, if some other process hasn't already + // updated it QueueEntry queueEntryNode = sub.getLastSeenEntry(); if(_entries.next(queueEntryNode) == entry) { @@ -490,6 +491,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering + // Simple Queues don't :-) } private void incrementQueueSize(final AMQMessage message) @@ -513,6 +515,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) { + // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no + // interest in. QueueEntry node = sub.getLastSeenEntry(); while(node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)) ) { @@ -531,28 +535,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + if(node == entry) { + // If the first entry that subscription can process is the one we are trying to deliver to it, then we are + // good return true; } else { - node = sub.getLastSeenEntry(); - if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) + // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing + // no-one else has updated it to something furhter on in the list + updateLastSeenEntry(sub, entry); + return false; + } + + } + + private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry) + { + QueueEntry node = sub.getLastSeenEntry(); + + if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) + { + do { - do + if(sub.setLastSeenEntry(node,entry)) { - if(sub.setLastSeenEntry(node,entry)) - { - return true; - } - else - { - node = sub.getLastSeenEntry(); - } - } while (node != null && entry.compareTo(node) < 0); - } - return false; + return; + } + else + { + node = sub.getLastSeenEntry(); + } + } while (node != null && entry.compareTo(node) < 0); } } @@ -569,18 +585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // we don't make browsers send the same stuff twice if(!sub.isBrowser()) { - QueueEntry subEntry = sub.getLastSeenEntry(); - while(subEntry != null && entry.compareTo(subEntry)<0) - { - if(sub.setLastSeenEntry(subEntry,entry)) - { - break; - } - else - { - subEntry = sub.getLastSeenEntry(); - } - } + updateLastSeenEntry(sub, entry); } } @@ -1307,6 +1312,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + // if there's (potentially) more than one subscription the others will potentially not have been advanced to the + // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc + // which would give us memory "leak". + if(!isExclusiveSubscriber()) { advanceAllSubscriptions(); @@ -1494,7 +1503,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asynchronousRunner.set(null); } - + // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit + // therefore we should schedule this runner again (unless someone beats us to it :-) ). if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner)) { _asyncDelivery.execute(runner); @@ -1616,43 +1626,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _notificationChecks; } - - - - public ManagedObject getManagedObject() { return _managedObject; } - public int N(final Object o) - { - return _name.compareTo(((AMQQueue) o).getName()); - } - private final class QueueEntryListener implements QueueEntry.StateChangeListener { private final QueueEntry _entry; + private final Subscription _sub; public QueueEntryListener(final Subscription sub, final QueueEntry entry) { _entry = entry; + _sub = sub; } public boolean equals(Object o) { - return _entry == ((QueueEntryListener)o)._entry; + return _entry == ((QueueEntryListener)o)._entry && _sub == ((QueueEntryListener)o)._sub; } public int hashCode() { - return System.identityHashCode(_entry); + return System.identityHashCode(_entry) ^ System.identityHashCode(_sub); } public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); - deliverAsync(); + deliverAsync(_sub); } } }
\ No newline at end of file |