diff options
Diffstat (limited to 'java/systests/src')
17 files changed, 877 insertions, 823 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 2bb16aff2e..45db47a1c3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -26,12 +26,16 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -99,12 +103,16 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); - Scenario(int messageCount, List<Long> acked, List<Long> unacked) + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { - TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(), + TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + AMQQueue queue = + AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()), + null); + for (int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; @@ -138,8 +146,8 @@ public class TxAckTest extends TestCase } }; - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map)); + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + _map.add(deliveryTag, queue.enqueue(new StoreContext(), message)); } _acked = acked; _unacked = unacked; @@ -154,7 +162,7 @@ public class TxAckTest extends TestCase { for (long tag : tags) { - UnacknowledgedMessage u = _map.get(tag); + QueueEntry u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); ((TestMessage) u.getMessage()).assertCountEquals(expected); } @@ -175,7 +183,7 @@ public class TxAckTest extends TestCase _op.consolidate(); _op.undoPrepare(); - assertCount(_acked, 1); //DTX Changed to 0, but that is wrong msg 5 is acked! + assertCount(_acked, 1); assertCount(_unacked, 0); } @@ -195,34 +203,50 @@ public class TxAckTest extends TestCase } } + private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + { + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + null, + false); + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), + publishBody, + new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } + + + return amqMessageHandle; + } + + private class TestMessage extends AMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + throws AMQException { - super(messageId, publishBody, txnContext); - try - { - setContentHeaderBody(new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } + super(createMessageHandle(messageId, publishBody), storeContext, publishBody); _tag = tag; } - public void incrementReference() + + public boolean incrementReference() { _count++; + return true; } public void decrementReference(StoreContext context) diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 0968f0c468..adb7a7cd0c 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -24,18 +24,17 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; import java.util.*; @@ -50,7 +49,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase /** * Not used in this test, just there to stub out the routing calls */ - private MessageStore _store = new SkeletonMessageStore(); + private MessageStore _store = new MemoryMessageStore(); private StoreContext _storeContext = new StoreContext(); @@ -94,7 +93,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void route(Message m) throws AMQException { m.route(exchange); - m.routingComplete(_store, _storeContext, _handleFactory); + m.getIncomingMessage().routingComplete(_store, _handleFactory); + if(m.getIncomingMessage().allContentReceived()) + { + m.getIncomingMessage().deliverToQueues(); + } } protected void routeAndTest(Message m, TestQueue... expected) throws AMQException @@ -122,12 +125,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase { if (expected.contains(q)) { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; } else { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; } } @@ -234,7 +237,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return properties; } - static class TestQueue extends AMQQueue + static class TestQueue extends SimpleAMQQueue { final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); @@ -248,13 +251,167 @@ public class AbstractHeadersExchangeTestBase extends TestCase * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. * @param msg - * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException + @Override + public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException + { + messages.add( new HeadersExchangeTest.Message(msg)); + return new QueueEntry() + { + + public AMQQueue getQueue() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQMessage getMessage() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getSize() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean getDeliveredToConsumer() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean expired() throws AMQException + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isAcquired() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire(Subscription sub) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean delete() + { + return false; + } + + public boolean isDeleted() + { + return false; + } + + public boolean acquiredBySubscription() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setDeliveredToSubscription() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void release() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public String debugIdentity() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean immediateAndNotDelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setRedelivered(boolean b) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Subscription getDeliveredSubscription() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRejectedBy(Subscription subscription) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void requeue(StoreContext storeContext) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dequeue(final StoreContext storeContext) throws FailedDequeueException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dispose(final StoreContext storeContext) throws MessageCleanupException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isQueueDeleted() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void addStateChangeListener(StateChangeListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean removeStateChangeListener(StateChangeListener listener) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int compareTo(final QueueEntry o) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + }; + } + + boolean isInQueue(Message msg) { - messages.add(new HeadersExchangeTest.Message(msg.getMessage())); + return messages.contains(msg); } + } /** @@ -262,10 +419,44 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ static class Message extends AMQMessage { - private static MessageStore _messageStore = new MemoryMessageStore(); + private class TestIncomingMessage extends IncomingMessage + { + + public TestIncomingMessage(final long messageId, + final MessagePublishInfo info, + final TransactionalContext txnContext, + final AMQProtocolSession publisher) + { + super(messageId, info, txnContext, publisher); + } + + + public AMQMessage getUnderlyingMessage() + { + return Message.this; + } + + + public ContentHeaderBody getContentHeaderBody() + { + try + { + return Message.this.getContentHeaderBody(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + } + + private IncomingMessage _incoming; + + private static MessageStore _messageStore = new SkeletonMessageStore(); private static StoreContext _storeContext = new StoreContext(); + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>() @@ -278,12 +469,47 @@ public class AbstractHeadersExchangeTestBase extends TestCase Message(String id, FieldTable headers) throws AMQException { - this(getPublishRequest(id), getContentHeader(headers), null); + this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + } + + public IncomingMessage getIncomingMessage() + { + return _incoming; + } + + private Message(long messageId, + MessagePublishInfo publish, + ContentHeaderBody header, + List<ContentBody> bodies) throws AMQException + { + super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); + + + + _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); + _incoming.setContentHeaderBody(header); + + } - private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException + private static AMQMessageHandle createMessageHandle(final long messageId, + final MessagePublishInfo publish, + final ContentHeaderBody header) { - super(_messageStore.getNewMessageId(), publish, _txnContext, header); + + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + _messageStore, + true); + + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); + } + catch (AMQException e) + { + + } + return amqMessageHandle; } private Message(AMQMessage msg) throws AMQException @@ -291,15 +517,13 @@ public class AbstractHeadersExchangeTestBase extends TestCase super(msg); } + + void route(Exchange exchange) throws AMQException { - exchange.route(this); + exchange.route(_incoming); } - boolean isInQueue(TestQueue queue) - { - return queue.messages.contains(this); - } public int hashCode() { diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index e4555e020e..8e7038eec3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -28,16 +28,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; - /** * Test class to test MBean operations for AMQMinaProtocolSession. */ @@ -56,13 +55,12 @@ public class AMQProtocolSessionMBeanTest extends TestCase // check the channel count is correct int channelCount = _mbean.channels().size(); assertTrue(channelCount == 1); - AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()), false, new AMQShortString("test"), true, - _protocolSession.getVirtualHost()); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); - + _protocolSession.getVirtualHost(), null); + AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 2416442b10..bbd6deffd3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -23,24 +23,26 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; -import java.util.HashSet; import java.util.LinkedList; import java.util.Set; +import java.util.Collections; /** * Tests that acknowledgements are handled correctly. @@ -49,7 +51,7 @@ public class AckTest extends TestCase { private static final Logger _log = Logger.getLogger(AckTest.class); - private SubscriptionImpl _subscription; + private Subscription _subscription; private MockProtocolSession _protocolSession; @@ -57,9 +59,7 @@ public class AckTest extends TestCase private StoreContext _storeContext = new StoreContext(); - private AMQChannel _channel; - - private SubscriptionSet _subscriptionManager; + private AMQChannel _channel; private AMQQueue _queue; @@ -75,11 +75,13 @@ public class AckTest extends TestCase super.setUp(); _messageStore = new TestableMemoryMessageStore(); _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession, 5, _messageStore); + _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); + + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), + null); + } private void publishMessages(int count) throws AMQException @@ -92,6 +94,7 @@ public class AckTest extends TestCase TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + _queue.registerSubscription(_subscription,false); MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { @@ -125,7 +128,8 @@ public class AckTest extends TestCase return new AMQShortString("rk"); } }; - AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); + IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + //IncomingMessage msg2 = null; if (persistent) { BasicContentHeaderProperties b = new BasicContentHeaderProperties(); @@ -142,10 +146,14 @@ public class AckTest extends TestCase // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.incrementReference(); - msg.routingComplete(_messageStore, _storeContext, factory); + msg.enqueue(Collections.singleton(_queue)); + msg.routingComplete(_messageStore, factory); + if(msg.allContentReceived()) + { + msg.deliverToQueues(); + } // we manually send the message to the subscription - _subscription.send(new QueueEntry(_queue,msg), _queue); + //_subscription.send(new QueueEntry(_queue,msg), _queue); } } @@ -155,16 +163,13 @@ public class AckTest extends TestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - //DTX - // assertTrue(_messageStore.getNumberStoredMessages() == msgCount); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -172,15 +177,12 @@ public class AckTest extends TestCase { assertTrue(deliveryTag == i); i++; - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); } assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - //DTX -// assertTrue(_messageStore.getNumberStoredMessages() == msgCount); } /** @@ -189,15 +191,32 @@ public class AckTest extends TestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - //DTX MessageStore -// assertTrue(_messageStore.getNumberStoredMessages() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + + } + + /** + * Tests that in no-ack mode no messages are retained + */ + public void testPersistentNoAckMode() throws AMQException + { + // false arg means no acks expected + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount, true); + + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + } /** @@ -206,7 +225,7 @@ public class AckTest extends TestCase */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -219,7 +238,7 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) @@ -235,7 +254,7 @@ public class AckTest extends TestCase */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -248,7 +267,7 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } @@ -259,7 +278,7 @@ public class AckTest extends TestCase */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -272,18 +291,19 @@ public class AckTest extends TestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } } +/* public void testPrefetchHighLow() throws AMQException { int lowMark = 5; int highMark = 10; - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); _channel.setPrefetchLowMarkCount(lowMark); _channel.setPrefetchHighMarkCount(highMark); @@ -332,10 +352,12 @@ public class AckTest extends TestCase assertTrue(map.size() == 0); } +*/ +/* public void testPrefetch() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); - _channel.setPrefetchCount(5); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + _channel.setMessageCredit(5); assertTrue(_channel.getPrefetchCount() == 5); @@ -360,6 +382,7 @@ public class AckTest extends TestCase assertTrue(map.size() == 0); } +*/ public static junit.framework.Test suite() { return new junit.framework.TestSuite(AckTest.class); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java deleted file mode 100644 index 4f92cc94b7..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.*; -import java.util.concurrent.Executor; - -/** - * Tests delivery in the face of concurrent incoming _messages, subscription alterations - * and attempts to asynchronously process queued _messages. - */ -public class ConcurrencyTestDisabled extends MessageTestHelper -{ - private final Random random = new Random(); - - private final int numMessages = 10; - - private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>(); - private final Set<Subscription> _active = new HashSet<Subscription>(); - private final List<QueueEntry> _messages = new ArrayList<QueueEntry>(); - private int next = 0;//index to next message to send - private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>()); - private final Executor _executor = new OnCurrentThreadExecutor(); - private final List<Thread> _threads = new ArrayList<Thread>(); - - private final SubscriptionSet _subscriptionMgr = new SubscriptionSet(); - private final DeliveryManager _deliveryMgr; - - private boolean isComplete; - private boolean failed; - private VirtualHost _virtualHost; - - public ConcurrencyTestDisabled() throws Exception - { - - IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); - _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, - _virtualHost)); - } - - public void testConcurrent1() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(1, 4, 4, 4); - doRun(); - check(); - } - - public void testConcurrent2() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(4, 2, 2, 2); - doRun(); - check(); - } - - void check() - { - assertFalse("Failed", failed); - - _deliveryMgr.processAsync(_executor); - - assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size()); - for(int i = 0; i < _messages.size(); i++) - { - assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i)); - } - } - - void initSubscriptions(int subscriptions) - { - for(int i = 0; i < subscriptions; i++) - { - _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received)); - } - } - - void initMessages(int messages) throws AMQException - { - for(int i = 0; i < messages; i++) - { - _messages.add(message()); - } - } - - void initThreads(int senders, int subscribers, int suspenders, int processors) - { - addThreads(senders, senders == 1 ? new Sender() : new OrderedSender()); - addThreads(subscribers, new Subscriber()); - addThreads(suspenders, new Suspender()); - addThreads(processors, new Processor()); - } - - void addThreads(int count, Runnable runner) - { - for(int i = 0; i < count; i++) - { - _threads.add(new Thread(runner, runner.toString())); - } - } - - void doRun() throws InterruptedException - { - for(Thread t : _threads) - { - t.start(); - } - - for(Thread t : _threads) - { - t.join(); - } - } - - private void toggle(Subscription s) - { - synchronized (_active) - { - if (_active.contains(s)) - { - _active.remove(s); - Subscription result = _subscriptionMgr.removeSubscriber(s); - assertTrue("Removed subscription " + result + " but trying to remove subscription " + s, - result != null && result.equals(s)); - } - else - { - _active.add(s); - _subscriptionMgr.addSubscriber(s); - } - } - } - - private QueueEntry nextMessage() - { - synchronized (_messages) - { - if (next < _messages.size()) - { - return _messages.get(next++); - } - else - { - if (!_deliveryMgr.hasQueuedMessages()) { - isComplete = true; - } - return null; - } - } - } - - private boolean randomBoolean() - { - return random.nextBoolean(); - } - - private SubscriptionTestHelper randomSubscriber() - { - return _subscribers.get(random.nextInt(_subscribers.size())); - } - - private class Sender extends Runner - { - void doRun() throws Throwable - { - QueueEntry msg = nextMessage(); - if (msg != null) - { - _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); - } - } - } - - private class OrderedSender extends Sender - { - synchronized void doRun() throws Throwable - { - super.doRun(); - } - } - - private class Suspender extends Runner - { - void doRun() throws Throwable - { - randomSubscriber().setSuspended(randomBoolean()); - } - } - - private class Subscriber extends Runner - { - void doRun() throws Throwable - { - toggle(randomSubscriber()); - } - } - - private class Processor extends Runner - { - void doRun() throws Throwable - { - _deliveryMgr.processAsync(_executor); - } - } - - private abstract class Runner implements Runnable - { - public void run() - { - try - { - while (!stop()) - { - doRun(); - } - } - catch (Throwable t) - { - failed = true; - t.printStackTrace(); - } - } - - abstract void doRun() throws Throwable; - - boolean stop() - { - return isComplete || failed; - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ConcurrencyTestDisabled.class); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java deleted file mode 100644 index b33259cfba..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; - -import junit.framework.TestSuite; - -abstract public class DeliveryManagerTest extends MessageTestHelper -{ - protected final SubscriptionSet _subscriptions = new SubscriptionSet(); - protected DeliveryManager _mgr; - protected StoreContext _storeContext = new StoreContext(); - private static final AMQShortString DEFAULT_QUEUE_NAME = new AMQShortString("Me"); - - public DeliveryManagerTest() throws Exception - { - } - - public void testStartInQueueingMode() throws AMQException - { - QueueEntry[] messages = new QueueEntry[10]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(); - } - int batch = messages.length / 2; - - for (int i = 0; i < batch; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); - SubscriptionTestHelper s2 = new SubscriptionTestHelper("2"); - _subscriptions.addSubscriber(s1); - _subscriptions.addSubscriber(s2); - - for (int i = batch; i < messages.length; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - assertTrue(s1.getMessages().isEmpty()); - assertTrue(s2.getMessages().isEmpty()); - - _mgr.processAsync(new OnCurrentThreadExecutor()); - - assertEquals(messages.length / 2, s1.getMessages().size()); - assertEquals(messages.length / 2, s2.getMessages().size()); - - for (int i = 0; i < messages.length; i++) - { - if (i % 2 == 0) - { - assertTrue(s1.getMessages().get(i / 2) == messages[i]); - } - else - { - assertTrue(s2.getMessages().get(i / 2) == messages[i]); - } - } - } - - public void testStartInDirectMode() throws AMQException - { - QueueEntry[] messages = new QueueEntry[10]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(); - } - int batch = messages.length / 2; - - SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); - _subscriptions.addSubscriber(s1); - - for (int i = 0; i < batch; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - assertEquals(batch, s1.getMessages().size()); - for (int i = 0; i < batch; i++) - { - assertTrue(messages[i] == s1.getMessages().get(i)); - } - s1.getMessages().clear(); - assertEquals(0, s1.getMessages().size()); - - s1.setSuspended(true); - for (int i = batch; i < messages.length; i++) - { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); - } - - _mgr.processAsync(new OnCurrentThreadExecutor()); - assertEquals(0, s1.getMessages().size()); - s1.setSuspended(false); - - _mgr.processAsync(new OnCurrentThreadExecutor()); - assertEquals(messages.length - batch, s1.getMessages().size()); - - for (int i = batch; i < messages.length; i++) - { - assertTrue(messages[i] == s1.getMessages().get(i - batch)); - } - - } - - public void testNoConsumers() throws AMQException - { - try - { - QueueEntry msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); - msg.checkDeliveredToConsumer(); - fail("expected exception did not occur"); - } - catch (NoConsumersException m) - { - // ok - } - catch (Exception e) - { - fail("expected NoConsumersException, got " + e); - } - } - - public void testNoActiveConsumers() throws AMQException - { - try - { - SubscriptionTestHelper s = new SubscriptionTestHelper("A"); - _subscriptions.addSubscriber(s); - s.setSuspended(true); - QueueEntry msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); - msg.checkDeliveredToConsumer(); - fail("expected exception did not occur"); - } - catch (NoConsumersException m) - { - // ok - } - catch (Exception e) - { - fail("expected NoConsumersException, got " + e); - } - } - - public static junit.framework.Test suite() - { - TestSuite suite = new TestSuite(); - return suite; - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index 114c8cac32..b2a4216f8d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -24,7 +24,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -39,7 +39,7 @@ import java.util.LinkedList; class MessageTestHelper extends TestCase { - private final MessageStore _messageStore = new MemoryMessageStore(); + private final MessageStore _messageStore = new SkeletonMessageStore(); private final StoreContext _storeContext = new StoreContext(); @@ -52,12 +52,12 @@ class MessageTestHelper extends TestCase ApplicationRegistry.initialise(new NullApplicationRegistry()); } - QueueEntry message() throws AMQException + QueueEntryImpl message() throws AMQException { return message(false); } - QueueEntry message(final boolean immediate) throws AMQException + QueueEntryImpl message(final boolean immediate) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -87,9 +87,16 @@ class MessageTestHelper extends TestCase return null; } }; - - return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody())); + + //public AMQMessage(Long messageId, AMQMessageHandle messageHandle , TransactionalContext txnConext, MessagePublishInfo info) + long messageId = _messageStore.getNewMessageId(); + final AMQMessageHandle messageHandle = + (new MessageHandleFactory()).createMessageHandle(messageId, _messageStore, false); + messageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,new ContentHeaderBody()); + AMQMessage msg = new AMQMessage(messageHandle, _txnContext.getStoreContext(), publish); + + + return new QueueEntryImpl(null,msg, Long.MIN_VALUE); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index cf986e7803..a1a405c313 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -215,6 +215,11 @@ public class MockProtocolSession implements AMQProtocolSession return null; //To change body of implemented methods use File | Settings | File Templates. } + public ProtocolSessionIdentifier getSessionIdentifier() + { + return null; + } + public byte getProtocolMajorVersion() { return getProtocolVersion().getMajorVersion(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java new file mode 100644 index 0000000000..0dbf95052f --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -0,0 +1,176 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import javax.jms.*; +import javax.naming.NamingException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.HashMap; +import java.util.Map; + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class PriorityTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(PriorityTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "/test"; + protected final String QUEUE = "PriorityQueue"; + + + private static final int MSG_COUNT = 50; + + protected void setUp() throws Exception + { + super.setUp(); + + if (usingInVMBroker()) + { + TransportConnection.createVMBroker(1); + } + + + } + + private boolean usingInVMBroker() + { + return BROKER.startsWith("vm://"); + } + + protected void tearDown() throws Exception + { + if (usingInVMBroker()) + { + TransportConnection.killAllVMBrokers(); + } + super.tearDown(); + } + + public void testPriority() throws JMSException, NamingException, AMQException + { + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + Context context = factory.getInitialContext(env); + + Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",10); + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + + Queue queue = new AMQQueue("amq.direct",QUEUE); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + + + + + + + producerConnection.start(); + + + MessageProducer producer = producerSession.createProducer(queue); + + + + + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.setPriority(msg % 10); + producer.send(nextMessage(msg, false, producerSession, producer)); + } + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + + + + consumerConnection.start(); + + Message received; + //Receive Message 0 + StringBuilder buf = new StringBuilder(); + int receivedCount = 0; + Message previous = null; + int messageCount = 0; + while((received = consumer.receive(1000))!=null) + { + messageCount++; + if(previous != null) + { + assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) ); + } + + previous = received; + receivedCount++; + } + + assertEquals("Incorrect number of message received", 50, receivedCount); + + producerSession.close(); + producer.close(); + + } + + private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + + return send; + } + + +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java index f82fec61b0..280d897852 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -81,14 +81,6 @@ public class QueueDepthWithSelectorTest extends TestCase System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG)); System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE)); - Logger csdm = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); - System.err.println("csdm.isE-Error:" + csdm.isEnabledFor(Level.ERROR)); - System.err.println("csdm.isE-Warn:" + csdm.isEnabledFor(Level.WARN)); - System.err.println("csdm.isInfo:" + csdm.isInfoEnabled() + ":" + csdm.isEnabledFor(Level.INFO)); - System.err.println("csdm.isDebug:" + csdm.isDebugEnabled() + ":" + csdm.isEnabledFor(Level.DEBUG)); - System.err.println("csdm.isTrace:" + csdm.isTraceEnabled() + ":" + csdm.isEnabledFor(Level.TRACE)); - - System.err.println(Logger.getRootLogger().getLoggerRepository()); if (BROKER.startsWith("vm://")) @@ -184,9 +176,14 @@ public class QueueDepthWithSelectorTest extends TestCase try { + Thread.sleep(2000); long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue")); assertEquals("Session reports Queue depth not as expected", 0, queueDepth); } + catch (InterruptedException e) + { + fail(e.getMessage()); + } catch (NamingException e) { fail(e.getMessage()); @@ -209,7 +206,7 @@ public class QueueDepthWithSelectorTest extends TestCase } - private void verifyAllMessagesRecevied() throws JMSException + private void verifyAllMessagesRecevied() throws Exception { boolean[] msgIdRecevied = new boolean[MSG_COUNT]; @@ -219,6 +216,8 @@ public class QueueDepthWithSelectorTest extends TestCase _messages[i] = _consumer.receive(1000); assertNotNull("should have received a message but didn't", _messages[i]); } + long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue")); + assertEquals("Session reports Queue depth not as expected", 0, queueDepth); //Check received messages int msgId = 0; diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java deleted file mode 100644 index d3ec3c11d4..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; - -public class SubscriptionManagerTest extends TestCase -{ - private final SubscriptionSet mgr = new SubscriptionSet(); - - public void testBasicSubscriptionManagement() - { - assertTrue(mgr.isEmpty()); - assertFalse(mgr.hasActiveSubscribers()); - SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); - mgr.addSubscriber(s1); - assertFalse(mgr.isEmpty()); - assertTrue(mgr.hasActiveSubscribers()); - - SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); - mgr.addSubscriber(s2); - - s2.setSuspended(true); - assertFalse(mgr.isEmpty()); - assertTrue(mgr.hasActiveSubscribers()); - assertTrue(s2.isSuspended()); - assertFalse(s1.isSuspended()); - - s1.setSuspended(true); - assertFalse(mgr.hasActiveSubscribers()); - - mgr.removeSubscriber(new SubscriptionTestHelper("S1")); - assertFalse(mgr.isEmpty()); - mgr.removeSubscriber(new SubscriptionTestHelper("S2")); - assertTrue(mgr.isEmpty()); - } - - public void testRoundRobin() - { - SubscriptionTestHelper a = new SubscriptionTestHelper("A"); - SubscriptionTestHelper b = new SubscriptionTestHelper("B"); - SubscriptionTestHelper c = new SubscriptionTestHelper("C"); - SubscriptionTestHelper d = new SubscriptionTestHelper("D"); - mgr.addSubscriber(a); - mgr.addSubscriber(b); - mgr.addSubscriber(c); - mgr.addSubscriber(d); - - for (int i = 0; i < 3; i++) - { - assertEquals(a, mgr.nextSubscriber(null)); - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(c, mgr.nextSubscriber(null)); - assertEquals(d, mgr.nextSubscriber(null)); - } - - c.setSuspended(true); - - for (int i = 0; i < 3; i++) - { - assertEquals(a, mgr.nextSubscriber(null)); - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(d, mgr.nextSubscriber(null)); - } - - mgr.removeSubscriber(a); - d.setSuspended(true); - c.setSuspended(false); - Subscription e = new SubscriptionTestHelper("D"); - mgr.addSubscriber(e); - - for (int i = 0; i < 3; i++) - { - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(c, mgr.nextSubscriber(null)); - assertEquals(e, mgr.nextSubscriber(null)); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(SubscriptionManagerTest.class); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java deleted file mode 100644 index bcf54693d3..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; - -public class SubscriptionSetTest extends TestCase -{ - /** - * A SubscriptionSet that counts the number of items scanned. - */ - static class TestSubscriptionSet extends SubscriptionSet - { - private int scanned = 0; - - void resetScanned() - { - scanned = 0; - } - - protected void subscriberScanned() - { - ++scanned; - } - - int getScanned() - { - return scanned; - } - } - - final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1"); - final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2"); - final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3"); - - final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true); - final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true); - final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true); - - public void testNextMessage() - { - SubscriptionSet ss = new SubscriptionSet(); - assertNull(ss.nextSubscriber(null)); - assertEquals(0, ss.getCurrentSubscriber()); - - ss.addSubscriber(sub1); - assertEquals(sub1, ss.nextSubscriber(null)); - assertEquals(1, ss.getCurrentSubscriber()); - assertEquals(sub1, ss.nextSubscriber(null)); - assertEquals(1, ss.getCurrentSubscriber()); - - ss.addSubscriber(sub2); - ss.addSubscriber(sub3); - - assertEquals(sub2, ss.nextSubscriber(null)); - assertEquals(2, ss.getCurrentSubscriber()); - - assertEquals(sub3, ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - } - - public void testNextMessageWhenAllSuspended() - { - SubscriptionSet ss = createAllSuspendedSubscriptionSet(); - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - } - - private TestSubscriptionSet createAllSuspendedSubscriptionSet() - { - TestSubscriptionSet ss = new TestSubscriptionSet(); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(suspendedSub2); - ss.addSubscriber(suspendedSub3); - return ss; - } - - public void testNextMessageAfterRemove() - { - SubscriptionSet ss = new SubscriptionSet(); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(suspendedSub2); - ss.addSubscriber(sub3); - assertEquals(sub3, ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - - assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1)); - - assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here. - assertEquals(2, ss.getCurrentSubscriber()); - } - - public void testNextMessageOverScanning() - { - TestSubscriptionSet ss = new TestSubscriptionSet(); - SubscriptionTestHelper sub = new SubscriptionTestHelper("test"); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(sub); - ss.addSubscriber(suspendedSub3); - assertEquals(sub, ss.nextSubscriber(null)); - assertEquals(2, ss.getCurrentSubscriber()); - assertEquals(2, ss.getScanned()); - - ss.resetScanned(); - sub.setSuspended(true); - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - // Current implementation overscans by one item here. - assertEquals(ss.size() + 1, ss.getScanned()); - } - - public void testNextMessageOverscanWorstCase() { - TestSubscriptionSet ss = createAllSuspendedSubscriptionSet(); - ss.nextSubscriber(null); - // Scans the subscriptions twice. - assertEquals(ss.size() * 2, ss.getScanned()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(SubscriptionSetTest.class); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 1fa70a08d4..eed60a1a7c 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.framing.AMQShortString; import java.util.ArrayList; import java.util.List; @@ -54,7 +56,12 @@ public class SubscriptionTestHelper implements Subscription return messages; } - public void send(QueueEntry msg, AMQQueue queue) + public void setQueue(AMQQueue queue) + { + + } + + public void send(QueueEntry msg) { messages.add(msg); } @@ -79,9 +86,39 @@ public class SubscriptionTestHelper implements Subscription //no-op } - public Object getSendLock() + public void getSendLock() + { + return; + } + + public void releaseSendLock() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resend(final QueueEntry entry) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit(final QueueEntry queueEntry) + { + + } + + public void setStateListener(final StateListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public QueueEntry getLastSeenEntry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) { - return new Object(); + return false; //To change body of implemented methods use File | Settings | File Templates. } public AMQChannel getChannel() @@ -94,6 +131,26 @@ public class SubscriptionTestHelper implements Subscription //no-op } + public AMQShortString getConsumerTag() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isActive() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQQueue getQueue() + { + return null; + } + + public QueueEntry.SubscriptionAcquiredState getOwningState() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void queueDeleted(AMQQueue queue) { } @@ -108,6 +165,11 @@ public class SubscriptionTestHelper implements Subscription return true; } + public boolean isAutoClose() + { + return false; + } + public Queue<QueueEntry> getPreDeliveryQueue() { return null; @@ -157,5 +219,4 @@ public class SubscriptionTestHelper implements Subscription { return key.toString(); } - } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 6ffa3e0e02..792744903e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -22,12 +22,11 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; @@ -130,17 +129,17 @@ public class SkeletonMessageStore implements MessageStore return null; } - public void removeQueue(AMQShortString name) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQException { } - public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { } - public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java index c7984d5d33..f36e924890 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.txn.NonTransactionalContext; /** @@ -39,6 +40,7 @@ public class TestReferenceCounting extends TestCase private StoreContext _storeContext = new StoreContext(); + protected void setUp() throws Exception { super.setUp(); @@ -50,7 +52,7 @@ public class TestReferenceCounting extends TestCase */ public void testMessageGetsRemoved() throws AMQException { - createPersistentContentHeader(); + ContentHeaderBody chb = createPersistentContentHeader(); MessagePublishInfo info = new MessagePublishInfo() { @@ -81,16 +83,22 @@ public class TestReferenceCounting extends TestCase } }; - AMQMessage message = new AMQMessage(_store.getNewMessageId(), info, - new NonTransactionalContext(_store, _storeContext, null, null), - createPersistentContentHeader()); + + final long messageId = _store.getNewMessageId(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext,info); + message = message.takeReference(); // we call routing complete to set up the handle - message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + assertEquals(1, _store.getMessageMetaDataMap().size()); message.decrementReference(_storeContext); - assertTrue(_store.getMessageMetaDataMap().size() == 0); + assertEquals(1, _store.getMessageMetaDataMap().size()); } private ContentHeaderBody createPersistentContentHeader() @@ -134,18 +142,25 @@ public class TestReferenceCounting extends TestCase } }; - AMQMessage message = new AMQMessage(_store.getNewMessageId(), - info, - new NonTransactionalContext(_store, _storeContext, null, null), - createPersistentContentHeader()); + final Long messageId = _store.getNewMessageId(); + final ContentHeaderBody chb = createPersistentContentHeader(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext, + info); + message = message.takeReference(); // we call routing complete to set up the handle - message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + + assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); message.decrementReference(_storeContext); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + assertEquals(1, _store.getMessageMetaDataMap().size()); } public static junit.framework.Test suite() diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java index 57370f490f..5659f533a1 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java @@ -110,13 +110,10 @@ public class DupsOkTest extends QpidTestCase { try { - long remainingMessages = ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue); - fail("The queue should have 0 msgs left, seen " + _msgCount + " messages, left: " - + remainingMessages); - } - catch (AMQException e) - { - fail("Got AMQException" + e.getMessage()); + if(_msgCount != MSG_COUNT) + { + assertEquals("Wrong number of messages seen.", MSG_COUNT, _msgCount); + } } finally { @@ -124,7 +121,6 @@ public class DupsOkTest extends QpidTestCase _awaitCompletion.countDown(); } } - } catch (JMSException e) { @@ -147,6 +143,11 @@ public class DupsOkTest extends QpidTestCase fail("Unable to wait for test completion"); throw e; } + + + // wait for the ack to get back + Thread.sleep(1000); + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue)); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java new file mode 100644 index 0000000000..da4f3ad0d1 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java @@ -0,0 +1,210 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.Enumeration; + +public class FlowControlTest extends VMTestCase +{ + private static final Logger _logger = Logger.getLogger(FlowControlTest.class); + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + public void setUp() throws Exception + { + + super.setUp(); + + + } + + /** + * Simply + */ + public void testBasicBytesFlowControl() throws JMSException, NamingException, AMQException + { + _queue = new AMQQueue("amq.direct","testqueue");//(Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg",1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[128]); + m2.setIntProperty("msg",2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[256]); + m3.setIntProperty("msg",3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8)consumerSession).setPrefecthLimits(0,256); + MessageConsumer recv = consumerSession.createConsumer(_queue); + consumerConnection.start(); + + Message r1 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + Message r2 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r1.acknowledge(); + + r3 = recv.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r2.acknowledge(); + + + r3 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + r3.acknowledge(); + recv.close(); + consumerSession.close(); + consumerConnection.close(); + + } + + public void testTwoConsumersBytesFlowControl() throws JMSException, NamingException, AMQException + { + _queue = new AMQQueue("amq.direct","testqueue1");//(Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg",1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[256]); + m2.setIntProperty("msg",2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[128]); + m3.setIntProperty("msg",3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8)consumerSession1).setPrefecthLimits(0,256); + MessageConsumer recv1 = consumerSession1.createConsumer(_queue); + + consumerConnection.start(); + + Message r1 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + + Message r2 = recv1.receiveNoWait(); + assertNull("Second message incorrectly delivered", r2); + + Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8)consumerSession2).setPrefecthLimits(0,256); + MessageConsumer recv2 = consumerSession2.createConsumer(_queue); + + + r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv2.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + + + r2.acknowledge(); + r3.acknowledge(); + recv1.close(); + recv2.close(); + consumerSession1.close(); + consumerSession2.close(); + consumerConnection.close(); + + } + +} |