diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 200 |
1 files changed, 166 insertions, 34 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 907d68b733..eabc8ebf38 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -211,6 +211,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** @return the state of the async processor. */ + public boolean isProcessingAsync() + { + return _processing.get(); + } + /** * Returns all the messages in the Queue * @@ -281,9 +287,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (currentQueue.hasNext()) { AMQMessage message = currentQueue.next(); - if (subscription.hasInterest(message)) + if (!message.getDeliveredToConsumer()) { - subscription.enqueueForPreDelivery(message, false); + if (subscription.hasInterest(message)) + { + subscription.enqueueForPreDelivery(message, false); + } } } } @@ -330,6 +339,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager deliveryTag, _queue.getMessageCount()); _totalMessageSize.addAndGet(-msg.getSize()); } + + if (!acks) + { + msg.decrementReference(channel.getStoreContext()); + } } finally { @@ -402,14 +416,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @throws AMQException */ - public void removeAMessageFromTop(StoreContext storeContext) throws AMQException + public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException { _lock.lock(); AMQMessage message = _messages.poll(); + if (message != null) { + queue.dequeue(storeContext, message); + _totalMessageSize.addAndGet(-message.getSize()); + + //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. + message.decrementReference(storeContext); + } _lock.unlock(); @@ -429,6 +450,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.poll(); _queue.dequeue(storeContext, msg); + + msg.decrementReference(_reapingStoreContext); + msg = getNextMessage(); count++; } @@ -447,15 +471,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private AMQMessage getNextMessage() throws AMQException { - return getNextMessage(_messages, null); + return getNextMessage(_messages, null, false); } - private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException { AMQMessage message = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (purgeMessage(message, sub)) + while (purgeMessage(message, sub, purgeOnly)) { // if we are purging then ensure we mark this message taken for the current subscriber // the current subscriber may be null in the case of a get or a purge but this is ok. @@ -467,12 +491,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager assert removed == message; // if the message expired then the _totalMessageSize needs adjusting - if (message.expired(_queue)) + if (message.expired(_queue) && !message.getDeliveredToConsumer()) { _totalMessageSize.addAndGet(-message.getSize()); // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - message.dequeue(_reapingStoreContext, _queue); + _queue.dequeue(_reapingStoreContext, message); + + message.decrementReference(_reapingStoreContext); if (_log.isInfoEnabled()) { @@ -496,17 +522,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * This method will return true if the message is to be purged from the queue. + * This method will return true if the message is to be purged from the queue. * * - * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue) + * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue) + * * @param message * @param sub + * * @return + * * @throws AMQException */ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException { + return purgeMessage(message, sub, false); + } + + /** + * This method will return true if the message is to be purged from the queue. + * \ + * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false + * + * @param message + * @param sub + * @param purgeOnly When set to false the message will be taken by the given Subscription. + * + * @return if the msg should be purged + * + * @throws AMQException + */ + private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException + { //Original.. complicated while loop control // (message != null // && ( @@ -541,9 +588,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - // if we are purging then ensure we mark this message taken for the current subscriber - // the current subscriber may be null in the case of a get or a purge but this is ok. - return purge && message.taken(_queue, sub); + if (purgeOnly) + { + // If we are simply purging the queue don't take the message + // just purge up to the next non-taken msg. + return purge && message.isTaken(_queue); + } + else + { + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. + return purge && message.taken(_queue, sub); + } } public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) @@ -574,7 +630,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { synchronized (_queueHeadLock) { - message = getNextMessage(messageQueue, sub); + message = getNextMessage(messageQueue, sub, false); // message will be null if we have no messages in the messageQueue. if (message == null) @@ -592,6 +648,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager ") to :" + System.identityHashCode(sub)); } + + if (messageQueue == _messages) + { + _totalMessageSize.addAndGet(-message.getSize()); + } + sub.send(message, _queue); //remove sent message from our queue. @@ -635,14 +697,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //fixme - we should do the clean up as the message remains on the _message queue // this is resulting in the next consumer receiving the message and then attempting to purge it // - _log.info(debugIdentity() + "We should do clean up of the main _message queue here"); + cleanMainQueue(sub); } } - if ((message != null) && (messageQueue == _messages)) - { - _totalMessageSize.addAndGet(-message.getSize()); - } } catch (AMQException e) { @@ -658,6 +716,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + private void cleanMainQueue(Subscription sub) + { + try + { + getNextMessage(_messages, sub, true); + } + catch (AMQException e) + { + _log.warn("Problem during main queue purge:" + e.getMessage()); + } + } + /** * enqueues the messages in the list on the queue and all required predelivery queues * @@ -747,7 +817,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) //no-one can take the message right now. + if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing { if (debugEnabled) { @@ -791,10 +861,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager sub.enqueueForPreDelivery(msg, deliverFirst); } } + + //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong! + if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers()) + { + _queue.deliverAsync(); + } + } } else { + + if (s.filtersMessages()) + { + if (s.getPreDeliveryQueue().size() > 0) + { + _log.error("Direct delivery from PDQ with queued msgs:" + s.getPreDeliveryQueue().size()); + } + } + else if (_messages.size() > 0) + { + _log.error("Direct delivery from MainQueue queued msgs:" + _messages.size()); + } + //release lock now _lock.unlock(); synchronized (s.getSendLock()) @@ -806,7 +896,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } - msg.taken(_queue, s); + + if (msg.taken(_queue, s)) + { + //Message has been delivered so don't redeliver. + // This can currently occur because of the recursive call below + // During unit tests the send can occur + // client then rejects + // this reject then releases the message by the time the + // if(!msg.isTaken()) call is made below + // the message has been released so that thread loops to send the message again + // of course by the time it gets back to here. the thread that released the + // message is now ready to send it. Here is a sample trace for reference +//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false] +//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null} +//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1) +//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false] +//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1) +//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]} +//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false] +//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657 +//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false] +//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null} +//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1) +//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]} + // Note: In the last request to take the message from thread 4,5 the message has been + // taken by the previous call done by thread 2,5 + + + return; + } //Deliver the message s.send(msg, _queue); } @@ -820,6 +939,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + // + // Why do we do this? What was the reasoning? We should have a better approach + // than recursion and rejecting if someone else sends it before we do. + // if (!msg.isTaken(_queue)) { if (debugEnabled) @@ -859,12 +982,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return id; } - Runner asyncDelivery = new Runner(); + final Runner _asyncDelivery = new Runner(); private class Runner implements Runnable { public void run() { + String startName = Thread.currentThread().getName(); + Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName); boolean running = true; while (running && !_movingMessages.get()) { @@ -873,13 +998,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Check that messages have not been added since we did our last peek(); // Synchronize with the thread that adds to the queue. // If the queue is still empty then we can exit - - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + synchronized (_asyncDelivery) { - running = false; - _processing.set(false); + if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + { + running = false; + _processing.set(false); + } } } + Thread.currentThread().setName(startName); } } @@ -890,24 +1018,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug(debugIdentity() + "Processing Async." + currentStatus()); } - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) + synchronized (_asyncDelivery) { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) + if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) { - if (_log.isDebugEnabled()) + //are we already running? if so, don't re-run + if (_processing.compareAndSet(false, true)) { - _log.debug(debugIdentity() + "Executing Async process."); + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Executing Async process."); + } + executor.execute(_asyncDelivery); } - executor.execute(asyncDelivery); } } } private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") + + "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + + ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + |