diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java | 56 |
1 files changed, 14 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index ea2f0f15e4..ae88e3e9f7 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -35,15 +35,16 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -65,7 +66,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, ConfigurationRecoveryHandler.BindingRecoveryHandler, - ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -77,7 +77,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final VirtualHost _virtualHost; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); + private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); private MessageStoreLogSubject _logSubject; @@ -169,7 +169,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - AbstractServerMessageImpl serverMessage; + ServerMessage serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: @@ -178,6 +178,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa case META_DATA_0_10: serverMessage = new MessageTransferMessage(message, null); break; + case META_DATA_1_0: + serverMessage = new Message_1_0(message); + break; default: throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } @@ -190,19 +193,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { } - public BridgeRecoveryHandler brokerLink(final UUID id, - final long createTime, - final Map<String, String> arguments) - { - BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments); - return new BridgeRecoveryHandlerImpl(blink); - - } - - public void completeBrokerLinkRecovery() - { - } - public void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues) @@ -221,12 +211,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) { - message.incrementReference(); + final MessageReference ref = message.newReference(); + branch.enqueue(queue,message); @@ -239,7 +230,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { queue.enqueue(message, true, null); - message.decrementReference(); + ref.release(); } catch (AMQException e) { @@ -251,7 +242,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void onRollback() { - message.decrementReference(); + ref.release(); } }); } @@ -280,7 +271,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) @@ -412,9 +403,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public BrokerLinkRecoveryHandler completeBindingRecovery() + public void completeBindingRecovery() { - return this; } public void complete() @@ -529,22 +519,4 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler - { - private final BrokerLink _blink; - - public BridgeRecoveryHandlerImpl(final BrokerLink blink) - { - _blink = blink; - } - - public void bridge(final UUID id, final long createTime, final Map<String, String> arguments) - { - _blink.createBridge(id, createTime, arguments); - } - - public void completeBridgeRecoveryForLink() - { - } - } } |