diff options
Diffstat (limited to 'java/systests/src')
12 files changed, 296 insertions, 287 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 42d9cccb4f..144e4be6af 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -22,14 +22,15 @@ package org.apache.qpid.server.ack; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntryImpl; +import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -99,12 +100,12 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); - Scenario(int messageCount, List<Long> acked, List<Long> unacked) + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws AMQException { TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), _storeContext, null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + new LinkedList<RequiredDeliveryException>() + ); for (int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; @@ -138,8 +139,8 @@ public class TxAckTest extends TestCase } }; - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag)); + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + _map.add(deliveryTag, new QueueEntryImpl(null,message, Long.MIN_VALUE)); } _acked = acked; _unacked = unacked; @@ -154,7 +155,7 @@ public class TxAckTest extends TestCase { for (long tag : tags) { - UnacknowledgedMessage u = _map.get(tag); + QueueEntry u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); ((TestMessage) u.getMessage()).assertCountEquals(expected); } @@ -195,31 +196,46 @@ public class TxAckTest extends TestCase } } + private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + { + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + null, + false); + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), + publishBody, + new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } + + + return amqMessageHandle; + } + + private class TestMessage extends AMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + throws AMQException { - super(messageId, publishBody, txnContext); - try - { - setContentHeaderBody(new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } + super(createMessageHandle(messageId, publishBody), storeContext, publishBody); _tag = tag; } + public void incrementReference() { _count++; diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 58323086b5..80470d44b3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -24,10 +24,7 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; @@ -36,6 +33,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; import java.util.*; @@ -94,7 +92,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void route(Message m) throws AMQException { m.route(exchange); - m.routingComplete(_store, _storeContext, _handleFactory); + m.getIncomingMessage().routingComplete(_store, _handleFactory); + if(m.getIncomingMessage().allContentReceived()) + { + m.getIncomingMessage().deliverToQueues(); + } } protected void routeAndTest(Message m, TestQueue... expected) throws AMQException @@ -122,12 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase { if (expected.contains(q)) { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; } else { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; } } @@ -234,7 +236,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return properties; } - static class TestQueue extends AMQQueue + static class TestQueue extends AMQQueueImpl { final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); @@ -253,8 +255,14 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException { - messages.add(new HeadersExchangeTest.Message(msg.getMessage())); + messages.add( new HeadersExchangeTest.Message(msg.getMessage())); + } + + boolean isInQueue(Message msg) + { + return messages.contains(msg); } + } /** @@ -262,14 +270,48 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ static class Message extends AMQMessage { + private class TestIncomingMessage extends IncomingMessage + { + + public TestIncomingMessage(final long messageId, + final MessagePublishInfo info, + final TransactionalContext txnContext, + final AMQProtocolSession publisher) + { + super(messageId, info, txnContext, publisher); + } + + + public AMQMessage getUnderlyingMessage() + { + return Message.this; + } + + + public ContentHeaderBody getContentHeaderBody() + { + try + { + return Message.this.getContentHeaderBody(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + } + + private IncomingMessage _incoming; + private static MessageStore _messageStore = new SkeletonMessageStore(); private static StoreContext _storeContext = new StoreContext(); + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + new LinkedList<RequiredDeliveryException>() + ); Message(String id, String... headers) throws AMQException { @@ -278,12 +320,47 @@ public class AbstractHeadersExchangeTestBase extends TestCase Message(String id, FieldTable headers) throws AMQException { - this(getPublishRequest(id), getContentHeader(headers), null); + this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + } + + public IncomingMessage getIncomingMessage() + { + return _incoming; + } + + private Message(long messageId, + MessagePublishInfo publish, + ContentHeaderBody header, + List<ContentBody> bodies) throws AMQException + { + super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); + + + + _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); + _incoming.setContentHeaderBody(header); + + } - private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException + private static AMQMessageHandle createMessageHandle(final long messageId, + final MessagePublishInfo publish, + final ContentHeaderBody header) { - super(_messageStore.getNewMessageId(), publish, _txnContext, header); + + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + _messageStore, + true); + + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); + } + catch (AMQException e) + { + + } + return amqMessageHandle; } private Message(AMQMessage msg) throws AMQException @@ -291,15 +368,13 @@ public class AbstractHeadersExchangeTestBase extends TestCase super(msg); } + + void route(Exchange exchange) throws AMQException { - exchange.route(this); + exchange.route(_incoming); } - boolean isInQueue(TestQueue queue) - { - return queue.messages.contains(this); - } public int hashCode() { diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index 5fbea5e14f..6f8963131b 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -54,10 +55,12 @@ public class AMQProtocolSessionMBeanTest extends TestCase // check the channel count is correct int channelCount = _mbean.channels().size(); assertTrue(channelCount == 1); - AMQQueue queue = - new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false, - new AMQShortString("test"), true, _protocolSession.getVirtualHost()); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()), + false, + new AMQShortString("test"), + true, + _protocolSession.getVirtualHost()); + AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 95ffb505fb..30240217a2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -29,7 +29,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -40,7 +42,6 @@ import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.LinkedList; import java.util.Set; -import java.util.HashSet; /** * Tests that acknowledgements are handled correctly. @@ -49,7 +50,7 @@ public class AckTest extends TestCase { private static final Logger _log = Logger.getLogger(AckTest.class); - private SubscriptionImpl _subscription; + private Subscription _subscription; private MockProtocolSession _protocolSession; @@ -57,9 +58,7 @@ public class AckTest extends TestCase private StoreContext _storeContext = new StoreContext(); - private AMQChannel _channel; - - private SubscriptionSet _subscriptionManager; + private AMQChannel _channel; private AMQQueue _queue; @@ -78,8 +77,9 @@ public class AckTest extends TestCase _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); + + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); + } private void publishMessages(int count) throws AMQException @@ -90,8 +90,9 @@ public class AckTest extends TestCase private void publishMessages(int count, boolean persistent) throws AMQException { TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + new LinkedList<RequiredDeliveryException>() + ); + _queue.registerSubscription(_subscription,false); MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { @@ -125,7 +126,8 @@ public class AckTest extends TestCase return new AMQShortString("rk"); } }; - AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); + IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + //IncomingMessage msg2 = null; if (persistent) { BasicContentHeaderProperties b = new BasicContentHeaderProperties(); @@ -142,10 +144,14 @@ public class AckTest extends TestCase // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.incrementReference(); - msg.routingComplete(_messageStore, _storeContext, factory); + msg.enqueue(_queue); + msg.routingComplete(_messageStore, factory); + if(msg.allContentReceived()) + { + msg.deliverToQueues(); + } // we manually send the message to the subscription - _subscription.send(new QueueEntry(_queue,msg), _queue); + //_subscription.send(new QueueEntry(_queue,msg), _queue); } } @@ -155,7 +161,7 @@ public class AckTest extends TestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); @@ -169,7 +175,7 @@ public class AckTest extends TestCase { assertTrue(deliveryTag == i); i++; - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); } @@ -183,7 +189,7 @@ public class AckTest extends TestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -200,7 +206,7 @@ public class AckTest extends TestCase public void testPersistentNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); @@ -217,7 +223,7 @@ public class AckTest extends TestCase */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -230,7 +236,7 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) @@ -246,7 +252,7 @@ public class AckTest extends TestCase */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -259,7 +265,7 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } @@ -270,7 +276,7 @@ public class AckTest extends TestCase */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -283,18 +289,19 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } } +/* public void testPrefetchHighLow() throws AMQException { int lowMark = 5; int highMark = 10; - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); _channel.setPrefetchLowMarkCount(lowMark); _channel.setPrefetchHighMarkCount(highMark); @@ -343,10 +350,12 @@ public class AckTest extends TestCase assertTrue(map.size() == 0); } +*/ +/* public void testPrefetch() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); - _channel.setPrefetchCount(5); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + _channel.setMessageCredit(5); assertTrue(_channel.getPrefetchCount() == 5); @@ -371,6 +380,7 @@ public class AckTest extends TestCase assertTrue(map.size() == 0); } +*/ public static junit.framework.Test suite() { return new junit.framework.TestSuite(AckTest.class); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java index 282ad3ed5e..3ff691c792 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.handler.OnCurrentThreadExecutor; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.subscription.Subscription; import java.util.*; import java.util.concurrent.Executor; @@ -48,8 +49,8 @@ public class ConcurrencyTestDisabled extends MessageTestHelper private final Executor _executor = new OnCurrentThreadExecutor(); private final List<Thread> _threads = new ArrayList<Thread>(); - private final SubscriptionSet _subscriptionMgr = new SubscriptionSet(); - private final DeliveryManager _deliveryMgr; + private final SubscriptionSet _subscriptionMgr; + private final ConcurrentSelectorDeliveryManager _deliveryMgr; private boolean isComplete; private boolean failed; @@ -60,8 +61,9 @@ public class ConcurrencyTestDisabled extends MessageTestHelper IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, + _deliveryMgr = new ConcurrentSelectorDeliveryManager( new AMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, _virtualHost)); + _subscriptionMgr = _deliveryMgr.getSubscribers(); } public void testConcurrent1() throws InterruptedException, AMQException diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java deleted file mode 100644 index b33259cfba..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; - -import junit.framework.TestSuite; - -abstract public class DeliveryManagerTest extends MessageTestHelper -{ - protected final SubscriptionSet _subscriptions = new SubscriptionSet(); - protected DeliveryManager _mgr; - protected StoreContext _storeContext = new StoreContext(); - private static final AMQShortString DEFAULT_QUEUE_NAME = new AMQShortString("Me"); - - public DeliveryManagerTest() throws Exception - { - } - - public void testStartInQueueingMode() throws AMQException - { - QueueEntry[] messages = new QueueEntry[10]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(); - } - int batch = messages.length / 2; - - for (int i = 0; i < batch; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); - SubscriptionTestHelper s2 = new SubscriptionTestHelper("2"); - _subscriptions.addSubscriber(s1); - _subscriptions.addSubscriber(s2); - - for (int i = batch; i < messages.length; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - assertTrue(s1.getMessages().isEmpty()); - assertTrue(s2.getMessages().isEmpty()); - - _mgr.processAsync(new OnCurrentThreadExecutor()); - - assertEquals(messages.length / 2, s1.getMessages().size()); - assertEquals(messages.length / 2, s2.getMessages().size()); - - for (int i = 0; i < messages.length; i++) - { - if (i % 2 == 0) - { - assertTrue(s1.getMessages().get(i / 2) == messages[i]); - } - else - { - assertTrue(s2.getMessages().get(i / 2) == messages[i]); - } - } - } - - public void testStartInDirectMode() throws AMQException - { - QueueEntry[] messages = new QueueEntry[10]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(); - } - int batch = messages.length / 2; - - SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); - _subscriptions.addSubscriber(s1); - - for (int i = 0; i < batch; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - assertEquals(batch, s1.getMessages().size()); - for (int i = 0; i < batch; i++) - { - assertTrue(messages[i] == s1.getMessages().get(i)); - } - s1.getMessages().clear(); - assertEquals(0, s1.getMessages().size()); - - s1.setSuspended(true); - for (int i = batch; i < messages.length; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - _mgr.processAsync(new OnCurrentThreadExecutor()); - assertEquals(0, s1.getMessages().size()); - s1.setSuspended(false); - - _mgr.processAsync(new OnCurrentThreadExecutor()); - assertEquals(messages.length - batch, s1.getMessages().size()); - - for (int i = batch; i < messages.length; i++) - { - assertTrue(messages[i] == s1.getMessages().get(i - batch)); - } - - } - - public void testNoConsumers() throws AMQException - { - try - { - QueueEntry msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); - msg.checkDeliveredToConsumer(); - fail("expected exception did not occur"); - } - catch (NoConsumersException m) - { - // ok - } - catch (Exception e) - { - fail("expected NoConsumersException, got " + e); - } - } - - public void testNoActiveConsumers() throws AMQException - { - try - { - SubscriptionTestHelper s = new SubscriptionTestHelper("A"); - _subscriptions.addSubscriber(s); - s.setSuspended(true); - QueueEntry msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); - msg.checkDeliveredToConsumer(); - fail("expected exception did not occur"); - } - catch (NoConsumersException m) - { - // ok - } - catch (Exception e) - { - fail("expected NoConsumersException, got " + e); - } - } - - public static junit.framework.Test suite() - { - TestSuite suite = new TestSuite(); - return suite; - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index 521bedeccd..b2a4216f8d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -28,7 +27,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -38,7 +36,6 @@ import org.apache.qpid.AMQException; import junit.framework.TestCase; import java.util.LinkedList; -import java.util.HashSet; class MessageTestHelper extends TestCase { @@ -47,20 +44,20 @@ class MessageTestHelper extends TestCase private final StoreContext _storeContext = new StoreContext(); private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + new LinkedList<RequiredDeliveryException>() + ); MessageTestHelper() throws Exception { ApplicationRegistry.initialise(new NullApplicationRegistry()); } - QueueEntry message() throws AMQException + QueueEntryImpl message() throws AMQException { return message(false); } - QueueEntry message(final boolean immediate) throws AMQException + QueueEntryImpl message(final boolean immediate) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -90,9 +87,16 @@ class MessageTestHelper extends TestCase return null; } }; - - return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody())); + + //public AMQMessage(Long messageId, AMQMessageHandle messageHandle , TransactionalContext txnConext, MessagePublishInfo info) + long messageId = _messageStore.getNewMessageId(); + final AMQMessageHandle messageHandle = + (new MessageHandleFactory()).createMessageHandle(messageId, _messageStore, false); + messageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,new ContentHeaderBody()); + AMQMessage msg = new AMQMessage(messageHandle, _txnContext.getStoreContext(), publish); + + + return new QueueEntryImpl(null,msg, Long.MIN_VALUE); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java index d3ec3c11d4..1200d7fc2b 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; +import org.apache.qpid.server.subscription.Subscription; public class SubscriptionManagerTest extends TestCase { @@ -47,9 +48,9 @@ public class SubscriptionManagerTest extends TestCase s1.setSuspended(true); assertFalse(mgr.hasActiveSubscribers()); - mgr.removeSubscriber(new SubscriptionTestHelper("S1")); + mgr.removeSubscriber(s1); assertFalse(mgr.isEmpty()); - mgr.removeSubscriber(new SubscriptionTestHelper("S2")); + mgr.removeSubscriber(s2); assertTrue(mgr.isEmpty()); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 458b510ef5..624a368b1f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.framing.AMQShortString; import java.util.ArrayList; import java.util.List; @@ -54,7 +56,12 @@ public class SubscriptionTestHelper implements Subscription return messages; } - public void send(QueueEntry msg, AMQQueue queue) + public void setQueue(AMQQueue queue) + { + + } + + public void send(QueueEntry msg) { messages.add(msg); } @@ -84,16 +91,56 @@ public class SubscriptionTestHelper implements Subscription return new Object(); } + public void resend(final QueueEntry entry) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit(final QueueEntry queueEntry) + { + + } + + public void setStateListener(final StateListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Object getQueueContext() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean setQueueContext(Object expected, Object newValue) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public AMQChannel getChannel() { return null; } - + public void start() { //no-op } + public AMQShortString getConumerTag() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isActive() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQQueue getQueue() + { + return null; + } + public void queueDeleted(AMQQueue queue) { } @@ -108,6 +155,11 @@ public class SubscriptionTestHelper implements Subscription return true; } + public boolean isAutoClose() + { + return false; + } + public Queue<QueueEntry> getPreDeliveryQueue() { return null; diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 6ffa3e0e02..57bc173812 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -22,12 +22,12 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueImpl; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java index 374c69fa00..f36e924890 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -23,12 +23,12 @@ package org.apache.qpid.server.store; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.txn.NonTransactionalContext; /** @@ -40,6 +40,7 @@ public class TestReferenceCounting extends TestCase private StoreContext _storeContext = new StoreContext(); + protected void setUp() throws Exception { super.setUp(); @@ -51,7 +52,7 @@ public class TestReferenceCounting extends TestCase */ public void testMessageGetsRemoved() throws AMQException { - createPersistentContentHeader(); + ContentHeaderBody chb = createPersistentContentHeader(); MessagePublishInfo info = new MessagePublishInfo() { @@ -82,16 +83,22 @@ public class TestReferenceCounting extends TestCase } }; - AMQMessage message = new AMQMessage(_store.getNewMessageId(), info, - new NonTransactionalContext(_store, _storeContext, null, null, null), - createPersistentContentHeader()); + + final long messageId = _store.getNewMessageId(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext,info); + message = message.takeReference(); // we call routing complete to set up the handle - message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + assertEquals(1, _store.getMessageMetaDataMap().size()); message.decrementReference(_storeContext); - assertTrue(_store.getMessageMetaDataMap().size() == 0); + assertEquals(1, _store.getMessageMetaDataMap().size()); } private ContentHeaderBody createPersistentContentHeader() @@ -135,18 +142,25 @@ public class TestReferenceCounting extends TestCase } }; - AMQMessage message = new AMQMessage(_store.getNewMessageId(), - info, - new NonTransactionalContext(_store, _storeContext, null, null, null), - createPersistentContentHeader()); + final Long messageId = _store.getNewMessageId(); + final ContentHeaderBody chb = createPersistentContentHeader(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext, + info); + message = message.takeReference(); // we call routing complete to set up the handle - message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + + assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); message.decrementReference(_storeContext); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + assertEquals(1, _store.getMessageMetaDataMap().size()); } public static junit.framework.Test suite() diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java index 463946e14a..4a2de976da 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java @@ -108,11 +108,16 @@ public class DupsOkTest extends VMTestCase { try { + System.err.println("Got last message!"); long remainingMessages = ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue); - if(remainingMessages != 0) + if(_msgCount != MSG_COUNT) { - assertEquals("The queue should have 0 msgs left, seen " + _msgCount + " messages.", 0, getMessageCount(_queue.getQueueName())); + assertEquals("Wrong number of messages seen.", MSG_COUNT, _msgCount); + } + else + { + System.err.println("0 remaining on queue"); } } catch (AMQException e) @@ -151,6 +156,10 @@ public class DupsOkTest extends VMTestCase // consumer.close(); + // wait for the ack to get back + Thread.sleep(1000); + + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue)); clientConnection.close(); |