summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java435
1 files changed, 435 insertions, 0 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
new file mode 100644
index 0000000000..3084dc7fa1
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -0,0 +1,435 @@
+package org.apache.qpid.server.queue;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SimpleAMQQueueTest extends TestCase
+{
+
+ protected SimpleAMQQueue _queue;
+ protected VirtualHost _virtualHost;
+ protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
+ protected AMQShortString _qname = new AMQShortString("qname");
+ protected AMQShortString _owner = new AMQShortString("owner");
+ protected AMQShortString _routingKey = new AMQShortString("routing key");
+ protected DirectExchange _exchange = new DirectExchange();
+ protected MockSubscription _subscription = new MockSubscription();
+ protected FieldTable _arguments = null;
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ //Create Application Registry for test
+ ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1);
+
+ PropertiesConfiguration env = new PropertiesConfiguration();
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
+ }
+
+ @Override
+ protected void tearDown()
+ {
+ _queue.stop();
+ ApplicationRegistry.remove(1);
+ }
+
+ public void testCreateQueue() throws AMQException
+ {
+ _queue.stop();
+ try {
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments );
+ assertNull("Queue was created", _queue);
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue("Exception was not about missing name",
+ e.getMessage().contains("name"));
+ }
+
+ try {
+ _queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
+ assertNull("Queue was created", _queue);
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue("Exception was not about missing vhost",
+ e.getMessage().contains("Host"));
+ }
+
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
+ _virtualHost, _arguments);
+ assertNotNull("Queue was not created", _queue);
+ }
+
+ public void testGetVirtualHost()
+ {
+ assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
+ }
+
+ public void testBinding()
+ {
+ try
+ {
+ _queue.bind(_exchange, _routingKey, null);
+ assertTrue("Routing key was not bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertEquals("Queue was not bound to key",
+ _exchange.getBindings().get(_routingKey).get(0),
+ _queue);
+ assertEquals("Exchange binding count", 1,
+ _queue.getExchangeBindings().size());
+ assertEquals("Wrong exchange bound", _routingKey,
+ _queue.getExchangeBindings().get(0).getRoutingKey());
+ assertEquals("Wrong exchange bound", _exchange,
+ _queue.getExchangeBindings().get(0).getExchange());
+
+ _queue.unBind(_exchange, _routingKey, null);
+ assertFalse("Routing key was still bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertNull("Routing key was not empty",
+ _exchange.getBindings().get(_routingKey));
+ }
+ catch (AMQException e)
+ {
+ assertNull("Unexpected exception", e);
+ }
+ }
+
+ public void testSubscription() throws AMQException
+ {
+ // Check adding a subscription adds it to the queue
+ _queue.registerSubscription(_subscription, false);
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ _queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
+
+ // Check sending a message ends up with the subscriber
+ AMQMessage messageA = createMessage(new Long(24));
+ _queue.enqueue(null, messageA);
+ assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+
+ // Check removing the subscription removes it's information from the queue
+ _queue.unregisterSubscription(_subscription);
+ assertTrue("Subscription still had queue", _subscription.isClosed());
+ assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
+ assertFalse("Queue still has active consumer",
+ 1 == _queue.getActiveConsumerCount());
+
+ AMQMessage messageB = createMessage(new Long (25));
+ _queue.enqueue(null, messageB);
+ QueueEntry entry = _subscription.getLastSeenEntry();
+ assertNull(entry);
+ }
+
+ public void testQueueNoSubscriber() throws AMQException, InterruptedException
+ {
+ AMQMessage messageA = createMessage(new Long(24));
+ _queue.enqueue(null, messageA);
+ _queue.registerSubscription(_subscription, false);
+ Thread.sleep(150);
+ assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ }
+
+ public void testExclusiveConsumer() throws AMQException
+ {
+ // Check adding an exclusive subscription adds it to the queue
+ _queue.registerSubscription(_subscription, true);
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ _queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
+
+ // Check sending a message ends up with the subscriber
+ AMQMessage messageA = createMessage(new Long(24));
+ _queue.enqueue(null, messageA);
+ assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+
+ // Check we cannot add a second subscriber to the queue
+ Subscription subB = new MockSubscription();
+ Exception ex = null;
+ try
+ {
+ _queue.registerSubscription(subB, false);
+ }
+ catch (AMQException e)
+ {
+ ex = e;
+ }
+ assertNotNull(ex);
+ assertTrue(ex instanceof AMQException);
+
+ // Check we cannot add an exclusive subscriber to a queue with an
+ // existing subscription
+ _queue.unregisterSubscription(_subscription);
+ _queue.registerSubscription(_subscription, false);
+ try
+ {
+ _queue.registerSubscription(subB, true);
+ }
+ catch (AMQException e)
+ {
+ ex = e;
+ }
+ assertNotNull(ex);
+ }
+
+ public void testAutoDeleteQueue() throws Exception
+ {
+ _queue.stop();
+ _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+ _queue.registerSubscription(_subscription, false);
+ AMQMessage message = createMessage(new Long(25));
+ _queue.enqueue(null, message);
+ _queue.unregisterSubscription(_subscription);
+ assertTrue("Queue was not deleted when subscription was removed",
+ _queue.isDeleted());
+ }
+
+ public void testResend() throws Exception
+ {
+ _queue.registerSubscription(_subscription, false);
+ Long id = new Long(26);
+ AMQMessage message = createMessage(id);
+ _queue.enqueue(null, message);
+ QueueEntry entry = _subscription.getLastSeenEntry();
+ entry.setRedelivered(true);
+ _queue.resend(entry, _subscription);
+
+ }
+
+ public void testGetFirstMessageId() throws Exception
+ {
+ // Create message
+ Long messageId = new Long(23);
+ AMQMessage message = createMessage(messageId);
+
+ // Put message on queue
+ _queue.enqueue(null, message);
+ // Get message id
+ Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
+
+ // Check message id
+ assertEquals("Message ID was wrong", messageId, testmsgid);
+ }
+
+ public void testGetFirstFiveMessageIds() throws Exception
+ {
+ for (int i = 0 ; i < 5; i++)
+ {
+ // Create message
+ Long messageId = new Long(i);
+ AMQMessage message = createMessage(messageId);
+ // Put message on queue
+ _queue.enqueue(null, message);
+ }
+ // Get message ids
+ List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+
+ // Check message id
+ for (int i = 0; i < 5; i++)
+ {
+ Long messageId = new Long(i);
+ assertEquals("Message ID was wrong", messageId, msgids.get(i));
+ }
+ }
+
+ public void testGetLastFiveMessageIds() throws Exception
+ {
+ for (int i = 0 ; i < 10; i++)
+ {
+ // Create message
+ Long messageId = new Long(i);
+ AMQMessage message = createMessage(messageId);
+ // Put message on queue
+ _queue.enqueue(null, message);
+ }
+ // Get message ids
+ List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
+
+ // Check message id
+ for (int i = 0; i < 5; i++)
+ {
+ Long messageId = new Long(i+5);
+ assertEquals("Message ID was wrong", messageId, msgids.get(i));
+ }
+ }
+
+ public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
+ IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ msg.setContentHeaderBody(contentHeaderBody);
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send persistent message
+ qs.add(_queue);
+ msg.enqueue(qs);
+ msg.routingComplete(_store, new MessageHandleFactory());
+ _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
+
+ // Check that it is enqueued
+ AMQQueue data = _store.getMessages().get(1L);
+ assertNotNull(data);
+
+ // Dequeue message
+ MockQueueEntry entry = new MockQueueEntry();
+ AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
+
+ entry.setMessage(amqmsg);
+ _queue.dequeue(null, entry);
+
+ // Check that it is dequeued
+ data = _store.getMessages().get(1L);
+ assertNull(data);
+ }
+
+
+ // FIXME: move this to somewhere useful
+ private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ {
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ null,
+ false);
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+ publishBody,
+ new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ // won't happen
+ }
+
+
+ return amqMessageHandle;
+ }
+
+ public class TestMessage extends AMQMessage
+ {
+ private final long _tag;
+ private int _count;
+
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ throws AMQException
+ {
+ super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ _tag = tag;
+ }
+
+
+ public boolean incrementReference()
+ {
+ _count++;
+ return true;
+ }
+
+ public void decrementReference(StoreContext context)
+ {
+ _count--;
+ }
+
+ void assertCountEquals(int expected)
+ {
+ assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ }
+ }
+
+ protected AMQMessage createMessage(Long id) throws AMQException
+ {
+ AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+ return messageA;
+ }
+}