diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java | 35 |
1 files changed, 11 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index ae88e3e9f7..7ac89fcc81 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.UUID; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; @@ -38,17 +37,14 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; @@ -102,7 +98,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa try { AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); - + if (q == null) { q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, @@ -120,9 +116,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa q.setAlternateExchange(altExchange); } } - + CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); - + //Record that we have a queue for recovery _queueRecoveries.put(queueName, 0); } @@ -169,21 +165,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - ServerMessage serverMessage; - switch(message.getMetaData().getType()) + final MessageMetaDataType type = message.getMetaData().getType(); + if(type == null) { - case META_DATA_0_8: - serverMessage = new AMQMessage(message); - break; - case META_DATA_0_10: - serverMessage = new MessageTransferMessage(message, null); - break; - case META_DATA_1_0: - serverMessage = new Message_1_0(message); - break; - default: - throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); + throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } + ServerMessage serverMessage = type.createMessage(message); _recoveredMessages.put(message.getMessageNumber(), serverMessage); _unusedMessages.put(message.getMessageNumber(), message); @@ -252,7 +239,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), Long.toString(messageId))); - + } } @@ -277,9 +264,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(message != null) { final QueueEntry entry = queue.getMessageOnTheQueue(messageId); - + entry.acquire(); - + branch.dequeue(queue, message); branch.addPostTransactionAcion(new ServerTransaction.Action() |