diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-04-30 14:40:18 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-04-30 14:40:18 +0000 |
commit | 502a31a9149d6920010e0b316c87fe53dbd51a8c (patch) | |
tree | fb77af4882b73ace8f28103dd00a984f3f4f8cfe | |
parent | cf5c341c92b554b2b8d4491dc846618f038e0da8 (diff) | |
download | qpid-python-502a31a9149d6920010e0b316c87fe53dbd51a8c.tar.gz |
QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@652388 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 31 |
1 files changed, 22 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 7dfcae95c3..a729ebcb37 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -87,10 +87,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private final Object _queueHeadLock = new Object(); private String _processingThreadName = ""; - - /** Used by any reaping thread to purge messages */ - private StoreContext _reapingStoreContext = new StoreContext(); - ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -220,14 +216,22 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); - for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) + // New Context to for dealing with the MessageStore. + StoreContext context = new StoreContext(); + + for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) { QueueEntry entry = iter.next(); if(entry.expired()) { // fixme: Currently we have to update the total byte size here for the data in the queue _totalMessageSize.addAndGet(-entry.getSize()); - _queue.dequeue(_reapingStoreContext,entry); + + // Remove the message from the queue in the MessageStore + _queue.dequeue(context,entry); + + // This queue nolonger needs a reference to this message + entry.getMessage().decrementReference(context); iter.remove(); } } @@ -469,14 +473,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager synchronized (_queueHeadLock) { QueueEntry entry = getNextMessage(); + + // todo: note: why do we need this? Why not reuse the passed 'storeContext' + //Create a new StoreContext for decrementing the References + StoreContext context = new StoreContext(); + while (entry != null) { //and remove it _messages.poll(); + // todo: NOTE: Why is this a different context to the new local 'context'? _queue.dequeue(storeContext, entry); - entry.getMessage().decrementReference(_reapingStoreContext); + entry.getMessage().decrementReference(context); entry = getNextMessage(); count++; @@ -518,10 +528,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _totalMessageSize.addAndGet(-entry.getSize()); + // New Store Context for removing expired messages + StoreContext storeContext = new StoreContext(); + // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - _queue.dequeue(_reapingStoreContext, entry); + _queue.dequeue(storeContext, entry); - message.decrementReference(_reapingStoreContext); + message.decrementReference(storeContext); if (_log.isInfoEnabled()) { |