diff options
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid')
16 files changed, 444 insertions, 340 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 228c99dcbd..01533d6509 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -130,7 +130,7 @@ public class TxAckTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + AMQMessage message = new TestMessage(deliveryTag, info); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; @@ -197,11 +197,9 @@ public class TxAckTest extends TestCase } } - private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(MessagePublishInfo publishBody) { - final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, - null, - false); + final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false); try { // Safe to use null here as we just created a TransientMessage above @@ -228,10 +226,10 @@ public class TxAckTest extends TestCase private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + TestMessage(long tag, MessagePublishInfo publishBody) throws AMQException { - super(createMessage(messageId, publishBody)); + super(createMessage( publishBody)); _tag = tag; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index e0a4357990..5c54c0b57f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -21,25 +21,40 @@ package org.apache.qpid.server.exchange; import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.FailedDequeueException; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.MessageCleanupException; +import org.apache.qpid.server.queue.MessageFactory; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.log4j.Logger; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; public class AbstractHeadersExchangeTestBase extends TestCase { @@ -48,14 +63,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase private final HeadersExchange exchange = new HeadersExchange(); protected final Set<TestQueue> queues = new HashSet<TestQueue>(); - /** - * Not used in this test, just there to stub out the routing calls - */ + /** Not used in this test, just there to stub out the routing calls */ private MessageStore _store = new MemoryMessageStore(); private StoreContext _storeContext = new StoreContext(); - private MessageFactory _handleFactory = new MessageFactory(); + private MessageFactory _messageFactory = MessageFactory.getInstance(); private int count; @@ -91,12 +104,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return queue; } - protected void route(Message m) throws AMQException { - m.route(exchange); - m.getIncomingMessage().routingComplete(_store, _handleFactory); - if(m.getIncomingMessage().allContentReceived()) + exchange.route(m.getIncomingMessage()); + m.getIncomingMessage().routingComplete(_store); + if (m.getIncomingMessage().allContentReceived()) { m.getIncomingMessage().deliverToQueues(); } @@ -112,17 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase routeAndTest(m, expectReturn, Arrays.asList(expected)); } - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException { try { route(m); - assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + assertFalse("Expected " + m + " to be returned due to manadatory flag, and lack of routing", expectReturn); for (TestQueue q : queues) { if (expected.contains(q)) @@ -140,7 +147,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase catch (NoRouteException ex) { - assertTrue("Expected "+m+" not to be returned",expectReturn); + assertTrue("Expected " + m + " not to be returned", expectReturn); } } @@ -177,7 +184,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase static class TestQueue extends SimpleAMQQueue { - final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + // final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + final List<AMQMessage> messages = new ArrayList<AMQMessage>(); public TestQueue(AMQShortString name) throws AMQException { @@ -189,13 +197,15 @@ public class AbstractHeadersExchangeTestBase extends TestCase * We override this method so that the default behaviour, which attempts to use a delivery manager, is * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. + * * @param msg + * * @throws AMQException */ @Override public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg)); + messages.add(msg);//new HeadersExchangeTest.Message(msg)); return new QueueEntry() { @@ -368,123 +378,69 @@ public class AbstractHeadersExchangeTestBase extends TestCase } - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends PersistentAMQMessage + /** Just add some extra utility methods to AMQMessage to aid testing. */ + static class Message { - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final TransactionalContext txnContext, - final AMQProtocolSession publisher) - { - super(messageId, info, txnContext, publisher); - } - - - public AMQMessage getUnderlyingMessage() - { - return Message.this; - } - - - public ContentHeaderBody getContentHeaderBody() - { - return Message.this.getContentHeaderBody(); - } - } - - private IncomingMessage _incoming; private static MessageStore _messageStore = new SkeletonMessageStore(); private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, - new LinkedList<RequiredDeliveryException>() + new LinkedList<RequiredDeliveryException>() ); - Message(String id, String... headers) throws AMQException + public static Message create(String id, String... headers) throws AMQException { - this(id, getHeaders(headers)); - } + ContentHeaderBody headerBody = getContentHeader(getHeaders(headers)); - Message(String id, FieldTable headers) throws AMQException - { - this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers)); - } + MessagePublishInfo mpi = getPublishRequest(id); - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(long messageId, - MessagePublishInfo publish, - ContentHeaderBody header) throws AMQException - { - super(messageId, _messageStore); + IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore); try { - setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header); + incomming.setContentHeaderBody(headerBody); } catch (AMQException e) { } - _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); - _incoming.setContentHeaderBody(header); + return new Message(incomming, mpi); } - private Message(AMQMessage msg) throws AMQException - { - super(msg.getMessageId(), _messageStore); - - this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody()); - - Iterator<ContentChunk> iterator = msg.getContentBodyIterator(); - - while(iterator.hasNext()) - { - this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext()); - } - } - - + private IncomingMessage _incoming; + private MessagePublishInfo _mpi; - void route(Exchange exchange) throws AMQException + public Message(IncomingMessage incomming, MessagePublishInfo mpi) { - exchange.route(_incoming); + _incoming = incomming; + _mpi = mpi; } - - public int hashCode() + public IncomingMessage getIncomingMessage() { - return getKey().hashCode(); + return _incoming; } - public boolean equals(Object o) + public MessagePublishInfo getMessagePublishInfo() { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + return _mpi; } - private boolean equals(HeadersExchangeTest.Message m) + public boolean equals(Object o) { - return getKey().equals(m.getKey()); - } + if (o instanceof AMQMessage) + { + return _incoming.getMessageId().equals(((AMQMessage) o).getMessageId()); + } - public String toString() - { - return getKey().toString(); - } + if (o instanceof Message) + { + return _incoming.getMessageId().equals(((Message) o).getIncomingMessage().getMessageId()); + } - private Object getKey() - { - return getMessagePublishInfo().getRoutingKey(); + return false; } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index ddf177690c..396b8c5128 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -75,7 +75,7 @@ public class DestWildExchangeTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(null, false, false, new AMQShortString("a.b")); - IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession); + IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _store); _exchange.route(message); @@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase throws AMQException { _exchange.route(message); - message.routingComplete(_store, new MessageFactory()); + message.routingComplete(_store); message.deliverToQueues(); } @@ -551,7 +551,7 @@ public class DestWildExchangeTest extends TestCase new LinkedList<RequiredDeliveryException>() ); - IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession); + IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _store); message.setContentHeaderBody( new ContentHeaderBody()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 5843e280fa..a60045eaba 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -50,21 +50,21 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase TestQueue q7 = bindDefault("F0000", "F0001=Bear"); TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - routeAndTest(new Message("Message1", "F0000"), q1); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), + routeAndTest(Message.create("Message1", "F0000"), q1); + routeAndTest(Message.create("Message2", "F0000=Aardvark"), q1, q2); + routeAndTest(Message.create("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); + routeAndTest(Message.create("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); + routeAndTest(Message.create("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message("Message6", "F0002")); + routeAndTest(Message.create("Message6", "F0002")); - Message m7 = new Message("Message7", "XXXXX"); + Message m7 = Message.create("Message7", "XXXXX"); MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); pb7.setMandatory(true); routeAndTest(m7,true); - Message m8 = new Message("Message8", "F0000"); + Message m8 = Message.create("Message8", "F0000"); MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); pb8.setMandatory(true); routeAndTest(m8,false,q1); @@ -80,19 +80,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - routeAndTest(new Message("Message1", "F0000"), q1, q3); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message6", "F0002")); + routeAndTest(Message.create("Message1", "F0000"), q1, q3); + routeAndTest(Message.create("Message2", "F0000=Aardvark"), q1, q2, q3, q4); + routeAndTest(Message.create("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); + routeAndTest(Message.create("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(Message.create("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(Message.create("Message6", "F0002")); } public void testMandatory() throws AMQException { bindDefault("F0000"); - Message m1 = new Message("Message1", "XXXXX"); - Message m2 = new Message("Message2", "F0000"); + + Message m1 = Message.create("Message1", "XXXXX"); + Message m2 = Message.create("Message2", "F0000"); MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); pb1.setMandatory(true); MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index ffe858f517..ba02e6f6bd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -20,13 +20,13 @@ package org.apache.qpid.server.queue; * */ -import java.util.ArrayList; - +import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ContentHeaderBody; -import junit.framework.AssertionFailedError; +import org.apache.qpid.framing.FieldTable; + +import java.util.ArrayList; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { @@ -44,38 +44,40 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest { // Enqueue messages in order - _queue.enqueue(null, createMessage(1L, (byte) 10)); - _queue.enqueue(null, createMessage(2L, (byte) 4)); - _queue.enqueue(null, createMessage(3L, (byte) 0)); - + AMQMessage message = createMessage((byte) 10); + Long messagIDOffset = message.getMessageId() - 1; + _queue.enqueue(null, message); + _queue.enqueue(null, createMessage((byte) 4)); + _queue.enqueue(null, createMessage((byte) 0)); + // Enqueue messages in reverse order - _queue.enqueue(null, createMessage(4L, (byte) 0)); - _queue.enqueue(null, createMessage(5L, (byte) 4)); - _queue.enqueue(null, createMessage(6L, (byte) 10)); - + _queue.enqueue(null, createMessage((byte) 0)); + _queue.enqueue(null, createMessage((byte) 4)); + _queue.enqueue(null, createMessage((byte) 10)); + // Enqueue messages out of order - _queue.enqueue(null, createMessage(7L, (byte) 4)); - _queue.enqueue(null, createMessage(8L, (byte) 10)); - _queue.enqueue(null, createMessage(9L, (byte) 0)); - + _queue.enqueue(null, createMessage((byte) 4)); + _queue.enqueue(null, createMessage((byte) 10)); + _queue.enqueue(null, createMessage((byte) 0)); + // Register subscriber _queue.registerSubscription(_subscription, false); Thread.sleep(150); - + ArrayList<QueueEntry> msgs = _subscription.getMessages(); try { - assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId()); + assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId()); + assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId()); + assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId()); - assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId()); + assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId()); + assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId()); + assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId()); - assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId()); + assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId()); + assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId()); } catch (AssertionFailedError afe) { @@ -92,25 +94,12 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest } - protected AMQMessage createMessage(Long id, byte i) throws AMQException + protected AMQMessage createMessage(byte i) throws AMQException { - AMQMessage message = super.createMessage(id); + AMQMessage message = super.createMessage(); + + ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i); - ContentHeaderBody header = new ContentHeaderBody(); - header.bodySize = MESSAGE_SIZE; - - //The createMessage above is for a Transient Message so it is safe to have no context. - message.setPublishAndContentHeaderBody(null, info, header); - - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setPriority(i); - message.getContentHeaderBody().properties = props; return message; } - - protected AMQMessage createMessage(Long id) throws AMQException - { - return createMessage(id, (byte) 0); - } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index b159e2cda5..be40535df9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends TestCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = size; // in bytes - IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession); + IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore); message.setContentHeaderBody(contentHeaderBody); return message; @@ -308,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); messages[i].enqueue(qs); - messages[i].routingComplete(_messageStore, new MessageFactory()); + messages[i].routingComplete(_messageStore); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index a5e2da7b36..97c76c8e9d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -216,12 +216,14 @@ public class AMQQueueMBeanTest extends TestCase } IncomingMessage msg = message(false, false); - long id = msg.getMessageId(); + _queue.clearQueue(_storeContext); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, new MessageFactory()); + msg.routingComplete(_messageStore); + + long id = msg.getMessageId(); msg.addContentBodyFrame(new ContentChunk() { @@ -264,7 +266,7 @@ public class AMQQueueMBeanTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); - IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession); + IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore); msg.setContentHeaderBody(contentHeaderBody); return msg; @@ -305,7 +307,7 @@ public class AMQQueueMBeanTest extends TestCase currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore, new MessageFactory()); + currentMessage.routingComplete(_messageStore); // Add the body so we have somthing to test later currentMessage.addContentBodyFrame( diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index cd1ee65c0c..98465eda20 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -98,7 +98,7 @@ public class AckTest extends TestCase new LinkedList<RequiredDeliveryException>() ); _queue.registerSubscription(_subscription,false); - MessageFactory factory = new MessageFactory(); + MessageFactory factory = MessageFactory.getInstance(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -106,7 +106,7 @@ public class AckTest extends TestCase MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false, false, new AMQShortString("rk")); - IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore); //IncomingMessage msg2 = null; if (persistent) { @@ -127,7 +127,7 @@ public class AckTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, factory); + msg.routingComplete(_messageStore); if(msg.allContentReceived()) { msg.deliverToQueues(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java index 582e2bfb00..75e9f08417 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java @@ -22,27 +22,28 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; -public class MessageFactoryTest extends TestCase +public class MessageFactoryClassTest extends TestCase { private MessageFactory _factory; public void setUp() { - _factory = new MessageFactory(); + _factory = MessageFactory.getInstance(); } public void testTransientMessageCreation() { - AMQMessage message = _factory.createMessage(0L, null, false); + AMQMessage message = _factory.createMessage(null, false); assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass()); } public void testPersistentMessageCreation() { - AMQMessage message = _factory.createMessage(0L, null, true); + AMQMessage message = _factory.createMessage(null, true); assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass()); } -}
\ No newline at end of file + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java new file mode 100644 index 0000000000..db0fc56303 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; + +public class MessageFactoryRecoveryTest extends TestCase +{ + private MessageFactory _factory; + + public void setUp() + { + _factory = MessageFactory.getInstance(); + + } + + public void test() + { + AMQMessage message = _factory.createMessage(null, false); + + _factory.enableRecover(); + + Long messasgeID = message.getMessageId(); + + try + { + _factory.createMessage(messasgeID, null); + fail("Cannot recreate message with an existing id"); + } + catch (RuntimeException re) + { + assertEquals("Incorrect exception thrown ", + "Message IDs can only increase current id is:" + messasgeID + ". Requested:" + messasgeID, re.getMessage()); + } + + //Check we cannot go backwords with ids. + try + { + _factory.createMessage(messasgeID - 1, null); + fail("Cannot recreate message with an old id"); + } + catch (RuntimeException re) + { + assertEquals("Incorrect exception thrown ", + "Message IDs can only increase current id is:" + messasgeID + ". Requested:" + (messasgeID - 1), re.getMessage()); + } + + //Check that we can jump forward in ids during recovery. + messasgeID += 100; + try + { + message = _factory.createMessage(messasgeID, null); + assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId()); + } + catch (Exception re) + { + fail("Message with a much higher value should be created"); + } + + // End the reovery process. + _factory.start(); + + //Check we cannot still create by id after ending recovery phase + try + { + _factory.createMessage(messasgeID, null); + fail("We have left recovery mode so we cannot create by id any more"); + } + catch (Exception re) + { + assertEquals("Incorrect exception thrown ", + "Unable to create message by ID when not recovering", re.getMessage()); + } + + // Check that the next message created has the next available id + + messasgeID++; + + try + { + message = _factory.createMessage(null, false); + assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId()); + } + catch (Exception re) + { + fail("Message with a much higher value should be created"); + } + + } + +}
\ No newline at end of file diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java new file mode 100644 index 0000000000..3633481012 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.MessageStore; + +public class MockPersistentAMQMessage extends PersistentAMQMessage +{ + public MockPersistentAMQMessage(long messageId, MessageStore store) + throws AMQException + { + super(messageId, store); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index e213be7560..fdaf2c309f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -35,15 +35,15 @@ public class PersistentMessageTest extends TransientMessageTest } @Override - protected AMQMessage newMessage(Long id) + protected AMQMessage newMessage() { - return new MessageFactory().createMessage(id, _messageStore, true); + return MessageFactory.getInstance().createMessage(_messageStore, true); } @Override public void testIsPersistent() { - _message = newMessage(1L); + _message = newMessage(); assertTrue(_message.isPersistent()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 2dcb081739..98772e7b61 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -20,12 +20,7 @@ package org.apache.qpid.server.queue; * */ - -import java.util.ArrayList; -import java.util.List; - import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -42,6 +37,9 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.ArrayList; +import java.util.List; + public class SimpleAMQQueueTest extends TestCase { @@ -54,7 +52,7 @@ public class SimpleAMQQueueTest extends TestCase protected DirectExchange _exchange = new DirectExchange(); protected MockSubscription _subscription = new MockSubscription(); protected FieldTable _arguments = null; - + MessagePublishInfo info = new MessagePublishInfoImpl(); private static final long MESSAGE_SIZE = 100; @@ -63,7 +61,7 @@ public class SimpleAMQQueueTest extends TestCase { super.setUp(); //Create Application Registry for test - ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1); + ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1); _virtualHost = new VirtualHost("vhost", _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); @@ -81,97 +79,99 @@ public class SimpleAMQQueueTest extends TestCase public void testCreateQueue() throws AMQException { _queue.stop(); - try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments ); + try + { + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing name", - e.getMessage().contains("name")); + assertTrue("Exception was not about missing name", + e.getMessage().contains("name")); } - - try { + + try + { _queue = new SimpleAMQQueue(_qname, false, _owner, false, null); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing vhost", - e.getMessage().contains("Host")); + assertTrue("Exception was not about missing vhost", + e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, - _virtualHost, _arguments); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, + _virtualHost, _arguments); assertNotNull("Queue was not created", _queue); } - + public void testGetVirtualHost() { assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); } - + public void testBinding() { try { _queue.bind(_exchange, _routingKey, null); - assertTrue("Routing key was not bound", - _exchange.getBindings().containsKey(_routingKey)); - assertEquals("Queue was not bound to key", - _exchange.getBindings().get(_routingKey).get(0), - _queue); - assertEquals("Exchange binding count", 1, - _queue.getExchangeBindings().size()); - assertEquals("Wrong exchange bound", _routingKey, - _queue.getExchangeBindings().get(0).getRoutingKey()); - assertEquals("Wrong exchange bound", _exchange, - _queue.getExchangeBindings().get(0).getExchange()); - + assertTrue("Routing key was not bound", + _exchange.getBindings().containsKey(_routingKey)); + assertEquals("Queue was not bound to key", + _exchange.getBindings().get(_routingKey).get(0), + _queue); + assertEquals("Exchange binding count", 1, + _queue.getExchangeBindings().size()); + assertEquals("Wrong exchange bound", _routingKey, + _queue.getExchangeBindings().get(0).getRoutingKey()); + assertEquals("Wrong exchange bound", _exchange, + _queue.getExchangeBindings().get(0).getExchange()); + _queue.unBind(_exchange, _routingKey, null); - assertFalse("Routing key was still bound", - _exchange.getBindings().containsKey(_routingKey)); - assertNull("Routing key was not empty", - _exchange.getBindings().get(_routingKey)); + assertFalse("Routing key was still bound", + _exchange.getBindings().containsKey(_routingKey)); + assertNull("Routing key was not empty", + _exchange.getBindings().get(_routingKey)); } catch (AMQException e) { assertNull("Unexpected exception", e); } } - + public void testSubscription() throws AMQException { // Check adding a subscription adds it to the queue _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, + assertEquals("Subscription did not get queue", _queue, + _subscription.getQueue()); + assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); - + assertEquals("Queue does not have active consumer", 1, + _queue.getActiveConsumerCount()); + // Check sending a message ends up with the subscriber - AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageA = createMessage(); _queue.enqueue(null, messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - + // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); assertTrue("Subscription still had queue", _subscription.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); - assertFalse("Queue still has active consumer", - 1 == _queue.getActiveConsumerCount()); - - AMQMessage messageB = createMessage(new Long (25)); + assertFalse("Queue still has active consumer", + 1 == _queue.getActiveConsumerCount()); + + AMQMessage messageB = createMessage(); _queue.enqueue(null, messageB); QueueEntry entry = _subscription.getLastSeenEntry(); assertNull(entry); } - + public void testQueueNoSubscriber() throws AMQException, InterruptedException { - AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageA = createMessage(); _queue.enqueue(null, messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); @@ -182,18 +182,18 @@ public class SimpleAMQQueueTest extends TestCase { // Check adding an exclusive subscription adds it to the queue _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); + assertEquals("Subscription did not get queue", _queue, + _subscription.getQueue()); + assertEquals("Queue does not have consumer", 1, + _queue.getConsumerCount()); + assertEquals("Queue does not have active consumer", 1, + _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageA = createMessage(); _queue.enqueue(null, messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - + // Check we cannot add a second subscriber to the queue Subscription subB = new MockSubscription(); Exception ex = null; @@ -203,7 +203,7 @@ public class SimpleAMQQueueTest extends TestCase } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); assertTrue(ex instanceof AMQException); @@ -218,40 +218,40 @@ public class SimpleAMQQueueTest extends TestCase } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); } - - public void testAutoDeleteQueue() throws Exception + + public void testAutoDeleteQueue() throws Exception { - _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); - _queue.registerSubscription(_subscription, false); - AMQMessage message = createMessage(new Long(25)); - _queue.enqueue(null, message); - _queue.unregisterSubscription(_subscription); - assertTrue("Queue was not deleted when subscription was removed", - _queue.isDeleted()); + _queue.stop(); + _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); + _queue.registerSubscription(_subscription, false); + AMQMessage message = createMessage(); + _queue.enqueue(null, message); + _queue.unregisterSubscription(_subscription); + assertTrue("Queue was not deleted when subscription was removed", + _queue.isDeleted()); } - + public void testResend() throws Exception { _queue.registerSubscription(_subscription, false); - Long id = new Long(26); - AMQMessage message = createMessage(id); + AMQMessage message = createMessage(); + Long id = message.getMessageId(); _queue.enqueue(null, message); QueueEntry entry = _subscription.getLastSeenEntry(); entry.setRedelivered(true); _queue.resend(entry, _subscription); - + } - + public void testGetFirstMessageId() throws Exception { // Create message - Long messageId = new Long(23); - AMQMessage message = createMessage(messageId); + AMQMessage message = createMessage(); + Long messageId = message.getMessageId(); // Put message on queue _queue.enqueue(null, message); @@ -264,34 +264,40 @@ public class SimpleAMQQueueTest extends TestCase public void testGetFirstFiveMessageIds() throws Exception { - for (int i = 0 ; i < 5; i++) + // Create message + + AMQMessage message = createMessage(); + Long initialMessageID = message.getMessageId(); + + for (int i = 0; i < 5; i++) { - // Create message - Long messageId = new Long(i); - AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(null, message); + // Create message + message = createMessage(); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); + Long messageId = initialMessageID; // Check message id for (int i = 0; i < 5; i++) { - Long messageId = new Long(i); assertEquals("Message ID was wrong", messageId, msgids.get(i)); + messageId++; } } public void testGetLastFiveMessageIds() throws Exception { - for (int i = 0 ; i < 10; i++) + AMQMessage message = createMessage(); + Long messageIdOffset = message.getMessageId() -1 ; + for (int i = 0; i < 10; i++) { - // Create message - Long messageId = new Long(i); - AMQMessage message = createMessage(messageId); // Put message on queue _queue.enqueue(null, message); + // Create message + message = createMessage(); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -299,87 +305,104 @@ public class SimpleAMQQueueTest extends TestCase // Check message id for (int i = 0; i < 5; i++) { - Long messageId = new Long(i+5); + Long messageId = new Long(messageIdOffset + 1 + i + 5); assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } - + public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); + IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); + + long messageId = msg.getMessageId(); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - + // Send persistent message qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_store, new MessageFactory()); - _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); - + msg.routingComplete(_store); + + + _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); + // Check that it is enqueued - AMQQueue data = _store.getMessages().get(1L); + AMQQueue data = _store.getMessages().get(messageId); assertNotNull(data); - + // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage message = new MessageFactory().createMessage(1L, _store, true); - + ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; - // This is a persist message but we are not in a transaction so create a new context for the message + AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - + entry.setMessage(message); _queue.dequeue(null, entry); - + // Check that it is dequeued - data = _store.getMessages().get(1L); + data = _store.getMessages().get(messageId); assertNull(data); } - // FIXME: move this to somewhere useful - private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final MessagePublishInfo publishBody) { - final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false); + final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false); try { //Safe to use a null StoreContext as we have created a TransientMessage (see false param above) - amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); + amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); } catch (AMQException e) { // won't happen } - return amqMessage; } + public AMQMessage createMessage() throws AMQException + { + AMQMessage message = new TestMessage(info); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + + //The createMessage above is for a Transient Message so it is safe to have no context. + message.setPublishAndContentHeaderBody(null, info, header); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + message.getContentHeaderBody().properties = props; + + return message; + } + public class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody) + TestMessage(MessagePublishInfo publishBody) throws AMQException { - super(createMessage(messageId, publishBody)); - _tag = tag; + super(SimpleAMQQueueTest.createMessage(publishBody)); + _tag = getMessageId(); } - public boolean incrementReference() { _count++; @@ -396,11 +419,5 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Wrong count for message with tag " + _tag, expected, _count); } } - - protected AMQMessage createMessage(Long id) throws AMQException - { - AMQMessage messageA = new TestMessage(id, id, info); - return messageA; - } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java index e37269526c..16d1ab60f3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java @@ -38,22 +38,21 @@ public class TransientMessageTest extends TestCase AMQMessage _message; StoreContext _storeContext = null; - protected AMQMessage newMessage(Long id) + protected AMQMessage newMessage() { - return new MessageFactory().createMessage(id, null, false); + return MessageFactory.getInstance().createMessage(null, false); } public void testMessageID() { - Long id = 1L; - _message = newMessage(id); + _message = newMessage(); - assertEquals("Message not set value", id, _message.getMessageId()); + assertTrue("Message ID is not set ", _message.getMessageId() > 0L); } public void testInvalidContentChunk() { - _message = newMessage(1L); + _message = newMessage(); try { @@ -100,7 +99,7 @@ public class TransientMessageTest extends TestCase public void testAddSingleContentChunk() { - _message = newMessage(1L); + _message = newMessage(); ContentChunk cc = new MockContentChunk(100); @@ -138,7 +137,7 @@ public class TransientMessageTest extends TestCase public void testAddMultipleContentChunk() { - _message = newMessage(1L); + _message = newMessage(); ContentChunk cc = new MockContentChunk(100); @@ -174,14 +173,14 @@ public class TransientMessageTest extends TestCase public void testInitialArrivalTime() { - _message = newMessage(1L); + _message = newMessage(); assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime()); } public void testSetPublishAndContentHeaderBody_WithBody() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(); int bodySize = 100; @@ -202,7 +201,7 @@ public class TransientMessageTest extends TestCase public void testSetPublishAndContentHeaderBody_Null() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(); int bodySize = 0; @@ -244,7 +243,7 @@ public class TransientMessageTest extends TestCase public void testSetPublishAndContentHeaderBody_Empty() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(); int bodySize = 0; @@ -283,14 +282,14 @@ public class TransientMessageTest extends TestCase public void testIsPersistent() { - _message = newMessage(1L); + _message = newMessage(); assertFalse(_message.isPersistent()); } public void testImmediateAndNotDelivered() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null); int bodySize = 0; @@ -323,7 +322,7 @@ public class TransientMessageTest extends TestCase public void testNotImmediateAndNotDelivered() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); int bodySize = 0; @@ -356,7 +355,7 @@ public class TransientMessageTest extends TestCase public void testExpiry() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); int bodySize = 0; @@ -412,7 +411,7 @@ public class TransientMessageTest extends TestCase public void testNoExpiry() { - _message = newMessage(1L); + _message = newMessage(); MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); int bodySize = 0; diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index b4ed1f8709..36f640a325 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -347,18 +347,17 @@ public class MessageStoreTest extends TestCase try { - currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(), - messageInfo, + currentMessage = new IncomingMessage(messageInfo, new NonTransactionalContext(_virtualHost.getMessageStore(), new StoreContext(), null, null), - new InternalTestProtocolSession()); + new InternalTestProtocolSession(), + _virtualHost.getMessageStore()); } catch (AMQException e) { fail(e.getMessage()); } - currentMessage.setMessageStore(_virtualHost.getMessageStore()); currentMessage.setExchange(directExchange); ContentHeaderBody headerBody = new ContentHeaderBody(); @@ -389,7 +388,7 @@ public class MessageStoreTest extends TestCase try { - currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory()); + currentMessage.routingComplete(_virtualHost.getMessageStore()); } catch (AMQException e) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 9a9fe3644c..48d69c5bad 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -56,7 +56,7 @@ public class TestReferenceCounting extends TestCase final long messageId = _store.getNewMessageId(); - AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); @@ -86,7 +86,7 @@ public class TestReferenceCounting extends TestCase final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); - AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); |