summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java91
1 files changed, 72 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 0fd31973b2..51892d965a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -43,7 +44,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.util.ByteBufferInputStream;
+import org.apache.qpid.util.ByteBufferInputStream;
import java.io.DataInputStream;
import java.io.IOException;
@@ -54,11 +55,13 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
+import java.util.UUID;
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
ConfigurationRecoveryHandler.QueueRecoveryHandler,
ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
ConfigurationRecoveryHandler.BindingRecoveryHandler,
+ ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
@@ -73,7 +76,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private List<ProcessAction> _actions;
private MessageStore _store;
- private TransactionLog _transactionLog;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
@@ -86,7 +88,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
_virtualHost = virtualHost;
}
- public QueueRecoveryHandler begin(MessageStore store)
+ public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
_logSubject = new MessageStoreLogSubject(_virtualHost,store);
_store = store;
@@ -99,14 +101,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
try
{
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
if (q == null)
{
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost,
- arguments);
+ q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+ FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
}
@@ -183,13 +183,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void completeMessageRecovery()
{
//TODO - log end
- //To change body of implemented methods use File | Settings | File Templates.
}
- public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
+ public BridgeRecoveryHandler brokerLink(final UUID id,
+ final long createTime,
+ final Map<String, String> arguments)
+ {
+ BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments);
+ return new BridgeRecoveryHandlerImpl(blink);
+
+ }
+
+ public void completeBrokerLinkRecovery()
{
- _transactionLog = log;
- return this;
}
private static final class ProcessAction
@@ -270,9 +276,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public void completeBindingRecovery()
+ public BrokerLinkRecoveryHandler completeBindingRecovery()
{
- //return this;
+ return this;
}
public void complete()
@@ -316,15 +322,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
else
{
_logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, messageId);
+ MessageStore.Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(queue, new DummyMessage(messageId));
txn.commitTranAsync();
}
}
else
{
_logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ MessageStore.Transaction txn = _store.newTransaction();
TransactionLogResource mockQueue =
new TransactionLogResource()
{
@@ -334,7 +340,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return queueName;
}
};
- txn.dequeueMessage(mockQueue, messageId);
+ txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
txn.commitTranAsync();
}
@@ -367,4 +373,51 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
+ private static class DummyMessage implements EnqueableMessage
+ {
+
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
+
+ private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler
+ {
+ private final BrokerLink _blink;
+
+ public BridgeRecoveryHandlerImpl(final BrokerLink blink)
+ {
+ _blink = blink;
+ }
+
+ public void bridge(final UUID id, final long createTime, final Map<String, String> arguments)
+ {
+ _blink.createBridge(id, createTime, arguments);
+ }
+
+ public void completeBridgeRecoveryForLink()
+ {
+ }
+ }
}