diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost')
13 files changed, 132 insertions, 1010 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index f810360662..d24f79c56c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -25,13 +25,10 @@ import java.util.UUID; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.VirtualHostConfig; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -41,7 +38,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; -public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer +public interface VirtualHost extends DurableConfigurationStore.Source, Closeable, StatisticsGatherer { IConnectionRegistry getConnectionRegistry(); @@ -61,8 +58,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo void close(); - UUID getBrokerId(); - UUID getId(); void scheduleHouseKeepingTask(long period, HouseKeepingTask task); @@ -77,25 +72,12 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo int getHouseKeepingActiveCount(); - IApplicationRegistry getApplicationRegistry(); + VirtualHostRegistry getVirtualHostRegistry(); BindingFactory getBindingFactory(); - void createBrokerConnection(String transport, - String host, - int port, - String vhost, - boolean durable, - String authMechanism, String username, String password); - - public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments); - - ConfigStore getConfigStore(); - DtxRegistry getDtxRegistry(); - void removeBrokerConnection(BrokerLink brokerLink); - LinkRegistry getLinkRegistry(String remoteContainerId); ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index ea2f0f15e4..ae88e3e9f7 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -35,15 +35,16 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -65,7 +66,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, ConfigurationRecoveryHandler.BindingRecoveryHandler, - ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -77,7 +77,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final VirtualHost _virtualHost; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); + private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); private MessageStoreLogSubject _logSubject; @@ -169,7 +169,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - AbstractServerMessageImpl serverMessage; + ServerMessage serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: @@ -178,6 +178,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa case META_DATA_0_10: serverMessage = new MessageTransferMessage(message, null); break; + case META_DATA_1_0: + serverMessage = new Message_1_0(message); + break; default: throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } @@ -190,19 +193,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { } - public BridgeRecoveryHandler brokerLink(final UUID id, - final long createTime, - final Map<String, String> arguments) - { - BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments); - return new BridgeRecoveryHandlerImpl(blink); - - } - - public void completeBrokerLinkRecovery() - { - } - public void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues) @@ -221,12 +211,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) { - message.incrementReference(); + final MessageReference ref = message.newReference(); + branch.enqueue(queue,message); @@ -239,7 +230,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { queue.enqueue(message, true, null); - message.decrementReference(); + ref.release(); } catch (AMQException e) { @@ -251,7 +242,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void onRollback() { - message.decrementReference(); + ref.release(); } }); } @@ -280,7 +271,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) @@ -412,9 +403,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public BrokerLinkRecoveryHandler completeBindingRecovery() + public void completeBindingRecovery() { - return this; } public void complete() @@ -529,22 +519,4 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler - { - private final BrokerLink _blink; - - public BridgeRecoveryHandlerImpl(final BrokerLink blink) - { - _blink = blink; - } - - public void bridge(final UUID id, final long createTime, final Map<String, String> arguments) - { - _blink.createBridge(id, createTime, arguments); - } - - public void completeBridgeRecoveryForLink() - { - } - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index d9dc0aa64e..dd3610373f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,12 +34,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.server.configuration.BrokerConfig; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfigType; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -49,7 +44,6 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; @@ -61,17 +55,16 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.HAMessageStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener { @@ -79,23 +72,19 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; - private final UUID _qmfId; - private final String _name; private final UUID _id; private final long _createTime = System.currentTimeMillis(); - private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); - private final ScheduledThreadPoolExecutor _houseKeepingTasks; - private final IApplicationRegistry _appRegistry; + private final VirtualHostRegistry _virtualHostRegistry; - private final SecurityManager _securityManager; + private final StatisticsGatherer _brokerStatisticsGatherer; - private final BrokerConfig _brokerConfig; + private final SecurityManager _securityManager; private final VirtualHostConfiguration _vhostConfig; @@ -120,7 +109,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); private boolean _blocked; - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception + public VirtualHostImpl(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, VirtualHostConfiguration hostConfig) throws Exception { if (hostConfig == null) { @@ -132,19 +121,17 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost."); } - _appRegistry = appRegistry; - _brokerConfig = _appRegistry.getBrokerConfig(); + _virtualHostRegistry = virtualHostRegistry; + _brokerStatisticsGatherer = brokerStatisticsGatherer; _vhostConfig = hostConfig; _name = _vhostConfig.getName(); _dtxRegistry = new DtxRegistry(); - _qmfId = _appRegistry.getConfigStore().createId(); _id = UUIDGenerator.generateVhostUUID(_name); CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); - _securityManager = new SecurityManager(_appRegistry.getSecurityManager()); - _securityManager.configureHostPlugins(_vhostConfig); + _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl")); _connectionRegistry = new ConnectionRegistry(); _connectionRegistry.addRegistryChangeListener(this); @@ -154,13 +141,12 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _queueRegistry = new DefaultQueueRegistry(this); _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(_vhostConfig); _exchangeRegistry = new DefaultExchangeRegistry(this); _bindingFactory = new BindingFactory(this); - _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass()); + _messageStore = initialiseMessageStore(hostConfig); configureMessageStore(hostConfig); @@ -187,22 +173,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _id; } - @Override - public UUID getQMFId() - { - return _qmfId; - } - - public VirtualHostConfigType getConfigType() - { - return VirtualHostConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getBroker(); - } - public boolean isDurable() { return false; @@ -216,38 +186,9 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr */ private void initialiseHouseKeeping(long period) { - if (period != 0L) { scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); - - Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins(); - - if (plugins != null) - { - for (Map.Entry<String, VirtualHostPluginFactory> entry : plugins.entrySet()) - { - String pluginName = entry.getKey(); - VirtualHostPluginFactory factory = entry.getValue(); - try - { - VirtualHostPlugin plugin = factory.newInstance(this); - - // If we had configuration for the plugin the schedule it. - if (plugin != null) - { - _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2, - plugin.getDelay(), plugin.getTimeUnit()); - - _logger.info("Loaded VirtualHostPlugin:" + plugin); - } - } - catch (RuntimeException e) - { - _logger.error("Unable to load VirtualHostPlugin:" + pluginName + " due to:" + e.getMessage(), e); - } - } - } } } @@ -329,19 +270,34 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr if (!(o instanceof MessageStore)) { - throw new ClassCastException("Message store factory class must implement " + MessageStore.class + + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + " does not."); } final MessageStore messageStore = (MessageStore) o; - final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName()); + return messageStore; + } + + private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception + { + String storeType = hostConfig.getConfig().getString("store.type"); + MessageStore messageStore = null; + if (storeType == null) + { + messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass()); + } + else + { + messageStore = new MessageStoreCreator().createMessageStore(storeType); + } + + final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); OperationalLoggingListener.listen(messageStore, storeLogSubject); messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - return messageStore; } @@ -468,16 +424,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _name; } - public BrokerConfig getBroker() - { - return _brokerConfig; - } - - public String getFederationTag() - { - return _brokerConfig.getFederationTag(); - } - public long getCreateTime() { return _createTime; @@ -534,14 +480,9 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr CurrentActor.get().message(VirtualHostMessages.CLOSED()); } - public UUID getBrokerId() - { - return _appRegistry.getBrokerId(); - } - - public IApplicationRegistry getApplicationRegistry() + public VirtualHostRegistry getVirtualHostRegistry() { - return _appRegistry; + return _virtualHostRegistry; } public BindingFactory getBindingFactory() @@ -553,14 +494,14 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { _messagesDelivered.registerEvent(1L); _dataDelivered.registerEvent(messageSize); - _appRegistry.registerMessageDelivered(messageSize); + _brokerStatisticsGatherer.registerMessageDelivered(messageSize); } public void registerMessageReceived(long messageSize, long timestamp) { _messagesReceived.registerEvent(1L, timestamp); _dataReceived.registerEvent(messageSize, timestamp); - _appRegistry.registerMessageReceived(messageSize, timestamp); + _brokerStatisticsGatherer.registerMessageReceived(messageSize, timestamp); } public StatisticsCounter getMessageReceiptStatistics() @@ -604,51 +545,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _dataReceived = new StatisticsCounter("bytes-received-" + getName()); } - public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments) - { - BrokerLink blink = new BrokerLink(this, id, createTime, arguments); - // TODO - cope with duplicate broker link creation requests - _links.putIfAbsent(blink,blink); - getConfigStore().addConfiguredObject(blink); - return blink; - } - - public void createBrokerConnection(final String transport, - final String host, - final int port, - final String vhost, - final boolean durable, - final String authMechanism, - final String username, - final String password) - { - BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password); - - // TODO - cope with duplicate broker link creation requests - _links.putIfAbsent(blink,blink); - getConfigStore().addConfiguredObject(blink); - - } - - public void removeBrokerConnection(final String transport, - final String host, - final int port, - final String vhost) - { - removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null)); - - } - - public void removeBrokerConnection(BrokerLink blink) - { - blink = _links.get(blink); - if(blink != null) - { - blink.close(); - getConfigStore().removeConfiguredObject(blink); - } - } - public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) { LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); @@ -660,11 +556,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return linkRegistry; } - public ConfigStore getConfigStore() - { - return getApplicationRegistry().getConfigStore(); - } - public DtxRegistry getDtxRegistry() { return _dtxRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 1be472844a..483e11942b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -1,140 +1,95 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections; -import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry implements Closeable
-{
- private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
-
-
- private String _defaultVirtualHostName;
- private ApplicationRegistry _applicationRegistry;
- private final Collection<RegistryChangeListener> _listeners = - Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>()); -
- public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
- {
- _applicationRegistry = applicationRegistry;
- }
-
- public synchronized void registerVirtualHost(VirtualHost host) throws Exception
- {
- if(_registry.containsKey(host.getName()))
- {
- throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
- }
- _registry.put(host.getName(),host);
- synchronized (_listeners) +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import org.apache.qpid.common.Closeable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +public class VirtualHostRegistry implements Closeable +{ + private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>(); + private String _defaultVirtualHostName; + + + public VirtualHostRegistry() + { + super(); + } + + public synchronized void registerVirtualHost(VirtualHost host) + { + if(_registry.containsKey(host.getName())) { - for(RegistryChangeListener listener : _listeners) - { - listener.virtualHostRegistered(host); - } + throw new IllegalArgumentException("Virtual Host with name " + host.getName() + " already registered."); } - }
-
- public synchronized void unregisterVirtualHost(VirtualHost host)
- {
- _registry.remove(host.getName());
- synchronized (_listeners) + _registry.put(host.getName(),host); + } + + public synchronized void unregisterVirtualHost(VirtualHost host) + { + _registry.remove(host.getName()); + } + + public VirtualHost getVirtualHost(String name) + { + if(name == null || name.trim().length() == 0 || "/".equals(name.trim())) { - for(RegistryChangeListener listener : _listeners) - { - listener.virtualHostUnregistered(host); - } + name = getDefaultVirtualHostName(); } - }
-
- public VirtualHost getVirtualHost(String name)
- {
- if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
- {
- name = getDefaultVirtualHostName();
- }
-
- return _registry.get(name);
- }
-
- public VirtualHost getDefaultVirtualHost()
- {
- return getVirtualHost(getDefaultVirtualHostName());
- }
-
- private String getDefaultVirtualHostName()
- {
- return _defaultVirtualHostName;
- }
-
- public void setDefaultVirtualHostName(String defaultVirtualHostName)
- {
- _defaultVirtualHostName = defaultVirtualHostName;
- }
-
-
- public Collection<VirtualHost> getVirtualHosts()
- {
- return new ArrayList<VirtualHost>(_registry.values());
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- return _applicationRegistry;
- }
-
- public ConfigStore getConfigStore()
- {
- return _applicationRegistry.getConfigStore();
- }
-
- public void close()
- {
- for (VirtualHost virtualHost : getVirtualHosts())
- {
- virtualHost.close();
- }
-
- }
- - public static interface RegistryChangeListener + + return _registry.get(name); + } + + public VirtualHost getDefaultVirtualHost() { - void virtualHostRegistered(VirtualHost virtualHost); - void virtualHostUnregistered(VirtualHost virtualHost); + return getVirtualHost(getDefaultVirtualHostName()); + } + private String getDefaultVirtualHostName() + { + return _defaultVirtualHostName; } - public void addRegistryChangeListener(RegistryChangeListener listener) + public void setDefaultVirtualHostName(String defaultVirtualHostName) { - _listeners.add(listener); + _defaultVirtualHostName = defaultVirtualHostName; + } + + + public Collection<VirtualHost> getVirtualHosts() + { + return new ArrayList<VirtualHost>(_registry.values()); + } + + public void close() + { + for (VirtualHost virtualHost : getVirtualHosts()) + { + virtualHost.close(); + } } -}
+} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java deleted file mode 100644 index 12886f400a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.Exchange.BindingListener; -import org.apache.qpid.server.queue.AMQQueue; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -/** - * This is a listener that caches queues that are configured for slow consumer disconnection. - * - * There should be one listener per virtual host, which can be added to all exchanges on - * that host. - * - * TODO In future, it will be possible to configure the policy at runtime, so only the queue - * itself is cached, and the configuration looked up by the housekeeping thread. This means - * that there may be occasions where the copy of the cache contents retrieved by the thread - * does not contain queues that are configured, or that configured queues are not present. - * - * @see BindingListener - */ -public class ConfiguredQueueBindingListener implements BindingListener -{ - private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); - - private String _vhostName; - private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>()); - - public ConfiguredQueueBindingListener(String vhostName) - { - _vhostName = vhostName; - } - - /** - * @see BindingListener#bindingAdded(Exchange, Binding) - */ - public void bindingAdded(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - /** - * @see BindingListener#bindingRemoved(Exchange, Binding) - */ - public void bindingRemoved(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - private void processBinding(Binding binding) - { - AMQQueue queue = binding.getQueue(); - - SlowConsumerDetectionQueueConfiguration config = - queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (config != null) - { - _cache.add(queue); - } - else - { - _cache.remove(queue); - } - } - - /** - * Lookup and return the cache of configured {@link AMQQueue}s. - * - * Note that when accessing the cached queues, the {@link java.util.Iterator} is not thread safe - * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the - * cache is returned. - * - * @return a copy of the cached {@link java.util.Set} of queues - */ - public Set<AMQQueue> getQueueCache() - { - return new HashSet<AMQQueue>(_cache); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java deleted file mode 100644 index bd2e30449a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; - -import java.util.Set; -import java.util.concurrent.TimeUnit; - -public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin -{ - private SlowConsumerDetectionConfiguration _config; - private ConfiguredQueueBindingListener _listener; - - public static class SlowConsumerFactory implements VirtualHostPluginFactory - { - public SlowConsumerDetection newInstance(VirtualHost vhost) - { - SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); - - if (config == null) - { - return null; - } - - SlowConsumerDetection plugin = new SlowConsumerDetection(vhost); - plugin.configure(config); - return plugin; - } - } - - /** - * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * virtual host to record all the configured queues in a cache for processing by the housekeeping - * thread. - * - * @see org.apache.qpid.server.plugins.Plugin#configure(ConfigurationPlugin) - */ - public void configure(ConfigurationPlugin config) - { - _config = (SlowConsumerDetectionConfiguration) config; - _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName()); - final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); - for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames()) - { - exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener); - } - } - - public SlowConsumerDetection(VirtualHost vhost) - { - super(vhost); - } - - public void execute() - { - CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); - - Set<AMQQueue> cache = _listener.getQueueCache(); - for (AMQQueue q : cache) - { - CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); - - try - { - final SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (checkQueueStatus(q, config)) - { - final SlowConsumerPolicyPlugin policy = config.getPolicy(); - if (policy == null) - { - // We would only expect to see this during shutdown - getLogger().warn("No slow consumer policy for queue " + q.getName()); - } - else - { - policy.performPolicy(q); - } - - } - } - catch (Exception e) - { - // Don't throw exceptions as this will stop the house keeping task from running. - getLogger().error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); - } - } - - CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); - } - - public long getDelay() - { - return _config.getDelay(); - } - - public TimeUnit getTimeUnit() - { - return _config.getTimeUnit(); - } - - /** - * Check the depth,messageSize,messageAge,messageCount values for this q - * - * @param q the queue to check - * @param config the queue configuration to compare against the queue state - * - * @return true if the queue has reached a threshold. - */ - private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) - { - if (config != null) - { - if (getLogger().isInfoEnabled()) - { - getLogger().info("Retrieved Queue(" + q.getName() + ") Config:" + config); - } - - int count = q.getMessageCount(); - - // First Check message counts - if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) || - // The check queue depth - (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || - // finally if we have messages on the queue check Arrival time. - // We must check count as OldestArrival time is Long.MAX_LONG when - // there are no messages. - (config.getMessageAge() != 0 && - ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) - { - - if (getLogger().isDebugEnabled()) - { - getLogger().debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); - getLogger().debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); - getLogger().debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); - getLogger().debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); - } - - return true; - } - } - return false; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java deleted file mode 100644 index 191f8041d2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.virtualhost.HouseKeepingTask; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.concurrent.TimeUnit; - -public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask implements VirtualHostPlugin -{ - private final Logger _logger = Logger.getLogger(getClass()); - - public VirtualHostHouseKeepingPlugin(VirtualHost vhost) - { - super(vhost); - } - - - /** - * Long value representing the delay between repeats - * - * @return - */ - public abstract long getDelay(); - - /** - * Option to specify what the delay value represents - * - * @return - * - * @see java.util.concurrent.TimeUnit for valid value. - */ - public abstract TimeUnit getTimeUnit(); - - - protected Logger getLogger() - { - return _logger; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java deleted file mode 100644 index 35f6228ab9..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.qpid.server.plugins.Plugin; - -import java.util.concurrent.TimeUnit; - -public interface VirtualHostPlugin extends Runnable, Plugin -{ - /** - * Long value representing the delay between repeats - * - * @return - */ - public long getDelay(); - - /** - * Option to specify what the delay value represents - * @see java.util.concurrent.TimeUnit for valid value. - * @return - */ - public TimeUnit getTimeUnit(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java deleted file mode 100644 index c8bea18444..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.qpid.server.virtualhost.VirtualHost; - -public interface VirtualHostPluginFactory -{ - public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties deleted file mode 100644 index 03c56910c2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# Default File used for all non-defined locales. - -RUNNING = SCD-1001 : Running -COMPLETE = SCD-1002 : Complete -CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties deleted file mode 100644 index ed4fb1d45a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# Default File used for all non-defined locales. - -DELETING_QUEUE = TDP-1001 : Deleting Queue -DISCONNECTING = TDP-1002 : Disconnecting Session
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java deleted file mode 100644 index f2f61f204e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins.policies; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.plugins.logging.TopicDeletePolicyMessages; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; - -public class TopicDeletePolicy implements SlowConsumerPolicyPlugin -{ - private Logger _logger = Logger.getLogger(TopicDeletePolicy.class); - private TopicDeletePolicyConfiguration _configuration; - - public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory - { - public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException - { - TopicDeletePolicyConfiguration config = - configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); - - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(config); - return policy; - } - - public String getPluginName() - { - return "topicdelete"; - } - - public Class<TopicDeletePolicy> getPluginClass() - { - return TopicDeletePolicy.class; - } - } - - public void performPolicy(AMQQueue q) - { - if (q == null) - { - return; - } - - AMQSessionModel owner = q.getExclusiveOwningSession(); - - // Only process exclusive queues - if (owner == null) - { - return; - } - - //Only process Topics - if (!validateQueueIsATopic(q)) - { - return; - } - - try - { - CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING()); - // Close the consumer . this will cause autoDelete Queues to be purged - owner.getConnectionModel(). - closeSession(owner, AMQConstant.RESOURCE_ERROR, - "Consuming to slow."); - - // Actively delete non autoDelete queues if deletePersistent is set - if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent())) - { - CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE()); - q.delete(); - } - - } - catch (AMQException e) - { - _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); - } - - } - - /** - * Check the queue bindings to validate the queue is bound to the - * topic exchange. - * - * @param q the Queue - * - * @return true iff Q is bound to a TopicExchange - */ - private boolean validateQueueIsATopic(AMQQueue q) - { - for (Binding binding : q.getBindings()) - { - if (binding.getExchange() instanceof TopicExchange) - { - return true; - } - } - - return false; - } - - public void configure(ConfigurationPlugin config) - { - _configuration = (TopicDeletePolicyConfiguration) config; - } - - @Override - public String toString() - { - return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]"); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java deleted file mode 100644 index 48158b7dff..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins.policies; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; - -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.Arrays; -import java.util.List; - -public class TopicDeletePolicyConfiguration extends ConfigurationPlugin -{ - - public static class TopicDeletePolicyConfigurationFactory - implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, - Configuration config) - throws ConfigurationException - { - TopicDeletePolicyConfiguration slowConsumerConfig = - new TopicDeletePolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List<String> getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"delete-persistent"}; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - // No validation required. - } - - public boolean deletePersistent() - { - // If we don't have configuration then we don't deletePersistent Queues - return (hasConfiguration() && contains("delete-persistent")); - } - - @Override - public String formatToString() - { - return (deletePersistent()?"delete-durable":""); - } - - -} |