summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-09-18 21:17:32 +0000
committerRobert Gemmell <robbie@apache.org>2010-09-18 21:17:32 +0000
commitb2ddfb386b3cd8b4f595a0fe3f77da94807282b1 (patch)
tree7598dfa7c2eb0af3f3e95ce802f5b967a63f4845 /java/broker/src
parentf532031f02b5f9b03ec40cd81a13e2df091a79f0 (diff)
downloadqpid-python-b2ddfb386b3cd8b4f595a0fe3f77da94807282b1.tar.gz
QPID-2704: simplify the implementation of SQEL scavenge() ability and add test.
Incorporates changes for QPID-2597 from 0.5.x-dev branch revisions 943240, 943534, 943576, and 943845. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@998543 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java65
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java144
2 files changed, 158 insertions, 51 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 334b7f4ea9..b97c2c55c5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -45,7 +45,9 @@ public class SimpleQueueEntryList implements QueueEntryList
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
- private AtomicLong _deletes = new AtomicLong(0L);
+
+ private AtomicLong _scavenges = new AtomicLong(0L);
+ private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
public SimpleQueueEntryList(AMQQueue queue)
@@ -55,71 +57,32 @@ public class SimpleQueueEntryList implements QueueEntryList
_tail = _head;
}
-
-
void advanceHead()
{
- _deletes.incrementAndGet();
- QueueEntryImpl head = _head.nextNode();
- while(head._next != null && head.isDeleted())
- {
+ QueueEntryImpl next = _head.nextNode();
+ QueueEntryImpl newNext = _head.getNext();
- final QueueEntryImpl newhead = head.nextNode();
- if(newhead != null)
+ if (next == newNext)
+ {
+ if (_scavenges.incrementAndGet() > _scavengeCount)
{
- if(_nextUpdater.compareAndSet(_head,head, newhead))
- {
- _deletes.decrementAndGet();
- }
+ _scavenges.set(0L);
+ scavenge();
}
- head = _head.nextNode();
- }
-
- if(_deletes.get() > 1000L)
- {
- _deletes.set(0L);
- scavenge();
}
}
void scavenge()
{
- QueueEntryImpl root = _head;
- QueueEntryImpl next = root.nextNode();
+ QueueEntryImpl next = _head.getNext();
- do
+ while (next != null)
{
-
-
- while(next._next != null && next.isDeleted())
- {
-
- final QueueEntryImpl newhead = next.nextNode();
- if(newhead != null)
- {
- _nextUpdater.compareAndSet(root,next, newhead);
- }
- next = root.nextNode();
- }
- if(next._next != null)
- {
- if(!next.isDeleted())
- {
- root = next;
- next = root.nextNode();
- }
- }
- else
- {
- break;
- }
-
- } while (next != null && next._next != null);
-
+ next = next.getNext();
+ }
}
-
public AMQQueue getQueue()
{
return _queue;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
new file mode 100644
index 0000000000..2fbf5bb2cf
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -0,0 +1,144 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.message.AMQMessage;
+
+import junit.framework.TestCase;
+
+public class SimpleQueueEntryListTest extends TestCase
+{
+ private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
+ String oldScavengeValue = null;
+
+ @Override
+ protected void setUp()
+ {
+ oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
+ }
+
+ @Override
+ protected void tearDown()
+ {
+ if(oldScavengeValue != null)
+ {
+ System.setProperty(SCAVENGE_PROP, oldScavengeValue);
+ }
+ else
+ {
+ System.clearProperty(SCAVENGE_PROP);
+ }
+ }
+
+ public void testScavenge() throws Exception
+ {
+ SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
+ ConcurrentHashMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
+
+
+ //Add messages to generate QueueEntry's
+ for(int i = 1; i <= 100 ; i++)
+ {
+ AMQMessage msg = new MockAMQMessage(i);
+ QueueEntry bleh = sqel.add(msg);
+ assertNotNull("QE should not have been null", bleh);
+ entriesMap.put(i,bleh);
+ }
+
+ QueueEntryImpl head = ((QueueEntryImpl) sqel.getHead());
+
+ //We shall now delete some specific messages mid-queue that will lead to
+ //requiring a scavenge once the requested threshold of 9 deletes is passed
+
+ //Delete the 2nd message only
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(2).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 2);
+
+ //Delete messages 12 to 14
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(12).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 12);
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(13).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 13);
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 14);
+
+
+ //Delete message 20 only
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 20);
+
+ //Delete messages 81 to 84
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 81);
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(82).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 82);
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(83).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 83);
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete());
+ verifyDeletedButPresentBeforeScavenge(head, 84);
+
+ //Delete message 99 - this is the 10th message deleted that is after the queue head
+ //and so will invoke the scavenge() which is set to go after 9 previous deletions
+ assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete());
+
+ verifyAllDeletedMessagedNotPresent(head, entriesMap);
+ }
+
+ private void verifyDeletedButPresentBeforeScavenge(QueueEntryImpl head, long messageId)
+ {
+ //Use the head to get the initial entry in the queue
+ QueueEntryImpl entry = head._next;
+
+ for(long i = 1; i < messageId ; i++)
+ {
+ assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber());
+ entry = entry._next;
+ }
+
+ assertTrue("Entry should have been deleted", entry.isDeleted());
+ }
+
+ private void verifyAllDeletedMessagedNotPresent(QueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
+ {
+ //Use the head to get the initial entry in the queue
+ QueueEntryImpl entry = head._next;
+
+ assertNotNull("Initial entry should not have been null", entry);
+
+ int count = 0;
+
+ while (entry != null)
+ {
+ assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted());
+ assertNotNull("QueueEntry was not found in the list of remaining entries",
+ remainingMessages.get(entry.getMessage().getMessageNumber().intValue()));
+
+ count++;
+ entry = entry._next;
+ }
+
+ assertEquals("Count should have been equal",count,remainingMessages.size());
+ }
+
+}