diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java | 82 |
1 files changed, 43 insertions, 39 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 882efd380d..117799ad87 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -22,22 +22,27 @@ package org.apache.qpid.server.queue; import java.util.List; import java.util.ListIterator; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.subscription.Subscription; /** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */ -class SubscriptionSet implements WeightedSubscriptionManager +class SubscriptionSet implements SubscriptionManager { private static final Logger _log = Logger.getLogger(SubscriptionSet.class); /** List of registered subscribers */ - private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>(); + private final List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>(); + + private final Map<Subscription, DeliveryAgent> _deliveryAgents = + new ConcurrentHashMap<Subscription, DeliveryAgent>(); + /** Used to control the round robin delivery of content */ private int _currentSubscriber; - private final Object _changeLock = new Object(); private volatile boolean _exclusive; @@ -53,6 +58,7 @@ class SubscriptionSet implements WeightedSubscriptionManager synchronized (_changeLock) { _subscriptions.add(subscription); + _deliveryAgents.put(subscription, new DeliveryAgent(subscription)); } } @@ -66,33 +72,22 @@ class SubscriptionSet implements WeightedSubscriptionManager public Subscription removeSubscriber(Subscription subscription) { // TODO: possibly need O(1) operation here. - - Subscription sub = null; + synchronized (_changeLock) { - int subIndex = _subscriptions.indexOf(subscription); - if (subIndex != -1) + if (_subscriptions.remove(subscription)) { - //we can't just return the passed in subscription as it is a new object - // and doesn't contain the stored state we need. - //NOTE while this may be removed now anyone with an iterator will still have it in the list!! - sub = _subscriptions.remove(subIndex); + _deliveryAgents.remove(subscription); + return subscription; } else { - _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription); + _log.error("Unable to remove subscription:" + subscription); + debugDumpSubscription(subscription); + return null; } } - if (sub != null) - { - return sub; - } - else - { - debugDumpSubscription(subscription); - return null; - } } private void debugDumpSubscription(Subscription subscription) @@ -148,17 +143,18 @@ class SubscriptionSet implements WeightedSubscriptionManager Subscription subscription = _subscriptions.get(0); subscriberScanned(); - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + if (!subscription.isSuspended() ) { if (subscription.hasInterest(msg)) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + DeliveryAgent deliverAgent = _deliveryAgents.get(subscription); + + if (deliverAgent.ableToDeliver()) { - return subscription; + if(!subscription.wouldSuspend(msg)) + { + return subscription; + } } } } @@ -177,19 +173,17 @@ class SubscriptionSet implements WeightedSubscriptionManager final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); while (iterator.hasNext()) { + Subscription subscription = iterator.next(); + DeliveryAgent deliverAgent = _deliveryAgents.get(subscription); ++_currentSubscriber; subscriberScanned(); - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + if (!(deliverAgent == null || subscription.isSuspended())) { - if (subscription.hasInterest(msg)) + if (subscription.hasInterest(msg) && deliverAgent.ableToDeliver()) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + if (!subscription.wouldSuspend(msg)) { return subscription; } @@ -228,12 +222,12 @@ class SubscriptionSet implements WeightedSubscriptionManager return false; } - public int getWeight() + public int getActiveConsumerCount() { int count = 0; for (Subscription s : _subscriptions) { - if (!s.isSuspended()) + if (s.isActive()) { count++; } @@ -241,13 +235,18 @@ class SubscriptionSet implements WeightedSubscriptionManager return count; } + public int getConsumerCount() + { + return size(); + } + /** * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which * in turn can update its list of unacknowledged messages. * * @param queue */ - public void queueDeleted(AMQQueue queue) throws AMQException + public void queueDeleted(AMQQueue queue) { for (Subscription s : _subscriptions) { @@ -271,4 +270,9 @@ class SubscriptionSet implements WeightedSubscriptionManager _exclusive = exclusive; } + + public DeliveryAgent getDeliveryAgent(final Subscription sub) + { + return _deliveryAgents.get(sub); + } } |