diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 261 |
1 files changed, 139 insertions, 122 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 2dcb081739..98772e7b61 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -20,12 +20,7 @@ package org.apache.qpid.server.queue; * */ - -import java.util.ArrayList; -import java.util.List; - import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -42,6 +37,9 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.ArrayList; +import java.util.List; + public class SimpleAMQQueueTest extends TestCase { @@ -54,7 +52,7 @@ public class SimpleAMQQueueTest extends TestCase protected DirectExchange _exchange = new DirectExchange(); protected MockSubscription _subscription = new MockSubscription(); protected FieldTable _arguments = null; - + MessagePublishInfo info = new MessagePublishInfoImpl(); private static final long MESSAGE_SIZE = 100; @@ -63,7 +61,7 @@ public class SimpleAMQQueueTest extends TestCase { super.setUp(); //Create Application Registry for test - ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1); + ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1); _virtualHost = new VirtualHost("vhost", _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); @@ -81,97 +79,99 @@ public class SimpleAMQQueueTest extends TestCase public void testCreateQueue() throws AMQException { _queue.stop(); - try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments ); + try + { + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing name", - e.getMessage().contains("name")); + assertTrue("Exception was not about missing name", + e.getMessage().contains("name")); } - - try { + + try + { _queue = new SimpleAMQQueue(_qname, false, _owner, false, null); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing vhost", - e.getMessage().contains("Host")); + assertTrue("Exception was not about missing vhost", + e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, - _virtualHost, _arguments); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, + _virtualHost, _arguments); assertNotNull("Queue was not created", _queue); } - + public void testGetVirtualHost() { assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); } - + public void testBinding() { try { _queue.bind(_exchange, _routingKey, null); - assertTrue("Routing key was not bound", - _exchange.getBindings().containsKey(_routingKey)); - assertEquals("Queue was not bound to key", - _exchange.getBindings().get(_routingKey).get(0), - _queue); - assertEquals("Exchange binding count", 1, - _queue.getExchangeBindings().size()); - assertEquals("Wrong exchange bound", _routingKey, - _queue.getExchangeBindings().get(0).getRoutingKey()); - assertEquals("Wrong exchange bound", _exchange, - _queue.getExchangeBindings().get(0).getExchange()); - + assertTrue("Routing key was not bound", + _exchange.getBindings().containsKey(_routingKey)); + assertEquals("Queue was not bound to key", + _exchange.getBindings().get(_routingKey).get(0), + _queue); + assertEquals("Exchange binding count", 1, + _queue.getExchangeBindings().size()); + assertEquals("Wrong exchange bound", _routingKey, + _queue.getExchangeBindings().get(0).getRoutingKey()); + assertEquals("Wrong exchange bound", _exchange, + _queue.getExchangeBindings().get(0).getExchange()); + _queue.unBind(_exchange, _routingKey, null); - assertFalse("Routing key was still bound", - _exchange.getBindings().containsKey(_routingKey)); - assertNull("Routing key was not empty", - _exchange.getBindings().get(_routingKey)); + assertFalse("Routing key was still bound", + _exchange.getBindings().containsKey(_routingKey)); + assertNull("Routing key was not empty", + _exchange.getBindings().get(_routingKey)); } catch (AMQException e) { assertNull("Unexpected exception", e); } } - + public void testSubscription() throws AMQException { // Check adding a subscription adds it to the queue _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, + assertEquals("Subscription did not get queue", _queue, + _subscription.getQueue()); + assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); - + assertEquals("Queue does not have active consumer", 1, + _queue.getActiveConsumerCount()); + // Check sending a message ends up with the subscriber - AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageA = createMessage(); _queue.enqueue(null, messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - + // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); assertTrue("Subscription still had queue", _subscription.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); - assertFalse("Queue still has active consumer", - 1 == _queue.getActiveConsumerCount()); - - AMQMessage messageB = createMessage(new Long (25)); + assertFalse("Queue still has active consumer", + 1 == _queue.getActiveConsumerCount()); + + AMQMessage messageB = createMessage(); _queue.enqueue(null, messageB); QueueEntry entry = _subscription.getLastSeenEntry(); assertNull(entry); } - + public void testQueueNoSubscriber() throws AMQException, InterruptedException { - AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageA = createMessage(); _queue.enqueue(null, messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); @@ -182,18 +182,18 @@ public class SimpleAMQQueueTest extends TestCase { // Check adding an exclusive subscription adds it to the queue _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); + assertEquals("Subscription did not get queue", _queue, + _subscription.getQueue()); + assertEquals("Queue does not have consumer", 1, + _queue.getConsumerCount()); + assertEquals("Queue does not have active consumer", 1, + _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageA = createMessage(); _queue.enqueue(null, messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - + // Check we cannot add a second subscriber to the queue Subscription subB = new MockSubscription(); Exception ex = null; @@ -203,7 +203,7 @@ public class SimpleAMQQueueTest extends TestCase } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); assertTrue(ex instanceof AMQException); @@ -218,40 +218,40 @@ public class SimpleAMQQueueTest extends TestCase } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); } - - public void testAutoDeleteQueue() throws Exception + + public void testAutoDeleteQueue() throws Exception { - _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); - _queue.registerSubscription(_subscription, false); - AMQMessage message = createMessage(new Long(25)); - _queue.enqueue(null, message); - _queue.unregisterSubscription(_subscription); - assertTrue("Queue was not deleted when subscription was removed", - _queue.isDeleted()); + _queue.stop(); + _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); + _queue.registerSubscription(_subscription, false); + AMQMessage message = createMessage(); + _queue.enqueue(null, message); + _queue.unregisterSubscription(_subscription); + assertTrue("Queue was not deleted when subscription was removed", + _queue.isDeleted()); } - + public void testResend() throws Exception { _queue.registerSubscription(_subscription, false); - Long id = new Long(26); - AMQMessage message = createMessage(id); + AMQMessage message = createMessage(); + Long id = message.getMessageId(); _queue.enqueue(null, message); QueueEntry entry = _subscription.getLastSeenEntry(); entry.setRedelivered(true); _queue.resend(entry, _subscription); - + } - + public void testGetFirstMessageId() throws Exception { // Create message - Long messageId = new Long(23); - AMQMessage message = createMessage(messageId); + AMQMessage message = createMessage(); + Long messageId = message.getMessageId(); // Put message on queue _queue.enqueue(null, message); @@ -264,34 +264,40 @@ public class SimpleAMQQueueTest extends TestCase public void testGetFirstFiveMessageIds() throws Exception { - for (int i = 0 ; i < 5; i++) + // Create message + + AMQMessage message = createMessage(); + Long initialMessageID = message.getMessageId(); + + for (int i = 0; i < 5; i++) { - // Create message - Long messageId = new Long(i); - AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(null, message); + // Create message + message = createMessage(); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); + Long messageId = initialMessageID; // Check message id for (int i = 0; i < 5; i++) { - Long messageId = new Long(i); assertEquals("Message ID was wrong", messageId, msgids.get(i)); + messageId++; } } public void testGetLastFiveMessageIds() throws Exception { - for (int i = 0 ; i < 10; i++) + AMQMessage message = createMessage(); + Long messageIdOffset = message.getMessageId() -1 ; + for (int i = 0; i < 10; i++) { - // Create message - Long messageId = new Long(i); - AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(null, message); + // Create message + message = createMessage(); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -299,87 +305,104 @@ public class SimpleAMQQueueTest extends TestCase // Check message id for (int i = 0; i < 5; i++) { - Long messageId = new Long(i+5); + Long messageId = new Long(messageIdOffset + 1 + i + 5); assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } - + public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); + IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); + + long messageId = msg.getMessageId(); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - + // Send persistent message qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_store, new MessageFactory()); - _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); - + msg.routingComplete(_store); + + + _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); + // Check that it is enqueued - AMQQueue data = _store.getMessages().get(1L); + AMQQueue data = _store.getMessages().get(messageId); assertNotNull(data); - + // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage message = new MessageFactory().createMessage(1L, _store, true); - + ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; - // This is a persist message but we are not in a transaction so create a new context for the message + AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - + entry.setMessage(message); _queue.dequeue(null, entry); - + // Check that it is dequeued - data = _store.getMessages().get(1L); + data = _store.getMessages().get(messageId); assertNull(data); } - // FIXME: move this to somewhere useful - private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final MessagePublishInfo publishBody) { - final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false); + final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false); try { //Safe to use a null StoreContext as we have created a TransientMessage (see false param above) - amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); + amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); } catch (AMQException e) { // won't happen } - return amqMessage; } + public AMQMessage createMessage() throws AMQException + { + AMQMessage message = new TestMessage(info); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + + //The createMessage above is for a Transient Message so it is safe to have no context. + message.setPublishAndContentHeaderBody(null, info, header); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + message.getContentHeaderBody().properties = props; + + return message; + } + public class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody) + TestMessage(MessagePublishInfo publishBody) throws AMQException { - super(createMessage(messageId, publishBody)); - _tag = tag; + super(SimpleAMQQueueTest.createMessage(publishBody)); + _tag = getMessageId(); } - public boolean incrementReference() { _count++; @@ -396,11 +419,5 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Wrong count for message with tag " + _tag, expected, _count); } } - - protected AMQMessage createMessage(Long id) throws AMQException - { - AMQMessage messageA = new TestMessage(id, id, info); - return messageA; - } } |