summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java41
1 files changed, 29 insertions, 12 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index b135567029..3bb8d397be 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -29,6 +29,10 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -41,6 +45,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
+import org.apache.commons.configuration.PropertiesConfiguration;
import javax.management.JMException;
@@ -151,13 +156,11 @@ public class AMQQueueMBeanTest extends TestCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
- assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+
+ assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount());
}
public void testConsumerCount() throws AMQException
@@ -266,16 +269,22 @@ public class AMQQueueMBeanTest extends TestCase
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageNumber();
_queue.clearQueue();
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
+ long id = msg.getMessageNumber();
+
msg.addContentBodyFrame(new ContentChunk()
{
ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
+ {
+ _data.limit((int)MESSAGE_SIZE);
+ }
+
public int getSize()
{
return (int) MESSAGE_SIZE;
@@ -292,7 +301,7 @@ public class AMQQueueMBeanTest extends TestCase
}
});
- AMQMessage m = new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo());
+ AMQMessage m = new AMQMessage(msg.getStoredMessage());
for(AMQQueue q : msg.getDestinationQueues())
{
q.enqueue(m);
@@ -345,7 +354,7 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(publish);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -355,7 +364,14 @@ public class AMQQueueMBeanTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+ IApplicationRegistry applicationRegistry = new TestApplicationRegistry(new ServerConfiguration(configuration));
+ ApplicationRegistry.initialise(applicationRegistry );
+
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
@@ -381,7 +397,8 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_messageStore.addMessage(mmd));
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
@@ -391,7 +408,7 @@ public class AMQQueueMBeanTest extends TestCase
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
- AMQMessage m = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize(), currentMessage.getMessagePublishInfo());
+ AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
for(AMQQueue q : currentMessage.getDestinationQueues())
{
q.enqueue(m);