summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java184
1 files changed, 168 insertions, 16 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 7a97837208..f39dfe765e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,8 +21,6 @@ package org.apache.qpid.server.queue;
*/
import junit.framework.TestCase;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -31,17 +29,18 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +50,7 @@ public class SimpleAMQQueueTest extends TestCase
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
+ protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static final long MESSAGE_SIZE = 100;
+ protected static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -70,7 +69,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -320,8 +319,8 @@ public class SimpleAMQQueueTest extends TestCase
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -335,18 +334,18 @@ public class SimpleAMQQueueTest extends TestCase
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store);
+ msg.routingComplete(_transactionLog);
- _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+ _transactionLog.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _transactionLog);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
MockQueueEntry entry = new MockQueueEntry(message, _queue);
@@ -355,10 +354,164 @@ public class SimpleAMQQueueTest extends TestCase
entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessageReferenceMap(messageId);
+ data = _transactionLog.getMessageReferenceMap(messageId);
assertNull(data);
}
+ public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+ {
+ sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+ }
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+ //Check the queue is still within it's limits.
+ long current = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(),
+ current <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
+ {
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+ }
+ }
+
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ /** Set to larger than the purge batch size. Default 100.
+ * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold all messages without flowing
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.setMemoryUsageMaximum(0L);
+
+ //Give the purger time to work maximum of 1s
+ int slept = 0;
+ while (_queue.getMemoryUsageCurrent() > 0 && slept < 5)
+ {
+ Thread.yield();
+ Thread.sleep(200);
+ slept++;
+ }
+
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(0L, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ }
+
+ protected void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ sendMessage(txnContext, 5);
+ }
+
+ protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
+ msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send persistent 10 messages
+
+ qs.add(_queue);
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ msg.addContentBodyFrame(new MockContentChunk(1));
+
+ msg.deliverToQueues();
+
+ //Check message was correctly enqueued
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(data);
+ }
+
+
// FIXME: move this to somewhere useful
private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
@@ -384,7 +537,7 @@ public class SimpleAMQQueueTest extends TestCase
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info, _store);
+ AMQMessage message = new TestMessage(info, _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -410,7 +563,6 @@ public class SimpleAMQQueueTest extends TestCase
_transactionLog = transactionLog;
}
-
void assertCountEquals(int expected)
{
assertEquals("Wrong count for message with tag " + _tag, expected,