diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 322 |
1 files changed, 224 insertions, 98 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 0fc8753a87..208a59516c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -24,9 +24,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.Set; +import java.util.Collections; +import java.util.HashSet; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -38,12 +43,12 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.util.MessageQueue; +import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -/** - * Manages delivery of messages on behalf of a queue - */ +/** Manages delivery of messages on behalf of a queue */ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); @@ -51,47 +56,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager @Configured(path = "advanced.compressBufferOnQueue", defaultValue = "false") public boolean compressBufferOnQueue; - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - - private final ReentrantLock _messageAccessLock = new ReentrantLock(); + /** Holds any queued messages */ + private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ + /** Ensures that only one asynchronous task is running for this manager at any time. */ private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ + /** The subscriptions on the queue to whom messages are delivered */ private final SubscriptionManager _subscriptions; /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. + * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles + * acknowledgements a handle on the queue. */ private final AMQQueue _queue; /** - * Flag used while moving messages from this queue to another. For moving messages the async delivery - * should also stop. This flat should be set to true to stop async delivery and set to false to enable - * async delivery again. + * Flag used while moving messages from this queue to another. For moving messages the async delivery should also + * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again. */ private AtomicBoolean _movingMessages = new AtomicBoolean(); - + /** * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - * <p/> - * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. + * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be + * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition + * of messages to the queue. */ private ReentrantLock _lock = new ReentrantLock(); private AtomicLong _totalMessageSize = new AtomicLong(); - + private AtomicInteger _extraMessages = new AtomicInteger(); + private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -109,7 +103,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - private boolean addMessageToQueue(AMQMessage msg) + private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst) { // Shrink the ContentBodies to their actual size to save memory. if (compressBufferOnQueue) @@ -122,7 +116,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - _messages.offer(msg); + if (deliverFirst) + { + _messages.pushHead(msg); + } + else + { + _messages.offer(msg); + } _totalMessageSize.addAndGet(msg.getSize()); @@ -135,7 +136,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); try { - return !_messages.isEmpty(); + return !(_messages.isEmpty() && _hasContent.isEmpty()); } finally { @@ -149,18 +150,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. + * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine + * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. * * @return int the number of messages in the delivery queue. */ private int getMessageCount() { - return _messages.size(); + return _messages.size() + _extraMessages.get(); } - public long getTotalMessageSize() { return _totalMessageSize.get(); @@ -172,6 +172,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); } + public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg) + { + _lock.lock(); + try + { + if (hasContent) + { + _log.debug("Queue has adding subscriber content"); + _hasContent.add(subscription); + _totalMessageSize.addAndGet(msg.getSize()); + _extraMessages.addAndGet(1); + } + else + { + _log.debug("Queue has removing subscriber content"); + if (msg == null) + { + _hasContent.remove(subscription); + } + else + { + _totalMessageSize.addAndGet(-msg.getSize()); + _extraMessages.addAndGet(-1); + } + } + } + finally + { + _lock.unlock(); + } + } + public List<AMQMessage> getMessages() { @@ -195,7 +227,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = currentQueue.next(); if (subscription.hasInterest(message)) { - subscription.enqueueForPreDelivery(message); + subscription.enqueueForPreDelivery(message, false); } } } @@ -203,7 +235,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException { AMQMessage msg = getNextMessage(); - if(msg == null) + if (msg == null) { return false; } @@ -229,7 +261,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } _queue.dequeue(channel.getStoreContext(), msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -252,8 +284,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, - * so that the asyn delivery is also stopped. + * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that + * the asyn delivery is also stopped. */ public void startMovingMessages() { @@ -262,8 +294,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, - * so that the async delivery can start again. + * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that + * the async delivery can start again. */ public void stopMovingMessages() { @@ -276,6 +308,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * Messages will be removed from this queue and all preDeliveryQueues + * * @param messageList */ public void removeMovedMessages(List<AMQMessage> messageList) @@ -308,7 +341,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * Now with implementation of predelivery queues, this method will mark the message on the top as taken. + * * @param storeContext + * * @throws AMQException */ public void removeAMessageFromTop(StoreContext storeContext) throws AMQException @@ -318,11 +353,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (msg != null) { // mark this message as taken and get it removed - msg.taken(); + msg.taken(null); _queue.dequeue(storeContext, msg); getNextMessage(); } - + _lock.unlock(); } @@ -335,7 +370,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { //mark this message as taken and get it removed - msg.taken(); + msg.taken(null); _queue.dequeue(storeContext, msg); msg = getNextMessage(); count++; @@ -347,20 +382,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public synchronized AMQMessage getNextMessage() throws AMQException { - return getNextMessage(_messages); + return getNextMessage(_messages, null); } - - private AMQMessage getNextMessage(Queue<AMQMessage> messages) - { - return getNextMessage(messages, false); - } - - private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing) + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) { AMQMessage message = messages.peek(); - while (message != null && (browsing || message.taken())) + + while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub))) { //remove the already taken message messages.poll(); @@ -371,27 +401,76 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } - public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue) + public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) { + + Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages); + + if (_log.isTraceEnabled()) + { + _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + ") from queue (" + System.identityHashCode(messageQueue) + + ") AMQQueue (" + System.identityHashCode(queue) + ")"); + } + + if (messageQueue == null) + { + // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector + if (_log.isDebugEnabled()) + { + _log.debug(sub + ": asked to send messages but has none on given queue:" + queue); + } + return; + } + AMQMessage message = null; try { - message = getNextMessage(messageQueue, sub.isBrowser()); + message = getNextMessage(messageQueue, sub); // message will be null if we have no messages in the messageQueue. if (message == null) { + if (_log.isTraceEnabled()) + { + _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } return; } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message:" + message + " to :" + sub); + _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } sub.send(message, _queue); //remove sent message from our queue. messageQueue.poll(); + //If we don't remove the message from _messages + // Otherwise the Async send will never end + + if (messageQueue == sub.getResendQueue()) + { + if (_log.isTraceEnabled()) + { + _log.trace("All messages sent from resendQueue for " + sub); + } + if (messageQueue.isEmpty()) + { + subscriberHasPendingResend(false, sub, null); + //better to use the above method as this keeps all the tracking in one location. +// _hasContent.remove(sub); + } + + _extraMessages.decrementAndGet(); + } + else if (messageQueue == sub.getPreDeliveryQueue()) + { + _log.info("We could do clean up of the main _message queue here"); + } + _totalMessageSize.addAndGet(-message.getSize()); } catch (AMQException e) @@ -403,6 +482,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * enqueues the messages in the list on the queue and all required predelivery queues + * * @param storeContext * @param movedMessageList */ @@ -411,7 +491,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); for (AMQMessage msg : movedMessageList) { - addMessageToQueue(msg); + addMessageToQueue(msg, true); } // enqueue on the pre delivery queues @@ -422,7 +502,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Only give the message to those that want them. if (sub.hasInterest(msg)) { - sub.enqueueForPreDelivery(msg); + sub.enqueueForPreDelivery(msg, true); } } } @@ -430,8 +510,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). + * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke + * deliver(). */ private void processQueue() { @@ -444,40 +524,43 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended()) + synchronized (sub.getSendLock()) { - sendNextMessage(sub); - - hasSubscribers = true; - } - } - } - } + if (!sub.isSuspended()) + { + sendNextMessage(sub, _queue); - private void sendNextMessage(Subscription sub) - { - if (sub.hasFilters()) - { - sendNextMessage(sub, sub.getPreDeliveryQueue()); - if (sub.isAutoClose()) - { - if (sub.getPreDeliveryQueue().isEmpty()) - { - sub.close(); + hasSubscribers = true; + } } } } - else - { - sendNextMessage(sub, _messages); - } } - public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException +// private void sendNextMessage(Subscription sub) +// { +// if (sub.hasFilters()) +// { +// sendNextMessage(sub, sub.getPreDeliveryQueue()); +// if (sub.isAutoClose()) +// { +// if (sub.getPreDeliveryQueue().isEmpty()) +// { +// sub.close(); +// } +// } +// } +// else +// { +// sendNextMessage(sub, _messages); +// } +// } + + public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :" + msg); + _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg); } msg.release(); @@ -491,11 +574,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery"); + _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); } if (!msg.getMessagePublishInfo().isImmediate()) { - addMessageToQueue(msg); + addMessageToQueue(msg, deliverFirst); //release lock now message is on queue. _lock.unlock(); @@ -504,7 +587,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + - " subscribers to give the message to."); + " subscribers to give the message to:" + currentStatus()); } for (Subscription sub : _subscriptions.getSubscriptions()) { @@ -528,7 +611,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } - sub.enqueueForPreDelivery(msg); + sub.enqueueForPreDelivery(msg, deliverFirst); } } } @@ -537,14 +620,47 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //release lock now _lock.unlock(); - - if (_log.isDebugEnabled()) + synchronized (s.getSendLock()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + - System.identityHashCode(s) + ") :" + s); + if (!s.isSuspended()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + System.identityHashCode(s) + ") :" + s); + } + msg.taken(s); + //Deliver the message + s.send(msg, _queue); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + } + } + + if (!msg.isTaken()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); + } + + deliver(context, name, msg, deliverFirst); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); + } + } } - //Deliver the message - s.send(msg, _queue); } } finally @@ -593,9 +709,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); + _log.debug("Processing Async." + currentStatus()); } if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) @@ -608,4 +722,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + private String currentStatus() + { + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " + + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get() + + " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") "; + } + } |