diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 86 |
1 files changed, 58 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 2b12b8e14c..f66604a5c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -31,10 +31,17 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.HashMap; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; /** Manages delivery of messages on behalf of a queue */ @@ -68,6 +75,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private ReentrantLock _lock = new ReentrantLock(); private AtomicLong _totalMessageSize = new AtomicLong(); + private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -111,7 +119,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); try { - return !_messages.isEmpty(); + return !_messages.isEmpty() || !_hasContent.isEmpty(); } finally { @@ -146,6 +154,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); } + public void setQueueHasContent(Subscription subscription) + { + _lock.lock(); + try + { + + _log.debug("Queue has content Set"); + _hasContent.add(subscription); + } + finally + { + _lock.unlock(); + } + } public synchronized List<AMQMessage> getMessages() { @@ -197,7 +219,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - while (message != null && (sub.isBrowser() || message.taken())) + while (message != null && (sub.isBrowser() || message.taken(sub))) { //remove the already taken message messages.poll(); @@ -207,8 +229,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } - public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue) + public void sendNextMessage(Subscription sub, AMQQueue queue) { + + Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages); + + if (messageQueue == null) + { + // There is no queue with messages currently + _log.warn(sub + ": asked to send messages but has none on given queue:" + queue); + return; + } AMQMessage message = null; try { @@ -221,14 +252,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message:" + message + " to :" + sub); + _log.debug("Async Delivery Message:" + message + " to :" + this); } - sub.send(message, _queue); + sub.send(message, queue); //remove sent message from our queue. messageQueue.poll(); - _totalMessageSize.addAndGet(-message.getSize()); + + //If we don't remove the message from _messages + // Otherwise the Async send will never end + if (messageQueue.isEmpty()) + { + if (messageQueue == sub.getResendQueue()) + { + _hasContent.remove(sub); + } + else if (messageQueue == sub.getPreDeliveryQueue()) + { + //fixme + _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages"); + //_messages.remove(message); + } + } + } catch (FailedDequeueException e) { @@ -254,7 +301,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!sub.isSuspended()) { - sendNextMessage(sub); + sendNextMessage(sub, _queue); hasSubscribers = true; } @@ -262,25 +309,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - private void sendNextMessage(Subscription sub) - { - if (sub.hasFilters()) - { - sendNextMessage(sub, sub.getPreDeliveryQueue()); - if (sub.isAutoClose()) - { - if (sub.getPreDeliveryQueue().isEmpty()) - { - sub.close(); - } - } - } - else - { - sendNextMessage(sub, _messages); - } - } - private AMQMessage poll() { return _messages.poll(); @@ -355,6 +383,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } + //Mark message as taken + msg.taken(s); //Deliver the message s.send(msg, _queue); } @@ -405,8 +435,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:" + + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() + " Processing:" + _processing.get()); } |