summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java322
1 files changed, 224 insertions, 98 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 0fc8753a87..208a59516c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -24,9 +24,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -38,12 +43,12 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-/**
- * Manages delivery of messages on behalf of a queue
- */
+/** Manages delivery of messages on behalf of a queue */
public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
@@ -51,47 +56,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
@Configured(path = "advanced.compressBufferOnQueue",
defaultValue = "false")
public boolean compressBufferOnQueue;
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
- private final ReentrantLock _messageAccessLock = new ReentrantLock();
+ /** Holds any queued messages */
+ private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
+ /** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
+ /** The subscriptions on the queue to whom messages are delivered */
private final SubscriptionManager _subscriptions;
/**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
+ * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
+ * acknowledgements a handle on the queue.
*/
private final AMQQueue _queue;
/**
- * Flag used while moving messages from this queue to another. For moving messages the async delivery
- * should also stop. This flat should be set to true to stop async delivery and set to false to enable
- * async delivery again.
+ * Flag used while moving messages from this queue to another. For moving messages the async delivery should also
+ * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again.
*/
private AtomicBoolean _movingMessages = new AtomicBoolean();
-
+
/**
* Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be
+ * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition
+ * of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
-
+ private AtomicInteger _extraMessages = new AtomicInteger();
+ private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,7 +103,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- private boolean addMessageToQueue(AMQMessage msg)
+ private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
{
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
@@ -122,7 +116,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
_totalMessageSize.addAndGet(msg.getSize());
@@ -135,7 +136,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
try
{
- return !_messages.isEmpty();
+ return !(_messages.isEmpty() && _hasContent.isEmpty());
}
finally
{
@@ -149,18 +150,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine
+ * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
*
* @return int the number of messages in the delivery queue.
*/
private int getMessageCount()
{
- return _messages.size();
+ return _messages.size() + _extraMessages.get();
}
-
public long getTotalMessageSize()
{
return _totalMessageSize.get();
@@ -172,6 +172,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ {
+ _lock.lock();
+ try
+ {
+ if (hasContent)
+ {
+ _log.debug("Queue has adding subscriber content");
+ _hasContent.add(subscription);
+ _totalMessageSize.addAndGet(msg.getSize());
+ _extraMessages.addAndGet(1);
+ }
+ else
+ {
+ _log.debug("Queue has removing subscriber content");
+ if (msg == null)
+ {
+ _hasContent.remove(subscription);
+ }
+ else
+ {
+ _totalMessageSize.addAndGet(-msg.getSize());
+ _extraMessages.addAndGet(-1);
+ }
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
public List<AMQMessage> getMessages()
{
@@ -195,7 +227,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = currentQueue.next();
if (subscription.hasInterest(message))
{
- subscription.enqueueForPreDelivery(message);
+ subscription.enqueueForPreDelivery(message, false);
}
}
}
@@ -203,7 +235,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
AMQMessage msg = getNextMessage();
- if(msg == null)
+ if (msg == null)
{
return false;
}
@@ -229,7 +261,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
_queue.dequeue(channel.getStoreContext(), msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -252,8 +284,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
- * so that the asyn delivery is also stopped.
+ * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that
+ * the asyn delivery is also stopped.
*/
public void startMovingMessages()
{
@@ -262,8 +294,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
- * so that the async delivery can start again.
+ * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that
+ * the async delivery can start again.
*/
public void stopMovingMessages()
{
@@ -276,6 +308,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Messages will be removed from this queue and all preDeliveryQueues
+ *
* @param messageList
*/
public void removeMovedMessages(List<AMQMessage> messageList)
@@ -308,7 +341,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+ *
* @param storeContext
+ *
* @throws AMQException
*/
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
@@ -318,11 +353,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (msg != null)
{
// mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
getNextMessage();
}
-
+
_lock.unlock();
}
@@ -335,7 +370,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
//mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
msg = getNextMessage();
count++;
@@ -347,20 +382,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public synchronized AMQMessage getNextMessage() throws AMQException
{
- return getNextMessage(_messages);
+ return getNextMessage(_messages, null);
}
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages)
- {
- return getNextMessage(messages, false);
- }
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
{
AMQMessage message = messages.peek();
- while (message != null && (browsing || message.taken()))
+
+ while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
{
//remove the already taken message
messages.poll();
@@ -371,27 +401,76 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
{
+
+ Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ ") from queue (" + System.identityHashCode(messageQueue) +
+ ") AMQQueue (" + System.identityHashCode(queue) + ")");
+ }
+
+ if (messageQueue == null)
+ {
+ // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ }
+ return;
+ }
+
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue, sub.isBrowser());
+ message = getNextMessage(messageQueue, sub);
// message will be null if we have no messages in the messageQueue.
if (message == null)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
return;
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
sub.send(message, _queue);
//remove sent message from our queue.
messageQueue.poll();
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
+
+ if (messageQueue == sub.getResendQueue())
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("All messages sent from resendQueue for " + sub);
+ }
+ if (messageQueue.isEmpty())
+ {
+ subscriberHasPendingResend(false, sub, null);
+ //better to use the above method as this keeps all the tracking in one location.
+// _hasContent.remove(sub);
+ }
+
+ _extraMessages.decrementAndGet();
+ }
+ else if (messageQueue == sub.getPreDeliveryQueue())
+ {
+ _log.info("We could do clean up of the main _message queue here");
+ }
+
_totalMessageSize.addAndGet(-message.getSize());
}
catch (AMQException e)
@@ -403,6 +482,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* enqueues the messages in the list on the queue and all required predelivery queues
+ *
* @param storeContext
* @param movedMessageList
*/
@@ -411,7 +491,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
for (AMQMessage msg : movedMessageList)
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, true);
}
// enqueue on the pre delivery queues
@@ -422,7 +502,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, true);
}
}
}
@@ -430,8 +510,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
+ * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke
+ * deliver().
*/
private void processQueue()
{
@@ -444,40 +524,43 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended())
+ synchronized (sub.getSendLock())
{
- sendNextMessage(sub);
-
- hasSubscribers = true;
- }
- }
- }
- }
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(sub, _queue);
- private void sendNextMessage(Subscription sub)
- {
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue());
- if (sub.isAutoClose())
- {
- if (sub.getPreDeliveryQueue().isEmpty())
- {
- sub.close();
+ hasSubscribers = true;
+ }
}
}
}
- else
- {
- sendNextMessage(sub, _messages);
- }
}
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
+// private void sendNextMessage(Subscription sub)
+// {
+// if (sub.hasFilters())
+// {
+// sendNextMessage(sub, sub.getPreDeliveryQueue());
+// if (sub.isAutoClose())
+// {
+// if (sub.getPreDeliveryQueue().isEmpty())
+// {
+// sub.close();
+// }
+// }
+// }
+// else
+// {
+// sendNextMessage(sub, _messages);
+// }
+// }
+
+ public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :" + msg);
+ _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
}
msg.release();
@@ -491,11 +574,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
+ _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
if (!msg.getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -504,7 +587,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
_log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
- " subscribers to give the message to.");
+ " subscribers to give the message to:" + currentStatus());
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
@@ -528,7 +611,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, deliverFirst);
}
}
}
@@ -537,14 +620,47 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
//release lock now
_lock.unlock();
-
- if (_log.isDebugEnabled())
+ synchronized (s.getSendLock())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
- System.identityHashCode(s) + ") :" + s);
+ if (!s.isSuspended())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
+ msg.taken(s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ }
+ }
+
+ if (!msg.isTaken())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
+ }
+
+ deliver(context, name, msg, deliverFirst);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
+ }
+ }
}
- //Deliver the message
- s.send(msg, _queue);
}
}
finally
@@ -593,9 +709,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ _log.debug("Processing Async." + currentStatus());
}
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -608,4 +722,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ private String currentStatus()
+ {
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+ " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
+ "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get() +
+ " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
+ }
+
}