/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.virtualhost; import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import org.apache.qpid.util.ByteBufferInputStream; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.TreeMap; import java.util.UUID; public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, ConfigurationRecoveryHandler.BindingRecoveryHandler, ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, TransactionLogRecoveryHandler.QueueEntryRecoveryHandler { private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); private final VirtualHost _virtualHost; private MessageStoreLogSubject _logSubject; private List _actions; private MessageStore _store; private final Map _queueRecoveries = new TreeMap(); private Map _recoveredMessages = new HashMap(); private Map _unusedMessages = new HashMap(); public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost) { _virtualHost = virtualHost; } public VirtualHostConfigRecoveryHandler begin(MessageStore store) { _logSubject = new MessageStoreLogSubject(_virtualHost,store); _store = store; CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); return this; } public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments) { try { AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); if (q == null) { q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost, FieldTable.convertToMap(arguments)); _virtualHost.getQueueRegistry().registerQueue(q); } CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); //Record that we have a queue for recovery _queueRecoveries.put(queueName, 0); } catch (AMQException e) { // TODO throw new RuntimeException(e); } } public ExchangeRecoveryHandler completeQueueRecovery() { return this; } public void exchange(String exchangeName, String type, boolean autoDelete) { try { Exchange exchange; AMQShortString exchangeNameSS = new AMQShortString(exchangeName); exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); if (exchange == null) { exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); _virtualHost.getExchangeRegistry().registerExchange(exchange); } } catch (AMQException e) { // TODO throw new RuntimeException(e); } } public BindingRecoveryHandler completeExchangeRecovery() { return this; } public StoredMessageRecoveryHandler begin() { // TODO - log begin return this; } public void message(StoredMessage message) { ServerMessage serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: serverMessage = new AMQMessage(message); break; case META_DATA_0_10: serverMessage = new MessageTransferMessage(message, null); break; default: throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } //_logger.debug("Recovered message with id " + serverMessage); _recoveredMessages.put(message.getMessageNumber(), serverMessage); _unusedMessages.put(message.getMessageNumber(), message); } public void completeMessageRecovery() { //TODO - log end } public BridgeRecoveryHandler brokerLink(final UUID id, final long createTime, final Map arguments) { BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments); return new BridgeRecoveryHandlerImpl(blink); } public void completeBrokerLinkRecovery() { } private static final class ProcessAction { private final AMQQueue _queue; private final AMQMessage _message; public ProcessAction(AMQQueue queue, AMQMessage message) { _queue = queue; _message = message; } public void process() { try { _queue.enqueue(_message); } catch(AMQException e) { throw new RuntimeException(e); } } } public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) { _actions = new ArrayList(); try { Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); if (exchange == null) { _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName); return; } AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName); } else { FieldTable argumentsFT = null; if(buf != null) { try { argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); } catch (IOException e) { throw new RuntimeException("IOException should not be thrown here", e); } } BindingFactory bf = _virtualHost.getBindingFactory(); Map argumentMap = FieldTable.convertToMap(argumentsFT); if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null) { _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); bf.restoreBinding(bindingKey, queue, exchange, argumentMap); } } } catch (AMQException e) { throw new RuntimeException(e); } } public BrokerLinkRecoveryHandler completeBindingRecovery() { return this; } public void complete() { } public void queueEntry(final String queueName, long messageId) { AMQShortString queueNameShortString = new AMQShortString(queueName); AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); try { if(queue != null) { ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) { if (_logger.isDebugEnabled()) { _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString()); } Integer count = _queueRecoveries.get(queueName); if (count == null) { count = 0; } queue.enqueue(message); _queueRecoveries.put(queueName, ++count); } else { _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); MessageStore.Transaction txn = _store.newTransaction(); txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); } } else { _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); MessageStore.Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { public String getResourceName() { return queueName; } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); txn.commitTranAsync(); } } catch(AMQException e) { throw new RuntimeException(e); } } public void completeQueueEntryRecovery() { for(StoredMessage m : _unusedMessages.values()) { _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); m.remove(); } for(Map.Entry entry : _queueRecoveries.entrySet()) { CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); } CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } private static class DummyMessage implements EnqueableMessage { private final long _messageId; public DummyMessage(long messageId) { _messageId = messageId; } public long getMessageNumber() { return _messageId; } public boolean isPersistent() { return true; } public StoredMessage getStoredMessage() { return null; } } private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler { private final BrokerLink _blink; public BridgeRecoveryHandlerImpl(final BrokerLink blink) { _blink = blink; } public void bridge(final UUID id, final long createTime, final Map arguments) { _blink.createBridge(id, createTime, arguments); } public void completeBridgeRecoveryForLink() { } } }