summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java82
1 files changed, 43 insertions, 39 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 882efd380d..117799ad87 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -22,22 +22,27 @@ package org.apache.qpid.server.queue;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.subscription.Subscription;
/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */
-class SubscriptionSet implements WeightedSubscriptionManager
+class SubscriptionSet implements SubscriptionManager
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
/** List of registered subscribers */
- private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
+ private final List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
+
+ private final Map<Subscription, DeliveryAgent> _deliveryAgents =
+ new ConcurrentHashMap<Subscription, DeliveryAgent>();
+
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
-
private final Object _changeLock = new Object();
private volatile boolean _exclusive;
@@ -53,6 +58,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
synchronized (_changeLock)
{
_subscriptions.add(subscription);
+ _deliveryAgents.put(subscription, new DeliveryAgent(subscription));
}
}
@@ -66,33 +72,22 @@ class SubscriptionSet implements WeightedSubscriptionManager
public Subscription removeSubscriber(Subscription subscription)
{
// TODO: possibly need O(1) operation here.
-
- Subscription sub = null;
+
synchronized (_changeLock)
{
- int subIndex = _subscriptions.indexOf(subscription);
- if (subIndex != -1)
+ if (_subscriptions.remove(subscription))
{
- //we can't just return the passed in subscription as it is a new object
- // and doesn't contain the stored state we need.
- //NOTE while this may be removed now anyone with an iterator will still have it in the list!!
- sub = _subscriptions.remove(subIndex);
+ _deliveryAgents.remove(subscription);
+ return subscription;
}
else
{
- _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription);
+ _log.error("Unable to remove subscription:" + subscription);
+ debugDumpSubscription(subscription);
+ return null;
}
}
- if (sub != null)
- {
- return sub;
- }
- else
- {
- debugDumpSubscription(subscription);
- return null;
- }
}
private void debugDumpSubscription(Subscription subscription)
@@ -148,17 +143,18 @@ class SubscriptionSet implements WeightedSubscriptionManager
Subscription subscription = _subscriptions.get(0);
subscriberScanned();
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ if (!subscription.isSuspended() )
{
if (subscription.hasInterest(msg))
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ DeliveryAgent deliverAgent = _deliveryAgents.get(subscription);
+
+ if (deliverAgent.ableToDeliver())
{
- return subscription;
+ if(!subscription.wouldSuspend(msg))
+ {
+ return subscription;
+ }
}
}
}
@@ -177,19 +173,17 @@ class SubscriptionSet implements WeightedSubscriptionManager
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
while (iterator.hasNext())
{
+
Subscription subscription = iterator.next();
+ DeliveryAgent deliverAgent = _deliveryAgents.get(subscription);
++_currentSubscriber;
subscriberScanned();
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ if (!(deliverAgent == null || subscription.isSuspended()))
{
- if (subscription.hasInterest(msg))
+ if (subscription.hasInterest(msg) && deliverAgent.ableToDeliver())
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ if (!subscription.wouldSuspend(msg))
{
return subscription;
}
@@ -228,12 +222,12 @@ class SubscriptionSet implements WeightedSubscriptionManager
return false;
}
- public int getWeight()
+ public int getActiveConsumerCount()
{
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended())
+ if (s.isActive())
{
count++;
}
@@ -241,13 +235,18 @@ class SubscriptionSet implements WeightedSubscriptionManager
return count;
}
+ public int getConsumerCount()
+ {
+ return size();
+ }
+
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which
* in turn can update its list of unacknowledged messages.
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue) throws AMQException
+ public void queueDeleted(AMQQueue queue)
{
for (Subscription s : _subscriptions)
{
@@ -271,4 +270,9 @@ class SubscriptionSet implements WeightedSubscriptionManager
_exclusive = exclusive;
}
+
+ public DeliveryAgent getDeliveryAgent(final Subscription sub)
+ {
+ return _deliveryAgents.get(sub);
+ }
}