summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-20 18:39:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-20 18:39:44 +0000
commitbc33c0d9e43996153bd50823a436c3694460aa4e (patch)
treec7ff6547c61da68d1fdc4b4fbd92498734cf86a4
parentf0754f9b6960437ba79a95bebbbb82789d8f7e07 (diff)
downloadqpid-python-bc33c0d9e43996153bd50823a436c3694460aa4e.tar.gz
QPID-5087 : [Java Broker] Allow use of separate message stores and configuration stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1515914 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java76
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java10
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java10
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java31
-rw-r--r--java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory19
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java2
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java3
-rw-r--r--java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java2
-rw-r--r--java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java31
-rw-r--r--java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory19
-rw-r--r--java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java18
-rw-r--r--java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java39
-rw-r--r--java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Model.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java55
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java16
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java78
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java513
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java33
-rw-r--r--java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory19
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java299
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java13
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java2
35 files changed, 1337 insertions, 132 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index a4383d94c4..e772498ee9 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -139,6 +139,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private String _storeLocation;
private Map<String, String> _envConfigMap;
+ private VirtualHost _virtualHost;
public AbstractBDBMessageStore()
{
@@ -151,34 +152,58 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
_eventManager.addEventListener(eventListener, events);
}
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = recoveryHandler;
-
- configure(name, virtualHost);
+ _virtualHost = virtualHost;
}
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler messageRecoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ // Is acting as a message store, but not a durable config store
+ _stateManager.attainState(State.INITIALISING);
+ }
+
_messageRecoveryHandler = messageRecoveryHandler;
_tlogRecoveryHandler = tlogRecoveryHandler;
+ _virtualHost = virtualHost;
+
+ completeInitialisation();
+ }
+
+ private void completeInitialisation() throws Exception
+ {
+ configure(_virtualHost);
_stateManager.attainState(State.INITIALISED);
}
public synchronized void activate() throws Exception
{
+ // check if acting as a durable config store, but not a message store
+ if(_stateManager.isInState(State.INITIALISING))
+ {
+ completeInitialisation();
+ }
_stateManager.attainState(State.ACTIVATING);
- recoverConfig(_configRecoveryHandler);
- recoverMessages(_messageRecoveryHandler);
- recoverQueueEntries(_tlogRecoveryHandler);
+ if(_configRecoveryHandler != null)
+ {
+ recoverConfig(_configRecoveryHandler);
+ }
+ if(_messageRecoveryHandler != null)
+ {
+ recoverMessages(_messageRecoveryHandler);
+ }
+ if(_tlogRecoveryHandler != null)
+ {
+ recoverQueueEntries(_tlogRecoveryHandler);
+ }
_stateManager.attainState(State.ACTIVE);
}
@@ -192,23 +217,38 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
* Called after instantiation in order to configure the message store.
*
*
- * @param name The name of the virtual host using this store
- * @param virtualHost
+ *
+ * @param virtualHost The virtual host using this store
* @return whether a new store environment was created or not (to indicate whether recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- public void configure(String name, VirtualHost virtualHost) throws Exception
+ public void configure(VirtualHost virtualHost) throws Exception
{
+ configure(virtualHost, _messageRecoveryHandler != null);
+ }
-
+ public void configure(VirtualHost virtualHost, boolean isMessageStore) throws Exception
+ {
+ String name = virtualHost.getName();
final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name;
-
- String storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
- if(storeLocation == null)
+ String storeLocation;
+ if(isMessageStore)
{
- storeLocation = defaultPath;
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+ else // we are acting only as the durable config store
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
}
Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index 561e4fa660..fb1dc1f1d3 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -122,14 +122,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private Map<String, String> _repConfig;
@Override
- public void configure(String name, VirtualHost virtualHost) throws Exception
+ public void configure(VirtualHost virtualHost) throws Exception
{
//Mandatory configuration
_groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
_nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
_nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
_helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
- _name = name;
+ _name = virtualHost.getName();
//Optional configuration
String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null);
@@ -157,7 +157,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- super.configure(name, virtualHost);
+ super.configure(virtualHost);
}
@@ -260,10 +260,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
@Override
- public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
- super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler);
+ super.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index b92a97c8cb..bb3c7b108d 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -77,12 +77,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
new DefaultUpgraderProvider(this, getExchangeRegistry()));
- _messageStore.configureConfigStore(getName(),
- configRecoverer,
- virtualHost);
+ _messageStore.configureConfigStore(
+ virtualHost, configRecoverer
+ );
- _messageStore.configureMessageStore(getName(),
- recoveryHandler,
+ _messageStore.configureMessageStore(
+ virtualHost, recoveryHandler,
recoveryHandler
);
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 4eac54dd6f..d7c8b23d39 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -26,10 +26,12 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-public class BDBMessageStoreFactory implements MessageStoreFactory
+public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory
{
@Override
@@ -39,6 +41,12 @@ public class BDBMessageStoreFactory implements MessageStoreFactory
}
@Override
+ public DurableConfigurationStore createDurableConfigurationStore()
+ {
+ return new BDBMessageStore();
+ }
+
+ @Override
public MessageStore createMessageStore()
{
return new BDBMessageStore();
@@ -76,12 +84,25 @@ public class BDBMessageStoreFactory implements MessageStoreFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
- if(!(storePath instanceof String))
+ if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ {
+ Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ +"' is required and must be of type String.");
+
+ }
+ }
+ if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE)))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
- +"' is required and must be of type String.");
+ Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH
+ +"' is required and must be of type String.");
+ }
}
}
}
diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
new file mode 100644
index 0000000000..a822405565
--- /dev/null
+++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 21342b5715..76b990038d 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -230,7 +230,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
messageStore.close();
AbstractBDBMessageStore newStore = new BDBMessageStore();
- newStore.configure("", getVirtualHostModel());
+ newStore.configure(getVirtualHostModel(),true);
newStore.startWithNoRecover();
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
index 5ad49462ac..3f32df4b0c 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
@@ -20,7 +20,6 @@
package org.apache.qpid.server.store.berkeleydb;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -34,7 +33,7 @@ public class HAMessageStoreSmokeTest extends QpidTestCase
{
try
{
- _store.configure("test", mock(VirtualHost.class));
+ _store.configure(mock(VirtualHost.class));
fail("Expected an exception to be thrown");
}
catch (ConfigurationException ce)
diff --git a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index ac310d02c9..bc8d157346 100644
--- a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -132,7 +132,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
_driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
- String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ String databasePath = isConfigStoreOnly() ? (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH) : (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
if(databasePath == null)
{
databasePath = defaultPath;
diff --git a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
index 1b111ad65e..47a451ccf6 100644
--- a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -24,10 +24,12 @@ import java.util.Collections;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-public class DerbyMessageStoreFactory implements MessageStoreFactory
+public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory
{
@Override
@@ -37,6 +39,12 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory
}
@Override
+ public DurableConfigurationStore createDurableConfigurationStore()
+ {
+ return new DerbyMessageStore();
+ }
+
+ @Override
public MessageStore createMessageStore()
{
return new DerbyMessageStore();
@@ -52,12 +60,25 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
- if(!(storePath instanceof String))
+ if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ {
+ Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ +"' is required and must be of type String.");
+
+ }
+ }
+ if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE)))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
- +"' is required and must be of type String.");
+ Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH
+ +"' is required and must be of type String.");
+ }
}
}
diff --git a/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
new file mode 100644
index 0000000000..88ca1fed5e
--- /dev/null
+++ b/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
diff --git a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index f8d93536bb..6fdfa40561 100644
--- a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -51,6 +51,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
public static final String TYPE = "JDBC";
public static final String CONNECTION_URL = "connectionURL";
+ public static final String CONFIG_CONNECTION_URL = "configConnectionURL";
protected String _connectionURL;
private ConnectionProvider _connectionProvider;
@@ -280,11 +281,20 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
throws ClassNotFoundException, SQLException
{
+ String connectionURL;
+ if(!isConfigStoreOnly())
+ {
+ connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null
+ ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH))
+ : String.valueOf(virtualHost.getAttribute(CONNECTION_URL));
+ }
+ else
+ {
+ connectionURL = virtualHost.getAttribute(CONFIG_CONNECTION_URL) == null
+ ? String.valueOf(virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH))
+ : String.valueOf(virtualHost.getAttribute(CONFIG_CONNECTION_URL));
- String connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null
- ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH))
- : String.valueOf(virtualHost.getAttribute(CONNECTION_URL));
-
+ }
JDBCDetails details = null;
String[] components = connectionURL.split(":",3);
diff --git a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
index 82d2275156..1144eaaf18 100644
--- a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
+++ b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
@@ -24,10 +24,12 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-public class JDBCMessageStoreFactory implements MessageStoreFactory
+public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory
{
@Override
@@ -43,6 +45,12 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory
}
@Override
+ public DurableConfigurationStore createDurableConfigurationStore()
+ {
+ return new JDBCMessageStore();
+ }
+
+ @Override
public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration)
{
Map<String,Object> convertedMap = new HashMap<String,Object>();
@@ -67,15 +75,32 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL);
- if(!(connectionURL instanceof String))
+ if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ {
+ Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL);
+ if(!(connectionURL instanceof String))
+ {
+ Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL
+ +"' is required and must be of type String.");
+
+ }
+ }
+ }
+ if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
- if(!(storePath instanceof String))
+ Object connectionURL = attributes.get(JDBCMessageStore.CONFIG_CONNECTION_URL);
+ if(!(connectionURL instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL
- +"' is required and must be of type String.");
+ Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONFIG_CONNECTION_URL
+ +"' is required and must be of type String.");
+ }
}
}
}
diff --git a/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
new file mode 100644
index 0000000000..a77458f27d
--- /dev/null
+++ b/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
index 9120a27976..f652dcec67 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
@@ -77,12 +77,12 @@ public class Model
addRelationship(Connection.class, Session.class);
- addRelationship(Exchange.class, Binding.class);
- addRelationship(Exchange.class, Publisher.class);
-
addRelationship(Queue.class, Binding.class);
addRelationship(Queue.class, Consumer.class);
+ addRelationship(Exchange.class, Binding.class);
+ addRelationship(Exchange.class, Publisher.class);
+
addRelationship(Session.class, Consumer.class);
addRelationship(Session.class, Publisher.class);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 26ac99d5bd..ae07005679 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -74,6 +74,8 @@ public interface VirtualHost extends ConfiguredObject
String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn";
String STORE_TYPE = "storeType";
String STORE_PATH = "storePath";
+ String CONFIG_STORE_TYPE = "configStoreType";
+ String CONFIG_STORE_PATH = "configStorePath";
String SUPPORTED_EXCHANGE_TYPES = "supportedExchangeTypes";
String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes";
String CREATED = "created";
@@ -107,6 +109,8 @@ public interface VirtualHost extends ConfiguredObject
QUEUE_MAXIMUM_DELIVERY_ATTEMPTS,
QUEUE_FLOW_CONTROL_SIZE_BYTES,
QUEUE_FLOW_RESUME_SIZE_BYTES,
+ CONFIG_STORE_TYPE,
+ CONFIG_STORE_PATH,
STORE_TYPE,
STORE_PATH,
STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java
new file mode 100644
index 0000000000..94a029ced3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+
+public interface DurableConfigurationStoreFactory extends Pluggable
+{
+ String getType();
+
+ DurableConfigurationStore createDurableConfigurationStore();
+
+ void validateAttributes(Map<String, Object> attributes);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index d4b0c66351..589f385d22 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -42,9 +42,7 @@ public class QueueArgumentsConverter
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
- /* public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
- public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
- */
+
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e2fea8f50b..4a1452d86c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -169,6 +169,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
+ private VirtualHost _virtualHost;
public AbstractJDBCMessageStore()
{
@@ -176,46 +177,69 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler configRecoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
-
- commonConfiguration(name, virtualHost);
+ _virtualHost = virtualHost;
}
@Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ _stateManager.attainState(State.INITIALISING);
+ }
+
+ _virtualHost = virtualHost;
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
+ completeInitialisation();
+ }
+
+ private void completeInitialisation() throws ClassNotFoundException, SQLException, AMQStoreException
+ {
+ commonConfiguration();
+
_stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
+ if(_stateManager.isInState(State.INITIALISING))
+ {
+ completeInitialisation();
+ }
_stateManager.attainState(State.ACTIVATING);
// this recovers durable exchanges, queues, and bindings
- recoverConfiguration(_configRecoveryHandler);
- recoverMessages(_messageRecoveryHandler);
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
- recoverXids(dtxrh);
+ if(_configRecoveryHandler != null)
+ {
+ recoverConfiguration(_configRecoveryHandler);
+ }
+ if(_messageRecoveryHandler != null)
+ {
+ recoverMessages(_messageRecoveryHandler);
+ }
+ if(_tlogRecoveryHandler != null)
+ {
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+ recoverXids(dtxrh);
+
+ }
_stateManager.attainState(State.ACTIVE);
}
- private void commonConfiguration(String name, VirtualHost virtualHost)
+ private void commonConfiguration()
throws ClassNotFoundException, SQLException, AMQStoreException
{
- implementationSpecificConfiguration(name, virtualHost);
+ implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost);
createOrOpenDatabase();
upgradeIfNecessary();
}
@@ -1071,6 +1095,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
+ protected boolean isConfigStoreOnly()
+ {
+ return _messageRecoveryHandler == null;
+ }
+
private static final class ConnectionWrapper
{
private final Connection _connection;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index ac95d9fdb3..3abf083026 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -80,24 +80,30 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
}
@Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISING);
}
@Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ _stateManager.attainState(State.INITIALISING);
+ }
_stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
+
+ if(_stateManager.isInState(State.INITIALISING))
+ {
+ _stateManager.attainState(State.INITIALISED);
+ }
_stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 08f3d83c4e..6b0748b0c3 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -45,14 +45,13 @@ public interface DurableConfigurationStore
*
*
*
- * @param name The name to be used by this store
- * @param recoveryHandler Handler to be called as the store recovers on start up
+ *
+ *
* @param virtualHost
+ * @param recoveryHandler Handler to be called as the store recovers on start up
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception;
+ void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception;
/**
@@ -96,4 +95,5 @@ public interface DurableConfigurationStore
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException;
+ void close() throws Exception;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java
new file mode 100644
index 0000000000..3a69f802f0
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+
+public class DurableConfigurationStoreCreator
+{
+ private Map<String, DurableConfigurationStoreFactory> _factories = new HashMap<String, DurableConfigurationStoreFactory>();
+
+ public DurableConfigurationStoreCreator()
+ {
+ QpidServiceLoader<DurableConfigurationStoreFactory> qpidServiceLoader = new QpidServiceLoader<DurableConfigurationStoreFactory>();
+ Iterable<DurableConfigurationStoreFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(DurableConfigurationStoreFactory.class);
+ for (DurableConfigurationStoreFactory durableConfigurationStoreFactory : factories)
+ {
+ String type = durableConfigurationStoreFactory.getType();
+ DurableConfigurationStoreFactory factory = _factories.put(type.toLowerCase(), durableConfigurationStoreFactory);
+ if (factory != null)
+ {
+ throw new IllegalStateException("DurableConfigurationStoreFactory with type name '" + type
+ + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '"
+ + durableConfigurationStoreFactory.getClass().getName() + "'");
+ }
+ }
+ }
+
+ public boolean isValidType(String storeType)
+ {
+ return _factories.containsKey(storeType.toLowerCase());
+ }
+
+
+ public DurableConfigurationStore createMessageStore(String storeType)
+ {
+ DurableConfigurationStoreFactory factory = _factories.get(storeType.toLowerCase());
+ if (factory == null)
+ {
+ throw new IllegalConfigurationException("Unknown store type: " + storeType
+ + ". Supported types: " + _factories.keySet());
+ }
+ return factory.createDurableConfigurationStore();
+ }
+
+ public Collection<DurableConfigurationStoreFactory> getFactories()
+ {
+ return Collections.unmodifiableCollection(_factories.values());
+ }
+
+ public Collection<String> getStoreTypes()
+ {
+ return Collections.unmodifiableCollection(_factories.keySet());
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
new file mode 100644
index 0000000000..8eed1fa5a4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -0,0 +1,513 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.VirtualHost;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+public class JsonFileConfigStore implements DurableConfigurationStore
+{
+ private static final Model MODEL = Model.getInstance();
+
+ private static final Map<String,Class<? extends ConfiguredObject>> CLASS_NAME_MAPPING = generateClassNameMap(VirtualHost.class);
+ public static final String TYPE = "JSON";
+
+ private final Map<UUID, ConfiguredObjectRecord> _objectsById = new HashMap<UUID, ConfiguredObjectRecord>();
+ private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>();
+ private final ObjectMapper _objectMapper = new ObjectMapper();
+
+ private String _directoryName;
+ private String _name;
+ private FileLock _fileLock;
+ private String _configFileName;
+ private String _backupFileName;
+ private int _configVersion;
+
+ public JsonFileConfigStore()
+ {
+ _objectMapper.enable(SerializationConfig.Feature.INDENT_OUTPUT);
+ }
+
+ @Override
+ public void configureConfigStore(final VirtualHost virtualHost, final ConfigurationRecoveryHandler recoveryHandler)
+ throws Exception
+ {
+ _name = virtualHost.getName();
+
+ Object storePathAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ if(!(storePathAttr instanceof String))
+ {
+ throw new AMQStoreException("Cannot determine path for configuration storage");
+ }
+ _directoryName = (String) storePathAttr;
+ _configFileName = _name + ".json";
+ _backupFileName = _name + ".bak";
+ checkDirectoryIsWritable(_directoryName);
+ getFileLock();
+
+ if(!fileExists(_configFileName))
+ {
+ if(!fileExists(_backupFileName))
+ {
+ File newFile = new File(_directoryName, _configFileName);
+ _objectMapper.writeValue(newFile, Collections.emptyMap());
+ }
+ else
+ {
+ renameFile(_backupFileName, _configFileName);
+ }
+ }
+
+
+ load();
+ recoveryHandler.beginConfigurationRecovery(this,_configVersion);
+ List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
+ for(ConfiguredObjectRecord record : records)
+ {
+ recoveryHandler.configuredObject(record.getId(), record.getType(), record.getAttributes());
+ }
+ int oldConfigVersion = _configVersion;
+ _configVersion = recoveryHandler.completeConfigurationRecovery();
+ if(oldConfigVersion != _configVersion)
+ {
+ save();
+ }
+ }
+
+ private void renameFile(String fromFileName, String toFileName) throws AMQStoreException
+ {
+ File toFile = new File(_directoryName, toFileName);
+ if(toFile.exists())
+ {
+ if(!toFile.delete())
+ {
+ throw new AMQStoreException("Cannot delete file " + toFile.getAbsolutePath());
+ }
+ }
+ File fromFile = new File(_directoryName, fromFileName);
+
+ if(!fromFile.renameTo(toFile))
+ {
+ throw new AMQStoreException("Cannot rename file " + fromFile.getAbsolutePath() + " to " + toFile.getAbsolutePath());
+ }
+ }
+
+ private boolean fileExists(String fileName)
+ {
+ File file = new File(_directoryName, fileName);
+ return file.exists();
+ }
+
+ private void getFileLock() throws IOException, AMQStoreException
+ {
+ File lockFile = new File(_directoryName, _name + ".lck");
+ lockFile.createNewFile();
+
+ FileOutputStream out = new FileOutputStream(lockFile);
+ FileChannel channel = out.getChannel();
+ try
+ {
+ _fileLock = channel.tryLock();
+ }
+ catch(OverlappingFileLockException e)
+ {
+ _fileLock = null;
+ }
+ if(_fileLock == null)
+ {
+ throw new AMQStoreException("Cannot get lock on file " + lockFile.getAbsolutePath() + " is another instance running?");
+ }
+ lockFile.deleteOnExit();
+ }
+
+ private void checkDirectoryIsWritable(String directoryName) throws AMQStoreException
+ {
+ File dir = new File(directoryName);
+ if(dir.exists())
+ {
+ if(dir.isDirectory())
+ {
+ if(!dir.canWrite())
+ {
+ throw new AMQStoreException("Configuration path " + directoryName + " exists, but is not writable");
+ }
+
+ }
+ else
+ {
+ throw new AMQStoreException("Configuration path " + directoryName + " exists, but is not a directory");
+ }
+ }
+ else if(!dir.mkdirs())
+ {
+ throw new AMQStoreException("Cannot create directory " + directoryName);
+ }
+ }
+
+ private void load() throws IOException
+ {
+ Map data = _objectMapper.readValue(new File(_directoryName,_configFileName),Map.class);
+ Collection<Class<? extends ConfiguredObject>> childClasses =
+ MODEL.getChildTypes(VirtualHost.class);
+ String modelVersion = (String) data.remove("modelVersion");
+ Object configVersion;
+ if((configVersion = data.remove("configVersion")) instanceof Integer)
+ {
+ _configVersion = (Integer) configVersion;
+ }
+ for(Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ final String type = childClass.getSimpleName();
+ String attrName = type.toLowerCase() + "s";
+ Object children = data.remove(attrName);
+ if(children != null)
+ {
+ if(children instanceof Collection)
+ {
+ for(Object child : (Collection)children)
+ {
+ if(child instanceof Map)
+ {
+ loadChild(childClass, (Map)child, VirtualHost.class, null);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void loadChild(final Class<? extends ConfiguredObject> clazz,
+ final Map<String,Object> data,
+ final Class<? extends ConfiguredObject> parentClass,
+ final UUID parentId)
+ {
+ Collection<Class<? extends ConfiguredObject>> childClasses =
+ MODEL.getChildTypes(clazz);
+ String idStr = (String) data.remove("id");
+ final UUID id = UUID.fromString(idStr);
+ final String type = clazz.getSimpleName();
+
+ for(Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ final String childType = childClass.getSimpleName();
+ String attrName = childType.toLowerCase() + "s";
+ Object children = data.remove(attrName);
+ if(children != null)
+ {
+ if(children instanceof Collection)
+ {
+ for(Object child : (Collection)children)
+ {
+ if(child instanceof Map)
+ {
+ loadChild(childClass, (Map)child, clazz, id);
+ }
+ }
+ }
+ }
+
+ }
+ if(parentId != null)
+ {
+ data.put(parentClass.getSimpleName().toLowerCase(),parentId);
+ for(Class<? extends ConfiguredObject> otherParent : MODEL.getParentTypes(clazz))
+ {
+ if(otherParent != parentClass)
+ {
+ final String otherParentAttr = otherParent.getSimpleName().toLowerCase();
+ Object otherParentId = data.get(otherParentAttr);
+ if(otherParentId instanceof String)
+ {
+ try
+ {
+ data.put(otherParentAttr, UUID.fromString((String) otherParentId));
+ }
+ catch(IllegalArgumentException e)
+ {
+ //
+ }
+ }
+ }
+
+ }
+ }
+
+ _objectsById.put(id, new ConfiguredObjectRecord(id, type, data));
+ List<UUID> idsForType = _idsByType.get(type);
+ if(idsForType == null)
+ {
+ idsForType = new ArrayList<UUID>();
+ _idsByType.put(type, idsForType);
+ }
+ idsForType.add(id);
+
+ }
+
+ @Override
+ public synchronized void create(final UUID id, final String type, final Map<String, Object> attributes) throws AMQStoreException
+ {
+ if(_objectsById.containsKey(id))
+ {
+ throw new AMQStoreException("Object with id " + id + " already exists");
+ }
+ else if(!CLASS_NAME_MAPPING.containsKey(type))
+ {
+ throw new AMQStoreException("Cannot create object of unknown type " + type);
+ }
+ else
+ {
+ ConfiguredObjectRecord record = new ConfiguredObjectRecord(id, type, attributes);
+ _objectsById.put(id, record);
+ List<UUID> idsForType = _idsByType.get(type);
+ if(idsForType == null)
+ {
+ idsForType = new ArrayList<UUID>();
+ _idsByType.put(type, idsForType);
+ }
+ idsForType.add(id);
+ save();
+ }
+ }
+
+ private void save() throws AMQStoreException
+ {
+ Collection<Class<? extends ConfiguredObject>> childClasses =
+ MODEL.getChildTypes(VirtualHost.class);
+
+ Map<String, Object> virtualHostMap = new LinkedHashMap<String, Object>();
+ virtualHostMap.put("modelVersion", Model.MODEL_VERSION);
+ virtualHostMap.put("configVersion", _configVersion);
+
+ for(Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ final String type = childClass.getSimpleName();
+ String attrName = type.toLowerCase() + "s";
+ List<UUID> childIds = _idsByType.get(type);
+ if(childIds != null && !childIds.isEmpty())
+ {
+ List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
+ for(UUID id : childIds)
+ {
+ entities.add(build(childClass,id));
+ }
+ virtualHostMap.put(attrName, entities);
+ }
+ }
+
+ try
+ {
+
+ File tmpFile = File.createTempFile("cfg","tmp", new File(_directoryName));
+ tmpFile.deleteOnExit();
+ _objectMapper.writeValue(tmpFile,virtualHostMap);
+ renameFile(_configFileName,_backupFileName);
+ renameFile(tmpFile.getName(),_configFileName);
+ tmpFile.delete();
+ File backupFile = new File(_directoryName, _backupFileName);
+ backupFile.delete();
+
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Cannot save to store", e);
+ }
+ }
+
+ private Map<String, Object> build(final Class<? extends ConfiguredObject> type, final UUID id)
+ {
+ ConfiguredObjectRecord record = _objectsById.get(id);
+ Map<String,Object> map = new LinkedHashMap<String, Object>();
+ map.put("id", id);
+ map.putAll(record.getAttributes());
+ map.remove(MODEL.getParentTypes(type).iterator().next().getSimpleName().toLowerCase());
+
+ Collection<Class<? extends ConfiguredObject>> childClasses =
+ new ArrayList<Class<? extends ConfiguredObject>>(MODEL.getChildTypes(type));
+
+ for(Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ // only add if this is the "first" parent
+ if(MODEL.getParentTypes(childClass).iterator().next() == type)
+ {
+ String attrName = childClass.getSimpleName().toLowerCase() + "s";
+ List<UUID> childIds = _idsByType.get(childClass.getSimpleName());
+ if(childIds != null)
+ {
+ List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
+ for(UUID childId : childIds)
+ {
+ ConfiguredObjectRecord childRecord = _objectsById.get(childId);
+ final String parentArg = type.getSimpleName().toLowerCase();
+ if(id.toString().equals(String.valueOf(childRecord.getAttributes().get(parentArg))))
+ {
+ entities.add(build(childClass,childId));
+ }
+ }
+ if(!entities.isEmpty())
+ {
+ map.put(attrName,entities);
+ }
+ }
+ }
+ }
+
+ return map;
+ }
+
+ @Override
+ public void remove(final UUID id, final String type) throws AMQStoreException
+ {
+ removeConfiguredObjects(id);
+ }
+
+ @Override
+ public synchronized UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+ {
+ List<UUID> removedIds = new ArrayList<UUID>();
+ for(UUID id : objects)
+ {
+ ConfiguredObjectRecord record = _objectsById.remove(id);
+ if(record != null)
+ {
+ removedIds.add(id);
+ _idsByType.get(record.getType()).remove(id);
+ }
+ }
+ save();
+ return removedIds.toArray(new UUID[removedIds.size()]);
+ }
+
+ @Override
+ public void update(final UUID id, final String type, final Map<String, Object> attributes) throws AMQStoreException
+ {
+ update(false, new ConfiguredObjectRecord(id, type, attributes));
+ }
+
+ @Override
+ public void update(final ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ update(false, records);
+ }
+
+ @Override
+ public void update(final boolean createIfNecessary, final ConfiguredObjectRecord... records)
+ throws AMQStoreException
+ {
+ for(ConfiguredObjectRecord record : records)
+ {
+ final UUID id = record.getId();
+ final String type = record.getType();
+
+ if(_objectsById.containsKey(id))
+ {
+ final ConfiguredObjectRecord existingRecord = _objectsById.get(id);
+ if(!type.equals(existingRecord.getType()))
+ {
+ throw new AMQStoreException("Cannot change the type of record " + id + " from type "
+ + existingRecord.getType() + " to type " + type);
+ }
+ }
+ else if(!createIfNecessary)
+ {
+ throw new AMQStoreException("Cannot update record with id " + id
+ + " of type " + type + " as it does not exist");
+ }
+ else if(!CLASS_NAME_MAPPING.containsKey(type))
+ {
+ throw new AMQStoreException("Cannot update record of unknown type " + type);
+ }
+ }
+ for(ConfiguredObjectRecord record : records)
+ {
+ final UUID id = record.getId();
+ final String type = record.getType();
+ if(_objectsById.put(id, record) == null)
+ {
+ List<UUID> idsForType = _idsByType.get(type);
+ if(idsForType == null)
+ {
+ idsForType = new ArrayList<UUID>();
+ _idsByType.put(type, idsForType);
+ }
+ idsForType.add(id);
+ }
+ }
+
+ save();
+ }
+
+ public void close() throws Exception
+ {
+ try
+ {
+ releaseFileLock();
+ }
+ finally
+ {
+ _fileLock = null;
+ _idsByType.clear();
+ _objectsById.clear();
+ }
+
+ }
+
+ private void releaseFileLock() throws IOException
+ {
+ _fileLock.release();
+ _fileLock.channel().close();
+ }
+
+
+ private static Map<String,Class<? extends ConfiguredObject>> generateClassNameMap(final Class<? extends ConfiguredObject> clazz)
+ {
+ Map<String,Class<? extends ConfiguredObject>>map = new HashMap<String, Class<? extends ConfiguredObject>>();
+ map.put(clazz.getSimpleName().toString(), clazz);
+ Collection<Class<? extends ConfiguredObject>> childClasses = MODEL.getChildTypes(clazz);
+ if(childClasses != null)
+ {
+ for(Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ map.putAll(generateClassNameMap(childClass));
+ }
+ }
+ return map;
+ }
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java
new file mode 100644
index 0000000000..374a35d10d
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Map;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
+
+public class JsonFileConfigStoreFactory implements DurableConfigurationStoreFactory
+{
+ @Override
+ public String getType()
+ {
+ return JsonFileConfigStore.TYPE;
+ }
+
+ @Override
+ public DurableConfigurationStore createDurableConfigurationStore()
+ {
+ return new JsonFileConfigStore();
+ }
+
+ @Override
+ public void validateAttributes(Map<String, Object> attributes)
+ {
+ Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH);
+ if(!(storePath instanceof String))
+ {
+ throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH
+ +"' is required and must be of type String.");
+
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index c605e1b599..996d71d51d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.server.model.VirtualHost;
+
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
@@ -31,13 +33,14 @@ public interface MessageStore
* whatever parameters it wants.
*
*
- * @param name The name to be used by this store
+ *
+ *
+ * @param virtualHost
* @param messageRecoveryHandler Handler to be called as the store recovers on start up
* @param tlogRecoveryHandler
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configureMessageStore(String name,
- MessageStoreRecoveryHandler messageRecoveryHandler,
+ void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception;
void activate() throws Exception;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 9eb0d85914..57dbfabaa4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -27,9 +27,7 @@ import org.apache.qpid.server.model.VirtualHost;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception
{
}
@@ -66,8 +64,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index d87431a415..d782594a0d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -660,6 +660,18 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_logger.error("Failed to close message store", e);
}
}
+ if (getDurableConfigurationStore() != null)
+ {
+ //Remove MessageStore Interface should not throw Exception
+ try
+ {
+ getDurableConfigurationStore().close();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to close message store", e);
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 143bdce85b..b7e51d88d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
import org.apache.qpid.server.store.OperationalLoggingListener;
@@ -78,7 +79,14 @@ public class StandardVirtualHost extends AbstractVirtualHost
private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost) throws Exception
{
DurableConfigurationStore configurationStore;
- if(getMessageStore() instanceof DurableConfigurationStore)
+ final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_TYPE);
+ String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr);
+
+ if(storeType != null)
+ {
+ configurationStore = new DurableConfigurationStoreCreator().createMessageStore(storeType);
+ }
+ else if(getMessageStore() instanceof DurableConfigurationStore)
{
configurationStore = (DurableConfigurationStore) getMessageStore();
}
@@ -100,10 +108,10 @@ public class StandardVirtualHost extends AbstractVirtualHost
DurableConfigurationRecoverer configRecoverer =
new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
new DefaultUpgraderProvider(this, getExchangeRegistry()));
- _durableConfigurationStore.configureConfigStore(getName(), configRecoverer, virtualHost);
+ _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer);
VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
- _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler);
+ _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler);
initialiseModel(hostConfig);
@@ -112,25 +120,6 @@ public class StandardVirtualHost extends AbstractVirtualHost
attainActivation();
}
-
- protected void closeStorage()
- {
- //Close MessageStore
- if (_messageStore != null)
- {
- //Remove MessageStore Interface should not throw Exception
- try
- {
- getMessageStore().close();
- }
- catch (Exception e)
- {
- getLogger().error("Failed to close message store", e);
- }
- }
- }
-
-
@Override
public MessageStore getMessageStore()
{
diff --git a/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
new file mode 100644
index 0000000000..d183d91f18
--- /dev/null
+++ b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.JsonFileConfigStoreFactory
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index e9ad4ba236..50a3582811 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -406,8 +406,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
_messageStore = createMessageStore();
_configStore = createConfigStore();
- _configStore.configureConfigStore(_storeName, _recoveryHandler, _virtualHost);
- _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler);
+ _configStore.configureConfigStore(_virtualHost, _recoveryHandler);
+ _messageStore.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler);
_messageStore.activate();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
new file mode 100644
index 0000000000..b6300e6f48
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.InOrder;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JsonFileConfigStoreTest extends QpidTestCase
+{
+ private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
+ private VirtualHost _virtualHost;
+ private JsonFileConfigStore _store;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ removeStoreFile();
+ _virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getName()).thenReturn(getName());
+ when(_virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)).thenReturn(TMP_FOLDER);
+ _store = new JsonFileConfigStore();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ removeStoreFile();
+ }
+
+ private void removeStoreFile()
+ {
+ File file = new File(TMP_FOLDER, getName() + ".json");
+ if(file.exists())
+ {
+ file.delete();
+ }
+ }
+
+ public void testNoStorePath() throws Exception
+ {
+ when(_virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)).thenReturn(null);
+ try
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ fail("Store should not successfully configure if there is no path set");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass
+ }
+ }
+
+
+ public void testInvalidStorePath() throws Exception
+ {
+ when(_virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)).thenReturn(System.getProperty("file.separator"));
+ try
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ fail("Store should not successfully configure if there is an invalid path set");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass
+ }
+ }
+
+ public void testStartFromNoStore() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ InOrder inorder = inOrder(_recoveryHandler);
+ inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+ inorder.verify(_recoveryHandler,never()).configuredObject(any(UUID.class),anyString(),anyMap());
+ inorder.verify(_recoveryHandler).completeConfigurationRecovery();
+ _store.close();
+ }
+
+ public void testUpdatedConfigVersionIsRetained() throws Exception
+ {
+ final int NEW_CONFIG_VERSION = 42;
+ when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
+
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.close();
+
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ InOrder inorder = inOrder(_recoveryHandler);
+
+ // first time the config version should be the initial version - 0
+ inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+
+ // second time the config version should be the updated version
+ inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
+
+ _store.close();
+ }
+
+ public void testCreateObject() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ final UUID queueId = new UUID(0, 1);
+ final String queueType = Queue.class.getSimpleName();
+ final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
+
+ _store.create(queueId, queueType, queueAttr);
+ _store.close();
+
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr));
+ _store.close();
+ }
+
+ public void testCreateAndUpdateObject() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ final UUID queueId = new UUID(0, 1);
+ final String queueType = Queue.class.getSimpleName();
+ Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
+
+ _store.create(queueId, queueType, queueAttr);
+
+
+ queueAttr = new HashMap<String,Object>(queueAttr);
+ queueAttr.put("owner", "theowner");
+ _store.update(queueId, queueType, queueAttr);
+
+ _store.close();
+
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr));
+ _store.close();
+ }
+
+
+ public void testCreateAndRemoveObject() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ final UUID queueId = new UUID(0, 1);
+ final String queueType = Queue.class.getSimpleName();
+ Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
+
+ _store.create(queueId, queueType, queueAttr);
+
+
+ _store.remove(queueId, queueType);
+
+ _store.close();
+
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap());
+ _store.close();
+ }
+
+ public void testCreateUnknownObjectType() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ try
+ {
+ _store.create(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap());
+ fail("Should not be able to create instance of type wibble");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass
+ }
+ }
+
+ public void testTwoObjectsWithSameId() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ final UUID id = UUID.randomUUID();
+ _store.create(id, "Queue", Collections.<String, Object>emptyMap());
+ try
+ {
+ _store.create(id, "Exchange", Collections.<String, Object>emptyMap());
+ fail("Should not be able to create two objects with same id");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass
+ }
+ }
+
+
+ public void testChangeTypeOfObject() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ final UUID id = UUID.randomUUID();
+ _store.create(id, "Queue", Collections.<String, Object>emptyMap());
+ _store.close();
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+
+ try
+ {
+ _store.update(id, "Exchange", Collections.<String, Object>emptyMap());
+ fail("Should not be able to update object to different type");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass
+ }
+ }
+
+ public void testLockFileGuaranteesExclusiveAccess() throws Exception
+ {
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+
+ JsonFileConfigStore secondStore = new JsonFileConfigStore();
+
+ try
+ {
+ secondStore.configureConfigStore(_virtualHost, _recoveryHandler);
+ fail("Should not be able to open a second store with the same path");
+ }
+ catch(AMQStoreException e)
+ {
+ // pass
+ }
+ _store.close();
+ secondStore.configureConfigStore(_virtualHost, _recoveryHandler);
+
+
+ }
+
+ public void testCreatedNestedObjects() throws Exception
+ {
+
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ final UUID queueId = new UUID(0, 1);
+ final UUID queue2Id = new UUID(1, 1);
+
+ final Map<String, Object> EMPTY_ATTR = Collections.emptyMap();
+ final UUID exchangeId = new UUID(0, 2);
+ final Map<String, Object> bindingAttributes = new HashMap<String, Object>();
+ bindingAttributes.put(Binding.EXCHANGE, exchangeId);
+ bindingAttributes.put(Binding.QUEUE, queueId);
+ final Map<String, Object> binding2Attributes = new HashMap<String, Object>();
+ binding2Attributes.put(Binding.EXCHANGE, exchangeId);
+ binding2Attributes.put(Binding.QUEUE, queue2Id);
+
+ final UUID bindingId = new UUID(0, 3);
+ final UUID binding2Id = new UUID(1, 3);
+
+ _store.create(queueId, "Queue", EMPTY_ATTR);
+ _store.create(queue2Id, "Queue", EMPTY_ATTR);
+ _store.create(exchangeId, "Exchange", EMPTY_ATTR);
+ _store.update(true,
+ new ConfiguredObjectRecord(bindingId, "Binding", bindingAttributes),
+ new ConfiguredObjectRecord(binding2Id, "Binding", binding2Attributes));
+ _store.close();
+ _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ verify(_recoveryHandler).configuredObject(eq(queueId), eq("Queue"), eq(EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(eq(queue2Id), eq("Queue"), eq(EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(eq(exchangeId), eq("Exchange"), eq(EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(eq(bindingId),eq("Binding"), eq(bindingAttributes));
+ verify(_recoveryHandler).configuredObject(eq(binding2Id),eq("Binding"), eq(binding2Attributes));
+ _store.close();
+
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index ea47be83ec..b23890b10c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -66,11 +66,13 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
VirtualHost vhost = mock(VirtualHost.class);
when(vhost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation.getAbsolutePath());
+ when(vhost.getName()).thenReturn("test");
+
applyStoreSpecificConfiguration(vhost);
_store = createStore();
- ((DurableConfigurationStore)_store).configureConfigStore("test", null, vhost);
- _store.configureMessageStore("test", null, null);
+ ((DurableConfigurationStore)_store).configureConfigStore(vhost, null);
+ _store.configureMessageStore(vhost, null, null);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 2d68e94fcd..7ebfd54df6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -53,16 +52,18 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
_virtualHost = mock(VirtualHost.class);
+
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
setUpStoreConfiguration(_virtualHost);
+ when(_virtualHost.getName()).thenReturn(getTestName());
_store = createMessageStore();
- ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _virtualHost);
+ ((DurableConfigurationStore)_store).configureConfigStore(_virtualHost, _recoveryHandler);
- _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler);
+ _store.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler);
}
protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index a1d3d1542e..e19aeb38e7 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -48,9 +48,7 @@ public class
}
@Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost)
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
throws Exception
{
Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
@@ -77,7 +75,7 @@ public class
}
@Override
- public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISED);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index cec982c2c5..e08061deea 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -48,11 +48,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
// ***** MessageStore Interface.
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- VirtualHost virtualHost) throws Exception
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception
{
- _logger.info("Starting SlowMessageStore on Virtualhost:" + name);
+ _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays");
@@ -84,7 +82,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
_durableConfigurationStore = (DurableConfigurationStore)o;
}
}
- _durableConfigurationStore.configureConfigStore(name, recoveryHandler, virtualHost);
+ _durableConfigurationStore.configureConfigStore(virtualHost, recoveryHandler);
}
@@ -153,11 +151,10 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler messageRecoveryHandler,
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
- _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler);
+ _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
}
public void close() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index 1d1c474be0..ad328eaede 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -51,7 +51,7 @@ public class Asserts
assertNotNull("Virtualhost " + virtualHostName + " data are not found", virtualHost);
assertAttributesPresent(virtualHost, VirtualHost.AVAILABLE_ATTRIBUTES, VirtualHost.TIME_TO_LIVE,
VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH,
- VirtualHost.CONFIG_PATH, VirtualHost.TYPE);
+ VirtualHost.CONFIG_PATH, VirtualHost.TYPE, VirtualHost.CONFIG_STORE_PATH, VirtualHost.CONFIG_STORE_TYPE);
assertEquals("Unexpected value of attribute " + VirtualHost.NAME, virtualHostName, virtualHost.get(VirtualHost.NAME));
assertNotNull("Unexpected value of attribute " + VirtualHost.ID, virtualHost.get(VirtualHost.ID));