diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 0f5374b3e5..5d559c9d0d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; @@ -143,16 +144,19 @@ public class AckTest extends InternalBrokerBaseCase qs.add(_queue); msg.enqueue(qs); MessageMetaData mmd = msg.headersReceived(); - msg.setStoredMessage(_messageStore.addMessage(mmd)); + final StoredMessage storedMessage = _messageStore.addMessage(mmd); + msg.setStoredMessage(storedMessage); + final AMQMessage message = new AMQMessage(storedMessage); if(msg.allContentReceived()) { ServerTransaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, msg, new ServerTransaction.Action() { + txn.enqueue(_queue, message, new ServerTransaction.Action() { public void postCommit() { try { - _queue.enqueue(new AMQMessage(msg.getStoredMessage())); + + _queue.enqueue(message); } catch (AMQException e) { @@ -170,6 +174,15 @@ public class AckTest extends InternalBrokerBaseCase // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } /** @@ -181,9 +194,8 @@ public class AckTest extends InternalBrokerBaseCase _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertEquals("",msgCount,map.size()); + assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -206,7 +218,6 @@ public class AckTest extends InternalBrokerBaseCase _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageCount() == 0); @@ -243,7 +254,7 @@ public class AckTest extends InternalBrokerBaseCase _channel.acknowledgeMessage(5, false); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount - 1); + assertEquals("Map not expected size",msgCount - 1,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -270,6 +281,8 @@ public class AckTest extends InternalBrokerBaseCase final int msgCount = 10; publishMessages(msgCount); + + _channel.acknowledgeMessage(5, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); |