diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 7cc3f5f719..a8f778244e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + import java.util.List; import java.util.ListIterator; import java.util.concurrent.CopyOnWriteArrayList; @@ -58,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Remove the subscription, returning it if it was found + * * @param subscription * @return null if no match was found */ @@ -90,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Return the next unsuspended subscription or null if not found. - * + * <p/> * Performance note: * This method can scan all items twice when looking for a subscription that is not * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this @@ -105,29 +108,59 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - try { - final Subscription result = nextSubscriber(); - if (result == null) { + try + { + final Subscription result = nextSubscriberImpl(msg); + if (result == null) + { _currentSubscriber = 0; - return nextSubscriber(); - } else { + return nextSubscriberImpl(msg); + } + else + { return result; } - } catch (IndexOutOfBoundsException e) { + } + catch (IndexOutOfBoundsException e) + { _currentSubscriber = 0; - return nextSubscriber(); + return nextSubscriber(msg); } } - private Subscription nextSubscriber() + private Subscription nextSubscriberImpl(AMQMessage msg) { final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) { + while (iterator.hasNext()) + { Subscription subscription = iterator.next(); ++_currentSubscriber; subscriberScanned(); - if (!subscription.isSuspended()) { - return subscription; + + if (!subscription.isSuspended()) + { + + if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg))) + { + return subscription; + } + // 2006-12-04 : It is fairer to simply skip the person who isn't interested. + // Although it does need to be looked at again. + +// else +// { +// //Don't take penalise a subscriber for not wanting this message. +// // This would introduce unfairness sticking with the current subscriber +// // will allow the next message to match.. although could lead to unfairness if: +// // subscribers: a(bin) b(text) c(text) +// // msgs : 1(text) 2(text) 3(bin) +// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get +// // the bin msg. +// // Never said this was fair round-robin-ing. +// //FIXME - Make a fair round robin. +// +// --_currentSubscriber; +// } } } return null; @@ -149,7 +182,10 @@ class SubscriptionSet implements WeightedSubscriptionManager { for (Subscription s : _subscriptions) { - if (!s.isSuspended()) return true; + if (!s.isSuspended()) + { + return true; + } } return false; } @@ -159,7 +195,10 @@ class SubscriptionSet implements WeightedSubscriptionManager int count = 0; for (Subscription s : _subscriptions) { - if (!s.isSuspended()) count++; + if (!s.isSuspended()) + { + count++; + } } return count; } @@ -167,9 +206,10 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Notification that a queue has been deleted. This is called so that the subscription can inform the * channel, which in turn can update its list of unacknowledged messages. + * * @param queue */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(AMQQueue queue) throws AMQException { for (Subscription s : _subscriptions) { @@ -177,7 +217,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } } - int size() { + int size() + { return _subscriptions.size(); } } |