summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java172
1 files changed, 64 insertions, 108 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index e0a4357990..5c54c0b57f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -21,25 +21,40 @@
package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageCleanupException;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.log4j.Logger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
public class AbstractHeadersExchangeTestBase extends TestCase
{
@@ -48,14 +63,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private final HeadersExchange exchange = new HeadersExchange();
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
- /**
- * Not used in this test, just there to stub out the routing calls
- */
+ /** Not used in this test, just there to stub out the routing calls */
private MessageStore _store = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
- private MessageFactory _handleFactory = new MessageFactory();
+ private MessageFactory _messageFactory = MessageFactory.getInstance();
private int count;
@@ -91,12 +104,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return queue;
}
-
protected void route(Message m) throws AMQException
{
- m.route(exchange);
- m.getIncomingMessage().routingComplete(_store, _handleFactory);
- if(m.getIncomingMessage().allContentReceived())
+ exchange.route(m.getIncomingMessage());
+ m.getIncomingMessage().routingComplete(_store);
+ if (m.getIncomingMessage().allContentReceived())
{
m.getIncomingMessage().deliverToQueues();
}
@@ -112,17 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
routeAndTest(m, expectReturn, Arrays.asList(expected));
}
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
{
try
{
route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ assertFalse("Expected " + m + " to be returned due to manadatory flag, and lack of routing", expectReturn);
for (TestQueue q : queues)
{
if (expected.contains(q))
@@ -140,7 +147,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
catch (NoRouteException ex)
{
- assertTrue("Expected "+m+" not to be returned",expectReturn);
+ assertTrue("Expected " + m + " not to be returned", expectReturn);
}
}
@@ -177,7 +184,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static class TestQueue extends SimpleAMQQueue
{
- final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ // final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ final List<AMQMessage> messages = new ArrayList<AMQMessage>();
public TestQueue(AMQShortString name) throws AMQException
{
@@ -189,13 +197,15 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* We override this method so that the default behaviour, which attempts to use a delivery manager, is
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
+ *
* @param msg
+ *
* @throws AMQException
*/
@Override
public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg));
+ messages.add(msg);//new HeadersExchangeTest.Message(msg));
return new QueueEntry()
{
@@ -368,123 +378,69 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends PersistentAMQMessage
+ /** Just add some extra utility methods to AMQMessage to aid testing. */
+ static class Message
{
- private class TestIncomingMessage extends IncomingMessage
- {
-
- public TestIncomingMessage(final long messageId,
- final MessagePublishInfo info,
- final TransactionalContext txnContext,
- final AMQProtocolSession publisher)
- {
- super(messageId, info, txnContext, publisher);
- }
-
-
- public AMQMessage getUnderlyingMessage()
- {
- return Message.this;
- }
-
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return Message.this.getContentHeaderBody();
- }
- }
-
- private IncomingMessage _incoming;
private static MessageStore _messageStore = new SkeletonMessageStore();
private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
null,
- new LinkedList<RequiredDeliveryException>()
+ new LinkedList<RequiredDeliveryException>()
);
- Message(String id, String... headers) throws AMQException
+ public static Message create(String id, String... headers) throws AMQException
{
- this(id, getHeaders(headers));
- }
+ ContentHeaderBody headerBody = getContentHeader(getHeaders(headers));
- Message(String id, FieldTable headers) throws AMQException
- {
- this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers));
- }
+ MessagePublishInfo mpi = getPublishRequest(id);
- public IncomingMessage getIncomingMessage()
- {
- return _incoming;
- }
-
- private Message(long messageId,
- MessagePublishInfo publish,
- ContentHeaderBody header) throws AMQException
- {
- super(messageId, _messageStore);
+ IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore);
try
{
- setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header);
+ incomming.setContentHeaderBody(headerBody);
}
catch (AMQException e)
{
}
- _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
- _incoming.setContentHeaderBody(header);
+ return new Message(incomming, mpi);
}
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg.getMessageId(), _messageStore);
-
- this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody());
-
- Iterator<ContentChunk> iterator = msg.getContentBodyIterator();
-
- while(iterator.hasNext())
- {
- this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext());
- }
- }
-
-
+ private IncomingMessage _incoming;
+ private MessagePublishInfo _mpi;
- void route(Exchange exchange) throws AMQException
+ public Message(IncomingMessage incomming, MessagePublishInfo mpi)
{
- exchange.route(_incoming);
+ _incoming = incomming;
+ _mpi = mpi;
}
-
- public int hashCode()
+ public IncomingMessage getIncomingMessage()
{
- return getKey().hashCode();
+ return _incoming;
}
- public boolean equals(Object o)
+ public MessagePublishInfo getMessagePublishInfo()
{
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ return _mpi;
}
- private boolean equals(HeadersExchangeTest.Message m)
+ public boolean equals(Object o)
{
- return getKey().equals(m.getKey());
- }
+ if (o instanceof AMQMessage)
+ {
+ return _incoming.getMessageId().equals(((AMQMessage) o).getMessageId());
+ }
- public String toString()
- {
- return getKey().toString();
- }
+ if (o instanceof Message)
+ {
+ return _incoming.getMessageId().equals(((Message) o).getIncomingMessage().getMessageId());
+ }
- private Object getKey()
- {
- return getMessagePublishInfo().getRoutingKey();
+ return false;
}
}
}