summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java1098
1 files changed, 0 insertions, 1098 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
deleted file mode 100644
index becd57752e..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ /dev/null
@@ -1,1098 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
-import org.apache.qpid.util.MessageQueue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/** Manages delivery of messages on behalf of a queue */
-public class ConcurrentSelectorDeliveryManager
-{
- private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
-
- @Configured(path = "advanced.compressBufferOnQueue",
- defaultValue = "false")
- public boolean compressBufferOnQueue;
- /** Holds any queued messages */
- private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
-
- /** Ensures that only one asynchronous task is running for this manager at any time. */
- private final AtomicBoolean _processing = new AtomicBoolean();
- /** The subscriptions on the queue to whom messages are delivered */
- private final SubscriptionSet _subscriptions = new SubscriptionSet();
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
- * acknowledgements a handle on the queue.
- */
- private final AMQQueueImpl _queue;
-
- /**
- * Flag used while moving messages from this queue to another. For moving messages the async delivery should also
- * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again.
- */
- private AtomicBoolean _movingMessages = new AtomicBoolean();
-
- /**
- * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be
- * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition
- * of messages to the queue.
- */
- private ReentrantLock _lock = new ReentrantLock();
- private AtomicLong _totalMessageSize = new AtomicLong();
- private AtomicInteger _extraMessages = new AtomicInteger();
- private Set<DeliveryAgent> _hasContent = Collections.synchronizedSet(new HashSet<DeliveryAgent>());
- private final Object _queueHeadLock = new Object();
- private String _processingThreadName = "";
-
-
- /** Used by any reaping thread to purge messages */
- private StoreContext _reapingStoreContext = new StoreContext();
-
- ConcurrentSelectorDeliveryManager(AMQQueueImpl queue)
- {
-
- //Set values from configuration
- Configurator.configure(this);
-
- if (compressBufferOnQueue)
- {
- _log.warn("Compressing Buffers on queue.");
- }
-
- _queue = queue;
- }
-
-
- public SubscriptionSet getSubscribers()
- {
- return _subscriptions;
- }
-
-
- private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
- {
- AMQMessage msg = entry.getMessage();
- // Shrink the ContentBodies to their actual size to save memory.
- if (compressBufferOnQueue)
- {
- Iterator<ContentChunk> it = msg.getContentBodyIterator();
- while (it.hasNext())
- {
- ContentChunk cb = it.next();
- cb.reduceToFit();
- }
- }
-
- if (deliverFirst)
- {
- synchronized (_queueHeadLock)
- {
- _messages.pushHead(entry);
- }
- }
- else
- {
- _messages.offer(entry);
- }
-
- _totalMessageSize.addAndGet(msg.getSize());
-
- return true;
- }
-
-
- public boolean hasQueuedMessages()
- {
- _lock.lock();
- try
- {
- return !(_messages.isEmpty() && _hasContent.isEmpty());
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public int getQueueMessageCount()
- {
- return getMessageCount();
- }
-
- /**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine
- * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
- *
- * @return int the number of messages in the delivery queue.
- */
- private int getMessageCount()
- {
- return _messages.size() + _extraMessages.get();
- }
-
-
- public long getTotalMessageSize()
- {
- return _totalMessageSize.get();
- }
-
- public long getOldestMessageArrival()
- {
- QueueEntry entry = _messages.peek();
- return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
- }
-
- public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
- {
- subscriberHasPendingResend(hasContent,_subscriptions.getDeliveryAgent(subscription),entry);
- }
- private void subscriberHasPendingResend(boolean hasContent, DeliveryAgent deliveryAgent, QueueEntry entry)
- {
- _lock.lock();
- try
- {
- if (hasContent)
- {
- _log.debug("Queue has adding subscriber content");
- _hasContent.add(deliveryAgent);
- _totalMessageSize.addAndGet(entry.getSize());
- _extraMessages.addAndGet(1);
- }
- else
- {
- _log.debug("Queue has removing subscriber content");
- if (entry == null)
- {
- _hasContent.remove(deliveryAgent);
- }
- else
- {
- _totalMessageSize.addAndGet(-entry.getSize());
- _extraMessages.addAndGet(-1);
- }
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * NOTE : This method should only be called when there are no active subscribers
- */
- public void removeExpired() throws AMQException
- {
- _lock.lock();
-
-
- for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
- {
- QueueEntry entry = iter.next();
- if(entry.expired())
- {
- // fixme: Currently we have to update the total byte size here for the data in the queue
- _totalMessageSize.addAndGet(-entry.getSize());
- _queue.dequeue(_reapingStoreContext,entry);
- iter.remove();
- }
- }
-
-
- _lock.unlock();
- }
-
- /** @return the state of the async processor. */
- public boolean isProcessingAsync()
- {
- return _processing.get();
- }
-
- /**
- * Returns all the messages in the Queue
- *
- * @return List of messages
- */
- public List<QueueEntry> getMessages()
- {
- _lock.lock();
- List<QueueEntry> list = new ArrayList<QueueEntry>();
-
- for (QueueEntry entry : _messages)
- {
- list.add(entry);
- }
- _lock.unlock();
-
- return list;
- }
-
- /**
- * Returns messages within the range of given messageIds
- *
- * @param fromMessageId
- * @param toMessageId
- *
- * @return
- */
- public List<QueueEntry> getMessages(long fromMessageId, long toMessageId)
- {
- if (fromMessageId <= 0 || toMessageId <= 0)
- {
- return null;
- }
-
- long maxMessageCount = toMessageId - fromMessageId + 1;
-
- _lock.lock();
-
- List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
- for (QueueEntry entry : _messages)
- {
- long msgId = entry.getMessage().getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
- {
- foundMessagesList.add(entry);
- }
- // break if the no of messages are found
- if (foundMessagesList.size() == maxMessageCount)
- {
- break;
- }
- }
- _lock.unlock();
-
- return foundMessagesList;
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- populatePreDeliveryQueue(_subscriptions.getDeliveryAgent(subscription));
- }
-
- private void populatePreDeliveryQueue(DeliveryAgent deliveryAgent)
- {
-
- Iterator<QueueEntry> currentQueue = _messages.iterator();
-
- while (currentQueue.hasNext())
- {
- QueueEntry entry = currentQueue.next();
-
- if (deliveryAgent != null && deliveryAgent.hasInterest(entry))
- {
- deliveryAgent.enqueueForPreDelivery(entry, false);
- }
-
- }
- }
-
- public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
- {
- QueueEntry entry = getNextMessage();
- if (entry == null)
- {
- return false;
- }
- else
- {
-
- try
- {
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!acks)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
- }
- entry.dequeue(channel.getStoreContext());
- }
- synchronized (channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
-
- if (acks)
- {
- channel.addUnacknowledgedMessage(entry, deliveryTag, null);
- }
-
- protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
- deliveryTag, _queue.getMessageCount());
-
- }
- _totalMessageSize.addAndGet(-entry.getSize());
-
- if (!acks)
- {
- entry.dispose(channel.getStoreContext());
- }
- }
- finally
- {
- entry.setDeliveredToSubscription();
- }
- return true;
-
- }
- }
-
- /**
- * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that
- * the asyn delivery is also stopped.
- */
- public void startMovingMessages()
- {
- _movingMessages.set(true);
- }
-
- /**
- * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that
- * the async delivery can start again.
- */
- public void stopMovingMessages()
- {
- _movingMessages.set(false);
- if (_lock.isHeldByCurrentThread())
- {
- _lock.unlock();
- }
- }
-
- /**
- * Messages will be removed from this queue and all preDeliveryQueues
- *
- * @param messageList
- */
- public void removeMovedMessages(List<QueueEntry> messageList)
- {
- // Remove from the
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- if (hasSubscribers)
- {
- for (Subscription sub : _subscriptions.getSubscriptions())
- {
- DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(sub);
- if (deliveryAgent != null && !sub.isSuspended() && sub.filtersMessages())
- {
- Queue<QueueEntry> preDeliveryQueue = deliveryAgent.getPreDeliveryQueue();
- for (QueueEntry entry : messageList)
- {
- preDeliveryQueue.remove(entry);
- }
- }
- }
- }
-
- for (QueueEntry entry : messageList)
- {
- if (_messages.remove(entry))
- {
- _totalMessageSize.getAndAdd(-entry.getSize());
- }
- }
- }
-
- /**
- * Now with implementation of predelivery queues, this method will mark the message on the top as taken.
- *
- * @param storeContext
- *
- * @throws AMQException
- */
- public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException
- {
- _lock.lock();
-
- QueueEntry entry = _messages.poll();
-
- if (entry != null)
- {
- entry.dequeue(storeContext);
-
- _totalMessageSize.addAndGet(-entry.getSize());
-
- //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
- entry.dispose(storeContext);
-
- }
-
- _lock.unlock();
- }
-
- public long clearAllMessages(StoreContext storeContext) throws AMQException
- {
- long count = 0;
- _lock.lock();
-
- synchronized (_queueHeadLock)
- {
- QueueEntry entry = getNextMessage();
- while (entry != null)
- {
- //and remove it
- _messages.poll();
-
- entry.dequeue(storeContext);
-
- entry.dispose(storeContext);
-
- entry = getNextMessage();
- count++;
- }
- _totalMessageSize.set(0L);
- }
- _lock.unlock();
- return count;
- }
-
- /**
- * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
- *
- * @return the next message or null
- *
- * @throws org.apache.qpid.AMQException
- */
- private QueueEntry getNextMessage() throws AMQException
- {
- return getNextMessage(_messages, null, false);
- }
-
- private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException
- {
- QueueEntry entry = messages.peek();
-
- //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (purgeMessage(entry, sub, purgeOnly))
- {
-
- //remove the already taken message or expired
- QueueEntry removed = messages.poll();
-
- assert removed == entry;
-
- // if the message expired then the _totalMessageSize needs adjusting
- if (entry.expired() && !entry.acquire(sub))
- {
- _totalMessageSize.addAndGet(-entry.getSize());
-
- // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- entry.dequeue(_reapingStoreContext);
- entry.dispose(_reapingStoreContext);
-
- if (_log.isInfoEnabled())
- {
- _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
- }
- }
-
- //else the clean up is not required as the message has already been taken for this queue therefore
- // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Removed taken message:" + entry.debugIdentity());
- }
-
- // try the next message
- entry = messages.peek();
- }
-
- return entry;
- }
-
- /**
- * This method will return true if the message is to be purged from the queue.
- * \
- * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false
- *
- * @param message
- * @param sub
- * @param purgeOnly When set to false the message will be taken by the given Subscription.
- *
- * @return if the msg should be purged
- *
- * @throws AMQException
- */
- private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException
- {
- //Original.. complicated while loop control
-// (message != null
-// && (
-// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
-// || sub == null)
-// && message.taken(_queue, sub));
-
- boolean purge = false;
-
- // if the message is null then don't purge as we have no messagse.
- if (message != null)
- {
- // Check that the message hasn't expired.
- if (message.expired())
- {
- return true;
- }
-
- // if we have a subscriber perform message checks
- if (sub != null)
- {
- // if we have a queue browser(we don't purge) so check mark the message as taken
- purge = ((!sub.isBrowser() || message.isAcquired()));
- }
- else
- {
- // if there is no subscription we are doing
- // a get or purging so mark message as taken.
- message.isAcquired();
- // and then ensure that it gets purged
- purge = true;
- }
- }
-
- if (purgeOnly)
- {
- // If we are simply purging the queue don't take the message
- // just purge up to the next non-taken msg.
- return purge && message.isAcquired();
- }
- else
- {
- // if we are purging then ensure we mark this message taken for the current subscriber
- // the current subscriber may be null in the case of a get or a purge but this is ok.
- return purge && message.acquire(sub);
- }
- }
-
- public void sendNextMessage(DeliveryAgent sub)
- {
-
- Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
-
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
- ") from queue (" + System.identityHashCode(messageQueue) +
- ") AMQQueue (" + System.identityHashCode(_queue) + ")");
- }
-
- if (messageQueue == null)
- {
- // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
- if (_log.isInfoEnabled())
- {
- _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + _queue);
- }
- return;
- }
-
- QueueEntry entry = null;
- QueueEntry removed;
- try
- {
- synchronized (_queueHeadLock)
- {
- entry = getNextMessage(messageQueue, sub.getSubscription(), false);
-
- // message will be null if we have no messages in the messageQueue.
- if (entry == null)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
- }
- return;
- }
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) +
- ") by :" + System.identityHashCode(this) +
- ") to :" + System.identityHashCode(sub));
- }
-
-
- if (messageQueue == _messages)
- {
- _totalMessageSize.addAndGet(-entry.getSize());
- }
-
- sub.send(entry);
-
- //remove sent message from our queue.
- removed = messageQueue.poll();
- //If we don't remove the message from _messages
- // Otherwise the Async send will never end
- }
-
- if (removed != entry)
- {
- _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry +
- ") by :" + System.identityHashCode(this) +
- ") to :" + System.identityHashCode(sub));
- }
-
-
- if (messageQueue == sub.getResendQueue())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub);
- }
- if (messageQueue.isEmpty())
- {
- subscriberHasPendingResend(false, sub, null);
- //better to use the above method as this keeps all the tracking in one location.
- // _hasContent.remove(sub);
- }
-
- _extraMessages.decrementAndGet();
- }
- else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser())
- {
- if (_log.isInfoEnabled())
- {
- //fixme - we should do the clean up as the message remains on the _message queue
- // this is resulting in the next consumer receiving the message and then attempting to purge it
- //
- cleanMainQueue(sub);
- }
- }
-
- }
- catch (AMQException e)
- {
- if (entry != null)
- {
- entry.release();
- }
- else
- {
- _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e);
- }
- _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
- }
- }
-
- private void cleanMainQueue(DeliveryAgent sub)
- {
- try
- {
- getNextMessage(_messages, sub.getSubscription(), true);
- }
- catch (AMQException e)
- {
- _log.warn("Problem during main queue purge:" + e.getMessage());
- }
- }
-
- /**
- * enqueues the messages in the list on the queue and all required predelivery queues
- *
- * @param storeContext
- * @param movedMessageList
- */
- public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
- {
- _lock.lock();
- for (QueueEntry entry : movedMessageList)
- {
- addMessageToQueue(entry, false);
- }
-
- // enqueue on the pre delivery queues
- for (Subscription sub : _subscriptions.getSubscriptions())
- {
- for (QueueEntry entry : movedMessageList)
- {
- // Only give the message to those that want them.
- if (sub.hasInterest(entry))
- {
- DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(sub);
- if(deliveryAgent != null)
- {
- deliveryAgent.enqueueForPreDelivery(entry, true);
- }
- }
- }
- }
- _lock.unlock();
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke
- * deliver().
- */
- private void processQueue()
- {
- //record thread name
- if (_log.isDebugEnabled())
- {
- _processingThreadName = Thread.currentThread().getName();
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
- }
-
- // Continue to process delivery while we haveSubscribers and messages
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
-
- while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get())
- {
- hasSubscribers = false;
-
- for (Subscription sub : _subscriptions.getSubscriptions())
- {
- DeliveryAgent da = _subscriptions.getDeliveryAgent(sub);
- if(da != null)
- {
- synchronized (da.getSendLock())
- {
- if (!sub.isSuspended())
- {
- sendNextMessage(da);
-
- hasSubscribers = true;
- }
- }
- }
- }
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
- }
-
- }
-
- public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException
- {
-
- final boolean debugEnabled = _log.isDebugEnabled();
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry);
- }
-
- //Check if we have someone to deliver the message to.
- _lock.lock();
- try
- {
- Subscription s = _subscriptions.nextSubscriber(entry);
- DeliveryAgent da = (s==null) ? null : _subscriptions.getDeliveryAgent(s);
-
-
- if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus());
- }
- if (!entry.getMessage().getMessagePublishInfo().isImmediate())
- {
- addMessageToQueue(entry, deliverFirst);
-
- //release lock now message is on queue.
- _lock.unlock();
-
- //Pre Deliver to all subscriptions
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() +
- " subscribers to give the message to:" + currentStatus());
- }
- for (Subscription sub : _subscriptions.getSubscriptions())
- {
- DeliveryAgent deliveryAgent = (s==null) ? null : _subscriptions.getDeliveryAgent(sub);
- // Only give the message to those that want them.
- if (deliveryAgent != null && sub.hasInterest(entry))
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
- ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
- }
- deliveryAgent.enqueueForPreDelivery(entry, deliverFirst);
- }
- }
-
- //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
- if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
- {
- _queue.deliverAsync();
- }
-
- }
- }
- else
- {
-
- if (s.filtersMessages())
- {
- if (da.getPreDeliveryQueue().size() > 0)
- {
- _log.error("Direct delivery from PDQ with queued msgs:" + da.getPreDeliveryQueue().size());
- }
- }
- else if (_messages.size() > 0)
- {
- _log.error("Direct delivery from MainQueue queued msgs:" + _messages.size());
- }
-
- //release lock now
- _lock.unlock();
- Object sendLock = (da == null) ? new Object() : da.getSendLock();
- synchronized (sendLock)
- {
- if (!s.isSuspended())
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
- System.identityHashCode(s) + ") :" + s);
- }
-
- if (entry.acquire(s))
- {
- //Message has been delivered so don't redeliver.
- // This can currently occur because of the recursive call below
- // During unit tests the send can occur
- // client then rejects
- // this reject then releases the message by the time the
- // if(!msg.isTaken()) call is made below
- // the message has been released so that thread loops to send the message again
- // of course by the time it gets back to here. the thread that released the
- // message is now ready to send it. Here is a sample trace for reference
-//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
-//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
-//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1)
-//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
-//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
-//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]}
-//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
-//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657
-//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
-//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
-//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1)
-//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]}
- // Note: In the last request to take the message from thread 4,5 the message has been
- // taken by the previous call done by thread 2,5
-
-
- return;
- }
- //Deliver the message
- da.send(entry);
- }
- else
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
- "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity());
- }
- }
- }
-
- //
- // Why do we do this? What was the reasoning? We should have a better approach
- // than recursion and rejecting if someone else sends it before we do.
- //
- if (!entry.isAcquired())
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" +
- " Subscriber:" + System.identityHashCode(s));
- }
-
- deliver(context, name, entry, deliverFirst);
- }
- else
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + " Message(" + entry.toString() +
- ") has been taken so disregarding deliver request to Subscriber:" +
- System.identityHashCode(s));
- }
- }
- }
-
- }
- finally
- {
- //ensure lock is released
- if (_lock.isHeldByCurrentThread())
- {
- _lock.unlock();
- }
- }
- }
-
- private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
-
- private String debugIdentity()
- {
- return id;
- }
-
- final Runner _asyncDelivery = new Runner();
-
- public void debug()
- {
- System.err.println(_extraMessages);
- System.err.println(_messages);
-
- }
-
- public void start(final Subscription subscription)
- {
- DeliveryAgent da = _subscriptions.getDeliveryAgent(subscription);
- if(da != null)
- {
- da.start();
- }
- }
-
-
- private class Runner implements Runnable
- {
- public void run()
- {
- String startName = Thread.currentThread().getName();
- Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
- boolean running = true;
- while (running && !_movingMessages.get())
- {
- processQueue();
-
- //Check that messages have not been added since we did our last peek();
- // Synchronize with the thread that adds to the queue.
- // If the queue is still empty then we can exit
- synchronized (_asyncDelivery)
- {
- if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
- {
- running = false;
- _processing.set(false);
- }
- }
- }
- Thread.currentThread().setName(startName);
- }
- }
-
- public void processAsync(Executor executor)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Processing Async." + currentStatus());
- }
-
- synchronized (_asyncDelivery)
- {
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- if (_log.isDebugEnabled())
- {
- _log.debug(debugIdentity() + "Executing Async process.");
- }
- executor.execute(_asyncDelivery);
- }
- }
- }
- }
-
- public void closeSubscription(final Subscription subscription)
- {
- _subscriptions.getDeliveryAgent(subscription).close();
- subscription.close();
- }
-
-
- public void resend(final QueueEntry entry, final Subscription subscription)
- {
- final DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(subscription);
- deliveryAgent.addToResendQueue(entry);
-
- }
-
-
- private String currentStatus()
- {
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
- "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
- ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
- " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
- "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false");
- }
-
-
-
-}