summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/virtualhost
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java76
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java44
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java100
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java360
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java876
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java106
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java158
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java54
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java43
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java141
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java81
15 files changed, 0 insertions, 2221 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
deleted file mode 100644
index 2db1944cd1..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-
-public abstract class HouseKeepingTask implements Runnable
-{
- Logger _logger = Logger.getLogger(this.getClass());
-
- private VirtualHost _virtualHost;
-
- private String _name;
-
- private RootMessageLogger _rootLogger;
- public HouseKeepingTask(VirtualHost vhost)
- {
- _virtualHost = vhost;
- _name = _virtualHost.getName() + ":" + this.getClass().getSimpleName();
- _rootLogger = CurrentActor.get().getRootMessageLogger();
- }
-
- final public void run()
- {
- // Don't need to undo this as this is a thread pool thread so will
- // always go through here before we do any real work.
- Thread.currentThread().setName(_name);
- CurrentActor.set(new AbstractActor(_rootLogger)
- {
- @Override
- public String getLogMessage()
- {
- return _name;
- }
- });
-
- try
- {
- execute();
- }
- catch (Throwable e)
- {
- _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
- }
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- /** Execute the plugin. */
- public abstract void execute();
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
deleted file mode 100644
index 767474d5ae..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.io.IOException;
-
-import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
-
-/**
- * The management interface exposed to allow management of a virtualHost
- */
-public interface ManagedVirtualHost
-{
- static final String TYPE = "VirtualHost";
- static final int VERSION = 1;
-
- /**
- * Returns the name of the managed virtualHost.
- * @return the name of the exchange.
- * @throws java.io.IOException
- */
- @MBeanAttribute(name="Name", description= TYPE + " Name")
- String getName() throws IOException;
-
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
deleted file mode 100755
index 04f19b79bb..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.virtualhost;
-
-import java.util.UUID;
-
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
-{
- IConnectionRegistry getConnectionRegistry();
-
- VirtualHostConfiguration getConfiguration();
-
- String getName();
-
- QueueRegistry getQueueRegistry();
-
- ExchangeRegistry getExchangeRegistry();
-
- ExchangeFactory getExchangeFactory();
-
- MessageStore getMessageStore();
-
- TransactionLog getTransactionLog();
-
- DurableConfigurationStore getDurableConfigurationStore();
-
- AuthenticationManager getAuthenticationManager();
-
- SecurityManager getSecurityManager();
-
- void close();
-
- ManagedObject getManagedObject();
-
- UUID getBrokerId();
-
- void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
-
- long getHouseKeepingTaskCount();
-
- public long getHouseKeepingCompletedTaskCount();
-
- int getHouseKeepingPoolSize();
-
- void setHouseKeepingPoolSize(int newSize);
-
- int getHouseKeepingActiveCount();
-
- IApplicationRegistry getApplicationRegistry();
-
- BindingFactory getBindingFactory();
-
- void createBrokerConnection(String transport,
- String host,
- int port,
- String vhost,
- boolean durable,
- String authMechanism, String username, String password);
-
- ConfigStore getConfigStore();
-
- void removeBrokerConnection(BrokerLink brokerLink);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
deleted file mode 100755
index 96a9ac729e..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import org.apache.log4j.Logger;
-
-import java.nio.ByteBuffer;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-
-public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
- ConfigurationRecoveryHandler.QueueRecoveryHandler,
- ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
- ConfigurationRecoveryHandler.BindingRecoveryHandler,
- MessageStoreRecoveryHandler,
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
- TransactionLogRecoveryHandler,
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
-{
- private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
-
-
- private final VirtualHost _virtualHost;
-
- private MessageStoreLogSubject _logSubject;
- private List<ProcessAction> _actions;
-
- private MessageStore _store;
- private TransactionLog _transactionLog;
-
- private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
- private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-
-
-
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public QueueRecoveryHandler begin(MessageStore store)
- {
- _logSubject = new MessageStoreLogSubject(_virtualHost,store);
- _store = store;
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
-
- return this;
- }
-
- public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments)
- {
- try
- {
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
- if (q == null)
- {
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost,
- arguments);
- _virtualHost.getQueueRegistry().registerQueue(q);
- }
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
-
- //Record that we have a queue for recovery
- _queueRecoveries.put(queueName, 0);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
-
- public ExchangeRecoveryHandler completeQueueRecovery()
- {
- return this;
- }
-
- public void exchange(String exchangeName, String type, boolean autoDelete)
- {
- try
- {
- Exchange exchange;
- AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
- exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
- if (exchange == null)
- {
- exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- }
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
-
- public BindingRecoveryHandler completeExchangeRecovery()
- {
- return this;
- }
-
- public StoredMessageRecoveryHandler begin()
- {
- // TODO - log begin
- return this;
- }
-
- public void message(StoredMessage message)
- {
- ServerMessage serverMessage;
- switch(message.getMetaData().getType())
- {
- case META_DATA_0_8:
- serverMessage = new AMQMessage(message);
- break;
- case META_DATA_0_10:
- serverMessage = new MessageTransferMessage(message, null);
- break;
- default:
- throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
- }
-
- //_logger.debug("Recovered message with id " + serverMessage);
-
-
- _recoveredMessages.put(message.getMessageNumber(), serverMessage);
- _unusedMessages.put(message.getMessageNumber(), message);
- }
-
- public void completeMessageRecovery()
- {
- //TODO - log end
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
- {
- _transactionLog = log;
- return this;
- }
-
- private static final class ProcessAction
- {
- private final AMQQueue _queue;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, AMQMessage message)
- {
- _queue = queue;
- _message = message;
- }
-
- public void process()
- {
- try
- {
- _queue.enqueue(_message);
- }
- catch(AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- }
-
- public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
- {
- _actions = new ArrayList<ProcessAction>();
- try
- {
- Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
- if (exchange == null)
- {
- _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName);
- return;
- }
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName));
- if (queue == null)
- {
- _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName);
- }
- else
- {
- FieldTable argumentsFT = null;
- if(buf != null)
- {
- argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
- }
-
- BindingFactory bf = _virtualHost.getBindingFactory();
-
- Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
-
- if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
- {
-
- _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
- + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
-
- bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
- }
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void completeBindingRecovery()
- {
- //return this;
- }
-
- public void complete()
- {
-
-
- }
-
- public void queueEntry(final String queueName, long messageId)
- {
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
- try
- {
- if(queue != null)
- {
- ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- queue.enqueue(message);
-
- _queueRecoveries.put(queueName, ++count);
- }
- else
- {
- _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, messageId);
- txn.commitTranAsync();
- }
- }
- else
- {
- _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- TransactionLogResource mockQueue =
- new TransactionLogResource()
- {
-
- public String getResourceName()
- {
- return queueName;
- }
- };
- txn.dequeueMessage(mockQueue, messageId);
- txn.commitTranAsync();
- }
-
- }
- catch(AMQException e)
- {
- throw new RuntimeException(e);
- }
-
-
-
- }
-
- public void completeQueueEntryRecovery()
- {
-
- for(StoredMessage m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
-
- for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
- {
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
- }
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
deleted file mode 100644
index 33c713c62a..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ /dev/null
@@ -1,876 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.BrokerConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ExchangeConfiguration;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfigType;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.connection.ConnectionRegistry;
-import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
-
-public class VirtualHostImpl implements VirtualHost
-{
- private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
-
- private final String _name;
-
- private ConnectionRegistry _connectionRegistry;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private final AuthenticationManager _authenticationManager;
-
- private SecurityManager _securityManager;
-
- private final ScheduledThreadPoolExecutor _houseKeepingTasks;
- private final IApplicationRegistry _appRegistry;
- private VirtualHostConfiguration _configuration;
- private DurableConfigurationStore _durableConfigurationStore;
- private BindingFactory _bindingFactory;
- private BrokerConfig _broker;
- private UUID _id;
-
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
-
- private final long _createTime = System.currentTimeMillis();
- private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
- private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
-
- public IConnectionRegistry getConnectionRegistry()
- {
- return _connectionRegistry;
- }
-
- public VirtualHostConfiguration getConfiguration()
- {
- return _configuration;
- }
-
- public UUID getId()
- {
- return _id;
- }
-
- public VirtualHostConfigType getConfigType()
- {
- return VirtualHostConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getBroker();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- /**
- * Virtual host JMX MBean class.
- *
- * This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
- }
-
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
-
- public String getName()
- {
- return _name;
- }
-
- public VirtualHostImpl getVirtualHost()
- {
- return VirtualHostImpl.this;
- }
- }
-
- public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
- {
- this(appRegistry, hostConfig, null);
- }
-
-
- public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
- {
- this(ApplicationRegistry.getInstance(),hostConfig,store);
- }
-
- private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
-
- _appRegistry = appRegistry;
- _broker = _appRegistry.getBroker();
- _configuration = hostConfig;
- _name = _configuration.getName();
-
- _id = _appRegistry.getConfigStore().createId();
-
- CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
-
- if (_name == null || _name.length() == 0)
- {
- throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
- }
-
- _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
- _securityManager.configureHostPlugins(_configuration);
-
- _virtualHostMBean = new VirtualHostMBean();
-
- _connectionRegistry = new ConnectionRegistry();
-
- _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount());
-
- _queueRegistry = new DefaultQueueRegistry(this);
-
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(_configuration);
-
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- StartupRoutingTable configFileRT = new StartupRoutingTable();
-
- _durableConfigurationStore = configFileRT;
-
- // This needs to be after the RT has been defined as it creates the default durable exchanges.
- _exchangeRegistry.initialise();
-
- _bindingFactory = new BindingFactory(this);
-
- initialiseModel(_configuration);
-
- if (store != null)
- {
- _messageStore = store;
- _durableConfigurationStore = store;
- }
- else
- {
- initialiseMessageStore(hostConfig);
- }
-
- _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
-
- initialiseStatistics();
- }
-
- private void initialiseHouseKeeping(long period)
- {
- /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if (period != 0L)
- {
- class ExpiredMessagesTask extends HouseKeepingTask
- {
- public ExpiredMessagesTask(VirtualHost vhost)
- {
- super(vhost);
- }
-
- public void execute()
- {
- for (AMQQueue q : _queueRegistry.getQueues())
- {
- _logger.debug("Checking message status for queue: "
- + q.getName());
- try
- {
- q.checkMessageStatus();
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
- }
- }
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
- {
- _logger.debug("Checking for long running open transactions on connection " + connection);
- for (AMQSessionModel session : connection.getSessionModels())
- {
- _logger.debug("Checking for long running open transactions on session " + session);
- try
- {
- session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
- _configuration.getTransactionTimeoutOpenClose(),
- _configuration.getTransactionTimeoutIdleWarn(),
- _configuration.getTransactionTimeoutIdleClose());
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
- }
- }
- }
- }
- }
-
- scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
-
- Map<String, VirtualHostPluginFactory> plugins =
- ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
-
- if (plugins != null)
- {
- for (Map.Entry<String, VirtualHostPluginFactory> entry : plugins.entrySet())
- {
- String pluginName = entry.getKey();
- VirtualHostPluginFactory factory = entry.getValue();
- try
- {
- VirtualHostPlugin plugin = factory.newInstance(this);
-
- // If we had configuration for the plugin the schedule it.
- if (plugin != null)
- {
- _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2,
- plugin.getDelay(), plugin.getTimeUnit());
-
- _logger.info("Loaded VirtualHostPlugin:" + plugin);
- }
- }
- catch (RuntimeException e)
- {
- _logger.error("Unable to load VirtualHostPlugin:" + pluginName + " due to:" + e.getMessage(), e);
- }
- }
- }
- }
- }
-
- /**
- * Allow other broker components to register a HouseKeepingTask
- *
- * @param period How often this task should run, in ms.
- * @param task The task to run.
- */
- public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
- {
- _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period,
- TimeUnit.MILLISECONDS);
- }
-
- public long getHouseKeepingTaskCount()
- {
- return _houseKeepingTasks.getTaskCount();
- }
-
- public long getHouseKeepingCompletedTaskCount()
- {
- return _houseKeepingTasks.getCompletedTaskCount();
- }
-
- public int getHouseKeepingPoolSize()
- {
- return _houseKeepingTasks.getCorePoolSize();
- }
-
- public void setHouseKeepingPoolSize(int newSize)
- {
- _houseKeepingTasks.setCorePoolSize(newSize);
- }
-
-
- public int getHouseKeepingActiveCount()
- {
- return _houseKeepingTasks.getActiveCount();
- }
-
-
- private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
- {
- String messageStoreClass = hostConfig.getMessageStoreClass();
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- MessageStore messageStore = (MessageStore) o;
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
-
- MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
-
- messageStore.configureConfigStore(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
-
- messageStore.configureMessageStore(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
- messageStore.configureTransactionLog(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
-
- _messageStore = messageStore;
- _durableConfigurationStore = messageStore;
- }
-
- private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
- {
- _logger.debug("Loading configuration for virtualhost: " + config.getName());
-
- List exchangeNames = config.getExchanges();
-
- for (Object exchangeNameObj : exchangeNames)
- {
- String exchangeName = String.valueOf(exchangeNameObj);
- configureExchange(config.getExchangeConfiguration(exchangeName));
- }
-
- String[] queueNames = config.getQueueNames();
-
- for (Object queueNameObj : queueNames)
- {
- String queueName = String.valueOf(queueNameObj);
- configureQueue(config.getQueueConfiguration(queueName));
- }
- }
-
- private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
- {
- AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
-
- Exchange exchange;
- exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
- {
-
- AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
- boolean durable = exchangeConfiguration.getDurable();
- boolean autodelete = exchangeConfiguration.getAutoDelete();
-
- Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
- _exchangeRegistry.registerExchange(newExchange);
-
- if (newExchange.isDurable())
- {
- _durableConfigurationStore.createExchange(newExchange);
- }
- }
- }
-
- private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
-
- if (queue.isDurable())
- {
- getDurableConfigurationStore().createQueue(queue);
- }
-
- String exchangeName = queueConfiguration.getExchange();
-
- Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
-
- if (exchange == null)
- {
- exchange = _exchangeRegistry.getDefaultExchange();
- }
-
- if (exchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
- }
-
- List routingKeys = queueConfiguration.getRoutingKeys();
- if (routingKeys == null || routingKeys.isEmpty())
- {
- routingKeys = Collections.singletonList(queue.getNameShortString());
- }
-
- for (Object routingKeyNameObj : routingKeys)
- {
- AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
- if (_logger.isInfoEnabled())
- {
- _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
- }
- _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
- }
-
- if (exchange != _exchangeRegistry.getDefaultExchange())
- {
- _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null);
- }
- }
-
- public String getName()
- {
- return _name;
- }
-
- public BrokerConfig getBroker()
- {
- return _broker;
- }
-
- public String getFederationTag()
- {
- return _broker.getFederationTag();
- }
-
- public void setBroker(final BrokerConfig broker)
- {
- _broker = broker;
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public TransactionLog getTransactionLog()
- {
- return _messageStore;
- }
-
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return _durableConfigurationStore;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public SecurityManager getSecurityManager()
- {
- return _securityManager;
- }
-
- public void close()
- {
- //Stop Connections
- _connectionRegistry.close();
-
- //Stop the Queues processing
- if (_queueRegistry != null)
- {
- for (AMQQueue queue : _queueRegistry.getQueues())
- {
- queue.stop();
- }
- }
-
- //Stop Housekeeping
- if (_houseKeepingTasks != null)
- {
- _houseKeepingTasks.shutdown();
-
- try
- {
- if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
- {
- _houseKeepingTasks.shutdownNow();
- }
- }
- catch (InterruptedException e)
- {
- _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage());
- // Swallowing InterruptedException ok as we are shutting down.
- }
- }
-
- //Close MessageStore
- if (_messageStore != null)
- {
- //Remove MessageStore Interface should not throw Exception
- try
- {
- _messageStore.close();
- }
- catch (Exception e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-
- CurrentActor.get().message(VirtualHostMessages.CLOSED());
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-
- public UUID getBrokerId()
- {
- return _appRegistry.getBrokerId();
- }
-
- public IApplicationRegistry getApplicationRegistry()
- {
- return _appRegistry;
- }
-
- public BindingFactory getBindingFactory()
- {
- return _bindingFactory;
- }
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- _appRegistry.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- _appRegistry.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (AMQConnectionModel connection : _connectionRegistry.getConnections())
- {
- connection.resetStatistics();
- }
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
- _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
- _messagesReceived = new StatisticsCounter("messages-received-" + getName());
- _dataReceived = new StatisticsCounter("bytes-received-" + getName());
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
- public void createBrokerConnection(final String transport,
- final String host,
- final int port,
- final String vhost,
- final boolean durable,
- final String authMechanism,
- final String username,
- final String password)
- {
- BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password);
- if(_links.putIfAbsent(blink,blink) != null)
- {
- getConfigStore().addConfiguredObject(blink);
- }
- }
-
- public void removeBrokerConnection(final String transport,
- final String host,
- final int port,
- final String vhost)
- {
- removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null));
-
- }
-
- public void removeBrokerConnection(BrokerLink blink)
- {
- blink = _links.get(blink);
- if(blink != null)
- {
- blink.close();
- getConfigStore().removeConfiguredObject(blink);
- }
- }
-
- public ConfigStore getConfigStore()
- {
- return getApplicationRegistry().getConfigStore();
- }
-
- /**
- * Temporary Startup RT class to record the creation of persistent queues / exchanges.
- *
- *
- * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
- * This should be removed after the _RT has been fully split from the the TL
- */
- private static class StartupRoutingTable implements DurableConfigurationStore
- {
- public List<Exchange> exchange = new LinkedList<Exchange>();
- public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
- public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
-
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public void removeMessage(Long messageId) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (exchange.isDurable())
- {
- this.exchange.add(exchange);
- }
- }
-
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- if (exchange.isDurable() && queue.isDurable())
- {
- bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args));
- }
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- }
-
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- if (queue.isDurable())
- {
- this.queue.add(new CreateQueueTuple(queue, arguments));
- }
- }
-
- public void removeQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
-
- private static class CreateQueueTuple
- {
- public AMQQueue queue;
- public FieldTable arguments;
-
- public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
- {
- this.queue = queue;
- this.arguments = arguments;
- }
- }
-
- private static class CreateBindingTuple
- {
- public AMQQueue queue;
- public FieldTable arguments;
- public Exchange exchange;
- public AMQShortString routingKey;
-
- public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- {
- this.exchange = exchange;
- this.routingKey = routingKey;
- this.queue = queue;
- arguments = args;
- }
- }
-
- public void updateQueue(AMQQueue queue) throws AMQStoreException
- {
- }
- }
-
- @Override
- public String toString()
- {
- return _name;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
deleted file mode 100644
index 32d0c4c4d1..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.configuration.ConfigStore;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry implements Closeable
-{
- private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
-
-
- private String _defaultVirtualHostName;
- private ApplicationRegistry _applicationRegistry;
-
- public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
- {
- _applicationRegistry = applicationRegistry;
- }
-
- public synchronized void registerVirtualHost(VirtualHost host) throws Exception
- {
- if(_registry.containsKey(host.getName()))
- {
- throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
- }
- _registry.put(host.getName(),host);
- }
-
- public synchronized void unregisterVirtualHost(VirtualHost host)
- {
- _registry.remove(host.getName());
- }
-
- public VirtualHost getVirtualHost(String name)
- {
- if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
- {
- name = getDefaultVirtualHostName();
- }
-
- return _registry.get(name);
- }
-
- public VirtualHost getDefaultVirtualHost()
- {
- return getVirtualHost(getDefaultVirtualHostName());
- }
-
- private String getDefaultVirtualHostName()
- {
- return _defaultVirtualHostName;
- }
-
- public void setDefaultVirtualHostName(String defaultVirtualHostName)
- {
- _defaultVirtualHostName = defaultVirtualHostName;
- }
-
-
- public Collection<VirtualHost> getVirtualHosts()
- {
- return new ArrayList<VirtualHost>(_registry.values());
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- return _applicationRegistry;
- }
-
- public ConfigStore getConfigStore()
- {
- return _applicationRegistry.getConfigStore();
- }
-
- public void close()
- {
- for (VirtualHost virtualHost : getVirtualHosts())
- {
- virtualHost.close();
- }
-
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
deleted file mode 100644
index 12206013eb..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.Exchange.BindingListener;
-import org.apache.qpid.server.queue.AMQQueue;
-
-/**
- * This is a listener that caches queues that are configured for slow consumer disconnection.
- *
- * There should be one listener per virtual host, which can be added to all exchanges on
- * that host.
- *
- * TODO In future, it will be possible to configure the policy at runtime, so only the queue
- * itself is cached, and the configuration looked up by the housekeeping thread. This means
- * that there may be occasions where the copy of the cache contents retrieved by the thread
- * does not contain queues that are configured, or that configured queues are not present.
- *
- * @see BindingListener
- */
-public class ConfiguredQueueBindingListener implements BindingListener
-{
- private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class);
-
- private String _vhostName;
- private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>());
-
- public ConfiguredQueueBindingListener(String vhostName)
- {
- _vhostName = vhostName;
- }
-
- /**
- * @see BindingListener#bindingAdded(Exchange, Binding)
- */
- public void bindingAdded(Exchange exchange, Binding binding)
- {
- processBinding(binding);
- }
-
- /**
- * @see BindingListener#bindingRemoved(Exchange, Binding)
- */
- public void bindingRemoved(Exchange exchange, Binding binding)
- {
- processBinding(binding);
- }
-
- private void processBinding(Binding binding)
- {
- AMQQueue queue = binding.getQueue();
-
- SlowConsumerDetectionQueueConfiguration config =
- queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
- if (config != null)
- {
- _cache.add(queue);
- }
- else
- {
- _cache.remove(queue);
- }
- }
-
- /**
- * Lookup and return the cache of configured {@link AMQQueue}s.
- *
- * Note that when accessing the cached queues, the {@link Iterator} is not thread safe
- * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
- * cache is returned.
- *
- * @return a copy of the cached {@link java.util.Set} of queues
- */
- public Set<AMQQueue> getQueueCache()
- {
- return new HashSet<AMQQueue>(_cache);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
deleted file mode 100644
index 5c4fe0aab8..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- *
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
-import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.plugins.Plugin;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
-
-public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
-{
- private SlowConsumerDetectionConfiguration _config;
- private ConfiguredQueueBindingListener _listener;
-
- public static class SlowConsumerFactory implements VirtualHostPluginFactory
- {
- public SlowConsumerDetection newInstance(VirtualHost vhost)
- {
- SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
-
- if (config == null)
- {
- return null;
- }
-
- SlowConsumerDetection plugin = new SlowConsumerDetection(vhost);
- plugin.configure(config);
- return plugin;
- }
- }
-
- /**
- * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
- * cirtual host to record all the configured queues in a cache for processing by the housekeeping
- * thread.
- *
- * @see Plugin#configure(ConfigurationPlugin)
- */
- public void configure(ConfigurationPlugin config)
- {
- _config = (SlowConsumerDetectionConfiguration) config;
- _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName());
- for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames())
- {
- getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
- }
- }
-
- public SlowConsumerDetection(VirtualHost vhost)
- {
- super(vhost);
- }
-
- public void execute()
- {
- CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
-
- Set<AMQQueue> cache = _listener.getQueueCache();
- for (AMQQueue q : cache)
- {
- CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));
-
- try
- {
- SlowConsumerDetectionQueueConfiguration config =
- q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
- if (checkQueueStatus(q, config))
- {
- config.getPolicy().performPolicy(q);
- }
- }
- catch (Exception e)
- {
- // Don't throw exceptions as this will stop the house keeping task from running.
- _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
- }
- }
-
- CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
- }
-
- public long getDelay()
- {
- return _config.getDelay();
- }
-
- public TimeUnit getTimeUnit()
- {
- return _config.getTimeUnit();
- }
-
- /**
- * Check the depth,messageSize,messageAge,messageCount values for this q
- *
- * @param q the queue to check
- * @param config the queue configuration to compare against the queue state
- *
- * @return true if the queue has reached a threshold.
- */
- private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
- {
- if (config != null)
- {
- _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
-
- int count = q.getMessageCount();
-
- // First Check message counts
- if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) ||
- // The check queue depth
- (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
- // finally if we have messages on the queue check Arrival time.
- // We must check count as OldestArrival time is Long.MAX_LONG when
- // there are no messages.
- (config.getMessageAge() != 0 &&
- ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge())))
- {
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")");
- _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
- _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
- _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
- }
-
- return true;
- }
- }
- return false;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
deleted file mode 100644
index 3798f47f0b..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.virtualhost.HouseKeepingTask;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.concurrent.TimeUnit;
-
-public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask implements VirtualHostPlugin
-{
- protected final Logger _logger = Logger.getLogger(getClass());
-
- public VirtualHostHouseKeepingPlugin(VirtualHost vhost)
- {
- super(vhost);
- }
-
-
- /**
- * Long value representing the delay between repeats
- *
- * @return
- */
- public abstract long getDelay();
-
- /**
- * Option to specify what the delay value represents
- *
- * @return
- *
- * @see java.util.concurrent.TimeUnit for valid value.
- */
- public abstract TimeUnit getTimeUnit();
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
deleted file mode 100644
index 1886c2d01d..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.qpid.server.plugins.Plugin;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public interface VirtualHostPlugin extends Runnable, Plugin
-{
- /**
- * Long value representing the delay between repeats
- *
- * @return
- */
- public long getDelay();
-
- /**
- * Option to specify what the delay value represents
- * @see java.util.concurrent.TimeUnit for valid value.
- * @return
- */
- public TimeUnit getTimeUnit();
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java
deleted file mode 100644
index c8bea18444..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public interface VirtualHostPluginFactory
-{
- public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties
deleted file mode 100644
index 03c56910c2..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# Default File used for all non-defined locales.
-
-RUNNING = SCD-1001 : Running
-COMPLETE = SCD-1002 : Complete
-CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties
deleted file mode 100644
index ed4fb1d45a..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# Default File used for all non-defined locales.
-
-DELETING_QUEUE = TDP-1001 : Deleting Queue
-DISCONNECTING = TDP-1002 : Disconnecting Session \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
deleted file mode 100644
index 6028f63fdb..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins.policies;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.plugins.logging.TopicDeletePolicyMessages;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-
-public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
-{
- Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
- private TopicDeletePolicyConfiguration _configuration;
-
- public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory
- {
- public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException
- {
- TopicDeletePolicyConfiguration config =
- configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName());
-
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(config);
- return policy;
- }
-
- public String getPluginName()
- {
- return "topicdelete";
- }
-
- public Class<TopicDeletePolicy> getPluginClass()
- {
- return TopicDeletePolicy.class;
- }
- }
-
- public void performPolicy(AMQQueue q)
- {
- if (q == null)
- {
- return;
- }
-
- AMQSessionModel owner = q.getExclusiveOwningSession();
-
- // Only process exclusive queues
- if (owner == null)
- {
- return;
- }
-
- //Only process Topics
- if (!validateQueueIsATopic(q))
- {
- return;
- }
-
- try
- {
- CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING());
- // Close the consumer . this will cause autoDelete Queues to be purged
- owner.getConnectionModel().
- closeSession(owner, AMQConstant.RESOURCE_ERROR,
- "Consuming to slow.");
-
- // Actively delete non autoDelete queues if deletePersistent is set
- if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent()))
- {
- CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE());
- q.delete();
- }
-
- }
- catch (AMQException e)
- {
- _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName());
- }
-
- }
-
- /**
- * Check the queue bindings to validate the queue is bound to the
- * topic exchange.
- *
- * @param q the Queue
- *
- * @return true iff Q is bound to a TopicExchange
- */
- private boolean validateQueueIsATopic(AMQQueue q)
- {
- for (Binding binding : q.getBindings())
- {
- if (binding.getExchange() instanceof TopicExchange)
- {
- return true;
- }
- }
-
- return false;
- }
-
- public void configure(ConfigurationPlugin config)
- {
- _configuration = (TopicDeletePolicyConfiguration) config;
- }
-
- @Override
- public String toString()
- {
- return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]");
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
deleted file mode 100644
index 7dfd22c733..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugins.policies;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-
-public class TopicDeletePolicyConfiguration extends ConfigurationPlugin
-{
-
- public static class TopicDeletePolicyConfigurationFactory
- implements ConfigurationPluginFactory
- {
- public ConfigurationPlugin newInstance(String path,
- Configuration config)
- throws ConfigurationException
- {
- TopicDeletePolicyConfiguration slowConsumerConfig =
- new TopicDeletePolicyConfiguration();
- slowConsumerConfig.setConfiguration(path, config);
- return slowConsumerConfig;
- }
-
- public List<String> getParentPaths()
- {
- return Arrays.asList(
- "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete");
- }
- }
-
- public String[] getElementsProcessed()
- {
- return new String[]{"delete-persistent"};
- }
-
- @Override
- public void validateConfiguration() throws ConfigurationException
- {
- // No validation required.
- }
-
- public boolean deletePersistent()
- {
- // If we don't have configuration then we don't deletePersistent Queues
- return (hasConfiguration() && contains("delete-persistent"));
- }
-
- @Override
- public String formatToString()
- {
- return (deletePersistent()?"delete-durable":"");
- }
-
-
-}