summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java86
1 files changed, 58 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 2b12b8e14c..f66604a5c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -31,10 +31,17 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
/** Manages delivery of messages on behalf of a queue */
@@ -68,6 +75,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
+ private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -111,7 +119,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
try
{
- return !_messages.isEmpty();
+ return !_messages.isEmpty() || !_hasContent.isEmpty();
}
finally
{
@@ -146,6 +154,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
+ public void setQueueHasContent(Subscription subscription)
+ {
+ _lock.lock();
+ try
+ {
+
+ _log.debug("Queue has content Set");
+ _hasContent.add(subscription);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
public synchronized List<AMQMessage> getMessages()
{
@@ -197,7 +219,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
- while (message != null && (sub.isBrowser() || message.taken()))
+ while (message != null && (sub.isBrowser() || message.taken(sub)))
{
//remove the already taken message
messages.poll();
@@ -207,8 +229,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)
{
+
+ Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+ if (messageQueue == null)
+ {
+ // There is no queue with messages currently
+ _log.warn(sub + ": asked to send messages but has none on given queue:" + queue);
+ return;
+ }
AMQMessage message = null;
try
{
@@ -221,14 +252,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ _log.debug("Async Delivery Message:" + message + " to :" + this);
}
- sub.send(message, _queue);
+ sub.send(message, queue);
//remove sent message from our queue.
messageQueue.poll();
- _totalMessageSize.addAndGet(-message.getSize());
+
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
+ if (messageQueue.isEmpty())
+ {
+ if (messageQueue == sub.getResendQueue())
+ {
+ _hasContent.remove(sub);
+ }
+ else if (messageQueue == sub.getPreDeliveryQueue())
+ {
+ //fixme
+ _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages");
+ //_messages.remove(message);
+ }
+ }
+
}
catch (FailedDequeueException e)
{
@@ -254,7 +301,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!sub.isSuspended())
{
- sendNextMessage(sub);
+ sendNextMessage(sub, _queue);
hasSubscribers = true;
}
@@ -262,25 +309,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- private void sendNextMessage(Subscription sub)
- {
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue());
- if (sub.isAutoClose())
- {
- if (sub.getPreDeliveryQueue().isEmpty())
- {
- sub.close();
- }
- }
- }
- else
- {
- sendNextMessage(sub, _messages);
- }
- }
-
private AMQMessage poll()
{
return _messages.poll();
@@ -355,6 +383,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
+ //Mark message as taken
+ msg.taken(s);
//Deliver the message
s.send(msg, _queue);
}
@@ -405,8 +435,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:"
+ + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
" Processing:" + _processing.get());
}