summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java350
1 files changed, 0 insertions, 350 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
deleted file mode 100755
index 3216115967..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.virtualhost;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.DtxBranch;
-import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.Xid;
-import org.apache.qpid.transport.util.Functions;
-
-public class VirtualHostConfigRecoveryHandler implements
- MessageStoreRecoveryHandler,
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
- TransactionLogRecoveryHandler,
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler
-{
- private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
-
- private final VirtualHost _virtualHost;
-
- private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
- private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
- private final EventLogger _eventLogger;
-
- private final MessageStoreLogSubject _logSubject;
- private MessageStore _store;
-
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
- {
- _virtualHost = virtualHost;
- _eventLogger = virtualHost.getEventLogger();
- _logSubject = logSubject;
- }
-
- public VirtualHostConfigRecoveryHandler begin(MessageStore store)
- {
- _store = store;
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
- return this;
- }
-
- public StoredMessageRecoveryHandler begin()
- {
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
- return this;
- }
-
- public void message(StoredMessage message)
- {
- ServerMessage serverMessage = message.getMetaData().getType().createMessage(message);
-
- _recoveredMessages.put(message.getMessageNumber(), serverMessage);
- _unusedMessages.put(message.getMessageNumber(), message);
- }
-
- public void completeMessageRecovery()
- {
- }
-
- public void dtxRecord(long format, byte[] globalId, byte[] branchId,
- Transaction.Record[] enqueues,
- Transaction.Record[] dequeues)
- {
- Xid id = new Xid(format, globalId, branchId);
- DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
- DtxBranch branch = dtxRegistry.getBranch(id);
- if(branch == null)
- {
- branch = new DtxBranch(id, _store, _virtualHost);
- dtxRegistry.registerBranch(branch);
- }
- for(Transaction.Record record : enqueues)
- {
- final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
- if(queue != null)
- {
- final long messageId = record.getMessage().getMessageNumber();
- final ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- final MessageReference ref = message.newReference();
-
-
- branch.enqueue(queue,message);
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- queue.enqueue(message, null);
- ref.release();
- }
-
- public void onRollback()
- {
- ref.release();
- }
- });
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
-
- }
-
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
-
- }
- }
- for(Transaction.Record record : dequeues)
- {
- final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
- if(queue != null)
- {
- final long messageId = record.getMessage().getMessageNumber();
- final ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-
- entry.acquire();
-
- branch.dequeue(queue, message);
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- entry.release();
- }
- });
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
-
- }
-
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
- }
-
- }
-
- branch.setState(DtxBranch.State.PREPARED);
- branch.prePrepareTransaction();
- }
-
- private static StringBuilder xidAsString(Xid id)
- {
- return new StringBuilder("(")
- .append(id.getFormat())
- .append(',')
- .append(Functions.str(id.getGlobalId()))
- .append(',')
- .append(Functions.str(id.getBranchId()))
- .append(')');
- }
-
- public void completeDtxRecordRecovery()
- {
- for(StoredMessage m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
-
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
- }
-
- public void queueEntry(final UUID queueId, long messageId)
- {
- AMQQueue queue = _virtualHost.getQueue(queueId);
- if(queue != null)
- {
- String queueName = queue.getName();
- ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- queue.enqueue(message,null);
-
- _queueRecoveries.put(queueName, ++count);
- }
- else
- {
- _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
- Transaction txn = _store.newTransaction();
- txn.dequeueMessage(queue, new DummyMessage(messageId));
- txn.commitTranAsync();
- }
- }
- else
- {
- _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
- Transaction txn = _store.newTransaction();
- TransactionLogResource mockQueue =
- new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return "<<UNKNOWN>>";
- }
-
- @Override
- public UUID getId()
- {
- return queueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return false;
- }
- };
- txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
- txn.commitTranAsync();
- }
- }
-
- public DtxRecordRecoveryHandler completeQueueEntryRecovery()
- {
-
- for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
- {
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
-
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
- }
-
- return this;
- }
-
- private static class DummyMessage implements EnqueueableMessage
- {
-
-
- private final long _messageId;
-
- public DummyMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
-
- public boolean isPersistent()
- {
- return true;
- }
-
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- }
-
-}