diff options
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java | 128 |
1 files changed, 93 insertions, 35 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 4c02f266ed..3caf6ad73d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -24,10 +24,13 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -36,6 +39,8 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.LinkedList; @@ -49,18 +54,16 @@ public class AMQQueueMBeanTest extends TestCase private static long MESSAGE_SIZE = 1000; private AMQQueue _queue; private AMQQueueMBean _queueMBean; - private MessageStore _messageStore = new MemoryMessageStore(); + private MessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + private TransactionalContext _transactionalContext; private VirtualHost _virtualHost; + private AMQProtocolSession _protocolSession; - public void testMessageCount() throws Exception + public void testMessageCountTransient() throws Exception { int messageCount = 10; - sendMessages(messageCount); + sendMessages(messageCount, false); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; @@ -73,6 +76,43 @@ public class AMQQueueMBeanTest extends TestCase _queueMBean.clearQueue(); assertTrue(_queueMBean.getMessageCount() == 0); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + //Ensure that the data has been removed from the Store + verifyBrokerState(); + } + + public void testMessageCountPersistent() throws Exception + { + int messageCount = 10; + sendMessages(messageCount, true); + assertEquals("", messageCount, _queueMBean.getMessageCount().intValue()); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); + + _queueMBean.deleteMessageFromTop(); + assertTrue(_queueMBean.getMessageCount() == (messageCount - 1)); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getMessageCount() == 0); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + //Ensure that the data has been removed from the Store + verifyBrokerState(); + } + + // todo: collect to a general testing class -duplicated from Systest/MessageReturntest + private void verifyBrokerState() + { + + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore()); + + // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. + assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); + assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); + assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); + assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); } public void testConsumerCount() throws AMQException @@ -86,26 +126,26 @@ public class AMQQueueMBeanTest extends TestCase AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); protocolSession.addChannel(channel); - _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); + _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); - Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), - protocolSession, - new AMQShortString("S1"), - false, - null, - true, - _queue); - - Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), - protocolSession, - new AMQShortString("S2"), - false, - null, - true, - _queue); + Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S1"), + false, + null, + true, + _queue); + + Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S2"), + false, + null, + true, + _queue); _subscribers.addSubscriber(s1); _subscribers.addSubscriber(s2); assertTrue(_queueMBean.getActiveConsumerCount() == 3); @@ -165,7 +205,7 @@ public class AMQQueueMBeanTest extends TestCase } - AMQMessage msg = message(false); + AMQMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); @@ -184,7 +224,7 @@ public class AMQQueueMBeanTest extends TestCase } } - private AMQMessage message(final boolean immediate) throws AMQException + private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -209,9 +249,11 @@ public class AMQQueueMBeanTest extends TestCase return null; } }; - + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); } @@ -221,22 +263,38 @@ public class AMQQueueMBeanTest extends TestCase super.setUp(); IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); + + _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); _queueMBean = new AMQQueueMBean(_queue); + + _protocolSession = new TestMinaProtocolSession(); } - private void sendMessages(int messageCount) throws AMQException + private void sendMessages(int messageCount, boolean persistent) throws AMQException { - AMQMessage[] messages = new AMQMessage[messageCount]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(false); - messages[i].enqueue(_queue); - messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - } for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i], false); + AMQMessage currentMessage = message(false, persistent); + currentMessage.enqueue(_queue); + + // route header + currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + + // Add the body so we have somthing to test later + currentMessage.addContentBodyFrame(_storeContext, + _protocolSession.getRegistry() + .getProtocolVersionMethodConverter() + .convertToContentChunk( + new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), + MESSAGE_SIZE))); + + } } } |