diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-04-30 15:32:42 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-04-30 15:32:42 +0000 |
commit | d1dc8d7dc0136971a11c6809e8a5b530b35e4062 (patch) | |
tree | 6a6ac7e895a90e742e992b95b274216c541e9d52 | |
parent | e870da9f7909033225d1ff9d05d6306095189b25 (diff) | |
download | qpid-python-d1dc8d7dc0136971a11c6809e8a5b530b35e4062.tar.gz |
QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@652399 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 177 |
1 files changed, 103 insertions, 74 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index a729ebcb37..cf607548f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -214,30 +214,32 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeExpired() throws AMQException { _lock.lock(); - - - // New Context to for dealing with the MessageStore. - StoreContext context = new StoreContext(); - - for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) + try { - QueueEntry entry = iter.next(); - if(entry.expired()) + // New Context to for dealing with the MessageStore. + StoreContext context = new StoreContext(); + + for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) { - // fixme: Currently we have to update the total byte size here for the data in the queue - _totalMessageSize.addAndGet(-entry.getSize()); + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); - // Remove the message from the queue in the MessageStore - _queue.dequeue(context,entry); + // Remove the message from the queue in the MessageStore + _queue.dequeue(context,entry); - // This queue nolonger needs a reference to this message - entry.getMessage().decrementReference(context); - iter.remove(); + // This queue nolonger needs a reference to this message + entry.getMessage().decrementReference(context); + iter.remove(); + } } - } - - - _lock.unlock(); + } + finally + { + _lock.unlock(); + } } /** @return the state of the async processor. */ @@ -253,14 +255,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public List<QueueEntry> getMessages() { - _lock.lock(); - List<QueueEntry> list = new ArrayList<QueueEntry>(); + List<QueueEntry> list = new ArrayList<QueueEntry>(); - for (QueueEntry entry : _messages) + _lock.lock(); + try { - list.add(entry); + for (QueueEntry entry : _messages) + { + list.add(entry); + } + } + finally + { + _lock.unlock(); } - _lock.unlock(); return list; } @@ -282,24 +290,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long maxMessageCount = toMessageId - fromMessageId + 1; - _lock.lock(); - List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>(); - - for (QueueEntry entry : _messages) + _lock.lock(); + try { - long msgId = entry.getMessage().getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) + for (QueueEntry entry : _messages) { - foundMessagesList.add(entry); - } - // break if the no of messages are found - if (foundMessagesList.size() == maxMessageCount) - { - break; + long msgId = entry.getMessage().getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(entry); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } return foundMessagesList; } @@ -449,51 +461,62 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _lock.lock(); - QueueEntry entry = _messages.poll(); - - if (entry != null) + try { - queue.dequeue(storeContext, entry); + QueueEntry entry = _messages.poll(); + + if (entry != null) + { + queue.dequeue(storeContext, entry); - _totalMessageSize.addAndGet(-entry.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); - //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. - entry.getMessage().decrementReference(storeContext); + //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. + entry.getMessage().decrementReference(storeContext); + } + } + finally + { + _lock.unlock(); } - - _lock.unlock(); } public long clearAllMessages(StoreContext storeContext) throws AMQException { long count = 0; - _lock.lock(); - synchronized (_queueHeadLock) + _lock.lock(); + try { - QueueEntry entry = getNextMessage(); + synchronized (_queueHeadLock) + { + QueueEntry entry = getNextMessage(); - // todo: note: why do we need this? Why not reuse the passed 'storeContext' - //Create a new StoreContext for decrementing the References - StoreContext context = new StoreContext(); + // todo: note: why do we need this? Why not reuse the passed 'storeContext' + //Create a new StoreContext for decrementing the References + StoreContext context = new StoreContext(); - while (entry != null) - { - //and remove it - _messages.poll(); + while (entry != null) + { + //and remove it + _messages.poll(); - // todo: NOTE: Why is this a different context to the new local 'context'? - _queue.dequeue(storeContext, entry); + // todo: NOTE: Why is this a different context to the new local 'context'? + _queue.dequeue(storeContext, entry); - entry.getMessage().decrementReference(context); + entry.getMessage().decrementReference(context); - entry = getNextMessage(); - count++; + entry = getNextMessage(); + count++; + } + _totalMessageSize.set(0L); } - _totalMessageSize.set(0L); } - _lock.unlock(); + finally + { + _lock.unlock(); + } return count; } @@ -773,24 +796,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList) { _lock.lock(); - for (QueueEntry entry : movedMessageList) - { - addMessageToQueue(entry, false); - } - - // enqueue on the pre delivery queues - for (Subscription sub : _subscriptions.getSubscriptions()) + try { for (QueueEntry entry : movedMessageList) { - // Only give the message to those that want them. - if (sub.hasInterest(entry)) + addMessageToQueue(entry, false); + } + + // enqueue on the pre delivery queues + for (Subscription sub : _subscriptions.getSubscriptions()) + { + for (QueueEntry entry : movedMessageList) { - sub.enqueueForPreDelivery(entry, true); + // Only give the message to those that want them. + if (sub.hasInterest(entry)) + { + sub.enqueueForPreDelivery(entry, true); + } } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } } /** |