diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-17 08:11:06 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-17 08:11:06 +0000 |
commit | c4f7a811226cd0342a6fe3a3845d8aea7fad2a09 (patch) | |
tree | 35a392f712f7e3e51d06d338896eb10a84449276 | |
parent | 6cef1bb516f9751a52d05108e990cb8940c10940 (diff) | |
download | qpid-python-c4f7a811226cd0342a6fe3a3845d8aea7fad2a09.tar.gz |
Initial change to separate virtual hosts into types
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-vhost-refactor@1493675 13f79535-47bb-0310-9956-ffa450edef68
68 files changed, 1415 insertions, 482 deletions
diff --git a/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml b/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml index 6f5f1f38ba..bb6d9ecef7 100644 --- a/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml +++ b/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml @@ -45,14 +45,21 @@ with one being configured as the default for clients that can't or don't specify messages on durable <emphasis>Queues</emphasis> it contains, as well as the configuration of any durable <emphasis>Queues</emphasis>, <emphasis>Exchanges</emphasis>, and <emphasis>Bindings</emphasis> made during its operation.</para> <para> - The following message stores are currently supported: + The Java Broker supports different types of <emphasis>Virtual Host</emphasis>, which providing + for different High Availability patterns. The <emphasis>standard</emphasis> <emphasis>Virtual + Host</emphasis> type provides no replication and is designed for standalone environments. It + supports a number of different <emphasis>Message Store</emphasis> implementations: <itemizedlist> <listitem><para><link linkend="Java-Broker-Stores-SQL-Store">JDBC Message Store</link></para></listitem> <listitem><para><link linkend="Java-Broker-Stores-Derby-Store">Derby Message Store</link></para></listitem> <listitem><para><link linkend="Java-Broker-Stores-BDB-Store">Berkeley DB Message Store</link></para></listitem> - <listitem><para><link linkend="Java-Broker-Stores-HA-BDB-Store">Berkeley DB HA Message Store</link></para></listitem> <listitem><para><link linkend="Java-Broker-Stores-Memory-Store">Memory Message Store</link></para></listitem> </itemizedlist> </para> +<para> + The <emphasis>BDB_HA</emphasis> <emphasis>Virtual Host</emphasis> provides clustering for + reliability using the HA capabilties of the Oracle Berkeley DB (Java Edition) (see + <link linkend="Java-Broker-Stores-HA-BDB-Store">Berkeley DB HA Message Store</link>). +</para> <para>Virtual Hosts configuration is covered in <xref linkend="Java-Broker-Virtual-Hosts"/>.</para> </section> diff --git a/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml b/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml index 072effa798..e1c128a6c7 100644 --- a/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml +++ b/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml @@ -305,7 +305,7 @@ $ ./qpid-server -prop "qpid.amqp_port=10000" -prop "qpid.http_port=10001" { "name" : "Broker", "defaultVirtualHost" : "default", - "modelVersion" : "1.0", + "modelVersion" : "1.1", "storeVersion" : 1, "authenticationproviders" : [ { "name" : "passwordFile", @@ -332,6 +332,7 @@ $ ./qpid-server -prop "qpid.amqp_port=10000" -prop "qpid.http_port=10001" } ], "virtualhosts" : [ { "name" : "default", + "type" : "STANDARD", "storePath" : "${qpid.work_dir}/derbystore/default", "storeType" : "DERBY" } ], @@ -348,7 +349,7 @@ $ ./qpid-server -prop "qpid.amqp_port=10000" -prop "qpid.http_port=10001" <itemizedlist> <listitem><para>Authentication Provider of type <emphasis>PlainPasswordFile</emphasis> with name "passwordFile"</para></listitem> <listitem><para>Four Port entries: "AMQP", "HTTP", "RMI_REGISTRY", "JMX_CONNECTOR"</para></listitem> - <listitem><para>Virtual Host with name "default" and DERBY message store type at location "${qpid.work_dir}/derbystore/default".</para></listitem> + <listitem><para>A "STANDARD" (i.e. non-HA) Virtual Host with name "default" and DERBY message store type at location "${qpid.work_dir}/derbystore/default".</para></listitem> <listitem><para>Two management plugins: "jmxManagement" of type "MANAGEMENT-JMX" and "httpManagement" of type "MANAGEMENT-HTTP".</para></listitem> <listitem><para>Broker attributes are stored as a root entry.</para></listitem> </itemizedlist> diff --git a/doc/book/src/java-broker/Java-Broker-High-Availability.xml b/doc/book/src/java-broker/Java-Broker-High-Availability.xml index ca205a865e..4c0391afec 100644 --- a/doc/book/src/java-broker/Java-Broker-High-Availability.xml +++ b/doc/book/src/java-broker/Java-Broker-High-Availability.xml @@ -56,8 +56,6 @@ do not current offer this feature.</para></footnote> to the new Master and continue their work.</para> <para>The Java Broker HA solution is incompatible with the HA solution offered by the CPP Broker. It is not possible to co-locate Java and CPP Brokers within the same cluster.</para> - <para>HA is not currently available for those using the the <emphasis role="bold">Derby Store</emphasis> or <emphasis role="bold">Memory - Message Store</emphasis>.</para> </section> <section role="h3" id="Java-Broker-High-Availability-TwoNodeCluster"> @@ -302,8 +300,8 @@ <virtualhost> <name>vhostname</name> <vhostname> + <type>BDB_HA</type> <store> - <class>org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore</class> <environment-path>${QPID_WORK}/bdbhastore/vhostname</environment-path> <highAvailability> <groupName>myclustername</groupName> diff --git a/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml b/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml index b240d85d4f..b5a2537eac 100644 --- a/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml +++ b/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml @@ -35,7 +35,7 @@ <itemizedlist> <listitem> <para> - <emphasis>Supplying simply a <link linkend="Java-Broker-Stores">store type</link> and a store path</emphasis>: In this case, + <emphasis>Supplying simply a type, store path, and type specific fields (in the case of a "STANDARD" Virtual Host this is simply a <link linkend="Java-Broker-Stores">store type</link>) and a store path</emphasis>: In this case, the virtual host attributes are currently derived from default attribute values defined on the broker. This is the preferred approach. </para> </listitem> @@ -44,7 +44,7 @@ <emphasis>Supplying the path to a <link linkend="Java-Broker-Virtual-Hosts-Configuration-File">Virtual Host XML configuration file</link></emphasis>: In this case, specific per-virtualhost attribute configuration can be set in the file, as well as pre-configuring queues, exchanges, etc. This is no longer the preferred approach and will likely be removed in a future release, however it is currently still neccessary to support certain use-cases such as per-virtualhost attribute configuration, and - specialised store configuration such as for the <link linkend="Java-Broker-Stores-HA-BDB-Store">BDB HA Message Store</link>. + specialised Virtual Host configuration such for the <link linkend="Java-Broker-Stores-HA-BDB-Store">BDB HA Message Store</link>. </para> </listitem> </itemizedlist> diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 3074daa46e..d036a5d39a 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -75,7 +75,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.apache.qpid.util.FileUtils; -public abstract class AbstractBDBMessageStore implements MessageStore +public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java new file mode 100644 index 0000000000..0231573053 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -0,0 +1,198 @@ +package org.apache.qpid.server.store.berkeleydb; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.OperationalLoggingListener; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; +import org.apache.qpid.server.virtualhost.State; +import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +public class BDBHAVirtualHost extends AbstractVirtualHost +{ + private BDBHAMessageStore _messageStore; + + private boolean _inVhostInitiatedClose; + + BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) + throws Exception + { + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + } + + + + protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception + { + _messageStore = new BDBHAMessageStore(); + + final MessageStoreLogSubject storeLogSubject = + new MessageStoreLogSubject(this, _messageStore.getClass().getSimpleName()); + OperationalLoggingListener.listen(_messageStore, storeLogSubject); + + _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); + _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); + _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); + + + + _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); + _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); + + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + + _messageStore.configureConfigStore(getName(), + recoveryHandler, + hostConfig.getStoreConfiguration()); + + _messageStore.configureMessageStore(getName(), + recoveryHandler, + recoveryHandler, + hostConfig.getStoreConfiguration()); + } + + + protected void closeStorage() + { + //Close MessageStore + if (_messageStore != null) + { + //Remove MessageStore Interface should not throw Exception + try + { + _inVhostInitiatedClose = true; + getMessageStore().close(); + } + catch (Exception e) + { + getLogger().error("Failed to close message store", e); + } + finally + { + _inVhostInitiatedClose = false; + } + } + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return _messageStore; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + private final class AfterInitialisationListener implements EventListener + { + public void event(Event event) + { + setState(State.PASSIVE); + } + + } + + private final class BeforePassivationListener implements EventListener + { + public void event(Event event) + { + State finalState = State.ERRORED; + + try + { + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare.. + */ + + getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + removeHouseKeepingTasks(); + + getQueueRegistry().stopAllAndUnregisterMBeans(); + getExchangeRegistry().clearAndUnregisterMbeans(); + getDtxRegistry().close(); + + finalState = State.PASSIVE; + } + finally + { + setState(finalState); + reportIfError(getState()); + } + } + + } + + + private final class BeforeActivationListener implements EventListener + { + @Override + public void event(Event event) + { + try + { + initialiseModel(getConfiguration()); + } + catch (Exception e) + { + throw new RuntimeException("Failed to initialise virtual host after state change", e); + } + } + } + + private final class AfterActivationListener implements EventListener + { + @Override + public void event(Event event) + { + attainActivation(); + } + } + + private final class BeforeCloseListener implements EventListener + { + @Override + public void event(Event event) + { + if(!_inVhostInitiatedClose) + { + shutdownHouseKeeping(); + } + + } + } + +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java new file mode 100644 index 0000000000..b01aeafb9a --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -0,0 +1,106 @@ +package org.apache.qpid.server.store.berkeleydb;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +public class BDBHAVirtualHostFactory implements VirtualHostFactory +{ + + public static final String TYPE = "BDB_HA"; + + @Override + public String getType() + { + return TYPE; + } + + @Override + public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + return new BDBHAVirtualHost(virtualHostRegistry, + brokerStatisticsGatherer, + parentSecurityManager, + hostConfig); + } + + @Override + public void validateAttributes(Map<String, Object> attributes) + { + validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes); + validateAttribute("haGroupName", String.class, attributes); + validateAttribute("haNodeName", String.class, attributes); + validateAttribute("haNodeAddress", String.class, attributes); + validateAttribute("haHelperAddress", String.class, attributes); + } + + private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes) + { + Object attr = attributes.get(attrName); + if(!clazz.isInstance(attr)) + { + throw new IllegalArgumentException("Attribute '"+ attrName + +"' is required and must be of type "+clazz.getSimpleName()+"."); + } + } + + @Override + public Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter) + { + LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>(); + convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); + convertedMap.put("store.highAvailability.groupName", virtualHostAdapter.getAttribute("haGroupName")); + convertedMap.put("store.highAvailability.nodeName", virtualHostAdapter.getAttribute("haNodeName")); + convertedMap.put("store.highAvailability.nodeHostPort", virtualHostAdapter.getAttribute("haNodeAddress")); + convertedMap.put("store.highAvailability.helperHostPort", virtualHostAdapter.getAttribute("haHelperAddress")); + + final Object haDurability = virtualHostAdapter.getAttribute("haDurability"); + if(haDurability !=null) + { + convertedMap.put("store.highAvailability.durability", haDurability); + } + + final Object designatedPrimary = virtualHostAdapter.getAttribute("haDesignatedPrimary"); + if(designatedPrimary!=null) + { + convertedMap.put("store.highAvailability.designatedPrimary", designatedPrimary); + } + + final Object coalescingSync = virtualHostAdapter.getAttribute("haCoalescingSync"); + if(coalescingSync!=null) + { + convertedMap.put("store.highAvailability.coalescingSync", coalescingSync); + } + + // TODO REP_CONFIG values + + return convertedMap; + } +} diff --git a/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js b/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js new file mode 100644 index 0000000000..44ad5fa57a --- /dev/null +++ b/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array) { + return { + show: function() { + + var node = dom.byId("addVirtualHost.typeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,27) == "formAddVirtualHost.specific") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/bdb_ha/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + }}); + } + }; + }); diff --git a/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html b/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html new file mode 100644 index 0000000000..1727264d41 --- /dev/null +++ b/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html @@ -0,0 +1,37 @@ +<table class="tableContainer-table tableContainer-table-horiz"> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td> + <td class="tableContainer-valueCell"> + <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.storePath" + required="true" name="storePath" placeholder="/path/to/message/store"/> + </td> + </tr> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Node Name*: </strong></td> + <td class="tableContainer-valueCell"> + <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.nodeName" + required="true" name="haNodeName" placeholder="node name"/> + </td> + </tr> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Replication Group*: </strong></td> + <td class="tableContainer-valueCell"> + <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.groupName" + required="true" name="haGroupName" placeholder="group name"/> + </td> + </tr> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Node Address*: </strong></td> + <td class="tableContainer-valueCell"> + <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.nodeAddress" + required="true" name="haNodeAddress" data-dojo-props="regExp:'([0-9a-zA-Z.-_]|::)+:[0-9]{1,5}', invalidMessage:'Must be of the form host:port'" placeholder="host:port"/> + </td> + </tr> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Helper Address*: </strong></td> + <td class="tableContainer-valueCell"> + <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.helperAddress" + required="true" name="haHelperAddress" placeholder="host:port"/> + </td> + </tr> +</table> diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory new file mode 100644 index 0000000000..0f8848cb74 --- /dev/null +++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java index 8e32a1d113..047b102817 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -115,6 +115,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase String vhostPrefix = "virtualhosts.virtualhost." + vhostName; _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); + _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE); _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName()); _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator + port); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index 5cc436a22a..6c6145fabb 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -20,15 +20,26 @@ */ package org.apache.qpid.server.store.berkeleydb; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreTest; import org.apache.qpid.server.store.MessageStore; public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest { + + private BDBMessageStore _bdbMessageStore; + @Override - protected MessageStore createStore() throws Exception + protected BDBMessageStore createMessageStore() throws Exception { - return new BDBMessageStore(); + _bdbMessageStore = new BDBMessageStore(); + return _bdbMessageStore; } + // TODO - this only works so long as createConfigStore is called after createMessageStore + @Override + protected DurableConfigurationStore createConfigStore() throws Exception + { + return _bdbMessageStore; + } } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 4c2fa910f5..353c3a0ec5 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -75,6 +75,7 @@ public class HATestClusterCreator private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; + private String _vhostConfigKeyPrefix; public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { @@ -83,7 +84,8 @@ public class HATestClusterCreator _groupName = "group" + _testcase.getName(); _ipAddressOfBroker = getIpAddressOfBrokerHost(); _numberOfNodes = numberOfNodes; - _vhostStoreConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + "."; + _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store."; _bdbHelperPort = 0; } @@ -350,6 +352,8 @@ public class HATestClusterCreator { final String nodeName = getNodeNameForNodeAt(bdbPort); + + _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE); _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName); diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java index a7066c73d8..8692ecc88c 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java @@ -33,8 +33,9 @@ import javax.servlet.http.HttpServletResponse; import org.apache.qpid.server.management.plugin.servlet.rest.action.ListAccessControlProviderAttributes; import org.apache.qpid.server.management.plugin.servlet.rest.action.ListAuthenticationProviderAttributes; +import org.apache.qpid.server.management.plugin.servlet.rest.action.ListBrokerAttribute; import org.apache.qpid.server.management.plugin.servlet.rest.action.ListGroupProviderAttributes; -import org.apache.qpid.server.management.plugin.servlet.rest.action.ListMessageStoreTypes; +import org.apache.qpid.server.model.Broker; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -52,8 +53,10 @@ public class HelperServlet extends AbstractServlet _actions = new HashMap<String, Action>(); Action listProviderAttributes = new ListAuthenticationProviderAttributes(); _actions.put(listProviderAttributes.getName(), listProviderAttributes); - Action listMessageStoreTypes = new ListMessageStoreTypes(); + Action listMessageStoreTypes = new ListBrokerAttribute(Broker.SUPPORTED_VIRTUALHOST_STORE_TYPES, "ListMessageStoreTypes"); _actions.put(listMessageStoreTypes.getName(), listMessageStoreTypes); + Action listVirtualHostTypes = new ListBrokerAttribute(Broker.SUPPORTED_VIRTUALHOST_TYPES, "ListVirtualHostTypes"); + _actions.put(listVirtualHostTypes.getName(), listVirtualHostTypes); Action groupProviderAttributes = new ListGroupProviderAttributes(); _actions.put(groupProviderAttributes.getName(), groupProviderAttributes); Action aclProviderAttributes = new ListAccessControlProviderAttributes(); diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java index c0a5d78753..dc414e6a64 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java @@ -25,19 +25,28 @@ import java.util.Map; import org.apache.qpid.server.management.plugin.servlet.rest.Action; import org.apache.qpid.server.model.Broker; -public class ListMessageStoreTypes implements Action +public class ListBrokerAttribute implements Action { + private final String _attributeName; + private final String _name; + + public ListBrokerAttribute(String attributeName, String name) + { + _attributeName = attributeName; + _name = name; + } + @Override public String getName() { - return ListMessageStoreTypes.class.getSimpleName(); + return _name; } @Override public Object perform(Map<String, Object> request, Broker broker) { - return broker.getAttribute(Broker.SUPPORTED_VIRTUALHOST_STORE_TYPES); + return broker.getAttribute(_attributeName); } } diff --git a/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html b/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html index 43281f600d..282f4ab8f6 100644 --- a/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html +++ b/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html @@ -48,17 +48,12 @@ <div id="addVirtualHost.attributesDiv"> <table class="tableContainer-table tableContainer-table-horiz"> <tr> - <td class="tableContainer-labelCell" style="width: 300px;"><strong>Store Type*: </strong></td> - <td class="tableContainer-valueCell" ><div id="addVirtualHost.selectStoreType"></div></td> - </tr> - <tr> - <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td> - <td class="tableContainer-valueCell"> - <input dojoType="dijit.form.ValidationTextBox" id="formAddVirtualHost.storePath" - name="storePath" placeholder="/path/to/message/store" /> - </td> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Type*: </strong></td> + <td class="tableContainer-valueCell" ><div id="addVirtualHost.selectType"></div></td> </tr> </table> + <div id="addVirtualHost.typeSpecificDiv"> + </div> </div> </div> diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js index 0343d3393a..18abfa443f 100644 --- a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js +++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js @@ -60,7 +60,7 @@ define(["dojo/_base/xhr", var node = construct.create("div", null, win.body(), "last"); - var convertToPort = function convertToPort(formValues) + var convertToPort = function convertToPort(formValues) { var newPort = {}; newPort.name = dijit.byId("formAddPort.name").value; @@ -478,4 +478,4 @@ define(["dojo/_base/xhr", }; return addPort; - });
\ No newline at end of file + }); diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js index 9c04c3014f..330c6ed40b 100644 --- a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js +++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js @@ -92,7 +92,7 @@ define(["dojo/_base/xhr", }, "addVirtualHost.configPathDiv"); var attributesPane = new dijit.layout.AccordionPane({ - title: "Store Attributes", + title: "Virtual Host Attributes", selected: true }, "addVirtualHost.attributesDiv"); @@ -112,14 +112,14 @@ define(["dojo/_base/xhr", if(theForm.validate()){ var formValues = theForm.getValues(); - if (formValues.configPath == "" && formValues.storeType == "") + if (formValues.configPath == "" && formValues["type"] == "") { - alert("Please specify either configuration or store type for the virtual host"); + alert("Please specify either configuration file or type for the virtual host"); return false; } - if (formValues.configPath != "" && formValues.storeType != "") + if (formValues.configPath != "" && formValues["type"] != "") { - alert("Either configuration or store type with path have to be specified!"); + alert("Either configuration file or type have to be specified!"); return false; } var newVirtualHost = convertToVirtualHost(formValues); @@ -149,63 +149,51 @@ define(["dojo/_base/xhr", }}); } - addVirtualHost.show = function(virtualHostName) { + addVirtualHost.selectVhostType = function(type) { + if(type && String(type).trim() != "") { + require(["qpid/management/virtualhost/"+type.toLowerCase()+"/addVirtualHost"], + function(vhostType) + { + vhostType.show(); + }); + } + } + + addVirtualHost.show = function() { var that = this; + dom.byId("addVirtualHost.typeSpecificDiv").innerHTML = ""; registry.byId("formAddVirtualHost").reset(); dojo.byId("formAddVirtualHost.id").value=""; - if (!that.hasOwnProperty("storeTypeChooser")) + + if (!that.hasOwnProperty("typeChooser")) { xhr.get({ sync: true, - url: "rest/helper?action=ListMessageStoreTypes", + url: "rest/helper?action=ListVirtualHostTypes", handleAs: "json" }).then( function(data) { - var storeTypes = data; - var storeTypesData = []; - for (var i =0 ; i < storeTypes.length; i++) + var vhostTypes = data; + var vhostTypesData = []; + for (var i =0 ; i < vhostTypes.length; i++) { - storeTypesData[i]= {id: storeTypes[i], name: storeTypes[i]}; + vhostTypesData[i]= {id: vhostTypes[i], name: vhostTypes[i]}; } - var storeTypesStore = new Memory({ data: storeTypesData }); - var storeTypesDiv = dom.byId("addVirtualHost.selectStoreType"); - var input = construct.create("input", {id: "addStoreType", required: false}, storeTypesDiv); - that.storeTypeChooser = new FilteringSelect({ id: "addVirtualHost.storeType", - name: "storeType", - store: storeTypesStore, - searchAttr: "name", required: false}, input); + var typesStore = new Memory({ data: vhostTypesData }); + var typesDiv = dom.byId("addVirtualHost.selectType"); + var input = construct.create("input", {id: "addType", required: false}, typesDiv); + that.typeChooser = new FilteringSelect({ id: "addVirtualHost.type", + name: "type", + store: typesStore, + searchAttr: "name", + required: false, + onChange: that.selectVhostType }, input); }); } - if (virtualHostName) - { - xhr.get({ - url: "rest/virtualhost/" + encodeURIComponent(virtualHostName), - handleAs: "json" - }).then( - function(data) { - var host = data[0]; - var nameField = dijit.byId("formAddVirtualHost.name"); - nameField.set("value", host.name); - dojo.byId("formAddVirtualHost.id").value=host.id; - var configPath = host.configPath; - if (configPath) - { - var configPathField = dijit.byId("formAddVirtualHost.configPath"); - configPathField.set("value", host.configPath); - } - else - { - that.storeTypeChooser.set("value", host.storeType.toLowerCase()); - var storePathField = dijit.byId("formAddVirtualHost.storePath"); - storePathField.set("value", host.storePath); - } - registry.byId("addVirtualHost").show(); - }); - } - else - { - registry.byId("addVirtualHost").show(); - } + + + registry.byId("addVirtualHost").show(); + } return addVirtualHost; - });
\ No newline at end of file + }); diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js new file mode 100644 index 0000000000..cd56ca9cba --- /dev/null +++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/_base/event", + 'dojo/_base/json', + "dojo/store/Memory", + "dijit/form/FilteringSelect", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, FilteringSelect) { + return { + show: function() { + var node = dom.byId("addVirtualHost.typeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,27) == "formAddVirtualHost.specific") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/standard/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + if (that.hasOwnProperty("storeTypeChooser")) + { + that.storeTypeChooser.destroy(); + } + xhr.get({ + sync: true, + url: "rest/helper?action=ListMessageStoreTypes", + handleAs: "json" + }).then( + function(data) { + var storeTypes = data; + var storeTypesData = []; + for (var i =0 ; i < storeTypes.length; i++) + { + storeTypesData[i]= {id: storeTypes[i], name: storeTypes[i]}; + } + var storeTypesStore = new Memory({ data: storeTypesData }); + var storeTypesDiv = dom.byId("addVirtualHost.specific.selectStoreType"); + var input = construct.create("input", {id: "addStoreType", required: false}, storeTypesDiv); + that.storeTypeChooser = new FilteringSelect({ id: "addVirtualHost.specific.storeType", + name: "storeType", + store: storeTypesStore, + searchAttr: "name", required: false}, input); + }); + + }}); + } + }; + }); diff --git a/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html b/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html new file mode 100644 index 0000000000..9596ef4175 --- /dev/null +++ b/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html @@ -0,0 +1,13 @@ +<table class="tableContainer-table tableContainer-table-horiz"> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Store Type*: </strong></td> + <td class="tableContainer-valueCell" ><div id="addVirtualHost.specific.selectStoreType"></div></td> + </tr> + <tr> + <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td> + <td class="tableContainer-valueCell"> + <input dojoType="dijit/form/ValidationTextBox" required="true" id="formAddVirtualHost.specific.storePath" + name="storePath" placeholder="/path/to/message/store" /> + </td> + </tr> +</table> diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index aff84e5832..7977b8b300 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -295,4 +295,8 @@ public class VirtualHostConfiguration extends AbstractConfiguration return brokerValue == null? false : brokerValue.booleanValue(); } + public String getType() + { + return getStringValue("type", "STANDARD"); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 82adcf4dde..07d934027e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -35,7 +35,7 @@ public interface IConnectionRegistry public void close() throws AMQException; - public void close(String replyText) throws AMQException; + public void close(String replyText); public List<AMQConnectionModel> getConnections(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 803aeceab8..6b453cbbda 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -554,7 +554,7 @@ public abstract class AbstractExchange implements Exchange if (b.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(b); + _virtualHost.getDurableConfigurationStore().unbindQueue(b); } b.logDestruction(); } @@ -626,7 +626,7 @@ public abstract class AbstractExchange implements Exchange if (b.isDurable() && !restore) { - _virtualHost.getMessageStore().bindQueue(b); + _virtualHost.getDurableConfigurationStore().bindQueue(b); } queue.addQueueDeleteTask(b); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 450e74bfec..142da84524 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -74,7 +74,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public DurableConfigurationStore getDurableConfigurationStore() { - return _host.getMessageStore(); + return _host.getDurableConfigurationStore(); } public void registerExchange(Exchange exchange) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index eba63558ca..b632c68ace 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -134,8 +134,6 @@ public interface Exchange extends ExchangeReferrer */ boolean isBound(AMQShortString routingKey); - boolean isBound(AMQQueue queue); - /** * Returns true if this exchange has at least one binding associated with it. * @return @@ -147,15 +145,17 @@ public interface Exchange extends ExchangeReferrer boolean isBound(String bindingKey); - boolean isBound(String bindingKey, AMQQueue queue); + boolean isBound(AMQQueue queue); - boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); + boolean isBound(Map<String, Object> arguments); - boolean isBound(Map<String, Object> arguments, AMQQueue queue); + boolean isBound(String bindingKey, AMQQueue queue); boolean isBound(String bindingKey, Map<String, Object> arguments); - boolean isBound(Map<String, Object> arguments); + boolean isBound(Map<String, Object> arguments, AMQQueue queue); + + boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); void removeReference(ExchangeReferrer exchange); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index eed0cd6020..9c25d00b1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -71,7 +71,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); } - + synchronized(exchangeRegistry) { Exchange exchange = exchangeRegistry.getExchange(exchangeName); @@ -106,7 +106,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange if (exchange.isDurable()) { - virtualHost.getMessageStore().createExchange(exchange); + virtualHost.getDurableConfigurationStore().createExchange(exchange); } } catch(AMQUnknownExchangeType e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index c889f5660d..2234ee0354 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -65,7 +65,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); final AMQShortString queueName; @@ -80,7 +80,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } AMQQueue queue; - + //TODO: do we need to check that the queue already exists with exactly the same "configuration"? AMQChannel channel = protocolConnection.getChannel(channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 762f090b83..b184ce7dfa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -62,7 +62,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); AMQChannel channel = protocolConnection.getChannel(channelId); @@ -110,7 +110,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); } - + int purged = queue.delete(); if (queue.isDurable()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java index f666eb29f1..24fd687240 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java @@ -42,6 +42,7 @@ public interface Broker extends ConfiguredObject String PROCESS_PID = "processPid"; String PRODUCT_VERSION = "productVersion"; String SUPPORTED_BROKER_STORE_TYPES = "supportedBrokerStoreTypes"; + String SUPPORTED_VIRTUALHOST_TYPES = "supportedVirtualHostTypes"; String SUPPORTED_VIRTUALHOST_STORE_TYPES = "supportedVirtualHostStoreTypes"; String SUPPORTED_AUTHENTICATION_PROVIDERS = "supportedAuthenticationProviders"; String CREATED = "created"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java index dab92c50fa..9120a27976 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java @@ -31,9 +31,13 @@ public class Model { /* * API version for the broker model + * + * 1.0 Initial version + * 1.1 Addition of mandatory virtual host type / different types of virtual host + * */ public static final int MODEL_MAJOR_VERSION = 1; - public static final int MODEL_MINOR_VERSION = 0; + public static final int MODEL_MINOR_VERSION = 1; public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION; private static final Model MODEL_INSTANCE = new Model(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 424ba825d7..08e9c5ca5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -83,6 +83,7 @@ public interface VirtualHost extends ConfiguredObject String NAME = "name"; String STATE = "state"; String TIME_TO_LIVE = "timeToLive"; + String TYPE = "type"; String UPDATED = "updated"; String CONFIG_PATH = "configPath"; @@ -92,6 +93,7 @@ public interface VirtualHost extends ConfiguredObject Arrays.asList( ID, NAME, + TYPE, STATE, DURABLE, LIFETIME_POLICY, @@ -139,7 +141,7 @@ public interface VirtualHost extends ConfiguredObject void dequeue(QueueEntry entry); void copy(QueueEntry entry, Queue queue); - + void move(QueueEntry entry, Queue queue); } @@ -158,4 +160,6 @@ public interface VirtualHost extends ConfiguredObject SecurityManager getSecurityManager(); MessageStore getMessageStore(); + + String getType(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java index a8c3f54530..7ff8c88331 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java @@ -54,17 +54,40 @@ abstract class AbstractAdapter implements ConfiguredObject protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor) { + this(id, defaults, attributes, taskExecutor, true); + } + + protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, + TaskExecutor taskExecutor, boolean filterAttributes) + + { _taskExecutor = taskExecutor; _id = id; if (attributes != null) { Collection<String> names = getAttributeNames(); - for (String name : names) + if(filterAttributes) + { + for (String name : names) + { + if (attributes.containsKey(name)) + { + final Object value = attributes.get(name); + if(value != null) + { + _attributes.put(name, value); + } + } + } + } + else { - if (attributes.containsKey(name)) + for(Map.Entry<String, Object> entry : attributes.entrySet()) { - //TODO: dont put nulls - _attributes.put(name, attributes.get(name)); + if(entry.getValue()!=null) + { + _attributes.put(entry.getKey(),entry.getValue()); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index adc30eb944..678db43d58 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -60,6 +60,7 @@ import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.SimpleAuthenticationProviderAdapter; +import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.access.Operation; @@ -771,6 +772,10 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat { return _supportedVirtualHostStoreTypes; } + else if(SUPPORTED_VIRTUALHOST_TYPES.equals(name)) + { + return VirtualHostFactory.TYPES.get(); + } else if(SUPPORTED_AUTHENTICATION_PROVIDERS.equals(name)) { return _authenticationProviderFactory.getSupportedAuthenticationProviders(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index afcce482b6..0f90df00e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -195,7 +195,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs _queue.delete(); if (_queue.isDurable()) { - _queue.getVirtualHost().getMessageStore().removeQueue(_queue); + _queue.getVirtualHost().getDurableConfigurationStore().removeQueue(_queue); } } } @@ -365,7 +365,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { try { - _queue.getVirtualHost().getMessageStore().updateQueue(_queue); + _queue.getVirtualHost().getDurableConfigurationStore().updateQueue(_queue); } catch (AMQStoreException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index de626a7639..383ff2f3f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -79,7 +79,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener, @@ -91,6 +91,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual @SuppressWarnings("serial") public static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ put(NAME, String.class); + put(TYPE, String.class); put(STORE_PATH, String.class); put(STORE_TYPE, String.class); put(CONFIG_PATH, String.class); @@ -114,7 +115,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor) { - super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); + super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES, false), taskExecutor, false); _broker = broker; _brokerStatisticsGatherer = brokerStatisticsGatherer; validateAttributes(); @@ -130,18 +131,23 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } String configurationFile = (String) getAttribute(CONFIG_PATH); - String storeType = (String) getAttribute(STORE_TYPE); + String type = (String) getAttribute(TYPE); + boolean invalidAttributes = false; if (configurationFile == null) { - if (storeType == null) + if (type == null) { invalidAttributes = true; } + else + { + validateAttributes(type); + } } else { - if (storeType != null) + if (type != null) { invalidAttributes = true; } @@ -149,7 +155,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } if (invalidAttributes) { - throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'storeType' and 'storePath' attributes"); + throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes"); } // pre-load the configuration in order to validate @@ -163,6 +169,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } + private void validateAttributes(String type) + { + final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory == null) + { + throw new IllegalArgumentException("Unknown virtual host type '"+ type +"'. Valid types are: " + VirtualHostFactory.TYPES.get()); + } + factory.validateAttributes(getActualAttributes()); + + } + private void populateExchanges() { Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges = @@ -295,7 +312,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual _virtualHost.getExchangeRegistry().registerExchange(exchange); if(durable) { - _virtualHost.getMessageStore().createExchange(exchange); + _virtualHost.getDurableConfigurationStore().createExchange(exchange); } synchronized (_exchangeAdapters) { @@ -417,7 +434,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(durable) { - _virtualHost.getMessageStore().createQueue(queue, FieldTable.convertToFieldTable(attributes)); + _virtualHost.getDurableConfigurationStore().createQueue(queue, FieldTable.convertToFieldTable(attributes)); } synchronized (_queueAdapters) { @@ -444,6 +461,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual throw new IllegalStateException(); } + + public String getType() + { + return (String)getAttribute(TYPE); + } + + public String setType(final String currentType, final String desiredType) + throws IllegalStateException, AccessControlException + { + throw new IllegalStateException(); + } + + @Override public State getActualState() { @@ -1070,7 +1100,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual try { VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName); - _virtualHost = new VirtualHostImpl(_broker.getVirtualHostRegistry(), _brokerStatisticsGatherer, _broker.getSecurityManager(), configuration); + String type = configuration.getType(); + final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory == null) + { + throw new IllegalArgumentException("Unknown virtual host type: " + type); + } + else + { + _virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(), + _brokerStatisticsGatherer, + _broker.getSecurityManager(), + configuration); + } } catch (Exception e) { @@ -1106,8 +1148,16 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { final MyConfiguration basicConfiguration = new MyConfiguration(); PropertiesConfiguration config = new PropertiesConfiguration(); - config.addProperty("store.type", (String)getAttribute(STORE_TYPE)); - config.addProperty("store.environment-path", (String)getAttribute(STORE_PATH)); + final String type = (String) getAttribute(TYPE); + config.addProperty("type", type); + VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory != null) + { + for(Map.Entry<String,Object> entry : factory.createVirtualHostConfiguration(this).entrySet()) + { + config.addProperty(entry.getKey(), entry.getValue()); + } + } basicConfiguration.addConfiguration(config); CompositeConfiguration compositeConfiguration = new CompositeConfiguration(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java b/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java new file mode 100644 index 0000000000..f952e0410c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +public interface VirtualHostFactory +{ + String getType(); + + VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception; + + void validateAttributes(Map<String, Object> attributes); + + Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter); + + static final class TYPES + { + private TYPES() + { + } + + public static Collection<String> get() + { + QpidServiceLoader<VirtualHostFactory> qpidServiceLoader = new QpidServiceLoader<VirtualHostFactory>(); + Iterable<VirtualHostFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class); + List<String> names = new ArrayList<String>(); + for(VirtualHostFactory factory : factories) + { + names.add(factory.getType()); + } + return Collections.unmodifiableCollection(names); + } + } + + + static final class FACTORIES + { + private FACTORIES() + { + } + + public static VirtualHostFactory get(String type) + { + QpidServiceLoader<VirtualHostFactory> qpidServiceLoader = new QpidServiceLoader<VirtualHostFactory>(); + Iterable<VirtualHostFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class); + for(VirtualHostFactory factory : factories) + { + if(factory.getType().equals(type)) + { + return factory; + } + } + return null; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 37f1f8f7a5..cc0e5ebe7a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -315,7 +315,7 @@ public class AMQQueueFactory exchangeRegistry.registerExchange(dlExchange); //enter the dle in the persistent store - virtualHost.getMessageStore().createExchange(dlExchange); + virtualHost.getDurableConfigurationStore().createExchange(dlExchange); } } @@ -335,7 +335,7 @@ public class AMQQueueFactory dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args); //enter the dlq in the persistent store - virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); + virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 6b4b8d4a3e..745a06c7fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -55,7 +55,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; -abstract public class AbstractJDBCMessageStore implements MessageStore +abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 4e7bbf04a6..27a40963f6 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -33,7 +33,7 @@ public interface DurableConfigurationStore public static interface Source { - DurableConfigurationStore getMessageStore(); + DurableConfigurationStore getDurableConfigurationStore(); } /** @@ -107,11 +107,11 @@ public interface DurableConfigurationStore * Removes the specified queue from the persistent store. * * @param queue The queue to remove. - * + * * @throws AMQStoreException If the operation fails for any reason. */ void removeQueue(AMQQueue queue) throws AMQStoreException; - + /** * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index cf8444b089..bbdfaf4959 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -26,7 +26,7 @@ import org.apache.commons.configuration.Configuration; * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. * */ -public interface MessageStore extends DurableConfigurationStore +public interface MessageStore { /** * Called after instantiation in order to configure the message store. A particular implementation can define diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java index d67ccfd8a4..fe7dd81e0c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java @@ -50,6 +50,12 @@ public class MessageStoreCreator } } + public boolean isValidType(String storeType) + { + return _factories.containsKey(storeType.toLowerCase()); + } + + public MessageStore createMessageStore(String storeType) { MessageStoreFactory factory = _factories.get(storeType.toLowerCase()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index fdb80295cf..f0936a221c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -public abstract class NullMessageStore implements MessageStore +public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { @Override public void configureConfigStore(String name, @@ -125,4 +125,4 @@ public abstract class NullMessageStore implements MessageStore public void onDelete() { } -}
\ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 57024817f5..5b53f9ee6c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -35,6 +35,7 @@ import java.util.List; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; @@ -46,7 +47,7 @@ import org.apache.qpid.util.FileUtils; * mechanism. * */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore +public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 548b75f949..63419fce3f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -762,7 +762,7 @@ public class ServerSessionDelegate extends SessionDelegate { if (exchange.isDurable()) { - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); store.createExchange(exchange); } exchangeRegistry.registerExchange(exchange); @@ -917,7 +917,7 @@ public class ServerSessionDelegate extends SessionDelegate if (exchange.isDurable() && !exchange.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); store.removeExchange(exchange); } } @@ -1241,7 +1241,7 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); AMQQueue queue; @@ -1468,7 +1468,7 @@ public class ServerSessionDelegate extends SessionDelegate queue.delete(); if (queue.isDurable() && !queue.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getMessageStore(); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); store.removeQueue(queue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java index 16e717a9c7..37e0177b00 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java @@ -127,7 +127,13 @@ public class MapValueConverter } else if (rawValue instanceof String) { - return (T) Enum.valueOf(enumType, (String) rawValue); + final String stringValue = (String) rawValue; + + return "null".equals(stringValue) ? null : (T) Enum.valueOf(enumType, stringValue); + } + else if(rawValue == null) + { + return null; } else { @@ -281,14 +287,23 @@ public class MapValueConverter public static Map<String, Object> convert(Map<String, Object> configurationAttributes, Map<String, Type> attributeTypes) { + return convert(configurationAttributes, attributeTypes, true); + } + + public static Map<String, Object> convert(Map<String, Object> configurationAttributes, + Map<String, Type> attributeTypes, + boolean exclusive) + { Map<String, Object> attributes = new HashMap<String, Object>(); - for (Map.Entry<String, Type> attributeEntry : attributeTypes.entrySet()) + for (Map.Entry<String, Object> attribute : configurationAttributes.entrySet()) { - String attributeName = attributeEntry.getKey(); - if (configurationAttributes.containsKey(attributeName)) + String attributeName = attribute.getKey(); + Object rawValue = attribute.getValue(); + + if (attributeTypes.containsKey(attributeName)) { - Type typeObject = attributeEntry.getValue(); - Object rawValue = configurationAttributes.get(attributeName); + Type typeObject = attributeTypes.get(attributeName); + Object value = null; if (typeObject instanceof Class) { @@ -311,16 +326,21 @@ public class MapValueConverter } else { - throw new IllegalArgumentException("Convertion into " + parameterizedType + " is not yet supported"); + throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported"); } } else { - throw new IllegalArgumentException("Convertion into " + typeObject + " is not yet supported"); + throw new IllegalArgumentException("Conversion into " + typeObject + " is not yet supported"); } attributes.put(attributeName, value); } + else if(!exclusive) + { + attributes.put(attributeName, rawValue); + } } + return attributes; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index c63c32188d..6116d46e41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -57,17 +57,17 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.HAMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.txn.DtxRegistry; -public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener +public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener { - private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); + private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; @@ -97,8 +97,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final DtxRegistry _dtxRegistry; - private final MessageStore _messageStore; - private volatile State _state = State.INITIALISING; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -106,7 +104,10 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); private boolean _blocked; - public VirtualHostImpl(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, VirtualHostConfiguration hostConfig) throws Exception + public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception { if (hostConfig == null) { @@ -141,18 +142,16 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _exchangeRegistry = new DefaultExchangeRegistry(this); - _messageStore = initialiseMessageStore(hostConfig); - - configureMessageStore(hostConfig); - - activateNonHAMessageStore(); - initialiseStatistics(); - _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + initialiseStorage(hostConfig); + + getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); } + abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception; + public IConnectionRegistry getConnectionRegistry() { return _connectionRegistry; @@ -187,25 +186,25 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } } - private void shutdownHouseKeeping() + protected void shutdownHouseKeeping() { _houseKeepingTasks.shutdown(); - try - { - if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) - { - _houseKeepingTasks.shutdownNow(); - } - } - catch (InterruptedException e) - { - _logger.warn("Interrupted during Housekeeping shutdown:", e); - Thread.currentThread().interrupt(); - } + try + { + if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) + { + _houseKeepingTasks.shutdownNow(); + } + } + catch (InterruptedException e) + { + _logger.warn("Interrupted during Housekeeping shutdown:", e); + Thread.currentThread().interrupt(); + } } - private void removeHouseKeepingTasks() + protected void removeHouseKeepingTasks() { BlockingQueue<Runnable> taskQueue = _houseKeepingTasks.getQueue(); for (final Runnable runnable : taskQueue) @@ -257,62 +256,13 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _houseKeepingTasks.getActiveCount(); } - private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception - { - String storeType = hostConfig.getConfig().getString("store.type"); - MessageStore messageStore = null; - if (storeType == null) - { - final Class<?> clazz = Class.forName(hostConfig.getMessageStoreClass()); - final Object o = clazz.newInstance(); - - if (!(o instanceof MessageStore)) - { - throw new ClassCastException(clazz + " does not implement " + MessageStore.class); - } - messageStore = (MessageStore) o; - } - else - { - messageStore = new MessageStoreCreator().createMessageStore(storeType); - } - - final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); - OperationalLoggingListener.listen(messageStore, storeLogSubject); - - messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); - messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); - messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); - messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - if (messageStore instanceof HAMessageStore) - { - messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); - } - - return messageStore; - } - - private void activateNonHAMessageStore() throws Exception - { - if (!(_messageStore instanceof HAMessageStore)) - { - _messageStore.activate(); - } - } - - private void configureMessageStore(VirtualHostConfiguration hostConfig) throws Exception - { - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - - _messageStore.configureConfigStore(getName(), recoveryHandler, hostConfig.getStoreConfiguration()); - _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, hostConfig.getStoreConfiguration()); - } - - private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException + protected void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException { _logger.debug("Loading configuration for virtualhost: " + config.getName()); + _exchangeRegistry.initialise(); + List<String> exchangeNames = config.getExchanges(); for (String exchangeName : exchangeNames) @@ -347,7 +297,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr if (newExchange.isDurable()) { - _messageStore.createExchange(newExchange); + getDurableConfigurationStore().createExchange(newExchange); } } } @@ -359,7 +309,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr if (queue.isDurable()) { - getMessageStore().createQueue(queue); + getDurableConfigurationStore().createQueue(queue); } //get the exchange name (returns default exchange name if none was specified) @@ -390,7 +340,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } else { - configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey)); } } @@ -437,11 +386,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _exchangeFactory; } - public MessageStore getMessageStore() - { - return _messageStore; - } - public SecurityManager getSecurityManager() { return _securityManager; @@ -453,29 +397,42 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _connectionRegistry.close(); _queueRegistry.stopAllAndUnregisterMBeans(); _dtxRegistry.close(); + closeStorage(); + shutdownHouseKeeping(); + + // clear exchange objects + _exchangeRegistry.clearAndUnregisterMbeans(); + + _state = State.STOPPED; + + CurrentActor.get().message(VirtualHostMessages.CLOSED()); + } + protected void closeStorage() + { //Close MessageStore - if (_messageStore != null) + if (getMessageStore() != null) { //Remove MessageStore Interface should not throw Exception try { - _messageStore.close(); + getMessageStore().close(); } catch (Exception e) { _logger.error("Failed to close message store", e); } } + } - // clear exchange objects - _exchangeRegistry.clearAndUnregisterMbeans(); - _state = State.STOPPED; - - CurrentActor.get().message(VirtualHostMessages.CLOSED()); + protected Logger getLogger() + { + return _logger; } + + public VirtualHostRegistry getVirtualHostRegistry() { return _virtualHostRegistry; @@ -618,94 +575,28 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } } - private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) - { - try - { - _exchangeRegistry.initialise(); - initialiseModel(_vhostConfig); - } - catch (Exception e) - { - throw new RuntimeException("Failed to initialise virtual host after state change", e); - } - } - } - - private final class AfterActivationListener implements EventListener - { - @Override - public void event(Event event) - { - State finalState = State.ERRORED; - - try - { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); - finalState = State.ACTIVE; - } - finally - { - _state = finalState; - reportIfError(_state); - } - } - } - - private final class BeforePassivationListener implements EventListener - { - public void event(Event event) - { - State finalState = State.ERRORED; - - try - { - /* the approach here is not ideal as there is a race condition where a - * queue etc could be created while the virtual host is on the way to - * the passivated state. However the store state change from MASTER to UNKNOWN - * is documented as exceptionally rare.. - */ - - _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - removeHouseKeepingTasks(); - - _queueRegistry.stopAllAndUnregisterMBeans(); - _exchangeRegistry.clearAndUnregisterMbeans(); - _dtxRegistry.close(); - - finalState = State.PASSIVE; - } - finally - { - _state = finalState; - reportIfError(_state); - } - } - + protected void setState(State state) + { + _state = state; } - private final class AfterInitialisationListener implements EventListener + protected void attainActivation() { - public void event(Event event) + State finalState = State.ERRORED; + + try { - _state = State.PASSIVE; + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + finalState = State.ACTIVE; } - - } - - private final class BeforeCloseListener implements EventListener - { - @Override - public void event(Event event) + finally { - shutdownHouseKeeping(); + _state = finalState; + reportIfError(_state); } } - private void reportIfError(State state) + protected void reportIfError(State state) { if (state == State.ERRORED) { @@ -717,7 +608,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { public VirtualHostHouseKeepingTask() { - super(VirtualHostImpl.this); + super(AbstractVirtualHost.this); } public void execute() diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java new file mode 100644 index 0000000000..05a33e7d99 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -0,0 +1,145 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; +import org.apache.qpid.server.store.OperationalLoggingListener; + +public class StandardVirtualHost extends AbstractVirtualHost +{ + private MessageStore _messageStore; + + private DurableConfigurationStore _durableConfigurationStore; + + StandardVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + } + + + + private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception + { + String storeType = hostConfig.getConfig().getString("store.type"); + MessageStore messageStore = null; + if (storeType == null) + { + final Class<?> clazz = Class.forName(hostConfig.getMessageStoreClass()); + final Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException(clazz + " does not implement " + MessageStore.class); + } + + messageStore = (MessageStore) o; + } + else + { + messageStore = new MessageStoreCreator().createMessageStore(storeType); + } + + final + MessageStoreLogSubject + storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); + OperationalLoggingListener.listen(messageStore, storeLogSubject); + + return messageStore; + } + + private DurableConfigurationStore initialiseConfigurationStore(VirtualHostConfiguration hostConfig) throws Exception + { + DurableConfigurationStore configurationStore; + if(getMessageStore() instanceof DurableConfigurationStore) + { + configurationStore = (DurableConfigurationStore) getMessageStore(); + } + else + { + throw new ClassCastException(getMessageStore().getClass().getSimpleName() + + " is not an instance of DurableConfigurationStore"); + } + return configurationStore; + } + + + protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception + { + _messageStore = initialiseMessageStore(hostConfig); + + _durableConfigurationStore = initialiseConfigurationStore(hostConfig); + + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + + final Configuration storeConfiguration = hostConfig.getStoreConfiguration(); + + _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, storeConfiguration); + + _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, storeConfiguration); + + initialiseModel(hostConfig); + + _messageStore.activate(); + + attainActivation(); + } + + + protected void closeStorage() + { + //Close MessageStore + if (_messageStore != null) + { + //Remove MessageStore Interface should not throw Exception + try + { + getMessageStore().close(); + } + catch (Exception e) + { + getLogger().error("Failed to close message store", e); + } + } + } + + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return _durableConfigurationStore; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java new file mode 100644 index 0000000000..b47a5dd149 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java @@ -0,0 +1,98 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; + +public class StandardVirtualHostFactory implements VirtualHostFactory +{ + + public static final String TYPE = "STANDARD"; + + @Override + public String getType() + { + return TYPE; + } + + @Override + public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig) throws Exception + { + return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + } + + + private static final String STORE_TYPE_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_TYPE; + private static final String STORE_PATH_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_PATH; + + @Override + public void validateAttributes(Map<String, Object> attributes) + { + + // need store type and path + Object storeType = attributes.get(STORE_TYPE_ATTRIBUTE); + if(!(storeType instanceof String)) + { + + throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE + +"' is required and must be of type String."); + } + final MessageStoreCreator storeCreator = new MessageStoreCreator(); + if(!storeCreator.isValidType((String)storeType)) + { + throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE + +"' has value '"+storeType+"' which is not one of the valid values: " + + storeCreator.getStoreTypes() + "."); + + } + + // TODO - each store type should validate its own attributes + if(!((String) storeType).equalsIgnoreCase(MemoryMessageStore.TYPE)) + { + Object storePath = attributes.get(STORE_PATH_ATTRIBUTE); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ STORE_PATH_ATTRIBUTE + +"' is required and must be of type String."); + + } + } + + } + + @Override + public Map<String,Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter) + { + Map<String,Object> convertedMap = new LinkedHashMap<String, Object>(); + convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)); + convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); + return convertedMap; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index eb1481b719..8919f4d348 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -49,6 +49,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable ExchangeFactory getExchangeFactory(); + DurableConfigurationStore getDurableConfigurationStore(); + MessageStore getMessageStore(); SecurityManager getSecurityManager(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java new file mode 100644 index 0000000000..626615a59f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java @@ -0,0 +1,65 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.plugin.VirtualHostFactory; + +public class VirtualHostFactoryRegistry +{ + private static Map<String, VirtualHostFactory> getFactoryMap() + { + Map<String, VirtualHostFactory> virtualHostFactories = new HashMap<String, VirtualHostFactory>(); + QpidServiceLoader<VirtualHostFactory> qpidServiceLoader = new QpidServiceLoader<VirtualHostFactory>(); + Iterable<VirtualHostFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class); + for (VirtualHostFactory virtualHostFactory : factories) + { + String type = virtualHostFactory.getType(); + VirtualHostFactory factory = virtualHostFactories.put(type, virtualHostFactory); + if (factory != null) + { + throw new IllegalStateException("VirtualHostFactory with type name '" + type + + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '" + + virtualHostFactory.getClass().getName() + "'"); + } + } + return virtualHostFactories; + } + + + public static Collection<VirtualHostFactory> getFactories() + { + return Collections.unmodifiableCollection(getFactoryMap().values()); + } + + public static Collection<String> getVirtualHostTypes() + { + return Collections.unmodifiableCollection(getFactoryMap().keySet()); + } + + public static VirtualHostFactory getFactory(String type) + { + return getFactoryMap().get(type); + } +} diff --git a/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory new file mode 100644 index 0000000000..81217884e4 --- /dev/null +++ b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.virtualhost.StandardVirtualHostFactory diff --git a/java/broker/src/main/resources/initial-config.json b/java/broker/src/main/resources/initial-config.json index f01ffca140..9bf7d71e8a 100644 --- a/java/broker/src/main/resources/initial-config.json +++ b/java/broker/src/main/resources/initial-config.json @@ -21,7 +21,7 @@ { "name": "Broker", "storeVersion": 1, - "modelVersion": "1.0", + "modelVersion": "1.1", "defaultVirtualHost" : "default", "authenticationproviders" : [ { "name" : "passwordFile", @@ -49,6 +49,7 @@ }], "virtualhosts" : [ { "name" : "default", + "type" : "STANDARD", "storeType" : "DERBY", "storePath" : "${qpid.work_dir}/derbystore/default" } ], @@ -59,4 +60,4 @@ "pluginType" : "MANAGEMENT-JMX", "name" : "jmxManagement" } ] -}
\ No newline at end of file +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index 713cd25adb..c6473d9520 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; public class VirtualHostRecovererTest extends TestCase @@ -75,6 +76,8 @@ public class VirtualHostRecovererTest extends TestCase VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); + attributes.put(VirtualHost.STORE_PATH, "/path/to/virtualhost/store"); attributes.put(VirtualHost.STORE_TYPE, "DERBY"); when(entry.getAttributes()).thenReturn(attributes); diff --git a/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 478013f61f..05d5d75864 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.model; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -39,14 +38,10 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.NullMessageStore; -import org.apache.qpid.server.store.StateManager; import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.server.virtualhost.StandardVirtualHost; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; public class VirtualHostTest extends TestCase { @@ -96,6 +91,7 @@ public class VirtualHostTest extends TestCase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE); attributes.put(VirtualHost.STATE, State.QUIESCED); @@ -130,36 +126,11 @@ public class VirtualHostTest extends TestCase assertEquals("Unexpected state", State.DELETED, host.getAttribute(VirtualHost.STATE)); } - public void testReplicaState() - { - String hostName = getName(); - File configPath = TestFileUtils.createTempFile(this, ".xml", "<virtualhosts><virtualhost><" + hostName - + "><store><class>" + ReplicaMessageStore.class.getName() + "</class></store></" + hostName - + "></virtualhost></virtualhosts>"); - try - { - Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(VirtualHost.NAME, hostName); - attributes.put(VirtualHost.CONFIG_PATH, configPath.getAbsolutePath()); - - VirtualHost host = createHost(attributes); - - assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE)); - - host.setDesiredState(State.INITIALISING, State.ACTIVE); - - assertEquals("Unexpected state", State.REPLICA, host.getAttribute(VirtualHost.STATE)); - } - finally - { - configPath.delete(); - } - } - private VirtualHost createHost() { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE); VirtualHost host = createHost(attributes); @@ -174,34 +145,4 @@ public class VirtualHostTest extends TestCase return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); } - public static final class ReplicaMessageStore extends NullMessageStore - { - private final EventManager _eventManager = new EventManager(); - private final StateManager _stateManager = new StateManager(_eventManager); - - @Override - public void activate() throws Exception - { - _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISING); - _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - _stateManager.attainState(org.apache.qpid.server.store.State.ACTIVATING); - _stateManager.attainState(org.apache.qpid.server.store.State.ACTIVE); - - // this should change the virtual host state to PASSIVE - _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - @Override - public String getStoreType() - { - return ReplicaMessageStore.class.getSimpleName(); - } - } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index f0ecfb6407..8a7d5d85fc 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -65,7 +65,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase private static final String EXCHANGE_NAME = "exchangeName"; private String _storePath; private String _storeName; - private MessageStore _store; + private MessageStore _messageStore; private Configuration _configuration; private ConfigurationRecoveryHandler _recoveryHandler; @@ -84,6 +84,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase private FieldTable _bindingArgs; private UUID _queueId; private UUID _exchangeId; + private DurableConfigurationStore _configStore; public void setUp() throws Exception { @@ -135,7 +136,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testCreateExchange() throws Exception { Exchange exchange = createTestExchange(); - _store.createExchange(exchange); + _configStore.createExchange(exchange); reopenStore(); verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); @@ -144,9 +145,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testRemoveExchange() throws Exception { Exchange exchange = createTestExchange(); - _store.createExchange(exchange); + _configStore.createExchange(exchange); - _store.removeExchange(exchange); + _configStore.removeExchange(exchange); reopenStore(); verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); @@ -157,7 +158,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _store.bindQueue(binding); + _configStore.bindQueue(binding); reopenStore(); @@ -171,9 +172,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _store.bindQueue(binding); + _configStore.bindQueue(binding); - _store.unbindQueue(binding); + _configStore.unbindQueue(binding); reopenStore(); verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), @@ -183,7 +184,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testCreateQueueAMQQueue() throws Exception { AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - _store.createQueue(queue); + _configStore.createQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null); @@ -197,7 +198,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null); @@ -208,7 +209,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase Exchange alternateExchange = createTestAlternateExchange(); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - _store.createQueue(queue); + _configStore.createQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId()); @@ -230,11 +231,11 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); // update the queue to have exclusive=false queue = createTestQueue(getName(), getName() + "Owner", false); - _store.updateQueue(queue); + _configStore.updateQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null); @@ -248,12 +249,12 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - _store.updateQueue(queue); + _configStore.updateQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId()); @@ -267,10 +268,10 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _store.createQueue(queue, arguments); + _configStore.createQueue(queue, arguments); // remove queue - _store.removeQueue(queue); + _configStore.removeQueue(queue); reopenStore(); verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), any(FieldTable.class), any(UUID.class)); @@ -306,18 +307,19 @@ public class DurableConfigurationStoreTest extends QpidTestCase private void reopenStore() throws Exception { - if (_store != null) + if (_messageStore != null) { - _store.close(); + _messageStore.close(); } - _store = createStore(); + _messageStore = createMessageStore(); + _configStore = createConfigStore(); - _store.configureConfigStore(_storeName, _recoveryHandler, _configuration); - _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); - _store.activate(); + _configStore.configureConfigStore(_storeName, _recoveryHandler, _configuration); + _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); + _messageStore.activate(); } - protected MessageStore createStore() throws Exception + protected MessageStore createMessageStore() throws Exception { String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); if (storeClass == null) @@ -329,6 +331,26 @@ public class DurableConfigurationStoreTest extends QpidTestCase return messageStore; } + protected DurableConfigurationStore createConfigStore() throws Exception + { + String storeClass = System.getProperty(CONFIGURATION_STORE_CLASS_NAME_KEY); + if (storeClass == null) + { + storeClass = DerbyMessageStore.class.getName(); + } + Class<DurableConfigurationStore> clazz = (Class<DurableConfigurationStore>) Class.forName(storeClass); + DurableConfigurationStore configurationStore ; + if(clazz.isInstance(_messageStore)) + { + configurationStore = (DurableConfigurationStore) _messageStore; + } + else + { + configurationStore = (DurableConfigurationStore) Class.forName(storeClass).newInstance(); + } + return configurationStore; + } + public void testRecordXid() throws Exception { Record enqueueRecord = getTestRecord(1); @@ -338,13 +360,13 @@ public class DurableConfigurationStoreTest extends QpidTestCase byte[] globalId = new byte[] { 1 }; byte[] branchId = new byte[] { 2 }; - Transaction transaction = _store.newTransaction(); + Transaction transaction = _messageStore.newTransaction(); transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); transaction.commitTran(); reopenStore(); verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); - transaction = _store.newTransaction(); + transaction = _messageStore.newTransaction(); transaction.removeXid(1l, globalId, branchId); transaction.commitTran(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index f1976ecee3..8743c4111b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -71,7 +71,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple applyStoreSpecificConfiguration(config); _store = createStore(); - _store.configureConfigStore("test", null, config); + ((DurableConfigurationStore)_store).configureConfigStore("test", null, config); _transactionResource = UUID.randomUUID(); _events = new ArrayList<Event>(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index fbf1828e77..fb255e89f9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -65,7 +65,7 @@ import java.util.Map; /** * This tests the MessageStores by using the available interfaces. * - * For persistent stores, it validates that Exchanges, Queues, Bindings and + * For persistent stores, it validates that Exchanges, Queues, Bindings and * Messages are persisted and recovered correctly. */ public class MessageStoreTest extends QpidTestCase @@ -106,7 +106,7 @@ public class MessageStoreTest extends QpidTestCase BrokerTestHelper.setUp(); String storePath = System.getProperty("QPID_WORK") + File.separator + getName(); - + _config = new PropertiesConfiguration(); _config.addProperty("store.class", getTestProfileMessageStoreClassName()); _config.addProperty("store.environment-path", storePath); @@ -224,8 +224,8 @@ public class MessageStoreTest extends QpidTestCase /** * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above - * before reloading the virtual host and ensuring that the persistent messages were restored. - * + * before reloading the virtual host and ensuring that the persistent messages were restored. + * * More specific testing of message persistence is left to store-specific unit testing. */ public void testMessagePersistence() throws Exception @@ -238,7 +238,7 @@ public class MessageStoreTest extends QpidTestCase validateMessageOnQueues(2, false); validateMessageOnTopics(1, false); } - + /** * Tests message removal by running the testMessagePersistence() method above before * clearing the queues, reloading the virtual host, and ensuring that the persistent @@ -250,15 +250,15 @@ public class MessageStoreTest extends QpidTestCase QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after recovery", + assertEquals("Incorrect number of queues registered after recovery", 6, queueRegistry.getQueues().size()); //clear the queue queueRegistry.getQueue(durableQueueName).clearQueue(); - + //check the messages are gone validateMessageOnQueue(durableQueueName, 0); - + //reload and verify messages arent restored reloadVirtualHost(); @@ -284,17 +284,17 @@ public class MessageStoreTest extends QpidTestCase QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after recovery", + assertEquals("Incorrect number of queues registered after recovery", 6, queueRegistry.getQueues().size()); //Validate the non-Durable Queues were not recovered. - assertNull("Non-Durable queue still registered:" + priorityQueueName, + assertNull("Non-Durable queue still registered:" + priorityQueueName, queueRegistry.getQueue(priorityQueueName)); - assertNull("Non-Durable queue still registered:" + queueName, + assertNull("Non-Durable queue still registered:" + queueName, queueRegistry.getQueue(queueName)); - assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, + assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, queueRegistry.getQueue(priorityTopicQueueName)); - assertNull("Non-Durable queue still registered:" + topicQueueName, + assertNull("Non-Durable queue still registered:" + topicQueueName, queueRegistry.getQueue(topicQueueName)); //Validate normally expected properties of Queues/Topics @@ -320,26 +320,26 @@ public class MessageStoreTest extends QpidTestCase 1, queueRegistry.getQueues().size()); reloadVirtualHost(); - + queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered after first recovery", 1, queueRegistry.getQueues().size()); - + //test that removing the queue means it is not recovered next time - getVirtualHost().getMessageStore().removeQueue(queueRegistry.getQueue(durableQueueName)); + getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); reloadVirtualHost(); - + queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered after second recovery", 0, queueRegistry.getQueues().size()); - assertNull("Durable queue was not removed:" + durableQueueName, + assertNull("Durable queue was not removed:" + durableQueueName, queueRegistry.getQueue(durableQueueName)); } /** * Tests exchange persistence by creating a selection of exchanges, both durable - * and non durable, and ensuring that following the recovery process the correct + * and non durable, and ensuring that following the recovery process the correct * durable exchanges are still present. */ public void testExchangePersistence() throws Exception @@ -348,7 +348,7 @@ public class MessageStoreTest extends QpidTestCase Map<AMQShortString, Exchange> oldExchanges = createExchanges(); - assertEquals("Incorrect number of exchanges registered before recovery", + assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size()); reloadVirtualHost(); @@ -367,33 +367,33 @@ public class MessageStoreTest extends QpidTestCase int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size(); createExchange(DirectExchange.TYPE, directExchangeName, true); - + ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); - assertEquals("Incorrect number of exchanges registered before recovery", + assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); reloadVirtualHost(); - + exchangeRegistry = getVirtualHost().getExchangeRegistry(); - assertEquals("Incorrect number of exchanges registered after first recovery", + assertEquals("Incorrect number of exchanges registered after first recovery", origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); - + //test that removing the exchange means it is not recovered next time - getVirtualHost().getMessageStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); reloadVirtualHost(); - + exchangeRegistry = getVirtualHost().getExchangeRegistry(); - assertEquals("Incorrect number of exchanges registered after second recovery", + assertEquals("Incorrect number of exchanges registered after second recovery", origExchangeCount, exchangeRegistry.getExchangeNames().size()); - assertNull("Durable exchange was not removed:" + directExchangeName, + assertNull("Durable exchange was not removed:" + directExchangeName, exchangeRegistry.getExchange(directExchangeName)); } - + /** * Tests binding persistence by creating a selection of queues and exchanges, both durable * and non durable, then adding bindings with and without selectors before reloading the - * virtual host and verifying that following the recovery process the correct durable + * virtual host and verifying that following the recovery process the correct durable * bindings (those for durable queues to durable exchanges) are still present. */ public void testBindingPersistence() throws Exception @@ -413,7 +413,7 @@ public class MessageStoreTest extends QpidTestCase bindAllQueuesToExchange(directExchange, directRouting); bindAllTopicQueuesToExchange(topicExchange, topicRouting); - assertEquals("Incorrect number of exchanges registered before recovery", + assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size()); reloadVirtualHost(); @@ -422,10 +422,10 @@ public class MessageStoreTest extends QpidTestCase validateBindingProperties(); } - + /** * Tests binding removal by creating a durable exchange, and queue, binding them together, - * recovering to verify the persistence, then removing it from the store, and ensuring + * recovering to verify the persistence, then removing it from the store, and ensuring * that following the second reload process it is not recovered. */ public void testDurableBindingRemoval() throws Exception @@ -437,14 +437,14 @@ public class MessageStoreTest extends QpidTestCase createQueue(durableQueueName, false, true, false, false); bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); - assertEquals("Incorrect number of bindings registered before recovery", + assertEquals("Incorrect number of bindings registered before recovery", 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); //verify binding is actually normally recovered reloadVirtualHost(); queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of bindings registered after first recovery", + assertEquals("Incorrect number of bindings registered after first recovery", 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); @@ -457,13 +457,13 @@ public class MessageStoreTest extends QpidTestCase reloadVirtualHost(); queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of bindings registered after second recovery", + assertEquals("Incorrect number of bindings registered after second recovery", 0, queueRegistry.getQueue(durableQueueName).getBindings().size()); } /** * Validates that the durable exchanges are still present, the non durable exchange is not, - * and that the new exchanges are not the same objects as the provided list (i.e. that the + * and that the new exchanges are not the same objects as the provided list (i.e. that the * reload actually generated new exchange objects) */ private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges) @@ -484,7 +484,7 @@ public class MessageStoreTest extends QpidTestCase registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); // There should only be the original exchanges + our 2 recovered durable exchanges - assertEquals("Incorrect number of exchanges available", + assertEquals("Incorrect number of exchanges available", originalNumExchanges + 2, registry.getExchangeNames().size()); } @@ -562,7 +562,7 @@ public class MessageStoreTest extends QpidTestCase { assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); } - + if (usePriority) { assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); @@ -705,7 +705,7 @@ public class MessageStoreTest extends QpidTestCase { FieldTable queueArguments = null; - + if(usePriority || lastValueQueue) { assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); @@ -716,7 +716,7 @@ public class MessageStoreTest extends QpidTestCase queueArguments = new FieldTable(); queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); } - + if (lastValueQueue) { queueArguments = new FieldTable(); @@ -735,7 +735,7 @@ public class MessageStoreTest extends QpidTestCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getMessageStore().createQueue(queue, queueArguments); + getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); } } catch (AMQException e) @@ -779,7 +779,7 @@ public class MessageStoreTest extends QpidTestCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getMessageStore().createExchange(exchange); + getVirtualHost().getDurableConfigurationStore().createExchange(exchange); } } catch (AMQException e) @@ -836,7 +836,7 @@ public class MessageStoreTest extends QpidTestCase fail(e.getMessage()); } } - + protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) { FieldTable bindArguments = null; @@ -931,4 +931,4 @@ public class MessageStoreTest extends QpidTestCase return _routingKey; } } -}
\ No newline at end of file +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 065d6408de..88d5852a17 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -73,7 +73,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase setUpStoreConfiguration(_storeConfiguration); _store = createMessageStore(); - _store.configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration); + ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration); _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler, _storeConfiguration); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 4d10058d17..8de19d9cff 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -55,8 +55,10 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.virtualhost.VirtualHostFactoryRegistry; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class BrokerTestHelper @@ -96,14 +98,24 @@ public class BrokerTestHelper throws Exception { StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class); - VirtualHost host = new VirtualHostImpl(virtualHostRegistry, statisticsGatherer, new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); + final VirtualHostFactory factory = + virtualHostConfiguration == null ? new StandardVirtualHostFactory() + : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType()); + VirtualHost host = factory.createVirtualHost(virtualHostRegistry, + statisticsGatherer, + new SecurityManager(mock(Broker.class), false), + virtualHostConfiguration); virtualHostRegistry.registerVirtualHost(host); return host; } public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration) throws Exception { - return new VirtualHostImpl(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); + final VirtualHostFactory factory = + virtualHostConfiguration == null ? new StandardVirtualHostFactory() + : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType()); + + return factory.createVirtualHost(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); } public static VirtualHost createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 324e36e132..7552a653fe 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; @@ -111,6 +112,11 @@ public class MockVirtualHost implements VirtualHost return null; } + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + public String getName() { return _name; @@ -214,4 +220,4 @@ public class MockVirtualHost implements VirtualHost public void unblock() { } -}
\ No newline at end of file +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index 3ae269ff20..1243d9f7dd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -46,7 +46,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -public class VirtualHostImplTest extends QpidTestCase +public class StandardVirtualHostTest extends QpidTestCase { private VirtualHostRegistry _virtualHostRegistry; @@ -109,14 +109,9 @@ public class VirtualHostImplTest extends QpidTestCase createVirtualHost(queueName, config); fail("virtualhost creation should have failed due to illegal configuration"); } - catch (RuntimeException e) + catch (ConfigurationException e) { - assertNotNull(e.getCause()); - - assertEquals(ConfigurationException.class, e.getCause().getClass()); - - Throwable configException = e.getCause(); - assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, configException.getMessage()); + assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, e.getMessage()); } } @@ -172,14 +167,9 @@ public class VirtualHostImplTest extends QpidTestCase createVirtualHost(queueName, config); fail("virtualhost creation should have failed due to illegal configuration"); } - catch (RuntimeException e) + catch (ConfigurationException e) { - assertNotNull(e.getCause()); - - assertEquals(ConfigurationException.class, e.getCause().getClass()); - - Throwable configException = e.getCause(); - assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, configException.getMessage()); + assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, e.getMessage()); } } @@ -274,7 +264,7 @@ public class VirtualHostImplTest extends QpidTestCase _virtualHostRegistry = broker.getVirtualHostRegistry(); VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker); - VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); _virtualHostRegistry.registerVirtualHost(host); return host; @@ -282,7 +272,7 @@ public class VirtualHostImplTest extends QpidTestCase /** * Create a configuration file for testing virtualhost creation - * + * * @param vhostName name of the virtualhost * @param queueName name of the queue * @param exchangeName name of a direct exchange to declare (unless dontDeclare = true) and bind the queue to (null = none) @@ -313,6 +303,7 @@ public class VirtualHostImplTest extends QpidTestCase writer.write(" <virtualhost>"); writer.write(" <name>" + vhostName + "</name>"); writer.write(" <" + vhostName + ">"); + writer.write(" <type>" + StandardVirtualHostFactory.TYPE + "</type>"); writer.write(" <store>"); writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>"); writer.write(" </store>"); @@ -373,7 +364,7 @@ public class VirtualHostImplTest extends QpidTestCase Configuration config = new PropertiesConfiguration(); config.setProperty("store.type", MemoryMessageStore.TYPE); VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker); - VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); _virtualHostRegistry.registerVirtualHost(host); return host; } diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index 08f7387b75..8f556ece5a 100644 --- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.qpid.test.utils; @@ -110,10 +110,12 @@ public class QpidTestCase extends TestCase } protected static final String MESSAGE_STORE_CLASS_NAME_KEY = "messagestore.class.name"; + protected static final String CONFIGURATION_STORE_CLASS_NAME_KEY = "configurationstore.class.name"; + protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore"; private static List<String> _exclusionList; - + public QpidTestCase() { super(); @@ -138,7 +140,7 @@ public class QpidTestCase extends TestCase { final String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); _logger.debug("MESSAGE_STORE_CLASS_NAME_KEY " + storeClass); - + return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java index 7492d062fd..a19ba21c5c 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java @@ -26,6 +26,7 @@ import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE; import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD; import java.util.Arrays; +import javax.net.ssl.SSLSocket; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQTestConnection_0_10; diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index cff77711ca..ed76c40717 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -35,7 +35,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; -public class SlowMessageStore implements MessageStore +public class SlowMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index deaac2dd2a..1d1c474be0 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -50,7 +50,8 @@ public class Asserts { assertNotNull("Virtualhost " + virtualHostName + " data are not found", virtualHost); assertAttributesPresent(virtualHost, VirtualHost.AVAILABLE_ATTRIBUTES, VirtualHost.TIME_TO_LIVE, - VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, VirtualHost.CONFIG_PATH); + VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, + VirtualHost.CONFIG_PATH, VirtualHost.TYPE); assertEquals("Unexpected value of attribute " + VirtualHost.NAME, virtualHostName, virtualHost.get(VirtualHost.NAME)); assertNotNull("Unexpected value of attribute " + VirtualHost.ID, virtualHost.get(VirtualHost.ID)); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 1823b59ba3..940d6a3298 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; import org.codehaus.jackson.JsonGenerationException; @@ -564,6 +565,7 @@ public class VirtualHostRestTest extends QpidRestTestCase } else { + hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); hostData.put(VirtualHost.STORE_PATH, storePath); hostData.put(VirtualHost.STORE_TYPE, storeType); } diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java index acad55417a..666449b658 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.security.acl.AbstractACLTestCase; import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory; import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory; import org.apache.qpid.server.security.group.FileGroupManagerFactory; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.systest.rest.QpidRestTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.test.utils.TestFileUtils; @@ -981,6 +982,7 @@ public class BrokerACLTest extends QpidRestTestCase hostData.put(VirtualHost.NAME, hostName); hostData.put(VirtualHost.STORE_PATH, getStoreLocation(hostName)); hostData.put(VirtualHost.STORE_TYPE, getTestProfileMessageStoreType()); + hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 9bf7dbd62a..4a81480671 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.client.timeouts; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,8 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase public void setUp() throws Exception { + setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".type", + StandardVirtualHostFactory.TYPE); setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore"); setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); @@ -64,7 +67,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase _connection = getConnection(); - //Create Queue + //Create Queue _queue = (Queue) getInitialContext().lookup("queue"); //Create Consumer |