diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java | 70 |
1 files changed, 40 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 871f063725..26b040aae0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -27,27 +27,20 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -/** - * Holds a set of subscriptions for a queue and manages the round - * robin-ing of deliver etc. - */ +/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */ class SubscriptionSet implements WeightedSubscriptionManager { private static final Logger _log = Logger.getLogger(SubscriptionSet.class); - /** - * List of registered subscribers - */ + /** List of registered subscribers */ private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>(); - /** - * Used to control the round robin delivery of content - */ + /** Used to control the round robin delivery of content */ private int _currentSubscriber; + private final Object _subscriptionsChange = new Object(); - /** - * Accessor for unit tests. - */ + + /** Accessor for unit tests. */ int getCurrentSubscriber() { return _currentSubscriber; @@ -55,21 +48,43 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - _subscriptions.add(subscription); + synchronized (_subscriptionsChange) + { + _subscriptions.add(subscription); + } } /** * Remove the subscription, returning it if it was found * * @param subscription + * * @return null if no match was found */ public Subscription removeSubscriber(Subscription subscription) { - boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here. - if (isRemoved) + // TODO: possibly need O(1) operation here. + + Subscription sub = null; + synchronized (_subscriptionsChange) { - return subscription; + int subIndex = _subscriptions.indexOf(subscription); + + if (subIndex != -1) + { + //we can't just return the passed in subscription as it is a new object + // and doesn't contain the stored state we need. + //NOTE while this may be removed now anyone with an iterator will still have it in the list!! + sub = _subscriptions.remove(subIndex); + } + else + { + _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription); + } + } + if (sub != null) + { + return sub; } else { @@ -92,14 +107,11 @@ class SubscriptionSet implements WeightedSubscriptionManager } /** - * Return the next unsuspended subscription or null if not found. - * <p/> - * Performance note: - * This method can scan all items twice when looking for a subscription that is not - * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this - * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of - * race conditions and when subscriptions are removed between calls to nextSubscriber, the - * IndexOutOfBoundsException also causes the scan to start at the beginning. + * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all + * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions + * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed + * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to + * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning. */ public Subscription nextSubscriber(AMQMessage msg) { @@ -156,9 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - /** - * Overridden in test classes. - */ + /** Overridden in test classes. */ protected void subscriberScanned() { } @@ -199,8 +209,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } /** - * Notification that a queue has been deleted. This is called so that the subscription can inform the - * channel, which in turn can update its list of unacknowledged messages. + * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which + * in turn can update its list of unacknowledged messages. * * @param queue */ |