summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java261
1 files changed, 139 insertions, 122 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 2dcb081739..98772e7b61 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,12 +20,7 @@ package org.apache.qpid.server.queue;
*
*/
-
-import java.util.ArrayList;
-import java.util.List;
-
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,6 +37,9 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
+import java.util.List;
+
public class SimpleAMQQueueTest extends TestCase
{
@@ -54,7 +52,7 @@ public class SimpleAMQQueueTest extends TestCase
protected DirectExchange _exchange = new DirectExchange();
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
-
+
MessagePublishInfo info = new MessagePublishInfoImpl();
private static final long MESSAGE_SIZE = 100;
@@ -63,7 +61,7 @@ public class SimpleAMQQueueTest extends TestCase
{
super.setUp();
//Create Application Registry for test
- ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1);
+ ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
_virtualHost = new VirtualHost("vhost", _store);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -81,97 +79,99 @@ public class SimpleAMQQueueTest extends TestCase
public void testCreateQueue() throws AMQException
{
_queue.stop();
- try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments );
+ try
+ {
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing name",
- e.getMessage().contains("name"));
+ assertTrue("Exception was not about missing name",
+ e.getMessage().contains("name"));
}
-
- try {
+
+ try
+ {
_queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing vhost",
- e.getMessage().contains("Host"));
+ assertTrue("Exception was not about missing vhost",
+ e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
- _virtualHost, _arguments);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
+ _virtualHost, _arguments);
assertNotNull("Queue was not created", _queue);
}
-
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
-
+
public void testBinding()
{
try
{
_queue.bind(_exchange, _routingKey, null);
- assertTrue("Routing key was not bound",
- _exchange.getBindings().containsKey(_routingKey));
- assertEquals("Queue was not bound to key",
- _exchange.getBindings().get(_routingKey).get(0),
- _queue);
- assertEquals("Exchange binding count", 1,
- _queue.getExchangeBindings().size());
- assertEquals("Wrong exchange bound", _routingKey,
- _queue.getExchangeBindings().get(0).getRoutingKey());
- assertEquals("Wrong exchange bound", _exchange,
- _queue.getExchangeBindings().get(0).getExchange());
-
+ assertTrue("Routing key was not bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertEquals("Queue was not bound to key",
+ _exchange.getBindings().get(_routingKey).get(0),
+ _queue);
+ assertEquals("Exchange binding count", 1,
+ _queue.getExchangeBindings().size());
+ assertEquals("Wrong exchange bound", _routingKey,
+ _queue.getExchangeBindings().get(0).getRoutingKey());
+ assertEquals("Wrong exchange bound", _exchange,
+ _queue.getExchangeBindings().get(0).getExchange());
+
_queue.unBind(_exchange, _routingKey, null);
- assertFalse("Routing key was still bound",
- _exchange.getBindings().containsKey(_routingKey));
- assertNull("Routing key was not empty",
- _exchange.getBindings().get(_routingKey));
+ assertFalse("Routing key was still bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertNull("Routing key was not empty",
+ _exchange.getBindings().get(_routingKey));
}
catch (AMQException e)
{
assertNull("Unexpected exception", e);
}
}
-
+
public void testSubscription() throws AMQException
{
// Check adding a subscription adds it to the queue
_queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
-
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
+
// Check sending a message ends up with the subscriber
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
assertTrue("Subscription still had queue", _subscription.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
- assertFalse("Queue still has active consumer",
- 1 == _queue.getActiveConsumerCount());
-
- AMQMessage messageB = createMessage(new Long (25));
+ assertFalse("Queue still has active consumer",
+ 1 == _queue.getActiveConsumerCount());
+
+ AMQMessage messageB = createMessage();
_queue.enqueue(null, messageB);
QueueEntry entry = _subscription.getLastSeenEntry();
assertNull(entry);
}
-
+
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
@@ -182,18 +182,18 @@ public class SimpleAMQQueueTest extends TestCase
{
// Check adding an exclusive subscription adds it to the queue
_queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
- _queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ _queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
Exception ex = null;
@@ -203,7 +203,7 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
assertTrue(ex instanceof AMQException);
@@ -218,40 +218,40 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
}
-
- public void testAutoDeleteQueue() throws Exception
+
+ public void testAutoDeleteQueue() throws Exception
{
- _queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
- _queue.registerSubscription(_subscription, false);
- AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
- _queue.unregisterSubscription(_subscription);
- assertTrue("Queue was not deleted when subscription was removed",
- _queue.isDeleted());
+ _queue.stop();
+ _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+ _queue.registerSubscription(_subscription, false);
+ AMQMessage message = createMessage();
+ _queue.enqueue(null, message);
+ _queue.unregisterSubscription(_subscription);
+ assertTrue("Queue was not deleted when subscription was removed",
+ _queue.isDeleted());
}
-
+
public void testResend() throws Exception
{
_queue.registerSubscription(_subscription, false);
- Long id = new Long(26);
- AMQMessage message = createMessage(id);
+ AMQMessage message = createMessage();
+ Long id = message.getMessageId();
_queue.enqueue(null, message);
QueueEntry entry = _subscription.getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
-
+
}
-
+
public void testGetFirstMessageId() throws Exception
{
// Create message
- Long messageId = new Long(23);
- AMQMessage message = createMessage(messageId);
+ AMQMessage message = createMessage();
+ Long messageId = message.getMessageId();
// Put message on queue
_queue.enqueue(null, message);
@@ -264,34 +264,40 @@ public class SimpleAMQQueueTest extends TestCase
public void testGetFirstFiveMessageIds() throws Exception
{
- for (int i = 0 ; i < 5; i++)
+ // Create message
+
+ AMQMessage message = createMessage();
+ Long initialMessageID = message.getMessageId();
+
+ for (int i = 0; i < 5; i++)
{
- // Create message
- Long messageId = new Long(i);
- AMQMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(null, message);
+ // Create message
+ message = createMessage();
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+ Long messageId = initialMessageID;
// Check message id
for (int i = 0; i < 5; i++)
{
- Long messageId = new Long(i);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
+ messageId++;
}
}
public void testGetLastFiveMessageIds() throws Exception
{
- for (int i = 0 ; i < 10; i++)
+ AMQMessage message = createMessage();
+ Long messageIdOffset = message.getMessageId() -1 ;
+ for (int i = 0; i < 10; i++)
{
- // Create message
- Long messageId = new Long(i);
- AMQMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(null, message);
+ // Create message
+ message = createMessage();
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -299,87 +305,104 @@ public class SimpleAMQQueueTest extends TestCase
// Check message id
for (int i = 0; i < 5; i++)
{
- Long messageId = new Long(i+5);
+ Long messageId = new Long(messageIdOffset + 1 + i + 5);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
-
+
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-
+
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store, new MessageFactory());
- _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
-
+ msg.routingComplete(_store);
+
+
+ _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(1L);
+ AMQQueue data = _store.getMessages().get(messageId);
assertNotNull(data);
-
+
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage message = new MessageFactory().createMessage(1L, _store, true);
-
+
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- // This is a persist message but we are not in a transaction so create a new context for the message
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
-
+
entry.setMessage(message);
_queue.dequeue(null, entry);
-
+
// Check that it is dequeued
- data = _store.getMessages().get(1L);
+ data = _store.getMessages().get(messageId);
assertNull(data);
}
-
// FIXME: move this to somewhere useful
- private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
- final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false);
+ final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false);
try
{
//Safe to use a null StoreContext as we have created a TransientMessage (see false param above)
- amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
+ amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
}
catch (AMQException e)
{
// won't happen
}
-
return amqMessage;
}
+ public AMQMessage createMessage() throws AMQException
+ {
+ AMQMessage message = new TestMessage(info);
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+
+ //The createMessage above is for a Transient Message so it is safe to have no context.
+ message.setPublishAndContentHeaderBody(null, info, header);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ message.getContentHeaderBody().properties = props;
+
+ return message;
+ }
+
public class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
+ TestMessage(MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessage(messageId, publishBody));
- _tag = tag;
+ super(SimpleAMQQueueTest.createMessage(publishBody));
+ _tag = getMessageId();
}
-
public boolean incrementReference()
{
_count++;
@@ -396,11 +419,5 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Wrong count for message with tag " + _tag, expected, _count);
}
}
-
- protected AMQMessage createMessage(Long id) throws AMQException
- {
- AMQMessage messageA = new TestMessage(id, id, info);
- return messageA;
- }
}