summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java89
1 files changed, 67 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index b73b8d7e07..882efd380d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -37,7 +37,9 @@ class SubscriptionSet implements WeightedSubscriptionManager
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
- private final Object _subscriptionsChange = new Object();
+
+ private final Object _changeLock = new Object();
+ private volatile boolean _exclusive;
/** Accessor for unit tests. */
@@ -48,7 +50,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
_subscriptions.add(subscription);
}
@@ -66,7 +68,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
// TODO: possibly need O(1) operation here.
Subscription sub = null;
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
int subIndex = _subscriptions.indexOf(subscription);
@@ -115,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
*/
public Subscription nextSubscriber(QueueEntry msg)
{
- if (_subscriptions.isEmpty())
- {
- return null;
- }
+
try
{
@@ -142,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager
private Subscription nextSubscriberImpl(QueueEntry msg)
{
- final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext())
+ if(_exclusive)
{
- Subscription subscription = iterator.next();
- ++_currentSubscriber;
- subscriberScanned();
-
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ try
{
- if (subscription.hasInterest(msg))
+ Subscription subscription = _subscriptions.get(0);
+ subscriberScanned();
+
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ if (subscription.hasInterest(msg))
{
- return subscription;
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
}
}
}
+ catch(IndexOutOfBoundsException e)
+ {
+ }
+ return null;
}
+ else
+ {
+ if (_subscriptions.isEmpty())
+ {
+ return null;
+ }
+ final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+ while (iterator.hasNext())
+ {
+ Subscription subscription = iterator.next();
+ ++_currentSubscriber;
+ subscriberScanned();
- return null;
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
}
/** Overridden in test classes. */
@@ -226,4 +259,16 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
return _subscriptions.size();
}
+
+
+ public Object getChangeLock()
+ {
+ return _changeLock;
+ }
+
+ public void setExclusive(final boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
}