summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java27
1 files changed, 20 insertions, 7 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 0f5374b3e5..5d559c9d0d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
@@ -143,16 +144,19 @@ public class AckTest extends InternalBrokerBaseCase
qs.add(_queue);
msg.enqueue(qs);
MessageMetaData mmd = msg.headersReceived();
- msg.setStoredMessage(_messageStore.addMessage(mmd));
+ final StoredMessage storedMessage = _messageStore.addMessage(mmd);
+ msg.setStoredMessage(storedMessage);
+ final AMQMessage message = new AMQMessage(storedMessage);
if(msg.allContentReceived())
{
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+ txn.enqueue(_queue, message, new ServerTransaction.Action() {
public void postCommit()
{
try
{
- _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+
+ _queue.enqueue(message);
}
catch (AMQException e)
{
@@ -170,6 +174,15 @@ public class AckTest extends InternalBrokerBaseCase
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
}
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
}
/**
@@ -181,9 +194,8 @@ public class AckTest extends InternalBrokerBaseCase
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount, true);
-
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertEquals("",msgCount,map.size());
+ assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -206,7 +218,6 @@ public class AckTest extends InternalBrokerBaseCase
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
-
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageCount() == 0);
@@ -243,7 +254,7 @@ public class AckTest extends InternalBrokerBaseCase
_channel.acknowledgeMessage(5, false);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount - 1);
+ assertEquals("Map not expected size",msgCount - 1,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -270,6 +281,8 @@ public class AckTest extends InternalBrokerBaseCase
final int msgCount = 10;
publishMessages(msgCount);
+
+
_channel.acknowledgeMessage(5, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);