diff options
Diffstat (limited to 'M4-RCs/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | M4-RCs/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 432 |
1 files changed, 0 insertions, 432 deletions
diff --git a/M4-RCs/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/M4-RCs/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java deleted file mode 100644 index 7a60ed9795..0000000000 --- a/M4-RCs/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ /dev/null @@ -1,432 +0,0 @@ -package org.apache.qpid.server.queue; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.ArrayList; -import java.util.List; - -import junit.framework.TestCase; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.MockSubscription; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionImpl; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class SimpleAMQQueueTest extends TestCase -{ - - protected SimpleAMQQueue _queue; - protected VirtualHost _virtualHost; - protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); - protected AMQShortString _qname = new AMQShortString("qname"); - protected AMQShortString _owner = new AMQShortString("owner"); - protected AMQShortString _routingKey = new AMQShortString("routing key"); - protected DirectExchange _exchange = new DirectExchange(); - protected MockSubscription _subscription = new MockSubscription(); - protected FieldTable _arguments = null; - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - //Create Application Registry for test - ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1); - - _virtualHost = new VirtualHost("vhost", _store); - applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); - - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); - } - - @Override - protected void tearDown() - { - _queue.stop(); - ApplicationRegistry.remove(1); - } - - public void testCreateQueue() throws AMQException - { - _queue.stop(); - try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments ); - assertNull("Queue was created", _queue); - } - catch (IllegalArgumentException e) - { - assertTrue("Exception was not about missing name", - e.getMessage().contains("name")); - } - - try { - _queue = new SimpleAMQQueue(_qname, false, _owner, false, null); - assertNull("Queue was created", _queue); - } - catch (IllegalArgumentException e) - { - assertTrue("Exception was not about missing vhost", - e.getMessage().contains("Host")); - } - - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, - _virtualHost, _arguments); - assertNotNull("Queue was not created", _queue); - } - - public void testGetVirtualHost() - { - assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); - } - - public void testBinding() - { - try - { - _queue.bind(_exchange, _routingKey, null); - assertTrue("Routing key was not bound", - _exchange.getBindings().containsKey(_routingKey)); - assertEquals("Queue was not bound to key", - _exchange.getBindings().get(_routingKey).get(0), - _queue); - assertEquals("Exchange binding count", 1, - _queue.getExchangeBindings().size()); - assertEquals("Wrong exchange bound", _routingKey, - _queue.getExchangeBindings().get(0).getRoutingKey()); - assertEquals("Wrong exchange bound", _exchange, - _queue.getExchangeBindings().get(0).getExchange()); - - _queue.unBind(_exchange, _routingKey, null); - assertFalse("Routing key was still bound", - _exchange.getBindings().containsKey(_routingKey)); - assertNull("Routing key was not empty", - _exchange.getBindings().get(_routingKey)); - } - catch (AMQException e) - { - assertNull("Unexpected exception", e); - } - } - - public void testSubscription() throws AMQException - { - // Check adding a subscription adds it to the queue - _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); - - // Check sending a message ends up with the subscriber - AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - - // Check removing the subscription removes it's information from the queue - _queue.unregisterSubscription(_subscription); - assertTrue("Subscription still had queue", _subscription.isClosed()); - assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); - assertFalse("Queue still has active consumer", - 1 == _queue.getActiveConsumerCount()); - - AMQMessage messageB = createMessage(new Long (25)); - _queue.enqueue(null, messageB); - QueueEntry entry = _subscription.getLastSeenEntry(); - assertNull(entry); - } - - public void testQueueNoSubscriber() throws AMQException, InterruptedException - { - AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); - _queue.registerSubscription(_subscription, false); - Thread.sleep(150); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - } - - public void testExclusiveConsumer() throws AMQException - { - // Check adding an exclusive subscription adds it to the queue - _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); - - // Check sending a message ends up with the subscriber - AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - - // Check we cannot add a second subscriber to the queue - Subscription subB = new MockSubscription(); - Exception ex = null; - try - { - _queue.registerSubscription(subB, false); - } - catch (AMQException e) - { - ex = e; - } - assertNotNull(ex); - assertTrue(ex instanceof AMQException); - - // Check we cannot add an exclusive subscriber to a queue with an - // existing subscription - _queue.unregisterSubscription(_subscription); - _queue.registerSubscription(_subscription, false); - try - { - _queue.registerSubscription(subB, true); - } - catch (AMQException e) - { - ex = e; - } - assertNotNull(ex); - } - - public void testAutoDeleteQueue() throws Exception - { - _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); - _queue.registerSubscription(_subscription, false); - AMQMessage message = createMessage(new Long(25)); - _queue.enqueue(null, message); - _queue.unregisterSubscription(_subscription); - assertTrue("Queue was not deleted when subscription was removed", - _queue.isDeleted()); - } - - public void testResend() throws Exception - { - _queue.registerSubscription(_subscription, false); - Long id = new Long(26); - AMQMessage message = createMessage(id); - _queue.enqueue(null, message); - QueueEntry entry = _subscription.getLastSeenEntry(); - entry.setRedelivered(true); - _queue.resend(entry, _subscription); - - } - - public void testGetFirstMessageId() throws Exception - { - // Create message - Long messageId = new Long(23); - AMQMessage message = createMessage(messageId); - - // Put message on queue - _queue.enqueue(null, message); - // Get message id - Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); - - // Check message id - assertEquals("Message ID was wrong", messageId, testmsgid); - } - - public void testGetFirstFiveMessageIds() throws Exception - { - for (int i = 0 ; i < 5; i++) - { - // Create message - Long messageId = new Long(i); - AMQMessage message = createMessage(messageId); - // Put message on queue - _queue.enqueue(null, message); - } - // Get message ids - List<Long> msgids = _queue.getMessagesOnTheQueue(5); - - // Check message id - for (int i = 0; i < 5; i++) - { - Long messageId = new Long(i); - assertEquals("Message ID was wrong", messageId, msgids.get(i)); - } - } - - public void testGetLastFiveMessageIds() throws Exception - { - for (int i = 0 ; i < 10; i++) - { - // Create message - Long messageId = new Long(i); - AMQMessage message = createMessage(messageId); - // Put message on queue - _queue.enqueue(null, message); - } - // Get message ids - List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); - - // Check message id - for (int i = 0; i < 5; i++) - { - Long messageId = new Long(i+5); - assertEquals("Message ID was wrong", messageId, msgids.get(i)); - } - } - - public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException - { - // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); - msg.setContentHeaderBody(contentHeaderBody); - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - - // Send persistent message - qs.add(_queue); - msg.enqueue(qs); - msg.routingComplete(_store, new MessageHandleFactory()); - _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); - - // Check that it is enqueued - AMQQueue data = _store.getMessages().get(1L); - assertNotNull(data); - - // Dequeue message - MockQueueEntry entry = new MockQueueEntry(); - AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); - - entry.setMessage(amqmsg); - _queue.dequeue(null, entry); - - // Check that it is dequeued - data = _store.getMessages().get(1L); - assertNull(data); - } - - - // FIXME: move this to somewhere useful - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) - { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } - - - return amqMessageHandle; - } - - public class TestMessage extends AMQMessage - { - private final long _tag; - private int _count; - - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) - throws AMQException - { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); - _tag = tag; - } - - - public boolean incrementReference() - { - _count++; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } - - void assertCountEquals(int expected) - { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); - } - } - - protected AMQMessage createMessage(Long id) throws AMQException - { - AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); - return messageA; - } -} |