diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java | 43 |
1 files changed, 20 insertions, 23 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 268451c74a..e26b5b048c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -27,15 +27,18 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; public class AbstractHeadersExchangeTestBase extends TestCase { @@ -49,8 +52,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ private MessageStore _store = new MemoryMessageStore(); - private MessageHandleFactory _handleFactory = new MessageHandleFactory(); - private int count; public void testDoNothing() @@ -88,8 +89,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected int route(Message m) throws AMQException { + m.getIncomingMessage().headersReceived(); m.route(exchange); - m.getIncomingMessage().routingComplete(_store, _handleFactory); if(m.getIncomingMessage().allContentReceived()) { for(AMQQueue q : m.getIncomingMessage().getDestinationQueues()) @@ -343,7 +344,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void setRedelivered(boolean b) + public void setRedelivered() { //To change body of implemented methods use File | Settings | File Templates. } @@ -452,6 +453,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ static class Message extends AMQMessage { + private static AtomicLong _messageId = new AtomicLong(); + private class TestIncomingMessage extends IncomingMessage { @@ -459,7 +462,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase final MessagePublishInfo info, final AMQProtocolSession publisher) { - super(messageId, info, publisher); + super(info); } @@ -484,7 +487,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase private IncomingMessage _incoming; - private static MessageStore _messageStore = new SkeletonMessageStore(); Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException { @@ -493,7 +495,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException { - this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST); } public IncomingMessage getIncomingMessage() @@ -506,32 +508,27 @@ public class AbstractHeadersExchangeTestBase extends TestCase ContentHeaderBody header, List<ContentBody> bodies) throws AMQException { - super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish); + super(new MockStoredMessage(messageId, publish, header)); + + StoredMessage<MessageMetaData> storedMessage = getStoredMessage(); + int pos = 0; + for(ContentBody body : bodies) + { + storedMessage.addContent(pos, body.payload.duplicate().buf()); + pos += body.payload.limit(); + } - _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); _incoming.setContentHeaderBody(header); } - private static AMQMessageHandle createMessageHandle(final long messageId, - final MessagePublishInfo publish, - final ContentHeaderBody header) - { - - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - _messageStore, - true); - - amqMessageHandle.setPublishAndContentHeaderBody(publish,header); - return amqMessageHandle; - } private Message(AMQMessage msg) throws AMQException { - super(msg); + super(msg.getStoredMessage()); } |