summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-04-30 14:40:18 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-04-30 14:40:18 +0000
commit502a31a9149d6920010e0b316c87fe53dbd51a8c (patch)
treefb77af4882b73ace8f28103dd00a984f3f4f8cfe
parentcf5c341c92b554b2b8d4491dc846618f038e0da8 (diff)
downloadqpid-python-502a31a9149d6920010e0b316c87fe53dbd51a8c.tar.gz
QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@652388 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java31
1 files changed, 22 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 7dfcae95c3..a729ebcb37 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -87,10 +87,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
-
- /** Used by any reaping thread to purge messages */
- private StoreContext _reapingStoreContext = new StoreContext();
-
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -220,14 +216,22 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
- for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+ // New Context to for dealing with the MessageStore.
+ StoreContext context = new StoreContext();
+
+ for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
{
QueueEntry entry = iter.next();
if(entry.expired())
{
// fixme: Currently we have to update the total byte size here for the data in the queue
_totalMessageSize.addAndGet(-entry.getSize());
- _queue.dequeue(_reapingStoreContext,entry);
+
+ // Remove the message from the queue in the MessageStore
+ _queue.dequeue(context,entry);
+
+ // This queue nolonger needs a reference to this message
+ entry.getMessage().decrementReference(context);
iter.remove();
}
}
@@ -469,14 +473,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
synchronized (_queueHeadLock)
{
QueueEntry entry = getNextMessage();
+
+ // todo: note: why do we need this? Why not reuse the passed 'storeContext'
+ //Create a new StoreContext for decrementing the References
+ StoreContext context = new StoreContext();
+
while (entry != null)
{
//and remove it
_messages.poll();
+ // todo: NOTE: Why is this a different context to the new local 'context'?
_queue.dequeue(storeContext, entry);
- entry.getMessage().decrementReference(_reapingStoreContext);
+ entry.getMessage().decrementReference(context);
entry = getNextMessage();
count++;
@@ -518,10 +528,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_totalMessageSize.addAndGet(-entry.getSize());
+ // New Store Context for removing expired messages
+ StoreContext storeContext = new StoreContext();
+
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- _queue.dequeue(_reapingStoreContext, entry);
+ _queue.dequeue(storeContext, entry);
- message.decrementReference(_reapingStoreContext);
+ message.decrementReference(storeContext);
if (_log.isInfoEnabled())
{