summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java88
1 files changed, 85 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index e18834874f..2f6e05963c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -21,12 +21,85 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.atomic.AtomicLong;
public class MessageFactory
{
+ private AtomicLong _messageId;
+ private static MessageFactory INSTANCE;
+
+ private enum State
+ {
+ RECOVER,
+ OPEN
+ }
+
+ private State _state = State.RECOVER;
+
+ private MessageFactory()
+ {
+ _messageId = new AtomicLong(0L);
+ }
+
+ public void start()
+ {
+ _state = State.OPEN;
+ }
+
+ /**
+ * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode.
+ */
+ protected void enableRecover()
+ {
+ _state = State.RECOVER;
+ }
+
+
+ /**
+ * Normal message creation path
+ * @param store
+ * @param persistent
+ * @return
+ */
+ public AMQMessage createMessage(MessageStore store, boolean persistent)
+ {
+ if (_state != State.OPEN)
+ {
+ _state = State.OPEN;
+ }
+
+ return createNextMessage(_messageId.incrementAndGet(), store, persistent);
+ }
+
+ /**
+ * Used for message recovery only and so only creates persistent messages.
+ * @param messageId the id that this message must have
+ * @param store
+ * @return
+ */
+ public AMQMessage createMessage(Long messageId, MessageStore store)
+ {
+ if (_state != State.RECOVER)
+ {
+ throw new RuntimeException("Unable to create message by ID when not recovering");
+ }
+
+ long currentID = _messageId.get();
+ if (messageId <= currentID)
+ {
+ throw new RuntimeException("Message IDs can only increase current id is:"
+ + currentID + ". Requested:" + messageId);
+ }
+ else
+ {
+ _messageId.set(messageId);
+ }
+
+ return createNextMessage(messageId, store, true);
+ }
- public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent)
+ private AMQMessage createNextMessage(Long messageId, MessageStore store, boolean persistent)
{
if (persistent)
{
@@ -37,5 +110,14 @@ public class MessageFactory
return new TransientAMQMessage(messageId);
}
}
-
+
+ public static MessageFactory getInstance()
+ {
+ if (INSTANCE == null)
+ {
+ INSTANCE = new MessageFactory();
+ }
+
+ return INSTANCE;
+ }
}