summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java84
1 files changed, 41 insertions, 43 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 28f5d417ff..2747094caf 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -29,13 +29,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
@@ -54,8 +49,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
private MessageStore _store = new MemoryMessageStore();
- private StoreContext _storeContext = new StoreContext();
-
private MessageHandleFactory _handleFactory = new MessageHandleFactory();
private int count;
@@ -93,14 +86,18 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- protected void route(Message m) throws AMQException
+ protected int route(Message m) throws AMQException
{
m.route(exchange);
m.getIncomingMessage().routingComplete(_store, _handleFactory);
if(m.getIncomingMessage().allContentReceived())
{
- m.getIncomingMessage().deliverToQueues();
+ for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
}
+ return m.getIncomingMessage().getDestinationQueues().size();
}
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -120,10 +117,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
{
- try
- {
- route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ int queueCount = route(m);
+
for (TestQueue q : queues)
{
if (expected.contains(q))
@@ -137,12 +132,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
}
}
- }
- catch (NoRouteException ex)
- {
- assertTrue("Expected "+m+" not to be returned",expectReturn);
- }
+ if(expectReturn)
+ {
+ assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
+ }
}
@@ -244,6 +238,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ public String toString()
+ {
+ return getName().toString();
+ }
+
public TestQueue(AMQShortString name) throws AMQException
{
super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
@@ -334,6 +333,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean releaseButRetain()
+ {
+ return false;
+ }
+
public boolean immediateAndNotDelivered()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -344,6 +348,16 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isRedelivered()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -369,7 +383,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -379,12 +393,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -394,7 +408,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void discard()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -438,10 +452,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
public TestIncomingMessage(final long messageId,
final MessagePublishInfo info,
- final TransactionalContext txnContext,
final AMQProtocolSession publisher)
{
- super(messageId, info, txnContext, publisher);
+ super(messageId, info, publisher);
}
@@ -468,14 +481,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private static MessageStore _messageStore = new SkeletonMessageStore();
- private static StoreContext _storeContext = new StoreContext();
-
-
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
-
Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
{
this(protocolSession, id, getHeaders(headers));
@@ -496,11 +501,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
{
- super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+ super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish);
- _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
+ _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
_incoming.setContentHeaderBody(header);
@@ -515,14 +520,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
_messageStore,
true);
- try
- {
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
- }
- catch (AMQException e)
- {
-
- }
+ amqMessageHandle.setPublishAndContentHeaderBody(publish,header);
return amqMessageHandle;
}