summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java317
1 files changed, 0 insertions, 317 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
deleted file mode 100644
index 1245eaf7c2..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.txn.MemoryTransactionManager;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.management.JMException;
-import java.util.LinkedList;
-import java.util.HashSet;
-
-/**
- * Test class to test AMQQueueMBean attribtues and operations
- */
-public class AMQQueueMBeanTest extends TestCase
-{
- private static long MESSAGE_SIZE = 1000;
- private AMQQueue _queue;
- private AMQQueueMBean _queueMBean;
- private MessageStore _messageStore = new MemoryMessageStore();
- private TransactionManager _txm = new MemoryTransactionManager();
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
- private VirtualHost _virtualHost;
- private AMQProtocolSession _protocolSession;
-
- public void testMessageCount() throws Exception
- {
- int messageCount = 10;
- sendMessages(messageCount, false);
- assertTrue(_queueMBean.getMessageCount() == messageCount);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
- assertTrue(_queueMBean.getQueueDepth() == queueDepth);
-
- _queueMBean.deleteMessageFromTop();
- assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-
- _queueMBean.clearQueue();
- assertTrue(_queueMBean.getMessageCount() == 0);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-
- //Ensure that the data has been removed from the Store
- verifyBrokerState();
- }
-
- public void testMessageCountPersistent() throws Exception
- {
- int messageCount = 10;
- sendMessages(messageCount, true);
- assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
- assertTrue(_queueMBean.getQueueDepth() == queueDepth);
-
- _queueMBean.deleteMessageFromTop();
- assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-
- _queueMBean.clearQueue();
- assertTrue(_queueMBean.getMessageCount() == 0);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-
- //Ensure that the data has been removed from the Store
- verifyBrokerState();
- }
-
- // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
- private void verifyBrokerState()
- {
-
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
-
- // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
- assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
- }
-
- public void testConsumerCount() throws AMQException
- {
- SubscriptionManager mgr = _queue.getSubscribers();
- assertFalse(mgr.hasActiveSubscribers());
- assertTrue(_queueMBean.getActiveConsumerCount() == 0);
-
-
- TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore);
- protocolSession.addChannel(channel);
-
- _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
- assertTrue(_queueMBean.getActiveConsumerCount() == 1);
-
- SubscriptionSet _subscribers = (SubscriptionSet) mgr;
- SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
- Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
- protocolSession,
- new AMQShortString("S1"),
- false,
- null,
- true,
- _queue);
-
- Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
- protocolSession,
- new AMQShortString("S2"),
- false,
- null,
- true,
- _queue);
- _subscribers.addSubscriber(s1);
- _subscribers.addSubscriber(s2);
- assertTrue(_queueMBean.getActiveConsumerCount() == 3);
- assertTrue(_queueMBean.getConsumerCount() == 3);
-
- s1.close();
- assertTrue(_queueMBean.getActiveConsumerCount() == 2);
- assertTrue(_queueMBean.getConsumerCount() == 3);
- }
-
- public void testGeneralProperties()
- {
- long maxQueueDepth = 1000; // in bytes
- _queueMBean.setMaximumMessageCount(50000l);
- _queueMBean.setMaximumMessageSize(2000l);
- _queueMBean.setMaximumQueueDepth(maxQueueDepth);
-
- assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
- assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
- assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
-
- assertTrue(_queueMBean.getName().equals("testQueue"));
- assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
- assertFalse(_queueMBean.isAutoDelete());
- assertFalse(_queueMBean.isDurable());
- }
-
- public void testExceptions() throws Exception
- {
- try
- {
- _queueMBean.viewMessages(0, 3);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- try
- {
- _queueMBean.viewMessages(2, 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- try
- {
- _queueMBean.viewMessages(-1, 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- AMQMessage msg = message(false, false);
- long id = msg.getMessageId();
- _queue.clearQueue(_storeContext);
-
- msg.enqueue(_queue);
- msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
- _queueMBean.viewMessageContent(id);
- try
- {
- _queueMBean.viewMessageContent(id + 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
- }
-
- private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
- {
- MessagePublishInfo publish = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- if (contentHeaderBody.properties == null)
- {
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- }
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
- _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore();
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
- _queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
- }
-
- private void sendMessages(int messageCount, boolean persistent) throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false, persistent);
- messages[i].enqueue(_queue);
- messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- }
- for (int i = 0; i < messageCount; i++)
- {
- /* TGM: From here to end of method was:
- * queue.process(_storeContext, messages[i], false);
- */
-
- AMQMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(_queue);
-
- // route header
- currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
-
- // Add the body so we have somthing to test later
- currentMessage.addContentBodyFrame(_storeContext,
- _protocolSession.getMethodRegistry()
- .getProtocolVersionMethodConverter()
- .convertToContentChunk(
- new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
- MESSAGE_SIZE)));
-
- }
- }
-}