summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java102
1 files changed, 52 insertions, 50 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index d542c82a5b..d9352f34f7 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -30,10 +30,9 @@ import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -49,8 +48,8 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
- private SubscriptionTarget_0_8 _subscriptionTarget;
- private Subscription _subscription;
+ private ConsumerTarget_0_8 _subscriptionTarget;
+ private Consumer _consumer;
private AMQProtocolSession _protocolSession;
@@ -180,10 +179,13 @@ public class AckTest extends QpidTestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -207,16 +209,16 @@ public class AckTest extends QpidTestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget,
- null,
- AMQMessage.class,
- DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -233,12 +235,12 @@ public class AckTest extends QpidTestCase
{
// false arg means no acks expected
- _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -256,13 +258,13 @@ public class AckTest extends QpidTestCase
public void testSingleAckReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -293,13 +295,13 @@ public class AckTest extends QpidTestCase
public void testMultiAckReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -327,13 +329,13 @@ public class AckTest extends QpidTestCase
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -364,15 +366,15 @@ public class AckTest extends QpidTestCase
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 1;
publishMessages(msgCount);
- _subscription.externalStateChange();
+ _consumer.externalStateChange();
_channel.acknowledgeMessage(1, false);