summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java70
1 files changed, 40 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 871f063725..26b040aae0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -27,27 +27,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-/**
- * Holds a set of subscriptions for a queue and manages the round
- * robin-ing of deliver etc.
- */
+/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */
class SubscriptionSet implements WeightedSubscriptionManager
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
- /**
- * List of registered subscribers
- */
+ /** List of registered subscribers */
private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
- /**
- * Used to control the round robin delivery of content
- */
+ /** Used to control the round robin delivery of content */
private int _currentSubscriber;
+ private final Object _subscriptionsChange = new Object();
- /**
- * Accessor for unit tests.
- */
+
+ /** Accessor for unit tests. */
int getCurrentSubscriber()
{
return _currentSubscriber;
@@ -55,21 +48,43 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- _subscriptions.add(subscription);
+ synchronized (_subscriptionsChange)
+ {
+ _subscriptions.add(subscription);
+ }
}
/**
* Remove the subscription, returning it if it was found
*
* @param subscription
+ *
* @return null if no match was found
*/
public Subscription removeSubscriber(Subscription subscription)
{
- boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
- if (isRemoved)
+ // TODO: possibly need O(1) operation here.
+
+ Subscription sub = null;
+ synchronized (_subscriptionsChange)
{
- return subscription;
+ int subIndex = _subscriptions.indexOf(subscription);
+
+ if (subIndex != -1)
+ {
+ //we can't just return the passed in subscription as it is a new object
+ // and doesn't contain the stored state we need.
+ //NOTE while this may be removed now anyone with an iterator will still have it in the list!!
+ sub = _subscriptions.remove(subIndex);
+ }
+ else
+ {
+ _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription);
+ }
+ }
+ if (sub != null)
+ {
+ return sub;
}
else
{
@@ -92,14 +107,11 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Return the next unsuspended subscription or null if not found.
- * <p/>
- * Performance note:
- * This method can scan all items twice when looking for a subscription that is not
- * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
- * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
- * race conditions and when subscriptions are removed between calls to nextSubscriber, the
- * IndexOutOfBoundsException also causes the scan to start at the beginning.
+ * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all
+ * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions
+ * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed
+ * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
+ * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
public Subscription nextSubscriber(AMQMessage msg)
{
@@ -156,9 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- /**
- * Overridden in test classes.
- */
+ /** Overridden in test classes. */
protected void subscriberScanned()
{
}
@@ -199,8 +209,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Notification that a queue has been deleted. This is called so that the subscription can inform the
- * channel, which in turn can update its list of unacknowledged messages.
+ * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which
+ * in turn can update its list of unacknowledged messages.
*
* @param queue
*/