summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java75
1 files changed, 55 insertions, 20 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f4cdbbe02c..6c7094cac0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,8 +21,12 @@
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.PropertiesConfiguration;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
@@ -51,12 +55,6 @@ import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
@@ -190,6 +188,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch(InterruptedException e)
+ {
+ }
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
@@ -431,6 +436,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ }
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
@@ -724,7 +736,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
// call processQueue to deliver the messages
- testQueue.processQueue(new QueueRunner(testQueue, 1)
+ testQueue.processQueue(new QueueRunner(testQueue)
{
@Override
public void run()
@@ -827,7 +839,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not copied as part of invocation of
- * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)}
+ * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
*/
public void testCopyMessagesWithDequeuedEntry()
{
@@ -845,7 +857,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
SimpleAMQQueue queue = createQueue(anotherQueueName);
// create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
// copy messages into another queue
_queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -877,7 +889,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not moved as part of invocation of
- * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)}
+ * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
*/
public void testMovedMessagesWithDequeuedEntry()
{
@@ -895,7 +907,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
SimpleAMQQueue queue = createQueue(anotherQueueName);
// create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
// move messages into another queue
_queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -928,7 +940,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that messages in given range including dequeued one are deleted
* from the queue on invocation of
- * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)}
+ * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)}
*/
public void testRemoveMessagesFromQueueWithDequeuedEntry()
{
@@ -955,7 +967,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message on the top is not accounted and next message
* is deleted from the queue on invocation of
- * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)}
+ * {@link SimpleAMQQueue#deleteMessageFromTop()}
*/
public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
{
@@ -984,7 +996,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that all messages including dequeued one are deleted from the queue
- * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)}
+ * on invocation of {@link SimpleAMQQueue#clearQueue()}
*/
public void testClearQueueWithDequeuedEntry()
{
@@ -1050,10 +1062,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
/**
* Send a message and decrement latch
+ * @param entry
+ * @param batch
*/
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
- super.send(msg);
+ super.send(entry, batch);
latch.countDown();
}
};
@@ -1064,7 +1078,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
testQueue.registerSubscription(subscription, false);
// process queue
- testQueue.processQueue(new QueueRunner(testQueue, 1)
+ testQueue.processQueue(new QueueRunner(testQueue)
{
public void run()
{
@@ -1110,9 +1124,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
* Entries with even message id are considered
* dequeued!
*/
- protected QueueEntryImpl createQueueEntry(final ServerMessage message)
+ protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message)
{
- return new QueueEntryImpl(this, message)
+ return new SimpleQueueEntryImpl(this, message)
{
public boolean isDequeued()
{
@@ -1128,6 +1142,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0);
}
+
+ @Override
+ public boolean acquire(Subscription sub)
+ {
+ if(((AMQMessage) message).getMessageId().longValue() % 2 == 0)
+ {
+ return false;
+ }
+ else
+ {
+ return super.acquire(sub);
+ }
+ }
};
}
};
@@ -1244,6 +1271,14 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
fail("Failure to put message on queue:" + e.getMessage());
}
}
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
}
/**