summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java135
1 files changed, 94 insertions, 41 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 2416442b10..9c2932c5e2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -23,24 +23,28 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
-import java.util.HashSet;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
+import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -49,37 +53,38 @@ public class AckTest extends TestCase
{
private static final Logger _log = Logger.getLogger(AckTest.class);
- private SubscriptionImpl _subscription;
+ private Subscription _subscription;
private MockProtocolSession _protocolSession;
- private TestableMemoryMessageStore _messageStore;
+ private TestMemoryMessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
private AMQChannel _channel;
- private SubscriptionSet _subscriptionManager;
-
private AMQQueue _queue;
private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
- public AckTest() throws Exception
- {
- ApplicationRegistry.initialise(new NullApplicationRegistry());
- }
-
protected void setUp() throws Exception
{
super.setUp();
- _messageStore = new TestableMemoryMessageStore();
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
+
+ _messageStore = new TestMemoryMessageStore();
_protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(_protocolSession, 5, _messageStore);
+ _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
- _subscriptionManager = new SubscriptionSet();
- _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
+
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+ null);
+ }
+
+ protected void tearDown()
+ {
+ ApplicationRegistry.remove(1);
}
private void publishMessages(int count) throws AMQException
@@ -92,6 +97,7 @@ public class AckTest extends TestCase
TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
+ _queue.registerSubscription(_subscription,false);
MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
@@ -125,7 +131,8 @@ public class AckTest extends TestCase
return new AMQShortString("rk");
}
};
- AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
+ IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ //IncomingMessage msg2 = null;
if (persistent)
{
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
@@ -142,10 +149,16 @@ public class AckTest extends TestCase
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
- msg.incrementReference();
- msg.routingComplete(_messageStore, _storeContext, factory);
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
+ msg.routingComplete(_messageStore, factory);
+ if(msg.allContentReceived())
+ {
+ msg.deliverToQueues();
+ }
// we manually send the message to the subscription
- _subscription.send(new QueueEntry(_queue,msg), _queue);
+ //_subscription.send(new QueueEntry(_queue,msg), _queue);
}
}
@@ -155,16 +168,13 @@ public class AckTest extends TestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
-
- //DTX
- // assertTrue(_messageStore.getNumberStoredMessages() == msgCount);
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -172,15 +182,12 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i);
i++;
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
}
assertTrue(map.size() == msgCount);
assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
-
- //DTX
-// assertTrue(_messageStore.getNumberStoredMessages() == msgCount);
}
/**
@@ -189,15 +196,32 @@ public class AckTest extends TestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- //DTX MessageStore
-// assertTrue(_messageStore.getNumberStoredMessages() == 0);
+ assertTrue(_messageStore.getContentBodyMap().size() == 0);
+
+ }
+
+ /**
+ * Tests that in no-ack mode no messages are retained
+ */
+ public void testPersistentNoAckMode() throws AMQException
+ {
+ // false arg means no acks expected
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount, true);
+
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+ assertTrue(_messageStore.getContentBodyMap().size() == 0);
+
}
/**
@@ -206,7 +230,7 @@ public class AckTest extends TestCase
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -219,7 +243,7 @@ public class AckTest extends TestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
@@ -235,7 +259,7 @@ public class AckTest extends TestCase
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -248,7 +272,7 @@ public class AckTest extends TestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
@@ -259,7 +283,7 @@ public class AckTest extends TestCase
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -272,18 +296,44 @@ public class AckTest extends TestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
+ /**
+ * A regression fixing QPID-1136 showed this up
+ *
+ * @throws Exception
+ */
+ public void testMessageDequeueRestoresCreditTest() throws Exception
+ {
+ // Send 10 messages
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
+ DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+ final int msgCount = 1;
+ publishMessages(msgCount);
+
+ _queue.deliverAsync(_subscription);
+
+ _channel.acknowledgeMessage(1, false);
+
+ // Check credit available
+ assertTrue("No credit available", creditManager.hasCredit());
+
+ }
+
+
+/*
public void testPrefetchHighLow() throws AMQException
{
int lowMark = 5;
int highMark = 10;
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
_channel.setPrefetchLowMarkCount(lowMark);
_channel.setPrefetchHighMarkCount(highMark);
@@ -332,10 +382,12 @@ public class AckTest extends TestCase
assertTrue(map.size() == 0);
}
+*/
+/*
public void testPrefetch() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- _channel.setPrefetchCount(5);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ _channel.setMessageCredit(5);
assertTrue(_channel.getPrefetchCount() == 5);
@@ -360,6 +412,7 @@ public class AckTest extends TestCase
assertTrue(map.size() == 0);
}
+*/
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(AckTest.class);