diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-09-18 21:17:32 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-09-18 21:17:32 +0000 |
| commit | b2ddfb386b3cd8b4f595a0fe3f77da94807282b1 (patch) | |
| tree | 7598dfa7c2eb0af3f3e95ce802f5b967a63f4845 /java/broker/src | |
| parent | f532031f02b5f9b03ec40cd81a13e2df091a79f0 (diff) | |
| download | qpid-python-b2ddfb386b3cd8b4f595a0fe3f77da94807282b1.tar.gz | |
QPID-2704: simplify the implementation of SQEL scavenge() ability and add test.
Incorporates changes for QPID-2597 from 0.5.x-dev branch revisions 943240, 943534, 943576, and 943845.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@998543 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java | 65 | ||||
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java | 144 |
2 files changed, 158 insertions, 51 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 334b7f4ea9..b97c2c55c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -45,7 +45,9 @@ public class SimpleQueueEntryList implements QueueEntryList _nextUpdater = AtomicReferenceFieldUpdater.newUpdater (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); - private AtomicLong _deletes = new AtomicLong(0L); + + private AtomicLong _scavenges = new AtomicLong(0L); + private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); public SimpleQueueEntryList(AMQQueue queue) @@ -55,71 +57,32 @@ public class SimpleQueueEntryList implements QueueEntryList _tail = _head; } - - void advanceHead() { - _deletes.incrementAndGet(); - QueueEntryImpl head = _head.nextNode(); - while(head._next != null && head.isDeleted()) - { + QueueEntryImpl next = _head.nextNode(); + QueueEntryImpl newNext = _head.getNext(); - final QueueEntryImpl newhead = head.nextNode(); - if(newhead != null) + if (next == newNext) + { + if (_scavenges.incrementAndGet() > _scavengeCount) { - if(_nextUpdater.compareAndSet(_head,head, newhead)) - { - _deletes.decrementAndGet(); - } + _scavenges.set(0L); + scavenge(); } - head = _head.nextNode(); - } - - if(_deletes.get() > 1000L) - { - _deletes.set(0L); - scavenge(); } } void scavenge() { - QueueEntryImpl root = _head; - QueueEntryImpl next = root.nextNode(); + QueueEntryImpl next = _head.getNext(); - do + while (next != null) { - - - while(next._next != null && next.isDeleted()) - { - - final QueueEntryImpl newhead = next.nextNode(); - if(newhead != null) - { - _nextUpdater.compareAndSet(root,next, newhead); - } - next = root.nextNode(); - } - if(next._next != null) - { - if(!next.isDeleted()) - { - root = next; - next = root.nextNode(); - } - } - else - { - break; - } - - } while (next != null && next._next != null); - + next = next.getNext(); + } } - public AMQQueue getQueue() { return _queue; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java new file mode 100644 index 0000000000..2fbf5bb2cf --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -0,0 +1,144 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.server.message.AMQMessage; + +import junit.framework.TestCase; + +public class SimpleQueueEntryListTest extends TestCase +{ + private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; + String oldScavengeValue = null; + + @Override + protected void setUp() + { + oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9"); + } + + @Override + protected void tearDown() + { + if(oldScavengeValue != null) + { + System.setProperty(SCAVENGE_PROP, oldScavengeValue); + } + else + { + System.clearProperty(SCAVENGE_PROP); + } + } + + public void testScavenge() throws Exception + { + SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); + ConcurrentHashMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>(); + + + //Add messages to generate QueueEntry's + for(int i = 1; i <= 100 ; i++) + { + AMQMessage msg = new MockAMQMessage(i); + QueueEntry bleh = sqel.add(msg); + assertNotNull("QE should not have been null", bleh); + entriesMap.put(i,bleh); + } + + QueueEntryImpl head = ((QueueEntryImpl) sqel.getHead()); + + //We shall now delete some specific messages mid-queue that will lead to + //requiring a scavenge once the requested threshold of 9 deletes is passed + + //Delete the 2nd message only + assertTrue("Failed to delete QueueEntry", entriesMap.remove(2).delete()); + verifyDeletedButPresentBeforeScavenge(head, 2); + + //Delete messages 12 to 14 + assertTrue("Failed to delete QueueEntry", entriesMap.remove(12).delete()); + verifyDeletedButPresentBeforeScavenge(head, 12); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(13).delete()); + verifyDeletedButPresentBeforeScavenge(head, 13); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete()); + verifyDeletedButPresentBeforeScavenge(head, 14); + + + //Delete message 20 only + assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete()); + verifyDeletedButPresentBeforeScavenge(head, 20); + + //Delete messages 81 to 84 + assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete()); + verifyDeletedButPresentBeforeScavenge(head, 81); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(82).delete()); + verifyDeletedButPresentBeforeScavenge(head, 82); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(83).delete()); + verifyDeletedButPresentBeforeScavenge(head, 83); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete()); + verifyDeletedButPresentBeforeScavenge(head, 84); + + //Delete message 99 - this is the 10th message deleted that is after the queue head + //and so will invoke the scavenge() which is set to go after 9 previous deletions + assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete()); + + verifyAllDeletedMessagedNotPresent(head, entriesMap); + } + + private void verifyDeletedButPresentBeforeScavenge(QueueEntryImpl head, long messageId) + { + //Use the head to get the initial entry in the queue + QueueEntryImpl entry = head._next; + + for(long i = 1; i < messageId ; i++) + { + assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber()); + entry = entry._next; + } + + assertTrue("Entry should have been deleted", entry.isDeleted()); + } + + private void verifyAllDeletedMessagedNotPresent(QueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) + { + //Use the head to get the initial entry in the queue + QueueEntryImpl entry = head._next; + + assertNotNull("Initial entry should not have been null", entry); + + int count = 0; + + while (entry != null) + { + assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted()); + assertNotNull("QueueEntry was not found in the list of remaining entries", + remainingMessages.get(entry.getMessage().getMessageNumber().intValue())); + + count++; + entry = entry._next; + } + + assertEquals("Count should have been equal",count,remainingMessages.size()); + } + +} |
