/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.lang.reflect.Field; import java.util.Collections; import java.util.UUID; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** * Tests for {@link QueueEntryImpl} */ public abstract class QueueEntryImplTestBase extends TestCase { // tested entry protected QueueEntryImpl _queueEntry; protected QueueEntryImpl _queueEntry2; protected QueueEntryImpl _queueEntry3; public abstract QueueEntryImpl getQueueEntryImpl(int msgId); public abstract void testCompareTo(); public abstract void testTraverseWithNoDeletedEntries(); public abstract void testTraverseWithDeletedEntries(); public void setUp() 
throws Exception { _queueEntry = getQueueEntryImpl(1); _queueEntry2 = getQueueEntryImpl(2); _queueEntry3 = getQueueEntryImpl(3); } protected void mockLogging() { final LogActor logActor = mock(LogActor.class); when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); CurrentActor.setDefault(logActor); } public void testAcquire() { assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method", _queueEntry.isAvailable()); acquire(); } public void testDelete() { delete(); } /** * Tests release method for entry in acquired state. *
* Entry in state ACQUIRED should be released and its status should be * changed to AVAILABLE. */ public void testReleaseAcquired() { acquire(); _queueEntry.release(); assertTrue("Queue entry should be in AVAILABLE state after invoking of release method", _queueEntry.isAvailable()); } /** * Tests release method for entry in deleted state. *
* Invoking release on deleted entry should not have any effect on its
* state.
*/
public void testReleaseDeleted()
{
delete();
_queueEntry.release();
assertTrue("Invoking of release on entry in DELETED state should not have any effect",
_queueEntry.isDeleted());
}
/**
 * Helper that deletes the tested entry and asserts that it then reports
 * the deleted state.
 */
private void delete()
{
    _queueEntry.delete();
    final boolean deleted = _queueEntry.isDeleted();
    assertTrue("Queue entry should be in DELETED state after invoking of delete method", deleted);
}
/**
 * Helper that acquires the tested entry on behalf of a freshly created consumer
 * and asserts that the entry then reports the acquired state.
 */
private void acquire()
{
    final QueueConsumer acquirer = newConsumer();
    _queueEntry.acquire(acquirer);
    final boolean acquired = _queueEntry.isAcquired();
    assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", acquired);
}
/**
 * Builds a consumer backed by a mocked {@link ConsumerTarget} whose session model
 * is also a mock.
 *
 * @return a new consumer instance; every call creates a distinct object
 */
private QueueConsumer newConsumer()
{
    final ConsumerTarget mockedTarget = mock(ConsumerTarget.class);
    final AMQSessionModel mockedSession = mock(AMQSessionModel.class);
    when(mockedTarget.getSessionModel()).thenReturn(mockedSession);
    return new QueueConsumer(null, null, true, true, "mock", false, mockedTarget);
}
/**
 * Helper that reads the private {@code _state} field of the tested entry
 * through reflection.
 *
 * Any exception raised while looking up or reading the field fails the
 * running test.
 *
 * @return the current entry state
 */
private EntryState getState()
{
    try
    {
        final Field stateField = QueueEntryImpl.class.getDeclaredField("_state");
        // the field is not accessible from the test class by default
        stateField.setAccessible(true);
        return (EntryState) stateField.get(_queueEntry);
    }
    catch (Exception e)
    {
        fail("Failure to get a state field: " + e.getMessage());
        // fail() always throws; this return only satisfies the compiler
        return null;
    }
}
/**
 * Verifies that a rejection is remembered per consumer: once a consumer has
 * acquired and rejected the entry, isRejectedBy reports true for that consumer,
 * and it keeps doing so after a second consumer rejects the same entry.
 */
public void testRejectAndRejectedBy()
{
    final QueueConsumer firstConsumer = newConsumer();

    // preconditions: nothing rejected or acquired yet
    assertFalse("Queue entry should not yet have been rejected by the consumer",
                _queueEntry.isRejectedBy(firstConsumer));
    assertFalse("Queue entry should not yet have been acquired by a consumer",
                _queueEntry.isAcquired());

    // first consumer acquires, rejects and releases the entry
    final boolean acquiredByFirst = _queueEntry.acquire(firstConsumer);
    assertTrue("Queue entry should have been able to be acquired", acquiredByFirst);
    _queueEntry.reject();
    _queueEntry.release();

    // the rejection by the first consumer must now be visible
    assertTrue("Queue entry should have been rejected by the consumer",
               _queueEntry.isRejectedBy(firstConsumer));

    // second consumer acquires and rejects the same entry
    final QueueConsumer secondConsumer = newConsumer();
    assertFalse("Queue entry should not yet have been rejected by the consumer",
                _queueEntry.isRejectedBy(secondConsumer));
    final boolean acquiredBySecond = _queueEntry.acquire(secondConsumer);
    assertTrue("Queue entry should have been able to be acquired", acquiredBySecond);
    _queueEntry.reject();

    // both rejections must be recorded
    assertTrue("Queue entry should have been rejected by the consumer",
               _queueEntry.isRejectedBy(firstConsumer));
    assertTrue("Queue entry should have been rejected by the consumer",
               _queueEntry.isRejectedBy(secondConsumer));
}
/**
* Tests if entries in DEQUEUED or DELETED state are not returned by getNext method.
*/
public void testGetNext()
{
int numberOfEntries = 5;
QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false,
mock(VirtualHost.class), Collections.