diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 75 |
1 files changed, 55 insertions, 20 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f4cdbbe02c..6c7094cac0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,8 +21,12 @@ package org.apache.qpid.server.queue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.PropertiesConfiguration; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; @@ -51,12 +55,6 @@ import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -190,6 +188,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); + try + { + Thread.sleep(2000L); + } + catch(InterruptedException e) + { + } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry); @@ -431,6 +436,13 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue @@ -724,7 +736,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); // call processQueue to deliver the messages - testQueue.processQueue(new QueueRunner(testQueue, 1) + testQueue.processQueue(new QueueRunner(testQueue) { @Override public void run() @@ -827,7 +839,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)} */ public void testCopyMessagesWithDequeuedEntry() { @@ -845,7 +857,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase SimpleAMQQueue queue = createQueue(anotherQueueName); // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); // copy messages into another queue _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); @@ -877,7 +889,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)} */ public void testMovedMessagesWithDequeuedEntry() { @@ -895,7 +907,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase SimpleAMQQueue queue = createQueue(anotherQueueName); // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); // move messages into another queue _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); @@ -928,7 +940,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that messages in given range including dequeued one are deleted * from the queue on invocation of - * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} + * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)} */ public void testRemoveMessagesFromQueueWithDequeuedEntry() { @@ -955,7 +967,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message on the top is not accounted and next message * is deleted from the queue on invocation of - * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} + * {@link SimpleAMQQueue#deleteMessageFromTop()} */ public void testDeleteMessageFromTopWithDequeuedEntryOnTop() { @@ -984,7 +996,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that all messages including dequeued one are deleted from the queue - * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} + * on invocation of {@link SimpleAMQQueue#clearQueue()} */ public void testClearQueueWithDequeuedEntry() { @@ -1050,10 +1062,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { /** * Send a message and decrement latch + * @param entry + * @param batch */ - public void send(QueueEntry msg) throws AMQException + public void send(QueueEntry entry, boolean batch) throws AMQException { - super.send(msg); + super.send(entry, batch); latch.countDown(); } }; @@ -1064,7 +1078,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase testQueue.registerSubscription(subscription, false); // process queue - testQueue.processQueue(new QueueRunner(testQueue, 1) + testQueue.processQueue(new QueueRunner(testQueue) { public void run() { @@ -1110,9 +1124,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase * Entries with even message id are considered * dequeued! */ - protected QueueEntryImpl createQueueEntry(final ServerMessage message) + protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message) { - return new QueueEntryImpl(this, message) + return new SimpleQueueEntryImpl(this, message) { public boolean isDequeued() { @@ -1128,6 +1142,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); } + + @Override + public boolean acquire(Subscription sub) + { + if(((AMQMessage) message).getMessageId().longValue() % 2 == 0) + { + return false; + } + else + { + return super.acquire(sub); + } + } }; } }; @@ -1244,6 +1271,14 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase fail("Failure to put message on queue:" + e.getMessage()); } } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } } /** |