diff options
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java')
-rw-r--r-- | qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java new file mode 100644 index 0000000000..63b3a7d165 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -0,0 +1,247 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SimpleQueueEntryListTest extends QueueEntryListTestBase +{ + private SimpleQueueEntryList _sqel; + + private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; + private String oldScavengeValue = null; + + @Override + protected void setUp() + { + oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9"); + _sqel = new SimpleQueueEntryList(_testQueue); + for(int i = 1; i <= 100; i++) + { + final ServerMessage message = mock(ServerMessage.class); + when(message.getMessageNumber()).thenReturn((long) i); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(ref); + + final QueueEntry bleh = _sqel.add(message); + assertNotNull("QE should not have been null", bleh); + } + } + + @Override + protected void tearDown() + { + if(oldScavengeValue != null) + { + System.setProperty(SCAVENGE_PROP, oldScavengeValue); + } + else + { + System.clearProperty(SCAVENGE_PROP); + } + } + + @Override + public QueueEntryList getTestList() + { + return getTestList(false); + } + + @Override + public QueueEntryList getTestList(boolean newList) + { + if(newList) + { + return new SimpleQueueEntryList(_testQueue); + } + else + { + return _sqel; + } + } + + @Override + public long getExpectedFirstMsgId() + { + return 1; + } + + @Override + public int getExpectedListLength() + { + return 100; + } + + @Override + public ServerMessage getTestMessageToAdd() throws AMQException + { + ServerMessage msg = mock(ServerMessage.class); + when(msg.getMessageNumber()).thenReturn(1l); + return msg; + } + + public void testScavenge() throws Exception + { + SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); + ConcurrentHashMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>(); + + + //Add messages to generate QueueEntry's + for(int i = 1; i <= 100 ; i++) + { + ServerMessage message = mock(ServerMessage.class); + when(message.getMessageNumber()).thenReturn((long) i); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(ref); + QueueEntry bleh = sqel.add(message); + assertNotNull("QE should not have been null", bleh); + entriesMap.put(i,bleh); + } + + SimpleQueueEntryImpl head = sqel.getHead(); + + //We shall now delete some specific messages mid-queue that will lead to + //requiring a scavenge once the requested threshold of 9 deletes is passed + + //Delete the 2nd message only + assertTrue("Failed to delete QueueEntry", entriesMap.remove(2).delete()); + verifyDeletedButPresentBeforeScavenge(head, 2); + + //Delete messages 12 to 14 + assertTrue("Failed to delete QueueEntry", entriesMap.remove(12).delete()); + verifyDeletedButPresentBeforeScavenge(head, 12); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(13).delete()); + verifyDeletedButPresentBeforeScavenge(head, 13); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete()); + verifyDeletedButPresentBeforeScavenge(head, 14); + + //Delete message 20 only + assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete()); + verifyDeletedButPresentBeforeScavenge(head, 20); + + //Delete messages 81 to 84 + assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete()); + verifyDeletedButPresentBeforeScavenge(head, 81); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(82).delete()); + verifyDeletedButPresentBeforeScavenge(head, 82); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(83).delete()); + verifyDeletedButPresentBeforeScavenge(head, 83); + assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete()); + verifyDeletedButPresentBeforeScavenge(head, 84); + + //Delete message 99 - this is the 10th message deleted that is after the queue head + //and so will invoke the scavenge() which is set to go after 9 previous deletions + assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete()); + + verifyAllDeletedMessagedNotPresent(head, entriesMap); + } + + private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId) + { + //Use the head to get the initial entry in the queue + SimpleQueueEntryImpl entry = head.getNextNode(); + + for(long i = 1; i < messageId ; i++) + { + assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber()); + entry = entry.getNextNode(); + } + + assertTrue("Entry should have been deleted", entry.isDeleted()); + } + + private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) + { + //Use the head to get the initial entry in the queue + SimpleQueueEntryImpl entry = head.getNextNode(); + + assertNotNull("Initial entry should not have been null", entry); + + int count = 0; + + while (entry != null) + { + assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted()); + assertNotNull("QueueEntry "+entry.getMessage().getMessageNumber()+" was not found in the list of remaining entries " + remainingMessages, + remainingMessages.get((int)(entry.getMessage().getMessageNumber()))); + + count++; + entry = entry.getNextNode(); + } + + assertEquals("Count should have been equal",count,remainingMessages.size()); + } + + public void testGettingNextElement() + { + final int numberOfEntries = 5; + final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries]; + final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + // create test entries + for(int i = 0; i < numberOfEntries; i++) + { + ServerMessage message = mock(ServerMessage.class); + when(message.getMessageNumber()).thenReturn((long)i); + entries[i] = queueEntryList.add(message); + } + + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries; i++) + { + final SimpleQueueEntryImpl next = entries[i].getNextValidEntry(); + + if(i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else + { + assertNull("The next entry after the last should be null", next); + } + } + + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + SimpleQueueEntryImpl next = entries[2].getNextValidEntry(); + assertEquals("expected forth entry", entries[3], next); + next = next.getNextValidEntry(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNextValidEntry(); + assertNull("The next entry after the last should be null", next); + } +} |