diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java')
-rwxr-xr-x[-rw-r--r--] | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java | 594 |
1 files changed, 40 insertions, 554 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index eccdce1ebc..e4a382d275 100644..100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,576 +1,62 @@ /* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ package org.apache.qpid.server.virtualhost; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.AMQBrokerManagerMBean; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; -import org.apache.qpid.server.configuration.ExchangeConfiguration; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.access.ACLManager; -import org.apache.qpid.server.security.access.Accessable; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.management.ManagedObject; -import javax.management.NotCompliantMBeanException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - -public class VirtualHost implements Accessable +public interface VirtualHost { - private static final Logger _logger = Logger.getLogger(VirtualHost.class); - - private final String _name; - - private ConnectionRegistry _connectionRegistry; - - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - - private MessageStore _messageStore; - - protected VirtualHostMBean _virtualHostMBean; - - private AMQBrokerManagerMBean _brokerMBean; - - private AuthenticationManager _authenticationManager; - - private ACLManager _accessManager; - - private final Timer _houseKeepingTimer; - private VirtualHostConfiguration _configuration; - private DurableConfigurationStore _durableConfigurationStore; - - public void setAccessableName(String name) - { - _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" - + name + ") ignored remains :" + getAccessableName()); - } - - public String getAccessableName() - { - return _name; - } - - public IConnectionRegistry getConnectionRegistry() - { - return _connectionRegistry; - } - - public VirtualHostConfiguration getConfiguration() - { - return _configuration; - } - - /** - * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any - * implementaion of an Exchange MBean should extend this class. - */ - public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost - { - public VirtualHostMBean() throws NotCompliantMBeanException - { - super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE, ManagedVirtualHost.VERSION); - } - - public String getObjectInstanceName() - { - return _name.toString(); - } - - public String getName() - { - return _name.toString(); - } - - public VirtualHost getVirtualHost() - { - return VirtualHost.this; - } - - } // End of MBean class - - /** - * Normal Constructor - * - * @param hostConfig - * - * @throws Exception - */ - public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception - { - this(hostConfig, null); - } - - public VirtualHost(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception - { - _configuration = hostConfig; - _name = hostConfig.getName(); - - CurrentActor.get().message(VirtualHostMessages.VHT_1001(_name)); - - if (_name == null || _name.length() == 0) - { - throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); - } - - _virtualHostMBean = new VirtualHostMBean(); - - _connectionRegistry = new ConnectionRegistry(this); - - _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true); - - _queueRegistry = new DefaultQueueRegistry(this); - - _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(hostConfig); - - _exchangeRegistry = new DefaultExchangeRegistry(this); - - //Create a temporary RT to store the durable entries from the config file - // so we can replay them in to the real _RT after it has been loaded. - /// This should be removed after the _RT has been fully split from the the TL - - StartupRoutingTable configFileRT = new StartupRoutingTable(); - - _durableConfigurationStore = configFileRT; - - // This needs to be after the RT has been defined as it creates the default durable exchanges. - _exchangeRegistry.initialise(); - - // We don't need to store the Default queues in the store as we always - // create them first on start up so don't clear them from the startup - // configuration here. This also ensures that we don't attempt to - // perform a createExchange twice with the same details in the - // MessageStore(RoutingTable) as some instances may not like that. - // Derby being one. - // todo this can be removed with the resolution fo QPID-2096 - configFileRT.exchange.clear(); - - initialiseModel(hostConfig); - - //todo REMOVE Work Around for QPID-2096 - // This means that all durable exchanges declared in the configuration - // will not be stored in the MessageStore. - // They will still be created/registered/available on startup for as - // long as they are contained in the configuration. However, when they - // are removed from the configuration they will no longer exist. - // This differs from durable queues as they will be writen to to the - // store. After QPID-2096 has been resolved exchanges will mirror that - // functionality. - configFileRT.exchange.clear(); - - if (store != null) - { - _messageStore = store; - _durableConfigurationStore = store; - } - else - { - if (hostConfig == null) - { - throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); - } - initialiseMessageStore(hostConfig); - } - - //Now that the RT has been initialised loop through the persistent queues/exchanges created from the config - // file and write them in to the new routing Table. - for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue) - { - getDurableConfigurationStore().createQueue(cqt.queue, cqt.arguments); - } - - for (Exchange exchange : configFileRT.exchange) - { - getDurableConfigurationStore().createExchange(exchange); - } - - for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings) - { - getDurableConfigurationStore().bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments); - } - - _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig); - - _accessManager = ApplicationRegistry.getInstance().getAccessManager(); - _accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration()); - - _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); - _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); - } - - private void initialiseHouseKeeping(long period) - { - /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ - if (period != 0L) - { - class RemoveExpiredMessagesTask extends TimerTask - { - public void run() - { - for (AMQQueue q : _queueRegistry.getQueues()) - { - - try - { - q.checkMessageStatus(); - } - catch (AMQException e) - { - _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e); - throw new RuntimeException(e); - } - } - } - } - - _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), - period / 2, - period); - - class ForceChannelClosuresTask extends TimerTask - { - public void run() - { - _connectionRegistry.expireClosedChannels(); - } - } - } - } - - private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception - { - String messageStoreClass = hostConfig.getMessageStoreClass(); - - Class clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); - - if (!(o instanceof MessageStore)) - { - throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + - " does not."); - } - MessageStore messageStore = (MessageStore) o; - messageStore.configure(this, "store", hostConfig); - _messageStore = messageStore; - _durableConfigurationStore = messageStore; - } - - private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException - { - _logger.debug("Loading configuration for virtualhost: " + config.getName()); - - List exchangeNames = config.getExchanges(); - - for (Object exchangeNameObj : exchangeNames) - { - String exchangeName = String.valueOf(exchangeNameObj); - configureExchange(config.getExchangeConfiguration(exchangeName)); - } - - String[] queueNames = config.getQueueNames(); - - for (Object queueNameObj : queueNames) - { - String queueName = String.valueOf(queueNameObj); - configureQueue(config.getQueueConfiguration(queueName)); - } - } - - private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException - { - AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); - - Exchange exchange; - exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); - boolean durable = exchangeConfiguration.getDurable(); - boolean autodelete = exchangeConfiguration.getAutoDelete(); - - Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); - _exchangeRegistry.registerExchange(newExchange); - } - } - - private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException - { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); - - if (queue.isDurable()) - { - getDurableConfigurationStore().createQueue(queue); - } - - String exchangeName = queueConfiguration.getExchange(); - - Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if (exchange == null) - { - exchange = _exchangeRegistry.getDefaultExchange(); - } - - if (exchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); - } - - List routingKeys = queueConfiguration.getRoutingKeys(); - if (routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getName()); - } - - for (Object routingKeyNameObj : routingKeys) - { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this); - } - queue.bind(exchange, routingKey, null); - } - - if (exchange != _exchangeRegistry.getDefaultExchange()) - { - queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null); - } - } - - public String getName() - { - return _name; - } - - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - public MessageStore getMessageStore() - { - return _messageStore; - } - - public TransactionLog getTransactionLog() - { - return _messageStore; - } - - public DurableConfigurationStore getDurableConfigurationStore() - { - return _durableConfigurationStore; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - - public ACLManager getAccessManager() - { - return _accessManager; - } - - public void close() throws Exception - { - - //Stop Connections - _connectionRegistry.close(); - - //Stop the Queues processing - if (_queueRegistry != null) - { - for (AMQQueue queue : _queueRegistry.getQueues()) - { - queue.stop(); - } - } - - //Stop Housekeeping - if (_houseKeepingTimer != null) - { - _houseKeepingTimer.cancel(); - } - - //Close MessageStore - if (_messageStore != null) - { - _messageStore.close(); - } - - CurrentActor.get().message(VirtualHostMessages.VHT_1002()); - } - - public ManagedObject getBrokerMBean() - { - return _brokerMBean; - } - - public ManagedObject getManagedObject() - { - return _virtualHostMBean; - } - - /** - * Temporary Startup RT class to record the creation of persistent queues / exchanges. - * - * - * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. - * This should be removed after the _RT has been fully split from the the TL - */ - private class StartupRoutingTable implements DurableConfigurationStore - { - public List<Exchange> exchange = new LinkedList<Exchange>(); - public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); - public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception - { - } - - public void close() throws Exception - { - } - - public void removeMessage(Long messageId) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } + IConnectionRegistry getConnectionRegistry(); - public void createExchange(Exchange exchange) throws AMQException - { - if (exchange.isDurable()) - { - this.exchange.add(exchange); - } - } + VirtualHostConfiguration getConfiguration(); - public void removeExchange(Exchange exchange) throws AMQException - { - } + String getName(); - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - if (exchange.isDurable() && queue.isDurable()) - { - bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args)); - } - } + QueueRegistry getQueueRegistry(); - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - } + ExchangeRegistry getExchangeRegistry(); - public void createQueue(AMQQueue queue) throws AMQException - { - createQueue(queue, null); - } + ExchangeFactory getExchangeFactory(); - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException - { - if (queue.isDurable()) - { - this.queue.add(new CreateQueueTuple(queue, arguments)); - } - } + MessageStore getMessageStore(); - public void removeQueue(AMQQueue queue) throws AMQException - { - } + TransactionLog getTransactionLog(); + DurableConfigurationStore getDurableConfigurationStore(); - private class CreateQueueTuple - { - public AMQQueue queue; - public FieldTable arguments; + AuthenticationManager getAuthenticationManager(); - public CreateQueueTuple(AMQQueue queue, FieldTable arguments) - { - this.queue = queue; - this.arguments = arguments; - } - } + ACLManager getAccessManager(); - private class CreateBindingTuple - { - public AMQQueue queue; - public FieldTable arguments; - public Exchange exchange; - public AMQShortString routingKey; + void close() throws Exception; - public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - { - this.exchange = exchange; - this.routingKey = routingKey; - this.queue = queue; - arguments = args; - } - } - } + ManagedObject getManagedObject(); } |