diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost')
15 files changed, 0 insertions, 2221 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java deleted file mode 100644 index 2db1944cd1..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; - -public abstract class HouseKeepingTask implements Runnable -{ - Logger _logger = Logger.getLogger(this.getClass()); - - private VirtualHost _virtualHost; - - private String _name; - - private RootMessageLogger _rootLogger; - public HouseKeepingTask(VirtualHost vhost) - { - _virtualHost = vhost; - _name = _virtualHost.getName() + ":" + this.getClass().getSimpleName(); - _rootLogger = CurrentActor.get().getRootMessageLogger(); - } - - final public void run() - { - // Don't need to undo this as this is a thread pool thread so will - // always go through here before we do any real work. - Thread.currentThread().setName(_name); - CurrentActor.set(new AbstractActor(_rootLogger) - { - @Override - public String getLogMessage() - { - return _name; - } - }); - - try - { - execute(); - } - catch (Throwable e) - { - _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e); - } - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - /** Execute the plugin. */ - public abstract void execute(); - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java deleted file mode 100644 index 767474d5ae..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java +++ /dev/null @@ -1,44 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.io.IOException;
-
-import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
-
-/**
- * The management interface exposed to allow management of a virtualHost
- */
-public interface ManagedVirtualHost
-{
- static final String TYPE = "VirtualHost";
- static final int VERSION = 1;
-
- /**
- * Returns the name of the managed virtualHost.
- * @return the name of the exchange.
- * @throws java.io.IOException
- */
- @MBeanAttribute(name="Name", description= TYPE + " Name")
- String getName() throws IOException;
-
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java deleted file mode 100755 index 04f19b79bb..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ /dev/null @@ -1,100 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.virtualhost; - -import java.util.UUID; - -import org.apache.qpid.common.Closeable; -import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.VirtualHostConfig; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; - -public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer -{ - IConnectionRegistry getConnectionRegistry(); - - VirtualHostConfiguration getConfiguration(); - - String getName(); - - QueueRegistry getQueueRegistry(); - - ExchangeRegistry getExchangeRegistry(); - - ExchangeFactory getExchangeFactory(); - - MessageStore getMessageStore(); - - TransactionLog getTransactionLog(); - - DurableConfigurationStore getDurableConfigurationStore(); - - AuthenticationManager getAuthenticationManager(); - - SecurityManager getSecurityManager(); - - void close(); - - ManagedObject getManagedObject(); - - UUID getBrokerId(); - - void scheduleHouseKeepingTask(long period, HouseKeepingTask task); - - long getHouseKeepingTaskCount(); - - public long getHouseKeepingCompletedTaskCount(); - - int getHouseKeepingPoolSize(); - - void setHouseKeepingPoolSize(int newSize); - - int getHouseKeepingActiveCount(); - - IApplicationRegistry getApplicationRegistry(); - - BindingFactory getBindingFactory(); - - void createBrokerConnection(String transport, - String host, - int port, - String vhost, - boolean durable, - String authMechanism, String username, String password); - - ConfigStore getConfigStore(); - - void removeBrokerConnection(BrokerLink brokerLink); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java deleted file mode 100755 index 96a9ac729e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ /dev/null @@ -1,360 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.virtualhost; - -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; - -import org.apache.log4j.Logger; - -import java.nio.ByteBuffer; - -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.TreeMap; - -public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, - ConfigurationRecoveryHandler.QueueRecoveryHandler, - ConfigurationRecoveryHandler.ExchangeRecoveryHandler, - ConfigurationRecoveryHandler.BindingRecoveryHandler, - MessageStoreRecoveryHandler, - MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, - TransactionLogRecoveryHandler, - TransactionLogRecoveryHandler.QueueEntryRecoveryHandler -{ - private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); - - - private final VirtualHost _virtualHost; - - private MessageStoreLogSubject _logSubject; - private List<ProcessAction> _actions; - - private MessageStore _store; - private TransactionLog _transactionLog; - - private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); - private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); - - - - public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - - public QueueRecoveryHandler begin(MessageStore store) - { - _logSubject = new MessageStoreLogSubject(_virtualHost,store); - _store = store; - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); - - return this; - } - - public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments) - { - try - { - AMQShortString queueNameShortString = new AMQShortString(queueName); - - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); - - if (q == null) - { - q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost, - arguments); - _virtualHost.getQueueRegistry().registerQueue(q); - } - - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); - - //Record that we have a queue for recovery - _queueRecoveries.put(queueName, 0); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - - public ExchangeRecoveryHandler completeQueueRecovery() - { - return this; - } - - public void exchange(String exchangeName, String type, boolean autoDelete) - { - try - { - Exchange exchange; - AMQShortString exchangeNameSS = new AMQShortString(exchangeName); - exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); - if (exchange == null) - { - exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); - _virtualHost.getExchangeRegistry().registerExchange(exchange); - } - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - - public BindingRecoveryHandler completeExchangeRecovery() - { - return this; - } - - public StoredMessageRecoveryHandler begin() - { - // TODO - log begin - return this; - } - - public void message(StoredMessage message) - { - ServerMessage serverMessage; - switch(message.getMetaData().getType()) - { - case META_DATA_0_8: - serverMessage = new AMQMessage(message); - break; - case META_DATA_0_10: - serverMessage = new MessageTransferMessage(message, null); - break; - default: - throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); - } - - //_logger.debug("Recovered message with id " + serverMessage); - - - _recoveredMessages.put(message.getMessageNumber(), serverMessage); - _unusedMessages.put(message.getMessageNumber(), message); - } - - public void completeMessageRecovery() - { - //TODO - log end - //To change body of implemented methods use File | Settings | File Templates. - } - - public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log) - { - _transactionLog = log; - return this; - } - - private static final class ProcessAction - { - private final AMQQueue _queue; - private final AMQMessage _message; - - public ProcessAction(AMQQueue queue, AMQMessage message) - { - _queue = queue; - _message = message; - } - - public void process() - { - try - { - _queue.enqueue(_message); - } - catch(AMQException e) - { - throw new RuntimeException(e); - } - } - - } - - public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) - { - _actions = new ArrayList<ProcessAction>(); - try - { - Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); - if (exchange == null) - { - _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName); - return; - } - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (queue == null) - { - _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName); - } - else - { - FieldTable argumentsFT = null; - if(buf != null) - { - argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit()); - } - - BindingFactory bf = _virtualHost.getBindingFactory(); - - Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT); - - if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null) - { - - _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName - + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); - - bf.restoreBinding(bindingKey, queue, exchange, argumentMap); - } - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void completeBindingRecovery() - { - //return this; - } - - public void complete() - { - - - } - - public void queueEntry(final String queueName, long messageId) - { - AMQShortString queueNameShortString = new AMQShortString(queueName); - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); - - try - { - if(queue != null) - { - ServerMessage message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - - - if (_logger.isDebugEnabled()) - { - _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString()); - } - - Integer count = _queueRecoveries.get(queueName); - if (count == null) - { - count = 0; - } - - queue.enqueue(message); - - _queueRecoveries.put(queueName, ++count); - } - else - { - _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); - TransactionLog.Transaction txn = _transactionLog.newTransaction(); - txn.dequeueMessage(queue, messageId); - txn.commitTranAsync(); - } - } - else - { - _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); - TransactionLog.Transaction txn = _transactionLog.newTransaction(); - TransactionLogResource mockQueue = - new TransactionLogResource() - { - - public String getResourceName() - { - return queueName; - } - }; - txn.dequeueMessage(mockQueue, messageId); - txn.commitTranAsync(); - } - - } - catch(AMQException e) - { - throw new RuntimeException(e); - } - - - - } - - public void completeQueueEntryRecovery() - { - - for(StoredMessage m : _unusedMessages.values()) - { - _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); - m.remove(); - } - - for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) - { - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); - - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); - } - - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java deleted file mode 100644 index 33c713c62a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ /dev/null @@ -1,876 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.AMQBrokerManagerMBean; -import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.server.configuration.BrokerConfig; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.ExchangeConfiguration; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfigType; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.connection.ConnectionRegistry; -import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; - -public class VirtualHostImpl implements VirtualHost -{ - private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); - - private final String _name; - - private ConnectionRegistry _connectionRegistry; - - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - - private MessageStore _messageStore; - - protected VirtualHostMBean _virtualHostMBean; - - private AMQBrokerManagerMBean _brokerMBean; - - private final AuthenticationManager _authenticationManager; - - private SecurityManager _securityManager; - - private final ScheduledThreadPoolExecutor _houseKeepingTasks; - private final IApplicationRegistry _appRegistry; - private VirtualHostConfiguration _configuration; - private DurableConfigurationStore _durableConfigurationStore; - private BindingFactory _bindingFactory; - private BrokerConfig _broker; - private UUID _id; - - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - - private final long _createTime = System.currentTimeMillis(); - private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); - private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; - - public IConnectionRegistry getConnectionRegistry() - { - return _connectionRegistry; - } - - public VirtualHostConfiguration getConfiguration() - { - return _configuration; - } - - public UUID getId() - { - return _id; - } - - public VirtualHostConfigType getConfigType() - { - return VirtualHostConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getBroker(); - } - - public boolean isDurable() - { - return false; - } - - /** - * Virtual host JMX MBean class. - * - * This has some of the methods implemented from management intrerface for exchanges. Any - * implementaion of an Exchange MBean should extend this class. - */ - public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost - { - public VirtualHostMBean() throws NotCompliantMBeanException - { - super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE); - } - - public String getObjectInstanceName() - { - return ObjectName.quote(_name); - } - - public String getName() - { - return _name; - } - - public VirtualHostImpl getVirtualHost() - { - return VirtualHostImpl.this; - } - } - - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception - { - this(appRegistry, hostConfig, null); - } - - - public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception - { - this(ApplicationRegistry.getInstance(),hostConfig,store); - } - - private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception - { - if (hostConfig == null) - { - throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); - } - - _appRegistry = appRegistry; - _broker = _appRegistry.getBroker(); - _configuration = hostConfig; - _name = _configuration.getName(); - - _id = _appRegistry.getConfigStore().createId(); - - CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); - - if (_name == null || _name.length() == 0) - { - throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); - } - - _securityManager = new SecurityManager(_appRegistry.getSecurityManager()); - _securityManager.configureHostPlugins(_configuration); - - _virtualHostMBean = new VirtualHostMBean(); - - _connectionRegistry = new ConnectionRegistry(); - - _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); - - _queueRegistry = new DefaultQueueRegistry(this); - - _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(_configuration); - - _exchangeRegistry = new DefaultExchangeRegistry(this); - - StartupRoutingTable configFileRT = new StartupRoutingTable(); - - _durableConfigurationStore = configFileRT; - - // This needs to be after the RT has been defined as it creates the default durable exchanges. - _exchangeRegistry.initialise(); - - _bindingFactory = new BindingFactory(this); - - initialiseModel(_configuration); - - if (store != null) - { - _messageStore = store; - _durableConfigurationStore = store; - } - else - { - initialiseMessageStore(hostConfig); - } - - _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(); - - _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); - _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); - - initialiseStatistics(); - } - - private void initialiseHouseKeeping(long period) - { - /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ - if (period != 0L) - { - class ExpiredMessagesTask extends HouseKeepingTask - { - public ExpiredMessagesTask(VirtualHost vhost) - { - super(vhost); - } - - public void execute() - { - for (AMQQueue q : _queueRegistry.getQueues()) - { - _logger.debug("Checking message status for queue: " - + q.getName()); - try - { - q.checkMessageStatus(); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - for (AMQSessionModel session : connection.getSessionModels()) - { - _logger.debug("Checking for long running open transactions on session " + session); - try - { - session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), - _configuration.getTransactionTimeoutOpenClose(), - _configuration.getTransactionTimeoutIdleWarn(), - _configuration.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } - } - } - - scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); - - Map<String, VirtualHostPluginFactory> plugins = - ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); - - if (plugins != null) - { - for (Map.Entry<String, VirtualHostPluginFactory> entry : plugins.entrySet()) - { - String pluginName = entry.getKey(); - VirtualHostPluginFactory factory = entry.getValue(); - try - { - VirtualHostPlugin plugin = factory.newInstance(this); - - // If we had configuration for the plugin the schedule it. - if (plugin != null) - { - _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2, - plugin.getDelay(), plugin.getTimeUnit()); - - _logger.info("Loaded VirtualHostPlugin:" + plugin); - } - } - catch (RuntimeException e) - { - _logger.error("Unable to load VirtualHostPlugin:" + pluginName + " due to:" + e.getMessage(), e); - } - } - } - } - } - - /** - * Allow other broker components to register a HouseKeepingTask - * - * @param period How often this task should run, in ms. - * @param task The task to run. - */ - public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) - { - _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period, - TimeUnit.MILLISECONDS); - } - - public long getHouseKeepingTaskCount() - { - return _houseKeepingTasks.getTaskCount(); - } - - public long getHouseKeepingCompletedTaskCount() - { - return _houseKeepingTasks.getCompletedTaskCount(); - } - - public int getHouseKeepingPoolSize() - { - return _houseKeepingTasks.getCorePoolSize(); - } - - public void setHouseKeepingPoolSize(int newSize) - { - _houseKeepingTasks.setCorePoolSize(newSize); - } - - - public int getHouseKeepingActiveCount() - { - return _houseKeepingTasks.getActiveCount(); - } - - - private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception - { - String messageStoreClass = hostConfig.getMessageStoreClass(); - - Class clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); - - if (!(o instanceof MessageStore)) - { - throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + - " does not."); - } - MessageStore messageStore = (MessageStore) o; - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - - MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore); - - messageStore.configureConfigStore(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - - messageStore.configureMessageStore(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - messageStore.configureTransactionLog(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - - _messageStore = messageStore; - _durableConfigurationStore = messageStore; - } - - private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException - { - _logger.debug("Loading configuration for virtualhost: " + config.getName()); - - List exchangeNames = config.getExchanges(); - - for (Object exchangeNameObj : exchangeNames) - { - String exchangeName = String.valueOf(exchangeNameObj); - configureExchange(config.getExchangeConfiguration(exchangeName)); - } - - String[] queueNames = config.getQueueNames(); - - for (Object queueNameObj : queueNames) - { - String queueName = String.valueOf(queueNameObj); - configureQueue(config.getQueueConfiguration(queueName)); - } - } - - private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException - { - AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); - - Exchange exchange; - exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); - boolean durable = exchangeConfiguration.getDurable(); - boolean autodelete = exchangeConfiguration.getAutoDelete(); - - Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); - _exchangeRegistry.registerExchange(newExchange); - - if (newExchange.isDurable()) - { - _durableConfigurationStore.createExchange(newExchange); - } - } - } - - private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException - { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); - - if (queue.isDurable()) - { - getDurableConfigurationStore().createQueue(queue); - } - - String exchangeName = queueConfiguration.getExchange(); - - Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if (exchange == null) - { - exchange = _exchangeRegistry.getDefaultExchange(); - } - - if (exchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); - } - - List routingKeys = queueConfiguration.getRoutingKeys(); - if (routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getNameShortString()); - } - - for (Object routingKeyNameObj : routingKeys) - { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this); - } - _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null); - } - - if (exchange != _exchangeRegistry.getDefaultExchange()) - { - _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null); - } - } - - public String getName() - { - return _name; - } - - public BrokerConfig getBroker() - { - return _broker; - } - - public String getFederationTag() - { - return _broker.getFederationTag(); - } - - public void setBroker(final BrokerConfig broker) - { - _broker = broker; - } - - public long getCreateTime() - { - return _createTime; - } - - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - public MessageStore getMessageStore() - { - return _messageStore; - } - - public TransactionLog getTransactionLog() - { - return _messageStore; - } - - public DurableConfigurationStore getDurableConfigurationStore() - { - return _durableConfigurationStore; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - - public SecurityManager getSecurityManager() - { - return _securityManager; - } - - public void close() - { - //Stop Connections - _connectionRegistry.close(); - - //Stop the Queues processing - if (_queueRegistry != null) - { - for (AMQQueue queue : _queueRegistry.getQueues()) - { - queue.stop(); - } - } - - //Stop Housekeeping - if (_houseKeepingTasks != null) - { - _houseKeepingTasks.shutdown(); - - try - { - if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) - { - _houseKeepingTasks.shutdownNow(); - } - } - catch (InterruptedException e) - { - _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage()); - // Swallowing InterruptedException ok as we are shutting down. - } - } - - //Close MessageStore - if (_messageStore != null) - { - //Remove MessageStore Interface should not throw Exception - try - { - _messageStore.close(); - } - catch (Exception e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - CurrentActor.get().message(VirtualHostMessages.CLOSED()); - } - - public ManagedObject getBrokerMBean() - { - return _brokerMBean; - } - - public ManagedObject getManagedObject() - { - return _virtualHostMBean; - } - - public UUID getBrokerId() - { - return _appRegistry.getBrokerId(); - } - - public IApplicationRegistry getApplicationRegistry() - { - return _appRegistry; - } - - public BindingFactory getBindingFactory() - { - return _bindingFactory; - } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - _appRegistry.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - _appRegistry.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - - for (AMQConnectionModel connection : _connectionRegistry.getConnections()) - { - connection.resetStatistics(); - } - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); - _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); - _messagesReceived = new StatisticsCounter("messages-received-" + getName()); - _dataReceived = new StatisticsCounter("bytes-received-" + getName()); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } - - public void createBrokerConnection(final String transport, - final String host, - final int port, - final String vhost, - final boolean durable, - final String authMechanism, - final String username, - final String password) - { - BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password); - if(_links.putIfAbsent(blink,blink) != null) - { - getConfigStore().addConfiguredObject(blink); - } - } - - public void removeBrokerConnection(final String transport, - final String host, - final int port, - final String vhost) - { - removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null)); - - } - - public void removeBrokerConnection(BrokerLink blink) - { - blink = _links.get(blink); - if(blink != null) - { - blink.close(); - getConfigStore().removeConfiguredObject(blink); - } - } - - public ConfigStore getConfigStore() - { - return getApplicationRegistry().getConfigStore(); - } - - /** - * Temporary Startup RT class to record the creation of persistent queues / exchanges. - * - * - * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. - * This should be removed after the _RT has been fully split from the the TL - */ - private static class StartupRoutingTable implements DurableConfigurationStore - { - public List<Exchange> exchange = new LinkedList<Exchange>(); - public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); - public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception - { - } - - public void close() throws Exception - { - } - - public void removeMessage(Long messageId) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (exchange.isDurable()) - { - this.exchange.add(exchange); - } - } - - public void removeExchange(Exchange exchange) throws AMQStoreException - { - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - if (exchange.isDurable() && queue.isDurable()) - { - bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args)); - } - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - } - - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - if (queue.isDurable()) - { - this.queue.add(new CreateQueueTuple(queue, arguments)); - } - } - - public void removeQueue(AMQQueue queue) throws AMQStoreException - { - } - - - private static class CreateQueueTuple - { - public AMQQueue queue; - public FieldTable arguments; - - public CreateQueueTuple(AMQQueue queue, FieldTable arguments) - { - this.queue = queue; - this.arguments = arguments; - } - } - - private static class CreateBindingTuple - { - public AMQQueue queue; - public FieldTable arguments; - public Exchange exchange; - public AMQShortString routingKey; - - public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - { - this.exchange = exchange; - this.routingKey = routingKey; - this.queue = queue; - arguments = args; - } - } - - public void updateQueue(AMQQueue queue) throws AMQStoreException - { - } - } - - @Override - public String toString() - { - return _name; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java deleted file mode 100644 index 32d0c4c4d1..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ /dev/null @@ -1,109 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.configuration.ConfigStore;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry implements Closeable
-{
- private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
-
-
- private String _defaultVirtualHostName;
- private ApplicationRegistry _applicationRegistry;
-
- public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
- {
- _applicationRegistry = applicationRegistry;
- }
-
- public synchronized void registerVirtualHost(VirtualHost host) throws Exception
- {
- if(_registry.containsKey(host.getName()))
- {
- throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
- }
- _registry.put(host.getName(),host);
- }
-
- public synchronized void unregisterVirtualHost(VirtualHost host)
- {
- _registry.remove(host.getName());
- }
-
- public VirtualHost getVirtualHost(String name)
- {
- if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
- {
- name = getDefaultVirtualHostName();
- }
-
- return _registry.get(name);
- }
-
- public VirtualHost getDefaultVirtualHost()
- {
- return getVirtualHost(getDefaultVirtualHostName());
- }
-
- private String getDefaultVirtualHostName()
- {
- return _defaultVirtualHostName;
- }
-
- public void setDefaultVirtualHostName(String defaultVirtualHostName)
- {
- _defaultVirtualHostName = defaultVirtualHostName;
- }
-
-
- public Collection<VirtualHost> getVirtualHosts()
- {
- return new ArrayList<VirtualHost>(_registry.values());
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- return _applicationRegistry;
- }
-
- public ConfigStore getConfigStore()
- {
- return _applicationRegistry.getConfigStore();
- }
-
- public void close()
- {
- for (VirtualHost virtualHost : getVirtualHosts())
- {
- virtualHost.close();
- }
-
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java deleted file mode 100644 index 12206013eb..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.Exchange.BindingListener; -import org.apache.qpid.server.queue.AMQQueue; - -/** - * This is a listener that caches queues that are configured for slow consumer disconnection. - * - * There should be one listener per virtual host, which can be added to all exchanges on - * that host. - * - * TODO In future, it will be possible to configure the policy at runtime, so only the queue - * itself is cached, and the configuration looked up by the housekeeping thread. This means - * that there may be occasions where the copy of the cache contents retrieved by the thread - * does not contain queues that are configured, or that configured queues are not present. - * - * @see BindingListener - */ -public class ConfiguredQueueBindingListener implements BindingListener -{ - private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); - - private String _vhostName; - private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>()); - - public ConfiguredQueueBindingListener(String vhostName) - { - _vhostName = vhostName; - } - - /** - * @see BindingListener#bindingAdded(Exchange, Binding) - */ - public void bindingAdded(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - /** - * @see BindingListener#bindingRemoved(Exchange, Binding) - */ - public void bindingRemoved(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - private void processBinding(Binding binding) - { - AMQQueue queue = binding.getQueue(); - - SlowConsumerDetectionQueueConfiguration config = - queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (config != null) - { - _cache.add(queue); - } - else - { - _cache.remove(queue); - } - } - - /** - * Lookup and return the cache of configured {@link AMQQueue}s. - * - * Note that when accessing the cached queues, the {@link Iterator} is not thread safe - * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the - * cache is returned. - * - * @return a copy of the cached {@link java.util.Set} of queues - */ - public Set<AMQQueue> getQueueCache() - { - return new HashSet<AMQQueue>(_cache); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java deleted file mode 100644 index 5c4fe0aab8..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages; - -public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin -{ - private SlowConsumerDetectionConfiguration _config; - private ConfiguredQueueBindingListener _listener; - - public static class SlowConsumerFactory implements VirtualHostPluginFactory - { - public SlowConsumerDetection newInstance(VirtualHost vhost) - { - SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); - - if (config == null) - { - return null; - } - - SlowConsumerDetection plugin = new SlowConsumerDetection(vhost); - plugin.configure(config); - return plugin; - } - } - - /** - * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * cirtual host to record all the configured queues in a cache for processing by the housekeeping - * thread. - * - * @see Plugin#configure(ConfigurationPlugin) - */ - public void configure(ConfigurationPlugin config) - { - _config = (SlowConsumerDetectionConfiguration) config; - _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName()); - for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames()) - { - getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); - } - } - - public SlowConsumerDetection(VirtualHost vhost) - { - super(vhost); - } - - public void execute() - { - CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); - - Set<AMQQueue> cache = _listener.getQueueCache(); - for (AMQQueue q : cache) - { - CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); - - try - { - SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (checkQueueStatus(q, config)) - { - config.getPolicy().performPolicy(q); - } - } - catch (Exception e) - { - // Don't throw exceptions as this will stop the house keeping task from running. - _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); - } - } - - CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); - } - - public long getDelay() - { - return _config.getDelay(); - } - - public TimeUnit getTimeUnit() - { - return _config.getTimeUnit(); - } - - /** - * Check the depth,messageSize,messageAge,messageCount values for this q - * - * @param q the queue to check - * @param config the queue configuration to compare against the queue state - * - * @return true if the queue has reached a threshold. - */ - private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) - { - if (config != null) - { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); - - int count = q.getMessageCount(); - - // First Check message counts - if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) || - // The check queue depth - (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || - // finally if we have messages on the queue check Arrival time. - // We must check count as OldestArrival time is Long.MAX_LONG when - // there are no messages. - (config.getMessageAge() != 0 && - ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); - _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); - _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); - _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); - } - - return true; - } - } - return false; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java deleted file mode 100644 index 3798f47f0b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.virtualhost.HouseKeepingTask; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.concurrent.TimeUnit; - -public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask implements VirtualHostPlugin -{ - protected final Logger _logger = Logger.getLogger(getClass()); - - public VirtualHostHouseKeepingPlugin(VirtualHost vhost) - { - super(vhost); - } - - - /** - * Long value representing the delay between repeats - * - * @return - */ - public abstract long getDelay(); - - /** - * Option to specify what the delay value represents - * - * @return - * - * @see java.util.concurrent.TimeUnit for valid value. - */ - public abstract TimeUnit getTimeUnit(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java deleted file mode 100644 index 1886c2d01d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public interface VirtualHostPlugin extends Runnable, Plugin -{ - /** - * Long value representing the delay between repeats - * - * @return - */ - public long getDelay(); - - /** - * Option to specify what the delay value represents - * @see java.util.concurrent.TimeUnit for valid value. - * @return - */ - public TimeUnit getTimeUnit(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java deleted file mode 100644 index c8bea18444..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins; - -import org.apache.qpid.server.virtualhost.VirtualHost; - -public interface VirtualHostPluginFactory -{ - public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties deleted file mode 100644 index 03c56910c2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# Default File used for all non-defined locales. - -RUNNING = SCD-1001 : Running -COMPLETE = SCD-1002 : Complete -CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties deleted file mode 100644 index ed4fb1d45a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# Default File used for all non-defined locales. - -DELETING_QUEUE = TDP-1001 : Deleting Queue -DISCONNECTING = TDP-1002 : Disconnecting Session
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java deleted file mode 100644 index 6028f63fdb..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins.policies; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.plugins.logging.TopicDeletePolicyMessages; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; - -public class TopicDeletePolicy implements SlowConsumerPolicyPlugin -{ - Logger _logger = Logger.getLogger(TopicDeletePolicy.class); - private TopicDeletePolicyConfiguration _configuration; - - public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory - { - public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException - { - TopicDeletePolicyConfiguration config = - configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); - - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(config); - return policy; - } - - public String getPluginName() - { - return "topicdelete"; - } - - public Class<TopicDeletePolicy> getPluginClass() - { - return TopicDeletePolicy.class; - } - } - - public void performPolicy(AMQQueue q) - { - if (q == null) - { - return; - } - - AMQSessionModel owner = q.getExclusiveOwningSession(); - - // Only process exclusive queues - if (owner == null) - { - return; - } - - //Only process Topics - if (!validateQueueIsATopic(q)) - { - return; - } - - try - { - CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING()); - // Close the consumer . this will cause autoDelete Queues to be purged - owner.getConnectionModel(). - closeSession(owner, AMQConstant.RESOURCE_ERROR, - "Consuming to slow."); - - // Actively delete non autoDelete queues if deletePersistent is set - if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent())) - { - CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE()); - q.delete(); - } - - } - catch (AMQException e) - { - _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); - } - - } - - /** - * Check the queue bindings to validate the queue is bound to the - * topic exchange. - * - * @param q the Queue - * - * @return true iff Q is bound to a TopicExchange - */ - private boolean validateQueueIsATopic(AMQQueue q) - { - for (Binding binding : q.getBindings()) - { - if (binding.getExchange() instanceof TopicExchange) - { - return true; - } - } - - return false; - } - - public void configure(ConfigurationPlugin config) - { - _configuration = (TopicDeletePolicyConfiguration) config; - } - - @Override - public String toString() - { - return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]"); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java deleted file mode 100644 index 7dfd22c733..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugins.policies; - -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -public class TopicDeletePolicyConfiguration extends ConfigurationPlugin -{ - - public static class TopicDeletePolicyConfigurationFactory - implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, - Configuration config) - throws ConfigurationException - { - TopicDeletePolicyConfiguration slowConsumerConfig = - new TopicDeletePolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List<String> getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"delete-persistent"}; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - // No validation required. - } - - public boolean deletePersistent() - { - // If we don't have configuration then we don't deletePersistent Queues - return (hasConfiguration() && contains("delete-persistent")); - } - - @Override - public String formatToString() - { - return (deletePersistent()?"delete-durable":""); - } - - -} |