From d1dc8d7dc0136971a11c6809e8a5b530b35e4062 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 30 Apr 2008 15:32:42 +0000 Subject: QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@652399 13f79535-47bb-0310-9956-ffa450edef68 --- .../queue/ConcurrentSelectorDeliveryManager.java | 177 ++++++++++++--------- 1 file changed, 103 insertions(+), 74 deletions(-) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index a729ebcb37..cf607548f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -214,30 +214,32 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeExpired() throws AMQException { _lock.lock(); - - - // New Context to for dealing with the MessageStore. - StoreContext context = new StoreContext(); - - for(Iterator iter = _messages.iterator(); iter.hasNext();) + try { - QueueEntry entry = iter.next(); - if(entry.expired()) + // New Context to for dealing with the MessageStore. + StoreContext context = new StoreContext(); + + for(Iterator iter = _messages.iterator(); iter.hasNext();) { - // fixme: Currently we have to update the total byte size here for the data in the queue - _totalMessageSize.addAndGet(-entry.getSize()); + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); - // Remove the message from the queue in the MessageStore - _queue.dequeue(context,entry); + // Remove the message from the queue in the MessageStore + _queue.dequeue(context,entry); - // This queue nolonger needs a reference to this message - entry.getMessage().decrementReference(context); - iter.remove(); + // This queue nolonger needs a reference to this message + entry.getMessage().decrementReference(context); + iter.remove(); + } } - } - - - _lock.unlock(); + } + finally + { + _lock.unlock(); + } } /** @return the state of the async processor. */ @@ -253,14 +255,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public List getMessages() { - _lock.lock(); - List list = new ArrayList(); + List list = new ArrayList(); - for (QueueEntry entry : _messages) + _lock.lock(); + try { - list.add(entry); + for (QueueEntry entry : _messages) + { + list.add(entry); + } + } + finally + { + _lock.unlock(); } - _lock.unlock(); return list; } @@ -282,24 +290,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long maxMessageCount = toMessageId - fromMessageId + 1; - _lock.lock(); - List foundMessagesList = new ArrayList(); - - for (QueueEntry entry : _messages) + _lock.lock(); + try { - long msgId = entry.getMessage().getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) + for (QueueEntry entry : _messages) { - foundMessagesList.add(entry); - } - // break if the no of messages are found - if (foundMessagesList.size() == maxMessageCount) - { - break; + long msgId = entry.getMessage().getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(entry); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } return foundMessagesList; } @@ -449,51 +461,62 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _lock.lock(); - QueueEntry entry = _messages.poll(); - - if (entry != null) + try { - queue.dequeue(storeContext, entry); + QueueEntry entry = _messages.poll(); + + if (entry != null) + { + queue.dequeue(storeContext, entry); - _totalMessageSize.addAndGet(-entry.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); - //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. - entry.getMessage().decrementReference(storeContext); + //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. + entry.getMessage().decrementReference(storeContext); + } + } + finally + { + _lock.unlock(); } - - _lock.unlock(); } public long clearAllMessages(StoreContext storeContext) throws AMQException { long count = 0; - _lock.lock(); - synchronized (_queueHeadLock) + _lock.lock(); + try { - QueueEntry entry = getNextMessage(); + synchronized (_queueHeadLock) + { + QueueEntry entry = getNextMessage(); - // todo: note: why do we need this? Why not reuse the passed 'storeContext' - //Create a new StoreContext for decrementing the References - StoreContext context = new StoreContext(); + // todo: note: why do we need this? Why not reuse the passed 'storeContext' + //Create a new StoreContext for decrementing the References + StoreContext context = new StoreContext(); - while (entry != null) - { - //and remove it - _messages.poll(); + while (entry != null) + { + //and remove it + _messages.poll(); - // todo: NOTE: Why is this a different context to the new local 'context'? - _queue.dequeue(storeContext, entry); + // todo: NOTE: Why is this a different context to the new local 'context'? + _queue.dequeue(storeContext, entry); - entry.getMessage().decrementReference(context); + entry.getMessage().decrementReference(context); - entry = getNextMessage(); - count++; + entry = getNextMessage(); + count++; + } + _totalMessageSize.set(0L); } - _totalMessageSize.set(0L); } - _lock.unlock(); + finally + { + _lock.unlock(); + } return count; } @@ -773,24 +796,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void enqueueMovedMessages(StoreContext storeContext, List movedMessageList) { _lock.lock(); - for (QueueEntry entry : movedMessageList) - { - addMessageToQueue(entry, false); - } - - // enqueue on the pre delivery queues - for (Subscription sub : _subscriptions.getSubscriptions()) + try { for (QueueEntry entry : movedMessageList) { - // Only give the message to those that want them. - if (sub.hasInterest(entry)) + addMessageToQueue(entry, false); + } + + // enqueue on the pre delivery queues + for (Subscription sub : _subscriptions.getSubscriptions()) + { + for (QueueEntry entry : movedMessageList) { - sub.enqueueForPreDelivery(entry, true); + // Only give the message to those that want them. + if (sub.hasInterest(entry)) + { + sub.enqueueForPreDelivery(entry, true); + } } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } } /** -- cgit v1.2.1