/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.queue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import junit.framework.TestCase; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class ConflationQueueListTest extends TestCase { private static final String CONFLATION_KEY = "CONFLATION_KEY"; private static final String TEST_KEY_VALUE = "testKeyValue"; private static final String TEST_KEY_VALUE1 = "testKeyValue1"; private static final String TEST_KEY_VALUE2 = "testKeyValue2"; private ConflationQueueList _list; private ConflationQueue _queue; @Override protected void setUp() throws Exception { super.setUp(); Map queueAttributes = new HashMap(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY); _queue = new ConflationQueue(mock(VirtualHost.class), null, queueAttributes); _list = _queue.getEntries(); } public void testListHasNoEntries() { int numberOfEntries = countEntries(_list); assertEquals(0, numberOfEntries); } public void testAddMessageWithoutConflationKeyValue() { ServerMessage message = createTestServerMessage(null); _list.add(message); int numberOfEntries = countEntries(_list); assertEquals(1, numberOfEntries); } public void testAddAndDiscardMessageWithoutConflationKeyValue() { ServerMessage message = createTestServerMessage(null); QueueEntry addedEntry = _list.add(message); addedEntry.delete(); int numberOfEntries = countEntries(_list); assertEquals(0, numberOfEntries); } public void testAddMessageWithConflationKeyValue() { ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); _list.add(message); int numberOfEntries = countEntries(_list); assertEquals(1, numberOfEntries); } public void testAddAndRemoveMessageWithConflationKeyValue() { ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); QueueEntry addedEntry = _list.add(message); addedEntry.delete(); int numberOfEntries = countEntries(_list); assertEquals(0, numberOfEntries); } public void testAddTwoMessagesWithDifferentConflationKeyValue() { ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); _list.add(message1); _list.add(message2); int numberOfEntries = countEntries(_list); assertEquals(2, numberOfEntries); } public void testAddTwoMessagesWithSameConflationKeyValue() { ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); _list.add(message1); _list.add(message2); int numberOfEntries = countEntries(_list); assertEquals(1, numberOfEntries); } public void testSupersededEntryIsDiscardedOnRelease() { ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); QueueEntry entry1 = _list.add(message1); entry1.acquire(); // simulate an in-progress delivery to consumer _list.add(message2); assertFalse(entry1.isDeleted()); assertEquals(2, countEntries(_list)); entry1.release(); // simulate consumer rollback/recover assertEquals(1, countEntries(_list)); assertTrue(entry1.isDeleted()); } public void testConflationMapMaintained() { assertEquals(0, _list.getLatestValuesMap().size()); ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); QueueEntry addedEntry = _list.add(message); assertEquals(1, countEntries(_list)); assertEquals(1, _list.getLatestValuesMap().size()); addedEntry.delete(); assertEquals(0, countEntries(_list)); assertEquals(0, _list.getLatestValuesMap().size()); } public void testConflationMapMaintainedWithDifferentConflationKeyValue() { assertEquals(0, _list.getLatestValuesMap().size()); ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); QueueEntry addedEntry1 = _list.add(message1); QueueEntry addedEntry2 = _list.add(message2); assertEquals(2, countEntries(_list)); assertEquals(2, _list.getLatestValuesMap().size()); addedEntry1.delete(); addedEntry2.delete(); assertEquals(0, countEntries(_list)); assertEquals(0, _list.getLatestValuesMap().size()); } private int countEntries(ConflationQueueList list) { QueueEntryIterator> iterator = list.iterator(); int count = 0; while(iterator.advance()) { count++; } return count; } private ServerMessage createTestServerMessage(String conflationKeyValue) { ServerMessage mockMessage = mock(ServerMessage.class); AMQMessageHeader messageHeader = mock(AMQMessageHeader.class); when(messageHeader.getHeader(CONFLATION_KEY)).thenReturn(conflationKeyValue); when(mockMessage.getMessageHeader()).thenReturn(messageHeader); MessageReference messageReference = mock(MessageReference.class); when(mockMessage.newReference()).thenReturn(messageReference); when(messageReference.getMessage()).thenReturn(mockMessage); return mockMessage; } private AMQQueue createTestQueue() { AMQQueue queue = mock(AMQQueue.class); VirtualHost virtualHost = mock(VirtualHost.class); when(queue.getVirtualHost()).thenReturn(virtualHost); return queue; } }