summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java94
1 files changed, 67 insertions, 27 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 9b874d63e8..50bee71d59 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -27,10 +27,14 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactory;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -43,7 +47,10 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
+
+import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -58,6 +65,7 @@ public class AMQQueueMBeanTest extends TestCase
private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
private AMQProtocolSession _protocolSession;
+ private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
public void testMessageCountTransient() throws Exception
{
@@ -73,7 +81,7 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
_queueMBean.clearQueue();
- assertTrue(_queueMBean.getMessageCount() == 0);
+ assertEquals(0,(int)_queueMBean.getMessageCount());
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
//Ensure that the data has been removed from the Store
@@ -116,27 +124,30 @@ public class AMQQueueMBeanTest extends TestCase
public void testConsumerCount() throws AMQException
{
- SubscriptionManager mgr = _queue.getSubscribers();
- assertFalse(mgr.hasActiveSubscribers());
+
+ assertTrue(_queue.getActiveConsumerCount() == 0);
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
- _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
- assertTrue(_queueMBean.getActiveConsumerCount() == 1);
+ Subscription subscription =
+ SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
+
+ _queue.registerSubscription(subscription, false);
+ assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
- SubscriptionSet _subscribers = (SubscriptionSet) mgr;
- SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
+
+ SubscriptionFactory subscriptionFactory = SUBSCRIPTION_FACTORY;
Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
protocolSession,
new AMQShortString("S1"),
false,
null,
true,
- _queue);
+ channel.getCreditManager());
Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
protocolSession,
@@ -144,14 +155,14 @@ public class AMQQueueMBeanTest extends TestCase
false,
null,
true,
- _queue);
- _subscribers.addSubscriber(s1);
- _subscribers.addSubscriber(s2);
+ channel.getCreditManager());
+ _queue.registerSubscription(s1,false);
+ _queue.registerSubscription(s2,false);
assertTrue(_queueMBean.getActiveConsumerCount() == 3);
assertTrue(_queueMBean.getConsumerCount() == 3);
s1.close();
- assertTrue(_queueMBean.getActiveConsumerCount() == 2);
+ assertEquals(2, (int) _queueMBean.getActiveConsumerCount());
assertTrue(_queueMBean.getConsumerCount() == 3);
}
@@ -204,13 +215,35 @@ public class AMQQueueMBeanTest extends TestCase
}
- AMQMessage msg = message(false, false);
+ IncomingMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
+ msg.routingComplete(_messageStore, new MessageHandleFactory());
+
+ msg.addContentBodyFrame(new ContentChunk()
+ {
+ ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
+
+ public int getSize()
+ {
+ return (int) MESSAGE_SIZE;
+ }
- msg.enqueue(_queue);
- msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
+ public ByteBuffer getData()
+ {
+ return _data;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ });
+ msg.deliverToQueues();
+// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
{
@@ -223,7 +256,7 @@ public class AMQQueueMBeanTest extends TestCase
}
}
- private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
+ private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -258,7 +291,10 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
+ IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ msg.setContentHeaderBody(contentHeaderBody);
+ return msg;
+
}
@Override
@@ -274,29 +310,33 @@ public class AMQQueueMBeanTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
+ null);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
private void sendMessages(int messageCount, boolean persistent) throws AMQException
{
for (int i = 0; i < messageCount; i++)
{
- AMQMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(_queue);
+ IncomingMessage currentMessage = message(false, persistent);
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
// Add the body so we have somthing to test later
- currentMessage.addContentBodyFrame(_storeContext,
- _protocolSession.getMethodRegistry()
+ currentMessage.addContentBodyFrame(
+ _protocolSession.getMethodRegistry()
.getProtocolVersionMethodConverter()
.convertToContentChunk(
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
+ currentMessage.deliverToQueues();
}