summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java64
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java113
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java64
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java177
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java24
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java56
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java44
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java13
12 files changed, 296 insertions, 287 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 42d9cccb4f..144e4be6af 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -22,14 +22,15 @@ package org.apache.qpid.server.ack;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntryImpl;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -99,12 +100,12 @@ public class TxAckTest extends TestCase
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
- Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws AMQException
{
TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
_storeContext, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ new LinkedList<RequiredDeliveryException>()
+ );
for (int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
@@ -138,8 +139,8 @@ public class TxAckTest extends TestCase
}
};
- TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ _map.add(deliveryTag, new QueueEntryImpl(null,message, Long.MIN_VALUE));
}
_acked = acked;
_unacked = unacked;
@@ -154,7 +155,7 @@ public class TxAckTest extends TestCase
{
for (long tag : tags)
{
- UnacknowledgedMessage u = _map.get(tag);
+ QueueEntry u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
((TestMessage) u.getMessage()).assertCountEquals(expected);
}
@@ -195,31 +196,46 @@ public class TxAckTest extends TestCase
}
}
+ private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ {
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ null,
+ false);
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+ publishBody,
+ new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ // won't happen
+ }
+
+
+ return amqMessageHandle;
+ }
+
+
private class TestMessage extends AMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ throws AMQException
{
- super(messageId, publishBody, txnContext);
- try
- {
- setContentHeaderBody(new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
- }
- catch (AMQException e)
- {
- // won't happen
- }
+ super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
_tag = tag;
}
+
public void incrementReference()
{
_count++;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 58323086b5..80470d44b3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -24,10 +24,7 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -36,6 +33,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
import java.util.*;
@@ -94,7 +92,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void route(Message m) throws AMQException
{
m.route(exchange);
- m.routingComplete(_store, _storeContext, _handleFactory);
+ m.getIncomingMessage().routingComplete(_store, _handleFactory);
+ if(m.getIncomingMessage().allContentReceived())
+ {
+ m.getIncomingMessage().deliverToQueues();
+ }
}
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -122,12 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
if (expected.contains(q))
{
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m));
//assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
}
else
{
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m));
//assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
}
}
@@ -234,7 +236,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return properties;
}
- static class TestQueue extends AMQQueue
+ static class TestQueue extends AMQQueueImpl
{
final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
@@ -253,8 +255,14 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
{
- messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
+ messages.add( new HeadersExchangeTest.Message(msg.getMessage()));
+ }
+
+ boolean isInQueue(Message msg)
+ {
+ return messages.contains(msg);
}
+
}
/**
@@ -262,14 +270,48 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
static class Message extends AMQMessage
{
+ private class TestIncomingMessage extends IncomingMessage
+ {
+
+ public TestIncomingMessage(final long messageId,
+ final MessagePublishInfo info,
+ final TransactionalContext txnContext,
+ final AMQProtocolSession publisher)
+ {
+ super(messageId, info, txnContext, publisher);
+ }
+
+
+ public AMQMessage getUnderlyingMessage()
+ {
+ return Message.this;
+ }
+
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ try
+ {
+ return Message.this.getContentHeaderBody();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private IncomingMessage _incoming;
+
private static MessageStore _messageStore = new SkeletonMessageStore();
private static StoreContext _storeContext = new StoreContext();
+
private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ new LinkedList<RequiredDeliveryException>()
+ );
Message(String id, String... headers) throws AMQException
{
@@ -278,12 +320,47 @@ public class AbstractHeadersExchangeTestBase extends TestCase
Message(String id, FieldTable headers) throws AMQException
{
- this(getPublishRequest(id), getContentHeader(headers), null);
+ this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ }
+
+ public IncomingMessage getIncomingMessage()
+ {
+ return _incoming;
+ }
+
+ private Message(long messageId,
+ MessagePublishInfo publish,
+ ContentHeaderBody header,
+ List<ContentBody> bodies) throws AMQException
+ {
+ super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+
+
+
+ _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+ _incoming.setContentHeaderBody(header);
+
+
}
- private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ private static AMQMessageHandle createMessageHandle(final long messageId,
+ final MessagePublishInfo publish,
+ final ContentHeaderBody header)
{
- super(_messageStore.getNewMessageId(), publish, _txnContext, header);
+
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ _messageStore,
+ true);
+
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+ }
+ catch (AMQException e)
+ {
+
+ }
+ return amqMessageHandle;
}
private Message(AMQMessage msg) throws AMQException
@@ -291,15 +368,13 @@ public class AbstractHeadersExchangeTestBase extends TestCase
super(msg);
}
+
+
void route(Exchange exchange) throws AMQException
{
- exchange.route(this);
+ exchange.route(_incoming);
}
- boolean isInQueue(TestQueue queue)
- {
- return queue.messages.contains(this);
- }
public int hashCode()
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index 5fbea5e14f..6f8963131b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -54,10 +55,12 @@ public class AMQProtocolSessionMBeanTest extends TestCase
// check the channel count is correct
int channelCount = _mbean.channels().size();
assertTrue(channelCount == 1);
- AMQQueue queue =
- new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false,
- new AMQShortString("test"), true, _protocolSession.getVirtualHost());
- AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()),
+ false,
+ new AMQShortString("test"),
+ true,
+ _protocolSession.getVirtualHost());
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 95ffb505fb..30240217a2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -29,7 +29,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -40,7 +42,6 @@ import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.LinkedList;
import java.util.Set;
-import java.util.HashSet;
/**
* Tests that acknowledgements are handled correctly.
@@ -49,7 +50,7 @@ public class AckTest extends TestCase
{
private static final Logger _log = Logger.getLogger(AckTest.class);
- private SubscriptionImpl _subscription;
+ private Subscription _subscription;
private MockProtocolSession _protocolSession;
@@ -57,9 +58,7 @@ public class AckTest extends TestCase
private StoreContext _storeContext = new StoreContext();
- private AMQChannel _channel;
-
- private SubscriptionSet _subscriptionManager;
+ private AMQChannel _channel;
private AMQQueue _queue;
@@ -78,8 +77,9 @@ public class AckTest extends TestCase
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
- _subscriptionManager = new SubscriptionSet();
- _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
+
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
+
}
private void publishMessages(int count) throws AMQException
@@ -90,8 +90,9 @@ public class AckTest extends TestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ new LinkedList<RequiredDeliveryException>()
+ );
+ _queue.registerSubscription(_subscription,false);
MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
@@ -125,7 +126,8 @@ public class AckTest extends TestCase
return new AMQShortString("rk");
}
};
- AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
+ IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ //IncomingMessage msg2 = null;
if (persistent)
{
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
@@ -142,10 +144,14 @@ public class AckTest extends TestCase
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
- msg.incrementReference();
- msg.routingComplete(_messageStore, _storeContext, factory);
+ msg.enqueue(_queue);
+ msg.routingComplete(_messageStore, factory);
+ if(msg.allContentReceived())
+ {
+ msg.deliverToQueues();
+ }
// we manually send the message to the subscription
- _subscription.send(new QueueEntry(_queue,msg), _queue);
+ //_subscription.send(new QueueEntry(_queue,msg), _queue);
}
}
@@ -155,7 +161,7 @@ public class AckTest extends TestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -169,7 +175,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i);
i++;
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
}
@@ -183,7 +189,7 @@ public class AckTest extends TestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -200,7 +206,7 @@ public class AckTest extends TestCase
public void testPersistentNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -217,7 +223,7 @@ public class AckTest extends TestCase
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -230,7 +236,7 @@ public class AckTest extends TestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
@@ -246,7 +252,7 @@ public class AckTest extends TestCase
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -259,7 +265,7 @@ public class AckTest extends TestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
@@ -270,7 +276,7 @@ public class AckTest extends TestCase
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
@@ -283,18 +289,19 @@ public class AckTest extends TestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+ QueueEntry unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
+/*
public void testPrefetchHighLow() throws AMQException
{
int lowMark = 5;
int highMark = 10;
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
_channel.setPrefetchLowMarkCount(lowMark);
_channel.setPrefetchHighMarkCount(highMark);
@@ -343,10 +350,12 @@ public class AckTest extends TestCase
assertTrue(map.size() == 0);
}
+*/
+/*
public void testPrefetch() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- _channel.setPrefetchCount(5);
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ _channel.setMessageCredit(5);
assertTrue(_channel.getPrefetchCount() == 5);
@@ -371,6 +380,7 @@ public class AckTest extends TestCase
assertTrue(map.size() == 0);
}
+*/
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(AckTest.class);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
index 282ad3ed5e..3ff691c792 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.subscription.Subscription;
import java.util.*;
import java.util.concurrent.Executor;
@@ -48,8 +49,8 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
private final Executor _executor = new OnCurrentThreadExecutor();
private final List<Thread> _threads = new ArrayList<Thread>();
- private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
- private final DeliveryManager _deliveryMgr;
+ private final SubscriptionSet _subscriptionMgr;
+ private final ConcurrentSelectorDeliveryManager _deliveryMgr;
private boolean isComplete;
private boolean failed;
@@ -60,8 +61,9 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager( new AMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
_virtualHost));
+ _subscriptionMgr = _deliveryMgr.getSubscribers();
}
public void testConcurrent1() throws InterruptedException, AMQException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
deleted file mode 100644
index b33259cfba..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-
-import junit.framework.TestSuite;
-
-abstract public class DeliveryManagerTest extends MessageTestHelper
-{
- protected final SubscriptionSet _subscriptions = new SubscriptionSet();
- protected DeliveryManager _mgr;
- protected StoreContext _storeContext = new StoreContext();
- private static final AMQShortString DEFAULT_QUEUE_NAME = new AMQShortString("Me");
-
- public DeliveryManagerTest() throws Exception
- {
- }
-
- public void testStartInQueueingMode() throws AMQException
- {
- QueueEntry[] messages = new QueueEntry[10];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message();
- }
- int batch = messages.length / 2;
-
- for (int i = 0; i < batch; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
- }
-
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("2");
- _subscriptions.addSubscriber(s1);
- _subscriptions.addSubscriber(s2);
-
- for (int i = batch; i < messages.length; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
- }
-
- assertTrue(s1.getMessages().isEmpty());
- assertTrue(s2.getMessages().isEmpty());
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
-
- assertEquals(messages.length / 2, s1.getMessages().size());
- assertEquals(messages.length / 2, s2.getMessages().size());
-
- for (int i = 0; i < messages.length; i++)
- {
- if (i % 2 == 0)
- {
- assertTrue(s1.getMessages().get(i / 2) == messages[i]);
- }
- else
- {
- assertTrue(s2.getMessages().get(i / 2) == messages[i]);
- }
- }
- }
-
- public void testStartInDirectMode() throws AMQException
- {
- QueueEntry[] messages = new QueueEntry[10];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message();
- }
- int batch = messages.length / 2;
-
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
- _subscriptions.addSubscriber(s1);
-
- for (int i = 0; i < batch; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
- }
-
- assertEquals(batch, s1.getMessages().size());
- for (int i = 0; i < batch; i++)
- {
- assertTrue(messages[i] == s1.getMessages().get(i));
- }
- s1.getMessages().clear();
- assertEquals(0, s1.getMessages().size());
-
- s1.setSuspended(true);
- for (int i = batch; i < messages.length; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
- }
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
- assertEquals(0, s1.getMessages().size());
- s1.setSuspended(false);
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
- assertEquals(messages.length - batch, s1.getMessages().size());
-
- for (int i = batch; i < messages.length; i++)
- {
- assertTrue(messages[i] == s1.getMessages().get(i - batch));
- }
-
- }
-
- public void testNoConsumers() throws AMQException
- {
- try
- {
- QueueEntry msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
- msg.checkDeliveredToConsumer();
- fail("expected exception did not occur");
- }
- catch (NoConsumersException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NoConsumersException, got " + e);
- }
- }
-
- public void testNoActiveConsumers() throws AMQException
- {
- try
- {
- SubscriptionTestHelper s = new SubscriptionTestHelper("A");
- _subscriptions.addSubscriber(s);
- s.setSuspended(true);
- QueueEntry msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
- msg.checkDeliveredToConsumer();
- fail("expected exception did not occur");
- }
- catch (NoConsumersException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NoConsumersException, got " + e);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- return suite;
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 521bedeccd..b2a4216f8d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -28,7 +27,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -38,7 +36,6 @@ import org.apache.qpid.AMQException;
import junit.framework.TestCase;
import java.util.LinkedList;
-import java.util.HashSet;
class MessageTestHelper extends TestCase
{
@@ -47,20 +44,20 @@ class MessageTestHelper extends TestCase
private final StoreContext _storeContext = new StoreContext();
private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ new LinkedList<RequiredDeliveryException>()
+ );
MessageTestHelper() throws Exception
{
ApplicationRegistry.initialise(new NullApplicationRegistry());
}
- QueueEntry message() throws AMQException
+ QueueEntryImpl message() throws AMQException
{
return message(false);
}
- QueueEntry message(final boolean immediate) throws AMQException
+ QueueEntryImpl message(final boolean immediate) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -90,9 +87,16 @@ class MessageTestHelper extends TestCase
return null;
}
};
-
- return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody()));
+
+ //public AMQMessage(Long messageId, AMQMessageHandle messageHandle , TransactionalContext txnConext, MessagePublishInfo info)
+ long messageId = _messageStore.getNewMessageId();
+ final AMQMessageHandle messageHandle =
+ (new MessageHandleFactory()).createMessageHandle(messageId, _messageStore, false);
+ messageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,new ContentHeaderBody());
+ AMQMessage msg = new AMQMessage(messageHandle, _txnContext.getStoreContext(), publish);
+
+
+ return new QueueEntryImpl(null,msg, Long.MIN_VALUE);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
index d3ec3c11d4..1200d7fc2b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.qpid.server.subscription.Subscription;
public class SubscriptionManagerTest extends TestCase
{
@@ -47,9 +48,9 @@ public class SubscriptionManagerTest extends TestCase
s1.setSuspended(true);
assertFalse(mgr.hasActiveSubscribers());
- mgr.removeSubscriber(new SubscriptionTestHelper("S1"));
+ mgr.removeSubscriber(s1);
assertFalse(mgr.isEmpty());
- mgr.removeSubscriber(new SubscriptionTestHelper("S2"));
+ mgr.removeSubscriber(s2);
assertTrue(mgr.isEmpty());
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 458b510ef5..624a368b1f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.framing.AMQShortString;
import java.util.ArrayList;
import java.util.List;
@@ -54,7 +56,12 @@ public class SubscriptionTestHelper implements Subscription
return messages;
}
- public void send(QueueEntry msg, AMQQueue queue)
+ public void setQueue(AMQQueue queue)
+ {
+
+ }
+
+ public void send(QueueEntry msg)
{
messages.add(msg);
}
@@ -84,16 +91,56 @@ public class SubscriptionTestHelper implements Subscription
return new Object();
}
+ public void resend(final QueueEntry entry)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+
+ }
+
+ public void setStateListener(final StateListener listener)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object getQueueContext()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean setQueueContext(Object expected, Object newValue)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public AMQChannel getChannel()
{
return null;
}
-
+
public void start()
{
//no-op
}
+ public AMQShortString getConumerTag()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isActive()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public AMQQueue getQueue()
+ {
+ return null;
+ }
+
public void queueDeleted(AMQQueue queue)
{
}
@@ -108,6 +155,11 @@ public class SubscriptionTestHelper implements Subscription
return true;
}
+ public boolean isAutoClose()
+ {
+ return false;
+ }
+
public Queue<QueueEntry> getPreDeliveryQueue()
{
return null;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 6ffa3e0e02..57bc173812 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -22,12 +22,12 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 374c69fa00..f36e924890 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -23,12 +23,12 @@ package org.apache.qpid.server.store;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.txn.NonTransactionalContext;
/**
@@ -40,6 +40,7 @@ public class TestReferenceCounting extends TestCase
private StoreContext _storeContext = new StoreContext();
+
protected void setUp() throws Exception
{
super.setUp();
@@ -51,7 +52,7 @@ public class TestReferenceCounting extends TestCase
*/
public void testMessageGetsRemoved() throws AMQException
{
- createPersistentContentHeader();
+ ContentHeaderBody chb = createPersistentContentHeader();
MessagePublishInfo info = new MessagePublishInfo()
{
@@ -82,16 +83,22 @@ public class TestReferenceCounting extends TestCase
}
};
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
- new NonTransactionalContext(_store, _storeContext, null, null, null),
- createPersistentContentHeader());
+
+ final long messageId = _store.getNewMessageId();
+ AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
+ messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
+ AMQMessage message = new AMQMessage(messageHandle,
+ _storeContext,info);
+
message = message.takeReference();
// we call routing complete to set up the handle
- message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
+
+
+ assertEquals(1, _store.getMessageMetaDataMap().size());
message.decrementReference(_storeContext);
- assertTrue(_store.getMessageMetaDataMap().size() == 0);
+ assertEquals(1, _store.getMessageMetaDataMap().size());
}
private ContentHeaderBody createPersistentContentHeader()
@@ -135,18 +142,25 @@ public class TestReferenceCounting extends TestCase
}
};
- AMQMessage message = new AMQMessage(_store.getNewMessageId(),
- info,
- new NonTransactionalContext(_store, _storeContext, null, null, null),
- createPersistentContentHeader());
+ final Long messageId = _store.getNewMessageId();
+ final ContentHeaderBody chb = createPersistentContentHeader();
+ AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
+ messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
+ AMQMessage message = new AMQMessage(messageHandle,
+ _storeContext,
+ info);
+
message = message.takeReference();
// we call routing complete to set up the handle
- message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
+
+
+
+ assertEquals(1, _store.getMessageMetaDataMap().size());
message = message.takeReference();
message.decrementReference(_storeContext);
- assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ assertEquals(1, _store.getMessageMetaDataMap().size());
}
public static junit.framework.Test suite()
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
index 463946e14a..4a2de976da 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -108,11 +108,16 @@ public class DupsOkTest extends VMTestCase
{
try
{
+ System.err.println("Got last message!");
long remainingMessages = ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue);
- if(remainingMessages != 0)
+ if(_msgCount != MSG_COUNT)
{
- assertEquals("The queue should have 0 msgs left, seen " + _msgCount + " messages.", 0, getMessageCount(_queue.getQueueName()));
+ assertEquals("Wrong number of messages seen.", MSG_COUNT, _msgCount);
+ }
+ else
+ {
+ System.err.println("0 remaining on queue");
}
}
catch (AMQException e)
@@ -151,6 +156,10 @@ public class DupsOkTest extends VMTestCase
// consumer.close();
+ // wait for the ack to get back
+ Thread.sleep(1000);
+
+
assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
clientConnection.close();