summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java35
1 files changed, 11 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index ae88e3e9f7..7ac89fcc81 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
@@ -38,17 +37,14 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
@@ -102,7 +98,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
try
{
AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
-
+
if (q == null)
{
q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
@@ -120,9 +116,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
q.setAlternateExchange(altExchange);
}
}
-
+
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
-
+
//Record that we have a queue for recovery
_queueRecoveries.put(queueName, 0);
}
@@ -169,21 +165,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void message(StoredMessage message)
{
- ServerMessage serverMessage;
- switch(message.getMetaData().getType())
+ final MessageMetaDataType type = message.getMetaData().getType();
+ if(type == null)
{
- case META_DATA_0_8:
- serverMessage = new AMQMessage(message);
- break;
- case META_DATA_0_10:
- serverMessage = new MessageTransferMessage(message, null);
- break;
- case META_DATA_1_0:
- serverMessage = new Message_1_0(message);
- break;
- default:
- throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
+ throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
}
+ ServerMessage serverMessage = type.createMessage(message);
_recoveredMessages.put(message.getMessageNumber(), serverMessage);
_unusedMessages.put(message.getMessageNumber(), message);
@@ -252,7 +239,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
Long.toString(messageId)));
-
+
}
}
@@ -277,9 +264,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(message != null)
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-
+
entry.acquire();
-
+
branch.dequeue(queue, message);
branch.addPostTransactionAcion(new ServerTransaction.Action()