summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java76
1 files changed, 50 insertions, 26 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 2bb16aff2e..aafddb810a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -26,12 +26,16 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -99,12 +103,16 @@ public class TxAckTest extends TestCase
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
- Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
- TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(),
+ TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
+ AMQQueue queue =
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("test", new MemoryMessageStore()),
+ null);
+
for (int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
@@ -138,8 +146,8 @@ public class TxAckTest extends TestCase
}
};
- TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map));
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ _map.add(deliveryTag, queue.enqueue(new StoreContext(), message));
}
_acked = acked;
_unacked = unacked;
@@ -154,7 +162,7 @@ public class TxAckTest extends TestCase
{
for (long tag : tags)
{
- UnacknowledgedMessage u = _map.get(tag);
+ QueueEntry u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
((TestMessage) u.getMessage()).assertCountEquals(expected);
}
@@ -175,7 +183,7 @@ public class TxAckTest extends TestCase
_op.consolidate();
_op.undoPrepare();
- assertCount(_acked, 1); //DTX Changed to 0, but that is wrong msg 5 is acked!
+ assertCount(_acked, 1);
assertCount(_unacked, 0);
}
@@ -195,34 +203,50 @@ public class TxAckTest extends TestCase
}
}
+ private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ {
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ null,
+ false);
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+ publishBody,
+ new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ // won't happen
+ }
+
+
+ return amqMessageHandle;
+ }
+
+
private class TestMessage extends AMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ throws AMQException
{
- super(messageId, publishBody, txnContext);
- try
- {
- setContentHeaderBody(new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
- }
- catch (AMQException e)
- {
- // won't happen
- }
+ super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
_tag = tag;
}
- public void incrementReference()
+
+ public boolean incrementReference()
{
_count++;
+ return true;
}
public void decrementReference(StoreContext context)