summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java128
1 files changed, 93 insertions, 35 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 4c02f266ed..3caf6ad73d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -24,10 +24,13 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -36,6 +39,8 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
@@ -49,18 +54,16 @@ public class AMQQueueMBeanTest extends TestCase
private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
- private MessageStore _messageStore = new MemoryMessageStore();
+ private MessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
+ private AMQProtocolSession _protocolSession;
- public void testMessageCount() throws Exception
+ public void testMessageCountTransient() throws Exception
{
int messageCount = 10;
- sendMessages(messageCount);
+ sendMessages(messageCount, false);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -73,6 +76,43 @@ public class AMQQueueMBeanTest extends TestCase
_queueMBean.clearQueue();
assertTrue(_queueMBean.getMessageCount() == 0);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ public void testMessageCountPersistent() throws Exception
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, true);
+ assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ _queueMBean.deleteMessageFromTop();
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ _queueMBean.clearQueue();
+ assertTrue(_queueMBean.getMessageCount() == 0);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
+ private void verifyBrokerState()
+ {
+
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+ // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
+ assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
+ assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+ assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
}
public void testConsumerCount() throws AMQException
@@ -86,26 +126,26 @@ public class AMQQueueMBeanTest extends TestCase
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
protocolSession.addChannel(channel);
- _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
+ _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
- Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
- protocolSession,
- new AMQShortString("S1"),
- false,
- null,
- true,
- _queue);
-
- Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
- protocolSession,
- new AMQShortString("S2"),
- false,
- null,
- true,
- _queue);
+ Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S1"),
+ false,
+ null,
+ true,
+ _queue);
+
+ Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S2"),
+ false,
+ null,
+ true,
+ _queue);
_subscribers.addSubscriber(s1);
_subscribers.addSubscriber(s2);
assertTrue(_queueMBean.getActiveConsumerCount() == 3);
@@ -165,7 +205,7 @@ public class AMQQueueMBeanTest extends TestCase
}
- AMQMessage msg = message(false);
+ AMQMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
@@ -184,7 +224,7 @@ public class AMQQueueMBeanTest extends TestCase
}
}
- private AMQMessage message(final boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -209,9 +249,11 @@ public class AMQQueueMBeanTest extends TestCase
return null;
}
};
-
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
}
@@ -221,22 +263,38 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
+
+ _protocolSession = new TestMinaProtocolSession();
}
- private void sendMessages(int messageCount) throws AMQException
+ private void sendMessages(int messageCount, boolean persistent) throws AMQException
{
- AMQMessage[] messages = new AMQMessage[messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false);
- messages[i].enqueue(_queue);
- messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- }
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ AMQMessage currentMessage = message(false, persistent);
+ currentMessage.enqueue(_queue);
+
+ // route header
+ currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+ // Add the body so we have somthing to test later
+ currentMessage.addContentBodyFrame(_storeContext,
+ _protocolSession.getRegistry()
+ .getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+ MESSAGE_SIZE)));
+
+
}
}
}