diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java | 102 |
1 files changed, 52 insertions, 50 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index d542c82a5b..d9352f34f7 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -30,10 +30,9 @@ import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -49,8 +48,8 @@ import java.util.Set; */ public class AckTest extends QpidTestCase { - private SubscriptionTarget_0_8 _subscriptionTarget; - private Subscription _subscription; + private ConsumerTarget_0_8 _subscriptionTarget; + private Consumer _consumer; private AMQProtocolSession _protocolSession; @@ -180,10 +179,13 @@ public class AckTest extends QpidTestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -207,16 +209,16 @@ public class AckTest extends QpidTestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, - null, - AMQMessage.class, - DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, + null, + AMQMessage.class, + DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -233,12 +235,12 @@ public class AckTest extends QpidTestCase { // false arg means no acks expected - _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); @@ -256,13 +258,13 @@ public class AckTest extends QpidTestCase public void testSingleAckReceivedTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -293,13 +295,13 @@ public class AckTest extends QpidTestCase public void testMultiAckReceivedTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -327,13 +329,13 @@ public class AckTest extends QpidTestCase public void testMultiAckAllReceivedTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -364,15 +366,15 @@ public class AckTest extends QpidTestCase Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 1; publishMessages(msgCount); - _subscription.externalStateChange(); + _consumer.externalStateChange(); _channel.acknowledgeMessage(1, false); |