summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java200
1 files changed, 166 insertions, 34 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 907d68b733..eabc8ebf38 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -211,6 +211,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ /** @return the state of the async processor. */
+ public boolean isProcessingAsync()
+ {
+ return _processing.get();
+ }
+
/**
* Returns all the messages in the Queue
*
@@ -281,9 +287,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (currentQueue.hasNext())
{
AMQMessage message = currentQueue.next();
- if (subscription.hasInterest(message))
+ if (!message.getDeliveredToConsumer())
{
- subscription.enqueueForPreDelivery(message, false);
+ if (subscription.hasInterest(message))
+ {
+ subscription.enqueueForPreDelivery(message, false);
+ }
}
}
}
@@ -330,6 +339,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
deliveryTag, _queue.getMessageCount());
_totalMessageSize.addAndGet(-msg.getSize());
}
+
+ if (!acks)
+ {
+ msg.decrementReference(channel.getStoreContext());
+ }
}
finally
{
@@ -402,14 +416,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @throws AMQException
*/
- public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
+ public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_lock.lock();
AMQMessage message = _messages.poll();
+
if (message != null)
{
+ queue.dequeue(storeContext, message);
+
_totalMessageSize.addAndGet(-message.getSize());
+
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ message.decrementReference(storeContext);
+
}
_lock.unlock();
@@ -429,6 +450,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_messages.poll();
_queue.dequeue(storeContext, msg);
+
+ msg.decrementReference(_reapingStoreContext);
+
msg = getNextMessage();
count++;
}
@@ -447,15 +471,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
private AMQMessage getNextMessage() throws AMQException
{
- return getNextMessage(_messages, null);
+ return getNextMessage(_messages, null, false);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException
{
AMQMessage message = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (purgeMessage(message, sub))
+ while (purgeMessage(message, sub, purgeOnly))
{
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
@@ -467,12 +491,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
assert removed == message;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(_queue))
+ if (message.expired(_queue) && !message.getDeliveredToConsumer())
{
_totalMessageSize.addAndGet(-message.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- message.dequeue(_reapingStoreContext, _queue);
+ _queue.dequeue(_reapingStoreContext, message);
+
+ message.decrementReference(_reapingStoreContext);
if (_log.isInfoEnabled())
{
@@ -496,17 +522,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This method will return true if the message is to be purged from the queue.
+ * This method will return true if the message is to be purged from the queue.
*
*
- * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ *
* @param message
* @param sub
+ *
* @return
+ *
* @throws AMQException
*/
private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
{
+ return purgeMessage(message, sub, false);
+ }
+
+ /**
+ * This method will return true if the message is to be purged from the queue.
+ * \
+ * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false
+ *
+ * @param message
+ * @param sub
+ * @param purgeOnly When set to false the message will be taken by the given Subscription.
+ *
+ * @return if the msg should be purged
+ *
+ * @throws AMQException
+ */
+ private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException
+ {
//Original.. complicated while loop control
// (message != null
// && (
@@ -541,9 +588,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- // if we are purging then ensure we mark this message taken for the current subscriber
- // the current subscriber may be null in the case of a get or a purge but this is ok.
- return purge && message.taken(_queue, sub);
+ if (purgeOnly)
+ {
+ // If we are simply purging the queue don't take the message
+ // just purge up to the next non-taken msg.
+ return purge && message.isTaken(_queue);
+ }
+ else
+ {
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+ return purge && message.taken(_queue, sub);
+ }
}
public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
@@ -574,7 +630,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
synchronized (_queueHeadLock)
{
- message = getNextMessage(messageQueue, sub);
+ message = getNextMessage(messageQueue, sub, false);
// message will be null if we have no messages in the messageQueue.
if (message == null)
@@ -592,6 +648,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
") to :" + System.identityHashCode(sub));
}
+
+ if (messageQueue == _messages)
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
+
sub.send(message, _queue);
//remove sent message from our queue.
@@ -635,14 +697,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//fixme - we should do the clean up as the message remains on the _message queue
// this is resulting in the next consumer receiving the message and then attempting to purge it
//
- _log.info(debugIdentity() + "We should do clean up of the main _message queue here");
+ cleanMainQueue(sub);
}
}
- if ((message != null) && (messageQueue == _messages))
- {
- _totalMessageSize.addAndGet(-message.getSize());
- }
}
catch (AMQException e)
{
@@ -658,6 +716,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ private void cleanMainQueue(Subscription sub)
+ {
+ try
+ {
+ getNextMessage(_messages, sub, true);
+ }
+ catch (AMQException e)
+ {
+ _log.warn("Problem during main queue purge:" + e.getMessage());
+ }
+ }
+
/**
* enqueues the messages in the list on the queue and all required predelivery queues
*
@@ -747,7 +817,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null) //no-one can take the message right now.
+ if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing
{
if (debugEnabled)
{
@@ -791,10 +861,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
sub.enqueueForPreDelivery(msg, deliverFirst);
}
}
+
+ //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
+ if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+ {
+ _queue.deliverAsync();
+ }
+
}
}
else
{
+
+ if (s.filtersMessages())
+ {
+ if (s.getPreDeliveryQueue().size() > 0)
+ {
+ _log.error("Direct delivery from PDQ with queued msgs:" + s.getPreDeliveryQueue().size());
+ }
+ }
+ else if (_messages.size() > 0)
+ {
+ _log.error("Direct delivery from MainQueue queued msgs:" + _messages.size());
+ }
+
//release lock now
_lock.unlock();
synchronized (s.getSendLock())
@@ -806,7 +896,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(_queue, s);
+
+ if (msg.taken(_queue, s))
+ {
+ //Message has been delivered so don't redeliver.
+ // This can currently occur because of the recursive call below
+ // During unit tests the send can occur
+ // client then rejects
+ // this reject then releases the message by the time the
+ // if(!msg.isTaken()) call is made below
+ // the message has been released so that thread loops to send the message again
+ // of course by the time it gets back to here. the thread that released the
+ // message is now ready to send it. Here is a sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]}
+ // Note: In the last request to take the message from thread 4,5 the message has been
+ // taken by the previous call done by thread 2,5
+
+
+ return;
+ }
//Deliver the message
s.send(msg, _queue);
}
@@ -820,6 +939,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ //
+ // Why do we do this? What was the reasoning? We should have a better approach
+ // than recursion and rejecting if someone else sends it before we do.
+ //
if (!msg.isTaken(_queue))
{
if (debugEnabled)
@@ -859,12 +982,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return id;
}
- Runner asyncDelivery = new Runner();
+ final Runner _asyncDelivery = new Runner();
private class Runner implements Runnable
{
public void run()
{
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
boolean running = true;
while (running && !_movingMessages.get())
{
@@ -873,13 +998,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Check that messages have not been added since we did our last peek();
// Synchronize with the thread that adds to the queue.
// If the queue is still empty then we can exit
-
- if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+ synchronized (_asyncDelivery)
{
- running = false;
- _processing.set(false);
+ if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+ {
+ running = false;
+ _processing.set(false);
+ }
}
}
+ Thread.currentThread().setName(startName);
}
}
@@ -890,24 +1018,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug(debugIdentity() + "Processing Async." + currentStatus());
}
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ synchronized (_asyncDelivery)
{
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
- if (_log.isDebugEnabled())
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
{
- _log.debug(debugIdentity() + "Executing Async process.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async process.");
+ }
+ executor.execute(_asyncDelivery);
}
- executor.execute(asyncDelivery);
}
}
}
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
+ "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
+ ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +