summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-17 08:11:06 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-17 08:11:06 +0000
commitc4f7a811226cd0342a6fe3a3845d8aea7fad2a09 (patch)
tree35a392f712f7e3e51d06d338896eb10a84449276
parent6cef1bb516f9751a52d05108e990cb8940c10940 (diff)
downloadqpid-python-c4f7a811226cd0342a6fe3a3845d8aea7fad2a09.tar.gz
Initial change to separate virtual hosts into types
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-vhost-refactor@1493675 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml11
-rw-r--r--doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml5
-rw-r--r--doc/book/src/java-broker/Java-Broker-High-Availability.xml4
-rw-r--r--doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java2
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java198
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java106
-rw-r--r--java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js51
-rw-r--r--java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html37
-rw-r--r--java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory19
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java1
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java15
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java6
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java7
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java (renamed from java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java)15
-rw-r--r--java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html13
-rw-r--r--java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js4
-rw-r--r--java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js88
-rw-r--r--java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js79
-rw-r--r--java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Broker.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Model.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java88
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java2
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (renamed from java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java)241
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java145
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java98
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java65
-rw-r--r--java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory19
-rw-r--r--java/broker/src/main/resources/initial-config.json5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java67
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java72
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java92
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java (renamed from java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java)27
-rw-r--r--java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java5
68 files changed, 1415 insertions, 482 deletions
diff --git a/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml b/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml
index 6f5f1f38ba..bb6d9ecef7 100644
--- a/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml
+++ b/doc/book/src/java-broker/Java-Broker-Concepts-Virtual-Hosts.xml
@@ -45,14 +45,21 @@ with one being configured as the default for clients that can't or don't specify
messages on durable <emphasis>Queues</emphasis> it contains, as well as the configuration of any durable
<emphasis>Queues</emphasis>, <emphasis>Exchanges</emphasis>, and <emphasis>Bindings</emphasis> made during its operation.</para>
<para>
- The following message stores are currently supported:
+ The Java Broker supports different types of <emphasis>Virtual Host</emphasis>, which providing
+ for different High Availability patterns. The <emphasis>standard</emphasis> <emphasis>Virtual
+ Host</emphasis> type provides no replication and is designed for standalone environments. It
+ supports a number of different <emphasis>Message Store</emphasis> implementations:
<itemizedlist>
<listitem><para><link linkend="Java-Broker-Stores-SQL-Store">JDBC Message Store</link></para></listitem>
<listitem><para><link linkend="Java-Broker-Stores-Derby-Store">Derby Message Store</link></para></listitem>
<listitem><para><link linkend="Java-Broker-Stores-BDB-Store">Berkeley DB Message Store</link></para></listitem>
- <listitem><para><link linkend="Java-Broker-Stores-HA-BDB-Store">Berkeley DB HA Message Store</link></para></listitem>
<listitem><para><link linkend="Java-Broker-Stores-Memory-Store">Memory Message Store</link></para></listitem>
</itemizedlist>
</para>
+<para>
+ The <emphasis>BDB_HA</emphasis> <emphasis>Virtual Host</emphasis> provides clustering for
+ reliability using the HA capabilties of the Oracle Berkeley DB (Java Edition) (see
+ <link linkend="Java-Broker-Stores-HA-BDB-Store">Berkeley DB HA Message Store</link>).
+</para>
<para>Virtual Hosts configuration is covered in <xref linkend="Java-Broker-Virtual-Hosts"/>.</para>
</section>
diff --git a/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml b/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml
index 072effa798..e1c128a6c7 100644
--- a/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml
+++ b/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-Config-Files.xml
@@ -305,7 +305,7 @@ $ ./qpid-server -prop "qpid.amqp_port=10000" -prop "qpid.http_port=10001"
{
"name" : "Broker",
"defaultVirtualHost" : "default",
- "modelVersion" : "1.0",
+ "modelVersion" : "1.1",
"storeVersion" : 1,
"authenticationproviders" : [ {
"name" : "passwordFile",
@@ -332,6 +332,7 @@ $ ./qpid-server -prop "qpid.amqp_port=10000" -prop "qpid.http_port=10001"
} ],
"virtualhosts" : [ {
"name" : "default",
+ "type" : "STANDARD",
"storePath" : "${qpid.work_dir}/derbystore/default",
"storeType" : "DERBY"
} ],
@@ -348,7 +349,7 @@ $ ./qpid-server -prop "qpid.amqp_port=10000" -prop "qpid.http_port=10001"
<itemizedlist>
<listitem><para>Authentication Provider of type <emphasis>PlainPasswordFile</emphasis> with name "passwordFile"</para></listitem>
<listitem><para>Four Port entries: "AMQP", "HTTP", "RMI_REGISTRY", "JMX_CONNECTOR"</para></listitem>
- <listitem><para>Virtual Host with name "default" and DERBY message store type at location "${qpid.work_dir}/derbystore/default".</para></listitem>
+ <listitem><para>A "STANDARD" (i.e. non-HA) Virtual Host with name "default" and DERBY message store type at location "${qpid.work_dir}/derbystore/default".</para></listitem>
<listitem><para>Two management plugins: "jmxManagement" of type "MANAGEMENT-JMX" and "httpManagement" of type "MANAGEMENT-HTTP".</para></listitem>
<listitem><para>Broker attributes are stored as a root entry.</para></listitem>
</itemizedlist>
diff --git a/doc/book/src/java-broker/Java-Broker-High-Availability.xml b/doc/book/src/java-broker/Java-Broker-High-Availability.xml
index ca205a865e..4c0391afec 100644
--- a/doc/book/src/java-broker/Java-Broker-High-Availability.xml
+++ b/doc/book/src/java-broker/Java-Broker-High-Availability.xml
@@ -56,8 +56,6 @@
do not current offer this feature.</para></footnote> to the new Master and continue their work.</para>
<para>The Java Broker HA solution is incompatible with the HA solution offered by the CPP Broker. It is not possible to co-locate Java and CPP
Brokers within the same cluster.</para>
- <para>HA is not currently available for those using the the <emphasis role="bold">Derby Store</emphasis> or <emphasis role="bold">Memory
- Message Store</emphasis>.</para>
</section>
<section role="h3" id="Java-Broker-High-Availability-TwoNodeCluster">
@@ -302,8 +300,8 @@
<virtualhost>
<name>vhostname</name>
<vhostname>
+ <type>BDB_HA</type>
<store>
- <class>org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore</class>
<environment-path>${QPID_WORK}/bdbhastore/vhostname</environment-path>
<highAvailability>
<groupName>myclustername</groupName>
diff --git a/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml b/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml
index b240d85d4f..b5a2537eac 100644
--- a/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml
+++ b/doc/book/src/java-broker/Java-Broker-Virtual-Hosts.xml
@@ -35,7 +35,7 @@
<itemizedlist>
<listitem>
<para>
- <emphasis>Supplying simply a <link linkend="Java-Broker-Stores">store type</link> and a store path</emphasis>: In this case,
+ <emphasis>Supplying simply a type, store path, and type specific fields (in the case of a "STANDARD" Virtual Host this is simply a <link linkend="Java-Broker-Stores">store type</link>) and a store path</emphasis>: In this case,
the virtual host attributes are currently derived from default attribute values defined on the broker. This is the preferred approach.
</para>
</listitem>
@@ -44,7 +44,7 @@
<emphasis>Supplying the path to a <link linkend="Java-Broker-Virtual-Hosts-Configuration-File">Virtual Host XML configuration file</link></emphasis>: In this case, specific per-virtualhost attribute configuration
can be set in the file, as well as pre-configuring queues, exchanges, etc. This is no longer the preferred approach and will likely be removed in
a future release, however it is currently still neccessary to support certain use-cases such as per-virtualhost attribute configuration, and
- specialised store configuration such as for the <link linkend="Java-Broker-Stores-HA-BDB-Store">BDB HA Message Store</link>.
+ specialised Virtual Host configuration such for the <link linkend="Java-Broker-Stores-HA-BDB-Store">BDB HA Message Store</link>.
</para>
</listitem>
</itemizedlist>
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 3074daa46e..d036a5d39a 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -75,7 +75,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.util.FileUtils;
-public abstract class AbstractBDBMessageStore implements MessageStore
+public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class);
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
new file mode 100644
index 0000000000..0231573053
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -0,0 +1,198 @@
+package org.apache.qpid.server.store.berkeleydb;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.OperationalLoggingListener;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.server.virtualhost.State;
+import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+public class BDBHAVirtualHost extends AbstractVirtualHost
+{
+ private BDBHAMessageStore _messageStore;
+
+ private boolean _inVhostInitiatedClose;
+
+ BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry,
+ StatisticsGatherer brokerStatisticsGatherer,
+ org.apache.qpid.server.security.SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig)
+ throws Exception
+ {
+ super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig);
+ }
+
+
+
+ protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ _messageStore = new BDBHAMessageStore();
+
+ final MessageStoreLogSubject storeLogSubject =
+ new MessageStoreLogSubject(this, _messageStore.getClass().getSimpleName());
+ OperationalLoggingListener.listen(_messageStore, storeLogSubject);
+
+ _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
+ _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
+ _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
+
+
+
+ _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
+ _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
+
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+
+ _messageStore.configureConfigStore(getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration());
+
+ _messageStore.configureMessageStore(getName(),
+ recoveryHandler,
+ recoveryHandler,
+ hostConfig.getStoreConfiguration());
+ }
+
+
+ protected void closeStorage()
+ {
+ //Close MessageStore
+ if (_messageStore != null)
+ {
+ //Remove MessageStore Interface should not throw Exception
+ try
+ {
+ _inVhostInitiatedClose = true;
+ getMessageStore().close();
+ }
+ catch (Exception e)
+ {
+ getLogger().error("Failed to close message store", e);
+ }
+ finally
+ {
+ _inVhostInitiatedClose = false;
+ }
+ }
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ private final class AfterInitialisationListener implements EventListener
+ {
+ public void event(Event event)
+ {
+ setState(State.PASSIVE);
+ }
+
+ }
+
+ private final class BeforePassivationListener implements EventListener
+ {
+ public void event(Event event)
+ {
+ State finalState = State.ERRORED;
+
+ try
+ {
+ /* the approach here is not ideal as there is a race condition where a
+ * queue etc could be created while the virtual host is on the way to
+ * the passivated state. However the store state change from MASTER to UNKNOWN
+ * is documented as exceptionally rare..
+ */
+
+ getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+ removeHouseKeepingTasks();
+
+ getQueueRegistry().stopAllAndUnregisterMBeans();
+ getExchangeRegistry().clearAndUnregisterMbeans();
+ getDtxRegistry().close();
+
+ finalState = State.PASSIVE;
+ }
+ finally
+ {
+ setState(finalState);
+ reportIfError(getState());
+ }
+ }
+
+ }
+
+
+ private final class BeforeActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ try
+ {
+ initialiseModel(getConfiguration());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to initialise virtual host after state change", e);
+ }
+ }
+ }
+
+ private final class AfterActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ attainActivation();
+ }
+ }
+
+ private final class BeforeCloseListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ if(!_inVhostInitiatedClose)
+ {
+ shutdownHouseKeeping();
+ }
+
+ }
+ }
+
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
new file mode 100644
index 0000000000..b01aeafb9a
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -0,0 +1,106 @@
+package org.apache.qpid.server.store.berkeleydb;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+public class BDBHAVirtualHostFactory implements VirtualHostFactory
+{
+
+ public static final String TYPE = "BDB_HA";
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
+ StatisticsGatherer brokerStatisticsGatherer,
+ org.apache.qpid.server.security.SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig) throws Exception
+ {
+ return new BDBHAVirtualHost(virtualHostRegistry,
+ brokerStatisticsGatherer,
+ parentSecurityManager,
+ hostConfig);
+ }
+
+ @Override
+ public void validateAttributes(Map<String, Object> attributes)
+ {
+ validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes);
+ validateAttribute("haGroupName", String.class, attributes);
+ validateAttribute("haNodeName", String.class, attributes);
+ validateAttribute("haNodeAddress", String.class, attributes);
+ validateAttribute("haHelperAddress", String.class, attributes);
+ }
+
+ private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes)
+ {
+ Object attr = attributes.get(attrName);
+ if(!clazz.isInstance(attr))
+ {
+ throw new IllegalArgumentException("Attribute '"+ attrName
+ +"' is required and must be of type "+clazz.getSimpleName()+".");
+ }
+ }
+
+ @Override
+ public Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter)
+ {
+ LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>();
+ convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
+ convertedMap.put("store.highAvailability.groupName", virtualHostAdapter.getAttribute("haGroupName"));
+ convertedMap.put("store.highAvailability.nodeName", virtualHostAdapter.getAttribute("haNodeName"));
+ convertedMap.put("store.highAvailability.nodeHostPort", virtualHostAdapter.getAttribute("haNodeAddress"));
+ convertedMap.put("store.highAvailability.helperHostPort", virtualHostAdapter.getAttribute("haHelperAddress"));
+
+ final Object haDurability = virtualHostAdapter.getAttribute("haDurability");
+ if(haDurability !=null)
+ {
+ convertedMap.put("store.highAvailability.durability", haDurability);
+ }
+
+ final Object designatedPrimary = virtualHostAdapter.getAttribute("haDesignatedPrimary");
+ if(designatedPrimary!=null)
+ {
+ convertedMap.put("store.highAvailability.designatedPrimary", designatedPrimary);
+ }
+
+ final Object coalescingSync = virtualHostAdapter.getAttribute("haCoalescingSync");
+ if(coalescingSync!=null)
+ {
+ convertedMap.put("store.highAvailability.coalescingSync", coalescingSync);
+ }
+
+ // TODO REP_CONFIG values
+
+ return convertedMap;
+ }
+}
diff --git a/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js b/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js
new file mode 100644
index 0000000000..44ad5fa57a
--- /dev/null
+++ b/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhost/bdb_ha/addVirtualHost.js
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+define(["dojo/_base/xhr",
+ "dojo/dom",
+ "dojo/dom-construct",
+ "dojo/_base/window",
+ "dijit/registry",
+ "dojo/parser",
+ "dojo/_base/array",
+ "dojo/domReady!"],
+ function (xhr, dom, construct, win, registry, parser, array) {
+ return {
+ show: function() {
+
+ var node = dom.byId("addVirtualHost.typeSpecificDiv");
+ var that = this;
+
+ array.forEach(registry.toArray(),
+ function(item) {
+ if(item.id.substr(0,27) == "formAddVirtualHost.specific") {
+ item.destroyRecursive();
+ }
+ });
+
+ xhr.get({url: "virtualhost/bdb_ha/add.html",
+ sync: true,
+ load: function(data) {
+ node.innerHTML = data;
+ parser.parse(node);
+ }});
+ }
+ };
+ });
diff --git a/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html b/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html
new file mode 100644
index 0000000000..1727264d41
--- /dev/null
+++ b/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html
@@ -0,0 +1,37 @@
+<table class="tableContainer-table tableContainer-table-horiz">
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td>
+ <td class="tableContainer-valueCell">
+ <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.storePath"
+ required="true" name="storePath" placeholder="/path/to/message/store"/>
+ </td>
+ </tr>
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Node Name*: </strong></td>
+ <td class="tableContainer-valueCell">
+ <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.nodeName"
+ required="true" name="haNodeName" placeholder="node name"/>
+ </td>
+ </tr>
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Replication Group*: </strong></td>
+ <td class="tableContainer-valueCell">
+ <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.groupName"
+ required="true" name="haGroupName" placeholder="group name"/>
+ </td>
+ </tr>
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Node Address*: </strong></td>
+ <td class="tableContainer-valueCell">
+ <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.nodeAddress"
+ required="true" name="haNodeAddress" data-dojo-props="regExp:'([0-9a-zA-Z.-_]|::)+:[0-9]{1,5}', invalidMessage:'Must be of the form host:port'" placeholder="host:port"/>
+ </td>
+ </tr>
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Helper Address*: </strong></td>
+ <td class="tableContainer-valueCell">
+ <input dojoType="dijit/form/ValidationTextBox" id="formAddVirtualHost.specific.helperAddress"
+ required="true" name="haHelperAddress" placeholder="host:port"/>
+ </td>
+ </tr>
+</table>
diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory
new file mode 100644
index 0000000000..0f8848cb74
--- /dev/null
+++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
index 8e32a1d113..047b102817 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
@@ -115,6 +115,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase
String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
_configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
+ _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE);
_configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName());
_configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator
+ port);
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
index 5cc436a22a..6c6145fabb 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
@@ -20,15 +20,26 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreTest;
import org.apache.qpid.server.store.MessageStore;
public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest
{
+
+ private BDBMessageStore _bdbMessageStore;
+
@Override
- protected MessageStore createStore() throws Exception
+ protected BDBMessageStore createMessageStore() throws Exception
{
- return new BDBMessageStore();
+ _bdbMessageStore = new BDBMessageStore();
+ return _bdbMessageStore;
}
+ // TODO - this only works so long as createConfigStore is called after createMessageStore
+ @Override
+ protected DurableConfigurationStore createConfigStore() throws Exception
+ {
+ return _bdbMessageStore;
+ }
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 4c2fa910f5..353c3a0ec5 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -75,6 +75,7 @@ public class HATestClusterCreator
private final int _numberOfNodes;
private int _bdbHelperPort;
private int _primaryBrokerPort;
+ private String _vhostConfigKeyPrefix;
public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
{
@@ -83,7 +84,8 @@ public class HATestClusterCreator
_groupName = "group" + _testcase.getName();
_ipAddressOfBroker = getIpAddressOfBrokerHost();
_numberOfNodes = numberOfNodes;
- _vhostStoreConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
+ _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".";
+ _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store.";
_bdbHelperPort = 0;
}
@@ -350,6 +352,8 @@ public class HATestClusterCreator
{
final String nodeName = getNodeNameForNodeAt(bdbPort);
+
+ _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE);
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName);
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java
index a7066c73d8..8692ecc88c 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/HelperServlet.java
@@ -33,8 +33,9 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.qpid.server.management.plugin.servlet.rest.action.ListAccessControlProviderAttributes;
import org.apache.qpid.server.management.plugin.servlet.rest.action.ListAuthenticationProviderAttributes;
+import org.apache.qpid.server.management.plugin.servlet.rest.action.ListBrokerAttribute;
import org.apache.qpid.server.management.plugin.servlet.rest.action.ListGroupProviderAttributes;
-import org.apache.qpid.server.management.plugin.servlet.rest.action.ListMessageStoreTypes;
+import org.apache.qpid.server.model.Broker;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -52,8 +53,10 @@ public class HelperServlet extends AbstractServlet
_actions = new HashMap<String, Action>();
Action listProviderAttributes = new ListAuthenticationProviderAttributes();
_actions.put(listProviderAttributes.getName(), listProviderAttributes);
- Action listMessageStoreTypes = new ListMessageStoreTypes();
+ Action listMessageStoreTypes = new ListBrokerAttribute(Broker.SUPPORTED_VIRTUALHOST_STORE_TYPES, "ListMessageStoreTypes");
_actions.put(listMessageStoreTypes.getName(), listMessageStoreTypes);
+ Action listVirtualHostTypes = new ListBrokerAttribute(Broker.SUPPORTED_VIRTUALHOST_TYPES, "ListVirtualHostTypes");
+ _actions.put(listVirtualHostTypes.getName(), listVirtualHostTypes);
Action groupProviderAttributes = new ListGroupProviderAttributes();
_actions.put(groupProviderAttributes.getName(), groupProviderAttributes);
Action aclProviderAttributes = new ListAccessControlProviderAttributes();
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java
index c0a5d78753..dc414e6a64 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListMessageStoreTypes.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/action/ListBrokerAttribute.java
@@ -25,19 +25,28 @@ import java.util.Map;
import org.apache.qpid.server.management.plugin.servlet.rest.Action;
import org.apache.qpid.server.model.Broker;
-public class ListMessageStoreTypes implements Action
+public class ListBrokerAttribute implements Action
{
+ private final String _attributeName;
+ private final String _name;
+
+ public ListBrokerAttribute(String attributeName, String name)
+ {
+ _attributeName = attributeName;
+ _name = name;
+ }
+
@Override
public String getName()
{
- return ListMessageStoreTypes.class.getSimpleName();
+ return _name;
}
@Override
public Object perform(Map<String, Object> request, Broker broker)
{
- return broker.getAttribute(Broker.SUPPORTED_VIRTUALHOST_STORE_TYPES);
+ return broker.getAttribute(_attributeName);
}
}
diff --git a/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html b/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html
index 43281f600d..282f4ab8f6 100644
--- a/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html
+++ b/java/broker-plugins/management-http/src/main/java/resources/addVirtualHost.html
@@ -48,17 +48,12 @@
<div id="addVirtualHost.attributesDiv">
<table class="tableContainer-table tableContainer-table-horiz">
<tr>
- <td class="tableContainer-labelCell" style="width: 300px;"><strong>Store Type*: </strong></td>
- <td class="tableContainer-valueCell" ><div id="addVirtualHost.selectStoreType"></div></td>
- </tr>
- <tr>
- <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td>
- <td class="tableContainer-valueCell">
- <input dojoType="dijit.form.ValidationTextBox" id="formAddVirtualHost.storePath"
- name="storePath" placeholder="/path/to/message/store" />
- </td>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Type*: </strong></td>
+ <td class="tableContainer-valueCell" ><div id="addVirtualHost.selectType"></div></td>
</tr>
</table>
+ <div id="addVirtualHost.typeSpecificDiv">
+ </div>
</div>
</div>
diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
index 0343d3393a..18abfa443f 100644
--- a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
+++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
@@ -60,7 +60,7 @@ define(["dojo/_base/xhr",
var node = construct.create("div", null, win.body(), "last");
- var convertToPort = function convertToPort(formValues)
+ var convertToPort = function convertToPort(formValues)
{
var newPort = {};
newPort.name = dijit.byId("formAddPort.name").value;
@@ -478,4 +478,4 @@ define(["dojo/_base/xhr",
};
return addPort;
- }); \ No newline at end of file
+ });
diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js
index 9c04c3014f..330c6ed40b 100644
--- a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js
+++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addVirtualHost.js
@@ -92,7 +92,7 @@ define(["dojo/_base/xhr",
}, "addVirtualHost.configPathDiv");
var attributesPane = new dijit.layout.AccordionPane({
- title: "Store Attributes",
+ title: "Virtual Host Attributes",
selected: true
}, "addVirtualHost.attributesDiv");
@@ -112,14 +112,14 @@ define(["dojo/_base/xhr",
if(theForm.validate()){
var formValues = theForm.getValues();
- if (formValues.configPath == "" && formValues.storeType == "")
+ if (formValues.configPath == "" && formValues["type"] == "")
{
- alert("Please specify either configuration or store type for the virtual host");
+ alert("Please specify either configuration file or type for the virtual host");
return false;
}
- if (formValues.configPath != "" && formValues.storeType != "")
+ if (formValues.configPath != "" && formValues["type"] != "")
{
- alert("Either configuration or store type with path have to be specified!");
+ alert("Either configuration file or type have to be specified!");
return false;
}
var newVirtualHost = convertToVirtualHost(formValues);
@@ -149,63 +149,51 @@ define(["dojo/_base/xhr",
}});
}
- addVirtualHost.show = function(virtualHostName) {
+ addVirtualHost.selectVhostType = function(type) {
+ if(type && String(type).trim() != "") {
+ require(["qpid/management/virtualhost/"+type.toLowerCase()+"/addVirtualHost"],
+ function(vhostType)
+ {
+ vhostType.show();
+ });
+ }
+ }
+
+ addVirtualHost.show = function() {
var that = this;
+ dom.byId("addVirtualHost.typeSpecificDiv").innerHTML = "";
registry.byId("formAddVirtualHost").reset();
dojo.byId("formAddVirtualHost.id").value="";
- if (!that.hasOwnProperty("storeTypeChooser"))
+
+ if (!that.hasOwnProperty("typeChooser"))
{
xhr.get({
sync: true,
- url: "rest/helper?action=ListMessageStoreTypes",
+ url: "rest/helper?action=ListVirtualHostTypes",
handleAs: "json"
}).then(
function(data) {
- var storeTypes = data;
- var storeTypesData = [];
- for (var i =0 ; i < storeTypes.length; i++)
+ var vhostTypes = data;
+ var vhostTypesData = [];
+ for (var i =0 ; i < vhostTypes.length; i++)
{
- storeTypesData[i]= {id: storeTypes[i], name: storeTypes[i]};
+ vhostTypesData[i]= {id: vhostTypes[i], name: vhostTypes[i]};
}
- var storeTypesStore = new Memory({ data: storeTypesData });
- var storeTypesDiv = dom.byId("addVirtualHost.selectStoreType");
- var input = construct.create("input", {id: "addStoreType", required: false}, storeTypesDiv);
- that.storeTypeChooser = new FilteringSelect({ id: "addVirtualHost.storeType",
- name: "storeType",
- store: storeTypesStore,
- searchAttr: "name", required: false}, input);
+ var typesStore = new Memory({ data: vhostTypesData });
+ var typesDiv = dom.byId("addVirtualHost.selectType");
+ var input = construct.create("input", {id: "addType", required: false}, typesDiv);
+ that.typeChooser = new FilteringSelect({ id: "addVirtualHost.type",
+ name: "type",
+ store: typesStore,
+ searchAttr: "name",
+ required: false,
+ onChange: that.selectVhostType }, input);
});
}
- if (virtualHostName)
- {
- xhr.get({
- url: "rest/virtualhost/" + encodeURIComponent(virtualHostName),
- handleAs: "json"
- }).then(
- function(data) {
- var host = data[0];
- var nameField = dijit.byId("formAddVirtualHost.name");
- nameField.set("value", host.name);
- dojo.byId("formAddVirtualHost.id").value=host.id;
- var configPath = host.configPath;
- if (configPath)
- {
- var configPathField = dijit.byId("formAddVirtualHost.configPath");
- configPathField.set("value", host.configPath);
- }
- else
- {
- that.storeTypeChooser.set("value", host.storeType.toLowerCase());
- var storePathField = dijit.byId("formAddVirtualHost.storePath");
- storePathField.set("value", host.storePath);
- }
- registry.byId("addVirtualHost").show();
- });
- }
- else
- {
- registry.byId("addVirtualHost").show();
- }
+
+
+ registry.byId("addVirtualHost").show();
+
}
return addVirtualHost;
- }); \ No newline at end of file
+ });
diff --git a/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js
new file mode 100644
index 0000000000..cd56ca9cba
--- /dev/null
+++ b/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/standard/addVirtualHost.js
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+define(["dojo/_base/xhr",
+ "dojo/dom",
+ "dojo/dom-construct",
+ "dojo/_base/window",
+ "dijit/registry",
+ "dojo/parser",
+ "dojo/_base/array",
+ "dojo/_base/event",
+ 'dojo/_base/json',
+ "dojo/store/Memory",
+ "dijit/form/FilteringSelect",
+ "dojo/domReady!"],
+ function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, FilteringSelect) {
+ return {
+ show: function() {
+ var node = dom.byId("addVirtualHost.typeSpecificDiv");
+ var that = this;
+
+ array.forEach(registry.toArray(),
+ function(item) {
+ if(item.id.substr(0,27) == "formAddVirtualHost.specific") {
+ item.destroyRecursive();
+ }
+ });
+
+ xhr.get({url: "virtualhost/standard/add.html",
+ sync: true,
+ load: function(data) {
+ node.innerHTML = data;
+ parser.parse(node);
+ if (that.hasOwnProperty("storeTypeChooser"))
+ {
+ that.storeTypeChooser.destroy();
+ }
+ xhr.get({
+ sync: true,
+ url: "rest/helper?action=ListMessageStoreTypes",
+ handleAs: "json"
+ }).then(
+ function(data) {
+ var storeTypes = data;
+ var storeTypesData = [];
+ for (var i =0 ; i < storeTypes.length; i++)
+ {
+ storeTypesData[i]= {id: storeTypes[i], name: storeTypes[i]};
+ }
+ var storeTypesStore = new Memory({ data: storeTypesData });
+ var storeTypesDiv = dom.byId("addVirtualHost.specific.selectStoreType");
+ var input = construct.create("input", {id: "addStoreType", required: false}, storeTypesDiv);
+ that.storeTypeChooser = new FilteringSelect({ id: "addVirtualHost.specific.storeType",
+ name: "storeType",
+ store: storeTypesStore,
+ searchAttr: "name", required: false}, input);
+ });
+
+ }});
+ }
+ };
+ });
diff --git a/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html b/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html
new file mode 100644
index 0000000000..9596ef4175
--- /dev/null
+++ b/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html
@@ -0,0 +1,13 @@
+<table class="tableContainer-table tableContainer-table-horiz">
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Store Type*: </strong></td>
+ <td class="tableContainer-valueCell" ><div id="addVirtualHost.specific.selectStoreType"></div></td>
+ </tr>
+ <tr>
+ <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td>
+ <td class="tableContainer-valueCell">
+ <input dojoType="dijit/form/ValidationTextBox" required="true" id="formAddVirtualHost.specific.storePath"
+ name="storePath" placeholder="/path/to/message/store" />
+ </td>
+ </tr>
+</table>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index aff84e5832..7977b8b300 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -295,4 +295,8 @@ public class VirtualHostConfiguration extends AbstractConfiguration
return brokerValue == null? false : brokerValue.booleanValue();
}
+ public String getType()
+ {
+ return getStringValue("type", "STANDARD");
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index 82adcf4dde..07d934027e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -35,7 +35,7 @@ public interface IConnectionRegistry
public void close() throws AMQException;
- public void close(String replyText) throws AMQException;
+ public void close(String replyText);
public List<AMQConnectionModel> getConnections();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 803aeceab8..6b453cbbda 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -554,7 +554,7 @@ public abstract class AbstractExchange implements Exchange
if (b.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(b);
+ _virtualHost.getDurableConfigurationStore().unbindQueue(b);
}
b.logDestruction();
}
@@ -626,7 +626,7 @@ public abstract class AbstractExchange implements Exchange
if (b.isDurable() && !restore)
{
- _virtualHost.getMessageStore().bindQueue(b);
+ _virtualHost.getDurableConfigurationStore().bindQueue(b);
}
queue.addQueueDeleteTask(b);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 450e74bfec..142da84524 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -74,7 +74,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public DurableConfigurationStore getDurableConfigurationStore()
{
- return _host.getMessageStore();
+ return _host.getDurableConfigurationStore();
}
public void registerExchange(Exchange exchange) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index eba63558ca..b632c68ace 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -134,8 +134,6 @@ public interface Exchange extends ExchangeReferrer
*/
boolean isBound(AMQShortString routingKey);
- boolean isBound(AMQQueue queue);
-
/**
* Returns true if this exchange has at least one binding associated with it.
* @return
@@ -147,15 +145,17 @@ public interface Exchange extends ExchangeReferrer
boolean isBound(String bindingKey);
- boolean isBound(String bindingKey, AMQQueue queue);
+ boolean isBound(AMQQueue queue);
- boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
+ boolean isBound(Map<String, Object> arguments);
- boolean isBound(Map<String, Object> arguments, AMQQueue queue);
+ boolean isBound(String bindingKey, AMQQueue queue);
boolean isBound(String bindingKey, Map<String, Object> arguments);
- boolean isBound(Map<String, Object> arguments);
+ boolean isBound(Map<String, Object> arguments, AMQQueue queue);
+
+ boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
void removeReference(ExchangeReferrer exchange);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index eed0cd6020..9c25d00b1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -71,7 +71,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
_logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
}
-
+
synchronized(exchangeRegistry)
{
Exchange exchange = exchangeRegistry.getExchange(exchangeName);
@@ -106,7 +106,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
if (exchange.isDurable())
{
- virtualHost.getMessageStore().createExchange(exchange);
+ virtualHost.getDurableConfigurationStore().createExchange(exchange);
}
}
catch(AMQUnknownExchangeType e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index c889f5660d..2234ee0354 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -65,7 +65,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- DurableConfigurationStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
final AMQShortString queueName;
@@ -80,7 +80,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
AMQQueue queue;
-
+
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
AMQChannel channel = protocolConnection.getChannel(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 762f090b83..b184ce7dfa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -62,7 +62,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- DurableConfigurationStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
AMQChannel channel = protocolConnection.getChannel(channelId);
@@ -110,7 +110,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
}
-
+
int purged = queue.delete();
if (queue.isDurable())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
index f666eb29f1..24fd687240 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -42,6 +42,7 @@ public interface Broker extends ConfiguredObject
String PROCESS_PID = "processPid";
String PRODUCT_VERSION = "productVersion";
String SUPPORTED_BROKER_STORE_TYPES = "supportedBrokerStoreTypes";
+ String SUPPORTED_VIRTUALHOST_TYPES = "supportedVirtualHostTypes";
String SUPPORTED_VIRTUALHOST_STORE_TYPES = "supportedVirtualHostStoreTypes";
String SUPPORTED_AUTHENTICATION_PROVIDERS = "supportedAuthenticationProviders";
String CREATED = "created";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
index dab92c50fa..9120a27976 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
@@ -31,9 +31,13 @@ public class Model
{
/*
* API version for the broker model
+ *
+ * 1.0 Initial version
+ * 1.1 Addition of mandatory virtual host type / different types of virtual host
+ *
*/
public static final int MODEL_MAJOR_VERSION = 1;
- public static final int MODEL_MINOR_VERSION = 0;
+ public static final int MODEL_MINOR_VERSION = 1;
public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION;
private static final Model MODEL_INSTANCE = new Model();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 424ba825d7..08e9c5ca5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -83,6 +83,7 @@ public interface VirtualHost extends ConfiguredObject
String NAME = "name";
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
+ String TYPE = "type";
String UPDATED = "updated";
String CONFIG_PATH = "configPath";
@@ -92,6 +93,7 @@ public interface VirtualHost extends ConfiguredObject
Arrays.asList(
ID,
NAME,
+ TYPE,
STATE,
DURABLE,
LIFETIME_POLICY,
@@ -139,7 +141,7 @@ public interface VirtualHost extends ConfiguredObject
void dequeue(QueueEntry entry);
void copy(QueueEntry entry, Queue queue);
-
+
void move(QueueEntry entry, Queue queue);
}
@@ -158,4 +160,6 @@ public interface VirtualHost extends ConfiguredObject
SecurityManager getSecurityManager();
MessageStore getMessageStore();
+
+ String getType();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
index a8c3f54530..7ff8c88331 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
@@ -54,17 +54,40 @@ abstract class AbstractAdapter implements ConfiguredObject
protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor)
{
+ this(id, defaults, attributes, taskExecutor, true);
+ }
+
+ protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes,
+ TaskExecutor taskExecutor, boolean filterAttributes)
+
+ {
_taskExecutor = taskExecutor;
_id = id;
if (attributes != null)
{
Collection<String> names = getAttributeNames();
- for (String name : names)
+ if(filterAttributes)
+ {
+ for (String name : names)
+ {
+ if (attributes.containsKey(name))
+ {
+ final Object value = attributes.get(name);
+ if(value != null)
+ {
+ _attributes.put(name, value);
+ }
+ }
+ }
+ }
+ else
{
- if (attributes.containsKey(name))
+ for(Map.Entry<String, Object> entry : attributes.entrySet())
{
- //TODO: dont put nulls
- _attributes.put(name, attributes.get(name));
+ if(entry.getValue()!=null)
+ {
+ _attributes.put(entry.getKey(),entry.getValue());
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index adc30eb944..678db43d58 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -60,6 +60,7 @@ import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.SimpleAuthenticationProviderAdapter;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.access.Operation;
@@ -771,6 +772,10 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
{
return _supportedVirtualHostStoreTypes;
}
+ else if(SUPPORTED_VIRTUALHOST_TYPES.equals(name))
+ {
+ return VirtualHostFactory.TYPES.get();
+ }
else if(SUPPORTED_AUTHENTICATION_PROVIDERS.equals(name))
{
return _authenticationProviderFactory.getSupportedAuthenticationProviders();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index afcce482b6..0f90df00e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -195,7 +195,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
_queue.delete();
if (_queue.isDurable())
{
- _queue.getVirtualHost().getMessageStore().removeQueue(_queue);
+ _queue.getVirtualHost().getDurableConfigurationStore().removeQueue(_queue);
}
}
}
@@ -365,7 +365,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
try
{
- _queue.getVirtualHost().getMessageStore().updateQueue(_queue);
+ _queue.getVirtualHost().getDurableConfigurationStore().updateQueue(_queue);
}
catch (AMQStoreException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index de626a7639..383ff2f3f6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -79,7 +79,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
@@ -91,6 +91,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
@SuppressWarnings("serial")
public static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
put(NAME, String.class);
+ put(TYPE, String.class);
put(STORE_PATH, String.class);
put(STORE_TYPE, String.class);
put(CONFIG_PATH, String.class);
@@ -114,7 +115,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor)
{
- super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor);
+ super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES, false), taskExecutor, false);
_broker = broker;
_brokerStatisticsGatherer = brokerStatisticsGatherer;
validateAttributes();
@@ -130,18 +131,23 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
String configurationFile = (String) getAttribute(CONFIG_PATH);
- String storeType = (String) getAttribute(STORE_TYPE);
+ String type = (String) getAttribute(TYPE);
+
boolean invalidAttributes = false;
if (configurationFile == null)
{
- if (storeType == null)
+ if (type == null)
{
invalidAttributes = true;
}
+ else
+ {
+ validateAttributes(type);
+ }
}
else
{
- if (storeType != null)
+ if (type != null)
{
invalidAttributes = true;
}
@@ -149,7 +155,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
if (invalidAttributes)
{
- throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'storeType' and 'storePath' attributes");
+ throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes");
}
// pre-load the configuration in order to validate
@@ -163,6 +169,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
}
+ private void validateAttributes(String type)
+ {
+ final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
+ if(factory == null)
+ {
+ throw new IllegalArgumentException("Unknown virtual host type '"+ type +"'. Valid types are: " + VirtualHostFactory.TYPES.get());
+ }
+ factory.validateAttributes(getActualAttributes());
+
+ }
+
private void populateExchanges()
{
Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges =
@@ -295,7 +312,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
_virtualHost.getExchangeRegistry().registerExchange(exchange);
if(durable)
{
- _virtualHost.getMessageStore().createExchange(exchange);
+ _virtualHost.getDurableConfigurationStore().createExchange(exchange);
}
synchronized (_exchangeAdapters)
{
@@ -417,7 +434,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
if(durable)
{
- _virtualHost.getMessageStore().createQueue(queue, FieldTable.convertToFieldTable(attributes));
+ _virtualHost.getDurableConfigurationStore().createQueue(queue, FieldTable.convertToFieldTable(attributes));
}
synchronized (_queueAdapters)
{
@@ -444,6 +461,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
throw new IllegalStateException();
}
+
+ public String getType()
+ {
+ return (String)getAttribute(TYPE);
+ }
+
+ public String setType(final String currentType, final String desiredType)
+ throws IllegalStateException, AccessControlException
+ {
+ throw new IllegalStateException();
+ }
+
+
@Override
public State getActualState()
{
@@ -1070,7 +1100,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
try
{
VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName);
- _virtualHost = new VirtualHostImpl(_broker.getVirtualHostRegistry(), _brokerStatisticsGatherer, _broker.getSecurityManager(), configuration);
+ String type = configuration.getType();
+ final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
+ if(factory == null)
+ {
+ throw new IllegalArgumentException("Unknown virtual host type: " + type);
+ }
+ else
+ {
+ _virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(),
+ _brokerStatisticsGatherer,
+ _broker.getSecurityManager(),
+ configuration);
+ }
}
catch (Exception e)
{
@@ -1106,8 +1148,16 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
final MyConfiguration basicConfiguration = new MyConfiguration();
PropertiesConfiguration config = new PropertiesConfiguration();
- config.addProperty("store.type", (String)getAttribute(STORE_TYPE));
- config.addProperty("store.environment-path", (String)getAttribute(STORE_PATH));
+ final String type = (String) getAttribute(TYPE);
+ config.addProperty("type", type);
+ VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
+ if(factory != null)
+ {
+ for(Map.Entry<String,Object> entry : factory.createVirtualHostConfiguration(this).entrySet())
+ {
+ config.addProperty(entry.getKey(), entry.getValue());
+ }
+ }
basicConfiguration.addConfiguration(config);
CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java b/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
new file mode 100644
index 0000000000..f952e0410c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+public interface VirtualHostFactory
+{
+ String getType();
+
+ VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
+ StatisticsGatherer brokerStatisticsGatherer,
+ SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig) throws Exception;
+
+ void validateAttributes(Map<String, Object> attributes);
+
+ Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter);
+
+ static final class TYPES
+ {
+ private TYPES()
+ {
+ }
+
+ public static Collection<String> get()
+ {
+ QpidServiceLoader<VirtualHostFactory> qpidServiceLoader = new QpidServiceLoader<VirtualHostFactory>();
+ Iterable<VirtualHostFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class);
+ List<String> names = new ArrayList<String>();
+ for(VirtualHostFactory factory : factories)
+ {
+ names.add(factory.getType());
+ }
+ return Collections.unmodifiableCollection(names);
+ }
+ }
+
+
+ static final class FACTORIES
+ {
+ private FACTORIES()
+ {
+ }
+
+ public static VirtualHostFactory get(String type)
+ {
+ QpidServiceLoader<VirtualHostFactory> qpidServiceLoader = new QpidServiceLoader<VirtualHostFactory>();
+ Iterable<VirtualHostFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class);
+ for(VirtualHostFactory factory : factories)
+ {
+ if(factory.getType().equals(type))
+ {
+ return factory;
+ }
+ }
+ return null;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 37f1f8f7a5..cc0e5ebe7a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -315,7 +315,7 @@ public class AMQQueueFactory
exchangeRegistry.registerExchange(dlExchange);
//enter the dle in the persistent store
- virtualHost.getMessageStore().createExchange(dlExchange);
+ virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
}
}
@@ -335,7 +335,7 @@ public class AMQQueueFactory
dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
//enter the dlq in the persistent store
- virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 6b4b8d4a3e..745a06c7fe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -55,7 +55,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-abstract public class AbstractJDBCMessageStore implements MessageStore
+abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 4e7bbf04a6..27a40963f6 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -33,7 +33,7 @@ public interface DurableConfigurationStore
public static interface Source
{
- DurableConfigurationStore getMessageStore();
+ DurableConfigurationStore getDurableConfigurationStore();
}
/**
@@ -107,11 +107,11 @@ public interface DurableConfigurationStore
* Removes the specified queue from the persistent store.
*
* @param queue The queue to remove.
- *
+ *
* @throws AMQStoreException If the operation fails for any reason.
*/
void removeQueue(AMQQueue queue) throws AMQStoreException;
-
+
/**
* Updates the specified queue in the persistent store, IF it is already present. If the queue
* is not present in the store, it will not be added.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index cf8444b089..bbdfaf4959 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -26,7 +26,7 @@ import org.apache.commons.configuration.Configuration;
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
*/
-public interface MessageStore extends DurableConfigurationStore
+public interface MessageStore
{
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
index d67ccfd8a4..fe7dd81e0c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
@@ -50,6 +50,12 @@ public class MessageStoreCreator
}
}
+ public boolean isValidType(String storeType)
+ {
+ return _factories.containsKey(storeType.toLowerCase());
+ }
+
+
public MessageStore createMessageStore(String storeType)
{
MessageStoreFactory factory = _factories.get(storeType.toLowerCase());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index fdb80295cf..f0936a221c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -26,7 +26,7 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-public abstract class NullMessageStore implements MessageStore
+public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@Override
public void configureConfigStore(String name,
@@ -125,4 +125,4 @@ public abstract class NullMessageStore implements MessageStore
public void onDelete()
{
}
-} \ No newline at end of file
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 57024817f5..5b53f9ee6c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -35,6 +35,7 @@ import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
@@ -46,7 +47,7 @@ import org.apache.qpid.util.FileUtils;
* mechanism.
*
*/
-public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore
+public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 548b75f949..63419fce3f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -762,7 +762,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
if (exchange.isDurable())
{
- DurableConfigurationStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
store.createExchange(exchange);
}
exchangeRegistry.registerExchange(exchange);
@@ -917,7 +917,7 @@ public class ServerSessionDelegate extends SessionDelegate
if (exchange.isDurable() && !exchange.isAutoDelete())
{
- DurableConfigurationStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
store.removeExchange(exchange);
}
}
@@ -1241,7 +1241,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
VirtualHost virtualHost = getVirtualHost(session);
- DurableConfigurationStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
String queueName = method.getQueue();
AMQQueue queue;
@@ -1468,7 +1468,7 @@ public class ServerSessionDelegate extends SessionDelegate
queue.delete();
if (queue.isDurable() && !queue.isAutoDelete())
{
- DurableConfigurationStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
store.removeQueue(queue);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
index 16e717a9c7..37e0177b00 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
@@ -127,7 +127,13 @@ public class MapValueConverter
}
else if (rawValue instanceof String)
{
- return (T) Enum.valueOf(enumType, (String) rawValue);
+ final String stringValue = (String) rawValue;
+
+ return "null".equals(stringValue) ? null : (T) Enum.valueOf(enumType, stringValue);
+ }
+ else if(rawValue == null)
+ {
+ return null;
}
else
{
@@ -281,14 +287,23 @@ public class MapValueConverter
public static Map<String, Object> convert(Map<String, Object> configurationAttributes, Map<String, Type> attributeTypes)
{
+ return convert(configurationAttributes, attributeTypes, true);
+ }
+
+ public static Map<String, Object> convert(Map<String, Object> configurationAttributes,
+ Map<String, Type> attributeTypes,
+ boolean exclusive)
+ {
Map<String, Object> attributes = new HashMap<String, Object>();
- for (Map.Entry<String, Type> attributeEntry : attributeTypes.entrySet())
+ for (Map.Entry<String, Object> attribute : configurationAttributes.entrySet())
{
- String attributeName = attributeEntry.getKey();
- if (configurationAttributes.containsKey(attributeName))
+ String attributeName = attribute.getKey();
+ Object rawValue = attribute.getValue();
+
+ if (attributeTypes.containsKey(attributeName))
{
- Type typeObject = attributeEntry.getValue();
- Object rawValue = configurationAttributes.get(attributeName);
+ Type typeObject = attributeTypes.get(attributeName);
+
Object value = null;
if (typeObject instanceof Class)
{
@@ -311,16 +326,21 @@ public class MapValueConverter
}
else
{
- throw new IllegalArgumentException("Convertion into " + parameterizedType + " is not yet supported");
+ throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported");
}
}
else
{
- throw new IllegalArgumentException("Convertion into " + typeObject + " is not yet supported");
+ throw new IllegalArgumentException("Conversion into " + typeObject + " is not yet supported");
}
attributes.put(attributeName, value);
}
+ else if(!exclusive)
+ {
+ attributes.put(attributeName, rawValue);
+ }
}
+
return attributes;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c63c32188d..6116d46e41 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -57,17 +57,17 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
-public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
+public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
- private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
+ private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class);
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
@@ -97,8 +97,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private final DtxRegistry _dtxRegistry;
- private final MessageStore _messageStore;
-
private volatile State _state = State.INITIALISING;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -106,7 +104,10 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
private boolean _blocked;
- public VirtualHostImpl(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, VirtualHostConfiguration hostConfig) throws Exception
+ public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
+ StatisticsGatherer brokerStatisticsGatherer,
+ SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig) throws Exception
{
if (hostConfig == null)
{
@@ -141,18 +142,16 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_exchangeRegistry = new DefaultExchangeRegistry(this);
- _messageStore = initialiseMessageStore(hostConfig);
-
- configureMessageStore(hostConfig);
-
- activateNonHAMessageStore();
-
initialiseStatistics();
- _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ initialiseStorage(hostConfig);
+
+ getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
+ abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception;
+
public IConnectionRegistry getConnectionRegistry()
{
return _connectionRegistry;
@@ -187,25 +186,25 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
}
- private void shutdownHouseKeeping()
+ protected void shutdownHouseKeeping()
{
_houseKeepingTasks.shutdown();
- try
- {
- if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
- {
- _houseKeepingTasks.shutdownNow();
- }
- }
- catch (InterruptedException e)
- {
- _logger.warn("Interrupted during Housekeeping shutdown:", e);
- Thread.currentThread().interrupt();
- }
+ try
+ {
+ if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
+ {
+ _houseKeepingTasks.shutdownNow();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted during Housekeeping shutdown:", e);
+ Thread.currentThread().interrupt();
+ }
}
- private void removeHouseKeepingTasks()
+ protected void removeHouseKeepingTasks()
{
BlockingQueue<Runnable> taskQueue = _houseKeepingTasks.getQueue();
for (final Runnable runnable : taskQueue)
@@ -257,62 +256,13 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
return _houseKeepingTasks.getActiveCount();
}
- private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
- {
- String storeType = hostConfig.getConfig().getString("store.type");
- MessageStore messageStore = null;
- if (storeType == null)
- {
- final Class<?> clazz = Class.forName(hostConfig.getMessageStoreClass());
- final Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException(clazz + " does not implement " + MessageStore.class);
- }
- messageStore = (MessageStore) o;
- }
- else
- {
- messageStore = new MessageStoreCreator().createMessageStore(storeType);
- }
-
- final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
- OperationalLoggingListener.listen(messageStore, storeLogSubject);
-
- messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
- messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
- messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
- messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
- if (messageStore instanceof HAMessageStore)
- {
- messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
- }
-
- return messageStore;
- }
-
- private void activateNonHAMessageStore() throws Exception
- {
- if (!(_messageStore instanceof HAMessageStore))
- {
- _messageStore.activate();
- }
- }
-
- private void configureMessageStore(VirtualHostConfiguration hostConfig) throws Exception
- {
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
-
- _messageStore.configureConfigStore(getName(), recoveryHandler, hostConfig.getStoreConfiguration());
- _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, hostConfig.getStoreConfiguration());
- }
-
- private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
+ protected void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
{
_logger.debug("Loading configuration for virtualhost: " + config.getName());
+ _exchangeRegistry.initialise();
+
List<String> exchangeNames = config.getExchanges();
for (String exchangeName : exchangeNames)
@@ -347,7 +297,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
if (newExchange.isDurable())
{
- _messageStore.createExchange(newExchange);
+ getDurableConfigurationStore().createExchange(newExchange);
}
}
}
@@ -359,7 +309,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
if (queue.isDurable())
{
- getMessageStore().createQueue(queue);
+ getDurableConfigurationStore().createQueue(queue);
}
//get the exchange name (returns default exchange name if none was specified)
@@ -390,7 +340,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
else
{
-
configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey));
}
}
@@ -437,11 +386,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
return _exchangeFactory;
}
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
public SecurityManager getSecurityManager()
{
return _securityManager;
@@ -453,29 +397,42 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_connectionRegistry.close();
_queueRegistry.stopAllAndUnregisterMBeans();
_dtxRegistry.close();
+ closeStorage();
+ shutdownHouseKeeping();
+
+ // clear exchange objects
+ _exchangeRegistry.clearAndUnregisterMbeans();
+
+ _state = State.STOPPED;
+
+ CurrentActor.get().message(VirtualHostMessages.CLOSED());
+ }
+ protected void closeStorage()
+ {
//Close MessageStore
- if (_messageStore != null)
+ if (getMessageStore() != null)
{
//Remove MessageStore Interface should not throw Exception
try
{
- _messageStore.close();
+ getMessageStore().close();
}
catch (Exception e)
{
_logger.error("Failed to close message store", e);
}
}
+ }
- // clear exchange objects
- _exchangeRegistry.clearAndUnregisterMbeans();
- _state = State.STOPPED;
-
- CurrentActor.get().message(VirtualHostMessages.CLOSED());
+ protected Logger getLogger()
+ {
+ return _logger;
}
+
+
public VirtualHostRegistry getVirtualHostRegistry()
{
return _virtualHostRegistry;
@@ -618,94 +575,28 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
}
- private final class BeforeActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- try
- {
- _exchangeRegistry.initialise();
- initialiseModel(_vhostConfig);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to initialise virtual host after state change", e);
- }
- }
- }
-
- private final class AfterActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- State finalState = State.ERRORED;
-
- try
- {
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
- finalState = State.ACTIVE;
- }
- finally
- {
- _state = finalState;
- reportIfError(_state);
- }
- }
- }
-
- private final class BeforePassivationListener implements EventListener
- {
- public void event(Event event)
- {
- State finalState = State.ERRORED;
-
- try
- {
- /* the approach here is not ideal as there is a race condition where a
- * queue etc could be created while the virtual host is on the way to
- * the passivated state. However the store state change from MASTER to UNKNOWN
- * is documented as exceptionally rare..
- */
-
- _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
- removeHouseKeepingTasks();
-
- _queueRegistry.stopAllAndUnregisterMBeans();
- _exchangeRegistry.clearAndUnregisterMbeans();
- _dtxRegistry.close();
-
- finalState = State.PASSIVE;
- }
- finally
- {
- _state = finalState;
- reportIfError(_state);
- }
- }
-
+ protected void setState(State state)
+ {
+ _state = state;
}
- private final class AfterInitialisationListener implements EventListener
+ protected void attainActivation()
{
- public void event(Event event)
+ State finalState = State.ERRORED;
+
+ try
{
- _state = State.PASSIVE;
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ finalState = State.ACTIVE;
}
-
- }
-
- private final class BeforeCloseListener implements EventListener
- {
- @Override
- public void event(Event event)
+ finally
{
- shutdownHouseKeeping();
+ _state = finalState;
+ reportIfError(_state);
}
}
- private void reportIfError(State state)
+ protected void reportIfError(State state)
{
if (state == State.ERRORED)
{
@@ -717,7 +608,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
{
public VirtualHostHouseKeepingTask()
{
- super(VirtualHostImpl.this);
+ super(AbstractVirtualHost.this);
}
public void execute()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
new file mode 100644
index 0000000000..05a33e7d99
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -0,0 +1,145 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreCreator;
+import org.apache.qpid.server.store.OperationalLoggingListener;
+
+public class StandardVirtualHost extends AbstractVirtualHost
+{
+ private MessageStore _messageStore;
+
+ private DurableConfigurationStore _durableConfigurationStore;
+
+ StandardVirtualHost(VirtualHostRegistry virtualHostRegistry,
+ StatisticsGatherer brokerStatisticsGatherer,
+ org.apache.qpid.server.security.SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig) throws Exception
+ {
+ super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig);
+ }
+
+
+
+ private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ String storeType = hostConfig.getConfig().getString("store.type");
+ MessageStore messageStore = null;
+ if (storeType == null)
+ {
+ final Class<?> clazz = Class.forName(hostConfig.getMessageStoreClass());
+ final Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException(clazz + " does not implement " + MessageStore.class);
+ }
+
+ messageStore = (MessageStore) o;
+ }
+ else
+ {
+ messageStore = new MessageStoreCreator().createMessageStore(storeType);
+ }
+
+ final
+ MessageStoreLogSubject
+ storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
+ OperationalLoggingListener.listen(messageStore, storeLogSubject);
+
+ return messageStore;
+ }
+
+ private DurableConfigurationStore initialiseConfigurationStore(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ DurableConfigurationStore configurationStore;
+ if(getMessageStore() instanceof DurableConfigurationStore)
+ {
+ configurationStore = (DurableConfigurationStore) getMessageStore();
+ }
+ else
+ {
+ throw new ClassCastException(getMessageStore().getClass().getSimpleName() +
+ " is not an instance of DurableConfigurationStore");
+ }
+ return configurationStore;
+ }
+
+
+ protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ _messageStore = initialiseMessageStore(hostConfig);
+
+ _durableConfigurationStore = initialiseConfigurationStore(hostConfig);
+
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+
+ final Configuration storeConfiguration = hostConfig.getStoreConfiguration();
+
+ _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, storeConfiguration);
+
+ _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, storeConfiguration);
+
+ initialiseModel(hostConfig);
+
+ _messageStore.activate();
+
+ attainActivation();
+ }
+
+
+ protected void closeStorage()
+ {
+ //Close MessageStore
+ if (_messageStore != null)
+ {
+ //Remove MessageStore Interface should not throw Exception
+ try
+ {
+ getMessageStore().close();
+ }
+ catch (Exception e)
+ {
+ getLogger().error("Failed to close message store", e);
+ }
+ }
+ }
+
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _durableConfigurationStore;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
new file mode 100644
index 0000000000..b47a5dd149
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
@@ -0,0 +1,98 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStoreCreator;
+
+public class StandardVirtualHostFactory implements VirtualHostFactory
+{
+
+ public static final String TYPE = "STANDARD";
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
+ StatisticsGatherer brokerStatisticsGatherer,
+ org.apache.qpid.server.security.SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig) throws Exception
+ {
+ return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig);
+ }
+
+
+ private static final String STORE_TYPE_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_TYPE;
+ private static final String STORE_PATH_ATTRIBUTE = org.apache.qpid.server.model.VirtualHost.STORE_PATH;
+
+ @Override
+ public void validateAttributes(Map<String, Object> attributes)
+ {
+
+ // need store type and path
+ Object storeType = attributes.get(STORE_TYPE_ATTRIBUTE);
+ if(!(storeType instanceof String))
+ {
+
+ throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE
+ +"' is required and must be of type String.");
+ }
+ final MessageStoreCreator storeCreator = new MessageStoreCreator();
+ if(!storeCreator.isValidType((String)storeType))
+ {
+ throw new IllegalArgumentException("Attribute '"+ STORE_TYPE_ATTRIBUTE
+ +"' has value '"+storeType+"' which is not one of the valid values: "
+ + storeCreator.getStoreTypes() + ".");
+
+ }
+
+ // TODO - each store type should validate its own attributes
+ if(!((String) storeType).equalsIgnoreCase(MemoryMessageStore.TYPE))
+ {
+ Object storePath = attributes.get(STORE_PATH_ATTRIBUTE);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ STORE_PATH_ATTRIBUTE
+ +"' is required and must be of type String.");
+
+ }
+ }
+
+ }
+
+ @Override
+ public Map<String,Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter)
+ {
+ Map<String,Object> convertedMap = new LinkedHashMap<String, Object>();
+ convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE));
+ convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
+ return convertedMap;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index eb1481b719..8919f4d348 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -49,6 +49,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
ExchangeFactory getExchangeFactory();
+ DurableConfigurationStore getDurableConfigurationStore();
+
MessageStore getMessageStore();
SecurityManager getSecurityManager();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java
new file mode 100644
index 0000000000..626615a59f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostFactoryRegistry.java
@@ -0,0 +1,65 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
+
+public class VirtualHostFactoryRegistry
+{
+ private static Map<String, VirtualHostFactory> getFactoryMap()
+ {
+ Map<String, VirtualHostFactory> virtualHostFactories = new HashMap<String, VirtualHostFactory>();
+ QpidServiceLoader<VirtualHostFactory> qpidServiceLoader = new QpidServiceLoader<VirtualHostFactory>();
+ Iterable<VirtualHostFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(VirtualHostFactory.class);
+ for (VirtualHostFactory virtualHostFactory : factories)
+ {
+ String type = virtualHostFactory.getType();
+ VirtualHostFactory factory = virtualHostFactories.put(type, virtualHostFactory);
+ if (factory != null)
+ {
+ throw new IllegalStateException("VirtualHostFactory with type name '" + type
+ + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '"
+ + virtualHostFactory.getClass().getName() + "'");
+ }
+ }
+ return virtualHostFactories;
+ }
+
+
+ public static Collection<VirtualHostFactory> getFactories()
+ {
+ return Collections.unmodifiableCollection(getFactoryMap().values());
+ }
+
+ public static Collection<String> getVirtualHostTypes()
+ {
+ return Collections.unmodifiableCollection(getFactoryMap().keySet());
+ }
+
+ public static VirtualHostFactory getFactory(String type)
+ {
+ return getFactoryMap().get(type);
+ }
+}
diff --git a/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory
new file mode 100644
index 0000000000..81217884e4
--- /dev/null
+++ b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.VirtualHostFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.virtualhost.StandardVirtualHostFactory
diff --git a/java/broker/src/main/resources/initial-config.json b/java/broker/src/main/resources/initial-config.json
index f01ffca140..9bf7d71e8a 100644
--- a/java/broker/src/main/resources/initial-config.json
+++ b/java/broker/src/main/resources/initial-config.json
@@ -21,7 +21,7 @@
{
"name": "Broker",
"storeVersion": 1,
- "modelVersion": "1.0",
+ "modelVersion": "1.1",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "passwordFile",
@@ -49,6 +49,7 @@
}],
"virtualhosts" : [ {
"name" : "default",
+ "type" : "STANDARD",
"storeType" : "DERBY",
"storePath" : "${qpid.work_dir}/derbystore/default"
} ],
@@ -59,4 +60,4 @@
"pluginType" : "MANAGEMENT-JMX",
"name" : "jmxManagement"
} ]
-} \ No newline at end of file
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
index 713cd25adb..c6473d9520 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.test.utils.TestFileUtils;
public class VirtualHostRecovererTest extends TestCase
@@ -75,6 +76,8 @@ public class VirtualHostRecovererTest extends TestCase
VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
+
attributes.put(VirtualHost.STORE_PATH, "/path/to/virtualhost/store");
attributes.put(VirtualHost.STORE_TYPE, "DERBY");
when(entry.getAttributes()).thenReturn(attributes);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 478013f61f..05d5d75864 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.model;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -39,14 +38,10 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.NullMessageStore;
-import org.apache.qpid.server.store.StateManager;
import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.server.virtualhost.StandardVirtualHost;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
public class VirtualHostTest extends TestCase
{
@@ -96,6 +91,7 @@ public class VirtualHostTest extends TestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE);
attributes.put(VirtualHost.STATE, State.QUIESCED);
@@ -130,36 +126,11 @@ public class VirtualHostTest extends TestCase
assertEquals("Unexpected state", State.DELETED, host.getAttribute(VirtualHost.STATE));
}
- public void testReplicaState()
- {
- String hostName = getName();
- File configPath = TestFileUtils.createTempFile(this, ".xml", "<virtualhosts><virtualhost><" + hostName
- + "><store><class>" + ReplicaMessageStore.class.getName() + "</class></store></" + hostName
- + "></virtualhost></virtualhosts>");
- try
- {
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(VirtualHost.NAME, hostName);
- attributes.put(VirtualHost.CONFIG_PATH, configPath.getAbsolutePath());
-
- VirtualHost host = createHost(attributes);
-
- assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE));
-
- host.setDesiredState(State.INITIALISING, State.ACTIVE);
-
- assertEquals("Unexpected state", State.REPLICA, host.getAttribute(VirtualHost.STATE));
- }
- finally
- {
- configPath.delete();
- }
- }
-
private VirtualHost createHost()
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE);
VirtualHost host = createHost(attributes);
@@ -174,34 +145,4 @@ public class VirtualHostTest extends TestCase
return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
}
- public static final class ReplicaMessageStore extends NullMessageStore
- {
- private final EventManager _eventManager = new EventManager();
- private final StateManager _stateManager = new StateManager(_eventManager);
-
- @Override
- public void activate() throws Exception
- {
- _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISING);
- _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
- _stateManager.attainState(org.apache.qpid.server.store.State.ACTIVATING);
- _stateManager.attainState(org.apache.qpid.server.store.State.ACTIVE);
-
- // this should change the virtual host state to PASSIVE
- _stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
-
- @Override
- public String getStoreType()
- {
- return ReplicaMessageStore.class.getSimpleName();
- }
- }
-
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
index f0ecfb6407..8a7d5d85fc 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
@@ -65,7 +65,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
private static final String EXCHANGE_NAME = "exchangeName";
private String _storePath;
private String _storeName;
- private MessageStore _store;
+ private MessageStore _messageStore;
private Configuration _configuration;
private ConfigurationRecoveryHandler _recoveryHandler;
@@ -84,6 +84,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
private FieldTable _bindingArgs;
private UUID _queueId;
private UUID _exchangeId;
+ private DurableConfigurationStore _configStore;
public void setUp() throws Exception
{
@@ -135,7 +136,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
public void testCreateExchange() throws Exception
{
Exchange exchange = createTestExchange();
- _store.createExchange(exchange);
+ _configStore.createExchange(exchange);
reopenStore();
verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true);
@@ -144,9 +145,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase
public void testRemoveExchange() throws Exception
{
Exchange exchange = createTestExchange();
- _store.createExchange(exchange);
+ _configStore.createExchange(exchange);
- _store.removeExchange(exchange);
+ _configStore.removeExchange(exchange);
reopenStore();
verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean());
@@ -157,7 +158,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
- _store.bindQueue(binding);
+ _configStore.bindQueue(binding);
reopenStore();
@@ -171,9 +172,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
- _store.bindQueue(binding);
+ _configStore.bindQueue(binding);
- _store.unbindQueue(binding);
+ _configStore.unbindQueue(binding);
reopenStore();
verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(),
@@ -183,7 +184,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
public void testCreateQueueAMQQueue() throws Exception
{
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
- _store.createQueue(queue);
+ _configStore.createQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null);
@@ -197,7 +198,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _store.createQueue(queue, arguments);
+ _configStore.createQueue(queue, arguments);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null);
@@ -208,7 +209,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
Exchange alternateExchange = createTestAlternateExchange();
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange);
- _store.createQueue(queue);
+ _configStore.createQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId());
@@ -230,11 +231,11 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _store.createQueue(queue, arguments);
+ _configStore.createQueue(queue, arguments);
// update the queue to have exclusive=false
queue = createTestQueue(getName(), getName() + "Owner", false);
- _store.updateQueue(queue);
+ _configStore.updateQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null);
@@ -248,12 +249,12 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _store.createQueue(queue, arguments);
+ _configStore.createQueue(queue, arguments);
// update the queue to have exclusive=false
Exchange alternateExchange = createTestAlternateExchange();
queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange);
- _store.updateQueue(queue);
+ _configStore.updateQueue(queue);
reopenStore();
verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId());
@@ -267,10 +268,10 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _store.createQueue(queue, arguments);
+ _configStore.createQueue(queue, arguments);
// remove queue
- _store.removeQueue(queue);
+ _configStore.removeQueue(queue);
reopenStore();
verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(),
any(FieldTable.class), any(UUID.class));
@@ -306,18 +307,19 @@ public class DurableConfigurationStoreTest extends QpidTestCase
private void reopenStore() throws Exception
{
- if (_store != null)
+ if (_messageStore != null)
{
- _store.close();
+ _messageStore.close();
}
- _store = createStore();
+ _messageStore = createMessageStore();
+ _configStore = createConfigStore();
- _store.configureConfigStore(_storeName, _recoveryHandler, _configuration);
- _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration);
- _store.activate();
+ _configStore.configureConfigStore(_storeName, _recoveryHandler, _configuration);
+ _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration);
+ _messageStore.activate();
}
- protected MessageStore createStore() throws Exception
+ protected MessageStore createMessageStore() throws Exception
{
String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY);
if (storeClass == null)
@@ -329,6 +331,26 @@ public class DurableConfigurationStoreTest extends QpidTestCase
return messageStore;
}
+ protected DurableConfigurationStore createConfigStore() throws Exception
+ {
+ String storeClass = System.getProperty(CONFIGURATION_STORE_CLASS_NAME_KEY);
+ if (storeClass == null)
+ {
+ storeClass = DerbyMessageStore.class.getName();
+ }
+ Class<DurableConfigurationStore> clazz = (Class<DurableConfigurationStore>) Class.forName(storeClass);
+ DurableConfigurationStore configurationStore ;
+ if(clazz.isInstance(_messageStore))
+ {
+ configurationStore = (DurableConfigurationStore) _messageStore;
+ }
+ else
+ {
+ configurationStore = (DurableConfigurationStore) Class.forName(storeClass).newInstance();
+ }
+ return configurationStore;
+ }
+
public void testRecordXid() throws Exception
{
Record enqueueRecord = getTestRecord(1);
@@ -338,13 +360,13 @@ public class DurableConfigurationStoreTest extends QpidTestCase
byte[] globalId = new byte[] { 1 };
byte[] branchId = new byte[] { 2 };
- Transaction transaction = _store.newTransaction();
+ Transaction transaction = _messageStore.newTransaction();
transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
reopenStore();
verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
- transaction = _store.newTransaction();
+ transaction = _messageStore.newTransaction();
transaction.removeXid(1l, globalId, branchId);
transaction.commitTran();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index f1976ecee3..8743c4111b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -71,7 +71,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
applyStoreSpecificConfiguration(config);
_store = createStore();
- _store.configureConfigStore("test", null, config);
+ ((DurableConfigurationStore)_store).configureConfigStore("test", null, config);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index fbf1828e77..fb255e89f9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -65,7 +65,7 @@ import java.util.Map;
/**
* This tests the MessageStores by using the available interfaces.
*
- * For persistent stores, it validates that Exchanges, Queues, Bindings and
+ * For persistent stores, it validates that Exchanges, Queues, Bindings and
* Messages are persisted and recovered correctly.
*/
public class MessageStoreTest extends QpidTestCase
@@ -106,7 +106,7 @@ public class MessageStoreTest extends QpidTestCase
BrokerTestHelper.setUp();
String storePath = System.getProperty("QPID_WORK") + File.separator + getName();
-
+
_config = new PropertiesConfiguration();
_config.addProperty("store.class", getTestProfileMessageStoreClassName());
_config.addProperty("store.environment-path", storePath);
@@ -224,8 +224,8 @@ public class MessageStoreTest extends QpidTestCase
/**
* Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
- * before reloading the virtual host and ensuring that the persistent messages were restored.
- *
+ * before reloading the virtual host and ensuring that the persistent messages were restored.
+ *
* More specific testing of message persistence is left to store-specific unit testing.
*/
public void testMessagePersistence() throws Exception
@@ -238,7 +238,7 @@ public class MessageStoreTest extends QpidTestCase
validateMessageOnQueues(2, false);
validateMessageOnTopics(1, false);
}
-
+
/**
* Tests message removal by running the testMessagePersistence() method above before
* clearing the queues, reloading the virtual host, and ensuring that the persistent
@@ -250,15 +250,15 @@ public class MessageStoreTest extends QpidTestCase
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of queues registered after recovery",
+ assertEquals("Incorrect number of queues registered after recovery",
6, queueRegistry.getQueues().size());
//clear the queue
queueRegistry.getQueue(durableQueueName).clearQueue();
-
+
//check the messages are gone
validateMessageOnQueue(durableQueueName, 0);
-
+
//reload and verify messages arent restored
reloadVirtualHost();
@@ -284,17 +284,17 @@ public class MessageStoreTest extends QpidTestCase
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of queues registered after recovery",
+ assertEquals("Incorrect number of queues registered after recovery",
6, queueRegistry.getQueues().size());
//Validate the non-Durable Queues were not recovered.
- assertNull("Non-Durable queue still registered:" + priorityQueueName,
+ assertNull("Non-Durable queue still registered:" + priorityQueueName,
queueRegistry.getQueue(priorityQueueName));
- assertNull("Non-Durable queue still registered:" + queueName,
+ assertNull("Non-Durable queue still registered:" + queueName,
queueRegistry.getQueue(queueName));
- assertNull("Non-Durable queue still registered:" + priorityTopicQueueName,
+ assertNull("Non-Durable queue still registered:" + priorityTopicQueueName,
queueRegistry.getQueue(priorityTopicQueueName));
- assertNull("Non-Durable queue still registered:" + topicQueueName,
+ assertNull("Non-Durable queue still registered:" + topicQueueName,
queueRegistry.getQueue(topicQueueName));
//Validate normally expected properties of Queues/Topics
@@ -320,26 +320,26 @@ public class MessageStoreTest extends QpidTestCase
1, queueRegistry.getQueues().size());
reloadVirtualHost();
-
+
queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after first recovery",
1, queueRegistry.getQueues().size());
-
+
//test that removing the queue means it is not recovered next time
- getVirtualHost().getMessageStore().removeQueue(queueRegistry.getQueue(durableQueueName));
+ getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName));
reloadVirtualHost();
-
+
queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after second recovery",
0, queueRegistry.getQueues().size());
- assertNull("Durable queue was not removed:" + durableQueueName,
+ assertNull("Durable queue was not removed:" + durableQueueName,
queueRegistry.getQueue(durableQueueName));
}
/**
* Tests exchange persistence by creating a selection of exchanges, both durable
- * and non durable, and ensuring that following the recovery process the correct
+ * and non durable, and ensuring that following the recovery process the correct
* durable exchanges are still present.
*/
public void testExchangePersistence() throws Exception
@@ -348,7 +348,7 @@ public class MessageStoreTest extends QpidTestCase
Map<AMQShortString, Exchange> oldExchanges = createExchanges();
- assertEquals("Incorrect number of exchanges registered before recovery",
+ assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
reloadVirtualHost();
@@ -367,33 +367,33 @@ public class MessageStoreTest extends QpidTestCase
int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
createExchange(DirectExchange.TYPE, directExchangeName, true);
-
+
ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
- assertEquals("Incorrect number of exchanges registered before recovery",
+ assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
reloadVirtualHost();
-
+
exchangeRegistry = getVirtualHost().getExchangeRegistry();
- assertEquals("Incorrect number of exchanges registered after first recovery",
+ assertEquals("Incorrect number of exchanges registered after first recovery",
origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
-
+
//test that removing the exchange means it is not recovered next time
- getVirtualHost().getMessageStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
+ getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
reloadVirtualHost();
-
+
exchangeRegistry = getVirtualHost().getExchangeRegistry();
- assertEquals("Incorrect number of exchanges registered after second recovery",
+ assertEquals("Incorrect number of exchanges registered after second recovery",
origExchangeCount, exchangeRegistry.getExchangeNames().size());
- assertNull("Durable exchange was not removed:" + directExchangeName,
+ assertNull("Durable exchange was not removed:" + directExchangeName,
exchangeRegistry.getExchange(directExchangeName));
}
-
+
/**
* Tests binding persistence by creating a selection of queues and exchanges, both durable
* and non durable, then adding bindings with and without selectors before reloading the
- * virtual host and verifying that following the recovery process the correct durable
+ * virtual host and verifying that following the recovery process the correct durable
* bindings (those for durable queues to durable exchanges) are still present.
*/
public void testBindingPersistence() throws Exception
@@ -413,7 +413,7 @@ public class MessageStoreTest extends QpidTestCase
bindAllQueuesToExchange(directExchange, directRouting);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
- assertEquals("Incorrect number of exchanges registered before recovery",
+ assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
reloadVirtualHost();
@@ -422,10 +422,10 @@ public class MessageStoreTest extends QpidTestCase
validateBindingProperties();
}
-
+
/**
* Tests binding removal by creating a durable exchange, and queue, binding them together,
- * recovering to verify the persistence, then removing it from the store, and ensuring
+ * recovering to verify the persistence, then removing it from the store, and ensuring
* that following the second reload process it is not recovered.
*/
public void testDurableBindingRemoval() throws Exception
@@ -437,14 +437,14 @@ public class MessageStoreTest extends QpidTestCase
createQueue(durableQueueName, false, true, false, false);
bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
- assertEquals("Incorrect number of bindings registered before recovery",
+ assertEquals("Incorrect number of bindings registered before recovery",
1, queueRegistry.getQueue(durableQueueName).getBindings().size());
//verify binding is actually normally recovered
reloadVirtualHost();
queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of bindings registered after first recovery",
+ assertEquals("Incorrect number of bindings registered after first recovery",
1, queueRegistry.getQueue(durableQueueName).getBindings().size());
ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
@@ -457,13 +457,13 @@ public class MessageStoreTest extends QpidTestCase
reloadVirtualHost();
queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of bindings registered after second recovery",
+ assertEquals("Incorrect number of bindings registered after second recovery",
0, queueRegistry.getQueue(durableQueueName).getBindings().size());
}
/**
* Validates that the durable exchanges are still present, the non durable exchange is not,
- * and that the new exchanges are not the same objects as the provided list (i.e. that the
+ * and that the new exchanges are not the same objects as the provided list (i.e. that the
* reload actually generated new exchange objects)
*/
private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges)
@@ -484,7 +484,7 @@ public class MessageStoreTest extends QpidTestCase
registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
// There should only be the original exchanges + our 2 recovered durable exchanges
- assertEquals("Incorrect number of exchanges available",
+ assertEquals("Incorrect number of exchanges available",
originalNumExchanges + 2, registry.getExchangeNames().size());
}
@@ -562,7 +562,7 @@ public class MessageStoreTest extends QpidTestCase
{
assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
}
-
+
if (usePriority)
{
assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
@@ -705,7 +705,7 @@ public class MessageStoreTest extends QpidTestCase
{
FieldTable queueArguments = null;
-
+
if(usePriority || lastValueQueue)
{
assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
@@ -716,7 +716,7 @@ public class MessageStoreTest extends QpidTestCase
queueArguments = new FieldTable();
queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
}
-
+
if (lastValueQueue)
{
queueArguments = new FieldTable();
@@ -735,7 +735,7 @@ public class MessageStoreTest extends QpidTestCase
if (queue.isDurable() && !queue.isAutoDelete())
{
- getVirtualHost().getMessageStore().createQueue(queue, queueArguments);
+ getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
@@ -779,7 +779,7 @@ public class MessageStoreTest extends QpidTestCase
getVirtualHost().getExchangeRegistry().registerExchange(exchange);
if (durable)
{
- getVirtualHost().getMessageStore().createExchange(exchange);
+ getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
}
}
catch (AMQException e)
@@ -836,7 +836,7 @@ public class MessageStoreTest extends QpidTestCase
fail(e.getMessage());
}
}
-
+
protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
{
FieldTable bindArguments = null;
@@ -931,4 +931,4 @@ public class MessageStoreTest extends QpidTestCase
return _routingKey;
}
}
-} \ No newline at end of file
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 065d6408de..88d5852a17 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -73,7 +73,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
setUpStoreConfiguration(_storeConfiguration);
_store = createMessageStore();
- _store.configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration);
+ ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration);
_store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler, _storeConfiguration);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 4d10058d17..8de19d9cff 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -55,8 +55,10 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
+import org.apache.qpid.server.virtualhost.VirtualHostFactoryRegistry;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class BrokerTestHelper
@@ -96,14 +98,24 @@ public class BrokerTestHelper
throws Exception
{
StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class);
- VirtualHost host = new VirtualHostImpl(virtualHostRegistry, statisticsGatherer, new SecurityManager(mock(Broker.class), false), virtualHostConfiguration);
+ final VirtualHostFactory factory =
+ virtualHostConfiguration == null ? new StandardVirtualHostFactory()
+ : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType());
+ VirtualHost host = factory.createVirtualHost(virtualHostRegistry,
+ statisticsGatherer,
+ new SecurityManager(mock(Broker.class), false),
+ virtualHostConfiguration);
virtualHostRegistry.registerVirtualHost(host);
return host;
}
public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration) throws Exception
{
- return new VirtualHostImpl(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration);
+ final VirtualHostFactory factory =
+ virtualHostConfiguration == null ? new StandardVirtualHostFactory()
+ : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType());
+
+ return factory.createVirtualHost(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration);
}
public static VirtualHost createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 324e36e132..7552a653fe 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -111,6 +112,11 @@ public class MockVirtualHost implements VirtualHost
return null;
}
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return null;
+ }
+
public String getName()
{
return _name;
@@ -214,4 +220,4 @@ public class MockVirtualHost implements VirtualHost
public void unblock()
{
}
-} \ No newline at end of file
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index 3ae269ff20..1243d9f7dd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -46,7 +46,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-public class VirtualHostImplTest extends QpidTestCase
+public class StandardVirtualHostTest extends QpidTestCase
{
private VirtualHostRegistry _virtualHostRegistry;
@@ -109,14 +109,9 @@ public class VirtualHostImplTest extends QpidTestCase
createVirtualHost(queueName, config);
fail("virtualhost creation should have failed due to illegal configuration");
}
- catch (RuntimeException e)
+ catch (ConfigurationException e)
{
- assertNotNull(e.getCause());
-
- assertEquals(ConfigurationException.class, e.getCause().getClass());
-
- Throwable configException = e.getCause();
- assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, configException.getMessage());
+ assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, e.getMessage());
}
}
@@ -172,14 +167,9 @@ public class VirtualHostImplTest extends QpidTestCase
createVirtualHost(queueName, config);
fail("virtualhost creation should have failed due to illegal configuration");
}
- catch (RuntimeException e)
+ catch (ConfigurationException e)
{
- assertNotNull(e.getCause());
-
- assertEquals(ConfigurationException.class, e.getCause().getClass());
-
- Throwable configException = e.getCause();
- assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, configException.getMessage());
+ assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, e.getMessage());
}
}
@@ -274,7 +264,7 @@ public class VirtualHostImplTest extends QpidTestCase
_virtualHostRegistry = broker.getVirtualHostRegistry();
VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker);
- VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration);
+ VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration);
_virtualHostRegistry.registerVirtualHost(host);
return host;
@@ -282,7 +272,7 @@ public class VirtualHostImplTest extends QpidTestCase
/**
* Create a configuration file for testing virtualhost creation
- *
+ *
* @param vhostName name of the virtualhost
* @param queueName name of the queue
* @param exchangeName name of a direct exchange to declare (unless dontDeclare = true) and bind the queue to (null = none)
@@ -313,6 +303,7 @@ public class VirtualHostImplTest extends QpidTestCase
writer.write(" <virtualhost>");
writer.write(" <name>" + vhostName + "</name>");
writer.write(" <" + vhostName + ">");
+ writer.write(" <type>" + StandardVirtualHostFactory.TYPE + "</type>");
writer.write(" <store>");
writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>");
writer.write(" </store>");
@@ -373,7 +364,7 @@ public class VirtualHostImplTest extends QpidTestCase
Configuration config = new PropertiesConfiguration();
config.setProperty("store.type", MemoryMessageStore.TYPE);
VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker);
- VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration);
+ VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration);
_virtualHostRegistry.registerVirtualHost(host);
return host;
}
diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
index 08f7387b75..8f556ece5a 100644
--- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.qpid.test.utils;
@@ -110,10 +110,12 @@ public class QpidTestCase extends TestCase
}
protected static final String MESSAGE_STORE_CLASS_NAME_KEY = "messagestore.class.name";
+ protected static final String CONFIGURATION_STORE_CLASS_NAME_KEY = "configurationstore.class.name";
+
protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore";
private static List<String> _exclusionList;
-
+
public QpidTestCase()
{
super();
@@ -138,7 +140,7 @@ public class QpidTestCase extends TestCase
{
final String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY);
_logger.debug("MESSAGE_STORE_CLASS_NAME_KEY " + storeClass);
-
+
return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ;
}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
index 7492d062fd..a19ba21c5c 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -26,6 +26,7 @@ import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE;
import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD;
import java.util.Arrays;
+import javax.net.ssl.SSLSocket;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQTestConnection_0_10;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index cff77711ca..ed76c40717 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -35,7 +35,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
-public class SlowMessageStore implements MessageStore
+public class SlowMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
private static final String DELAYS = "delays";
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index deaac2dd2a..1d1c474be0 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -50,7 +50,8 @@ public class Asserts
{
assertNotNull("Virtualhost " + virtualHostName + " data are not found", virtualHost);
assertAttributesPresent(virtualHost, VirtualHost.AVAILABLE_ATTRIBUTES, VirtualHost.TIME_TO_LIVE,
- VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, VirtualHost.CONFIG_PATH);
+ VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH,
+ VirtualHost.CONFIG_PATH, VirtualHost.TYPE);
assertEquals("Unexpected value of attribute " + VirtualHost.NAME, virtualHostName, virtualHost.get(VirtualHost.NAME));
assertNotNull("Unexpected value of attribute " + VirtualHost.ID, virtualHost.get(VirtualHost.ID));
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 1823b59ba3..940d6a3298 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
import org.codehaus.jackson.JsonGenerationException;
@@ -564,6 +565,7 @@ public class VirtualHostRestTest extends QpidRestTestCase
}
else
{
+ hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
hostData.put(VirtualHost.STORE_PATH, storePath);
hostData.put(VirtualHost.STORE_TYPE, storeType);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index acad55417a..666449b658 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.security.acl.AbstractACLTestCase;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory;
import org.apache.qpid.server.security.group.FileGroupManagerFactory;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.systest.rest.QpidRestTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -981,6 +982,7 @@ public class BrokerACLTest extends QpidRestTestCase
hostData.put(VirtualHost.NAME, hostName);
hostData.put(VirtualHost.STORE_PATH, getStoreLocation(hostName));
hostData.put(VirtualHost.STORE_TYPE, getTestProfileMessageStoreType());
+ hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
index 9bf7dbd62a..4a81480671 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.test.client.timeouts;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,8 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
+ setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".type",
+ StandardVirtualHostFactory.TYPE);
setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore");
setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
@@ -64,7 +67,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase
_connection = getConnection();
- //Create Queue
+ //Create Queue
_queue = (Queue) getInitialContext().lookup("queue");
//Create Consumer