From cd4a28673d27c557bfb756d2a9f2d592ea498efd Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 19 Jun 2013 15:51:30 +0000 Subject: QPID-4937 : [Java Broker] separate virtualhosts into different types git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1494667 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 2 +- .../server/store/berkeleydb/BDBHAVirtualHost.java | 198 ++++++ .../store/berkeleydb/BDBHAVirtualHostFactory.java | 106 +++ .../virtualhost/bdb_ha/addVirtualHost.js | 51 ++ .../java/resources/virtualhost/bdb_ha/add.html | 37 + ...rg.apache.qpid.server.plugin.VirtualHostFactory | 19 + .../store/berkeleydb/BDBHAMessageStoreTest.java | 1 + .../BDBMessageStoreConfigurationTest.java | 15 +- .../store/berkeleydb/HATestClusterCreator.java | 6 +- .../plugin/servlet/rest/HelperServlet.java | 7 +- .../servlet/rest/action/ListBrokerAttribute.java | 52 ++ .../servlet/rest/action/ListMessageStoreTypes.java | 43 -- .../src/main/java/resources/addVirtualHost.html | 13 +- .../java/resources/js/qpid/management/addPort.js | 4 +- .../resources/js/qpid/management/addVirtualHost.js | 88 +-- .../virtualhost/standard/addVirtualHost.js | 79 +++ .../java/resources/virtualhost/standard/add.html | 13 + .../configuration/VirtualHostConfiguration.java | 8 +- .../configuration/startup/BrokerRecoverer.java | 25 +- .../configuration/startup/StoreUpgrader.java | 86 +++ .../server/connection/IConnectionRegistry.java | 2 +- .../qpid/server/exchange/AbstractExchange.java | 4 +- .../server/exchange/DefaultExchangeRegistry.java | 2 +- .../org/apache/qpid/server/exchange/Exchange.java | 12 +- .../server/handler/ExchangeDeclareHandler.java | 4 +- .../qpid/server/handler/QueueDeclareHandler.java | 4 +- .../qpid/server/handler/QueueDeleteHandler.java | 4 +- .../java/org/apache/qpid/server/model/Broker.java | 1 + .../java/org/apache/qpid/server/model/Model.java | 6 +- .../org/apache/qpid/server/model/VirtualHost.java | 6 +- .../qpid/server/model/adapter/AbstractAdapter.java | 31 +- .../qpid/server/model/adapter/BrokerAdapter.java | 5 + .../qpid/server/model/adapter/QueueAdapter.java | 4 +- .../server/model/adapter/VirtualHostAdapter.java | 72 +- .../qpid/server/plugin/VirtualHostFactory.java | 88 +++ .../apache/qpid/server/queue/AMQQueueFactory.java | 4 +- .../server/store/AbstractJDBCMessageStore.java | 2 +- .../server/store/DurableConfigurationStore.java | 6 +- .../org/apache/qpid/server/store/MessageStore.java | 2 +- .../qpid/server/store/MessageStoreCreator.java | 6 + .../apache/qpid/server/store/NullMessageStore.java | 4 +- .../qpid/server/store/derby/DerbyMessageStore.java | 3 +- .../server/transport/ServerSessionDelegate.java | 8 +- .../apache/qpid/server/util/MapValueConverter.java | 36 +- .../server/virtualhost/AbstractVirtualHost.java | 660 ++++++++++++++++++ .../server/virtualhost/StandardVirtualHost.java | 145 ++++ .../virtualhost/StandardVirtualHostFactory.java | 98 +++ .../qpid/server/virtualhost/VirtualHost.java | 2 + .../virtualhost/VirtualHostFactoryRegistry.java | 65 ++ .../qpid/server/virtualhost/VirtualHostImpl.java | 769 --------------------- ...rg.apache.qpid.server.plugin.VirtualHostFactory | 19 + java/broker/src/main/resources/initial-config.json | 5 +- .../startup/VirtualHostRecovererTest.java | 3 + .../apache/qpid/server/model/VirtualHostTest.java | 67 +- .../store/DurableConfigurationStoreTest.java | 72 +- .../store/MessageStoreQuotaEventsTestBase.java | 2 +- .../apache/qpid/server/store/MessageStoreTest.java | 92 +-- .../qpid/server/store/MessageStoreTestCase.java | 2 +- .../apache/qpid/server/util/BrokerTestHelper.java | 18 +- .../qpid/server/virtualhost/MockVirtualHost.java | 8 +- .../virtualhost/StandardVirtualHostTest.java | 371 ++++++++++ .../server/virtualhost/VirtualHostImplTest.java | 380 ---------- .../org/apache/qpid/test/utils/QpidTestCase.java | 14 +- .../java/org/apache/qpid/client/ssl/SSLTest.java | 1 + .../apache/qpid/server/store/SlowMessageStore.java | 2 +- .../java/org/apache/qpid/systest/rest/Asserts.java | 3 +- .../qpid/systest/rest/VirtualHostRestTest.java | 2 + .../qpid/systest/rest/acl/BrokerACLTest.java | 2 + .../test/client/timeouts/SyncWaitDelayTest.java | 5 +- 69 files changed, 2503 insertions(+), 1473 deletions(-) create mode 100644 java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java create mode 100644 java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java create mode 100644 java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js create mode 100644 java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html create mode 100644 java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory create mode 100644 java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java delete mode 100644 java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java create mode 100644 java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js create mode 100644 java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html create mode 100644 java/broker/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java create mode 100644 java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory create mode 100644 java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java delete mode 100644 java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java (limited to 'java') diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 3074daa46e..d036a5d39a 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -75,7 +75,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.apache.qpid.util.FileUtils; -public abstract class AbstractBDBMessageStore implements MessageStore +public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java new file mode 100644 index 0000000000..0231573053 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -0,0 +1,198 @@ +package org.apache.qpid.server.store.berkeleydb; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.OperationalLoggingListener; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; +import org.apache.qpid.server.virtualhost.State; +import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +public class BDBHAVirtualHost extends AbstractVirtualHost +{ + private BDBHAMessageStore _messageStore; + + private boolean _inVhostInitiatedClose; + + BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) + throws Exception + { + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + } + + + + protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception + { + _messageStore = new BDBHAMessageStore(); + + final MessageStoreLogSubject storeLogSubject = + new MessageStoreLogSubject(this, _messageStore.getClass().getSimpleName()); + OperationalLoggingListener.listen(_messageStore, storeLogSubject); + + _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); + _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); + _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); + + + + _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); + _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); + + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + + _messageStore.configureConfigStore(getName(), + recoveryHandler, + hostConfig.getStoreConfiguration()); + + _messageStore.configureMessageStore(getName(), + recoveryHandler, + recoveryHandler, + hostConfig.getStoreConfiguration()); + } + + + protected void closeStorage() + { + //Close MessageStore + if (_messageStore != null) + { + //Remove MessageStore Interface should not throw Exception + try + { + _inVhostInitiatedClose = true; + getMessageStore().close(); + } + catch (Exception e) + { + getLogger().error("Failed to close message store", e); + } + finally + { + _inVhostInitiatedClose = false; + } + } + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return _messageStore; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + private final class AfterInitialisationListener implements EventListener + { + public void event(Event event) + { + setState(State.PASSIVE); + } + + } + + private final class BeforePassivationListener implements EventListener + { + public void event(Event event) + { + State finalState = State.ERRORED; + + try + { + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare.. + */ + + getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + removeHouseKeepingTasks(); + + getQueueRegistry().stopAllAndUnregisterMBeans(); + getExchangeRegistry().clearAndUnregisterMbeans(); + getDtxRegistry().close(); + + finalState = State.PASSIVE; + } + finally + { + setState(finalState); + reportIfError(getState()); + } + } + + } + + + private final class BeforeActivationListener implements EventListener + { + @Override + public void event(Event event) + { + try + { + initialiseModel(getConfiguration()); + } + catch (Exception e) + { + throw new RuntimeException("Failed to initialise virtual host after state change", e); + } + } + } + + private final class AfterActivationListener implements EventListener + { + @Override + public void event(Event event) + { + attainActivation(); + } + } + + private final class BeforeCloseListener implements EventListener + { + @Override + public void event(Event event) + { + if(!_inVhostInitiatedClose) + { + shutdownHouseKeeping(); + } + + } + } + +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java new file mode 100644 index 0000000000..b01aeafb9a --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -0,0 +1,106 @@ +package org.apache.qpid.server.store.berkeleydb;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +public class BDBHAVirtualHostFactory implements VirtualHostFactory +{ + + public static final String TYPE = "BDB_HA"; + + @Override + public String getType() + { + return TYPE; + } + + @Override + public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + return new BDBHAVirtualHost(virtualHostRegistry, + brokerStatisticsGatherer, + parentSecurityManager, + hostConfig); + } + + @Override + public void validateAttributes(Map attributes) + { + validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes); + validateAttribute("haGroupName", String.class, attributes); + validateAttribute("haNodeName", String.class, attributes); + validateAttribute("haNodeAddress", String.class, attributes); + validateAttribute("haHelperAddress", String.class, attributes); + } + + private void validateAttribute(String attrName, Class clazz, Map attributes) + { + Object attr = attributes.get(attrName); + if(!clazz.isInstance(attr)) + { + throw new IllegalArgumentException("Attribute '"+ attrName + +"' is required and must be of type "+clazz.getSimpleName()+"."); + } + } + + @Override + public Map createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter) + { + LinkedHashMap convertedMap = new LinkedHashMap(); + convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); + convertedMap.put("store.highAvailability.groupName", virtualHostAdapter.getAttribute("haGroupName")); + convertedMap.put("store.highAvailability.nodeName", virtualHostAdapter.getAttribute("haNodeName")); + convertedMap.put("store.highAvailability.nodeHostPort", virtualHostAdapter.getAttribute("haNodeAddress")); + convertedMap.put("store.highAvailability.helperHostPort", virtualHostAdapter.getAttribute("haHelperAddress")); + + final Object haDurability = virtualHostAdapter.getAttribute("haDurability"); + if(haDurability !=null) + { + convertedMap.put("store.highAvailability.durability", haDurability); + } + + final Object designatedPrimary = virtualHostAdapter.getAttribute("haDesignatedPrimary"); + if(designatedPrimary!=null) + { + convertedMap.put("store.highAvailability.designatedPrimary", designatedPrimary); + } + + final Object coalescingSync = virtualHostAdapter.getAttribute("haCoalescingSync"); + if(coalescingSync!=null) + { + convertedMap.put("store.highAvailability.coalescingSync", coalescingSync); + } + + // TODO REP_CONFIG values + + return convertedMap; + } +} diff --git a/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js b/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js new file mode 100644 index 0000000000..44ad5fa57a --- /dev/null +++ b/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array) { + return { + show: function() { + + var node = dom.byId("addVirtualHost.typeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,27) == "formAddVirtualHost.specific") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/bdb_ha/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + }}); + } + }; + }); diff --git a/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html b/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html new file mode 100644 index 0000000000..1727264d41 --- /dev/null +++ b/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + + + + +
Path to store location*: + +
Node Name*: + +
Replication Group*: + +
Node Address*: + +
Helper Address*: + +
diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory new file mode 100644 index 0000000000..0f8848cb74 --- /dev/null +++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java index 8e32a1d113..047b102817 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -115,6 +115,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase String vhostPrefix = "virtualhosts.virtualhost." + vhostName; _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); + _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE); _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName()); _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator + port); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index 5cc436a22a..6c6145fabb 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -20,15 +20,26 @@ */ package org.apache.qpid.server.store.berkeleydb; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreTest; import org.apache.qpid.server.store.MessageStore; public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest { + + private BDBMessageStore _bdbMessageStore; + @Override - protected MessageStore createStore() throws Exception + protected BDBMessageStore createMessageStore() throws Exception { - return new BDBMessageStore(); + _bdbMessageStore = new BDBMessageStore(); + return _bdbMessageStore; } + // TODO - this only works so long as createConfigStore is called after createMessageStore + @Override + protected DurableConfigurationStore createConfigStore() throws Exception + { + return _bdbMessageStore; + } } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 4c2fa910f5..353c3a0ec5 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -75,6 +75,7 @@ public class HATestClusterCreator private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; + private String _vhostConfigKeyPrefix; public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { @@ -83,7 +84,8 @@ public class HATestClusterCreator _groupName = "group" + _testcase.getName(); _ipAddressOfBroker = getIpAddressOfBrokerHost(); _numberOfNodes = numberOfNodes; - _vhostStoreConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + "."; + _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store."; _bdbHelperPort = 0; } @@ -350,6 +352,8 @@ public class HATestClusterCreator { final String nodeName = getNodeNameForNodeAt(bdbPort); + + _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE); _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName); diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java index a7066c73d8..8692ecc88c 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java @@ -33,8 +33,9 @@ import javax.servlet.http.HttpServletResponse; import org.apache.qpid.server.management.plugin.servlet.rest.action.ListAccessControlProviderAttributes; import org.apache.qpid.server.management.plugin.servlet.rest.action.ListAuthenticationProviderAttributes; +import org.apache.qpid.server.management.plugin.servlet.rest.action.ListBrokerAttribute; import org.apache.qpid.server.management.plugin.servlet.rest.action.ListGroupProviderAttributes; -import org.apache.qpid.server.management.plugin.servlet.rest.action.ListMessageStoreTypes; +import org.apache.qpid.server.model.Broker; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -52,8 +53,10 @@ public class HelperServlet extends AbstractServlet _actions = new HashMap(); Action listProviderAttributes = new ListAuthenticationProviderAttributes(); _actions.put(listProviderAttributes.getName(), listProviderAttributes); - Action listMessageStoreTypes = new ListMessageStoreTypes(); + Action listMessageStoreTypes = new ListBrokerAttribute(Broker.SUPPORTED_VIRTUALHOST_STORE_TYPES, "ListMessageStoreTypes"); _actions.put(listMessageStoreTypes.getName(), listMessageStoreTypes); + Action listVirtualHostTypes = new ListBrokerAttribute(Broker.SUPPORTED_VIRTUALHOST_TYPES, "ListVirtualHostTypes"); + _actions.put(listVirtualHostTypes.getName(), listVirtualHostTypes); Action groupProviderAttributes = new ListGroupProviderAttributes(); _actions.put(groupProviderAttributes.getName(), groupProviderAttributes); Action aclProviderAttributes = new ListAccessControlProviderAttributes(); diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java new file mode 100644 index 0000000000..dc414e6a64 --- /dev/null +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.servlet.rest.action; + +import java.util.Map; + +import org.apache.qpid.server.management.plugin.servlet.rest.Action; +import org.apache.qpid.server.model.Broker; + +public class ListBrokerAttribute implements Action +{ + + private final String _attributeName; + private final String _name; + + public ListBrokerAttribute(String attributeName, String name) + { + _attributeName = attributeName; + _name = name; + } + + @Override + public String getName() + { + return _name; + } + + @Override + public Object perform(Map request, Broker broker) + { + return broker.getAttribute(_attributeName); + } + +} diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java deleted file mode 100644 index c0a5d78753..0000000000 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.management.plugin.servlet.rest.action; - -import java.util.Map; - -import org.apache.qpid.server.management.plugin.servlet.rest.Action; -import org.apache.qpid.server.model.Broker; - -public class ListMessageStoreTypes implements Action -{ - - @Override - public String getName() - { - return ListMessageStoreTypes.class.getSimpleName(); - } - - @Override - public Object perform(Map request, Broker broker) - { - return broker.getAttribute(Broker.SUPPORTED_VIRTUALHOST_STORE_TYPES); - } - -} diff --git a/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html b/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html index 43281f600d..282f4ab8f6 100644 --- a/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html +++ b/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html @@ -48,17 +48,12 @@
- - - - - - + +
Store Type*:
Path to store location*: - - Type*:
+
+
diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js index 0343d3393a..18abfa443f 100644 --- a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js +++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js @@ -60,7 +60,7 @@ define(["dojo/_base/xhr", var node = construct.create("div", null, win.body(), "last"); - var convertToPort = function convertToPort(formValues) + var convertToPort = function convertToPort(formValues) { var newPort = {}; newPort.name = dijit.byId("formAddPort.name").value; @@ -478,4 +478,4 @@ define(["dojo/_base/xhr", }; return addPort; - }); \ No newline at end of file + }); diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js index 9c04c3014f..330c6ed40b 100644 --- a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js +++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js @@ -92,7 +92,7 @@ define(["dojo/_base/xhr", }, "addVirtualHost.configPathDiv"); var attributesPane = new dijit.layout.AccordionPane({ - title: "Store Attributes", + title: "Virtual Host Attributes", selected: true }, "addVirtualHost.attributesDiv"); @@ -112,14 +112,14 @@ define(["dojo/_base/xhr", if(theForm.validate()){ var formValues = theForm.getValues(); - if (formValues.configPath == "" && formValues.storeType == "") + if (formValues.configPath == "" && formValues["type"] == "") { - alert("Please specify either configuration or store type for the virtual host"); + alert("Please specify either configuration file or type for the virtual host"); return false; } - if (formValues.configPath != "" && formValues.storeType != "") + if (formValues.configPath != "" && formValues["type"] != "") { - alert("Either configuration or store type with path have to be specified!"); + alert("Either configuration file or type have to be specified!"); return false; } var newVirtualHost = convertToVirtualHost(formValues); @@ -149,63 +149,51 @@ define(["dojo/_base/xhr", }}); } - addVirtualHost.show = function(virtualHostName) { + addVirtualHost.selectVhostType = function(type) { + if(type && String(type).trim() != "") { + require(["qpid/management/virtualhost/"+type.toLowerCase()+"/addVirtualHost"], + function(vhostType) + { + vhostType.show(); + }); + } + } + + addVirtualHost.show = function() { var that = this; + dom.byId("addVirtualHost.typeSpecificDiv").innerHTML = ""; registry.byId("formAddVirtualHost").reset(); dojo.byId("formAddVirtualHost.id").value=""; - if (!that.hasOwnProperty("storeTypeChooser")) + + if (!that.hasOwnProperty("typeChooser")) { xhr.get({ sync: true, - url: "rest/helper?action=ListMessageStoreTypes", + url: "rest/helper?action=ListVirtualHostTypes", handleAs: "json" }).then( function(data) { - var storeTypes = data; - var storeTypesData = []; - for (var i =0 ; i < storeTypes.length; i++) + var vhostTypes = data; + var vhostTypesData = []; + for (var i =0 ; i < vhostTypes.length; i++) { - storeTypesData[i]= {id: storeTypes[i], name: storeTypes[i]}; + vhostTypesData[i]= {id: vhostTypes[i], name: vhostTypes[i]}; } - var storeTypesStore = new Memory({ data: storeTypesData }); - var storeTypesDiv = dom.byId("addVirtualHost.selectStoreType"); - var input = construct.create("input", {id: "addStoreType", required: false}, storeTypesDiv); - that.storeTypeChooser = new FilteringSelect({ id: "addVirtualHost.storeType", - name: "storeType", - store: storeTypesStore, - searchAttr: "name", required: false}, input); + var typesStore = new Memory({ data: vhostTypesData }); + var typesDiv = dom.byId("addVirtualHost.selectType"); + var input = construct.create("input", {id: "addType", required: false}, typesDiv); + that.typeChooser = new FilteringSelect({ id: "addVirtualHost.type", + name: "type", + store: typesStore, + searchAttr: "name", + required: false, + onChange: that.selectVhostType }, input); }); } - if (virtualHostName) - { - xhr.get({ - url: "rest/virtualhost/" + encodeURIComponent(virtualHostName), - handleAs: "json" - }).then( - function(data) { - var host = data[0]; - var nameField = dijit.byId("formAddVirtualHost.name"); - nameField.set("value", host.name); - dojo.byId("formAddVirtualHost.id").value=host.id; - var configPath = host.configPath; - if (configPath) - { - var configPathField = dijit.byId("formAddVirtualHost.configPath"); - configPathField.set("value", host.configPath); - } - else - { - that.storeTypeChooser.set("value", host.storeType.toLowerCase()); - var storePathField = dijit.byId("formAddVirtualHost.storePath"); - storePathField.set("value", host.storePath); - } - registry.byId("addVirtualHost").show(); - }); - } - else - { - registry.byId("addVirtualHost").show(); - } + + + registry.byId("addVirtualHost").show(); + } return addVirtualHost; - }); \ No newline at end of file + }); diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js new file mode 100644 index 0000000000..cd56ca9cba --- /dev/null +++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/_base/event", + 'dojo/_base/json', + "dojo/store/Memory", + "dijit/form/FilteringSelect", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, FilteringSelect) { + return { + show: function() { + var node = dom.byId("addVirtualHost.typeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,27) == "formAddVirtualHost.specific") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/standard/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + if (that.hasOwnProperty("storeTypeChooser")) + { + that.storeTypeChooser.destroy(); + } + xhr.get({ + sync: true, + url: "rest/helper?action=ListMessageStoreTypes", + handleAs: "json" + }).then( + function(data) { + var storeTypes = data; + var storeTypesData = []; + for (var i =0 ; i < storeTypes.length; i++) + { + storeTypesData[i]= {id: storeTypes[i], name: storeTypes[i]}; + } + var storeTypesStore = new Memory({ data: storeTypesData }); + var storeTypesDiv = dom.byId("addVirtualHost.specific.selectStoreType"); + var input = construct.create("input", {id: "addStoreType", required: false}, storeTypesDiv); + that.storeTypeChooser = new FilteringSelect({ id: "addVirtualHost.specific.storeType", + name: "storeType", + store: storeTypesStore, + searchAttr: "name", required: false}, input); + }); + + }}); + } + }; + }); diff --git a/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html b/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html new file mode 100644 index 0000000000..9596ef4175 --- /dev/null +++ b/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html @@ -0,0 +1,13 @@ + + + + + + + + + +
Store Type*:
Path to store location*: + +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index aff84e5832..041ccf1f50 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -295,4 +295,8 @@ public class VirtualHostConfiguration extends AbstractConfiguration return brokerValue == null? false : brokerValue.booleanValue(); } + public String getType() + { + return getStringValue("type", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore".equals(getMessageStoreClass()) ? "BDB_HA": "STANDARD"); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java index 4b7b9e3254..ae7e699264 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java @@ -85,10 +85,9 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer @Override public Broker create(RecovererProvider recovererProvider, ConfigurationEntry entry, ConfiguredObject... parents) { - Map attributes = entry.getAttributes(); - validateAttributes(attributes); + //Map attributes = entry.getAttributes(); + Map attributesCopy = validateAttributes(entry); - Map attributesCopy = new HashMap(attributes); attributesCopy.put(Broker.MODEL_VERSION, Model.MODEL_VERSION); StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(entry.getStore()); @@ -128,8 +127,10 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer return broker; } - private void validateAttributes(Map attributes) + private Map validateAttributes(ConfigurationEntry entry) { + Map attributes = entry.getAttributes(); + String modelVersion = null; if (attributes.containsKey(Broker.MODEL_VERSION)) { @@ -157,6 +158,22 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer throw new IllegalConfigurationException("The model version '" + modelVersion + "' in configuration is incompatible with the broker model version '" + Model.MODEL_VERSION + "'"); } + + if(!Model.MODEL_VERSION.equals(modelVersion)) + { + String oldVersion; + do + { + oldVersion = modelVersion; + StoreUpgrader.upgrade(entry.getStore()); + entry = entry.getStore().getRootEntry(); + attributes = entry.getAttributes(); + modelVersion = MapValueConverter.getStringAttribute(Broker.MODEL_VERSION, attributes, null); + } + while(!(modelVersion.equals(oldVersion) || modelVersion.equals(Model.MODEL_VERSION))); + } + + return new HashMap(attributes); } private void recoverType(RecovererProvider recovererProvider, diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java new file mode 100644 index 0000000000..0789664dd8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java @@ -0,0 +1,86 @@ +package org.apache.qpid.server.configuration.startup;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import org.apache.qpid.server.configuration.ConfigurationEntry; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.model.Broker; + +public abstract class StoreUpgrader +{ + + private static Map _upgraders = new HashMap(); + + // Note: don't use externally defined constants in upgraders in case they change, the values here MUST stay the same + // no matter what changes are made to the code in the future + + private final static StoreUpgrader UPGRADE_1_0 = new StoreUpgrader("1.0") + { + @Override + protected void doUpgrade(ConfigurationEntryStore store) + { + ConfigurationEntry root = store.getRootEntry(); + Map> children = root.getChildren(); + Collection vhosts = children.get("VirtualHost"); + Collection changed = new HashSet(); + for(ConfigurationEntry vhost : vhosts) + { + Map attributes = vhost.getAttributes(); + if(attributes.containsKey("storeType")) + { + attributes = new HashMap(attributes); + attributes.put("type", "STANDARD"); + + changed.add(new ConfigurationEntry(vhost.getId(),vhost.getType(),attributes,vhost.getChildrenIds(),store)); + + } + + } + Map attributes = new HashMap(root.getAttributes()); + attributes.put(Broker.MODEL_VERSION, "1.1"); + changed.add(new ConfigurationEntry(root.getId(),root.getType(),attributes,root.getChildrenIds(),store)); + + store.save(changed.toArray(new ConfigurationEntry[changed.size()])); + + } + }; + + private StoreUpgrader(String version) + { + _upgraders.put(version, this); + } + + public static void upgrade(ConfigurationEntryStore store) + { + StoreUpgrader upgrader = _upgraders.get(store.getRootEntry().getAttributes().get(Broker.MODEL_VERSION).toString()); + if(upgrader != null) + { + upgrader.doUpgrade(store); + } + } + + protected abstract void doUpgrade(ConfigurationEntryStore store); + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 82adcf4dde..07d934027e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -35,7 +35,7 @@ public interface IConnectionRegistry public void close() throws AMQException; - public void close(String replyText) throws AMQException; + public void close(String replyText); public List getConnections(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 803aeceab8..6b453cbbda 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -554,7 +554,7 @@ public abstract class AbstractExchange implements Exchange if (b.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(b); + _virtualHost.getDurableConfigurationStore().unbindQueue(b); } b.logDestruction(); } @@ -626,7 +626,7 @@ public abstract class AbstractExchange implements Exchange if (b.isDurable() && !restore) { - _virtualHost.getMessageStore().bindQueue(b); + _virtualHost.getDurableConfigurationStore().bindQueue(b); } queue.addQueueDeleteTask(b); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 450e74bfec..142da84524 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -74,7 +74,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public DurableConfigurationStore getDurableConfigurationStore() { - return _host.getMessageStore(); + return _host.getDurableConfigurationStore(); } public void registerExchange(Exchange exchange) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index eba63558ca..b632c68ace 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -134,8 +134,6 @@ public interface Exchange extends ExchangeReferrer */ boolean isBound(AMQShortString routingKey); - boolean isBound(AMQQueue queue); - /** * Returns true if this exchange has at least one binding associated with it. * @return @@ -147,15 +145,17 @@ public interface Exchange extends ExchangeReferrer boolean isBound(String bindingKey); - boolean isBound(String bindingKey, AMQQueue queue); + boolean isBound(AMQQueue queue); - boolean isBound(String bindingKey, Map arguments, AMQQueue queue); + boolean isBound(Map arguments); - boolean isBound(Map arguments, AMQQueue queue); + boolean isBound(String bindingKey, AMQQueue queue); boolean isBound(String bindingKey, Map arguments); - boolean isBound(Map arguments); + boolean isBound(Map arguments, AMQQueue queue); + + boolean isBound(String bindingKey, Map arguments, AMQQueue queue); void removeReference(ExchangeReferrer exchange); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index eed0cd6020..9c25d00b1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -71,7 +71,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener defaults, Map attributes, TaskExecutor taskExecutor) + { + this(id, defaults, attributes, taskExecutor, true); + } + + protected AbstractAdapter(UUID id, Map defaults, Map attributes, + TaskExecutor taskExecutor, boolean filterAttributes) + { _taskExecutor = taskExecutor; _id = id; if (attributes != null) { Collection names = getAttributeNames(); - for (String name : names) + if(filterAttributes) + { + for (String name : names) + { + if (attributes.containsKey(name)) + { + final Object value = attributes.get(name); + if(value != null) + { + _attributes.put(name, value); + } + } + } + } + else { - if (attributes.containsKey(name)) + for(Map.Entry entry : attributes.entrySet()) { - //TODO: dont put nulls - _attributes.put(name, attributes.get(name)); + if(entry.getValue()!=null) + { + _attributes.put(entry.getKey(),entry.getValue()); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index adc30eb944..678db43d58 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -60,6 +60,7 @@ import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.SimpleAuthenticationProviderAdapter; +import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.access.Operation; @@ -771,6 +772,10 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat { return _supportedVirtualHostStoreTypes; } + else if(SUPPORTED_VIRTUALHOST_TYPES.equals(name)) + { + return VirtualHostFactory.TYPES.get(); + } else if(SUPPORTED_AUTHENTICATION_PROVIDERS.equals(name)) { return _authenticationProviderFactory.getSupportedAuthenticationProviders(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index afcce482b6..0f90df00e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -195,7 +195,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs _queue.delete(); if (_queue.isDurable()) { - _queue.getVirtualHost().getMessageStore().removeQueue(_queue); + _queue.getVirtualHost().getDurableConfigurationStore().removeQueue(_queue); } } } @@ -365,7 +365,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { try { - _queue.getVirtualHost().getMessageStore().updateQueue(_queue); + _queue.getVirtualHost().getDurableConfigurationStore().updateQueue(_queue); } catch (AMQStoreException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index de626a7639..383ff2f3f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -79,7 +79,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener, @@ -91,6 +91,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual @SuppressWarnings("serial") public static final Map ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap(){{ put(NAME, String.class); + put(TYPE, String.class); put(STORE_PATH, String.class); put(STORE_TYPE, String.class); put(CONFIG_PATH, String.class); @@ -114,7 +115,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual public VirtualHostAdapter(UUID id, Map attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor) { - super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); + super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES, false), taskExecutor, false); _broker = broker; _brokerStatisticsGatherer = brokerStatisticsGatherer; validateAttributes(); @@ -130,18 +131,23 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } String configurationFile = (String) getAttribute(CONFIG_PATH); - String storeType = (String) getAttribute(STORE_TYPE); + String type = (String) getAttribute(TYPE); + boolean invalidAttributes = false; if (configurationFile == null) { - if (storeType == null) + if (type == null) { invalidAttributes = true; } + else + { + validateAttributes(type); + } } else { - if (storeType != null) + if (type != null) { invalidAttributes = true; } @@ -149,7 +155,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } if (invalidAttributes) { - throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'storeType' and 'storePath' attributes"); + throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes"); } // pre-load the configuration in order to validate @@ -163,6 +169,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } + private void validateAttributes(String type) + { + final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory == null) + { + throw new IllegalArgumentException("Unknown virtual host type '"+ type +"'. Valid types are: " + VirtualHostFactory.TYPES.get()); + } + factory.validateAttributes(getActualAttributes()); + + } + private void populateExchanges() { Collection actualExchanges = @@ -295,7 +312,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual _virtualHost.getExchangeRegistry().registerExchange(exchange); if(durable) { - _virtualHost.getMessageStore().createExchange(exchange); + _virtualHost.getDurableConfigurationStore().createExchange(exchange); } synchronized (_exchangeAdapters) { @@ -417,7 +434,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(durable) { - _virtualHost.getMessageStore().createQueue(queue, FieldTable.convertToFieldTable(attributes)); + _virtualHost.getDurableConfigurationStore().createQueue(queue, FieldTable.convertToFieldTable(attributes)); } synchronized (_queueAdapters) { @@ -444,6 +461,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual throw new IllegalStateException(); } + + public String getType() + { + return (String)getAttribute(TYPE); + } + + public String setType(final String currentType, final String desiredType) + throws IllegalStateException, AccessControlException + { + throw new IllegalStateException(); + } + + @Override public State getActualState() { @@ -1070,7 +1100,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual try { VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName); - _virtualHost = new VirtualHostImpl(_broker.getVirtualHostRegistry(), _brokerStatisticsGatherer, _broker.getSecurityManager(), configuration); + String type = configuration.getType(); + final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory == null) + { + throw new IllegalArgumentException("Unknown virtual host type: " + type); + } + else + { + _virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(), + _brokerStatisticsGatherer, + _broker.getSecurityManager(), + configuration); + } } catch (Exception e) { @@ -1106,8 +1148,16 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { final MyConfiguration basicConfiguration = new MyConfiguration(); PropertiesConfiguration config = new PropertiesConfiguration(); - config.addProperty("store.type", (String)getAttribute(STORE_TYPE)); - config.addProperty("store.environment-path", (String)getAttribute(STORE_PATH)); + final String type = (String) getAttribute(TYPE); + config.addProperty("type", type); + VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory != null) + { + for(Map.Entry entry : factory.createVirtualHostConfiguration(this).entrySet()) + { + config.addProperty(entry.getKey(), entry.getValue()); + } + } basicConfiguration.addConfiguration(config); CompositeConfiguration compositeConfiguration = new CompositeConfiguration(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java b/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java new file mode 100644 index 0000000000..f952e0410c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +public interface VirtualHostFactory +{ + String getType(); + + VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception; + + void validateAttributes(Map attributes); + + Map createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter); + + static final class TYPES + { + private TYPES() + { + } + + public static Collection get() + { + QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); + Iterable factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class); + List names = new ArrayList(); + for(VirtualHostFactory factory : factories) + { + names.add(factory.getType()); + } + return Collections.unmodifiableCollection(names); + } + } + + + static final class FACTORIES + { + private FACTORIES() + { + } + + public static VirtualHostFactory get(String type) + { + QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); + Iterable factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class); + for(VirtualHostFactory factory : factories) + { + if(factory.getType().equals(type)) + { + return factory; + } + } + return null; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 37f1f8f7a5..cc0e5ebe7a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -315,7 +315,7 @@ public class AMQQueueFactory exchangeRegistry.registerExchange(dlExchange); //enter the dle in the persistent store - virtualHost.getMessageStore().createExchange(dlExchange); + virtualHost.getDurableConfigurationStore().createExchange(dlExchange); } } @@ -335,7 +335,7 @@ public class AMQQueueFactory dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args); //enter the dlq in the persistent store - virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); + virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 6b4b8d4a3e..745a06c7fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -55,7 +55,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; -abstract public class AbstractJDBCMessageStore implements MessageStore +abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 4e7bbf04a6..27a40963f6 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -33,7 +33,7 @@ public interface DurableConfigurationStore public static interface Source { - DurableConfigurationStore getMessageStore(); + DurableConfigurationStore getDurableConfigurationStore(); } /** @@ -107,11 +107,11 @@ public interface DurableConfigurationStore * Removes the specified queue from the persistent store. * * @param queue The queue to remove. - * + * * @throws AMQStoreException If the operation fails for any reason. */ void removeQueue(AMQQueue queue) throws AMQStoreException; - + /** * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index cf8444b089..bbdfaf4959 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -26,7 +26,7 @@ import org.apache.commons.configuration.Configuration; * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. * */ -public interface MessageStore extends DurableConfigurationStore +public interface MessageStore { /** * Called after instantiation in order to configure the message store. A particular implementation can define diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java index d67ccfd8a4..fe7dd81e0c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java @@ -50,6 +50,12 @@ public class MessageStoreCreator } } + public boolean isValidType(String storeType) + { + return _factories.containsKey(storeType.toLowerCase()); + } + + public MessageStore createMessageStore(String storeType) { MessageStoreFactory factory = _factories.get(storeType.toLowerCase()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index fdb80295cf..f0936a221c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -public abstract class NullMessageStore implements MessageStore +public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { @Override public void configureConfigStore(String name, @@ -125,4 +125,4 @@ public abstract class NullMessageStore implements MessageStore public void onDelete() { } -} \ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 57024817f5..5b53f9ee6c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -35,6 +35,7 @@ import java.util.List; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; @@ -46,7 +47,7 @@ import org.apache.qpid.util.FileUtils; * mechanism. * */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore +public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 548b75f949..63419fce3f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -762,7 +762,7 @@ public class ServerSessionDelegate extends SessionDelegate { if (exchange.isDurable()) { - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); store.createExchange(exchange); } exchangeRegistry.registerExchange(exchange); @@ -917,7 +917,7 @@ public class ServerSessionDelegate extends SessionDelegate if (exchange.isDurable() && !exchange.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); store.removeExchange(exchange); } } @@ -1241,7 +1241,7 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); AMQQueue queue; @@ -1468,7 +1468,7 @@ public class ServerSessionDelegate extends SessionDelegate queue.delete(); if (queue.isDurable() && !queue.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); store.removeQueue(queue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java index 16e717a9c7..37e0177b00 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java @@ -127,7 +127,13 @@ public class MapValueConverter } else if (rawValue instanceof String) { - return (T) Enum.valueOf(enumType, (String) rawValue); + final String stringValue = (String) rawValue; + + return "null".equals(stringValue) ? null : (T) Enum.valueOf(enumType, stringValue); + } + else if(rawValue == null) + { + return null; } else { @@ -280,15 +286,24 @@ public class MapValueConverter } public static Map convert(Map configurationAttributes, Map attributeTypes) + { + return convert(configurationAttributes, attributeTypes, true); + } + + public static Map convert(Map configurationAttributes, + Map attributeTypes, + boolean exclusive) { Map attributes = new HashMap(); - for (Map.Entry attributeEntry : attributeTypes.entrySet()) + for (Map.Entry attribute : configurationAttributes.entrySet()) { - String attributeName = attributeEntry.getKey(); - if (configurationAttributes.containsKey(attributeName)) + String attributeName = attribute.getKey(); + Object rawValue = attribute.getValue(); + + if (attributeTypes.containsKey(attributeName)) { - Type typeObject = attributeEntry.getValue(); - Object rawValue = configurationAttributes.get(attributeName); + Type typeObject = attributeTypes.get(attributeName); + Object value = null; if (typeObject instanceof Class) { @@ -311,16 +326,21 @@ public class MapValueConverter } else { - throw new IllegalArgumentException("Convertion into " + parameterizedType + " is not yet supported"); + throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported"); } } else { - throw new IllegalArgumentException("Convertion into " + typeObject + " is not yet supported"); + throw new IllegalArgumentException("Conversion into " + typeObject + " is not yet supported"); } attributes.put(attributeName, value); } + else if(!exclusive) + { + attributes.put(attributeName, rawValue); + } } + return attributes; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java new file mode 100644 index 0000000000..6116d46e41 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -0,0 +1,660 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.ExchangeConfiguration; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.connection.ConnectionRegistry; +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.VirtualHostMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v1_0.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; +import org.apache.qpid.server.store.OperationalLoggingListener; +import org.apache.qpid.server.txn.DtxRegistry; + +public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener +{ + private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); + + private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; + + private final String _name; + + private final UUID _id; + + private final long _createTime = System.currentTimeMillis(); + + private final ScheduledThreadPoolExecutor _houseKeepingTasks; + + private final VirtualHostRegistry _virtualHostRegistry; + + private final StatisticsGatherer _brokerStatisticsGatherer; + + private final SecurityManager _securityManager; + + private final VirtualHostConfiguration _vhostConfig; + + private final QueueRegistry _queueRegistry; + + private final ExchangeRegistry _exchangeRegistry; + + private final ExchangeFactory _exchangeFactory; + + private final ConnectionRegistry _connectionRegistry; + + private final DtxRegistry _dtxRegistry; + + private volatile State _state = State.INITIALISING; + + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + private final Map _linkRegistry = new HashMap(); + private boolean _blocked; + + public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + if (hostConfig == null) + { + throw new IllegalArgumentException("HostConfig cannot be null"); + } + + if (hostConfig.getName() == null || hostConfig.getName().length() == 0) + { + throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost."); + } + + _virtualHostRegistry = virtualHostRegistry; + _brokerStatisticsGatherer = brokerStatisticsGatherer; + _vhostConfig = hostConfig; + _name = _vhostConfig.getName(); + _dtxRegistry = new DtxRegistry(); + + _id = UUIDGenerator.generateVhostUUID(_name); + + CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); + + _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"), _name); + + _connectionRegistry = new ConnectionRegistry(); + _connectionRegistry.addRegistryChangeListener(this); + + _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); + + _queueRegistry = new DefaultQueueRegistry(this); + + _exchangeFactory = new DefaultExchangeFactory(this); + + _exchangeRegistry = new DefaultExchangeRegistry(this); + + initialiseStatistics(); + + initialiseStorage(hostConfig); + + getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception; + + public IConnectionRegistry getConnectionRegistry() + { + return _connectionRegistry; + } + + public VirtualHostConfiguration getConfiguration() + { + return _vhostConfig; + } + + public UUID getId() + { + return _id; + } + + public boolean isDurable() + { + return false; + } + + /** + * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers + * and checking for idle or open transactions that have exceeded the permitted thresholds. + * + * @param period + */ + private void initialiseHouseKeeping(long period) + { + if (period != 0L) + { + scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); + } + } + + protected void shutdownHouseKeeping() + { + _houseKeepingTasks.shutdown(); + + try + { + if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) + { + _houseKeepingTasks.shutdownNow(); + } + } + catch (InterruptedException e) + { + _logger.warn("Interrupted during Housekeeping shutdown:", e); + Thread.currentThread().interrupt(); + } + } + + protected void removeHouseKeepingTasks() + { + BlockingQueue taskQueue = _houseKeepingTasks.getQueue(); + for (final Runnable runnable : taskQueue) + { + _houseKeepingTasks.remove(runnable); + } + } + + /** + * Allow other broker components to register a HouseKeepingTask + * + * @param period How often this task should run, in ms. + * @param task The task to run. + */ + public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) + { + _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period, + TimeUnit.MILLISECONDS); + } + + public ScheduledFuture scheduleTask(long delay, Runnable task) + { + return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS); + } + + public long getHouseKeepingTaskCount() + { + return _houseKeepingTasks.getTaskCount(); + } + + public long getHouseKeepingCompletedTaskCount() + { + return _houseKeepingTasks.getCompletedTaskCount(); + } + + public int getHouseKeepingPoolSize() + { + return _houseKeepingTasks.getCorePoolSize(); + } + + public void setHouseKeepingPoolSize(int newSize) + { + _houseKeepingTasks.setCorePoolSize(newSize); + } + + + public int getHouseKeepingActiveCount() + { + return _houseKeepingTasks.getActiveCount(); + } + + + protected void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException + { + _logger.debug("Loading configuration for virtualhost: " + config.getName()); + + _exchangeRegistry.initialise(); + + List exchangeNames = config.getExchanges(); + + for (String exchangeName : exchangeNames) + { + configureExchange(config.getExchangeConfiguration(exchangeName)); + } + + String[] queueNames = config.getQueueNames(); + + for (Object queueNameObj : queueNames) + { + String queueName = String.valueOf(queueNameObj); + configureQueue(config.getQueueConfiguration(queueName)); + } + } + + private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException + { + AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); + + Exchange exchange; + exchange = _exchangeRegistry.getExchange(exchangeName); + if (exchange == null) + { + + AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); + boolean durable = exchangeConfiguration.getDurable(); + boolean autodelete = exchangeConfiguration.getAutoDelete(); + + Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); + _exchangeRegistry.registerExchange(newExchange); + + if (newExchange.isDurable()) + { + getDurableConfigurationStore().createExchange(newExchange); + } + } + } + + private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException + { + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + String queueName = queue.getName(); + + if (queue.isDurable()) + { + getDurableConfigurationStore().createQueue(queue); + } + + //get the exchange name (returns default exchange name if none was specified) + String exchangeName = queueConfiguration.getExchange(); + + Exchange exchange = _exchangeRegistry.getExchange(exchangeName); + if (exchange == null) + { + throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); + } + + Exchange defaultExchange = _exchangeRegistry.getDefaultExchange(); + + //get routing keys in configuration (returns empty list if none are defined) + List routingKeys = queueConfiguration.getRoutingKeys(); + + for (Object routingKeyNameObj : routingKeys) + { + String routingKey = String.valueOf(routingKeyNameObj); + + if (exchange.equals(defaultExchange)) + { + if(!queueName.equals(routingKey)) + { + throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + + "' to the default exchange with a key other than the queue name: " + routingKey); + } + } + else + { + configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey)); + } + } + + if (!exchange.equals(defaultExchange) && !routingKeys.contains(queueName)) + { + //bind the queue to the named exchange using its name + configureBinding(queue, exchange, queueName, null); + } + + } + + private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey, Map arguments) throws AMQException + { + if (_logger.isInfoEnabled()) + { + _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName()); + } + exchange.addBinding(routingKey, queue, arguments); + } + + public String getName() + { + return _name; + } + + public long getCreateTime() + { + return _createTime; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public SecurityManager getSecurityManager() + { + return _securityManager; + } + + public void close() + { + //Stop Connections + _connectionRegistry.close(); + _queueRegistry.stopAllAndUnregisterMBeans(); + _dtxRegistry.close(); + closeStorage(); + shutdownHouseKeeping(); + + // clear exchange objects + _exchangeRegistry.clearAndUnregisterMbeans(); + + _state = State.STOPPED; + + CurrentActor.get().message(VirtualHostMessages.CLOSED()); + } + + protected void closeStorage() + { + //Close MessageStore + if (getMessageStore() != null) + { + //Remove MessageStore Interface should not throw Exception + try + { + getMessageStore().close(); + } + catch (Exception e) + { + _logger.error("Failed to close message store", e); + } + } + } + + + protected Logger getLogger() + { + return _logger; + } + + + + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + public void registerMessageDelivered(long messageSize) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + _brokerStatisticsGatherer.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + _brokerStatisticsGatherer.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + + for (AMQConnectionModel connection : _connectionRegistry.getConnections()) + { + connection.resetStatistics(); + } + } + + public void initialiseStatistics() + { + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + } + + public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) + { + LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); + if(linkRegistry == null) + { + linkRegistry = new LinkRegistry(); + _linkRegistry.put(remoteContainerId, linkRegistry); + } + return linkRegistry; + } + + public DtxRegistry getDtxRegistry() + { + return _dtxRegistry; + } + + public String toString() + { + return _name; + } + + public State getState() + { + return _state; + } + + public void block() + { + synchronized (_connectionRegistry) + { + if(!_blocked) + { + _blocked = true; + for(AMQConnectionModel conn : _connectionRegistry.getConnections()) + { + conn.block(); + } + } + } + } + + + public void unblock() + { + synchronized (_connectionRegistry) + { + if(_blocked) + { + _blocked = false; + for(AMQConnectionModel conn : _connectionRegistry.getConnections()) + { + conn.unblock(); + } + } + } + } + + public void connectionRegistered(final AMQConnectionModel connection) + { + if(_blocked) + { + connection.block(); + } + } + + public void connectionUnregistered(final AMQConnectionModel connection) + { + } + + public void event(final Event event) + { + switch(event) + { + case PERSISTENT_MESSAGE_SIZE_OVERFULL: + block(); + break; + case PERSISTENT_MESSAGE_SIZE_UNDERFULL: + unblock(); + break; + } + } + + protected void setState(State state) + { + _state = state; + } + + protected void attainActivation() + { + State finalState = State.ERRORED; + + try + { + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + finalState = State.ACTIVE; + } + finally + { + _state = finalState; + reportIfError(_state); + } + } + + protected void reportIfError(State state) + { + if (state == State.ERRORED) + { + CurrentActor.get().message(VirtualHostMessages.ERRORED()); + } + } + + private class VirtualHostHouseKeepingTask extends HouseKeepingTask + { + public VirtualHostHouseKeepingTask() + { + super(AbstractVirtualHost.this); + } + + public void execute() + { + for (AMQQueue q : _queueRegistry.getQueues()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking message status for queue: " + + q.getName()); + } + try + { + q.checkMessageStatus(); + } catch (Exception e) + { + _logger.error("Exception in housekeeping for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + } + for (AMQSessionModel session : connection.getSessionModels()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for long running open transactions on session " + session); + } + try + { + session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(), + _vhostConfig.getTransactionTimeoutOpenClose(), + _vhostConfig.getTransactionTimeoutIdleWarn(), + _vhostConfig.getTransactionTimeoutIdleClose()); + } catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java new file mode 100644 index 0000000000..05a33e7d99 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -0,0 +1,145 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; +import org.apache.qpid.server.store.OperationalLoggingListener; + +public class StandardVirtualHost extends AbstractVirtualHost +{ + private MessageStore _messageStore; + + private DurableConfigurationStore _durableConfigurationStore; + + StandardVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + } + + + + private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception + { + String storeType = hostConfig.getConfig().getString("store.type"); + MessageStore messageStore = null; + if (storeType == null) + { + final Class clazz = Class.forName(hostConfig.getMessageStoreClass()); + final Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException(clazz + " does not implement " + MessageStore.class); + } + + messageStore = (MessageStore) o; + } + else + { + messageStore = new MessageStoreCreator().createMessageStore(storeType); + } + + final + MessageStoreLogSubject + storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); + OperationalLoggingListener.listen(messageStore, storeLogSubject); + + return messageStore; + } + + private DurableConfigurationStore initialiseConfigurationStore(VirtualHostConfiguration hostConfig) throws Exception + { + DurableConfigurationStore configurationStore; + if(getMessageStore() instanceof DurableConfigurationStore) + { + configurationStore = (DurableConfigurationStore) getMessageStore(); + } + else + { + throw new ClassCastException(getMessageStore().getClass().getSimpleName() + + " is not an instance of DurableConfigurationStore"); + } + return configurationStore; + } + + + protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception + { + _messageStore = initialiseMessageStore(hostConfig); + + _durableConfigurationStore = initialiseConfigurationStore(hostConfig); + + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + + final Configuration storeConfiguration = hostConfig.getStoreConfiguration(); + + _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, storeConfiguration); + + _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, storeConfiguration); + + initialiseModel(hostConfig); + + _messageStore.activate(); + + attainActivation(); + } + + + protected void closeStorage() + { + //Close MessageStore + if (_messageStore != null) + { + //Remove MessageStore Interface should not throw Exception + try + { + getMessageStore().close(); + } + catch (Exception e) + { + getLogger().error("Failed to close message store", e); + } + } + } + + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return _durableConfigurationStore; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java new file mode 100644 index 0000000000..b47a5dd149 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java @@ -0,0 +1,98 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; + +public class StandardVirtualHostFactory implements VirtualHostFactory +{ + + public static final String TYPE = "STANDARD"; + + @Override + public String getType() + { + return TYPE; + } + + @Override + public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + } + + + private static final String STORE_TYPE_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_TYPE; + private static final String STORE_PATH_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_PATH; + + @Override + public void validateAttributes(Map attributes) + { + + // need store type and path + Object storeType = attributes.get(STORE_TYPE_ATTRIBUTE); + if(!(storeType instanceof String)) + { + + throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE + +"' is required and must be of type String."); + } + final MessageStoreCreator storeCreator = new MessageStoreCreator(); + if(!storeCreator.isValidType((String)storeType)) + { + throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE + +"' has value '"+storeType+"' which is not one of the valid values: " + + storeCreator.getStoreTypes() + "."); + + } + + // TODO - each store type should validate its own attributes + if(!((String) storeType).equalsIgnoreCase(MemoryMessageStore.TYPE)) + { + Object storePath = attributes.get(STORE_PATH_ATTRIBUTE); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ STORE_PATH_ATTRIBUTE + +"' is required and must be of type String."); + + } + } + + } + + @Override + public Map createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter) + { + Map convertedMap = new LinkedHashMap(); + convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)); + convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); + return convertedMap; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index eb1481b719..8919f4d348 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -49,6 +49,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable ExchangeFactory getExchangeFactory(); + DurableConfigurationStore getDurableConfigurationStore(); + MessageStore getMessageStore(); SecurityManager getSecurityManager(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java new file mode 100644 index 0000000000..626615a59f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java @@ -0,0 +1,65 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.plugin.VirtualHostFactory; + +public class VirtualHostFactoryRegistry +{ + private static Map getFactoryMap() + { + Map virtualHostFactories = new HashMap(); + QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); + Iterable factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class); + for (VirtualHostFactory virtualHostFactory : factories) + { + String type = virtualHostFactory.getType(); + VirtualHostFactory factory = virtualHostFactories.put(type, virtualHostFactory); + if (factory != null) + { + throw new IllegalStateException("VirtualHostFactory with type name '" + type + + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '" + + virtualHostFactory.getClass().getName() + "'"); + } + } + return virtualHostFactories; + } + + + public static Collection getFactories() + { + return Collections.unmodifiableCollection(getFactoryMap().values()); + } + + public static Collection getVirtualHostTypes() + { + return Collections.unmodifiableCollection(getFactoryMap().keySet()); + } + + public static VirtualHostFactory getFactory(String type) + { + return getFactoryMap().get(type); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java deleted file mode 100644 index c63c32188d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ /dev/null @@ -1,769 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.ExchangeConfiguration; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.connection.ConnectionRegistry; -import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v1_0.LinkRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.HAMessageStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.OperationalLoggingListener; -import org.apache.qpid.server.txn.DtxRegistry; - -public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener -{ - private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); - - private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; - - private final String _name; - - private final UUID _id; - - private final long _createTime = System.currentTimeMillis(); - - private final ScheduledThreadPoolExecutor _houseKeepingTasks; - - private final VirtualHostRegistry _virtualHostRegistry; - - private final StatisticsGatherer _brokerStatisticsGatherer; - - private final SecurityManager _securityManager; - - private final VirtualHostConfiguration _vhostConfig; - - private final QueueRegistry _queueRegistry; - - private final ExchangeRegistry _exchangeRegistry; - - private final ExchangeFactory _exchangeFactory; - - private final ConnectionRegistry _connectionRegistry; - - private final DtxRegistry _dtxRegistry; - - private final MessageStore _messageStore; - - private volatile State _state = State.INITIALISING; - - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - - private final Map _linkRegistry = new HashMap(); - private boolean _blocked; - - public VirtualHostImpl(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, VirtualHostConfiguration hostConfig) throws Exception - { - if (hostConfig == null) - { - throw new IllegalArgumentException("HostConfig cannot be null"); - } - - if (hostConfig.getName() == null || hostConfig.getName().length() == 0) - { - throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost."); - } - - _virtualHostRegistry = virtualHostRegistry; - _brokerStatisticsGatherer = brokerStatisticsGatherer; - _vhostConfig = hostConfig; - _name = _vhostConfig.getName(); - _dtxRegistry = new DtxRegistry(); - - _id = UUIDGenerator.generateVhostUUID(_name); - - CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); - - _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"), _name); - - _connectionRegistry = new ConnectionRegistry(); - _connectionRegistry.addRegistryChangeListener(this); - - _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); - - _queueRegistry = new DefaultQueueRegistry(this); - - _exchangeFactory = new DefaultExchangeFactory(this); - - _exchangeRegistry = new DefaultExchangeRegistry(this); - - _messageStore = initialiseMessageStore(hostConfig); - - configureMessageStore(hostConfig); - - activateNonHAMessageStore(); - - initialiseStatistics(); - - _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - public IConnectionRegistry getConnectionRegistry() - { - return _connectionRegistry; - } - - public VirtualHostConfiguration getConfiguration() - { - return _vhostConfig; - } - - public UUID getId() - { - return _id; - } - - public boolean isDurable() - { - return false; - } - - /** - * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers - * and checking for idle or open transactions that have exceeded the permitted thresholds. - * - * @param period - */ - private void initialiseHouseKeeping(long period) - { - if (period != 0L) - { - scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); - } - } - - private void shutdownHouseKeeping() - { - _houseKeepingTasks.shutdown(); - - try - { - if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) - { - _houseKeepingTasks.shutdownNow(); - } - } - catch (InterruptedException e) - { - _logger.warn("Interrupted during Housekeeping shutdown:", e); - Thread.currentThread().interrupt(); - } - } - - private void removeHouseKeepingTasks() - { - BlockingQueue taskQueue = _houseKeepingTasks.getQueue(); - for (final Runnable runnable : taskQueue) - { - _houseKeepingTasks.remove(runnable); - } - } - - /** - * Allow other broker components to register a HouseKeepingTask - * - * @param period How often this task should run, in ms. - * @param task The task to run. - */ - public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) - { - _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period, - TimeUnit.MILLISECONDS); - } - - public ScheduledFuture scheduleTask(long delay, Runnable task) - { - return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS); - } - - public long getHouseKeepingTaskCount() - { - return _houseKeepingTasks.getTaskCount(); - } - - public long getHouseKeepingCompletedTaskCount() - { - return _houseKeepingTasks.getCompletedTaskCount(); - } - - public int getHouseKeepingPoolSize() - { - return _houseKeepingTasks.getCorePoolSize(); - } - - public void setHouseKeepingPoolSize(int newSize) - { - _houseKeepingTasks.setCorePoolSize(newSize); - } - - - public int getHouseKeepingActiveCount() - { - return _houseKeepingTasks.getActiveCount(); - } - - private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception - { - String storeType = hostConfig.getConfig().getString("store.type"); - MessageStore messageStore = null; - if (storeType == null) - { - final Class clazz = Class.forName(hostConfig.getMessageStoreClass()); - final Object o = clazz.newInstance(); - - if (!(o instanceof MessageStore)) - { - throw new ClassCastException(clazz + " does not implement " + MessageStore.class); - } - - messageStore = (MessageStore) o; - } - else - { - messageStore = new MessageStoreCreator().createMessageStore(storeType); - } - - final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); - OperationalLoggingListener.listen(messageStore, storeLogSubject); - - messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); - messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); - messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); - messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - if (messageStore instanceof HAMessageStore) - { - messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); - } - - return messageStore; - } - - private void activateNonHAMessageStore() throws Exception - { - if (!(_messageStore instanceof HAMessageStore)) - { - _messageStore.activate(); - } - } - - private void configureMessageStore(VirtualHostConfiguration hostConfig) throws Exception - { - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - - _messageStore.configureConfigStore(getName(), recoveryHandler, hostConfig.getStoreConfiguration()); - _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, hostConfig.getStoreConfiguration()); - } - - private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException - { - _logger.debug("Loading configuration for virtualhost: " + config.getName()); - - List exchangeNames = config.getExchanges(); - - for (String exchangeName : exchangeNames) - { - configureExchange(config.getExchangeConfiguration(exchangeName)); - } - - String[] queueNames = config.getQueueNames(); - - for (Object queueNameObj : queueNames) - { - String queueName = String.valueOf(queueNameObj); - configureQueue(config.getQueueConfiguration(queueName)); - } - } - - private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException - { - AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); - - Exchange exchange; - exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); - boolean durable = exchangeConfiguration.getDurable(); - boolean autodelete = exchangeConfiguration.getAutoDelete(); - - Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); - _exchangeRegistry.registerExchange(newExchange); - - if (newExchange.isDurable()) - { - _messageStore.createExchange(newExchange); - } - } - } - - private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException - { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); - String queueName = queue.getName(); - - if (queue.isDurable()) - { - getMessageStore().createQueue(queue); - } - - //get the exchange name (returns default exchange name if none was specified) - String exchangeName = queueConfiguration.getExchange(); - - Exchange exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { - throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); - } - - Exchange defaultExchange = _exchangeRegistry.getDefaultExchange(); - - //get routing keys in configuration (returns empty list if none are defined) - List routingKeys = queueConfiguration.getRoutingKeys(); - - for (Object routingKeyNameObj : routingKeys) - { - String routingKey = String.valueOf(routingKeyNameObj); - - if (exchange.equals(defaultExchange)) - { - if(!queueName.equals(routingKey)) - { - throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + - "' to the default exchange with a key other than the queue name: " + routingKey); - } - } - else - { - - configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey)); - } - } - - if (!exchange.equals(defaultExchange) && !routingKeys.contains(queueName)) - { - //bind the queue to the named exchange using its name - configureBinding(queue, exchange, queueName, null); - } - - } - - private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey, Map arguments) throws AMQException - { - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName()); - } - exchange.addBinding(routingKey, queue, arguments); - } - - public String getName() - { - return _name; - } - - public long getCreateTime() - { - return _createTime; - } - - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - public MessageStore getMessageStore() - { - return _messageStore; - } - - public SecurityManager getSecurityManager() - { - return _securityManager; - } - - public void close() - { - //Stop Connections - _connectionRegistry.close(); - _queueRegistry.stopAllAndUnregisterMBeans(); - _dtxRegistry.close(); - - //Close MessageStore - if (_messageStore != null) - { - //Remove MessageStore Interface should not throw Exception - try - { - _messageStore.close(); - } - catch (Exception e) - { - _logger.error("Failed to close message store", e); - } - } - - // clear exchange objects - _exchangeRegistry.clearAndUnregisterMbeans(); - - _state = State.STOPPED; - - CurrentActor.get().message(VirtualHostMessages.CLOSED()); - } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - public void registerMessageDelivered(long messageSize) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - _brokerStatisticsGatherer.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - _brokerStatisticsGatherer.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - - for (AMQConnectionModel connection : _connectionRegistry.getConnections()) - { - connection.resetStatistics(); - } - } - - public void initialiseStatistics() - { - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); - _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); - _messagesReceived = new StatisticsCounter("messages-received-" + getName()); - _dataReceived = new StatisticsCounter("bytes-received-" + getName()); - } - - public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) - { - LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); - if(linkRegistry == null) - { - linkRegistry = new LinkRegistry(); - _linkRegistry.put(remoteContainerId, linkRegistry); - } - return linkRegistry; - } - - public DtxRegistry getDtxRegistry() - { - return _dtxRegistry; - } - - public String toString() - { - return _name; - } - - public State getState() - { - return _state; - } - - public void block() - { - synchronized (_connectionRegistry) - { - if(!_blocked) - { - _blocked = true; - for(AMQConnectionModel conn : _connectionRegistry.getConnections()) - { - conn.block(); - } - } - } - } - - - public void unblock() - { - synchronized (_connectionRegistry) - { - if(_blocked) - { - _blocked = false; - for(AMQConnectionModel conn : _connectionRegistry.getConnections()) - { - conn.unblock(); - } - } - } - } - - public void connectionRegistered(final AMQConnectionModel connection) - { - if(_blocked) - { - connection.block(); - } - } - - public void connectionUnregistered(final AMQConnectionModel connection) - { - } - - public void event(final Event event) - { - switch(event) - { - case PERSISTENT_MESSAGE_SIZE_OVERFULL: - block(); - break; - case PERSISTENT_MESSAGE_SIZE_UNDERFULL: - unblock(); - break; - } - } - - private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) - { - try - { - _exchangeRegistry.initialise(); - initialiseModel(_vhostConfig); - } - catch (Exception e) - { - throw new RuntimeException("Failed to initialise virtual host after state change", e); - } - } - } - - private final class AfterActivationListener implements EventListener - { - @Override - public void event(Event event) - { - State finalState = State.ERRORED; - - try - { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); - finalState = State.ACTIVE; - } - finally - { - _state = finalState; - reportIfError(_state); - } - } - } - - private final class BeforePassivationListener implements EventListener - { - public void event(Event event) - { - State finalState = State.ERRORED; - - try - { - /* the approach here is not ideal as there is a race condition where a - * queue etc could be created while the virtual host is on the way to - * the passivated state. However the store state change from MASTER to UNKNOWN - * is documented as exceptionally rare.. - */ - - _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - removeHouseKeepingTasks(); - - _queueRegistry.stopAllAndUnregisterMBeans(); - _exchangeRegistry.clearAndUnregisterMbeans(); - _dtxRegistry.close(); - - finalState = State.PASSIVE; - } - finally - { - _state = finalState; - reportIfError(_state); - } - } - - } - - private final class AfterInitialisationListener implements EventListener - { - public void event(Event event) - { - _state = State.PASSIVE; - } - - } - - private final class BeforeCloseListener implements EventListener - { - @Override - public void event(Event event) - { - shutdownHouseKeeping(); - } - } - - private void reportIfError(State state) - { - if (state == State.ERRORED) - { - CurrentActor.get().message(VirtualHostMessages.ERRORED()); - } - } - - private class VirtualHostHouseKeepingTask extends HouseKeepingTask - { - public VirtualHostHouseKeepingTask() - { - super(VirtualHostImpl.this); - } - - public void execute() - { - for (AMQQueue q : _queueRegistry.getQueues()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Checking message status for queue: " - + q.getName()); - } - try - { - q.checkMessageStatus(); - } catch (Exception e) - { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - } - for (AMQSessionModel session : connection.getSessionModels()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Checking for long running open transactions on session " + session); - } - try - { - session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(), - _vhostConfig.getTransactionTimeoutOpenClose(), - _vhostConfig.getTransactionTimeoutIdleWarn(), - _vhostConfig.getTransactionTimeoutIdleClose()); - } catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } - } - } -} diff --git a/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory new file mode 100644 index 0000000000..81217884e4 --- /dev/null +++ b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.virtualhost.StandardVirtualHostFactory diff --git a/java/broker/src/main/resources/initial-config.json b/java/broker/src/main/resources/initial-config.json index f01ffca140..9bf7d71e8a 100644 --- a/java/broker/src/main/resources/initial-config.json +++ b/java/broker/src/main/resources/initial-config.json @@ -21,7 +21,7 @@ { "name": "Broker", "storeVersion": 1, - "modelVersion": "1.0", + "modelVersion": "1.1", "defaultVirtualHost" : "default", "authenticationproviders" : [ { "name" : "passwordFile", @@ -49,6 +49,7 @@ }], "virtualhosts" : [ { "name" : "default", + "type" : "STANDARD", "storeType" : "DERBY", "storePath" : "${qpid.work_dir}/derbystore/default" } ], @@ -59,4 +60,4 @@ "pluginType" : "MANAGEMENT-JMX", "name" : "jmxManagement" } ] -} \ No newline at end of file +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index 713cd25adb..c6473d9520 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; public class VirtualHostRecovererTest extends TestCase @@ -75,6 +76,8 @@ public class VirtualHostRecovererTest extends TestCase VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer); Map attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); + attributes.put(VirtualHost.STORE_PATH, "/path/to/virtualhost/store"); attributes.put(VirtualHost.STORE_TYPE, "DERBY"); when(entry.getAttributes()).thenReturn(attributes); diff --git a/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 478013f61f..05d5d75864 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.model; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -39,14 +38,10 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.NullMessageStore; -import org.apache.qpid.server.store.StateManager; import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.server.virtualhost.StandardVirtualHost; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; public class VirtualHostTest extends TestCase { @@ -96,6 +91,7 @@ public class VirtualHostTest extends TestCase { Map attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE); attributes.put(VirtualHost.STATE, State.QUIESCED); @@ -130,36 +126,11 @@ public class VirtualHostTest extends TestCase assertEquals("Unexpected state", State.DELETED, host.getAttribute(VirtualHost.STATE)); } - public void testReplicaState() - { - String hostName = getName(); - File configPath = TestFileUtils.createTempFile(this, ".xml", "<" + hostName - + ">" + ReplicaMessageStore.class.getName() + ""); - try - { - Map attributes = new HashMap(); - attributes.put(VirtualHost.NAME, hostName); - attributes.put(VirtualHost.CONFIG_PATH, configPath.getAbsolutePath()); - - VirtualHost host = createHost(attributes); - - assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE)); - - host.setDesiredState(State.INITIALISING, State.ACTIVE); - - assertEquals("Unexpected state", State.REPLICA, host.getAttribute(VirtualHost.STATE)); - } - finally - { - configPath.delete(); - } - } - private VirtualHost createHost() { Map attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE); VirtualHost host = createHost(attributes); @@ -174,34 +145,4 @@ public class VirtualHostTest extends TestCase return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); } - public static final class ReplicaMessageStore extends NullMessageStore - { - private final EventManager _eventManager = new EventManager(); - private final StateManager _stateManager = new StateManager(_eventManager); - - @Override - public void activate() throws Exception - { - _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISING); - _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - _stateManager.attainState(org.apache.qpid.server.store.State.ACTIVATING); - _stateManager.attainState(org.apache.qpid.server.store.State.ACTIVE); - - // this should change the virtual host state to PASSIVE - _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - @Override - public String getStoreType() - { - return ReplicaMessageStore.class.getSimpleName(); - } - } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index f0ecfb6407..8a7d5d85fc 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -65,7 +65,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase private static final String EXCHANGE_NAME = "exchangeName"; private String _storePath; private String _storeName; - private MessageStore _store; + private MessageStore _messageStore; private Configuration _configuration; private ConfigurationRecoveryHandler _recoveryHandler; @@ -84,6 +84,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase private FieldTable _bindingArgs; private UUID _queueId; private UUID _exchangeId; + private DurableConfigurationStore _configStore; public void setUp() throws Exception { @@ -135,7 +136,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testCreateExchange() throws Exception { Exchange exchange = createTestExchange(); - _store.createExchange(exchange); + _configStore.createExchange(exchange); reopenStore(); verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); @@ -144,9 +145,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testRemoveExchange() throws Exception { Exchange exchange = createTestExchange(); - _store.createExchange(exchange); + _configStore.createExchange(exchange); - _store.removeExchange(exchange); + _configStore.removeExchange(exchange); reopenStore(); verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); @@ -157,7 +158,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _store.bindQueue(binding); + _configStore.bindQueue(binding); reopenStore(); @@ -171,9 +172,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _store.bindQueue(binding); + _configStore.bindQueue(binding); - _store.unbindQueue(binding); + _configStore.unbindQueue(binding); reopenStore(); verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), @@ -183,7 +184,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testCreateQueueAMQQueue() throws Exception { AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - _store.createQueue(queue); + _configStore.createQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null); @@ -197,7 +198,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null); @@ -208,7 +209,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase Exchange alternateExchange = createTestAlternateExchange(); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - _store.createQueue(queue); + _configStore.createQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId()); @@ -230,11 +231,11 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); // update the queue to have exclusive=false queue = createTestQueue(getName(), getName() + "Owner", false); - _store.updateQueue(queue); + _configStore.updateQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null); @@ -248,12 +249,12 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - _store.updateQueue(queue); + _configStore.updateQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId()); @@ -267,10 +268,10 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); // remove queue - _store.removeQueue(queue); + _configStore.removeQueue(queue); reopenStore(); verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), any(FieldTable.class), any(UUID.class)); @@ -306,18 +307,19 @@ public class DurableConfigurationStoreTest extends QpidTestCase private void reopenStore() throws Exception { - if (_store != null) + if (_messageStore != null) { - _store.close(); + _messageStore.close(); } - _store = createStore(); + _messageStore = createMessageStore(); + _configStore = createConfigStore(); - _store.configureConfigStore(_storeName, _recoveryHandler, _configuration); - _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); - _store.activate(); + _configStore.configureConfigStore(_storeName, _recoveryHandler, _configuration); + _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); + _messageStore.activate(); } - protected MessageStore createStore() throws Exception + protected MessageStore createMessageStore() throws Exception { String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); if (storeClass == null) @@ -329,6 +331,26 @@ public class DurableConfigurationStoreTest extends QpidTestCase return messageStore; } + protected DurableConfigurationStore createConfigStore() throws Exception + { + String storeClass = System.getProperty(CONFIGURATION_STORE_CLASS_NAME_KEY); + if (storeClass == null) + { + storeClass = DerbyMessageStore.class.getName(); + } + Class clazz = (Class) Class.forName(storeClass); + DurableConfigurationStore configurationStore ; + if(clazz.isInstance(_messageStore)) + { + configurationStore = (DurableConfigurationStore) _messageStore; + } + else + { + configurationStore = (DurableConfigurationStore) Class.forName(storeClass).newInstance(); + } + return configurationStore; + } + public void testRecordXid() throws Exception { Record enqueueRecord = getTestRecord(1); @@ -338,13 +360,13 @@ public class DurableConfigurationStoreTest extends QpidTestCase byte[] globalId = new byte[] { 1 }; byte[] branchId = new byte[] { 2 }; - Transaction transaction = _store.newTransaction(); + Transaction transaction = _messageStore.newTransaction(); transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); transaction.commitTran(); reopenStore(); verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); - transaction = _store.newTransaction(); + transaction = _messageStore.newTransaction(); transaction.removeXid(1l, globalId, branchId); transaction.commitTran(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index f1976ecee3..8743c4111b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -71,7 +71,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple applyStoreSpecificConfiguration(config); _store = createStore(); - _store.configureConfigStore("test", null, config); + ((DurableConfigurationStore)_store).configureConfigStore("test", null, config); _transactionResource = UUID.randomUUID(); _events = new ArrayList(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index fbf1828e77..fb255e89f9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -65,7 +65,7 @@ import java.util.Map; /** * This tests the MessageStores by using the available interfaces. * - * For persistent stores, it validates that Exchanges, Queues, Bindings and + * For persistent stores, it validates that Exchanges, Queues, Bindings and * Messages are persisted and recovered correctly. */ public class MessageStoreTest extends QpidTestCase @@ -106,7 +106,7 @@ public class MessageStoreTest extends QpidTestCase BrokerTestHelper.setUp(); String storePath = System.getProperty("QPID_WORK") + File.separator + getName(); - + _config = new PropertiesConfiguration(); _config.addProperty("store.class", getTestProfileMessageStoreClassName()); _config.addProperty("store.environment-path", storePath); @@ -224,8 +224,8 @@ public class MessageStoreTest extends QpidTestCase /** * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above - * before reloading the virtual host and ensuring that the persistent messages were restored. - * + * before reloading the virtual host and ensuring that the persistent messages were restored. + * * More specific testing of message persistence is left to store-specific unit testing. */ public void testMessagePersistence() throws Exception @@ -238,7 +238,7 @@ public class MessageStoreTest extends QpidTestCase validateMessageOnQueues(2, false); validateMessageOnTopics(1, false); } - + /** * Tests message removal by running the testMessagePersistence() method above before * clearing the queues, reloading the virtual host, and ensuring that the persistent @@ -250,15 +250,15 @@ public class MessageStoreTest extends QpidTestCase QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after recovery", + assertEquals("Incorrect number of queues registered after recovery", 6, queueRegistry.getQueues().size()); //clear the queue queueRegistry.getQueue(durableQueueName).clearQueue(); - + //check the messages are gone validateMessageOnQueue(durableQueueName, 0); - + //reload and verify messages arent restored reloadVirtualHost(); @@ -284,17 +284,17 @@ public class MessageStoreTest extends QpidTestCase QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after recovery", + assertEquals("Incorrect number of queues registered after recovery", 6, queueRegistry.getQueues().size()); //Validate the non-Durable Queues were not recovered. - assertNull("Non-Durable queue still registered:" + priorityQueueName, + assertNull("Non-Durable queue still registered:" + priorityQueueName, queueRegistry.getQueue(priorityQueueName)); - assertNull("Non-Durable queue still registered:" + queueName, + assertNull("Non-Durable queue still registered:" + queueName, queueRegistry.getQueue(queueName)); - assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, + assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, queueRegistry.getQueue(priorityTopicQueueName)); - assertNull("Non-Durable queue still registered:" + topicQueueName, + assertNull("Non-Durable queue still registered:" + topicQueueName, queueRegistry.getQueue(topicQueueName)); //Validate normally expected properties of Queues/Topics @@ -320,26 +320,26 @@ public class MessageStoreTest extends QpidTestCase 1, queueRegistry.getQueues().size()); reloadVirtualHost(); - + queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered after first recovery", 1, queueRegistry.getQueues().size()); - + //test that removing the queue means it is not recovered next time - getVirtualHost().getMessageStore().removeQueue(queueRegistry.getQueue(durableQueueName)); + getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); reloadVirtualHost(); - + queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered after second recovery", 0, queueRegistry.getQueues().size()); - assertNull("Durable queue was not removed:" + durableQueueName, + assertNull("Durable queue was not removed:" + durableQueueName, queueRegistry.getQueue(durableQueueName)); } /** * Tests exchange persistence by creating a selection of exchanges, both durable - * and non durable, and ensuring that following the recovery process the correct + * and non durable, and ensuring that following the recovery process the correct * durable exchanges are still present. */ public void testExchangePersistence() throws Exception @@ -348,7 +348,7 @@ public class MessageStoreTest extends QpidTestCase Map oldExchanges = createExchanges(); - assertEquals("Incorrect number of exchanges registered before recovery", + assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size()); reloadVirtualHost(); @@ -367,33 +367,33 @@ public class MessageStoreTest extends QpidTestCase int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size(); createExchange(DirectExchange.TYPE, directExchangeName, true); - + ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); - assertEquals("Incorrect number of exchanges registered before recovery", + assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); reloadVirtualHost(); - + exchangeRegistry = getVirtualHost().getExchangeRegistry(); - assertEquals("Incorrect number of exchanges registered after first recovery", + assertEquals("Incorrect number of exchanges registered after first recovery", origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); - + //test that removing the exchange means it is not recovered next time - getVirtualHost().getMessageStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); reloadVirtualHost(); - + exchangeRegistry = getVirtualHost().getExchangeRegistry(); - assertEquals("Incorrect number of exchanges registered after second recovery", + assertEquals("Incorrect number of exchanges registered after second recovery", origExchangeCount, exchangeRegistry.getExchangeNames().size()); - assertNull("Durable exchange was not removed:" + directExchangeName, + assertNull("Durable exchange was not removed:" + directExchangeName, exchangeRegistry.getExchange(directExchangeName)); } - + /** * Tests binding persistence by creating a selection of queues and exchanges, both durable * and non durable, then adding bindings with and without selectors before reloading the - * virtual host and verifying that following the recovery process the correct durable + * virtual host and verifying that following the recovery process the correct durable * bindings (those for durable queues to durable exchanges) are still present. */ public void testBindingPersistence() throws Exception @@ -413,7 +413,7 @@ public class MessageStoreTest extends QpidTestCase bindAllQueuesToExchange(directExchange, directRouting); bindAllTopicQueuesToExchange(topicExchange, topicRouting); - assertEquals("Incorrect number of exchanges registered before recovery", + assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size()); reloadVirtualHost(); @@ -422,10 +422,10 @@ public class MessageStoreTest extends QpidTestCase validateBindingProperties(); } - + /** * Tests binding removal by creating a durable exchange, and queue, binding them together, - * recovering to verify the persistence, then removing it from the store, and ensuring + * recovering to verify the persistence, then removing it from the store, and ensuring * that following the second reload process it is not recovered. */ public void testDurableBindingRemoval() throws Exception @@ -437,14 +437,14 @@ public class MessageStoreTest extends QpidTestCase createQueue(durableQueueName, false, true, false, false); bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); - assertEquals("Incorrect number of bindings registered before recovery", + assertEquals("Incorrect number of bindings registered before recovery", 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); //verify binding is actually normally recovered reloadVirtualHost(); queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of bindings registered after first recovery", + assertEquals("Incorrect number of bindings registered after first recovery", 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); @@ -457,13 +457,13 @@ public class MessageStoreTest extends QpidTestCase reloadVirtualHost(); queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of bindings registered after second recovery", + assertEquals("Incorrect number of bindings registered after second recovery", 0, queueRegistry.getQueue(durableQueueName).getBindings().size()); } /** * Validates that the durable exchanges are still present, the non durable exchange is not, - * and that the new exchanges are not the same objects as the provided list (i.e. that the + * and that the new exchanges are not the same objects as the provided list (i.e. that the * reload actually generated new exchange objects) */ private void validateExchanges(int originalNumExchanges, Map oldExchanges) @@ -484,7 +484,7 @@ public class MessageStoreTest extends QpidTestCase registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); // There should only be the original exchanges + our 2 recovered durable exchanges - assertEquals("Incorrect number of exchanges available", + assertEquals("Incorrect number of exchanges available", originalNumExchanges + 2, registry.getExchangeNames().size()); } @@ -562,7 +562,7 @@ public class MessageStoreTest extends QpidTestCase { assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); } - + if (usePriority) { assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); @@ -705,7 +705,7 @@ public class MessageStoreTest extends QpidTestCase { FieldTable queueArguments = null; - + if(usePriority || lastValueQueue) { assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); @@ -716,7 +716,7 @@ public class MessageStoreTest extends QpidTestCase queueArguments = new FieldTable(); queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); } - + if (lastValueQueue) { queueArguments = new FieldTable(); @@ -735,7 +735,7 @@ public class MessageStoreTest extends QpidTestCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getMessageStore().createQueue(queue, queueArguments); + getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); } } catch (AMQException e) @@ -779,7 +779,7 @@ public class MessageStoreTest extends QpidTestCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getMessageStore().createExchange(exchange); + getVirtualHost().getDurableConfigurationStore().createExchange(exchange); } } catch (AMQException e) @@ -836,7 +836,7 @@ public class MessageStoreTest extends QpidTestCase fail(e.getMessage()); } } - + protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) { FieldTable bindArguments = null; @@ -931,4 +931,4 @@ public class MessageStoreTest extends QpidTestCase return _routingKey; } } -} \ No newline at end of file +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 065d6408de..88d5852a17 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -73,7 +73,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase setUpStoreConfiguration(_storeConfiguration); _store = createMessageStore(); - _store.configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration); + ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration); _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler, _storeConfiguration); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 4d10058d17..8de19d9cff 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -55,8 +55,10 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.virtualhost.VirtualHostFactoryRegistry; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class BrokerTestHelper @@ -96,14 +98,24 @@ public class BrokerTestHelper throws Exception { StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class); - VirtualHost host = new VirtualHostImpl(virtualHostRegistry, statisticsGatherer, new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); + final VirtualHostFactory factory = + virtualHostConfiguration == null ? new StandardVirtualHostFactory() + : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType()); + VirtualHost host = factory.createVirtualHost(virtualHostRegistry, + statisticsGatherer, + new SecurityManager(mock(Broker.class), false), + virtualHostConfiguration); virtualHostRegistry.registerVirtualHost(host); return host; } public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration) throws Exception { - return new VirtualHostImpl(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); + final VirtualHostFactory factory = + virtualHostConfiguration == null ? new StandardVirtualHostFactory() + : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType()); + + return factory.createVirtualHost(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); } public static VirtualHost createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 324e36e132..7552a653fe 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; @@ -111,6 +112,11 @@ public class MockVirtualHost implements VirtualHost return null; } + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + public String getName() { return _name; @@ -214,4 +220,4 @@ public class MockVirtualHost implements VirtualHost public void unblock() { } -} \ No newline at end of file +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java new file mode 100644 index 0000000000..1243d9f7dd --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -0,0 +1,371 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import static org.mockito.Mockito.mock; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; + +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.test.utils.QpidTestCase; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class StandardVirtualHostTest extends QpidTestCase +{ + private VirtualHostRegistry _virtualHostRegistry; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_virtualHostRegistry != null) + { + _virtualHostRegistry.close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + + } + + /** + * Tests that custom routing keys for the queue specified in the configuration + * file are correctly bound to the exchange (in addition to the queue name) + */ + public void testSpecifyingCustomBindings() throws Exception + { + customBindingTestImpl(new String[]{"custom1","custom2"}); + } + + /** + * Tests that a queue specified in the configuration file to be bound to a + * specified(non-default) direct exchange is a correctly bound to the exchange + * and the default exchange using the queue name. + */ + public void testQueueSpecifiedInConfigurationIsBoundToDefaultExchange() throws Exception + { + customBindingTestImpl(new String[0]); + } + + /** + * Tests that specifying custom routing keys for a queue in the configuration file results in failure + * to create the vhost (since this is illegal, only queue names are used with the default exchange) + */ + public void testSpecifyingCustomBindingForDefaultExchangeThrowsException() throws Exception + { + final String queueName = getName(); + final String customBinding = "custom-binding"; + File config = writeConfigFile(queueName, queueName, null, false, new String[]{customBinding}); + + try + { + createVirtualHost(queueName, config); + fail("virtualhost creation should have failed due to illegal configuration"); + } + catch (ConfigurationException e) + { + assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, e.getMessage()); + } + } + + public void testVirtualHostBecomesActive() throws Exception + { + File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); + VirtualHost vhost = createVirtualHost(getName(), config); + assertNotNull(vhost); + assertEquals(State.ACTIVE, vhost.getState()); + } + + public void testVirtualHostHavingStoreSetAsTypeBecomesActive() throws Exception + { + String virtualHostName = getName(); + VirtualHost host = createVirtualHostUsingStoreType(virtualHostName); + assertNotNull(host); + assertEquals(State.ACTIVE, host.getState()); + } + + public void testVirtualHostBecomesStoppedOnClose() throws Exception + { + File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); + VirtualHost vhost = createVirtualHost(getName(), config); + assertNotNull(vhost); + assertEquals(State.ACTIVE, vhost.getState()); + vhost.close(); + assertEquals(State.STOPPED, vhost.getState()); + assertEquals(0, vhost.getHouseKeepingActiveCount()); + } + + public void testVirtualHostHavingStoreSetAsTypeBecomesStoppedOnClose() throws Exception + { + String virtualHostName = getName(); + VirtualHost host = createVirtualHostUsingStoreType(virtualHostName); + assertNotNull(host); + assertEquals(State.ACTIVE, host.getState()); + host.close(); + assertEquals(State.STOPPED, host.getState()); + assertEquals(0, host.getHouseKeepingActiveCount()); + } + + /** + * Tests that specifying an unknown exchange to bind the queue to results in failure to create the vhost + */ + public void testSpecifyingUnknownExchangeThrowsException() throws Exception + { + final String queueName = getName(); + final String exchangeName = "made-up-exchange"; + File config = writeConfigFile(queueName, queueName, exchangeName, true, new String[0]); + + try + { + createVirtualHost(queueName, config); + fail("virtualhost creation should have failed due to illegal configuration"); + } + catch (ConfigurationException e) + { + assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, e.getMessage()); + } + } + + public void testCreateVirtualHostWithoutConfigurationInConfigFile() throws Exception + { + File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); + String hostName = getName() + "-not-existing"; + try + { + createVirtualHost(hostName, config); + fail("virtualhost creation should have failed due to illegal configuration"); + } + catch (RuntimeException e) + { + assertEquals("No configuration found for virtual host '" + hostName + "' in " + config.getAbsolutePath(), e.getMessage()); + } + } + + public void testBindingArguments() throws Exception + { + String exchangeName = getName() +".direct"; + String vhostName = getName(); + String queueName = getName(); + + Map bindingArguments = new HashMap(); + bindingArguments.put("ping", new String[]{"x-filter-jms-selector=select=1", "x-qpid-no-local"}); + bindingArguments.put("pong", new String[]{"x-filter-jms-selector=select='pong'"}); + File config = writeConfigFile(vhostName, queueName, exchangeName, false, new String[]{"ping","pong"}, bindingArguments); + VirtualHost vhost = createVirtualHost(vhostName, config); + + Exchange exch = vhost.getExchangeRegistry().getExchange(getName() +".direct"); + Collection bindings = exch.getBindings(); + assertNotNull("Bindings cannot be null", bindings); + assertEquals("Unexpected number of bindings", 3, bindings.size()); + + boolean foundPong = false; + boolean foundPing = false; + for (Binding binding : bindings) + { + String qn = binding.getQueue().getName(); + assertEquals("Unexpected queue name", getName(), qn); + Map arguments = binding.getArguments(); + + if ("ping".equals(binding.getBindingKey())) + { + foundPing = true; + assertEquals("Unexpected number of binding arguments for ping", 2, arguments.size()); + assertEquals("Unexpected x-filter-jms-selector for ping", "select=1", arguments.get("x-filter-jms-selector")); + assertTrue("Unexpected x-qpid-no-local for ping", arguments.containsKey("x-qpid-no-local")); + } + else if ("pong".equals(binding.getBindingKey())) + { + foundPong = true; + assertEquals("Unexpected number of binding arguments for pong", 1, arguments.size()); + assertEquals("Unexpected x-filter-jms-selector for pong", "select='pong'", arguments.get("x-filter-jms-selector")); + } + } + + assertTrue("Pong binding is not found", foundPong); + assertTrue("Ping binding is not found", foundPing); + } + + private void customBindingTestImpl(final String[] routingKeys) throws Exception + { + String exchangeName = getName() +".direct"; + String vhostName = getName(); + String queueName = getName(); + + File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys); + VirtualHost vhost = createVirtualHost(vhostName, config); + assertNotNull("virtualhost should exist", vhost); + + AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); + assertNotNull("queue should exist", queue); + + Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange(); + assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); + + Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName); + assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); + + for(String key: routingKeys) + { + assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue)); + } + } + + + private VirtualHost createVirtualHost(String vhostName, File config) throws Exception + { + Broker broker = BrokerTestHelper.createBrokerMock(); + _virtualHostRegistry = broker.getVirtualHostRegistry(); + + VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); + _virtualHostRegistry.registerVirtualHost(host); + + return host; + } + + /** + * Create a configuration file for testing virtualhost creation + * + * @param vhostName name of the virtualhost + * @param queueName name of the queue + * @param exchangeName name of a direct exchange to declare (unless dontDeclare = true) and bind the queue to (null = none) + * @param dontDeclare if true then dont declare the exchange, even if its name is non-null + * @param routingKeys routingKeys to bind the queue with (empty array = none) + * @return + */ + private File writeConfigFile(String vhostName, String queueName, String exchangeName, boolean dontDeclare, String[] routingKeys) + { + return writeConfigFile(vhostName, queueName, exchangeName, dontDeclare, routingKeys, null); + } + + private File writeConfigFile(String vhostName, String queueName, String exchangeName, boolean dontDeclare, String[] routingKeys, Map bindingArguments) + { + File tmpFile = null; + try + { + tmpFile = File.createTempFile(getName(), ".tmp"); + tmpFile.deleteOnExit(); + + FileWriter fstream = new FileWriter(tmpFile); + BufferedWriter writer = new BufferedWriter(fstream); + + //extra outer tag to please Commons Configuration + + writer.write(""); + writer.write(" " + vhostName + ""); + writer.write(" "); + writer.write(" " + vhostName + ""); + writer.write(" <" + vhostName + ">"); + writer.write(" " + StandardVirtualHostFactory.TYPE + ""); + writer.write(" "); + writer.write(" " + MemoryMessageStore.class.getName() + ""); + writer.write(" "); + if(exchangeName != null && !dontDeclare) + { + writer.write(" "); + writer.write(" "); + writer.write(" direct"); + writer.write(" " + exchangeName + ""); + writer.write(" "); + writer.write(" "); + } + writer.write(" "); + writer.write(" "); + writer.write(" " + queueName + ""); + writer.write(" <" + queueName + ">"); + if(exchangeName != null) + { + writer.write(" " + exchangeName + ""); + } + for(String routingKey: routingKeys) + { + writer.write(" " + routingKey + "\n"); + if (bindingArguments!= null && bindingArguments.containsKey(routingKey)) + { + writer.write(" <" + routingKey + ">\n"); + String[] arguments = (String[])bindingArguments.get(routingKey); + for (String argument : arguments) + { + writer.write(" " + argument + "\n"); + } + writer.write(" \n"); + } + } + writer.write(" "); + writer.write(" "); + writer.write(" "); + writer.write(" "); + writer.write(" "); + writer.write(""); + + writer.flush(); + writer.close(); + } + catch (IOException e) + { + fail("Unable to create virtualhost configuration"); + } + + return tmpFile; + } + + private VirtualHost createVirtualHostUsingStoreType(String virtualHostName) throws ConfigurationException, Exception + { + Broker broker = BrokerTestHelper.createBrokerMock(); + _virtualHostRegistry = broker.getVirtualHostRegistry(); + + Configuration config = new PropertiesConfiguration(); + config.setProperty("store.type", MemoryMessageStore.TYPE); + VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); + _virtualHostRegistry.registerVirtualHost(host); + return host; + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java deleted file mode 100644 index 3ae269ff20..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import static org.mockito.Mockito.mock; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; - -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; - -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -public class VirtualHostImplTest extends QpidTestCase -{ - private VirtualHostRegistry _virtualHostRegistry; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHostRegistry != null) - { - _virtualHostRegistry.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - - } - - /** - * Tests that custom routing keys for the queue specified in the configuration - * file are correctly bound to the exchange (in addition to the queue name) - */ - public void testSpecifyingCustomBindings() throws Exception - { - customBindingTestImpl(new String[]{"custom1","custom2"}); - } - - /** - * Tests that a queue specified in the configuration file to be bound to a - * specified(non-default) direct exchange is a correctly bound to the exchange - * and the default exchange using the queue name. - */ - public void testQueueSpecifiedInConfigurationIsBoundToDefaultExchange() throws Exception - { - customBindingTestImpl(new String[0]); - } - - /** - * Tests that specifying custom routing keys for a queue in the configuration file results in failure - * to create the vhost (since this is illegal, only queue names are used with the default exchange) - */ - public void testSpecifyingCustomBindingForDefaultExchangeThrowsException() throws Exception - { - final String queueName = getName(); - final String customBinding = "custom-binding"; - File config = writeConfigFile(queueName, queueName, null, false, new String[]{customBinding}); - - try - { - createVirtualHost(queueName, config); - fail("virtualhost creation should have failed due to illegal configuration"); - } - catch (RuntimeException e) - { - assertNotNull(e.getCause()); - - assertEquals(ConfigurationException.class, e.getCause().getClass()); - - Throwable configException = e.getCause(); - assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, configException.getMessage()); - } - } - - public void testVirtualHostBecomesActive() throws Exception - { - File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); - VirtualHost vhost = createVirtualHost(getName(), config); - assertNotNull(vhost); - assertEquals(State.ACTIVE, vhost.getState()); - } - - public void testVirtualHostHavingStoreSetAsTypeBecomesActive() throws Exception - { - String virtualHostName = getName(); - VirtualHost host = createVirtualHostUsingStoreType(virtualHostName); - assertNotNull(host); - assertEquals(State.ACTIVE, host.getState()); - } - - public void testVirtualHostBecomesStoppedOnClose() throws Exception - { - File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); - VirtualHost vhost = createVirtualHost(getName(), config); - assertNotNull(vhost); - assertEquals(State.ACTIVE, vhost.getState()); - vhost.close(); - assertEquals(State.STOPPED, vhost.getState()); - assertEquals(0, vhost.getHouseKeepingActiveCount()); - } - - public void testVirtualHostHavingStoreSetAsTypeBecomesStoppedOnClose() throws Exception - { - String virtualHostName = getName(); - VirtualHost host = createVirtualHostUsingStoreType(virtualHostName); - assertNotNull(host); - assertEquals(State.ACTIVE, host.getState()); - host.close(); - assertEquals(State.STOPPED, host.getState()); - assertEquals(0, host.getHouseKeepingActiveCount()); - } - - /** - * Tests that specifying an unknown exchange to bind the queue to results in failure to create the vhost - */ - public void testSpecifyingUnknownExchangeThrowsException() throws Exception - { - final String queueName = getName(); - final String exchangeName = "made-up-exchange"; - File config = writeConfigFile(queueName, queueName, exchangeName, true, new String[0]); - - try - { - createVirtualHost(queueName, config); - fail("virtualhost creation should have failed due to illegal configuration"); - } - catch (RuntimeException e) - { - assertNotNull(e.getCause()); - - assertEquals(ConfigurationException.class, e.getCause().getClass()); - - Throwable configException = e.getCause(); - assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, configException.getMessage()); - } - } - - public void testCreateVirtualHostWithoutConfigurationInConfigFile() throws Exception - { - File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); - String hostName = getName() + "-not-existing"; - try - { - createVirtualHost(hostName, config); - fail("virtualhost creation should have failed due to illegal configuration"); - } - catch (RuntimeException e) - { - assertEquals("No configuration found for virtual host '" + hostName + "' in " + config.getAbsolutePath(), e.getMessage()); - } - } - - public void testBindingArguments() throws Exception - { - String exchangeName = getName() +".direct"; - String vhostName = getName(); - String queueName = getName(); - - Map bindingArguments = new HashMap(); - bindingArguments.put("ping", new String[]{"x-filter-jms-selector=select=1", "x-qpid-no-local"}); - bindingArguments.put("pong", new String[]{"x-filter-jms-selector=select='pong'"}); - File config = writeConfigFile(vhostName, queueName, exchangeName, false, new String[]{"ping","pong"}, bindingArguments); - VirtualHost vhost = createVirtualHost(vhostName, config); - - Exchange exch = vhost.getExchangeRegistry().getExchange(getName() +".direct"); - Collection bindings = exch.getBindings(); - assertNotNull("Bindings cannot be null", bindings); - assertEquals("Unexpected number of bindings", 3, bindings.size()); - - boolean foundPong = false; - boolean foundPing = false; - for (Binding binding : bindings) - { - String qn = binding.getQueue().getName(); - assertEquals("Unexpected queue name", getName(), qn); - Map arguments = binding.getArguments(); - - if ("ping".equals(binding.getBindingKey())) - { - foundPing = true; - assertEquals("Unexpected number of binding arguments for ping", 2, arguments.size()); - assertEquals("Unexpected x-filter-jms-selector for ping", "select=1", arguments.get("x-filter-jms-selector")); - assertTrue("Unexpected x-qpid-no-local for ping", arguments.containsKey("x-qpid-no-local")); - } - else if ("pong".equals(binding.getBindingKey())) - { - foundPong = true; - assertEquals("Unexpected number of binding arguments for pong", 1, arguments.size()); - assertEquals("Unexpected x-filter-jms-selector for pong", "select='pong'", arguments.get("x-filter-jms-selector")); - } - } - - assertTrue("Pong binding is not found", foundPong); - assertTrue("Ping binding is not found", foundPing); - } - - private void customBindingTestImpl(final String[] routingKeys) throws Exception - { - String exchangeName = getName() +".direct"; - String vhostName = getName(); - String queueName = getName(); - - File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys); - VirtualHost vhost = createVirtualHost(vhostName, config); - assertNotNull("virtualhost should exist", vhost); - - AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); - assertNotNull("queue should exist", queue); - - Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange(); - assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); - - Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName); - assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); - - for(String key: routingKeys) - { - assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue)); - } - } - - - private VirtualHost createVirtualHost(String vhostName, File config) throws Exception - { - Broker broker = BrokerTestHelper.createBrokerMock(); - _virtualHostRegistry = broker.getVirtualHostRegistry(); - - VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker); - VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); - _virtualHostRegistry.registerVirtualHost(host); - - return host; - } - - /** - * Create a configuration file for testing virtualhost creation - * - * @param vhostName name of the virtualhost - * @param queueName name of the queue - * @param exchangeName name of a direct exchange to declare (unless dontDeclare = true) and bind the queue to (null = none) - * @param dontDeclare if true then dont declare the exchange, even if its name is non-null - * @param routingKeys routingKeys to bind the queue with (empty array = none) - * @return - */ - private File writeConfigFile(String vhostName, String queueName, String exchangeName, boolean dontDeclare, String[] routingKeys) - { - return writeConfigFile(vhostName, queueName, exchangeName, dontDeclare, routingKeys, null); - } - - private File writeConfigFile(String vhostName, String queueName, String exchangeName, boolean dontDeclare, String[] routingKeys, Map bindingArguments) - { - File tmpFile = null; - try - { - tmpFile = File.createTempFile(getName(), ".tmp"); - tmpFile.deleteOnExit(); - - FileWriter fstream = new FileWriter(tmpFile); - BufferedWriter writer = new BufferedWriter(fstream); - - //extra outer tag to please Commons Configuration - - writer.write(""); - writer.write(" " + vhostName + ""); - writer.write(" "); - writer.write(" " + vhostName + ""); - writer.write(" <" + vhostName + ">"); - writer.write(" "); - writer.write(" " + MemoryMessageStore.class.getName() + ""); - writer.write(" "); - if(exchangeName != null && !dontDeclare) - { - writer.write(" "); - writer.write(" "); - writer.write(" direct"); - writer.write(" " + exchangeName + ""); - writer.write(" "); - writer.write(" "); - } - writer.write(" "); - writer.write(" "); - writer.write(" " + queueName + ""); - writer.write(" <" + queueName + ">"); - if(exchangeName != null) - { - writer.write(" " + exchangeName + ""); - } - for(String routingKey: routingKeys) - { - writer.write(" " + routingKey + "\n"); - if (bindingArguments!= null && bindingArguments.containsKey(routingKey)) - { - writer.write(" <" + routingKey + ">\n"); - String[] arguments = (String[])bindingArguments.get(routingKey); - for (String argument : arguments) - { - writer.write(" " + argument + "\n"); - } - writer.write(" \n"); - } - } - writer.write(" "); - writer.write(" "); - writer.write(" "); - writer.write(" "); - writer.write(" "); - writer.write(""); - - writer.flush(); - writer.close(); - } - catch (IOException e) - { - fail("Unable to create virtualhost configuration"); - } - - return tmpFile; - } - - private VirtualHost createVirtualHostUsingStoreType(String virtualHostName) throws ConfigurationException, Exception - { - Broker broker = BrokerTestHelper.createBrokerMock(); - _virtualHostRegistry = broker.getVirtualHostRegistry(); - - Configuration config = new PropertiesConfiguration(); - config.setProperty("store.type", MemoryMessageStore.TYPE); - VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker); - VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); - _virtualHostRegistry.registerVirtualHost(host); - return host; - } -} diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index 08f7387b75..8f556ece5a 100644 --- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.qpid.test.utils; @@ -110,10 +110,12 @@ public class QpidTestCase extends TestCase } protected static final String MESSAGE_STORE_CLASS_NAME_KEY = "messagestore.class.name"; + protected static final String CONFIGURATION_STORE_CLASS_NAME_KEY = "configurationstore.class.name"; + protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore"; private static List _exclusionList; - + public QpidTestCase() { super(); @@ -138,7 +140,7 @@ public class QpidTestCase extends TestCase { final String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); _logger.debug("MESSAGE_STORE_CLASS_NAME_KEY " + storeClass); - + return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java index 7492d062fd..a19ba21c5c 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java @@ -26,6 +26,7 @@ import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE; import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD; import java.util.Arrays; +import javax.net.ssl.SSLSocket; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQTestConnection_0_10; diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index cff77711ca..ed76c40717 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -35,7 +35,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; -public class SlowMessageStore implements MessageStore +public class SlowMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index deaac2dd2a..1d1c474be0 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -50,7 +50,8 @@ public class Asserts { assertNotNull("Virtualhost " + virtualHostName + " data are not found", virtualHost); assertAttributesPresent(virtualHost, VirtualHost.AVAILABLE_ATTRIBUTES, VirtualHost.TIME_TO_LIVE, - VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, VirtualHost.CONFIG_PATH); + VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, + VirtualHost.CONFIG_PATH, VirtualHost.TYPE); assertEquals("Unexpected value of attribute " + VirtualHost.NAME, virtualHostName, virtualHost.get(VirtualHost.NAME)); assertNotNull("Unexpected value of attribute " + VirtualHost.ID, virtualHost.get(VirtualHost.ID)); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 1823b59ba3..940d6a3298 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; import org.codehaus.jackson.JsonGenerationException; @@ -564,6 +565,7 @@ public class VirtualHostRestTest extends QpidRestTestCase } else { + hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); hostData.put(VirtualHost.STORE_PATH, storePath); hostData.put(VirtualHost.STORE_TYPE, storeType); } diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java index acad55417a..666449b658 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.security.acl.AbstractACLTestCase; import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory; import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory; import org.apache.qpid.server.security.group.FileGroupManagerFactory; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.systest.rest.QpidRestTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.test.utils.TestFileUtils; @@ -981,6 +982,7 @@ public class BrokerACLTest extends QpidRestTestCase hostData.put(VirtualHost.NAME, hostName); hostData.put(VirtualHost.STORE_PATH, getStoreLocation(hostName)); hostData.put(VirtualHost.STORE_TYPE, getTestProfileMessageStoreType()); + hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 9bf7dbd62a..4a81480671 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.client.timeouts; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,8 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase public void setUp() throws Exception { + setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".type", + StandardVirtualHostFactory.TYPE); setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore"); setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); @@ -64,7 +67,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase _connection = getConnection(); - //Create Queue + //Create Queue _queue = (Queue) getInitialContext().lookup("queue"); //Create Consumer -- cgit v1.2.1