diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java | 76 |
1 files changed, 50 insertions, 26 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 2bb16aff2e..aafddb810a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -26,12 +26,16 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -99,12 +103,16 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); - Scenario(int messageCount, List<Long> acked, List<Long> unacked) + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { - TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(), + TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + AMQQueue queue = + AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("test", new MemoryMessageStore()), + null); + for (int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; @@ -138,8 +146,8 @@ public class TxAckTest extends TestCase } }; - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map)); + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + _map.add(deliveryTag, queue.enqueue(new StoreContext(), message)); } _acked = acked; _unacked = unacked; @@ -154,7 +162,7 @@ public class TxAckTest extends TestCase { for (long tag : tags) { - UnacknowledgedMessage u = _map.get(tag); + QueueEntry u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); ((TestMessage) u.getMessage()).assertCountEquals(expected); } @@ -175,7 +183,7 @@ public class TxAckTest extends TestCase _op.consolidate(); _op.undoPrepare(); - assertCount(_acked, 1); //DTX Changed to 0, but that is wrong msg 5 is acked! + assertCount(_acked, 1); assertCount(_unacked, 0); } @@ -195,34 +203,50 @@ public class TxAckTest extends TestCase } } + private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + { + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + null, + false); + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), + publishBody, + new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } + + + return amqMessageHandle; + } + + private class TestMessage extends AMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + throws AMQException { - super(messageId, publishBody, txnContext); - try - { - setContentHeaderBody(new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } + super(createMessageHandle(messageId, publishBody), storeContext, publishBody); _tag = tag; } - public void incrementReference() + + public boolean incrementReference() { _count++; + return true; } public void decrementReference(StoreContext context) |