diff options
6 files changed, 396 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ec6fb1f8de..b003152db6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -54,7 +54,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -742,12 +741,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void deliverMessage(final Subscription sub, final QueueEntry entry) throws AMQException { + setLastSeenEntry(sub, entry); + _deliveredMessages.incrementAndGet(); incrementUnackedMsgCount(); sub.send(entry); - - setLastSeenEntry(sub,entry); } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java new file mode 100644 index 0000000000..b67723dd25 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.test.utils.QpidTestCase; + +/** + * + * Tests QueueEntry + * + */ +public class QueueEntryTest extends QpidTestCase +{ + private QueueEntryImpl _queueEntry1 = null; + private QueueEntryImpl _queueEntry2 = null; + private QueueEntryImpl _queueEntry3 = null; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + int i = 0; + + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null); + _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); + _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); + _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); + } + + public void testCompareTo() + { + assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0); + assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0); + assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0); + } + + /** + * Tests that the getNext() can be used to traverse the list. + */ + public void testTraverseWithNoDeletedEntries() + { + QueueEntryImpl current = _queueEntry1; + + current = current.getNext(); + assertSame("Unexpected current entry",_queueEntry2, current); + + current = current.getNext(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNext(); + assertNull(current); + + } + + /** + * Tests that the getNext() can be used to traverse the list but deleted + * entries are skipped and de-linked from the chain of entries. + */ + public void testTraverseWithDeletedEntries() + { + // Delete 2nd queue entry + _queueEntry2.delete(); + assertTrue(_queueEntry2.isDeleted()); + + + QueueEntryImpl current = _queueEntry1; + + current = current.getNext(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNext(); + assertNull(current); + + // Assert the side effects of getNext() + assertSame("Next node of entry 1 should now be entry 3", + _queueEntry3, _queueEntry1.nextNode()); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 9b65b7750c..67d093d00a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.server.queue; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +19,7 @@ package org.apache.qpid.server.queue; * */ +package org.apache.qpid.server.queue; import org.apache.commons.configuration.PropertiesConfiguration; @@ -36,6 +36,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -170,7 +171,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } - public void testSubscription() throws AMQException + public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException { // Check adding a subscription adds it to the queue _queue.registerSubscription(_subscription, false); @@ -185,6 +186,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry); // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); @@ -199,13 +201,269 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } - public void testQueueNoSubscriber() throws AMQException, InterruptedException + public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException { AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + } + + /** + * Tests enqueuing two messages. + */ + public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception + { + AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageB = createMessage(new Long(25)); + _queue.enqueue(messageA); + _queue.enqueue(messageB); + _queue.registerSubscription(_subscription, false); + Thread.sleep(150); + assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + } + + /** + * Tests that a re-queued message is resent to the subscriber. Verifies also that the + * QueueContext._releasedEntry is reset to null after the entry has been reset. + */ + public void testRequeuedMessageIsResentToSubscriber() throws Exception + { + _queue.registerSubscription(_subscription, false); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + { + public void onEnqueue(QueueEntry entry) + { + queueEntries.add(entry); + } + }; + + AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageB = createMessage(new Long(25)); + AMQMessage messageC = createMessage(new Long(26)); + + /* Enqueue three messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + _queue.enqueue(messageC, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); + + /* Now requeue the first message only */ + + queueEntries.get(0).release(); + _queue.requeue(queueEntries.get(0)); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); + assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); + assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + } + + /** + * Tests that a re-queued message that becomes expired is not resent to the subscriber. + * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. + * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. + */ + public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception + { + _queue.registerSubscription(_subscription, false); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + { + public void onEnqueue(QueueEntry entry) + { + queueEntries.add(entry); + } + }; + + /* Enqueue one message with expiration set for a short time in the future */ + + AMQMessage messageA = createMessage(new Long(24)); + int messageExpirationOffset = 200; + messageA.setExpiration(System.currentTimeMillis() + messageExpirationOffset); + + _queue.enqueue(messageA, postEnqueueAction); + + int subFlushWaitTime = 150; + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + + /* Wait a little more to be sure that message will have expired, then requeue it */ + Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); + queueEntries.get(0).release(); + _queue.requeue(queueEntries.get(0)); + + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread + + assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); + assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + + } + + /** + * Tests that if a client requeues messages 'out of order' (the order + * used by QueueEntryImpl.compareTo) that messages are still resent + * successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()} + * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. + */ + public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception + { + _queue.registerSubscription(_subscription, false); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + { + public void onEnqueue(QueueEntry entry) + { + queueEntries.add(entry); + } + }; + + AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageB = createMessage(new Long(25)); + AMQMessage messageC = createMessage(new Long(26)); + + /* Enqueue three messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + _queue.enqueue(messageC, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); + + /* Now requeue the third and first message only */ + + queueEntries.get(2).release(); + queueEntries.get(0).release(); + _queue.requeue(queueEntries.get(2)); + _queue.requeue(queueEntries.get(0)); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); + assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); + assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + } + + + /** + * Tests a requeue for a queue with multiple subscriptions. Verifies that a + * requeue resends a message to a <i>single</i> subscriber. + */ + public void testRequeueForQueueWithMultipleSubscriptions() throws Exception + { + MockSubscription subscription1 = new MockSubscription(); + MockSubscription subscription2 = new MockSubscription(); + + _queue.registerSubscription(subscription1, false); + _queue.registerSubscription(subscription2, false); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + { + public void onEnqueue(QueueEntry entry) + { + queueEntries.add(entry); + } + }; + + AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageB = createMessage(new Long(25)); + + /* Enqueue two messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); + + /* Now requeue a message (for any subscription) */ + + queueEntries.get(0).release(); + _queue.requeue((QueueEntryImpl)queueEntries.get(0)); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); + } + + /** + * Tests a requeue for a queue with multiple subscriptions. Verifies that a + * subscriber specific requeue resends the message to <i>that</i> subscriber. + */ + public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception + { + MockSubscription subscription1 = new MockSubscription(); + MockSubscription subscription2 = new MockSubscription(); + + _queue.registerSubscription(subscription1, false); + _queue.registerSubscription(subscription2, false); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + { + public void onEnqueue(QueueEntry entry) + { + queueEntries.add(entry); + } + }; + + AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageB = createMessage(new Long(25)); + + /* Enqueue two messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); + + /* Now requeue a message (for first subscription) */ + + queueEntries.get(0).release(); + _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); } public void testExclusiveConsumer() throws AMQException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 2fbf5bb2cf..320a75045a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -51,6 +51,21 @@ public class SimpleQueueEntryListTest extends TestCase } } + /** + * Tests the behavior of the next(QueuyEntry) method. + */ + public void testNext() throws Exception + { + SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); + int i = 0; + + QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++)); + QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++)); + + assertSame(queueEntry2, sqel.next(queueEntry1)); + assertNull(sqel.next(queueEntry2)); + } + public void testScavenge() throws Exception { SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index e8d0b99e6e..3593297a05 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.store; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.framing.abstraction.ContentChunk; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.HashMap; -import java.util.List; -import java.nio.ByteBuffer; /** * Adds some extra methods to the memory message store for testing purposes. @@ -52,8 +47,11 @@ public class TestableMemoryMessageStore extends MemoryMessageStore } - - + @Override + public void close() throws Exception + { + // Not required to do anything + } @Override public StoredMessage addMessage(StorableMessageMetaData metaData) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index e6367c4468..1ec134e90e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -21,20 +21,19 @@ package org.apache.qpid.server.subscription; * */ +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - public class MockSubscription implements Subscription { @@ -137,12 +136,11 @@ public class MockSubscription implements Subscription public void set(String key, Object value) { - //To change body of implemented methods use File | Settings | File Templates. } public Object get(String key) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isAutoClose() @@ -194,12 +192,15 @@ public class MockSubscription implements Subscription public void restoreCredit(QueueEntry queueEntry) { - //To change body of implemented methods use File | Settings | File Templates. } - public void send(QueueEntry msg) throws AMQException + public void send(QueueEntry entry) throws AMQException { - messages.add(msg); + if (messages.contains(entry)) + { + entry.setRedelivered(); + } + messages.add(entry); } public void setQueueContext(AMQQueue.Context queueContext) |