diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java | 172 |
1 files changed, 64 insertions, 108 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index e0a4357990..5c54c0b57f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -21,25 +21,40 @@ package org.apache.qpid.server.exchange; import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.FailedDequeueException; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.MessageCleanupException; +import org.apache.qpid.server.queue.MessageFactory; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.log4j.Logger; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; public class AbstractHeadersExchangeTestBase extends TestCase { @@ -48,14 +63,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase private final HeadersExchange exchange = new HeadersExchange(); protected final Set<TestQueue> queues = new HashSet<TestQueue>(); - /** - * Not used in this test, just there to stub out the routing calls - */ + /** Not used in this test, just there to stub out the routing calls */ private MessageStore _store = new MemoryMessageStore(); private StoreContext _storeContext = new StoreContext(); - private MessageFactory _handleFactory = new MessageFactory(); + private MessageFactory _messageFactory = MessageFactory.getInstance(); private int count; @@ -91,12 +104,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return queue; } - protected void route(Message m) throws AMQException { - m.route(exchange); - m.getIncomingMessage().routingComplete(_store, _handleFactory); - if(m.getIncomingMessage().allContentReceived()) + exchange.route(m.getIncomingMessage()); + m.getIncomingMessage().routingComplete(_store); + if (m.getIncomingMessage().allContentReceived()) { m.getIncomingMessage().deliverToQueues(); } @@ -112,17 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase routeAndTest(m, expectReturn, Arrays.asList(expected)); } - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException { try { route(m); - assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + assertFalse("Expected " + m + " to be returned due to manadatory flag, and lack of routing", expectReturn); for (TestQueue q : queues) { if (expected.contains(q)) @@ -140,7 +147,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase catch (NoRouteException ex) { - assertTrue("Expected "+m+" not to be returned",expectReturn); + assertTrue("Expected " + m + " not to be returned", expectReturn); } } @@ -177,7 +184,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase static class TestQueue extends SimpleAMQQueue { - final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + // final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + final List<AMQMessage> messages = new ArrayList<AMQMessage>(); public TestQueue(AMQShortString name) throws AMQException { @@ -189,13 +197,15 @@ public class AbstractHeadersExchangeTestBase extends TestCase * We override this method so that the default behaviour, which attempts to use a delivery manager, is * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. + * * @param msg + * * @throws AMQException */ @Override public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg)); + messages.add(msg);//new HeadersExchangeTest.Message(msg)); return new QueueEntry() { @@ -368,123 +378,69 @@ public class AbstractHeadersExchangeTestBase extends TestCase } - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends PersistentAMQMessage + /** Just add some extra utility methods to AMQMessage to aid testing. */ + static class Message { - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final TransactionalContext txnContext, - final AMQProtocolSession publisher) - { - super(messageId, info, txnContext, publisher); - } - - - public AMQMessage getUnderlyingMessage() - { - return Message.this; - } - - - public ContentHeaderBody getContentHeaderBody() - { - return Message.this.getContentHeaderBody(); - } - } - - private IncomingMessage _incoming; private static MessageStore _messageStore = new SkeletonMessageStore(); private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, - new LinkedList<RequiredDeliveryException>() + new LinkedList<RequiredDeliveryException>() ); - Message(String id, String... headers) throws AMQException + public static Message create(String id, String... headers) throws AMQException { - this(id, getHeaders(headers)); - } + ContentHeaderBody headerBody = getContentHeader(getHeaders(headers)); - Message(String id, FieldTable headers) throws AMQException - { - this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers)); - } + MessagePublishInfo mpi = getPublishRequest(id); - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(long messageId, - MessagePublishInfo publish, - ContentHeaderBody header) throws AMQException - { - super(messageId, _messageStore); + IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore); try { - setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header); + incomming.setContentHeaderBody(headerBody); } catch (AMQException e) { } - _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); - _incoming.setContentHeaderBody(header); + return new Message(incomming, mpi); } - private Message(AMQMessage msg) throws AMQException - { - super(msg.getMessageId(), _messageStore); - - this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody()); - - Iterator<ContentChunk> iterator = msg.getContentBodyIterator(); - - while(iterator.hasNext()) - { - this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext()); - } - } - - + private IncomingMessage _incoming; + private MessagePublishInfo _mpi; - void route(Exchange exchange) throws AMQException + public Message(IncomingMessage incomming, MessagePublishInfo mpi) { - exchange.route(_incoming); + _incoming = incomming; + _mpi = mpi; } - - public int hashCode() + public IncomingMessage getIncomingMessage() { - return getKey().hashCode(); + return _incoming; } - public boolean equals(Object o) + public MessagePublishInfo getMessagePublishInfo() { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + return _mpi; } - private boolean equals(HeadersExchangeTest.Message m) + public boolean equals(Object o) { - return getKey().equals(m.getKey()); - } + if (o instanceof AMQMessage) + { + return _incoming.getMessageId().equals(((AMQMessage) o).getMessageId()); + } - public String toString() - { - return getKey().toString(); - } + if (o instanceof Message) + { + return _incoming.getMessageId().equals(((Message) o).getIncomingMessage().getMessageId()); + } - private Object getKey() - { - return getMessagePublishInfo().getRoutingKey(); + return false; } } } |