summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java97
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java264
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java23
6 files changed, 396 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ec6fb1f8de..b003152db6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -54,7 +54,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -742,12 +741,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
{
+ setLastSeenEntry(sub, entry);
+
_deliveredMessages.incrementAndGet();
incrementUnackedMsgCount();
sub.send(entry);
-
- setLastSeenEntry(sub,entry);
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
new file mode 100644
index 0000000000..b67723dd25
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ *
+ * Tests QueueEntry
+ *
+ */
+public class QueueEntryTest extends QpidTestCase
+{
+ private QueueEntryImpl _queueEntry1 = null;
+ private QueueEntryImpl _queueEntry2 = null;
+ private QueueEntryImpl _queueEntry3 = null;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ int i = 0;
+
+ SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null);
+ _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
+ _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
+ _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
+ }
+
+ public void testCompareTo()
+ {
+ assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0);
+ assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0);
+ }
+
+ /**
+ * Tests that the getNext() can be used to traverse the list.
+ */
+ public void testTraverseWithNoDeletedEntries()
+ {
+ QueueEntryImpl current = _queueEntry1;
+
+ current = current.getNext();
+ assertSame("Unexpected current entry",_queueEntry2, current);
+
+ current = current.getNext();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNext();
+ assertNull(current);
+
+ }
+
+ /**
+ * Tests that the getNext() can be used to traverse the list but deleted
+ * entries are skipped and de-linked from the chain of entries.
+ */
+ public void testTraverseWithDeletedEntries()
+ {
+ // Delete 2nd queue entry
+ _queueEntry2.delete();
+ assertTrue(_queueEntry2.isDeleted());
+
+
+ QueueEntryImpl current = _queueEntry1;
+
+ current = current.getNext();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNext();
+ assertNull(current);
+
+ // Assert the side effects of getNext()
+ assertSame("Next node of entry 1 should now be entry 3",
+ _queueEntry3, _queueEntry1.nextNode());
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 9b65b7750c..67d093d00a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.queue;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.server.queue;
*
*/
+package org.apache.qpid.server.queue;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -36,6 +36,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -170,7 +171,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
- public void testSubscription() throws AMQException
+ public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
{
// Check adding a subscription adds it to the queue
_queue.registerSubscription(_subscription, false);
@@ -185,6 +186,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
@@ -199,13 +201,269 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
- public void testQueueNoSubscriber() throws AMQException, InterruptedException
+ public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
{
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ }
+
+ /**
+ * Tests enqueuing two messages.
+ */
+ public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+ {
+ AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageB = createMessage(new Long(25));
+ _queue.enqueue(messageA);
+ _queue.enqueue(messageB);
+ _queue.registerSubscription(_subscription, false);
+ Thread.sleep(150);
+ assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ }
+
+ /**
+ * Tests that a re-queued message is resent to the subscriber. Verifies also that the
+ * QueueContext._releasedEntry is reset to null after the entry has been reset.
+ */
+ public void testRequeuedMessageIsResentToSubscriber() throws Exception
+ {
+ _queue.registerSubscription(_subscription, false);
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ {
+ public void onEnqueue(QueueEntry entry)
+ {
+ queueEntries.add(entry);
+ }
+ };
+
+ AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageB = createMessage(new Long(25));
+ AMQMessage messageC = createMessage(new Long(26));
+
+ /* Enqueue three messages */
+
+ _queue.enqueue(messageA, postEnqueueAction);
+ _queue.enqueue(messageB, postEnqueueAction);
+ _queue.enqueue(messageC, postEnqueueAction);
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
+
+ /* Now requeue the first message only */
+
+ queueEntries.get(0).release();
+ _queue.requeue(queueEntries.get(0));
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size());
+ assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
+ assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
+ assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ }
+
+ /**
+ * Tests that a re-queued message that becomes expired is not resent to the subscriber.
+ * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries.
+ * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset.
+ */
+ public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception
+ {
+ _queue.registerSubscription(_subscription, false);
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ {
+ public void onEnqueue(QueueEntry entry)
+ {
+ queueEntries.add(entry);
+ }
+ };
+
+ /* Enqueue one message with expiration set for a short time in the future */
+
+ AMQMessage messageA = createMessage(new Long(24));
+ int messageExpirationOffset = 200;
+ messageA.setExpiration(System.currentTimeMillis() + messageExpirationOffset);
+
+ _queue.enqueue(messageA, postEnqueueAction);
+
+ int subFlushWaitTime = 150;
+ Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+
+ /* Wait a little more to be sure that message will have expired, then requeue it */
+ Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
+ queueEntries.get(0).release();
+ _queue.requeue(queueEntries.get(0));
+
+ Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread
+
+ assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
+ assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+
+ }
+
+ /**
+ * Tests that if a client requeues messages 'out of order' (the order
+ * used by QueueEntryImpl.compareTo) that messages are still resent
+ * successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()}
+ * can correctly move the _releasedEntry to an earlier position in the QueueEntry list.
+ */
+ public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception
+ {
+ _queue.registerSubscription(_subscription, false);
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ {
+ public void onEnqueue(QueueEntry entry)
+ {
+ queueEntries.add(entry);
+ }
+ };
+
+ AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageB = createMessage(new Long(25));
+ AMQMessage messageC = createMessage(new Long(26));
+
+ /* Enqueue three messages */
+
+ _queue.enqueue(messageA, postEnqueueAction);
+ _queue.enqueue(messageB, postEnqueueAction);
+ _queue.enqueue(messageC, postEnqueueAction);
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
+ assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
+
+ /* Now requeue the third and first message only */
+
+ queueEntries.get(2).release();
+ queueEntries.get(0).release();
+ _queue.requeue(queueEntries.get(2));
+ _queue.requeue(queueEntries.get(0));
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size());
+ assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
+ assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
+ assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ }
+
+
+ /**
+ * Tests a requeue for a queue with multiple subscriptions. Verifies that a
+ * requeue resends a message to a <i>single</i> subscriber.
+ */
+ public void testRequeueForQueueWithMultipleSubscriptions() throws Exception
+ {
+ MockSubscription subscription1 = new MockSubscription();
+ MockSubscription subscription2 = new MockSubscription();
+
+ _queue.registerSubscription(subscription1, false);
+ _queue.registerSubscription(subscription2, false);
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ {
+ public void onEnqueue(QueueEntry entry)
+ {
+ queueEntries.add(entry);
+ }
+ };
+
+ AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageB = createMessage(new Long(25));
+
+ /* Enqueue two messages */
+
+ _queue.enqueue(messageA, postEnqueueAction);
+ _queue.enqueue(messageB, postEnqueueAction);
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size());
+
+ /* Now requeue a message (for any subscription) */
+
+ queueEntries.get(0).release();
+ _queue.requeue((QueueEntryImpl)queueEntries.get(0));
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
+ }
+
+ /**
+ * Tests a requeue for a queue with multiple subscriptions. Verifies that a
+ * subscriber specific requeue resends the message to <i>that</i> subscriber.
+ */
+ public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception
+ {
+ MockSubscription subscription1 = new MockSubscription();
+ MockSubscription subscription2 = new MockSubscription();
+
+ _queue.registerSubscription(subscription1, false);
+ _queue.registerSubscription(subscription2, false);
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ {
+ public void onEnqueue(QueueEntry entry)
+ {
+ queueEntries.add(entry);
+ }
+ };
+
+ AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageB = createMessage(new Long(25));
+
+ /* Enqueue two messages */
+
+ _queue.enqueue(messageA, postEnqueueAction);
+ _queue.enqueue(messageB, postEnqueueAction);
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size());
+
+ /* Now requeue a message (for first subscription) */
+
+ queueEntries.get(0).release();
+ _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1);
+
+ Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+ assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
}
public void testExclusiveConsumer() throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
index 2fbf5bb2cf..320a75045a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -51,6 +51,21 @@ public class SimpleQueueEntryListTest extends TestCase
}
}
+ /**
+ * Tests the behavior of the next(QueuyEntry) method.
+ */
+ public void testNext() throws Exception
+ {
+ SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
+ int i = 0;
+
+ QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++));
+ QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++));
+
+ assertSame(queueEntry2, sqel.next(queueEntry1));
+ assertNull(sqel.next(queueEntry2));
+ }
+
public void testScavenge() throws Exception
{
SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index e8d0b99e6e..3593297a05 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.store;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashMap;
-import java.util.List;
-import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
@@ -52,8 +47,11 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
}
-
-
+ @Override
+ public void close() throws Exception
+ {
+ // Not required to do anything
+ }
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index e6367c4468..1ec134e90e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -21,20 +21,19 @@ package org.apache.qpid.server.subscription;
*
*/
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
public class MockSubscription implements Subscription
{
@@ -137,12 +136,11 @@ public class MockSubscription implements Subscription
public void set(String key, Object value)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public Object get(String key)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public boolean isAutoClose()
@@ -194,12 +192,15 @@ public class MockSubscription implements Subscription
public void restoreCredit(QueueEntry queueEntry)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry) throws AMQException
{
- messages.add(msg);
+ if (messages.contains(entry))
+ {
+ entry.setRedelivered();
+ }
+ messages.add(entry);
}
public void setQueueContext(AMQQueue.Context queueContext)