summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java357
1 files changed, 357 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
new file mode 100644
index 0000000000..df47c85f64
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
@@ -0,0 +1,357 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
+
+public class MessageStoreRecoverer
+{
+ private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class);
+
+ private final VirtualHost _virtualHost;
+
+ private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
+ private final Map<Long, ServerMessage<?>> _recoveredMessages = new HashMap<Long, ServerMessage<?>>();
+ private final Map<Long, StoredMessage<?>> _unusedMessages = new HashMap<Long, StoredMessage<?>>();
+ private final EventLogger _eventLogger;
+
+ private final MessageStoreLogSubject _logSubject;
+ private final MessageStore _store;
+
+
+ public MessageStoreRecoverer(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
+ {
+ super();
+ _virtualHost = virtualHost;
+ _eventLogger = virtualHost.getEventLogger();
+ _logSubject = logSubject;
+ _store = virtualHost.getMessageStore();
+ }
+
+
+ public void recover()
+ {
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
+ _store.visitMessages(messageVisitor);
+
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
+ _store.visitMessageInstances(messageAndMessageInstanceRecoverer);
+
+ for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
+ {
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
+ }
+
+ _store.visitDistributedTransactions(distributedTransactionRecoverer);
+
+
+
+ for(StoredMessage<?> m : _unusedMessages.values())
+ {
+ _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+ m.remove();
+ }
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+
+
+ }
+
+ MessageHandler messageVisitor = new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> message)
+ {
+ StorableMessageMetaData metaData = message.getMetaData();
+
+ @SuppressWarnings("rawtypes")
+ MessageMetaDataType type = metaData.getType();
+
+ @SuppressWarnings("unchecked")
+ ServerMessage<?> serverMessage = type.createMessage(message);
+
+ _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+ _unusedMessages.put(message.getMessageNumber(), message);
+ return true;
+ }
+
+ };
+
+ MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler()
+ {
+ @Override
+ public boolean handle(final UUID queueId, long messageId)
+ {
+ AMQQueue<?> queue = _virtualHost.getQueue(queueId);
+ if(queue != null)
+ {
+ String queueName = queue.getName();
+ ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
+ }
+
+ Integer count = _queueRecoveries.get(queueName);
+ if (count == null)
+ {
+ count = 0;
+ }
+
+ queue.enqueue(message,null);
+
+ _queueRecoveries.put(queueName, ++count);
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
+ Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(queue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
+ Transaction txn = _store.newTransaction();
+ TransactionLogResource mockQueue =
+ new TransactionLogResource()
+ {
+ @Override
+ public String getName()
+ {
+ return "<<UNKNOWN>>";
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return queueId;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+ };
+ txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ return true;
+ }
+ };
+
+ private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler()
+ {
+
+ @Override
+ public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ Xid id = new Xid(format, globalId, branchId);
+ DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+ DtxBranch branch = dtxRegistry.getBranch(id);
+ if(branch == null)
+ {
+ branch = new DtxBranch(id, _store, _virtualHost);
+ dtxRegistry.registerBranch(branch);
+ }
+ for(Transaction.Record record : enqueues)
+ {
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final MessageReference<?> ref = message.newReference();
+
+ branch.enqueue(queue,message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ queue.enqueue(message, null);
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ ref.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
+ }
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
+
+ }
+ }
+ for(Transaction.Record record : dequeues)
+ {
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+
+ entry.acquire();
+
+ branch.dequeue(queue, message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
+ }
+
+ }
+
+ branch.setState(DtxBranch.State.PREPARED);
+ branch.prePrepareTransaction();
+ return true;
+ }
+
+ private StringBuilder xidAsString(Xid id)
+ {
+ return new StringBuilder("(")
+ .append(id.getFormat())
+ .append(',')
+ .append(Functions.str(id.getGlobalId()))
+ .append(',')
+ .append(Functions.str(id.getBranchId()))
+ .append(')');
+ }
+
+
+ };
+
+
+ private static class DummyMessage implements EnqueueableMessage
+ {
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
+}