diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 183 |
1 files changed, 110 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 7dfcae95c3..becd57752e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; @@ -47,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock; /** Manages delivery of messages on behalf of a queue */ -public class ConcurrentSelectorDeliveryManager implements DeliveryManager +public class ConcurrentSelectorDeliveryManager { private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); @@ -60,13 +61,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** Ensures that only one asynchronous task is running for this manager at any time. */ private final AtomicBoolean _processing = new AtomicBoolean(); /** The subscriptions on the queue to whom messages are delivered */ - private final SubscriptionManager _subscriptions; + private final SubscriptionSet _subscriptions = new SubscriptionSet(); /** * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles * acknowledgements a handle on the queue. */ - private final AMQQueue _queue; + private final AMQQueueImpl _queue; /** * Flag used while moving messages from this queue to another. For moving messages the async delivery should also @@ -83,7 +84,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private ReentrantLock _lock = new ReentrantLock(); private AtomicLong _totalMessageSize = new AtomicLong(); private AtomicInteger _extraMessages = new AtomicInteger(); - private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); + private Set<DeliveryAgent> _hasContent = Collections.synchronizedSet(new HashSet<DeliveryAgent>()); private final Object _queueHeadLock = new Object(); private String _processingThreadName = ""; @@ -91,7 +92,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** Used by any reaping thread to purge messages */ private StoreContext _reapingStoreContext = new StoreContext(); - ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) + ConcurrentSelectorDeliveryManager(AMQQueueImpl queue) { //Set values from configuration @@ -102,11 +103,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.warn("Compressing Buffers on queue."); } - _subscriptions = subscriptions; _queue = queue; } + public SubscriptionSet getSubscribers() + { + return _subscriptions; + } + + private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst) { AMQMessage msg = entry.getMessage(); @@ -182,13 +188,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry) { + subscriberHasPendingResend(hasContent,_subscriptions.getDeliveryAgent(subscription),entry); + } + private void subscriberHasPendingResend(boolean hasContent, DeliveryAgent deliveryAgent, QueueEntry entry) + { _lock.lock(); try { if (hasContent) { _log.debug("Queue has adding subscriber content"); - _hasContent.add(subscription); + _hasContent.add(deliveryAgent); _totalMessageSize.addAndGet(entry.getSize()); _extraMessages.addAndGet(1); } @@ -197,7 +207,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug("Queue has removing subscriber content"); if (entry == null) { - _hasContent.remove(subscription); + _hasContent.remove(deliveryAgent); } else { @@ -302,20 +312,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void populatePreDeliveryQueue(Subscription subscription) { - if (_log.isDebugEnabled()) - { - _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); - } + populatePreDeliveryQueue(_subscriptions.getDeliveryAgent(subscription)); + } + private void populatePreDeliveryQueue(DeliveryAgent deliveryAgent) + { + Iterator<QueueEntry> currentQueue = _messages.iterator(); while (currentQueue.hasNext()) { QueueEntry entry = currentQueue.next(); - if (subscription.hasInterest(entry)) + if (deliveryAgent != null && deliveryAgent.hasInterest(entry)) { - subscription.enqueueForPreDelivery(entry, false); + deliveryAgent.enqueueForPreDelivery(entry, false); } } @@ -348,7 +359,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId()); } - _queue.dequeue(channel.getStoreContext(), entry); + entry.dequeue(channel.getStoreContext()); } synchronized (channel) { @@ -367,12 +378,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (!acks) { - entry.getMessage().decrementReference(channel.getStoreContext()); + entry.dispose(channel.getStoreContext()); } } finally { - entry.setDeliveredToConsumer(); + entry.setDeliveredToSubscription(); } return true; @@ -414,9 +425,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended() && sub.filtersMessages()) + DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(sub); + if (deliveryAgent != null && !sub.isSuspended() && sub.filtersMessages()) { - Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue(); + Queue<QueueEntry> preDeliveryQueue = deliveryAgent.getPreDeliveryQueue(); for (QueueEntry entry : messageList) { preDeliveryQueue.remove(entry); @@ -449,12 +461,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (entry != null) { - queue.dequeue(storeContext, entry); + entry.dequeue(storeContext); _totalMessageSize.addAndGet(-entry.getSize()); //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. - entry.getMessage().decrementReference(storeContext); + entry.dispose(storeContext); } @@ -474,9 +486,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //and remove it _messages.poll(); - _queue.dequeue(storeContext, entry); + entry.dequeue(storeContext); - entry.getMessage().decrementReference(_reapingStoreContext); + entry.dispose(storeContext); entry = getNextMessage(); count++; @@ -506,7 +518,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) while (purgeMessage(entry, sub, purgeOnly)) { - AMQMessage message = entry.getMessage(); //remove the already taken message or expired QueueEntry removed = messages.poll(); @@ -514,14 +525,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager assert removed == entry; // if the message expired then the _totalMessageSize needs adjusting - if (message.expired(_queue) && !entry.taken(sub)) + if (entry.expired() && !entry.acquire(sub)) { _totalMessageSize.addAndGet(-entry.getSize()); // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - _queue.dequeue(_reapingStoreContext, entry); - - message.decrementReference(_reapingStoreContext); + entry.dequeue(_reapingStoreContext); + entry.dispose(_reapingStoreContext); if (_log.isInfoEnabled()) { @@ -534,7 +544,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug("Removed taken message:" + message.debugIdentity()); + _log.debug("Removed taken message:" + entry.debugIdentity()); } // try the next message @@ -546,24 +556,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * This method will return true if the message is to be purged from the queue. - * - * - * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue) - * - * @param message - * @param sub - * - * @return - * - * @throws AMQException - */ - private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException - { - return purgeMessage(message, sub, false); - } - - /** - * This method will return true if the message is to be purged from the queue. * \ * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false * @@ -599,13 +591,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (sub != null) { // if we have a queue browser(we don't purge) so check mark the message as taken - purge = ((!sub.isBrowser() || message.isTaken())); + purge = ((!sub.isBrowser() || message.isAcquired())); } else { // if there is no subscription we are doing // a get or purging so mark message as taken. - message.isTaken(); + message.isAcquired(); // and then ensure that it gets purged purge = true; } @@ -615,17 +607,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { // If we are simply purging the queue don't take the message // just purge up to the next non-taken msg. - return purge && message.isTaken(); + return purge && message.isAcquired(); } else { // if we are purging then ensure we mark this message taken for the current subscriber // the current subscriber may be null in the case of a get or a purge but this is ok. - return purge && message.taken(sub); + return purge && message.acquire(sub); } } - public void sendNextMessage(Subscription sub, AMQQueue queue) + public void sendNextMessage(DeliveryAgent sub) { Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages); @@ -634,7 +626,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + - ") AMQQueue (" + System.identityHashCode(queue) + ")"); + ") AMQQueue (" + System.identityHashCode(_queue) + ")"); } if (messageQueue == null) @@ -642,18 +634,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector if (_log.isInfoEnabled()) { - _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue); + _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + _queue); } return; } QueueEntry entry = null; - QueueEntry removed = null; + QueueEntry removed; try { synchronized (_queueHeadLock) { - entry = getNextMessage(messageQueue, sub, false); + entry = getNextMessage(messageQueue, sub.getSubscription(), false); // message will be null if we have no messages in the messageQueue. if (entry == null) @@ -677,7 +669,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _totalMessageSize.addAndGet(-entry.getSize()); } - sub.send(entry, _queue); + sub.send(entry); //remove sent message from our queue. removed = messageQueue.poll(); @@ -739,11 +731,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - private void cleanMainQueue(Subscription sub) + private void cleanMainQueue(DeliveryAgent sub) { try { - getNextMessage(_messages, sub, true); + getNextMessage(_messages, sub.getSubscription(), true); } catch (AMQException e) { @@ -773,7 +765,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Only give the message to those that want them. if (sub.hasInterest(entry)) { - sub.enqueueForPreDelivery(entry, true); + DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(sub); + if(deliveryAgent != null) + { + deliveryAgent.enqueueForPreDelivery(entry, true); + } } } } @@ -806,13 +802,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager for (Subscription sub : _subscriptions.getSubscriptions()) { - synchronized (sub.getSendLock()) + DeliveryAgent da = _subscriptions.getDeliveryAgent(sub); + if(da != null) { - if (!sub.isSuspended()) + synchronized (da.getSendLock()) { - sendNextMessage(sub, _queue); + if (!sub.isSuspended()) + { + sendNextMessage(da); - hasSubscribers = true; + hasSubscribers = true; + } } } } @@ -839,6 +839,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager try { Subscription s = _subscriptions.nextSubscriber(entry); + DeliveryAgent da = (s==null) ? null : _subscriptions.getDeliveryAgent(s); + if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing { @@ -861,16 +863,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } for (Subscription sub : _subscriptions.getSubscriptions()) { - + DeliveryAgent deliveryAgent = (s==null) ? null : _subscriptions.getDeliveryAgent(sub); // Only give the message to those that want them. - if (sub.hasInterest(entry)) + if (deliveryAgent != null && sub.hasInterest(entry)) { if (debugEnabled) { _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } - sub.enqueueForPreDelivery(entry, deliverFirst); + deliveryAgent.enqueueForPreDelivery(entry, deliverFirst); } } @@ -887,9 +889,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (s.filtersMessages()) { - if (s.getPreDeliveryQueue().size() > 0) + if (da.getPreDeliveryQueue().size() > 0) { - _log.error("Direct delivery from PDQ with queued msgs:" + s.getPreDeliveryQueue().size()); + _log.error("Direct delivery from PDQ with queued msgs:" + da.getPreDeliveryQueue().size()); } } else if (_messages.size() > 0) @@ -899,7 +901,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //release lock now _lock.unlock(); - synchronized (s.getSendLock()) + Object sendLock = (da == null) ? new Object() : da.getSendLock(); + synchronized (sendLock) { if (!s.isSuspended()) { @@ -909,7 +912,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager System.identityHashCode(s) + ") :" + s); } - if (entry.taken(s)) + if (entry.acquire(s)) { //Message has been delivered so don't redeliver. // This can currently occur because of the recursive call below @@ -939,7 +942,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return; } //Deliver the message - s.send(entry, _queue); + da.send(entry); } else { @@ -955,7 +958,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Why do we do this? What was the reasoning? We should have a better approach // than recursion and rejecting if someone else sends it before we do. // - if (!entry.isTaken()) + if (!entry.isAcquired()) { if (debugEnabled) { @@ -996,6 +999,23 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager final Runner _asyncDelivery = new Runner(); + public void debug() + { + System.err.println(_extraMessages); + System.err.println(_messages); + + } + + public void start(final Subscription subscription) + { + DeliveryAgent da = _subscriptions.getDeliveryAgent(subscription); + if(da != null) + { + da.start(); + } + } + + private class Runner implements Runnable { public void run() @@ -1047,6 +1067,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + public void closeSubscription(final Subscription subscription) + { + _subscriptions.getDeliveryAgent(subscription).close(); + subscription.close(); + } + + + public void resend(final QueueEntry entry, final Subscription subscription) + { + final DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(subscription); + deliveryAgent.addToResendQueue(entry); + + } + + private String currentStatus() { return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") + @@ -1058,4 +1093,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false"); } + + } |