diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java | 39 |
1 files changed, 20 insertions, 19 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 0edcc182b0..d64e533f72 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -28,7 +28,9 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.txn.Transaction; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.InternalTestProtocolSession; @@ -93,7 +95,6 @@ public class AckTest extends TestCase private void publishMessages(int count, boolean persistent) throws AMQException { _queue.registerSubscription(_subscription,false); - MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -126,37 +127,37 @@ public class AckTest extends TestCase return new AMQShortString("rk"); } }; - final IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, _protocolSession); + final IncomingMessage msg = new IncomingMessage(publishBody); //IncomingMessage msg2 = null; + BasicContentHeaderProperties b = new BasicContentHeaderProperties(); + ContentHeaderBody cb = new ContentHeaderBody(); + cb.properties = b; + if (persistent) { - BasicContentHeaderProperties b = new BasicContentHeaderProperties(); //This is DeliveryMode.PERSISTENT b.setDeliveryMode((byte) 2); - ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; - msg.setContentHeaderBody(cb); - } - else - { - msg.setContentHeaderBody(new ContentHeaderBody()); } + + msg.setContentHeaderBody(cb); + // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, factory); + MessageMetaData mmd = msg.headersReceived(); + msg.setStoredMessage(_messageStore.addMessage(mmd)); if(msg.allContentReceived()) { - Transaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, msg, new Transaction.Action() { + ServerTransaction txn = new AutoCommitTransaction(_messageStore); + txn.enqueue(_queue, msg, new ServerTransaction.Action() { public void postCommit() { try { - _queue.enqueue(new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo())); + _queue.enqueue(new AMQMessage(msg.getStoredMessage())); } catch (AMQException e) { @@ -213,8 +214,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertTrue(_messageStore.getMessageCount() == 0); + } @@ -230,8 +231,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertTrue(_messageStore.getMessageCount() == 0); + } |