diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-20 18:39:44 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-20 18:39:44 +0000 |
commit | bc33c0d9e43996153bd50823a436c3694460aa4e (patch) | |
tree | c7ff6547c61da68d1fdc4b4fbd92498734cf86a4 | |
parent | f0754f9b6960437ba79a95bebbbb82789d8f7e07 (diff) | |
download | qpid-python-bc33c0d9e43996153bd50823a436c3694460aa4e.tar.gz |
QPID-5087 : [Java Broker] Allow use of separate message stores and configuration stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1515914 13f79535-47bb-0310-9956-ffa450edef68
35 files changed, 1337 insertions, 132 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index a4383d94c4..e772498ee9 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -139,6 +139,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private String _storeLocation; private Map<String, String> _envConfigMap; + private VirtualHost _virtualHost; public AbstractBDBMessageStore() { @@ -151,34 +152,58 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo _eventManager.addEventListener(eventListener, events); } - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = recoveryHandler; - - configure(name, virtualHost); + _virtualHost = virtualHost; } - public void configureMessageStore(String name, - MessageStoreRecoveryHandler messageRecoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { + if(_stateManager.isInState(State.INITIAL)) + { + // Is acting as a message store, but not a durable config store + _stateManager.attainState(State.INITIALISING); + } + _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; + _virtualHost = virtualHost; + + completeInitialisation(); + } + + private void completeInitialisation() throws Exception + { + configure(_virtualHost); _stateManager.attainState(State.INITIALISED); } public synchronized void activate() throws Exception { + // check if acting as a durable config store, but not a message store + if(_stateManager.isInState(State.INITIALISING)) + { + completeInitialisation(); + } _stateManager.attainState(State.ACTIVATING); - recoverConfig(_configRecoveryHandler); - recoverMessages(_messageRecoveryHandler); - recoverQueueEntries(_tlogRecoveryHandler); + if(_configRecoveryHandler != null) + { + recoverConfig(_configRecoveryHandler); + } + if(_messageRecoveryHandler != null) + { + recoverMessages(_messageRecoveryHandler); + } + if(_tlogRecoveryHandler != null) + { + recoverQueueEntries(_tlogRecoveryHandler); + } _stateManager.attainState(State.ACTIVE); } @@ -192,23 +217,38 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo * Called after instantiation in order to configure the message store. * * - * @param name The name of the virtual host using this store - * @param virtualHost + * + * @param virtualHost The virtual host using this store * @return whether a new store environment was created or not (to indicate whether recovery is necessary) * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - public void configure(String name, VirtualHost virtualHost) throws Exception + public void configure(VirtualHost virtualHost) throws Exception { + configure(virtualHost, _messageRecoveryHandler != null); + } - + public void configure(VirtualHost virtualHost, boolean isMessageStore) throws Exception + { + String name = virtualHost.getName(); final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; - - String storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) + String storeLocation; + if(isMessageStore) { - storeLocation = defaultPath; + storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + else // we are acting only as the durable config store + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } } Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 561e4fa660..fb1dc1f1d3 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -122,14 +122,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private Map<String, String> _repConfig; @Override - public void configure(String name, VirtualHost virtualHost) throws Exception + public void configure(VirtualHost virtualHost) throws Exception { //Mandatory configuration _groupName = getValidatedStringAttribute(virtualHost, "haGroupName"); _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName"); _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress"); _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress"); - _name = name; + _name = virtualHost.getName(); //Optional configuration String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null); @@ -157,7 +157,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess + "! Please set highAvailability.coalescingSync to false in store configuration."); } - super.configure(name, virtualHost); + super.configure(virtualHost); } @@ -260,10 +260,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } @Override - public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { - super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler); + super.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index b92a97c8cb..bb3c7b108d 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -77,12 +77,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), new DefaultUpgraderProvider(this, getExchangeRegistry())); - _messageStore.configureConfigStore(getName(), - configRecoverer, - virtualHost); + _messageStore.configureConfigStore( + virtualHost, configRecoverer + ); - _messageStore.configureMessageStore(getName(), - recoveryHandler, + _messageStore.configureMessageStore( + virtualHost, recoveryHandler, recoveryHandler ); } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 4eac54dd6f..d7c8b23d39 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -26,10 +26,12 @@ import java.util.List; import java.util.Map; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -public class BDBMessageStoreFactory implements MessageStoreFactory +public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory { @Override @@ -39,6 +41,12 @@ public class BDBMessageStoreFactory implements MessageStoreFactory } @Override + public DurableConfigurationStore createDurableConfigurationStore() + { + return new BDBMessageStore(); + } + + @Override public MessageStore createMessageStore() { return new BDBMessageStore(); @@ -76,12 +84,25 @@ public class BDBMessageStoreFactory implements MessageStoreFactory @Override public void validateAttributes(Map<String, Object> attributes) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) + if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + +"' is required and must be of type String."); + + } + } + if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE))) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH - +"' is required and must be of type String."); + Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH + +"' is required and must be of type String."); + } } } } diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory new file mode 100644 index 0000000000..a822405565 --- /dev/null +++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 21342b5715..76b990038d 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -230,7 +230,7 @@ public class BDBMessageStoreTest extends MessageStoreTest messageStore.close(); AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure("", getVirtualHostModel()); + newStore.configure(getVirtualHostModel(),true); newStore.startWithNoRecover(); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java index 5ad49462ac..3f32df4b0c 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -34,7 +33,7 @@ public class HAMessageStoreSmokeTest extends QpidTestCase { try { - _store.configure("test", mock(VirtualHost.class)); + _store.configure(mock(VirtualHost.class)); fail("Expected an exception to be thrown"); } catch (ConfigurationException ce) diff --git a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index ac310d02c9..bc8d157346 100644 --- a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -132,7 +132,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; - String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + String databasePath = isConfigStoreOnly() ? (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH) : (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); if(databasePath == null) { databasePath = defaultPath; diff --git a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java index 1b111ad65e..47a451ccf6 100644 --- a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -24,10 +24,12 @@ import java.util.Collections; import java.util.Map; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -public class DerbyMessageStoreFactory implements MessageStoreFactory +public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory { @Override @@ -37,6 +39,12 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory } @Override + public DurableConfigurationStore createDurableConfigurationStore() + { + return new DerbyMessageStore(); + } + + @Override public MessageStore createMessageStore() { return new DerbyMessageStore(); @@ -52,12 +60,25 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory @Override public void validateAttributes(Map<String, Object> attributes) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) + if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + +"' is required and must be of type String."); + + } + } + if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE))) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH - +"' is required and must be of type String."); + Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH + +"' is required and must be of type String."); + } } } diff --git a/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory new file mode 100644 index 0000000000..88ca1fed5e --- /dev/null +++ b/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.derby.DerbyMessageStoreFactory diff --git a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index f8d93536bb..6fdfa40561 100644 --- a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -51,6 +51,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag public static final String TYPE = "JDBC"; public static final String CONNECTION_URL = "connectionURL"; + public static final String CONFIG_CONNECTION_URL = "configConnectionURL"; protected String _connectionURL; private ConnectionProvider _connectionProvider; @@ -280,11 +281,20 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag throws ClassNotFoundException, SQLException { + String connectionURL; + if(!isConfigStoreOnly()) + { + connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null + ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) + : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); + } + else + { + connectionURL = virtualHost.getAttribute(CONFIG_CONNECTION_URL) == null + ? String.valueOf(virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)) + : String.valueOf(virtualHost.getAttribute(CONFIG_CONNECTION_URL)); - String connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null - ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) - : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); - + } JDBCDetails details = null; String[] components = connectionURL.split(":",3); diff --git a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java index 82d2275156..1144eaaf18 100644 --- a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -24,10 +24,12 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -public class JDBCMessageStoreFactory implements MessageStoreFactory +public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory { @Override @@ -43,6 +45,12 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory } @Override + public DurableConfigurationStore createDurableConfigurationStore() + { + return new JDBCMessageStore(); + } + + @Override public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) { Map<String,Object> convertedMap = new HashMap<String,Object>(); @@ -67,15 +75,32 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory @Override public void validateAttributes(Map<String, Object> attributes) { - Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); - if(!(connectionURL instanceof String)) + if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + { + Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); + if(!(connectionURL instanceof String)) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL + +"' is required and must be of type String."); + + } + } + } + if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE))) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) + Object connectionURL = attributes.get(JDBCMessageStore.CONFIG_CONNECTION_URL); + if(!(connectionURL instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL - +"' is required and must be of type String."); + Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONFIG_CONNECTION_URL + +"' is required and must be of type String."); + } } } } diff --git a/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory new file mode 100644 index 0000000000..a77458f27d --- /dev/null +++ b/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java index 9120a27976..f652dcec67 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java @@ -77,12 +77,12 @@ public class Model addRelationship(Connection.class, Session.class); - addRelationship(Exchange.class, Binding.class); - addRelationship(Exchange.class, Publisher.class); - addRelationship(Queue.class, Binding.class); addRelationship(Queue.class, Consumer.class); + addRelationship(Exchange.class, Binding.class); + addRelationship(Exchange.class, Publisher.class); + addRelationship(Session.class, Consumer.class); addRelationship(Session.class, Publisher.class); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 26ac99d5bd..ae07005679 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -74,6 +74,8 @@ public interface VirtualHost extends ConfiguredObject String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn"; String STORE_TYPE = "storeType"; String STORE_PATH = "storePath"; + String CONFIG_STORE_TYPE = "configStoreType"; + String CONFIG_STORE_PATH = "configStorePath"; String SUPPORTED_EXCHANGE_TYPES = "supportedExchangeTypes"; String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes"; String CREATED = "created"; @@ -107,6 +109,8 @@ public interface VirtualHost extends ConfiguredObject QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, QUEUE_FLOW_CONTROL_SIZE_BYTES, QUEUE_FLOW_RESUME_SIZE_BYTES, + CONFIG_STORE_TYPE, + CONFIG_STORE_PATH, STORE_TYPE, STORE_PATH, STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java new file mode 100644 index 0000000000..94a029ced3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/plugin/DurableConfigurationStoreFactory.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; + +public interface DurableConfigurationStoreFactory extends Pluggable +{ + String getType(); + + DurableConfigurationStore createDurableConfigurationStore(); + + void validateAttributes(Map<String, Object> attributes); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index d4b0c66351..589f385d22 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -42,9 +42,7 @@ public class QueueArgumentsConverter public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; public static final String X_QPID_DESCRIPTION = "x-qpid-description"; - /* public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; - public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; - */ + public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e2fea8f50b..4a1452d86c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -169,6 +169,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; private ConfigurationRecoveryHandler _configRecoveryHandler; + private VirtualHost _virtualHost; public AbstractJDBCMessageStore() { @@ -176,46 +177,69 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler configRecoveryHandler, - VirtualHost virtualHost) throws Exception + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = configRecoveryHandler; - - commonConfiguration(name, virtualHost); + _virtualHost = virtualHost; } @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { + if(_stateManager.isInState(State.INITIAL)) + { + _stateManager.attainState(State.INITIALISING); + } + + _virtualHost = virtualHost; _tlogRecoveryHandler = tlogRecoveryHandler; _messageRecoveryHandler = recoveryHandler; + completeInitialisation(); + } + + private void completeInitialisation() throws ClassNotFoundException, SQLException, AMQStoreException + { + commonConfiguration(); + _stateManager.attainState(State.INITIALISED); } @Override public void activate() throws Exception { + if(_stateManager.isInState(State.INITIALISING)) + { + completeInitialisation(); + } _stateManager.attainState(State.ACTIVATING); // this recovers durable exchanges, queues, and bindings - recoverConfiguration(_configRecoveryHandler); - recoverMessages(_messageRecoveryHandler); - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler); - recoverXids(dtxrh); + if(_configRecoveryHandler != null) + { + recoverConfiguration(_configRecoveryHandler); + } + if(_messageRecoveryHandler != null) + { + recoverMessages(_messageRecoveryHandler); + } + if(_tlogRecoveryHandler != null) + { + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler); + recoverXids(dtxrh); + + } _stateManager.attainState(State.ACTIVE); } - private void commonConfiguration(String name, VirtualHost virtualHost) + private void commonConfiguration() throws ClassNotFoundException, SQLException, AMQStoreException { - implementationSpecificConfiguration(name, virtualHost); + implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost); createOrOpenDatabase(); upgradeIfNecessary(); } @@ -1071,6 +1095,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } + protected boolean isConfigStoreOnly() + { + return _messageRecoveryHandler == null; + } + private static final class ConnectionWrapper { private final Connection _connection; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java index ac95d9fdb3..3abf083026 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -80,24 +80,30 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore } @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISING); } @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { + if(_stateManager.isInState(State.INITIAL)) + { + _stateManager.attainState(State.INITIALISING); + } _stateManager.attainState(State.INITIALISED); } @Override public void activate() throws Exception { + + if(_stateManager.isInState(State.INITIALISING)) + { + _stateManager.attainState(State.INITIALISED); + } _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 08f3d83c4e..6b0748b0c3 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -45,14 +45,13 @@ public interface DurableConfigurationStore * * * - * @param name The name to be used by this store - * @param recoveryHandler Handler to be called as the store recovers on start up + * + * * @param virtualHost + * @param recoveryHandler Handler to be called as the store recovers on start up * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception; + void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception; /** @@ -96,4 +95,5 @@ public interface DurableConfigurationStore public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException; + void close() throws Exception; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java new file mode 100644 index 0000000000..3a69f802f0 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreCreator.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; +import org.apache.qpid.server.plugin.QpidServiceLoader; + +public class DurableConfigurationStoreCreator +{ + private Map<String, DurableConfigurationStoreFactory> _factories = new HashMap<String, DurableConfigurationStoreFactory>(); + + public DurableConfigurationStoreCreator() + { + QpidServiceLoader<DurableConfigurationStoreFactory> qpidServiceLoader = new QpidServiceLoader<DurableConfigurationStoreFactory>(); + Iterable<DurableConfigurationStoreFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(DurableConfigurationStoreFactory.class); + for (DurableConfigurationStoreFactory durableConfigurationStoreFactory : factories) + { + String type = durableConfigurationStoreFactory.getType(); + DurableConfigurationStoreFactory factory = _factories.put(type.toLowerCase(), durableConfigurationStoreFactory); + if (factory != null) + { + throw new IllegalStateException("DurableConfigurationStoreFactory with type name '" + type + + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '" + + durableConfigurationStoreFactory.getClass().getName() + "'"); + } + } + } + + public boolean isValidType(String storeType) + { + return _factories.containsKey(storeType.toLowerCase()); + } + + + public DurableConfigurationStore createMessageStore(String storeType) + { + DurableConfigurationStoreFactory factory = _factories.get(storeType.toLowerCase()); + if (factory == null) + { + throw new IllegalConfigurationException("Unknown store type: " + storeType + + ". Supported types: " + _factories.keySet()); + } + return factory.createDurableConfigurationStore(); + } + + public Collection<DurableConfigurationStoreFactory> getFactories() + { + return Collections.unmodifiableCollection(_factories.values()); + } + + public Collection<String> getStoreTypes() + { + return Collections.unmodifiableCollection(_factories.keySet()); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java new file mode 100644 index 0000000000..8eed1fa5a4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -0,0 +1,513 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.VirtualHost; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; + +public class JsonFileConfigStore implements DurableConfigurationStore +{ + private static final Model MODEL = Model.getInstance(); + + private static final Map<String,Class<? extends ConfiguredObject>> CLASS_NAME_MAPPING = generateClassNameMap(VirtualHost.class); + public static final String TYPE = "JSON"; + + private final Map<UUID, ConfiguredObjectRecord> _objectsById = new HashMap<UUID, ConfiguredObjectRecord>(); + private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>(); + private final ObjectMapper _objectMapper = new ObjectMapper(); + + private String _directoryName; + private String _name; + private FileLock _fileLock; + private String _configFileName; + private String _backupFileName; + private int _configVersion; + + public JsonFileConfigStore() + { + _objectMapper.enable(SerializationConfig.Feature.INDENT_OUTPUT); + } + + @Override + public void configureConfigStore(final VirtualHost virtualHost, final ConfigurationRecoveryHandler recoveryHandler) + throws Exception + { + _name = virtualHost.getName(); + + Object storePathAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + if(!(storePathAttr instanceof String)) + { + throw new AMQStoreException("Cannot determine path for configuration storage"); + } + _directoryName = (String) storePathAttr; + _configFileName = _name + ".json"; + _backupFileName = _name + ".bak"; + checkDirectoryIsWritable(_directoryName); + getFileLock(); + + if(!fileExists(_configFileName)) + { + if(!fileExists(_backupFileName)) + { + File newFile = new File(_directoryName, _configFileName); + _objectMapper.writeValue(newFile, Collections.emptyMap()); + } + else + { + renameFile(_backupFileName, _configFileName); + } + } + + + load(); + recoveryHandler.beginConfigurationRecovery(this,_configVersion); + List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values()); + for(ConfiguredObjectRecord record : records) + { + recoveryHandler.configuredObject(record.getId(), record.getType(), record.getAttributes()); + } + int oldConfigVersion = _configVersion; + _configVersion = recoveryHandler.completeConfigurationRecovery(); + if(oldConfigVersion != _configVersion) + { + save(); + } + } + + private void renameFile(String fromFileName, String toFileName) throws AMQStoreException + { + File toFile = new File(_directoryName, toFileName); + if(toFile.exists()) + { + if(!toFile.delete()) + { + throw new AMQStoreException("Cannot delete file " + toFile.getAbsolutePath()); + } + } + File fromFile = new File(_directoryName, fromFileName); + + if(!fromFile.renameTo(toFile)) + { + throw new AMQStoreException("Cannot rename file " + fromFile.getAbsolutePath() + " to " + toFile.getAbsolutePath()); + } + } + + private boolean fileExists(String fileName) + { + File file = new File(_directoryName, fileName); + return file.exists(); + } + + private void getFileLock() throws IOException, AMQStoreException + { + File lockFile = new File(_directoryName, _name + ".lck"); + lockFile.createNewFile(); + + FileOutputStream out = new FileOutputStream(lockFile); + FileChannel channel = out.getChannel(); + try + { + _fileLock = channel.tryLock(); + } + catch(OverlappingFileLockException e) + { + _fileLock = null; + } + if(_fileLock == null) + { + throw new AMQStoreException("Cannot get lock on file " + lockFile.getAbsolutePath() + " is another instance running?"); + } + lockFile.deleteOnExit(); + } + + private void checkDirectoryIsWritable(String directoryName) throws AMQStoreException + { + File dir = new File(directoryName); + if(dir.exists()) + { + if(dir.isDirectory()) + { + if(!dir.canWrite()) + { + throw new AMQStoreException("Configuration path " + directoryName + " exists, but is not writable"); + } + + } + else + { + throw new AMQStoreException("Configuration path " + directoryName + " exists, but is not a directory"); + } + } + else if(!dir.mkdirs()) + { + throw new AMQStoreException("Cannot create directory " + directoryName); + } + } + + private void load() throws IOException + { + Map data = _objectMapper.readValue(new File(_directoryName,_configFileName),Map.class); + Collection<Class<? extends ConfiguredObject>> childClasses = + MODEL.getChildTypes(VirtualHost.class); + String modelVersion = (String) data.remove("modelVersion"); + Object configVersion; + if((configVersion = data.remove("configVersion")) instanceof Integer) + { + _configVersion = (Integer) configVersion; + } + for(Class<? extends ConfiguredObject> childClass : childClasses) + { + final String type = childClass.getSimpleName(); + String attrName = type.toLowerCase() + "s"; + Object children = data.remove(attrName); + if(children != null) + { + if(children instanceof Collection) + { + for(Object child : (Collection)children) + { + if(child instanceof Map) + { + loadChild(childClass, (Map)child, VirtualHost.class, null); + } + } + } + } + } + } + + private void loadChild(final Class<? extends ConfiguredObject> clazz, + final Map<String,Object> data, + final Class<? extends ConfiguredObject> parentClass, + final UUID parentId) + { + Collection<Class<? extends ConfiguredObject>> childClasses = + MODEL.getChildTypes(clazz); + String idStr = (String) data.remove("id"); + final UUID id = UUID.fromString(idStr); + final String type = clazz.getSimpleName(); + + for(Class<? extends ConfiguredObject> childClass : childClasses) + { + final String childType = childClass.getSimpleName(); + String attrName = childType.toLowerCase() + "s"; + Object children = data.remove(attrName); + if(children != null) + { + if(children instanceof Collection) + { + for(Object child : (Collection)children) + { + if(child instanceof Map) + { + loadChild(childClass, (Map)child, clazz, id); + } + } + } + } + + } + if(parentId != null) + { + data.put(parentClass.getSimpleName().toLowerCase(),parentId); + for(Class<? extends ConfiguredObject> otherParent : MODEL.getParentTypes(clazz)) + { + if(otherParent != parentClass) + { + final String otherParentAttr = otherParent.getSimpleName().toLowerCase(); + Object otherParentId = data.get(otherParentAttr); + if(otherParentId instanceof String) + { + try + { + data.put(otherParentAttr, UUID.fromString((String) otherParentId)); + } + catch(IllegalArgumentException e) + { + // + } + } + } + + } + } + + _objectsById.put(id, new ConfiguredObjectRecord(id, type, data)); + List<UUID> idsForType = _idsByType.get(type); + if(idsForType == null) + { + idsForType = new ArrayList<UUID>(); + _idsByType.put(type, idsForType); + } + idsForType.add(id); + + } + + @Override + public synchronized void create(final UUID id, final String type, final Map<String, Object> attributes) throws AMQStoreException + { + if(_objectsById.containsKey(id)) + { + throw new AMQStoreException("Object with id " + id + " already exists"); + } + else if(!CLASS_NAME_MAPPING.containsKey(type)) + { + throw new AMQStoreException("Cannot create object of unknown type " + type); + } + else + { + ConfiguredObjectRecord record = new ConfiguredObjectRecord(id, type, attributes); + _objectsById.put(id, record); + List<UUID> idsForType = _idsByType.get(type); + if(idsForType == null) + { + idsForType = new ArrayList<UUID>(); + _idsByType.put(type, idsForType); + } + idsForType.add(id); + save(); + } + } + + private void save() throws AMQStoreException + { + Collection<Class<? extends ConfiguredObject>> childClasses = + MODEL.getChildTypes(VirtualHost.class); + + Map<String, Object> virtualHostMap = new LinkedHashMap<String, Object>(); + virtualHostMap.put("modelVersion", Model.MODEL_VERSION); + virtualHostMap.put("configVersion", _configVersion); + + for(Class<? extends ConfiguredObject> childClass : childClasses) + { + final String type = childClass.getSimpleName(); + String attrName = type.toLowerCase() + "s"; + List<UUID> childIds = _idsByType.get(type); + if(childIds != null && !childIds.isEmpty()) + { + List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>(); + for(UUID id : childIds) + { + entities.add(build(childClass,id)); + } + virtualHostMap.put(attrName, entities); + } + } + + try + { + + File tmpFile = File.createTempFile("cfg","tmp", new File(_directoryName)); + tmpFile.deleteOnExit(); + _objectMapper.writeValue(tmpFile,virtualHostMap); + renameFile(_configFileName,_backupFileName); + renameFile(tmpFile.getName(),_configFileName); + tmpFile.delete(); + File backupFile = new File(_directoryName, _backupFileName); + backupFile.delete(); + + } + catch (IOException e) + { + throw new AMQStoreException("Cannot save to store", e); + } + } + + private Map<String, Object> build(final Class<? extends ConfiguredObject> type, final UUID id) + { + ConfiguredObjectRecord record = _objectsById.get(id); + Map<String,Object> map = new LinkedHashMap<String, Object>(); + map.put("id", id); + map.putAll(record.getAttributes()); + map.remove(MODEL.getParentTypes(type).iterator().next().getSimpleName().toLowerCase()); + + Collection<Class<? extends ConfiguredObject>> childClasses = + new ArrayList<Class<? extends ConfiguredObject>>(MODEL.getChildTypes(type)); + + for(Class<? extends ConfiguredObject> childClass : childClasses) + { + // only add if this is the "first" parent + if(MODEL.getParentTypes(childClass).iterator().next() == type) + { + String attrName = childClass.getSimpleName().toLowerCase() + "s"; + List<UUID> childIds = _idsByType.get(childClass.getSimpleName()); + if(childIds != null) + { + List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>(); + for(UUID childId : childIds) + { + ConfiguredObjectRecord childRecord = _objectsById.get(childId); + final String parentArg = type.getSimpleName().toLowerCase(); + if(id.toString().equals(String.valueOf(childRecord.getAttributes().get(parentArg)))) + { + entities.add(build(childClass,childId)); + } + } + if(!entities.isEmpty()) + { + map.put(attrName,entities); + } + } + } + } + + return map; + } + + @Override + public void remove(final UUID id, final String type) throws AMQStoreException + { + removeConfiguredObjects(id); + } + + @Override + public synchronized UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException + { + List<UUID> removedIds = new ArrayList<UUID>(); + for(UUID id : objects) + { + ConfiguredObjectRecord record = _objectsById.remove(id); + if(record != null) + { + removedIds.add(id); + _idsByType.get(record.getType()).remove(id); + } + } + save(); + return removedIds.toArray(new UUID[removedIds.size()]); + } + + @Override + public void update(final UUID id, final String type, final Map<String, Object> attributes) throws AMQStoreException + { + update(false, new ConfiguredObjectRecord(id, type, attributes)); + } + + @Override + public void update(final ConfiguredObjectRecord... records) throws AMQStoreException + { + update(false, records); + } + + @Override + public void update(final boolean createIfNecessary, final ConfiguredObjectRecord... records) + throws AMQStoreException + { + for(ConfiguredObjectRecord record : records) + { + final UUID id = record.getId(); + final String type = record.getType(); + + if(_objectsById.containsKey(id)) + { + final ConfiguredObjectRecord existingRecord = _objectsById.get(id); + if(!type.equals(existingRecord.getType())) + { + throw new AMQStoreException("Cannot change the type of record " + id + " from type " + + existingRecord.getType() + " to type " + type); + } + } + else if(!createIfNecessary) + { + throw new AMQStoreException("Cannot update record with id " + id + + " of type " + type + " as it does not exist"); + } + else if(!CLASS_NAME_MAPPING.containsKey(type)) + { + throw new AMQStoreException("Cannot update record of unknown type " + type); + } + } + for(ConfiguredObjectRecord record : records) + { + final UUID id = record.getId(); + final String type = record.getType(); + if(_objectsById.put(id, record) == null) + { + List<UUID> idsForType = _idsByType.get(type); + if(idsForType == null) + { + idsForType = new ArrayList<UUID>(); + _idsByType.put(type, idsForType); + } + idsForType.add(id); + } + } + + save(); + } + + public void close() throws Exception + { + try + { + releaseFileLock(); + } + finally + { + _fileLock = null; + _idsByType.clear(); + _objectsById.clear(); + } + + } + + private void releaseFileLock() throws IOException + { + _fileLock.release(); + _fileLock.channel().close(); + } + + + private static Map<String,Class<? extends ConfiguredObject>> generateClassNameMap(final Class<? extends ConfiguredObject> clazz) + { + Map<String,Class<? extends ConfiguredObject>>map = new HashMap<String, Class<? extends ConfiguredObject>>(); + map.put(clazz.getSimpleName().toString(), clazz); + Collection<Class<? extends ConfiguredObject>> childClasses = MODEL.getChildTypes(clazz); + if(childClasses != null) + { + for(Class<? extends ConfiguredObject> childClass : childClasses) + { + map.putAll(generateClassNameMap(childClass)); + } + } + return map; + } + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java new file mode 100644 index 0000000000..374a35d10d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Map; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; + +public class JsonFileConfigStoreFactory implements DurableConfigurationStoreFactory +{ + @Override + public String getType() + { + return JsonFileConfigStore.TYPE; + } + + @Override + public DurableConfigurationStore createDurableConfigurationStore() + { + return new JsonFileConfigStore(); + } + + @Override + public void validateAttributes(Map<String, Object> attributes) + { + Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH + +"' is required and must be of type String."); + + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index c605e1b599..996d71d51d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store; +import org.apache.qpid.server.model.VirtualHost; + /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. * @@ -31,13 +33,14 @@ public interface MessageStore * whatever parameters it wants. * * - * @param name The name to be used by this store + * + * + * @param virtualHost * @param messageRecoveryHandler Handler to be called as the store recovers on start up * @param tlogRecoveryHandler * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configureMessageStore(String name, - MessageStoreRecoveryHandler messageRecoveryHandler, + void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception; void activate() throws Exception; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 9eb0d85914..57dbfabaa4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -27,9 +27,7 @@ import org.apache.qpid.server.model.VirtualHost; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception { } @@ -66,8 +64,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index d87431a415..d782594a0d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -660,6 +660,18 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _logger.error("Failed to close message store", e); } } + if (getDurableConfigurationStore() != null) + { + //Remove MessageStore Interface should not throw Exception + try + { + getDurableConfigurationStore().close(); + } + catch (Exception e) + { + _logger.error("Failed to close message store", e); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 143bdce85b..b7e51d88d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.DurableConfigurationStoreCreator; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.server.store.OperationalLoggingListener; @@ -78,7 +79,14 @@ public class StandardVirtualHost extends AbstractVirtualHost private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost) throws Exception { DurableConfigurationStore configurationStore; - if(getMessageStore() instanceof DurableConfigurationStore) + final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_TYPE); + String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr); + + if(storeType != null) + { + configurationStore = new DurableConfigurationStoreCreator().createMessageStore(storeType); + } + else if(getMessageStore() instanceof DurableConfigurationStore) { configurationStore = (DurableConfigurationStore) getMessageStore(); } @@ -100,10 +108,10 @@ public class StandardVirtualHost extends AbstractVirtualHost DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), new DefaultUpgraderProvider(this, getExchangeRegistry())); - _durableConfigurationStore.configureConfigStore(getName(), configRecoverer, virtualHost); + _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer); VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); - _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler); + _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler); initialiseModel(hostConfig); @@ -112,25 +120,6 @@ public class StandardVirtualHost extends AbstractVirtualHost attainActivation(); } - - protected void closeStorage() - { - //Close MessageStore - if (_messageStore != null) - { - //Remove MessageStore Interface should not throw Exception - try - { - getMessageStore().close(); - } - catch (Exception e) - { - getLogger().error("Failed to close message store", e); - } - } - } - - @Override public MessageStore getMessageStore() { diff --git a/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory new file mode 100644 index 0000000000..d183d91f18 --- /dev/null +++ b/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.JsonFileConfigStoreFactory diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index e9ad4ba236..50a3582811 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -406,8 +406,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest _messageStore = createMessageStore(); _configStore = createConfigStore(); - _configStore.configureConfigStore(_storeName, _recoveryHandler, _virtualHost); - _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler); + _configStore.configureConfigStore(_virtualHost, _recoveryHandler); + _messageStore.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler); _messageStore.activate(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java new file mode 100644 index 0000000000..b6300e6f48 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -0,0 +1,299 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.InOrder; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class JsonFileConfigStoreTest extends QpidTestCase +{ + private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class); + private VirtualHost _virtualHost; + private JsonFileConfigStore _store; + + @Override + public void setUp() throws Exception + { + super.setUp(); + removeStoreFile(); + _virtualHost = mock(VirtualHost.class); + when(_virtualHost.getName()).thenReturn(getName()); + when(_virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)).thenReturn(TMP_FOLDER); + _store = new JsonFileConfigStore(); + } + + @Override + public void tearDown() throws Exception + { + removeStoreFile(); + } + + private void removeStoreFile() + { + File file = new File(TMP_FOLDER, getName() + ".json"); + if(file.exists()) + { + file.delete(); + } + } + + public void testNoStorePath() throws Exception + { + when(_virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)).thenReturn(null); + try + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + fail("Store should not successfully configure if there is no path set"); + } + catch (AMQStoreException e) + { + // pass + } + } + + + public void testInvalidStorePath() throws Exception + { + when(_virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)).thenReturn(System.getProperty("file.separator")); + try + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + fail("Store should not successfully configure if there is an invalid path set"); + } + catch (AMQStoreException e) + { + // pass + } + } + + public void testStartFromNoStore() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + InOrder inorder = inOrder(_recoveryHandler); + inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); + inorder.verify(_recoveryHandler,never()).configuredObject(any(UUID.class),anyString(),anyMap()); + inorder.verify(_recoveryHandler).completeConfigurationRecovery(); + _store.close(); + } + + public void testUpdatedConfigVersionIsRetained() throws Exception + { + final int NEW_CONFIG_VERSION = 42; + when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION); + + _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.close(); + + _store.configureConfigStore(_virtualHost, _recoveryHandler); + InOrder inorder = inOrder(_recoveryHandler); + + // first time the config version should be the initial version - 0 + inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); + + // second time the config version should be the updated version + inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION)); + + _store.close(); + } + + public void testCreateObject() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + final UUID queueId = new UUID(0, 1); + final String queueType = Queue.class.getSimpleName(); + final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); + + _store.create(queueId, queueType, queueAttr); + _store.close(); + + _store.configureConfigStore(_virtualHost, _recoveryHandler); + verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr)); + _store.close(); + } + + public void testCreateAndUpdateObject() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + final UUID queueId = new UUID(0, 1); + final String queueType = Queue.class.getSimpleName(); + Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); + + _store.create(queueId, queueType, queueAttr); + + + queueAttr = new HashMap<String,Object>(queueAttr); + queueAttr.put("owner", "theowner"); + _store.update(queueId, queueType, queueAttr); + + _store.close(); + + _store.configureConfigStore(_virtualHost, _recoveryHandler); + verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr)); + _store.close(); + } + + + public void testCreateAndRemoveObject() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + final UUID queueId = new UUID(0, 1); + final String queueType = Queue.class.getSimpleName(); + Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); + + _store.create(queueId, queueType, queueAttr); + + + _store.remove(queueId, queueType); + + _store.close(); + + _store.configureConfigStore(_virtualHost, _recoveryHandler); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); + _store.close(); + } + + public void testCreateUnknownObjectType() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + try + { + _store.create(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap()); + fail("Should not be able to create instance of type wibble"); + } + catch (AMQStoreException e) + { + // pass + } + } + + public void testTwoObjectsWithSameId() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + final UUID id = UUID.randomUUID(); + _store.create(id, "Queue", Collections.<String, Object>emptyMap()); + try + { + _store.create(id, "Exchange", Collections.<String, Object>emptyMap()); + fail("Should not be able to create two objects with same id"); + } + catch (AMQStoreException e) + { + // pass + } + } + + + public void testChangeTypeOfObject() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + final UUID id = UUID.randomUUID(); + _store.create(id, "Queue", Collections.<String, Object>emptyMap()); + _store.close(); + _store.configureConfigStore(_virtualHost, _recoveryHandler); + + try + { + _store.update(id, "Exchange", Collections.<String, Object>emptyMap()); + fail("Should not be able to update object to different type"); + } + catch (AMQStoreException e) + { + // pass + } + } + + public void testLockFileGuaranteesExclusiveAccess() throws Exception + { + _store.configureConfigStore(_virtualHost, _recoveryHandler); + + JsonFileConfigStore secondStore = new JsonFileConfigStore(); + + try + { + secondStore.configureConfigStore(_virtualHost, _recoveryHandler); + fail("Should not be able to open a second store with the same path"); + } + catch(AMQStoreException e) + { + // pass + } + _store.close(); + secondStore.configureConfigStore(_virtualHost, _recoveryHandler); + + + } + + public void testCreatedNestedObjects() throws Exception + { + + _store.configureConfigStore(_virtualHost, _recoveryHandler); + final UUID queueId = new UUID(0, 1); + final UUID queue2Id = new UUID(1, 1); + + final Map<String, Object> EMPTY_ATTR = Collections.emptyMap(); + final UUID exchangeId = new UUID(0, 2); + final Map<String, Object> bindingAttributes = new HashMap<String, Object>(); + bindingAttributes.put(Binding.EXCHANGE, exchangeId); + bindingAttributes.put(Binding.QUEUE, queueId); + final Map<String, Object> binding2Attributes = new HashMap<String, Object>(); + binding2Attributes.put(Binding.EXCHANGE, exchangeId); + binding2Attributes.put(Binding.QUEUE, queue2Id); + + final UUID bindingId = new UUID(0, 3); + final UUID binding2Id = new UUID(1, 3); + + _store.create(queueId, "Queue", EMPTY_ATTR); + _store.create(queue2Id, "Queue", EMPTY_ATTR); + _store.create(exchangeId, "Exchange", EMPTY_ATTR); + _store.update(true, + new ConfiguredObjectRecord(bindingId, "Binding", bindingAttributes), + new ConfiguredObjectRecord(binding2Id, "Binding", binding2Attributes)); + _store.close(); + _store.configureConfigStore(_virtualHost, _recoveryHandler); + verify(_recoveryHandler).configuredObject(eq(queueId), eq("Queue"), eq(EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(eq(queue2Id), eq("Queue"), eq(EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(eq(exchangeId), eq("Exchange"), eq(EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(eq(bindingId),eq("Binding"), eq(bindingAttributes)); + verify(_recoveryHandler).configuredObject(eq(binding2Id),eq("Binding"), eq(binding2Attributes)); + _store.close(); + + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index ea47be83ec..b23890b10c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -66,11 +66,13 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple VirtualHost vhost = mock(VirtualHost.class); when(vhost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation.getAbsolutePath()); + when(vhost.getName()).thenReturn("test"); + applyStoreSpecificConfiguration(vhost); _store = createStore(); - ((DurableConfigurationStore)_store).configureConfigStore("test", null, vhost); - _store.configureMessageStore("test", null, null); + ((DurableConfigurationStore)_store).configureConfigStore(vhost, null); + _store.configureMessageStore(vhost, null, null); _transactionResource = UUID.randomUUID(); _events = new ArrayList<Event>(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 2d68e94fcd..7ebfd54df6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,16 +52,18 @@ public abstract class MessageStoreTestCase extends QpidTestCase _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); _virtualHost = mock(VirtualHost.class); + when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); setUpStoreConfiguration(_virtualHost); + when(_virtualHost.getName()).thenReturn(getTestName()); _store = createMessageStore(); - ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _virtualHost); + ((DurableConfigurationStore)_store).configureConfigStore(_virtualHost, _recoveryHandler); - _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler); + _store.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler); } protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception; diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index a1d3d1542e..e19aeb38e7 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -48,9 +48,7 @@ public class } @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception { Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); @@ -77,7 +75,7 @@ public class } @Override - public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISED); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index cec982c2c5..e08061deea 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -48,11 +48,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore // ***** MessageStore Interface. - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception { - _logger.info("Starting SlowMessageStore on Virtualhost:" + name); + _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays"); @@ -84,7 +82,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore _durableConfigurationStore = (DurableConfigurationStore)o; } } - _durableConfigurationStore.configureConfigStore(name, recoveryHandler, virtualHost); + _durableConfigurationStore.configureConfigStore(virtualHost, recoveryHandler); } @@ -153,11 +151,10 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } - public void configureMessageStore(String name, - MessageStoreRecoveryHandler messageRecoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { - _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler); + _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); } public void close() throws Exception diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index 1d1c474be0..ad328eaede 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -51,7 +51,7 @@ public class Asserts assertNotNull("Virtualhost " + virtualHostName + " data are not found", virtualHost); assertAttributesPresent(virtualHost, VirtualHost.AVAILABLE_ATTRIBUTES, VirtualHost.TIME_TO_LIVE, VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, - VirtualHost.CONFIG_PATH, VirtualHost.TYPE); + VirtualHost.CONFIG_PATH, VirtualHost.TYPE, VirtualHost.CONFIG_STORE_PATH, VirtualHost.CONFIG_STORE_TYPE); assertEquals("Unexpected value of attribute " + VirtualHost.NAME, virtualHostName, virtualHost.get(VirtualHost.NAME)); assertNotNull("Unexpected value of attribute " + VirtualHost.ID, virtualHost.get(VirtualHost.ID)); |