/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.queue; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import 
org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class SimpleAMQQueueTest extends InternalBrokerBaseCase { protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); protected DirectExchange _exchange; protected MockSubscription _subscription = new MockSubscription(); protected FieldTable _arguments = null; private MessagePublishInfo info = new MessagePublishInfo() { public AMQShortString getExchange() { return null; } public void setExchange(AMQShortString exchange) { //To change body of implemented methods use File | Settings | File Templates. 
} public boolean isImmediate() { return false; } public boolean isMandatory() { return false; } public AMQShortString getRoutingKey() { return null; } }; @Override public void setUp() throws Exception { super.setUp(); //Create Application Registry for test ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(); PropertiesConfiguration env = new PropertiesConfiguration(); _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), new VirtualHostConfiguration(getClass().getName(), env), _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); _exchange = (DirectExchange)_virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); } @Override public void tearDown() throws Exception { _queue.stop(); super.tearDown(); } public void testCreateQueue() throws AMQException { _queue.stop(); try { _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, false, _virtualHost, _arguments ); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { assertTrue("Exception was not about missing name", e.getMessage().contains("name")); } try { _queue = new SimpleAMQQueue(_qname, false, _owner, false, false,null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { assertTrue("Exception was not about missing vhost", e.getMessage().contains("Host")); } _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); assertNotNull("Queue was not created", _queue); } public void testGetVirtualHost() { assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); } public void testBinding() throws AMQSecurityException, AMQInternalException { 
_virtualHost.getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP); assertTrue("Routing key was not bound", _exchange.isBound(_routingKey)); assertTrue("Queue was not bound to key", _exchange.isBound(_routingKey,_queue)); assertEquals("Exchange binding count", 1, _queue.getBindings().size()); assertEquals("Wrong exchange bound", String.valueOf(_routingKey), _queue.getBindings().get(0).getBindingKey()); assertEquals("Wrong exchange bound", _exchange, _queue.getBindings().get(0).getExchange()); _virtualHost.getBindingFactory().removeBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP); assertFalse("Routing key was still bound", _exchange.isBound(_routingKey)); } public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException { // Check adding a subscription adds it to the queue _queue.registerSubscription(_subscription, false); assertEquals("Subscription did not get queue", _queue, _subscription.getQueue()); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); try { Thread.sleep(2000L); } catch(InterruptedException e) { } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); assertTrue("Subscription still had queue", _subscription.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); assertFalse("Queue still has active consumer", 1 == _queue.getActiveConsumerCount()); AMQMessage messageB = createMessage(new Long (25)); _queue.enqueue(messageB); 
assertNull(_subscription.getQueueContext()); } public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException { AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** * Tests enqueuing two messages. */ public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception { AMQMessage messageA = createMessage(new Long(24)); AMQMessage messageB = createMessage(new Long(25)); _queue.enqueue(messageA); _queue.enqueue(messageB); _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** * Tests that a released queue entry is resent to the subscriber. Verifies also that the * QueueContext._releasedEntry is reset to null after the entry has been reset. 
*/ public void testReleasedMessageIsResentToSubscriber() throws Exception { _queue.registerSubscription(_subscription, false); final ArrayList queueEntries = new ArrayList(); PostEnqueueAction postEnqueueAction = new PostEnqueueAction() { public void onEnqueue(QueueEntry entry) { queueEntries.add(entry); } }; AMQMessage messageA = createMessage(new Long(24)); AMQMessage messageB = createMessage(new Long(25)); AMQMessage messageC = createMessage(new Long(26)); /* Enqueue three messages */ _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** * Tests that a released message that becomes expired is not resent to the subscriber. * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. 
* Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. */ public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); final ArrayList queueEntries = new ArrayList(); PostEnqueueAction postEnqueueAction = new PostEnqueueAction() { public void onEnqueue(QueueEntry entry) { queueEntries.add(entry); } }; /* Enqueue one message with expiration set for a short time in the future */ AMQMessage messageA = createMessage(new Long(24)); int messageExpirationOffset = 200; messageA.setExpiration(System.currentTimeMillis() + messageExpirationOffset); _queue.enqueue(messageA, postEnqueueAction); int subFlushWaitTime = 150; Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); queueEntries.get(0).release(); Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** * Tests that if a client releases entries 'out of order' (the order * used by QueueEntryImpl.compareTo) that messages are still resent * successfully. 
Specifically this test ensures the {@see SimpleAMQQueue#requeue()} * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. */ public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); final ArrayList queueEntries = new ArrayList(); PostEnqueueAction postEnqueueAction = new PostEnqueueAction() { public void onEnqueue(QueueEntry entry) { queueEntries.add(entry); } }; AMQMessage messageA = createMessage(new Long(24)); AMQMessage messageB = createMessage(new Long(25)); AMQMessage messageC = createMessage(new Long(26)); /* Enqueue three messages */ _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); /* Now release the third and first message only, causing it to be requeued */ queueEntries.get(2).release(); queueEntries.get(0).release(); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** * Tests that a release requeues an 
entry for a queue with multiple subscriptions. Verifies that a * requeue resends a message to a single subscriber. */ public void testReleaseForQueueWithMultipleSubscriptions() throws Exception { MockSubscription subscription1 = new MockSubscription(); MockSubscription subscription2 = new MockSubscription(); _queue.registerSubscription(subscription1, false); _queue.registerSubscription(subscription2, false); final ArrayList queueEntries = new ArrayList(); PostEnqueueAction postEnqueueAction = new PostEnqueueAction() { public void onEnqueue(QueueEntry entry) { queueEntries.add(entry); } }; AMQMessage messageA = createMessage(new Long(24)); AMQMessage messageB = createMessage(new Long(25)); /* Enqueue two messages */ _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException { // Check adding an exclusive subscription adds it to the queue _queue.registerSubscription(_subscription, true); assertEquals("Subscription did not get queue", _queue, _subscription.getQueue()); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); 
assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); try { Thread.sleep(2000L); } catch (InterruptedException e) { } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue Subscription subB = new MockSubscription(); Exception ex = null; try { _queue.registerSubscription(subB, false); } catch (AMQException e) { ex = e; } assertNotNull(ex); // Check we cannot add an exclusive subscriber to a queue with an // existing subscription _queue.unregisterSubscription(_subscription); _queue.registerSubscription(_subscription, false); try { _queue.registerSubscription(subB, true); } catch (AMQException e) { ex = e; } assertNotNull(ex); } public void testAutoDeleteQueue() throws Exception { _queue.stop(); _queue = new SimpleAMQQueue(_qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); _queue.enqueue(message); _queue.unregisterSubscription(_subscription); assertTrue("Queue was not deleted when subscription was removed", _queue.isDeleted()); } public void testResend() throws Exception { _queue.registerSubscription(_subscription, false); Long id = new Long(26); AMQMessage message = createMessage(id); _queue.enqueue(message); QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); entry.setRedelivered(); _queue.resend(entry, _subscription); } public void testGetFirstMessageId() throws Exception { // Create message Long messageId = new Long(23); AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(message); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); // Check message id assertEquals("Message ID was 
wrong", messageId, testmsgid); } public void testGetFirstFiveMessageIds() throws Exception { for (int i = 0 ; i < 5; i++) { // Create message Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(message); } // Get message ids List msgids = _queue.getMessagesOnTheQueue(5); // Check message id for (int i = 0; i < 5; i++) { Long messageId = new Long(i); assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } public void testGetLastFiveMessageIds() throws Exception { for (int i = 0 ; i < 10; i++) { // Create message Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(message); } // Get message ids List msgids = _queue.getMessagesOnTheQueue(5, 5); // Check message id for (int i = 0; i < 5; i++) { Long messageId = new Long(i+5); assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } public void testGetMessagesRangeOnTheQueue() throws Exception { for (int i = 1 ; i <= 10; i++) { // Create message Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(message); } // Get non-existent 0th QueueEntry & check returned list was empty // (the position parameters in this method are indexed from 1) List entries = _queue.getMessagesRangeOnTheQueue(0, 0); assertTrue(entries.size() == 0); // Check that when 'from' is 0 it is ignored and the range continues from 1 entries = _queue.getMessagesRangeOnTheQueue(0, 2); assertTrue(entries.size() == 2); long msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 1L); msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 2L); // Check that when 'from' is greater than 'to' the returned list is empty entries = _queue.getMessagesRangeOnTheQueue(5, 4); assertTrue(entries.size() == 0); // Get first QueueEntry & check id entries = 
_queue.getMessagesRangeOnTheQueue(1, 1); assertTrue(entries.size() == 1); msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 1L); // Get 5th,6th,7th entries and check id's entries = _queue.getMessagesRangeOnTheQueue(5, 7); assertTrue(entries.size() == 3); msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 5L); msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 6L); msgID = entries.get(2).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 7L); // Get 10th QueueEntry & check id entries = _queue.getMessagesRangeOnTheQueue(10, 10); assertTrue(entries.size() == 1); msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 10L); // Get non-existent 11th QueueEntry & check returned set was empty entries = _queue.getMessagesRangeOnTheQueue(11, 11); assertTrue(entries.size() == 0); // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs entries = _queue.getMessagesRangeOnTheQueue(9, 11); assertTrue(entries.size() == 2); msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 9L); msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 10L); } public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue final IncomingMessage msg = new IncomingMessage(info); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.setProperties(new BasicContentHeaderProperties()); ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); final ArrayList qs = new ArrayList(); // Send persistent message qs.add(_queue); MessageMetaData metaData = 
msg.headersReceived(System.currentTimeMillis()); StoredMessage handle = _store.addMessage(metaData); msg.setStoredMessage(handle); ServerTransaction txn = new AutoCommitTransaction(_store); txn.enqueue(qs, msg, new ServerTransaction.Action() { public void postCommit() { msg.enqueue(qs); } public void onRollback() { } }, 0L); // Check that it is enqueued AMQQueue data = _store.getMessages().get(1L); assertNull(data); // Dequeue message MockQueueEntry entry = new MockQueueEntry(); AMQMessage amqmsg = new AMQMessage(handle); entry.setMessage(amqmsg); _queue.dequeue(entry,null); // Check that it is dequeued data = _store.getMessages().get(1L); assertNull(data); } /** * processQueue() is used when asynchronously delivering messages to * subscriptions which could not be delivered immediately during the * enqueue() operation. * * A defect within the method would mean that delivery of these messages may * not occur should the Runner stop before all messages have been processed. * Such a defect was discovered when Selectors were used such that one and * only one subscription can/will accept any given messages, but multiple * subscriptions are present, and one of the earlier subscriptions receives * more messages than the others. * * This test is to validate that the processQueue() method is able to * correctly deliver all of the messages present for asynchronous delivery * to subscriptions in such a scenario. 
*/
    public void testProcessQueueWithUniqueSelectors() throws Exception {
        TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
        SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false,
                                                      false, _virtualHost, factory, null) {
            @Override
            public void deliverAsync(Subscription sub) {
                // do nothing, i.e prevent deliveries by the SubFlushRunner
                // when registering the new subscriptions
            }
        };

        // retrieve the QueueEntryList the queue creates and insert the test
        // messages, thus avoiding straight-through delivery attempts during
        //enqueue() process.
        QueueEntryList list = factory.getQueueEntryList();
        assertNotNull("QueueEntryList should have been created", list);

        QueueEntry msg1 = list.add(createMessage(1L));
        QueueEntry msg2 = list.add(createMessage(2L));
        QueueEntry msg3 = list.add(createMessage(3L));
        QueueEntry msg4 = list.add(createMessage(4L));
        QueueEntry msg5 = list.add(createMessage(5L));

        // Create lists of the entries each subscription should be interested
        // in.Bias over 50% of the messages to the first subscription so that
        // the later subscriptions reject them and report being done before
        // the first subscription as the processQueue method proceeds.
        List msgListSub1 = createEntriesList(msg1, msg2, msg3);
        List msgListSub2 = createEntriesList(msg4);
        List msgListSub3 = createEntriesList(msg5);

        MockSubscription sub1 = new MockSubscription(msgListSub1);
        MockSubscription sub2 = new MockSubscription(msgListSub2);
        MockSubscription sub3 = new MockSubscription(msgListSub3);

        // register the subscriptions
        testQueue.registerSubscription(sub1, false);
        testQueue.registerSubscription(sub2, false);
        testQueue.registerSubscription(sub3, false);

        //check that no messages have been delivered to the
        //subscriptions during registration
        assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
        assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
        assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());

        // call processQueue to deliver the messages
        testQueue.processQueue(new QueueRunner(testQueue) {
            @Override
            public void run() {
                // we dont actually want/need this runner to do any work
                // because we we are already doing it!
            }
        });

        // check expected messages delivered to correct consumers
        verifyRecievedMessages(msgListSub1, sub1.getMessages());
        verifyRecievedMessages(msgListSub2, sub2.getMessages());
        verifyRecievedMessages(msgListSub3, sub3.getMessages());
    }

    /**
     * Tests that dequeued message is not present in the list returned from
     * {@link SimpleAMQQueue#getMessagesOnTheQueue()}
     */
    public void testGetMessagesOnTheQueueWithDequeuedEntry() {
        int messageNumber = 4;
        int dequeueMessageIndex = 1;

        // send test messages into a test queue
        enqueueGivenNumberOfMessages(_queue, messageNumber);

        // dequeue message
        dequeueMessage(_queue, dequeueMessageIndex);

        // get messages on the queue
        List<? extends QueueEntry> entries = _queue.getMessagesOnTheQueue();

        // assert queue entries
        assertEquals(messageNumber - 1, entries.size());
        int expectedId = 0;
        for (int i = 0; i < messageNumber - 1; i++) {
            Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId();
            if (i == dequeueMessageIndex) {
                // the dequeued id must be skipped, so step over it before comparing
                assertFalse("Message with id " + dequeueMessageIndex
                        + " was dequeued and should not be returned by method getMessagesOnTheQueue!",
                        Long.valueOf(expectedId).equals(id));
                expectedId++;
            }
            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
                    Long.valueOf(expectedId), id);
            expectedId++;
        }
    }

    /**
     * Tests that dequeued message is not present in the list returned from
     * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)}
     */
    public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() {
        int messageNumber = 4;
        int dequeueMessageIndex = 1;

        // send test messages into a test queue
        enqueueGivenNumberOfMessages(_queue, messageNumber);

        // dequeue message
        dequeueMessage(_queue, dequeueMessageIndex);

        // get messages on the queue with filter accepting all available messages
        List<? extends QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() {
            public boolean accept(QueueEntry entry) {
                return true;
            }

            public boolean filterComplete() {
                return false;
            }
        });

        // assert entries on the queue
assertEquals(messageNumber - 1, entries.size());
        int expectedId = 0;
        for (int i = 0; i < messageNumber - 1; i++) {
            Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId();
            if (i == dequeueMessageIndex) {
                // the dequeued id must be skipped, so step over it before comparing
                assertFalse("Message with id " + dequeueMessageIndex
                        + " was dequeued and should not be returned by method getMessagesOnTheQueue!",
                        Long.valueOf(expectedId).equals(id));
                expectedId++;
            }
            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
                    Long.valueOf(expectedId), id);
            expectedId++;
        }
    }

    /**
     * Tests that dequeued message is not copied as part of invocation of
     * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)}
     */
    public void testCopyMessagesWithDequeuedEntry() {
        int messageNumber = 4;
        int dequeueMessageIndex = 1;
        String anotherQueueName = "testQueue2";

        // put test messages into a test queue
        enqueueGivenNumberOfMessages(_queue, messageNumber);

        // dequeue message
        dequeueMessage(_queue, dequeueMessageIndex);

        // create another queue
        SimpleAMQQueue queue = createQueue(anotherQueueName);

        // copy messages into another queue
        _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName);

        // get messages on another queue
        List<? extends QueueEntry> entries = queue.getMessagesOnTheQueue();

        // assert another queue entries
        assertEquals(messageNumber - 1, entries.size());
        int expectedId = 0;
        for (int i = 0; i < messageNumber - 1; i++) {
            Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
            if (i == dequeueMessageIndex) {
                // the dequeued id must be skipped, so step over it before comparing
                assertFalse("Message with id " + dequeueMessageIndex
                        + " was dequeued and should not been copied into another queue!",
                        Long.valueOf(expectedId).equals(id));
                expectedId++;
            }
            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
                    Long.valueOf(expectedId), id);
            expectedId++;
        }
    }

    /**
     * Tests that dequeued message is not moved as part of invocation of
     * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)}
     */
    public void testMovedMessagesWithDequeuedEntry() {
        int messageNumber = 4;
        int dequeueMessageIndex = 1;
        String anotherQueueName = "testQueue2";

        // put messages into a test queue
        enqueueGivenNumberOfMessages(_queue, messageNumber);

        // dequeue message
        dequeueMessage(_queue, dequeueMessageIndex);

        // create another queue
        SimpleAMQQueue queue = createQueue(anotherQueueName);

        // move messages into another queue
        _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName);

        // get messages on another queue
        List<? extends QueueEntry> entries = queue.getMessagesOnTheQueue();

        // assert another queue entries
        assertEquals(messageNumber - 1, entries.size());
        int expectedId = 0;
        for (int i = 0; i < messageNumber - 1; i++) {
            Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
            if (i == dequeueMessageIndex) {
                // the dequeued id must be skipped, so step over it before comparing
                assertFalse("Message with id " + dequeueMessageIndex
                        + " was dequeued and should not have been moved into another queue!",
                        Long.valueOf(expectedId).equals(id));
                expectedId++;
            }
            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
                    Long.valueOf(expectedId), id);
            expectedId++;
        }
    }

    /**
     * Tests that messages in given range including dequeued one are deleted
     * from the queue on invocation of
     * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)}
     */
    public void testRemoveMessagesFromQueueWithDequeuedEntry() {
        int messageNumber = 4;
        int dequeueMessageIndex = 1;

        // put messages into a test queue
        enqueueGivenNumberOfMessages(_queue, messageNumber);

        // dequeue message
        dequeueMessage(_queue, dequeueMessageIndex);

        // remove messages
        _queue.removeMessagesFromQueue(0, messageNumber);

        // get queue entries
        List<? extends QueueEntry> entries = _queue.getMessagesOnTheQueue();

        // assert queue entries
        assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
        assertEquals("Queue should be empty", 0, entries.size());
    }

    /**
     * Tests that dequeued message on the top is not accounted and next message
     * is deleted from the queue on invocation of
     * {@link SimpleAMQQueue#deleteMessageFromTop()}
     */
    public void
testDeleteMessageFromTopWithDequeuedEntryOnTop() { int messageNumber = 4; int dequeueMessageIndex = 0; // put messages into a test queue enqueueGivenNumberOfMessages(_queue, messageNumber); // dequeue message on top dequeueMessage(_queue, dequeueMessageIndex); //delete message from top _queue.deleteMessageFromTop(); //get queue netries List entries = _queue.getMessagesOnTheQueue(); // assert queue entries assertNotNull("Null is returned from getMessagesOnTheQueue", entries); assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(), messageNumber - 2, entries.size()); assertEquals("Expected first entry with id 2", new Long(2), ((AMQMessage) entries.get(0).getMessage()).getMessageId()); } /** * Tests that all messages including dequeued one are deleted from the queue * on invocation of {@link SimpleAMQQueue#clearQueue()} */ public void testClearQueueWithDequeuedEntry() { int messageNumber = 4; int dequeueMessageIndex = 1; // put messages into a test queue enqueueGivenNumberOfMessages(_queue, messageNumber); // dequeue message on a test queue dequeueMessage(_queue, dequeueMessageIndex); // clean queue try { _queue.clearQueue(); } catch (AMQException e) { fail("Failure to clear queue:" + e.getMessage()); } // get queue entries List entries = _queue.getMessagesOnTheQueue(); // assert queue entries assertNotNull(entries); assertEquals(0, entries.size()); } /** * Tests whether dequeued entry is sent to subscriber in result of * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} */ public void testProcessQueueWithDequeuedEntry() { // total number of messages to send int messageNumber = 4; int dequeueMessageIndex = 1; // create queue with overridden method deliverAsync SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), false, false, _virtualHost, null) { @Override public void deliverAsync(Subscription sub) { // do nothing } }; // put messages List entries 
= enqueueGivenNumberOfMessages(testQueue, messageNumber); // dequeue message dequeueMessage(testQueue, dequeueMessageIndex); // latch to wait for message receipt final CountDownLatch latch = new CountDownLatch(messageNumber -1); // create a subscription MockSubscription subscription = new MockSubscription() { /** * Send a message and decrement latch * @param entry * @param batch */ public void send(QueueEntry entry, boolean batch) throws AMQException { super.send(entry, batch); latch.countDown(); } }; try { // subscribe testQueue.registerSubscription(subscription, false); // process queue testQueue.processQueue(new QueueRunner(testQueue) { public void run() { // do nothing } }); } catch (AMQException e) { fail("Failure to process queue:" + e.getMessage()); } // wait up to 1 minute for message receipt try { latch.await(1, TimeUnit.MINUTES); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } List expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); verifyRecievedMessages(expected, subscription.getMessages()); } /** * Tests that entry in dequeued state are not enqueued and not delivered to subscription */ public void testEqueueDequeuedEntry() { // create a queue where each even entry is considered a dequeued SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), false, false, _virtualHost, new QueueEntryListFactory() { public QueueEntryList createQueueEntryList(AMQQueue queue) { /** * Override SimpleQueueEntryList to create a dequeued * entries for messages with even id */ return new SimpleQueueEntryList(queue) { /** * Entries with even message id are considered * dequeued! 
*/ protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message) { return new SimpleQueueEntryImpl(this, message) { public boolean isDequeued() { return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); } public boolean isDispensed() { return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); } public boolean isAvailable() { return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); } @Override public boolean acquire(Subscription sub) { if(((AMQMessage) message).getMessageId().longValue() % 2 == 0) { return false; } else { return super.acquire(sub); } } }; } }; } }, null); // create a subscription MockSubscription subscription = new MockSubscription(); // register subscription try { queue.registerSubscription(subscription, false); } catch (AMQException e) { fail("Failure to register subscription:" + e.getMessage()); } // put test messages into a queue putGivenNumberOfMessages(queue, 4); // assert received messages List messages = subscription.getMessages(); assertEquals("Only 2 messages should be returned", 2, messages.size()); assertEquals("ID of first message should be 1", new Long(1), ((AMQMessage) messages.get(0).getMessage()).getMessageId()); assertEquals("ID of second message should be 3", new Long(3), ((AMQMessage) messages.get(1).getMessage()).getMessageId()); } public void testActiveConsumerCount() throws Exception { final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"), false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); //verify adding an active subscription increases the count final MockSubscription subscription1 = new MockSubscription(); subscription1.setActive(true); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); queue.registerSubscription(subscription1, false); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify adding an 
inactive subscription doesn't increase the count final MockSubscription subscription2 = new MockSubscription(); subscription2.setActive(false); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); queue.registerSubscription(subscription2, false); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: //verify a subscription going suspended->active increases the count queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); //verify a subscription going active->suspended decreases the count queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going suspended->closed doesn't change the count queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going active->closed decreases the count queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); //verify behaviour in face of unexpected state changes: //verify a subscription going closed->active increases the count queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going active->active doesn't change the count queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going closed->suspended doesn't change the 
count queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going suspended->suspended doesn't change the count queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); } /** * A helper method to create a queue with given name * * @param name * queue name * @return queue */ private SimpleAMQQueue createQueue(String name) { SimpleAMQQueue queue = null; try { AMQShortString queueName = new AMQShortString(name); AMQShortString ownerName = new AMQShortString(name + "Owner"); queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false, _virtualHost, _arguments); } catch (AMQException e) { fail("Failure to create a queue:" + e.getMessage()); } assertNotNull("Queue was not created", queue); return queue; } /** * A helper method to put given number of messages into queue *

* All messages are asserted that they are present on queue * * @param queue * queue to put messages into * @param messageNumber * number of messages to put into queue */ private List enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber) { putGivenNumberOfMessages(queue, messageNumber); // make sure that all enqueued messages are on the queue List entries = queue.getMessagesOnTheQueue(); assertEquals(messageNumber, entries.size()); for (int i = 0; i < messageNumber; i++) { assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId()); } return entries; } /** * A helper method to put given number of messages into queue *

* Queue is not checked if messages are added into queue * * @param queue * queue to put messages into * @param messageNumber * number of messages to put into queue * @param queue * @param messageNumber */ private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber) { for (int i = 0; i < messageNumber; i++) { // Create message Long messageId = new Long(i); AMQMessage message = null; try { message = createMessage(messageId); } catch (AMQException e) { fail("Failure to create a test message:" + e.getMessage()); } // Put message on queue try { queue.enqueue(message); } catch (AMQException e) { fail("Failure to put message on queue:" + e.getMessage()); } } try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } } /** * A helper method to dequeue an entry on queue with given index * * @param queue * queue to dequeue message on * @param dequeueMessageIndex * entry index to dequeue. */ private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) { List entries = queue.getMessagesOnTheQueue(); QueueEntry entry = entries.get(dequeueMessageIndex); entry.acquire(); entry.dequeue(); assertTrue(entry.isDequeued()); return entry; } private List createEntriesList(QueueEntry... 
entries) { ArrayList entriesList = new ArrayList(); for (QueueEntry entry : entries) { entriesList.add(entry); } return entriesList; } private void verifyRecievedMessages(List expected, List delivered) { assertEquals("Consumer did not receive the expected number of messages", expected.size(), delivered.size()); for (QueueEntry msg : expected) { assertTrue("Consumer did not recieve msg: " + msg.getMessage().getMessageNumber(), delivered.contains(msg)); } } public class TestMessage extends AMQMessage { private final long _tag; private int _count; TestMessage(long tag, long messageId, MessagePublishInfo publishBody) throws AMQException { this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0)); } TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb) throws AMQException { super(new MockStoredMessage(messageId, publishBody, chb)); _tag = tag; } public boolean incrementReference() { _count++; return true; } public void decrementReference() { _count--; } void assertCountEquals(int expected) { assertEquals("Wrong count for message with tag " + _tag, expected, _count); } } protected AMQMessage createMessage(Long id) throws AMQException { AMQMessage messageA = new TestMessage(id, id, info); return messageA; } class TestSimpleQueueEntryListFactory implements QueueEntryListFactory { QueueEntryList _list; public QueueEntryList createQueueEntryList(AMQQueue queue) { _list = new SimpleQueueEntryList(queue); return _list; } public QueueEntryList getQueueEntryList() { return _list; } } }