summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java71
1 files changed, 67 insertions, 4 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
index b3223f16c4..4d6d60906d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
@@ -22,14 +22,72 @@ package org.apache.qpid.server.ack;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
import java.util.List;
-public class AcknowledgeTest extends InternalBrokerBaseCase
+public class AcknowledgeTest extends QpidTestCase
{
+ private AMQChannel _channel;
+ private SimpleAMQQueue _queue;
+ private MessageStore _messageStore;
+ private String _queueName;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ BrokerTestHelper.setUp();
+ _channel = BrokerTestHelper.createChannel();
+ VirtualHost virtualHost = _channel.getVirtualHost();
+ _queueName = getTestName();
+ _queue = BrokerTestHelper.createQueue(_queueName, virtualHost);
+ _messageStore = virtualHost.getMessageStore();
+ Exchange defaultExchange = virtualHost.getExchangeRegistry().getDefaultExchange();
+ virtualHost.getBindingFactory().addBinding(_queueName, _queue, defaultExchange, null);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (_channel != null)
+ {
+ _channel.getVirtualHost().close();
+ }
+ }
+ finally
+ {
+ BrokerTestHelper.tearDown();
+ super.tearDown();
+ }
+ }
+
+ private AMQChannel getChannel()
+ {
+ return _channel;
+ }
+
+ private InternalTestProtocolSession getSession()
+ {
+ return (InternalTestProtocolSession)_channel.getProtocolSession();
+ }
+
+ private SimpleAMQQueue getQueue()
+ {
+ return _queue;
+ }
public void testTransactionalSingleAck() throws AMQException
{
@@ -70,7 +128,7 @@ public class AcknowledgeTest extends InternalBrokerBaseCase
checkStoreContents(0);
//Send required messsages to the queue
- publishMessages(getSession(), getChannel(), sendMessageCount);
+ BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
if (getChannel().isTransactional())
{
@@ -84,7 +142,7 @@ public class AcknowledgeTest extends InternalBrokerBaseCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = subscribe(getSession(), getChannel(), getQueue());
+ AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
getQueue().deliverAsync();
@@ -117,4 +175,9 @@ public class AcknowledgeTest extends InternalBrokerBaseCase
checkStoreContents(remainingUnackedMessages);
}
+ private void checkStoreContents(int messageCount)
+ {
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
+ }
+
}