summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java56
1 files changed, 14 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index ea2f0f15e4..ae88e3e9f7 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -35,15 +35,16 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
ConfigurationRecoveryHandler.QueueRecoveryHandler,
ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
ConfigurationRecoveryHandler.BindingRecoveryHandler,
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
@@ -77,7 +77,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private final VirtualHost _virtualHost;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+ private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
private MessageStoreLogSubject _logSubject;
@@ -169,7 +169,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void message(StoredMessage message)
{
- AbstractServerMessageImpl serverMessage;
+ ServerMessage serverMessage;
switch(message.getMetaData().getType())
{
case META_DATA_0_8:
@@ -178,6 +178,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
case META_DATA_0_10:
serverMessage = new MessageTransferMessage(message, null);
break;
+ case META_DATA_1_0:
+ serverMessage = new Message_1_0(message);
+ break;
default:
throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
}
@@ -190,19 +193,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
}
- public BridgeRecoveryHandler brokerLink(final UUID id,
- final long createTime,
- final Map<String, String> arguments)
- {
- BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments);
- return new BridgeRecoveryHandlerImpl(blink);
-
- }
-
- public void completeBrokerLinkRecovery()
- {
- }
-
public void dtxRecord(long format, byte[] globalId, byte[] branchId,
Transaction.Record[] enqueues,
Transaction.Record[] dequeues)
@@ -221,12 +211,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
- final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ final ServerMessage message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
if(message != null)
{
- message.incrementReference();
+ final MessageReference ref = message.newReference();
+
branch.enqueue(queue,message);
@@ -239,7 +230,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
queue.enqueue(message, true, null);
- message.decrementReference();
+ ref.release();
}
catch (AMQException e)
{
@@ -251,7 +242,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void onRollback()
{
- message.decrementReference();
+ ref.release();
}
});
}
@@ -280,7 +271,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
- final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ final ServerMessage message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
if(message != null)
@@ -412,9 +403,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public BrokerLinkRecoveryHandler completeBindingRecovery()
+ public void completeBindingRecovery()
{
- return this;
}
public void complete()
@@ -529,22 +519,4 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
}
- private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler
- {
- private final BrokerLink _blink;
-
- public BridgeRecoveryHandlerImpl(final BrokerLink blink)
- {
- _blink = blink;
- }
-
- public void bridge(final UUID id, final long createTime, final Map<String, String> arguments)
- {
- _blink.createBridge(id, createTime, arguments);
- }
-
- public void completeBridgeRecoveryForLink()
- {
- }
- }
}