diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java | 89 |
1 files changed, 67 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b73b8d7e07..882efd380d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -37,7 +37,9 @@ class SubscriptionSet implements WeightedSubscriptionManager /** Used to control the round robin delivery of content */ private int _currentSubscriber; - private final Object _subscriptionsChange = new Object(); + + private final Object _changeLock = new Object(); + private volatile boolean _exclusive; /** Accessor for unit tests. */ @@ -48,7 +50,7 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - synchronized (_subscriptionsChange) + synchronized (_changeLock) { _subscriptions.add(subscription); } @@ -66,7 +68,7 @@ class SubscriptionSet implements WeightedSubscriptionManager // TODO: possibly need O(1) operation here. Subscription sub = null; - synchronized (_subscriptionsChange) + synchronized (_changeLock) { int subIndex = _subscriptions.indexOf(subscription); @@ -115,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager */ public Subscription nextSubscriber(QueueEntry msg) { - if (_subscriptions.isEmpty()) - { - return null; - } + try { @@ -142,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager private Subscription nextSubscriberImpl(QueueEntry msg) { - final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) + if(_exclusive) { - Subscription subscription = iterator.next(); - ++_currentSubscriber; - subscriberScanned(); - - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + try { - if (subscription.hasInterest(msg)) + Subscription subscription = _subscriptions.get(0); + subscriberScanned(); + + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + if (subscription.hasInterest(msg)) { - return subscription; + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } } } } + catch(IndexOutOfBoundsException e) + { + } + return null; } + else + { + if (_subscriptions.isEmpty()) + { + return null; + } + final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); + while (iterator.hasNext()) + { + Subscription subscription = iterator.next(); + ++_currentSubscriber; + subscriberScanned(); - return null; + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } + } + + return null; + } } /** Overridden in test classes. */ @@ -226,4 +259,16 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _subscriptions.size(); } + + + public Object getChangeLock() + { + return _changeLock; + } + + public void setExclusive(final boolean exclusive) + { + _exclusive = exclusive; + } + } |