summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-13 15:09:50 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-13 15:09:50 +0000
commita8874f55a3b17ac94f38e6697bdb4bed72e7ee6c (patch)
treecae9360e132d1379b581c704b0f86724e7d2433a
parentc82b4f9475ecb2a62b6517f1b10fbefd69431600 (diff)
downloadqpid-python-a8874f55a3b17ac94f38e6697bdb4bed72e7ee6c.tar.gz
QPID-1630 : Updated MessageFactory to be responsible for assigning messagIDs. Several tests needed updating as they relied upon specified messageIDs.
Added a recovery mode to the MessageFactory where by new messages can have their ids specified but only as long as the new id is larger than the last one. Recovered messages are always persistent. Simplified a interfaces as a result of not requiring the messageID to be passed through. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@744140 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java88
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java172
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java75
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java)11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java109
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java261
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java8
22 files changed, 548 insertions, 385 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3b290b3d51..341aae2875 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.UnauthorizedAccessException;
import org.apache.qpid.server.queue.AMQMessage;
@@ -108,8 +107,6 @@ public class AMQChannel
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private MessageFactory _messageHandleFactory = new MessageFactory();
-
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
private boolean _closing;
@@ -153,8 +150,7 @@ public class AMQChannel
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
- _currentMessage.setMessageStore(_messageStore);
+ _currentMessage = new IncomingMessage(info, _txnContext, _session, _messageStore);
_currentMessage.setExchange(e);
}
@@ -178,7 +174,7 @@ public class AMQChannel
routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+ _currentMessage.routingComplete(_messageStore);
deliverCurrentMessageIfComplete();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index aad99da6c3..20926b99d7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -48,7 +48,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
private AMQMessage _message;
- private final Long _messageId;
private final TransactionalContext _txnContext;
private static final boolean MSG_AUTH =
@@ -72,22 +71,23 @@ public class IncomingMessage implements Filterable<RuntimeException>
private long _expiration;
private Exchange _exchange;
+ private static MessageFactory MESSAGE_FACTORY = MessageFactory.getInstance();
- public IncomingMessage(final Long messageId,
- final MessagePublishInfo info,
+ public IncomingMessage(final MessagePublishInfo info,
final TransactionalContext txnContext,
- final AMQProtocolSession publisher)
+ final AMQProtocolSession publisher,
+ MessageStore messasgeStore)
{
- _messageId = messageId;
_messagePublishInfo = info;
_txnContext = txnContext;
_publisher = publisher;
-
+ _messageStore = messasgeStore;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _message = MESSAGE_FACTORY.createMessage(_messageStore, isPersistent());
}
public void setExpiration()
@@ -121,13 +121,10 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
- public void routingComplete(final MessageStore store,
- final MessageFactory factory) throws AMQException
+ public void routingComplete(final MessageStore store) throws AMQException
{
- final boolean persistent = isPersistent();
- _message = factory.createMessage(_messageId, store, persistent);
- if (persistent)
+ if (isPersistent())
{
_txnContext.beginTranIfNecessary();
// enqueuing the messages ensure that if required the destinations are recorded to a
@@ -138,7 +135,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
for (int i = 0; i < _destinationQueues.size(); i++)
{
store.enqueueMessage(_txnContext.getStoreContext(),
- _destinationQueues.get(i), _messageId);
+ _destinationQueues.get(i), getMessageId());
}
}
}
@@ -152,7 +149,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
// transient message data as quickly as possible
if (_logger.isDebugEnabled())
{
- _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
+ _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
}
try
@@ -284,14 +281,13 @@ public class IncomingMessage implements Filterable<RuntimeException>
return false;
}
- public void setMessageStore(final MessageStore messageStore)
- {
- _messageStore = messageStore;
- }
-
+ /**
+ * The message ID will not be assigned until the ContentHeaderBody has arrived.
+ * @return
+ */
public Long getMessageId()
{
- return _messageId;
+ return _message.getMessageId();
}
public void setExchange(final Exchange e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index e18834874f..2f6e05963c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -21,12 +21,85 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.atomic.AtomicLong;
public class MessageFactory
{
+ private AtomicLong _messageId;
+ private static MessageFactory INSTANCE;
+
+ private enum State
+ {
+ RECOVER,
+ OPEN
+ }
+
+ private State _state = State.RECOVER;
+
+ private MessageFactory()
+ {
+ _messageId = new AtomicLong(0L);
+ }
+
+ public void start()
+ {
+ _state = State.OPEN;
+ }
+
+ /**
+ * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode.
+ */
+ protected void enableRecover()
+ {
+ _state = State.RECOVER;
+ }
+
+
+ /**
+ * Normal message creation path
+ * @param store
+ * @param persistent
+ * @return
+ */
+ public AMQMessage createMessage(MessageStore store, boolean persistent)
+ {
+ if (_state != State.OPEN)
+ {
+ _state = State.OPEN;
+ }
+
+ return createNextMessage(_messageId.incrementAndGet(), store, persistent);
+ }
+
+ /**
+ * Used for message recovery only and so only creates persistent messages.
+ * @param messageId the id that this message must have
+ * @param store
+ * @return
+ */
+ public AMQMessage createMessage(Long messageId, MessageStore store)
+ {
+ if (_state != State.RECOVER)
+ {
+ throw new RuntimeException("Unable to create message by ID when not recovering");
+ }
+
+ long currentID = _messageId.get();
+ if (messageId <= currentID)
+ {
+ throw new RuntimeException("Message IDs can only increase current id is:"
+ + currentID + ". Requested:" + messageId);
+ }
+ else
+ {
+ _messageId.set(messageId);
+ }
+
+ return createNextMessage(messageId, store, true);
+ }
- public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent)
+ private AMQMessage createNextMessage(Long messageId, MessageStore store, boolean persistent)
{
if (persistent)
{
@@ -37,5 +110,14 @@ public class MessageFactory
return new TransientAMQMessage(messageId);
}
}
-
+
+ public static MessageFactory getInstance()
+ {
+ if (INSTANCE == null)
+ {
+ INSTANCE = new MessageFactory();
+ }
+
+ return INSTANCE;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 9de2d09b8e..425aed43d4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -167,7 +167,7 @@ public class DerbyMessageStore implements MessageStore
// this recovers durable queues and persistent messages
- _messageFactory = new MessageFactory();
+ _messageFactory = MessageFactory.getInstance();
recover();
@@ -1355,7 +1355,7 @@ public class DerbyMessageStore implements MessageStore
}
else
{
- message = _messageFactory.createMessage(messageId, this, true);
+ message = _messageFactory.createMessage(messageId, this);
_logger.error("todo must do message recovery now.");
//todo must do message recovery now.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index f2910acb77..e65dded149 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -210,13 +210,6 @@ public interface MessageStore
boolean inTran(StoreContext context);
/**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- Long getNewMessageId();
-
- /**
* Stores a chunk of message data.
*
* @param context The transactional context for the operation.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 228c99dcbd..01533d6509 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -130,7 +130,7 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ AMQMessage message = new TestMessage(deliveryTag, info);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -197,11 +197,9 @@ public class TxAckTest extends TestCase
}
}
- private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(MessagePublishInfo publishBody)
{
- final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId,
- null,
- false);
+ final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false);
try
{
// Safe to use null here as we just created a TransientMessage above
@@ -228,10 +226,10 @@ public class TxAckTest extends TestCase
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessage(messageId, publishBody));
+ super(createMessage( publishBody));
_tag = tag;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index e0a4357990..5c54c0b57f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -21,25 +21,40 @@
package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageCleanupException;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.log4j.Logger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
public class AbstractHeadersExchangeTestBase extends TestCase
{
@@ -48,14 +63,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private final HeadersExchange exchange = new HeadersExchange();
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
- /**
- * Not used in this test, just there to stub out the routing calls
- */
+ /** Not used in this test, just there to stub out the routing calls */
private MessageStore _store = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
- private MessageFactory _handleFactory = new MessageFactory();
+ private MessageFactory _messageFactory = MessageFactory.getInstance();
private int count;
@@ -91,12 +104,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return queue;
}
-
protected void route(Message m) throws AMQException
{
- m.route(exchange);
- m.getIncomingMessage().routingComplete(_store, _handleFactory);
- if(m.getIncomingMessage().allContentReceived())
+ exchange.route(m.getIncomingMessage());
+ m.getIncomingMessage().routingComplete(_store);
+ if (m.getIncomingMessage().allContentReceived())
{
m.getIncomingMessage().deliverToQueues();
}
@@ -112,17 +124,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
routeAndTest(m, expectReturn, Arrays.asList(expected));
}
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
{
try
{
route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ assertFalse("Expected " + m + " to be returned due to manadatory flag, and lack of routing", expectReturn);
for (TestQueue q : queues)
{
if (expected.contains(q))
@@ -140,7 +147,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
catch (NoRouteException ex)
{
- assertTrue("Expected "+m+" not to be returned",expectReturn);
+ assertTrue("Expected " + m + " not to be returned", expectReturn);
}
}
@@ -177,7 +184,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static class TestQueue extends SimpleAMQQueue
{
- final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ // final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ final List<AMQMessage> messages = new ArrayList<AMQMessage>();
public TestQueue(AMQShortString name) throws AMQException
{
@@ -189,13 +197,15 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* We override this method so that the default behaviour, which attempts to use a delivery manager, is
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
+ *
* @param msg
+ *
* @throws AMQException
*/
@Override
public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg));
+ messages.add(msg);//new HeadersExchangeTest.Message(msg));
return new QueueEntry()
{
@@ -368,123 +378,69 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends PersistentAMQMessage
+ /** Just add some extra utility methods to AMQMessage to aid testing. */
+ static class Message
{
- private class TestIncomingMessage extends IncomingMessage
- {
-
- public TestIncomingMessage(final long messageId,
- final MessagePublishInfo info,
- final TransactionalContext txnContext,
- final AMQProtocolSession publisher)
- {
- super(messageId, info, txnContext, publisher);
- }
-
-
- public AMQMessage getUnderlyingMessage()
- {
- return Message.this;
- }
-
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return Message.this.getContentHeaderBody();
- }
- }
-
- private IncomingMessage _incoming;
private static MessageStore _messageStore = new SkeletonMessageStore();
private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
null,
- new LinkedList<RequiredDeliveryException>()
+ new LinkedList<RequiredDeliveryException>()
);
- Message(String id, String... headers) throws AMQException
+ public static Message create(String id, String... headers) throws AMQException
{
- this(id, getHeaders(headers));
- }
+ ContentHeaderBody headerBody = getContentHeader(getHeaders(headers));
- Message(String id, FieldTable headers) throws AMQException
- {
- this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers));
- }
+ MessagePublishInfo mpi = getPublishRequest(id);
- public IncomingMessage getIncomingMessage()
- {
- return _incoming;
- }
-
- private Message(long messageId,
- MessagePublishInfo publish,
- ContentHeaderBody header) throws AMQException
- {
- super(messageId, _messageStore);
+ IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore);
try
{
- setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header);
+ incomming.setContentHeaderBody(headerBody);
}
catch (AMQException e)
{
}
- _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
- _incoming.setContentHeaderBody(header);
+ return new Message(incomming, mpi);
}
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg.getMessageId(), _messageStore);
-
- this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody());
-
- Iterator<ContentChunk> iterator = msg.getContentBodyIterator();
-
- while(iterator.hasNext())
- {
- this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext());
- }
- }
-
-
+ private IncomingMessage _incoming;
+ private MessagePublishInfo _mpi;
- void route(Exchange exchange) throws AMQException
+ public Message(IncomingMessage incomming, MessagePublishInfo mpi)
{
- exchange.route(_incoming);
+ _incoming = incomming;
+ _mpi = mpi;
}
-
- public int hashCode()
+ public IncomingMessage getIncomingMessage()
{
- return getKey().hashCode();
+ return _incoming;
}
- public boolean equals(Object o)
+ public MessagePublishInfo getMessagePublishInfo()
{
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ return _mpi;
}
- private boolean equals(HeadersExchangeTest.Message m)
+ public boolean equals(Object o)
{
- return getKey().equals(m.getKey());
- }
+ if (o instanceof AMQMessage)
+ {
+ return _incoming.getMessageId().equals(((AMQMessage) o).getMessageId());
+ }
- public String toString()
- {
- return getKey().toString();
- }
+ if (o instanceof Message)
+ {
+ return _incoming.getMessageId().equals(((Message) o).getIncomingMessage().getMessageId());
+ }
- private Object getKey()
- {
- return getMessagePublishInfo().getRoutingKey();
+ return false;
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index ddf177690c..396b8c5128 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -75,7 +75,7 @@ public class DestWildExchangeTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl(null, false, false, new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
+ IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _store);
_exchange.route(message);
@@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase
throws AMQException
{
_exchange.route(message);
- message.routingComplete(_store, new MessageFactory());
+ message.routingComplete(_store);
message.deliverToQueues();
}
@@ -551,7 +551,7 @@ public class DestWildExchangeTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
- IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession);
+ IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _store);
message.setContentHeaderBody( new ContentHeaderBody());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 5843e280fa..a60045eaba 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -50,21 +50,21 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
TestQueue q7 = bindDefault("F0000", "F0001=Bear");
TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
- routeAndTest(new Message("Message1", "F0000"), q1);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ routeAndTest(Message.create("Message1", "F0000"), q1);
+ routeAndTest(Message.create("Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(Message.create("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(Message.create("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(Message.create("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message("Message6", "F0002"));
+ routeAndTest(Message.create("Message6", "F0002"));
- Message m7 = new Message("Message7", "XXXXX");
+ Message m7 = Message.create("Message7", "XXXXX");
MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
pb7.setMandatory(true);
routeAndTest(m7,true);
- Message m8 = new Message("Message8", "F0000");
+ Message m8 = Message.create("Message8", "F0000");
MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
pb8.setMandatory(true);
routeAndTest(m8,false,q1);
@@ -80,19 +80,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
- routeAndTest(new Message("Message1", "F0000"), q1, q3);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message6", "F0002"));
+ routeAndTest(Message.create("Message1", "F0000"), q1, q3);
+ routeAndTest(Message.create("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(Message.create("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(Message.create("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(Message.create("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(Message.create("Message6", "F0002"));
}
public void testMandatory() throws AMQException
{
bindDefault("F0000");
- Message m1 = new Message("Message1", "XXXXX");
- Message m2 = new Message("Message2", "F0000");
+
+ Message m1 = Message.create("Message1", "XXXXX");
+ Message m2 = Message.create("Message2", "F0000");
MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
pb1.setMandatory(true);
MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index ffe858f517..ba02e6f6bd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -20,13 +20,13 @@ package org.apache.qpid.server.queue;
*
*/
-import java.util.ArrayList;
-
+import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
-import junit.framework.AssertionFailedError;
+import org.apache.qpid.framing.FieldTable;
+
+import java.util.ArrayList;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
@@ -44,38 +44,40 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
// Enqueue messages in order
- _queue.enqueue(null, createMessage(1L, (byte) 10));
- _queue.enqueue(null, createMessage(2L, (byte) 4));
- _queue.enqueue(null, createMessage(3L, (byte) 0));
-
+ AMQMessage message = createMessage((byte) 10);
+ Long messagIDOffset = message.getMessageId() - 1;
+ _queue.enqueue(null, message);
+ _queue.enqueue(null, createMessage((byte) 4));
+ _queue.enqueue(null, createMessage((byte) 0));
+
// Enqueue messages in reverse order
- _queue.enqueue(null, createMessage(4L, (byte) 0));
- _queue.enqueue(null, createMessage(5L, (byte) 4));
- _queue.enqueue(null, createMessage(6L, (byte) 10));
-
+ _queue.enqueue(null, createMessage((byte) 0));
+ _queue.enqueue(null, createMessage((byte) 4));
+ _queue.enqueue(null, createMessage((byte) 10));
+
// Enqueue messages out of order
- _queue.enqueue(null, createMessage(7L, (byte) 4));
- _queue.enqueue(null, createMessage(8L, (byte) 10));
- _queue.enqueue(null, createMessage(9L, (byte) 0));
-
+ _queue.enqueue(null, createMessage((byte) 4));
+ _queue.enqueue(null, createMessage((byte) 10));
+ _queue.enqueue(null, createMessage((byte) 0));
+
// Register subscriber
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
-
+
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
+ assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
+ assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId());
+ assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId());
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
+ assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId());
+ assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId());
+ assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId());
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId());
+ assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId());
+ assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId());
}
catch (AssertionFailedError afe)
{
@@ -92,25 +94,12 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
}
- protected AMQMessage createMessage(Long id, byte i) throws AMQException
+ protected AMQMessage createMessage(byte i) throws AMQException
{
- AMQMessage message = super.createMessage(id);
+ AMQMessage message = super.createMessage();
+
+ ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
- ContentHeaderBody header = new ContentHeaderBody();
- header.bodySize = MESSAGE_SIZE;
-
- //The createMessage above is for a Transient Message so it is safe to have no context.
- message.setPublishAndContentHeaderBody(null, info, header);
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setPriority(i);
- message.getContentHeaderBody().properties = props;
return message;
}
-
- protected AMQMessage createMessage(Long id) throws AMQException
- {
- return createMessage(id, (byte) 0);
- }
-
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index b159e2cda5..be40535df9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends TestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -308,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore, new MessageFactory());
+ messages[i].routingComplete(_messageStore);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index a5e2da7b36..97c76c8e9d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -216,12 +216,14 @@ public class AMQQueueMBeanTest extends TestCase
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageId();
+
_queue.clearQueue(_storeContext);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageFactory());
+ msg.routingComplete(_messageStore);
+
+ long id = msg.getMessageId();
msg.addContentBodyFrame(new ContentChunk()
{
@@ -264,7 +266,7 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -305,7 +307,7 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageFactory());
+ currentMessage.routingComplete(_messageStore);
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index cd1ee65c0c..98465eda20 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -98,7 +98,7 @@ public class AckTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
_queue.registerSubscription(_subscription,false);
- MessageFactory factory = new MessageFactory();
+ MessageFactory factory = MessageFactory.getInstance();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -106,7 +106,7 @@ public class AckTest extends TestCase
MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false,
false, new AMQShortString("rk"));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore);
//IncomingMessage msg2 = null;
if (persistent)
{
@@ -127,7 +127,7 @@ public class AckTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, factory);
+ msg.routingComplete(_messageStore);
if(msg.allContentReceived())
{
msg.deliverToQueues();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java
index 582e2bfb00..75e9f08417 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryClassTest.java
@@ -22,27 +22,28 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
-public class MessageFactoryTest extends TestCase
+public class MessageFactoryClassTest extends TestCase
{
private MessageFactory _factory;
public void setUp()
{
- _factory = new MessageFactory();
+ _factory = MessageFactory.getInstance();
}
public void testTransientMessageCreation()
{
- AMQMessage message = _factory.createMessage(0L, null, false);
+ AMQMessage message = _factory.createMessage(null, false);
assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass());
}
public void testPersistentMessageCreation()
{
- AMQMessage message = _factory.createMessage(0L, null, true);
+ AMQMessage message = _factory.createMessage(null, true);
assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass());
}
-} \ No newline at end of file
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
new file mode 100644
index 0000000000..db0fc56303
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageFactoryRecoveryTest extends TestCase
+{
+ private MessageFactory _factory;
+
+ public void setUp()
+ {
+ _factory = MessageFactory.getInstance();
+
+ }
+
+ public void test()
+ {
+ AMQMessage message = _factory.createMessage(null, false);
+
+ _factory.enableRecover();
+
+ Long messasgeID = message.getMessageId();
+
+ try
+ {
+ _factory.createMessage(messasgeID, null);
+ fail("Cannot recreate message with an existing id");
+ }
+ catch (RuntimeException re)
+ {
+ assertEquals("Incorrect exception thrown ",
+ "Message IDs can only increase current id is:" + messasgeID + ". Requested:" + messasgeID, re.getMessage());
+ }
+
+ //Check we cannot go backwords with ids.
+ try
+ {
+ _factory.createMessage(messasgeID - 1, null);
+ fail("Cannot recreate message with an old id");
+ }
+ catch (RuntimeException re)
+ {
+ assertEquals("Incorrect exception thrown ",
+ "Message IDs can only increase current id is:" + messasgeID + ". Requested:" + (messasgeID - 1), re.getMessage());
+ }
+
+ //Check that we can jump forward in ids during recovery.
+ messasgeID += 100;
+ try
+ {
+ message = _factory.createMessage(messasgeID, null);
+ assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
+ }
+ catch (Exception re)
+ {
+ fail("Message with a much higher value should be created");
+ }
+
+ // End the reovery process.
+ _factory.start();
+
+ //Check we cannot still create by id after ending recovery phase
+ try
+ {
+ _factory.createMessage(messasgeID, null);
+ fail("We have left recovery mode so we cannot create by id any more");
+ }
+ catch (Exception re)
+ {
+ assertEquals("Incorrect exception thrown ",
+ "Unable to create message by ID when not recovering", re.getMessage());
+ }
+
+ // Check that the next message created has the next available id
+
+ messasgeID++;
+
+ try
+ {
+ message = _factory.createMessage(null, false);
+ assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
+ }
+ catch (Exception re)
+ {
+ fail("Message with a much higher value should be created");
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
new file mode 100644
index 0000000000..3633481012
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+
+public class MockPersistentAMQMessage extends PersistentAMQMessage
+{
+ public MockPersistentAMQMessage(long messageId, MessageStore store)
+ throws AMQException
+ {
+ super(messageId, store);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index e213be7560..fdaf2c309f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -35,15 +35,15 @@ public class PersistentMessageTest extends TransientMessageTest
}
@Override
- protected AMQMessage newMessage(Long id)
+ protected AMQMessage newMessage()
{
- return new MessageFactory().createMessage(id, _messageStore, true);
+ return MessageFactory.getInstance().createMessage(_messageStore, true);
}
@Override
public void testIsPersistent()
{
- _message = newMessage(1L);
+ _message = newMessage();
assertTrue(_message.isPersistent());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 2dcb081739..98772e7b61 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,12 +20,7 @@ package org.apache.qpid.server.queue;
*
*/
-
-import java.util.ArrayList;
-import java.util.List;
-
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,6 +37,9 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
+import java.util.List;
+
public class SimpleAMQQueueTest extends TestCase
{
@@ -54,7 +52,7 @@ public class SimpleAMQQueueTest extends TestCase
protected DirectExchange _exchange = new DirectExchange();
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
-
+
MessagePublishInfo info = new MessagePublishInfoImpl();
private static final long MESSAGE_SIZE = 100;
@@ -63,7 +61,7 @@ public class SimpleAMQQueueTest extends TestCase
{
super.setUp();
//Create Application Registry for test
- ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1);
+ ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
_virtualHost = new VirtualHost("vhost", _store);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -81,97 +79,99 @@ public class SimpleAMQQueueTest extends TestCase
public void testCreateQueue() throws AMQException
{
_queue.stop();
- try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments );
+ try
+ {
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing name",
- e.getMessage().contains("name"));
+ assertTrue("Exception was not about missing name",
+ e.getMessage().contains("name"));
}
-
- try {
+
+ try
+ {
_queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing vhost",
- e.getMessage().contains("Host"));
+ assertTrue("Exception was not about missing vhost",
+ e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
- _virtualHost, _arguments);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
+ _virtualHost, _arguments);
assertNotNull("Queue was not created", _queue);
}
-
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
-
+
public void testBinding()
{
try
{
_queue.bind(_exchange, _routingKey, null);
- assertTrue("Routing key was not bound",
- _exchange.getBindings().containsKey(_routingKey));
- assertEquals("Queue was not bound to key",
- _exchange.getBindings().get(_routingKey).get(0),
- _queue);
- assertEquals("Exchange binding count", 1,
- _queue.getExchangeBindings().size());
- assertEquals("Wrong exchange bound", _routingKey,
- _queue.getExchangeBindings().get(0).getRoutingKey());
- assertEquals("Wrong exchange bound", _exchange,
- _queue.getExchangeBindings().get(0).getExchange());
-
+ assertTrue("Routing key was not bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertEquals("Queue was not bound to key",
+ _exchange.getBindings().get(_routingKey).get(0),
+ _queue);
+ assertEquals("Exchange binding count", 1,
+ _queue.getExchangeBindings().size());
+ assertEquals("Wrong exchange bound", _routingKey,
+ _queue.getExchangeBindings().get(0).getRoutingKey());
+ assertEquals("Wrong exchange bound", _exchange,
+ _queue.getExchangeBindings().get(0).getExchange());
+
_queue.unBind(_exchange, _routingKey, null);
- assertFalse("Routing key was still bound",
- _exchange.getBindings().containsKey(_routingKey));
- assertNull("Routing key was not empty",
- _exchange.getBindings().get(_routingKey));
+ assertFalse("Routing key was still bound",
+ _exchange.getBindings().containsKey(_routingKey));
+ assertNull("Routing key was not empty",
+ _exchange.getBindings().get(_routingKey));
}
catch (AMQException e)
{
assertNull("Unexpected exception", e);
}
}
-
+
public void testSubscription() throws AMQException
{
// Check adding a subscription adds it to the queue
_queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
-
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
+
// Check sending a message ends up with the subscriber
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
assertTrue("Subscription still had queue", _subscription.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
- assertFalse("Queue still has active consumer",
- 1 == _queue.getActiveConsumerCount());
-
- AMQMessage messageB = createMessage(new Long (25));
+ assertFalse("Queue still has active consumer",
+ 1 == _queue.getActiveConsumerCount());
+
+ AMQMessage messageB = createMessage();
_queue.enqueue(null, messageB);
QueueEntry entry = _subscription.getLastSeenEntry();
assertNull(entry);
}
-
+
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
@@ -182,18 +182,18 @@ public class SimpleAMQQueueTest extends TestCase
{
// Check adding an exclusive subscription adds it to the queue
_queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
- _queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ assertEquals("Subscription did not get queue", _queue,
+ _subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ _queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- AMQMessage messageA = createMessage(new Long(24));
+ AMQMessage messageA = createMessage();
_queue.enqueue(null, messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
Exception ex = null;
@@ -203,7 +203,7 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
assertTrue(ex instanceof AMQException);
@@ -218,40 +218,40 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
}
-
- public void testAutoDeleteQueue() throws Exception
+
+ public void testAutoDeleteQueue() throws Exception
{
- _queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
- _queue.registerSubscription(_subscription, false);
- AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
- _queue.unregisterSubscription(_subscription);
- assertTrue("Queue was not deleted when subscription was removed",
- _queue.isDeleted());
+ _queue.stop();
+ _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+ _queue.registerSubscription(_subscription, false);
+ AMQMessage message = createMessage();
+ _queue.enqueue(null, message);
+ _queue.unregisterSubscription(_subscription);
+ assertTrue("Queue was not deleted when subscription was removed",
+ _queue.isDeleted());
}
-
+
public void testResend() throws Exception
{
_queue.registerSubscription(_subscription, false);
- Long id = new Long(26);
- AMQMessage message = createMessage(id);
+ AMQMessage message = createMessage();
+ Long id = message.getMessageId();
_queue.enqueue(null, message);
QueueEntry entry = _subscription.getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
-
+
}
-
+
public void testGetFirstMessageId() throws Exception
{
// Create message
- Long messageId = new Long(23);
- AMQMessage message = createMessage(messageId);
+ AMQMessage message = createMessage();
+ Long messageId = message.getMessageId();
// Put message on queue
_queue.enqueue(null, message);
@@ -264,34 +264,40 @@ public class SimpleAMQQueueTest extends TestCase
public void testGetFirstFiveMessageIds() throws Exception
{
- for (int i = 0 ; i < 5; i++)
+ // Create message
+
+ AMQMessage message = createMessage();
+ Long initialMessageID = message.getMessageId();
+
+ for (int i = 0; i < 5; i++)
{
- // Create message
- Long messageId = new Long(i);
- AMQMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(null, message);
+ // Create message
+ message = createMessage();
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+ Long messageId = initialMessageID;
// Check message id
for (int i = 0; i < 5; i++)
{
- Long messageId = new Long(i);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
+ messageId++;
}
}
public void testGetLastFiveMessageIds() throws Exception
{
- for (int i = 0 ; i < 10; i++)
+ AMQMessage message = createMessage();
+ Long messageIdOffset = message.getMessageId() -1 ;
+ for (int i = 0; i < 10; i++)
{
- // Create message
- Long messageId = new Long(i);
- AMQMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(null, message);
+ // Create message
+ message = createMessage();
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -299,87 +305,104 @@ public class SimpleAMQQueueTest extends TestCase
// Check message id
for (int i = 0; i < 5; i++)
{
- Long messageId = new Long(i+5);
+ Long messageId = new Long(messageIdOffset + 1 + i + 5);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
-
+
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-
+
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store, new MessageFactory());
- _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
-
+ msg.routingComplete(_store);
+
+
+ _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(1L);
+ AMQQueue data = _store.getMessages().get(messageId);
assertNotNull(data);
-
+
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage message = new MessageFactory().createMessage(1L, _store, true);
-
+
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- // This is a persist message but we are not in a transaction so create a new context for the message
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
-
+
entry.setMessage(message);
_queue.dequeue(null, entry);
-
+
// Check that it is dequeued
- data = _store.getMessages().get(1L);
+ data = _store.getMessages().get(messageId);
assertNull(data);
}
-
// FIXME: move this to somewhere useful
- private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
- final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false);
+ final AMQMessage amqMessage = (MessageFactory.getInstance()).createMessage(null, false);
try
{
//Safe to use a null StoreContext as we have created a TransientMessage (see false param above)
- amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
+ amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
}
catch (AMQException e)
{
// won't happen
}
-
return amqMessage;
}
+ public AMQMessage createMessage() throws AMQException
+ {
+ AMQMessage message = new TestMessage(info);
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+
+ //The createMessage above is for a Transient Message so it is safe to have no context.
+ message.setPublishAndContentHeaderBody(null, info, header);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ message.getContentHeaderBody().properties = props;
+
+ return message;
+ }
+
public class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
+ TestMessage(MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessage(messageId, publishBody));
- _tag = tag;
+ super(SimpleAMQQueueTest.createMessage(publishBody));
+ _tag = getMessageId();
}
-
public boolean incrementReference()
{
_count++;
@@ -396,11 +419,5 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Wrong count for message with tag " + _tag, expected, _count);
}
}
-
- protected AMQMessage createMessage(Long id) throws AMQException
- {
- AMQMessage messageA = new TestMessage(id, id, info);
- return messageA;
- }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
index e37269526c..16d1ab60f3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
@@ -38,22 +38,21 @@ public class TransientMessageTest extends TestCase
AMQMessage _message;
StoreContext _storeContext = null;
- protected AMQMessage newMessage(Long id)
+ protected AMQMessage newMessage()
{
- return new MessageFactory().createMessage(id, null, false);
+ return MessageFactory.getInstance().createMessage(null, false);
}
public void testMessageID()
{
- Long id = 1L;
- _message = newMessage(id);
+ _message = newMessage();
- assertEquals("Message not set value", id, _message.getMessageId());
+ assertTrue("Message ID is not set ", _message.getMessageId() > 0L);
}
public void testInvalidContentChunk()
{
- _message = newMessage(1L);
+ _message = newMessage();
try
{
@@ -100,7 +99,7 @@ public class TransientMessageTest extends TestCase
public void testAddSingleContentChunk()
{
- _message = newMessage(1L);
+ _message = newMessage();
ContentChunk cc = new MockContentChunk(100);
@@ -138,7 +137,7 @@ public class TransientMessageTest extends TestCase
public void testAddMultipleContentChunk()
{
- _message = newMessage(1L);
+ _message = newMessage();
ContentChunk cc = new MockContentChunk(100);
@@ -174,14 +173,14 @@ public class TransientMessageTest extends TestCase
public void testInitialArrivalTime()
{
- _message = newMessage(1L);
+ _message = newMessage();
assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime());
}
public void testSetPublishAndContentHeaderBody_WithBody()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl();
int bodySize = 100;
@@ -202,7 +201,7 @@ public class TransientMessageTest extends TestCase
public void testSetPublishAndContentHeaderBody_Null()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl();
int bodySize = 0;
@@ -244,7 +243,7 @@ public class TransientMessageTest extends TestCase
public void testSetPublishAndContentHeaderBody_Empty()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl();
int bodySize = 0;
@@ -283,14 +282,14 @@ public class TransientMessageTest extends TestCase
public void testIsPersistent()
{
- _message = newMessage(1L);
+ _message = newMessage();
assertFalse(_message.isPersistent());
}
public void testImmediateAndNotDelivered()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
int bodySize = 0;
@@ -323,7 +322,7 @@ public class TransientMessageTest extends TestCase
public void testNotImmediateAndNotDelivered()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
int bodySize = 0;
@@ -356,7 +355,7 @@ public class TransientMessageTest extends TestCase
public void testExpiry()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
int bodySize = 0;
@@ -412,7 +411,7 @@ public class TransientMessageTest extends TestCase
public void testNoExpiry()
{
- _message = newMessage(1L);
+ _message = newMessage();
MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
int bodySize = 0;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index b4ed1f8709..36f640a325 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -347,18 +347,17 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
- messageInfo,
+ currentMessage = new IncomingMessage(messageInfo,
new NonTransactionalContext(_virtualHost.getMessageStore(),
new StoreContext(), null, null),
- new InternalTestProtocolSession());
+ new InternalTestProtocolSession(),
+ _virtualHost.getMessageStore());
}
catch (AMQException e)
{
fail(e.getMessage());
}
- currentMessage.setMessageStore(_virtualHost.getMessageStore());
currentMessage.setExchange(directExchange);
ContentHeaderBody headerBody = new ContentHeaderBody();
@@ -389,7 +388,7 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory());
+ currentMessage.routingComplete(_virtualHost.getMessageStore());
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 9a9fe3644c..48d69c5bad 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -56,7 +56,7 @@ public class TestReferenceCounting extends TestCase
final long messageId = _store.getNewMessageId();
- AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
@@ -86,7 +86,7 @@ public class TestReferenceCounting extends TestCase
final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 92a142e402..dd71b2d0da 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -251,14 +251,6 @@ public class SlowMessageStore implements MessageStore
return b;
}
- public Long getNewMessageId()
- {
- doPreDelay("getNewMessageId");
- Long l = _realStore.getNewMessageId();
- doPostDelay("getNewMessageId");
- return l;
- }
-
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
doPreDelay("storeContentBodyChunk");