diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 178 |
1 files changed, 110 insertions, 68 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 7dfcae95c3..cf607548f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -87,10 +87,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private final Object _queueHeadLock = new Object(); private String _processingThreadName = ""; - - /** Used by any reaping thread to purge messages */ - private StoreContext _reapingStoreContext = new StoreContext(); - ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -218,22 +214,32 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeExpired() throws AMQException { _lock.lock(); - - - for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) + try { - QueueEntry entry = iter.next(); - if(entry.expired()) + // New Context to for dealing with the MessageStore. + StoreContext context = new StoreContext(); + + for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) { - // fixme: Currently we have to update the total byte size here for the data in the queue - _totalMessageSize.addAndGet(-entry.getSize()); - _queue.dequeue(_reapingStoreContext,entry); - iter.remove(); - } - } + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); + // Remove the message from the queue in the MessageStore + _queue.dequeue(context,entry); - _lock.unlock(); + // This queue nolonger needs a reference to this message + entry.getMessage().decrementReference(context); + iter.remove(); + } + } + } + finally + { + _lock.unlock(); + } } /** @return the state of the async processor. */ @@ -249,14 +255,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public List<QueueEntry> getMessages() { - _lock.lock(); - List<QueueEntry> list = new ArrayList<QueueEntry>(); + List<QueueEntry> list = new ArrayList<QueueEntry>(); - for (QueueEntry entry : _messages) + _lock.lock(); + try { - list.add(entry); + for (QueueEntry entry : _messages) + { + list.add(entry); + } + } + finally + { + _lock.unlock(); } - _lock.unlock(); return list; } @@ -278,24 +290,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long maxMessageCount = toMessageId - fromMessageId + 1; - _lock.lock(); - List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>(); - - for (QueueEntry entry : _messages) + _lock.lock(); + try { - long msgId = entry.getMessage().getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) + for (QueueEntry entry : _messages) { - foundMessagesList.add(entry); - } - // break if the no of messages are found - if (foundMessagesList.size() == maxMessageCount) - { - break; + long msgId = entry.getMessage().getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(entry); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } return foundMessagesList; } @@ -445,45 +461,62 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _lock.lock(); - QueueEntry entry = _messages.poll(); - - if (entry != null) + try { - queue.dequeue(storeContext, entry); + QueueEntry entry = _messages.poll(); - _totalMessageSize.addAndGet(-entry.getSize()); + if (entry != null) + { + queue.dequeue(storeContext, entry); - //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. - entry.getMessage().decrementReference(storeContext); + _totalMessageSize.addAndGet(-entry.getSize()); - } + //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. + entry.getMessage().decrementReference(storeContext); - _lock.unlock(); + } + } + finally + { + _lock.unlock(); + } } public long clearAllMessages(StoreContext storeContext) throws AMQException { long count = 0; - _lock.lock(); - synchronized (_queueHeadLock) + _lock.lock(); + try { - QueueEntry entry = getNextMessage(); - while (entry != null) + synchronized (_queueHeadLock) { - //and remove it - _messages.poll(); + QueueEntry entry = getNextMessage(); - _queue.dequeue(storeContext, entry); + // todo: note: why do we need this? Why not reuse the passed 'storeContext' + //Create a new StoreContext for decrementing the References + StoreContext context = new StoreContext(); + + while (entry != null) + { + //and remove it + _messages.poll(); - entry.getMessage().decrementReference(_reapingStoreContext); + // todo: NOTE: Why is this a different context to the new local 'context'? + _queue.dequeue(storeContext, entry); - entry = getNextMessage(); - count++; + entry.getMessage().decrementReference(context); + + entry = getNextMessage(); + count++; + } + _totalMessageSize.set(0L); } - _totalMessageSize.set(0L); } - _lock.unlock(); + finally + { + _lock.unlock(); + } return count; } @@ -518,10 +551,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _totalMessageSize.addAndGet(-entry.getSize()); + // New Store Context for removing expired messages + StoreContext storeContext = new StoreContext(); + // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - _queue.dequeue(_reapingStoreContext, entry); + _queue.dequeue(storeContext, entry); - message.decrementReference(_reapingStoreContext); + message.decrementReference(storeContext); if (_log.isInfoEnabled()) { @@ -760,24 +796,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList) { _lock.lock(); - for (QueueEntry entry : movedMessageList) - { - addMessageToQueue(entry, false); - } - - // enqueue on the pre delivery queues - for (Subscription sub : _subscriptions.getSubscriptions()) + try { for (QueueEntry entry : movedMessageList) { - // Only give the message to those that want them. - if (sub.hasInterest(entry)) + addMessageToQueue(entry, false); + } + + // enqueue on the pre delivery queues + for (Subscription sub : _subscriptions.getSubscriptions()) + { + for (QueueEntry entry : movedMessageList) { - sub.enqueueForPreDelivery(entry, true); + // Only give the message to those that want them. + if (sub.hasInterest(entry)) + { + sub.enqueueForPreDelivery(entry, true); + } } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } } /** |