summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java77
1 files changed, 59 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 7cc3f5f719..a8f778244e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -90,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,29 +108,59 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+
+ if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg)))
+ {
+ return subscription;
+ }
+ // 2006-12-04 : It is fairer to simply skip the person who isn't interested.
+ // Although it does need to be looked at again.
+
+// else
+// {
+// //Don't take penalise a subscriber for not wanting this message.
+// // This would introduce unfairness sticking with the current subscriber
+// // will allow the next message to match.. although could lead to unfairness if:
+// // subscribers: a(bin) b(text) c(text)
+// // msgs : 1(text) 2(text) 3(bin)
+// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get
+// // the bin msg.
+// // Never said this was fair round-robin-ing.
+// //FIXME - Make a fair round robin.
+//
+// --_currentSubscriber;
+// }
}
}
return null;
@@ -149,7 +182,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -159,7 +195,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -167,9 +206,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
@@ -177,7 +217,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}