diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java | 357 |
1 files changed, 357 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java new file mode 100644 index 0000000000..df47c85f64 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java @@ -0,0 +1,357 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.util.Functions; + +public class MessageStoreRecoverer +{ + private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class); + + private final VirtualHost _virtualHost; + + private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); + private final Map<Long, ServerMessage<?>> _recoveredMessages = new HashMap<Long, ServerMessage<?>>(); + private final Map<Long, StoredMessage<?>> _unusedMessages = new HashMap<Long, StoredMessage<?>>(); + private final EventLogger _eventLogger; + + private final MessageStoreLogSubject _logSubject; + private final MessageStore _store; + + + public MessageStoreRecoverer(VirtualHost virtualHost, MessageStoreLogSubject logSubject) + { + super(); + _virtualHost = virtualHost; + _eventLogger = virtualHost.getEventLogger(); + _logSubject = logSubject; + _store = virtualHost.getMessageStore(); + } + + + public void recover() + { + _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START()); + _store.visitMessages(messageVisitor); + + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); + _store.visitMessageInstances(messageAndMessageInstanceRecoverer); + + for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) + { + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); + } + + _store.visitDistributedTransactions(distributedTransactionRecoverer); + + + + for(StoredMessage<?> m : _unusedMessages.values()) + { + _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); + m.remove(); + } + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + + _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size())); + _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); + + + } + + MessageHandler messageVisitor = new MessageHandler() + { + + @Override + public boolean handle(StoredMessage<?> message) + { + StorableMessageMetaData metaData = message.getMetaData(); + + @SuppressWarnings("rawtypes") + MessageMetaDataType type = metaData.getType(); + + @SuppressWarnings("unchecked") + ServerMessage<?> serverMessage = type.createMessage(message); + + _recoveredMessages.put(message.getMessageNumber(), serverMessage); + _unusedMessages.put(message.getMessageNumber(), message); + return true; + } + + }; + + MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler() + { + @Override + public boolean handle(final UUID queueId, long messageId) + { + AMQQueue<?> queue = _virtualHost.getQueue(queueId); + if(queue != null) + { + String queueName = queue.getName(); + ServerMessage<?> message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); + } + + Integer count = _queueRecoveries.get(queueName); + if (count == null) + { + count = 0; + } + + queue.enqueue(message,null); + + _queueRecoveries.put(queueName, ++count); + } + else + { + _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); + Transaction txn = _store.newTransaction(); + txn.dequeueMessage(queue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + } + else + { + _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); + Transaction txn = _store.newTransaction(); + TransactionLogResource mockQueue = + new TransactionLogResource() + { + @Override + public String getName() + { + return "<<UNKNOWN>>"; + } + + @Override + public UUID getId() + { + return queueId; + } + + @Override + public boolean isDurable() + { + return false; + } + }; + txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + return true; + } + }; + + private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler() + { + + @Override + public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + Xid id = new Xid(format, globalId, branchId); + DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); + DtxBranch branch = dtxRegistry.getBranch(id); + if(branch == null) + { + branch = new DtxBranch(id, _store, _virtualHost); + dtxRegistry.registerBranch(branch); + } + for(Transaction.Record record : enqueues) + { + final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage<?> message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final MessageReference<?> ref = message.newReference(); + + branch.enqueue(queue,message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + queue.enqueue(message, null); + ref.release(); + } + + public void onRollback() + { + ref.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); + } + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); + + } + } + for(Transaction.Record record : dequeues) + { + final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage<?> message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final QueueEntry entry = queue.getMessageOnTheQueue(messageId); + + entry.acquire(); + + branch.dequeue(queue, message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + entry.delete(); + } + + public void onRollback() + { + entry.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); + } + + } + + branch.setState(DtxBranch.State.PREPARED); + branch.prePrepareTransaction(); + return true; + } + + private StringBuilder xidAsString(Xid id) + { + return new StringBuilder("(") + .append(id.getFormat()) + .append(',') + .append(Functions.str(id.getGlobalId())) + .append(',') + .append(Functions.str(id.getBranchId())) + .append(')'); + } + + + }; + + + private static class DummyMessage implements EnqueueableMessage + { + + private final long _messageId; + + public DummyMessage(long messageId) + { + _messageId = messageId; + } + + public long getMessageNumber() + { + return _messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + return null; + } + } +} |