summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java12
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java172
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java33
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java75
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java (renamed from java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java)11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java109
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java33
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java261
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java33
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java4
16 files changed, 444 insertions, 340 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 228c99dcbd..01533d6509 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -130,7 +130,7 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ AMQMessage message = new TestMessage(deliveryTag, info);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -197,11 +197,9 @@ public class TxAckTest extends TestCase
}
}
- private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(MessagePublishInfo publishBody)
{
- final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId,
- null,
- false);
+ final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false);
try
{
// Safe to use null here as we just created a TransientMessage above
@@ -228,10 +226,10 @@ public class TxAckTest extends TestCase
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessage(messageId, publishBody));
+ super(createMessage( publishBody));
_tag = tag;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index e0a4357990..5c54c0b57f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -21,25 +21,40 @@
package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageCleanupException;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.log4j.Logger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
public class AbstractHeadersExchangeTestBase extends TestCase
{
@@ -48,14 +63,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private final HeadersExchange exchange = new HeadersExchange();
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
- /**
- * Not used in this test, just there to stub out the routing calls
- */
+ /** Not used in this test, just there to stub out the routing calls */
private MessageStore _store = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
- private MessageFactory _handleFactory = new MessageFactory();
+ private MessageFactory _messageFactory = MessageFactory.getInstance();
private int count;
@@ -91,12 +104,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return queue;
}
-
protected void route(Message m) throws AMQException
{
- m.route(exchange);
- m.getIncomingMessage().routingComplete(_store, _handleFactory);
- if(m.getIncomingMessage().allContentReceived())
+ exchange.route(m.getIncomingMessage());
+ m.getIncomingMessage().routingComplete(_store);
+ if (m.getIncomingMessage().allContentReceived())
{
m.getIncomingMessage().deliverToQueues();
}
@@ -112,17 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
routeAndTest(m, expectReturn, Arrays.asList(expected));
}
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
{
try
{
route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ assertFalse("Expected " + m + " to be returned due to manadatory flag, and lack of routing", expectReturn);
for (TestQueue q : queues)
{
if (expected.contains(q))
@@ -140,7 +147,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
catch (NoRouteException ex)
{
- assertTrue("Expected "+m+" not to be returned",expectReturn);
+ assertTrue("Expected " + m + " not to be returned", expectReturn);
}
}
@@ -177,7 +184,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static class TestQueue extends SimpleAMQQueue
{
- final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ // final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ final List<AMQMessage> messages = new ArrayList<AMQMessage>();
public TestQueue(AMQShortString name) throws AMQException
{
@@ -189,13 +197,15 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* We override this method so that the default behaviour, which attempts to use a delivery manager, is
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
+ *
* @param msg
+ *
* @throws AMQException
*/
@Override
public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg));
+ messages.add(msg);//new HeadersExchangeTest.Message(msg));
return new QueueEntry()
{
@@ -368,123 +378,69 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends PersistentAMQMessage
+ /** Just add some extra utility methods to AMQMessage to aid testing. */
+ static class Message
{
- private class TestIncomingMessage extends IncomingMessage
- {
-
- public TestIncomingMessage(final long messageId,
- final MessagePublishInfo info,
- final TransactionalContext txnContext,
- final AMQProtocolSession publisher)
- {
- super(messageId, info, txnContext, publisher);
- }
-
-
- public AMQMessage getUnderlyingMessage()
- {
- return Message.this;
- }
-
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return Message.this.getContentHeaderBody();
- }
- }
-
- private IncomingMessage _incoming;
private static MessageStore _messageStore = new SkeletonMessageStore();
private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
null,
- new LinkedList<RequiredDeliveryException>()
+ new LinkedList<RequiredDeliveryException>()
);
- Message(String id, String... headers) throws AMQException
+ public static Message create(String id, String... headers) throws AMQException
{
- this(id, getHeaders(headers));
- }
+ ContentHeaderBody headerBody = getContentHeader(getHeaders(headers));
- Message(String id, FieldTable headers) throws AMQException
- {
- this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers));
- }
+ MessagePublishInfo mpi = getPublishRequest(id);
- public IncomingMessage getIncomingMessage()
- {
- return _incoming;
- }
-
- private Message(long messageId,
- MessagePublishInfo publish,
- ContentHeaderBody header) throws AMQException
- {
- super(messageId, _messageStore);
+ IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore);
try
{
- setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header);
+ incomming.setContentHeaderBody(headerBody);
}
catch (AMQException e)
{
}
- _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
- _incoming.setContentHeaderBody(header);
+ return new Message(incomming, mpi);
}
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg.getMessageId(), _messageStore);
-
- this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody());
-
- Iterator<ContentChunk> iterator = msg.getContentBodyIterator();
-
- while(iterator.hasNext())
- {
- this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext());
- }
- }
-
-
+ private IncomingMessage _incoming;
+ private MessagePublishInfo _mpi;
- void route(Exchange exchange) throws AMQException
+ public Message(IncomingMessage incomming, MessagePublishInfo mpi)
{
- exchange.route(_incoming);
+ _incoming = incomming;
+ _mpi = mpi;
}
-
- public int hashCode()
+ public IncomingMessage getIncomingMessage()
{
- return getKey().hashCode();
+ return _incoming;
}
- public boolean equals(Object o)
+ public MessagePublishInfo getMessagePublishInfo()
{
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ return _mpi;
}
- private boolean equals(HeadersExchangeTest.Message m)
+ public boolean equals(Object o)
{
- return getKey().equals(m.getKey());
- }
+ if (o instanceof AMQMessage)
+ {
+ return _incoming.getMessageId().equals(((AMQMessage) o).getMessageId());
+ }
- public String toString()
- {
- return getKey().toString();
- }
+ if (o instanceof Message)
+ {
+ return _incoming.getMessageId().equals(((Message) o).getIncomingMessage().getMessageId());
+ }
- private Object getKey()
- {
- return getMessagePublishInfo().getRoutingKey();
+ return false;
}
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index ddf177690c..396b8c5128 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -75,7 +75,7 @@ public class DestWildExchangeTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl(null, false, false, new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
+ IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _store);
_exchange.route(message);
@@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase
throws AMQException
{
_exchange.route(message);
- message.routingComplete(_store, new MessageFactory());
+ message.routingComplete(_store);
message.deliverToQueues();
}
@@ -551,7 +551,7 @@ public class DestWildExchangeTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
- IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession);
+ IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _store);
message.setContentHeaderBody( new ContentHeaderBody());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 5843e280fa..a60045eaba 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -50,21 +50,21 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
TestQueue q7 = bindDefault("F0000", "F0001=Bear");
TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
- routeAndTest(new Message("Message1", "F0000"), q1);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ routeAndTest(Message.create("Message1", "F0000"), q1);
+ routeAndTest(Message.create("Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(Message.create("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(Message.create("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(Message.create("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message("Message6", "F0002"));
+ routeAndTest(Message.create("Message6", "F0002"));
- Message m7 = new Message("Message7", "XXXXX");
+ Message m7 = Message.create("Message7", "XXXXX");
MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
pb7.setMandatory(true);
routeAndTest(m7,true);
- Message m8 = new Message("Message8", "F0000");
+ Message m8 = Message.create("Message8", "F0000");
MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
pb8.setMandatory(true);
routeAndTest(m8,false,q1);
@@ -80,19 +80,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
- routeAndTest(new Message("Message1", "F0000"), q1, q3);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message6", "F0002"));
+ routeAndTest(Message.create("Message1", "F0000"), q1, q3);
+ routeAndTest(Message.create("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(Message.create("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(Message.create("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(Message.create("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(Message.create("Message6", "F0002"));
}
public void testMandatory() throws AMQException
{
bindDefault("F0000");
- Message m1 = new Message("Message1", "XXXXX");
- Message m2 = new Message("Message2", "F0000");
+
+ Message m1 = Message.create("Message1", "XXXXX");
+ Message m2 = Message.create("Message2", "F0000");
MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
pb1.setMandatory(true);
MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index ffe858f517..ba02e6f6bd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -20,13 +20,13 @@ package org.apache.qpid.server.queue;
*
*/
-import java.util.ArrayList;
-
+import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
-import junit.framework.AssertionFailedError;
+import org.apache.qpid.framing.FieldTable;
+
+import java.util.ArrayList;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
@@ -44,38 +44,40 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
// Enqueue messages in order
- _queue.enqueue(null, createMessage(1L, (byte) 10));
- _queue.enqueue(null, createMessage(2L, (byte) 4));
- _queue.enqueue(null, createMessage(3L, (byte) 0));
-
+ AMQMessage message = createMessage((byte) 10);
+ Long messagIDOffset = message.getMessageId() - 1;
+ _queue.enqueue(null, message);
+ _queue.enqueue(null, createMessage((byte) 4));
+ _queue.enqueue(null, createMessage((byte) 0));
+
// Enqueue messages in reverse order
- _queue.enqueue(null, createMessage(4L, (byte) 0));
- _queue.enqueue(null, createMessage(5L, (byte) 4));
- _queue.enqueue(null, createMessage(6L, (byte) 10));
-
+ _queue.enqueue(null, createMessage((byte) 0));
+ _queue.enqueue(null, createMessage((byte) 4));
+ _queue.enqueue(null, createMessage((byte) 10));
+
// Enqueue messages out of order
- _queue.enqueue(null, createMessage(7L, (byte) 4));
- _queue.enqueue(null, createMessage(8L, (byte) 10));
- _queue.enqueue(null, createMessage(9L, (byte) 0));
-
+ _queue.enqueue(null, createMessage((byte) 4));
+ _queue.enqueue(null, createMessage((byte) 10));
+ _queue.enqueue(null, createMessage((byte) 0));
+
// Register subscriber
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
-
+
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
+ assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
+ assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId());
+ assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId());
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
+ assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId());
+ assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId());
+ assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId());
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId());
+ assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId());
+ assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId());
}
catch (AssertionFailedError afe)
{
@@ -92,25 +94,12 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
}
- protected AMQMessage createMessage(Long id, byte i) throws AMQException
+ protected AMQMessage createMessage(byte i) throws AMQException
{
- AMQMessage message = super.createMessage(id);
+ AMQMessage message = super.createMessage();
+
+ ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
- ContentHeaderBody header = new ContentHeaderBody();
- header.bodySize = MESSAGE_SIZE;
-
- //The createMessage above is for a Transient Message so it is safe to have no context.
- message.setPublishAndContentHeaderBody(null, info, header);
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setPriority(i);
- message.getContentHeaderBody().properties = props;
return message;
}
-
- protected AMQMessage createMessage(Long id) throws AMQException
- {
- return createMessage(id, (byte) 0);
- }
-
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index b159e2cda5..be40535df9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends TestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -308,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore, new MessageFactory());
+ messages[i].routingComplete(_messageStore);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index a5e2da7b36..97c76c8e9d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -216,12 +216,14 @@ public class AMQQueueMBeanTest extends TestCase
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageId();
+
_queue.clearQueue(_storeContext);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageFactory());
+ msg.routingComplete(_messageStore);
+
+ long id = msg.getMessageId();
msg.addContentBodyFrame(new ContentChunk()
{
@@ -264,7 +266,7 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -305,7 +307,7 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageFactory());
+ currentMessage.routingComplete(_messageStore);
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index cd1ee65c0c..98465eda20 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -98,7 +98,7 @@ public class AckTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
_queue.registerSubscription(_subscription,false);
- MessageFactory factory = new MessageFactory();
+ MessageFactory factory = MessageFactory.getInstance();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -106,7 +106,7 @@ public class AckTest extends TestCase
MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false,
false, new AMQShortString("rk"));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore);
//IncomingMessage msg2 = null;
if (persistent)
{
@@ -127,7 +127,7 @@ public class AckTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, factory);
+ msg.routingComplete(_messageStore);
if(msg.allContentReceived())
{
msg.deliverToQueues();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java
index 582e2bfb00..75e9f08417 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java
@@ -22,27 +22,28 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
-public class MessageFactoryTest extends TestCase
+public class MessageFactoryClassTest extends TestCase
{
private MessageFactory _factory;
public void setUp()
{
- _factory = new MessageFactory();
+ _factory = MessageFactory.getInstance();
}
public void testTransientMessageCreation()
{
- AMQMessage message = _factory.createMessage(0L, null, false);
+ AMQMessage message = _factory.createMessage(null, false);
assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass());
}
public void testPersistentMessageCreation()
{
- AMQMessage message = _factory.createMessage(0L, null, true);
+ AMQMessage message = _factory.createMessage(null, true);
assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass());
}
-} \ No newline at end of file
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
new file mode 100644
index 0000000000..db0fc56303
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageFactoryRecoveryTest extends TestCase
+{
+ private MessageFactory _factory;
+
+ public void setUp()
+ {
+ _factory = MessageFactory.getInstance();
+
+ }
+
+ public void test()
+ {
+ AMQMessage message = _factory.createMessage(null, false);
+
+ _factory.enableRecover();
+
+ Long messasgeID = message.getMessageId();
+
+ try
+ {
+ _factory.createMessage(messasgeID, null);
+ fail("Cannot recreate message with an existing id");
+ }
+ catch (RuntimeException re)
+ {
+ assertEquals("Incorrect exception thrown ",
+ "Message IDs can only increase current id is:" + messasgeID + ". Requested:" + messasgeID, re.getMessage());
+ }
+
+ //Check we cannot go backwords with ids.
+ try
+ {
+ _factory.createMessage(messasgeID - 1, null);
+ fail("Cannot recreate message with an old id");
+ }
+ catch (RuntimeException re)
+ {
+ assertEquals("Incorrect exception thrown ",
+ "Message IDs can only increase current id is:" + messasgeID + ". Requested:" + (messasgeID - 1), re.getMessage());
+ }
+
+ //Check that we can jump forward in ids during recovery.
+ messasgeID += 100;
+ try
+ {
+ message = _factory.createMessage(messasgeID, null);
+ assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
+ }
+ catch (Exception re)
+ {
+ fail("Message with a much higher value should be created");
+ }
+
+ // End the reovery process.
+ _factory.start();
+
+ //Check we cannot still create by id after ending recovery phase
+ try
+ {
+ _factory.createMessage(messasgeID, null);
+ fail("We have left recovery mode so we cannot create by id any more");
+ }
+ catch (Exception re)
+ {
+ assertEquals("Incorrect exception thrown ",
+ "Unable to create message by ID when not recovering", re.getMessage());
+ }
+
+ // Check that the next message created has the next available id
+
+ messasgeID++;
+
+ try
+ {
+ message = _factory.createMessage(null, false);
+ assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
+ }
+ catch (Exception re)
+ {
+ fail("Message with a much higher value should be created");
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
new file mode 100644
index 0000000000..3633481012
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+
+public class MockPersistentAMQMessage extends PersistentAMQMessage
+{
+ public MockPersistentAMQMessage(long messageId, MessageStore store)
+ throws AMQException
+ {
+ super(messageId, store);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index e213be7560..fdaf2c309f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -35,15 +35,15 @@ public class PersistentMessageTest extends TransientMessageTest
}
@Override
- protected AMQMessage newMessage(Long id)
+ protected AMQMessage newMessage()
{
- return new MessageFactory().createMessage(id, _messageStore, true);
+ return MessageFactory.getInstance().createMessage(_messageStore, true);
}
@Override
public void testIsPersistent()
{
- _message = newMessage(1L);
+ _message = newMessage();
assertTrue(_message.isPersistent());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 2dcb081739..98772e7b61 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,12 +20,7 @@ package org.apache.qpid.server.queue;
*
*/
-
-import java.util.ArrayList;
-import java.util.List;
-
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,6 +37,9 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
+import java.util.List;
+
public class SimpleAMQQueueTest extends TestCase
{
@@ -54,7 +52,7 @@ public class SimpleAMQQueueTest extends TestCase
protected DirectExchange _exchange = new DirectExchange();
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
-
+
MessagePublishInfo info = new MessagePublishInfoImpl();
private static final long MESSAGE_SIZE = 100;
@@ -63,7 +61,7 @@ public class SimpleAMQQueueTest extends TestCase
{
super.setUp();
//Create Application Registry for test
- ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1);
+ ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
_virtualHost = new VirtualHost("vhost", _store);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -81,97 +79,99 @@ public class SimpleAMQQueueTest extends TestCase
public void testCreateQueue() throws AMQException
{
_queue.stop();
- try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments );
+ try
+ {
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing name",
- e.getMessage().contains("name"));
+ assertTrue("Exception was not about missing name",
+ e.getMessage().contains("name"));
}
-
- try {
+
+ try
+ {
_queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing vhost",
- e.getMessage().contains("Host"));
+ assertTrue("Exception was not about missing vhost",
+ e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
- _virtualHost, _arguments);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
+ _virtualHost, _arguments);
assertNotNull("Queue was not created", _queue);
}
-
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
-
+
public void testBinding()
{
try
{
_queue.bind(_exchange, _routingKey, null);
- assertTrue("Routing key was not bound",
- _exchange.getBindings().containsKey(_routingKey));
- assertEquals("Queue was not bound to key",
- _exchange.getBindings().get(_routingKey).get(0),
- _queue);
- assertEquals("Exchange binding count", 1,
- _queue.getExchangeBindings().size());
- assertEquals("Wrong exchange bound", _routingKey,
- _queue.getExchangeBindings().get(0).getRoutingKey());
- assertEquals("Wrong exchange bound", _exchange,
- _queue.getExchangeBindings().get(0).getExchange());
-
+ assertTrue("Routing key was not bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertEquals("Queue was not bound to key",
+ _exchange.getBindings().get(_routingKey).get(0),
+ _queue);
+ assertEquals("Exchange binding count", 1,
+ _queue.getExchangeBindings().size());
+ assertEquals("Wrong exchange bound", _routingKey,
+ _queue.getExchangeBindings().get(0).getRoutingKey());
+ assertEquals("Wrong exchange bound", _exchange,
+ _queue.getExchangeBindings().get(0).getExchange());
+
_queue.unBind(_exchange, _routingKey, null);
- assertFalse("Routing key was still bound",
- _exchange.getBindings().containsKey(_routingKey));
- assertNull("Routing key was not empty",
- _exchange.getBindings().get(_routingKey));
+ assertFalse("Routing key was still bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertNull("Routing key was not empty",
+ _exchange.getBindings().get(_routingKey));
}
catch (AMQException e)
{
assertNull("Unexpected exception", e);
}
}
-
+
public void testSubscription() throws AMQException
{
// Check adding a subscription adds it to the queue
_queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
-
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
+
// Check sending a message ends up with the subscriber
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
assertTrue("Subscription still had queue", _subscription.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
- assertFalse("Queue still has active consumer",
- 1 == _queue.getActiveConsumerCount());
-
- AMQMessage messageB = createMessage(new Long (25));
+ assertFalse("Queue still has active consumer",
+ 1 == _queue.getActiveConsumerCount());
+
+ AMQMessage messageB = createMessage();
_queue.enqueue(null, messageB);
QueueEntry entry = _subscription.getLastSeenEntry();
assertNull(entry);
}
-
+
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
@@ -182,18 +182,18 @@ public class SimpleAMQQueueTest extends TestCase
{
// Check adding an exclusive subscription adds it to the queue
_queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
- _queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ _queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
Exception ex = null;
@@ -203,7 +203,7 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
assertTrue(ex instanceof AMQException);
@@ -218,40 +218,40 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
}
-
- public void testAutoDeleteQueue() throws Exception
+
+ public void testAutoDeleteQueue() throws Exception
{
- _queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
- _queue.registerSubscription(_subscription, false);
- AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
- _queue.unregisterSubscription(_subscription);
- assertTrue("Queue was not deleted when subscription was removed",
- _queue.isDeleted());
+ _queue.stop();
+ _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+ _queue.registerSubscription(_subscription, false);
+ AMQMessage message = createMessage();
+ _queue.enqueue(null, message);
+ _queue.unregisterSubscription(_subscription);
+ assertTrue("Queue was not deleted when subscription was removed",
+ _queue.isDeleted());
}
-
+
public void testResend() throws Exception
{
_queue.registerSubscription(_subscription, false);
- Long id = new Long(26);
- AMQMessage message = createMessage(id);
+ AMQMessage message = createMessage();
+ Long id = message.getMessageId();
_queue.enqueue(null, message);
QueueEntry entry = _subscription.getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
-
+
}
-
+
public void testGetFirstMessageId() throws Exception
{
// Create message
- Long messageId = new Long(23);
- AMQMessage message = createMessage(messageId);
+ AMQMessage message = createMessage();
+ Long messageId = message.getMessageId();
// Put message on queue
_queue.enqueue(null, message);
@@ -264,34 +264,40 @@ public class SimpleAMQQueueTest extends TestCase
public void testGetFirstFiveMessageIds() throws Exception
{
- for (int i = 0 ; i < 5; i++)
+ // Create message
+
+ AMQMessage message = createMessage();
+ Long initialMessageID = message.getMessageId();
+
+ for (int i = 0; i < 5; i++)
{
- // Create message
- Long messageId = new Long(i);
- AMQMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(null, message);
+ // Create message
+ message = createMessage();
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+ Long messageId = initialMessageID;
// Check message id
for (int i = 0; i < 5; i++)
{
- Long messageId = new Long(i);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
+ messageId++;
}
}
public void testGetLastFiveMessageIds() throws Exception
{
- for (int i = 0 ; i < 10; i++)
+ AMQMessage message = createMessage();
+ Long messageIdOffset = message.getMessageId() -1 ;
+ for (int i = 0; i < 10; i++)
{
- // Create message
- Long messageId = new Long(i);
- AMQMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(null, message);
+ // Create message
+ message = createMessage();
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -299,87 +305,104 @@ public class SimpleAMQQueueTest extends TestCase
// Check message id
for (int i = 0; i < 5; i++)
{
- Long messageId = new Long(i+5);
+ Long messageId = new Long(messageIdOffset + 1 + i + 5);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
-
+
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-
+
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store, new MessageFactory());
- _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
-
+ msg.routingComplete(_store);
+
+
+ _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(1L);
+ AMQQueue data = _store.getMessages().get(messageId);
assertNotNull(data);
-
+
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage message = new MessageFactory().createMessage(1L, _store, true);
-
+
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- // This is a persist message but we are not in a transaction so create a new context for the message
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
-
+
entry.setMessage(message);
_queue.dequeue(null, entry);
-
+
// Check that it is dequeued
- data = _store.getMessages().get(1L);
+ data = _store.getMessages().get(messageId);
assertNull(data);
}
-
// FIXME: move this to somewhere useful
- private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
- final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false);
+ final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false);
try
{
//Safe to use a null StoreContext as we have created a TransientMessage (see false param above)
- amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
+ amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
}
catch (AMQException e)
{
// won't happen
}
-
return amqMessage;
}
+ public AMQMessage createMessage() throws AMQException
+ {
+ AMQMessage message = new TestMessage(info);
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+
+ //The createMessage above is for a Transient Message so it is safe to have no context.
+ message.setPublishAndContentHeaderBody(null, info, header);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ message.getContentHeaderBody().properties = props;
+
+ return message;
+ }
+
public class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
+ TestMessage(MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessage(messageId, publishBody));
- _tag = tag;
+ super(SimpleAMQQueueTest.createMessage(publishBody));
+ _tag = getMessageId();
}
-
public boolean incrementReference()
{
_count++;
@@ -396,11 +419,5 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Wrong count for message with tag " + _tag, expected, _count);
}
}
-
- protected AMQMessage createMessage(Long id) throws AMQException
- {
- AMQMessage messageA = new TestMessage(id, id, info);
- return messageA;
- }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
index e37269526c..16d1ab60f3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
@@ -38,22 +38,21 @@ public class TransientMessageTest extends TestCase
AMQMessage _message;
StoreContext _storeContext = null;
- protected AMQMessage newMessage(Long id)
+ protected AMQMessage newMessage()
{
- return new MessageFactory().createMessage(id, null, false);
+ return MessageFactory.getInstance().createMessage(null, false);
}
public void testMessageID()
{
- Long id = 1L;
- _message = newMessage(id);
+ _message = newMessage();
- assertEquals("Message not set value", id, _message.getMessageId());
+ assertTrue("Message ID is not set ", _message.getMessageId() > 0L);
}
public void testInvalidContentChunk()
{
- _message = newMessage(1L);
+ _message = newMessage();
try
{
@@ -100,7 +99,7 @@ public class TransientMessageTest extends TestCase
public void testAddSingleContentChunk()
{
- _message = newMessage(1L);
+ _message = newMessage();
ContentChunk cc = new MockContentChunk(100);
@@ -138,7 +137,7 @@ public class TransientMessageTest extends TestCase
public void testAddMultipleContentChunk()
{
- _message = newMessage(1L);
+ _message = newMessage();
ContentChunk cc = new MockContentChunk(100);
@@ -174,14 +173,14 @@ public class TransientMessageTest extends TestCase
public void testInitialArrivalTime()
{
- _message = newMessage(1L);
+ _message = newMessage();
assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime());
}
public void testSetPublishAndContentHeaderBody_WithBody()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl();
int bodySize = 100;
@@ -202,7 +201,7 @@ public class TransientMessageTest extends TestCase
public void testSetPublishAndContentHeaderBody_Null()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl();
int bodySize = 0;
@@ -244,7 +243,7 @@ public class TransientMessageTest extends TestCase
public void testSetPublishAndContentHeaderBody_Empty()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl();
int bodySize = 0;
@@ -283,14 +282,14 @@ public class TransientMessageTest extends TestCase
public void testIsPersistent()
{
- _message = newMessage(1L);
+ _message = newMessage();
assertFalse(_message.isPersistent());
}
public void testImmediateAndNotDelivered()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
int bodySize = 0;
@@ -323,7 +322,7 @@ public class TransientMessageTest extends TestCase
public void testNotImmediateAndNotDelivered()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
int bodySize = 0;
@@ -356,7 +355,7 @@ public class TransientMessageTest extends TestCase
public void testExpiry()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
int bodySize = 0;
@@ -412,7 +411,7 @@ public class TransientMessageTest extends TestCase
public void testNoExpiry()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
int bodySize = 0;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index b4ed1f8709..36f640a325 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -347,18 +347,17 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
- messageInfo,
+ currentMessage = new IncomingMessage(messageInfo,
new NonTransactionalContext(_virtualHost.getMessageStore(),
new StoreContext(), null, null),
- new InternalTestProtocolSession());
+ new InternalTestProtocolSession(),
+ _virtualHost.getMessageStore());
}
catch (AMQException e)
{
fail(e.getMessage());
}
- currentMessage.setMessageStore(_virtualHost.getMessageStore());
currentMessage.setExchange(directExchange);
ContentHeaderBody headerBody = new ContentHeaderBody();
@@ -389,7 +388,7 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory());
+ currentMessage.routingComplete(_virtualHost.getMessageStore());
}
catch (AMQException e)
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 9a9fe3644c..48d69c5bad 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -56,7 +56,7 @@ public class TestReferenceCounting extends TestCase
final long messageId = _store.getNewMessageId();
- AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
@@ -86,7 +86,7 @@ public class TestReferenceCounting extends TestCase
final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();