diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java | 91 |
1 files changed, 72 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 0fd31973b2..51892d965a 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.server.virtualhost; +import org.apache.qpid.server.federation.BrokerLink; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -43,7 +44,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; -import org.apache.qpid.server.util.ByteBufferInputStream; +import org.apache.qpid.util.ByteBufferInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -54,11 +55,13 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.TreeMap; +import java.util.UUID; public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, ConfigurationRecoveryHandler.BindingRecoveryHandler, + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -73,7 +76,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private List<ProcessAction> _actions; private MessageStore _store; - private TransactionLog _transactionLog; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); @@ -86,7 +88,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa _virtualHost = virtualHost; } - public QueueRecoveryHandler begin(MessageStore store) + public VirtualHostConfigRecoveryHandler begin(MessageStore store) { _logSubject = new MessageStoreLogSubject(_virtualHost,store); _store = store; @@ -99,14 +101,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { try { - AMQShortString queueNameShortString = new AMQShortString(queueName); - - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); + AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); if (q == null) { - q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost, - arguments); + q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost, + FieldTable.convertToMap(arguments)); _virtualHost.getQueueRegistry().registerQueue(q); } @@ -183,13 +183,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void completeMessageRecovery() { //TODO - log end - //To change body of implemented methods use File | Settings | File Templates. } - public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log) + public BridgeRecoveryHandler brokerLink(final UUID id, + final long createTime, + final Map<String, String> arguments) + { + BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments); + return new BridgeRecoveryHandlerImpl(blink); + + } + + public void completeBrokerLinkRecovery() { - _transactionLog = log; - return this; } private static final class ProcessAction @@ -270,9 +276,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void completeBindingRecovery() + public BrokerLinkRecoveryHandler completeBindingRecovery() { - //return this; + return this; } public void complete() @@ -316,15 +322,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa else { _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); - TransactionLog.Transaction txn = _transactionLog.newTransaction(); - txn.dequeueMessage(queue, messageId); + MessageStore.Transaction txn = _store.newTransaction(); + txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); } } else { _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); - TransactionLog.Transaction txn = _transactionLog.newTransaction(); + MessageStore.Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { @@ -334,7 +340,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return queueName; } }; - txn.dequeueMessage(mockQueue, messageId); + txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); txn.commitTranAsync(); } @@ -367,4 +373,51 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } + private static class DummyMessage implements EnqueableMessage + { + + + private final long _messageId; + + public DummyMessage(long messageId) + { + _messageId = messageId; + } + + public long getMessageNumber() + { + return _messageId; + } + + + public boolean isPersistent() + { + return true; + } + + + public StoredMessage getStoredMessage() + { + return null; + } + } + + private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler + { + private final BrokerLink _blink; + + public BridgeRecoveryHandlerImpl(final BrokerLink blink) + { + _blink = blink; + } + + public void bridge(final UUID id, final long createTime, final Map<String, String> arguments) + { + _blink.createBridge(id, createTime, arguments); + } + + public void completeBridgeRecoveryForLink() + { + } + } } |