diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java | 135 |
1 files changed, 94 insertions, 41 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 2416442b10..9c2932c5e2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -23,24 +23,28 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.flow.LimitlessCreditManager; +import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; -import java.util.HashSet; +import java.util.ArrayList; import java.util.LinkedList; import java.util.Set; +import java.util.Collections; /** * Tests that acknowledgements are handled correctly. @@ -49,37 +53,38 @@ public class AckTest extends TestCase { private static final Logger _log = Logger.getLogger(AckTest.class); - private SubscriptionImpl _subscription; + private Subscription _subscription; private MockProtocolSession _protocolSession; - private TestableMemoryMessageStore _messageStore; + private TestMemoryMessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); private AMQChannel _channel; - private SubscriptionSet _subscriptionManager; - private AMQQueue _queue; private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); - public AckTest() throws Exception - { - ApplicationRegistry.initialise(new NullApplicationRegistry()); - } - protected void setUp() throws Exception { super.setUp(); - _messageStore = new TestableMemoryMessageStore(); + ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); + + _messageStore = new TestMemoryMessageStore(); _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession, 5, _messageStore); + _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); + + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), + null); + } + + protected void tearDown() + { + ApplicationRegistry.remove(1); } private void publishMessages(int count) throws AMQException @@ -92,6 +97,7 @@ public class AckTest extends TestCase TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + _queue.registerSubscription(_subscription,false); MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { @@ -125,7 +131,8 @@ public class AckTest extends TestCase return new AMQShortString("rk"); } }; - AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); + IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + //IncomingMessage msg2 = null; if (persistent) { BasicContentHeaderProperties b = new BasicContentHeaderProperties(); @@ -142,10 +149,16 @@ public class AckTest extends TestCase // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.incrementReference(); - msg.routingComplete(_messageStore, _storeContext, factory); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); + msg.routingComplete(_messageStore, factory); + if(msg.allContentReceived()) + { + msg.deliverToQueues(); + } // we manually send the message to the subscription - _subscription.send(new QueueEntry(_queue,msg), _queue); + //_subscription.send(new QueueEntry(_queue,msg), _queue); } } @@ -155,16 +168,13 @@ public class AckTest extends TestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - //DTX - // assertTrue(_messageStore.getNumberStoredMessages() == msgCount); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -172,15 +182,12 @@ public class AckTest extends TestCase { assertTrue(deliveryTag == i); i++; - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); } assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - //DTX -// assertTrue(_messageStore.getNumberStoredMessages() == msgCount); } /** @@ -189,15 +196,32 @@ public class AckTest extends TestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - //DTX MessageStore -// assertTrue(_messageStore.getNumberStoredMessages() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + + } + + /** + * Tests that in no-ack mode no messages are retained + */ + public void testPersistentNoAckMode() throws AMQException + { + // false arg means no acks expected + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount, true); + + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + } /** @@ -206,7 +230,7 @@ public class AckTest extends TestCase */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -219,7 +243,7 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) @@ -235,7 +259,7 @@ public class AckTest extends TestCase */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -248,7 +272,7 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } @@ -259,7 +283,7 @@ public class AckTest extends TestCase */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -272,18 +296,44 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } } + /** + * A regression fixing QPID-1136 showed this up + * + * @throws Exception + */ + public void testMessageDequeueRestoresCreditTest() throws Exception + { + // Send 10 messages + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, + DEFAULT_CONSUMER_TAG, true, null, false, creditManager); + final int msgCount = 1; + publishMessages(msgCount); + + _queue.deliverAsync(_subscription); + + _channel.acknowledgeMessage(1, false); + + // Check credit available + assertTrue("No credit available", creditManager.hasCredit()); + + } + + +/* public void testPrefetchHighLow() throws AMQException { int lowMark = 5; int highMark = 10; - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); _channel.setPrefetchLowMarkCount(lowMark); _channel.setPrefetchHighMarkCount(highMark); @@ -332,10 +382,12 @@ public class AckTest extends TestCase assertTrue(map.size() == 0); } +*/ +/* public void testPrefetch() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); - _channel.setPrefetchCount(5); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + _channel.setMessageCredit(5); assertTrue(_channel.getPrefetchCount() == 5); @@ -360,6 +412,7 @@ public class AckTest extends TestCase assertTrue(map.size() == 0); } +*/ public static junit.framework.Test suite() { return new junit.framework.TestSuite(AckTest.class); |