diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java | 73 |
1 files changed, 39 insertions, 34 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index 88702e66f4..a6e5f29964 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -23,20 +23,20 @@ package org.apache.qpid.server.protocol.v0_8; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryIterator; -import org.apache.qpid.server.queue.SimpleQueueEntryList; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,40 +62,51 @@ public class ExtractResendAndRequeueTest extends TestCase private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; - private AMQQueue _queue = new MockAMQQueue(getName()); + private AMQQueue _queue; private MessageStore _messageStore = new TestMemoryMessageStore(); private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); + private Subscription _subscription; + private boolean _queueDeleted; @Override public void setUp() throws AMQException { + _queueDeleted = false; _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100); + _queue = mock(AMQQueue.class); + when(_queue.getName()).thenReturn(getName()); + when(_queue.isDeleted()).thenReturn(_queueDeleted); + _subscription = mock(Subscription.class); + when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); + long id = 0; - SimpleQueueEntryList list = new SimpleQueueEntryList(_queue); // Add initial messages to QueueEntryList for (int count = 0; count < INITIAL_MSG_COUNT; count++) { - AMQMessage msg = new MockAMQMessage(id); - - list.add(msg); - + ServerMessage msg = mock(ServerMessage.class); + when(msg.getMessageNumber()).thenReturn(id); + final QueueEntry entry = mock(QueueEntry.class); + when(entry.getMessage()).thenReturn(msg); + when(entry.getQueue()).thenReturn(_queue); + when(entry.isQueueDeleted()).thenReturn(_queueDeleted); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + when(entry.isDeleted()).thenReturn(true); + return null; + } + }).when(entry).delete(); + + _unacknowledgedMessageMap.add(id, entry); + _referenceList.add(entry); //Increment ID; id++; } - // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList - QueueEntryIterator queueEntries = list.iterator(); - while(queueEntries.advance()) - { - QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); - - // Store the entry for future inspection - _referenceList.add(entry); - } - assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); } @@ -106,19 +117,14 @@ public class ExtractResendAndRequeueTest extends TestCase * * @return Subscription that performed the acquire */ - private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList) + private void acquireMessages(LinkedList<QueueEntry> messageList) { - Subscription subscription = mock(Subscription.class); - when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription)); - when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); // Acquire messages in subscription - for (QueueEntry entry : messageList) + for(QueueEntry entry : messageList) { - entry.acquire(subscription); + when(entry.getDeliveredSubscription()).thenReturn(_subscription); } - - return subscription; } /** @@ -133,7 +139,7 @@ public class ExtractResendAndRequeueTest extends TestCase public void testResend() throws AMQException { //We don't need the subscription object here. - createSubscriptionAndAcquireMessages(_referenceList); + acquireMessages(_referenceList); final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); @@ -159,10 +165,10 @@ public class ExtractResendAndRequeueTest extends TestCase */ public void testRequeueDueToSubscriptionClosure() throws AMQException { - Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList); + acquireMessages(_referenceList); // Close subscription - when(subscription.isClosed()).thenReturn(true); + when(_subscription.isClosed()).thenReturn(true); final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); @@ -239,8 +245,7 @@ public class ExtractResendAndRequeueTest extends TestCase final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - _queue.delete(); - + _queueDeleted = true; // requeueIfUnableToResend : value doesn't matter here as queue has been deleted _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, msgToResend, false, _messageStore)); |