summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java43
1 files changed, 20 insertions, 23 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 268451c74a..e26b5b048c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -27,15 +27,18 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
public class AbstractHeadersExchangeTestBase extends TestCase
{
@@ -49,8 +52,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
private MessageStore _store = new MemoryMessageStore();
- private MessageHandleFactory _handleFactory = new MessageHandleFactory();
-
private int count;
public void testDoNothing()
@@ -88,8 +89,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected int route(Message m) throws AMQException
{
+ m.getIncomingMessage().headersReceived();
m.route(exchange);
- m.getIncomingMessage().routingComplete(_store, _handleFactory);
if(m.getIncomingMessage().allContentReceived())
{
for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
@@ -343,7 +344,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void setRedelivered(boolean b)
+ public void setRedelivered()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -452,6 +453,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
static class Message extends AMQMessage
{
+ private static AtomicLong _messageId = new AtomicLong();
+
private class TestIncomingMessage extends IncomingMessage
{
@@ -459,7 +462,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
final MessagePublishInfo info,
final AMQProtocolSession publisher)
{
- super(messageId, info, publisher);
+ super(info);
}
@@ -484,7 +487,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private IncomingMessage _incoming;
- private static MessageStore _messageStore = new SkeletonMessageStore();
Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
{
@@ -493,7 +495,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
{
- this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
}
public IncomingMessage getIncomingMessage()
@@ -506,32 +508,27 @@ public class AbstractHeadersExchangeTestBase extends TestCase
ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
{
- super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish);
+ super(new MockStoredMessage(messageId, publish, header));
+
+ StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
+ int pos = 0;
+ for(ContentBody body : bodies)
+ {
+ storedMessage.addContent(pos, body.payload.duplicate().buf());
+ pos += body.payload.limit();
+ }
-
_incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
_incoming.setContentHeaderBody(header);
}
- private static AMQMessageHandle createMessageHandle(final long messageId,
- final MessagePublishInfo publish,
- final ContentHeaderBody header)
- {
-
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- _messageStore,
- true);
-
- amqMessageHandle.setPublishAndContentHeaderBody(publish,header);
- return amqMessageHandle;
- }
private Message(AMQMessage msg) throws AMQException
{
- super(msg);
+ super(msg.getStoredMessage());
}