diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java | 88 |
1 files changed, 85 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java index e18834874f..2f6e05963c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java @@ -21,12 +21,85 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; + +import java.util.concurrent.atomic.AtomicLong; public class MessageFactory { + private AtomicLong _messageId; + private static MessageFactory INSTANCE; + + private enum State + { + RECOVER, + OPEN + } + + private State _state = State.RECOVER; + + private MessageFactory() + { + _messageId = new AtomicLong(0L); + } + + public void start() + { + _state = State.OPEN; + } + + /** + * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode. + */ + protected void enableRecover() + { + _state = State.RECOVER; + } + + + /** + * Normal message creation path + * @param store + * @param persistent + * @return + */ + public AMQMessage createMessage(MessageStore store, boolean persistent) + { + if (_state != State.OPEN) + { + _state = State.OPEN; + } + + return createNextMessage(_messageId.incrementAndGet(), store, persistent); + } + + /** + * Used for message recovery only and so only creates persistent messages. + * @param messageId the id that this message must have + * @param store + * @return + */ + public AMQMessage createMessage(Long messageId, MessageStore store) + { + if (_state != State.RECOVER) + { + throw new RuntimeException("Unable to create message by ID when not recovering"); + } + + long currentID = _messageId.get(); + if (messageId <= currentID) + { + throw new RuntimeException("Message IDs can only increase current id is:" + + currentID + ". Requested:" + messageId); + } + else + { + _messageId.set(messageId); + } + + return createNextMessage(messageId, store, true); + } - public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent) + private AMQMessage createNextMessage(Long messageId, MessageStore store, boolean persistent) { if (persistent) { @@ -37,5 +110,14 @@ public class MessageFactory return new TransientAMQMessage(messageId); } } - + + public static MessageFactory getInstance() + { + if (INSTANCE == null) + { + INSTANCE = new MessageFactory(); + } + + return INSTANCE; + } } |