diff options
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java | 94 |
1 files changed, 67 insertions, 27 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 9b874d63e8..50bee71d59 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -27,10 +27,14 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactory; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -43,7 +47,10 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; import javax.management.JMException; + +import java.util.ArrayList; import java.util.LinkedList; +import java.util.Collections; /** * Test class to test AMQQueueMBean attribtues and operations @@ -58,6 +65,7 @@ public class AMQQueueMBeanTest extends TestCase private TransactionalContext _transactionalContext; private VirtualHost _virtualHost; private AMQProtocolSession _protocolSession; + private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE; public void testMessageCountTransient() throws Exception { @@ -73,7 +81,7 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); _queueMBean.clearQueue(); - assertTrue(_queueMBean.getMessageCount() == 0); + assertEquals(0,(int)_queueMBean.getMessageCount()); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); //Ensure that the data has been removed from the Store @@ -116,27 +124,30 @@ public class AMQQueueMBeanTest extends TestCase public void testConsumerCount() throws AMQException { - SubscriptionManager mgr = _queue.getSubscribers(); - assertFalse(mgr.hasActiveSubscribers()); + + assertTrue(_queue.getActiveConsumerCount() == 0); assertTrue(_queueMBean.getActiveConsumerCount() == 0); - TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); + InternalTestProtocolSession protocolSession = new InternalTestProtocolSession(); AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); protocolSession.addChannel(channel); - _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); - assertTrue(_queueMBean.getActiveConsumerCount() == 1); + Subscription subscription = + SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager()); + + _queue.registerSubscription(subscription, false); + assertEquals(1,(int)_queueMBean.getActiveConsumerCount()); - SubscriptionSet _subscribers = (SubscriptionSet) mgr; - SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); + + SubscriptionFactory subscriptionFactory = SUBSCRIPTION_FACTORY; Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("S1"), false, null, true, - _queue); + channel.getCreditManager()); Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), protocolSession, @@ -144,14 +155,14 @@ public class AMQQueueMBeanTest extends TestCase false, null, true, - _queue); - _subscribers.addSubscriber(s1); - _subscribers.addSubscriber(s2); + channel.getCreditManager()); + _queue.registerSubscription(s1,false); + _queue.registerSubscription(s2,false); assertTrue(_queueMBean.getActiveConsumerCount() == 3); assertTrue(_queueMBean.getConsumerCount() == 3); s1.close(); - assertTrue(_queueMBean.getActiveConsumerCount() == 2); + assertEquals(2, (int) _queueMBean.getActiveConsumerCount()); assertTrue(_queueMBean.getConsumerCount() == 3); } @@ -204,13 +215,35 @@ public class AMQQueueMBeanTest extends TestCase } - AMQMessage msg = message(false, false); + IncomingMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); + msg.routingComplete(_messageStore, new MessageHandleFactory()); + + msg.addContentBodyFrame(new ContentChunk() + { + ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE); + + public int getSize() + { + return (int) MESSAGE_SIZE; + } - msg.enqueue(_queue); - msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - _queue.process(_storeContext, new QueueEntry(_queue, msg), false); + public ByteBuffer getData() + { + return _data; + } + + public void reduceToFit() + { + + } + }); + msg.deliverToQueues(); +// _queue.process(_storeContext, new QueueEntry(_queue, msg), false); _queueMBean.viewMessageContent(id); try { @@ -223,7 +256,7 @@ public class AMQQueueMBeanTest extends TestCase } } - private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException + private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -258,7 +291,10 @@ public class AMQQueueMBeanTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); - return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); + IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession); + msg.setContentHeaderBody(contentHeaderBody); + return msg; + } @Override @@ -274,29 +310,33 @@ public class AMQQueueMBeanTest extends TestCase new LinkedList<RequiredDeliveryException>() ); - _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost, + null); _queueMBean = new AMQQueueMBean(_queue); - _protocolSession = new TestMinaProtocolSession(); + _protocolSession = new InternalTestProtocolSession(); } private void sendMessages(int messageCount, boolean persistent) throws AMQException { for (int i = 0; i < messageCount; i++) { - AMQMessage currentMessage = message(false, persistent); - currentMessage.enqueue(_queue); + IncomingMessage currentMessage = message(false, persistent); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); // Add the body so we have somthing to test later - currentMessage.addContentBodyFrame(_storeContext, - _protocolSession.getMethodRegistry() + currentMessage.addContentBodyFrame( + _protocolSession.getMethodRegistry() .getProtocolVersionMethodConverter() .convertToContentChunk( new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), MESSAGE_SIZE))); + currentMessage.deliverToQueues(); } |