summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java')
-rwxr-xr-xtrunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java341
1 files changed, 0 insertions, 341 deletions
diff --git a/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
deleted file mode 100755
index c543531210..0000000000
--- a/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import org.apache.log4j.Logger;
-
-import java.nio.ByteBuffer;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-
-public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
- ConfigurationRecoveryHandler.QueueRecoveryHandler,
- ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
- ConfigurationRecoveryHandler.BindingRecoveryHandler,
- MessageStoreRecoveryHandler,
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
- TransactionLogRecoveryHandler,
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
-{
- private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
-
-
- private final VirtualHost _virtualHost;
-
- private MessageStoreLogSubject _logSubject;
- private List<ProcessAction> _actions;
-
- private MessageStore _store;
- private TransactionLog _transactionLog;
-
- private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
- private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-
-
-
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public QueueRecoveryHandler begin(MessageStore store)
- {
- _logSubject = new MessageStoreLogSubject(_virtualHost,store);
- _store = store;
- CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1004(null, false));
-
- return this;
- }
-
- public void queue(String queueName, String owner, FieldTable arguments)
- {
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
- if (q == null)
- {
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
- arguments);
- _virtualHost.getQueueRegistry().registerQueue(q);
- }
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1004(queueName, true));
-
- //Record that we have a queue for recovery
- _queueRecoveries.put(queueName, 0);
- }
-
- public ExchangeRecoveryHandler completeQueueRecovery()
- {
- return this;
- }
-
- public void exchange(String exchangeName, String type, boolean autoDelete)
- {
- try
- {
- Exchange exchange;
- AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
- exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
- if (exchange == null)
- {
- exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public BindingRecoveryHandler completeExchangeRecovery()
- {
- return this;
- }
-
- public StoredMessageRecoveryHandler begin()
- {
- // TODO - log begin
- return this;
- }
-
- public void message(StoredMessage message)
- {
- ServerMessage serverMessage;
- switch(message.getMetaData().getType())
- {
- case META_DATA_0_8:
- serverMessage = new AMQMessage(message);
- break;
- case META_DATA_0_10:
- serverMessage = new MessageTransferMessage(message, null);
- break;
- default:
- throw new RuntimeException("Unknown message type retreived from store " + message.getMetaData().getClass());
- }
-
-
- _recoveredMessages.put(message.getMessageNumber(), serverMessage);
- _unusedMessages.put(message.getMessageNumber(), message);
- }
-
- public void completeMessageRecovery()
- {
- //TODO - log end
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
- {
- _transactionLog = log;
- return this;
- }
-
- private static final class ProcessAction
- {
- private final AMQQueue _queue;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, AMQMessage message)
- {
- _queue = queue;
- _message = message;
- }
-
- public void process()
- {
- try
- {
- _queue.enqueue(_message);
- }
- catch(AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- }
-
- public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
- {
- _actions = new ArrayList<ProcessAction>();
- try
- {
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
- AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
- if (queue == null)
- {
- _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
- + exchange.getName());
- }
- else
- {
-
-
- FieldTable argumentsFT = null;
- if(buf != null)
- {
- argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
- }
-
- _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
- + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
-
- queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
-
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void completeBindingRecovery()
- {
- //return this;
- }
-
- public void complete()
- {
-
-
- }
-
- public void queueEntry(final String queueName, long messageId)
- {
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
- try
- {
- if(queue != null)
- {
- ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getName());
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- queue.enqueue(message);
-
- _queueRecoveries.put(queueName, ++count);
- }
- else
- {
- _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getName() + " is unknwon, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, messageId);
- txn.commitTranAsync();
- }
- }
- else
- {
- _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- TransactionLogResource mockQueue =
- new TransactionLogResource()
- {
-
- public String getResourceName()
- {
- return queueName;
- }
- };
- txn.dequeueMessage(mockQueue, messageId);
- txn.commitTranAsync();
- }
-
- }
- catch(AMQException e)
- {
- throw new RuntimeException(e);
- }
-
-
-
- }
-
- public void completeQueueEntryRecovery()
- {
-
- for(StoredMessage m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
-
- for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
- {
- CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1005(entry.getValue(), entry.getKey()));
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(entry.getKey(), true));
- }
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(null, false));
- }
-
-}