summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java42
1 files changed, 24 insertions, 18 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 79f7d75aa9..0edcc182b0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -28,7 +28,8 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,15 +40,9 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.Set;
-import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -62,8 +57,6 @@ public class AckTest extends TestCase
private TestMemoryMessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
-
private AMQChannel _channel;
private AMQQueue _queue;
@@ -99,9 +92,6 @@ public class AckTest extends TestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>()
- );
_queue.registerSubscription(_subscription,false);
MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
@@ -136,7 +126,7 @@ public class AckTest extends TestCase
return new AMQShortString("rk");
}
};
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ final IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, _protocolSession);
//IncomingMessage msg2 = null;
if (persistent)
{
@@ -160,7 +150,26 @@ public class AckTest extends TestCase
msg.routingComplete(_messageStore, factory);
if(msg.allContentReceived())
{
- msg.deliverToQueues();
+ Transaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, msg, new Transaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ _queue.enqueue(new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo()));
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+
}
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
@@ -178,8 +187,7 @@ public class AckTest extends TestCase
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+ assertEquals("",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -191,8 +199,6 @@ public class AckTest extends TestCase
assertTrue(unackedMsg.getQueue() == _queue);
}
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**