diff options
70 files changed, 3153 insertions, 725 deletions
diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml index 7c305c7c2f..7df048c691 100644 --- a/qpid/java/bdbstore/build.xml +++ b/qpid/java/bdbstore/build.xml @@ -17,7 +17,7 @@ - under the License. --> <project name="bdbstore" xmlns:ivy="antlib:org.apache.ivy.ant" default="build"> - <property name="module.depends" value="common broker" /> + <property name="module.depends" value="management/common common broker" /> <property name="module.test.depends" value="test client common/test broker/test management/common systests" /> <property name="module.genpom" value="true"/> diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index a9ae4eb16d..789d5714c8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -33,11 +33,12 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.TransactionConfig; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; @@ -128,8 +130,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected final StateManager _stateManager; - protected TransactionConfig _transactionConfig = new TransactionConfig(); - private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; @@ -146,6 +146,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + private Map<String, String> _envConfigMap; + public AbstractBDBMessageStore() { _stateManager = new StateManager(_eventManager); @@ -161,7 +163,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.attainState(State.CONFIGURING); + _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = recoveryHandler; @@ -179,12 +181,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; - _stateManager.attainState(State.CONFIGURED); + _stateManager.attainState(State.INITIALISED); } - public void activate() throws Exception + public synchronized void activate() throws Exception { - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.ACTIVATING); recoverConfig(_configRecoveryHandler); recoverMessages(_messageRecoveryHandler); @@ -231,11 +233,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore _storeLocation = storeLocation; + _envConfigMap = getConfigMap(storeConfig, "envConfig"); + LOGGER.info("Configuring BDB message store"); setupStore(environmentPath, name); } + protected Map<String,String> getConfigMap(Configuration config, String prefix) throws ConfigurationException + { + final List<Object> argumentNames = config.getList(prefix + ".name"); + final List<Object> argumentValues = config.getList(prefix + ".value"); + final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size()); + + for (int i = 0; i < argumentNames.size(); i++) + { + final String argName = argumentNames.get(i).toString(); + final String argValue = argumentValues.get(i).toString(); + + attributes.put(argName, argValue); + } + + return Collections.unmodifiableMap(attributes); + } + @Override public String getStoreLocation() { @@ -251,9 +272,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ void startWithNoRecover() throws AMQStoreException { - _stateManager.attainState(State.CONFIGURING); - _stateManager.attainState(State.CONFIGURED); - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.INITIALISING); + _stateManager.attainState(State.INITIALISED); + _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); } @@ -268,51 +289,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _totalStoreSize = getSizeOnDisk(); } - protected Environment createEnvironment(File environmentPath) throws DatabaseException - { - LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); - - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); - try - { - return new Environment(environmentPath, envConfig); - } - catch (DatabaseException de) - { - if (de.getMessage().contains("Environment.setAllowCreate is false")) - { - //Allow the creation this time - envConfig.setAllowCreate(true); - if (_environment != null ) - { - _environment.cleanLog(); - _environment.close(); - } - return new Environment(environmentPath, envConfig); - } - else - { - throw de; - } - } - } + protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException; public Environment getEnvironment() { @@ -352,14 +329,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void close() throws Exception { - if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED)) - { - _stateManager.stateTransition(State.ACTIVE, State.CLOSING); - - closeInternal(); - - _stateManager.stateTransition(State.CLOSING, State.CLOSED); - } + _stateManager.attainState(State.CLOSING); + closeInternal(); + _stateManager.attainState(State.CLOSED); } protected void closeInternal() throws Exception @@ -1504,6 +1476,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore return true; } + @SuppressWarnings("unchecked") public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { if(metaData.isPersistent()) @@ -1914,4 +1887,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore { return _persistentSizeHighThreshold; } + + private void setEnvironmentConfigProperties(EnvironmentConfig envConfig) + { + for (Map.Entry<String, String> configItem : _envConfigMap.entrySet()) + { + LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } + + protected EnvironmentConfig createEnvironmentConfig() + { + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); + + setEnvironmentConfigProperties(envConfig); + + return envConfig; + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java new file mode 100644 index 0000000000..f887c8ce36 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.AbstractActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.store.HAMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; + +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.OperationFailureException; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; + +public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore +{ + private static final String MUTLI_SYNC = "MUTLI_SYNC"; + private static final String DEFAULT_REPLICATION_POLICY = + MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); + + private String _groupName; + private String _nodeName; + private String _nodeHostPort; + private String _helperHostPort; + private String _replicationPolicy; + private Durability _replicationDurability; + + private String _name; + + private BDBHAMessageStoreManagerMBean _managedObject; + + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + + private CommitThreadWrapper _commitThreadWrapper; + private boolean _localMultiSyncCommits; + private boolean _autoDesignatedPrimary; + private Map<String, String> _repConfigMap; + + @Override + public void configure(String name, Configuration storeConfig) throws Exception + { + //Mandatory configuration + _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig); + _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig); + _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig); + _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig); + _name = name; + + //Optional configuration + _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim(); + _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE); + + if(_replicationPolicy.startsWith(MUTLI_SYNC)) + { + _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name())); + _localMultiSyncCommits = true; + } + else + { + _replicationDurability = Durability.parse(_replicationPolicy); + _localMultiSyncCommits = false; + } + + _repConfigMap = getConfigMap(storeConfig, "repConfig"); + + _managedObject = new BDBHAMessageStoreManagerMBean(this); + _managedObject.register(); + + super.configure(name, storeConfig); + } + + @Override + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException + { + super.setupStore(storePath, name); + + if(_localMultiSyncCommits) + { + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); + } + } + + private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException + { + if (!config.containsKey(key)) + { + throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + + key.replace('.', '/')); + } + return config.getString(key); + } + + @Override + protected Environment createEnvironment(File environmentPath) throws DatabaseException + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Environment path " + environmentPath.getAbsolutePath()); + LOGGER.info("Group name " + _groupName); + LOGGER.info("Node name " + _nodeName); + LOGGER.info("Node host port " + _nodeHostPort); + LOGGER.info("Helper host port " + _helperHostPort); + LOGGER.info("Replication policy " + _replicationPolicy); + } + + final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); + + replicationConfig.setHelperHosts(_helperHostPort); + setReplicationConfigProperties(replicationConfig); + + final EnvironmentConfig envConfig = createEnvironmentConfig(); + envConfig.setDurability(_replicationDurability); + + ReplicatedEnvironment replicatedEnvironment = null; + try + { + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + catch (final InsufficientLogException ile) + { + LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + NetworkRestore restore = new NetworkRestore(); + NetworkRestoreConfig config = new NetworkRestoreConfig(); + config.setRetainLogFiles(false); + restore.execute(ile, config); + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + + return replicatedEnvironment; + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config) throws Exception + { + super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + + replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + } + + @Override + public synchronized void passivate() + { + if (_stateManager.isNotInState(State.INITIALISED)) + { + LOGGER.debug("Store becoming passive"); + _stateManager.attainState(State.INITIALISED); + } + } + + @Override + protected void closeInternal() throws Exception + { + try + { + if(_localMultiSyncCommits) + { + _commitThreadWrapper.stopCommitThread(); + } + super.closeInternal(); + } + finally + { + if (_managedObject != null) + { + _managedObject.unregister(); + } + } + } + + @Override + protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + tx.commit(); + + + if(_localMultiSyncCommits) + { + return _commitThreadWrapper.commit(tx, syncCommit); + } + else + { + return StoreFuture.IMMEDIATE_FUTURE; + } + } + + public String getName() + { + return _name; + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeName() + { + return _nodeName; + } + + public String getNodeHostPort() + { + return _nodeHostPort; + } + + public String getHelperHostPort() + { + return _helperHostPort; + } + + public String getReplicationPolicy() + { + return _replicationPolicy; + } + + public String getNodeState() + { + ReplicatedEnvironment.State state = getReplicatedEnvironment().getState(); + return state.toString(); + } + + public Boolean isDesignatedPrimary() + { + return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary(); + } + + public List<Map<String, String>> getGroupMembers() + { + List<Map<String, String>> members = new ArrayList<Map<String,String>>(); + + for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes()) + { + Map<String, String> nodeMap = new HashMap<String, String>(); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName()); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); + members.add(nodeMap); + } + + return members; + } + + public void removeNodeFromGroup(String nodeName) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().removeMember(nodeName); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); + } + } + + public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException + { + try + { + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + synchronized(replicatedEnvironment) + { + final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); + replicatedEnvironment.setRepMutableConfig(newConfig); + } + + LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e); + } + } + + ReplicatedEnvironment getReplicatedEnvironment() + { + return (ReplicatedEnvironment)getEnvironment(); + } + + public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e); + } + } + + private ReplicationGroupAdmin createReplicationGroupAdmin() + { + final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets()); + + final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig(); + helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + + return new ReplicationGroupAdmin(_groupName, helpers); + } + + private class BDBHAMessageStoreStateChangeListener implements StateChangeListener + { + private final Executor _executor = Executors.newSingleThreadExecutor(); + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); + + LOGGER.info("Received BDB event indicating transition to state " + state); + + switch (state) + { + case MASTER: + activateStoreAsync(); + break; + case REPLICA: + passivateStore(); + break; + case DETACHED: + LOGGER.error("BDB replicated node in detached state, therefore passivating."); + passivateStore(); + break; + case UNKNOWN: + LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); + break; + default: + LOGGER.error("Unexpected state change: " + state); + throw new IllegalStateException("Unexpected state change: " + state); + } + } + + /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */ + private void passivateStore() + { + try + { + passivate(); + } + catch(Exception e) + { + LOGGER.error("Unable to passivate", e); + throw new RuntimeException("Unable to passivate", e); + } + } + + /** + * Calls {@link MessageStore#activate()}. + * + * <p/> + * + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * activate may execute transactions, which can't complete until + * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned. + */ + private void activateStoreAsync() + { + final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); + + _executor.execute(new Runnable() + { + private static final String _THREAD_NAME = "BDBHANodeActivationThread"; + + @Override + public void run() + { + Thread.currentThread().setName(_THREAD_NAME); + CurrentActor.set(new AbstractActor(_rootLogger) + { + @Override + public String getLogMessage() + { + return _THREAD_NAME; + } + }); + + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + } + + } + }); + } + } + + @Override + public synchronized void activate() throws Exception + { + // Before proceeding, perform a log flush with an fsync + getEnvironment().flushLog(true); + + super.activate(); + + //For replica groups with 2 electable nodes, set the new master to be the + //designated primary, such that it can continue working if the replica goes + //down and leaves it without a 'majority of 2'. + if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary) + { + setDesignatedPrimary(true); + } + } + + private void setReplicationConfigProperties(ReplicationConfig replicationConfig) + { + for (Map.Entry<String, String> configItem : _repConfigMap.entrySet()) + { + LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java new file mode 100644 index 0000000000..cd4e990607 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.NotCompliantMBeanException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.management.AMQManagedObject; + +public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class); + + private static final TabularType GROUP_MEMBERS_TABLE; + private static final CompositeType GROUP_MEMBER_ROW; + private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES; + + static + { + try + { + GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING}; + final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; + final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; + GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", + itemNames, + itemDescriptions, + GROUP_MEMBER_ATTRIBUTE_TYPES ); + GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", + GROUP_MEMBER_ROW, + new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); + } + catch (final OpenDataException ode) + { + throw new ExceptionInInitializerError(ode); + } + } + + private final BDBHAMessageStore _store; + + protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException + { + super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE); + _store = store; + } + + @Override + public String getObjectInstanceName() + { + return _store.getName(); + } + + @Override + public String getGroupName() throws IOException + { + return _store.getGroupName(); + } + + @Override + public String getNodeName() throws IOException + { + return _store.getNodeName(); + } + + @Override + public String getNodeHostPort() throws IOException + { + return _store.getNodeHostPort(); + } + + @Override + public String getHelperHostPort() throws IOException + { + return _store.getHelperHostPort(); + } + + @Override + public String getReplicationPolicy() throws IOException + { + return _store.getReplicationPolicy(); + } + + @Override + public String getNodeState() throws IOException + { + return _store.getNodeState(); + } + + @Override + public boolean getDesignatedPrimary() throws IOException + { + return _store.isDesignatedPrimary(); + } + + @Override + public TabularData getAllNodesInGroup() throws IOException, JMException + { + final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); + final List<Map<String, String>> members = _store.getGroupMembers(); + + for (Map<String, String> map : members) + { + CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); + data.put(memberData); + } + return data; + } + + @Override + public void removeNodeFromGroup(String nodeName) throws JMException + { + try + { + _store.removeNodeFromGroup(nodeName); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to remove node " + nodeName + " from group", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void setDesignatedPrimary(boolean primary) throws JMException + { + try + { + _store.setDesignatedPrimary(primary); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException + { + try + { + _store.updateAddress(nodeName, newHostName, newPort); + } + catch(AMQStoreException e) + { + LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); + throw new JMException(e.getMessage()); + } + } + + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index b414441b92..7c29e281d9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -21,23 +21,12 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; -import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; @@ -53,52 +42,22 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - - private final CommitThread _commitThread = new CommitThread("Commit-Thread"); - - private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>(); + private CommitThreadWrapper _commitThreadWrapper; @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { super.setupStore(storePath, name); - startCommitThread(); + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); } protected Environment createEnvironment(File environmentPath) throws DatabaseException { LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - - Properties props = System.getProperties(); - - for(String propName : props.stringPropertyNames()) - { - if(propName.startsWith("qpid.bdb.envconfig.je.")) - { - envConfig.setConfigParam(propName.substring(19), props.getProperty(propName)); - } - } - - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); + EnvironmentConfig envConfig = createEnvironmentConfig(); - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); try { return new Environment(environmentPath, envConfig); @@ -118,12 +77,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore } } - - @Override protected void closeInternal() throws Exception { - stopCommitThread(); + _commitThreadWrapper.stopCommitThread(); super.closeInternal(); } @@ -133,204 +90,6 @@ public class BDBMessageStore extends AbstractBDBMessageStore { tx.commitNoSync(); - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); - commitFuture.commit(); - - return commitFuture; - } - - private void startCommitThread() - { - _commitThread.start(); - } - - private void stopCommitThread() throws InterruptedException - { - _commitThread.close(); - _commitThread.join(); - } - - private static final class BDBCommitFuture implements StoreFuture - { - private final CommitThread _commitThread; - private final com.sleepycat.je.Transaction _tx; - private DatabaseException _databaseException; - private boolean _complete; - private boolean _syncCommit; - - public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) - { - _commitThread = commitThread; - _tx = tx; - _syncCommit = syncCommit; - } - - public synchronized void complete() - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); - } - _complete = true; - - notifyAll(); - } - - public synchronized void abort(DatabaseException databaseException) - { - _complete = true; - _databaseException = databaseException; - - notifyAll(); - } - - public void commit() throws DatabaseException - { - _commitThread.addJob(this, _syncCommit); - - if(!_syncCommit) - { - LOGGER.debug("CommitAsync was requested, returning immediately."); - return; - } - - waitForCompletion(); - - if (_databaseException != null) - { - throw _databaseException; - } - - } - - public synchronized boolean isComplete() - { - return _complete; - } - - public synchronized void waitForCompletion() - { - while (!isComplete()) - { - _commitThread.explicitNotify(); - try - { - wait(250); - } - catch (InterruptedException e) - { - //TODO Should we ignore, or throw a 'StoreException'? - throw new RuntimeException(e); - } - } - } - } - - /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations - * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before - * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> - */ - private class CommitThread extends Thread - { - private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); - private final CheckpointConfig _config = new CheckpointConfig(); - private final Object _lock = new Object(); - - public CommitThread(String name) - { - super(name); - _config.setForce(true); - - } - - public void explicitNotify() - { - synchronized (_lock) - { - _lock.notify(); - } - } - - public void run() - { - while (!_stopped.get()) - { - synchronized (_lock) - { - while (!_stopped.get() && !hasJobs()) - { - try - { - // RHM-7 Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(1000); - } - catch (InterruptedException e) - { - } - } - } - processJobs(); - } - } - - private void processJobs() - { - int size = _jobQueue.size(); - - try - { - getEnvironment().flushLog(true); - - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.complete(); - } - - } - catch (DatabaseException e) - { - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.abort(e); - } - } - - } - - private boolean hasJobs() - { - return !_jobQueue.isEmpty(); - } - - public void addJob(BDBCommitFuture commit, final boolean sync) - { - - _jobQueue.add(commit); - if(sync) - { - synchronized (_lock) - { - _lock.notifyAll(); - } - } - } - - public void close() - { - synchronized (_lock) - { - _stopped.set(true); - _lock.notifyAll(); - } - } + return _commitThreadWrapper.commit(tx, syncCommit); } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java new file mode 100644 index 0000000000..5b0abbec93 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -0,0 +1,227 @@ +package org.apache.qpid.server.store.berkeleydb; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreFuture; + +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; + +public class CommitThreadWrapper +{ + private final CommitThread _commitThread; + + public CommitThreadWrapper(String name, Environment env) + { + _commitThread = new CommitThread(name, env); + } + + public void startCommitThread() + { + _commitThread.start(); + } + + public void stopCommitThread() throws InterruptedException + { + _commitThread.close(); + _commitThread.join(); + } + + public StoreFuture commit(Transaction tx, boolean syncCommit) + { + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + commitFuture.commit(); + return commitFuture; + } + + private static final class BDBCommitFuture implements StoreFuture + { + private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + + private final CommitThread _commitThread; + private final Transaction _tx; + private DatabaseException _databaseException; + private boolean _complete; + private boolean _syncCommit; + + public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + { + _commitThread = commitThread; + _tx = tx; + _syncCommit = syncCommit; + } + + public synchronized void complete() + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + } + _complete = true; + + notifyAll(); + } + + public synchronized void abort(DatabaseException databaseException) + { + _complete = true; + _databaseException = databaseException; + + notifyAll(); + } + + public void commit() throws DatabaseException + { + _commitThread.addJob(this, _syncCommit); + + if(!_syncCommit) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + return; + } + + waitForCompletion(); + + if (_databaseException != null) + { + throw _databaseException; + } + + } + + public synchronized boolean isComplete() + { + return _complete; + } + + public synchronized void waitForCompletion() + { + while (!isComplete()) + { + _commitThread.explicitNotify(); + try + { + wait(250); + } + catch (InterruptedException e) + { + //TODO Should we ignore, or throw a 'StoreException'? + throw new RuntimeException(e); + } + } + } + } + + /** + * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before + * continuing, but it is the responsibility of this thread to tell the commit operations when they have been + * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> + */ + private static class CommitThread extends Thread + { + private final AtomicBoolean _stopped = new AtomicBoolean(false); + private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final CheckpointConfig _config = new CheckpointConfig(); + private final Object _lock = new Object(); + private Environment _environment; + + public CommitThread(String name, Environment env) + { + super(name); + _config.setForce(true); + _environment = env; + } + + public void explicitNotify() + { + synchronized (_lock) + { + _lock.notify(); + } + } + + public void run() + { + while (!_stopped.get()) + { + synchronized (_lock) + { + while (!_stopped.get() && !hasJobs()) + { + try + { + // RHM-7 Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(1000); + } + catch (InterruptedException e) + { + } + } + } + processJobs(); + } + } + + private void processJobs() + { + int size = _jobQueue.size(); + + try + { + _environment.flushLog(true); + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.complete(); + } + + } + catch (DatabaseException e) + { + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.abort(e); + } + } + + } + + private boolean hasJobs() + { + return !_jobQueue.isEmpty(); + } + + public void addJob(BDBCommitFuture commit, final boolean sync) + { + + _jobQueue.add(commit); + if(sync) + { + synchronized (_lock) + { + _lock.notifyAll(); + } + } + } + + public void close() + { + synchronized (_lock) + { + _stopped.set(true); + _lock.notifyAll(); + } + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java new file mode 100644 index 0000000000..bfc7bbf128 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.IOException; + +import javax.management.JMException; +import javax.management.openmbean.TabularData; + +import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; + +public interface ManagedBDBHAMessageStore +{ + public static final String TYPE = "BDBHAMessageStore"; + + public static final String ATTR_GROUP_NAME = "GroupName"; + public static final String ATTR_NODE_NAME = "NodeName"; + public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; + public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; + public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy"; + public static final String ATTR_NODE_STATE = "NodeState"; + public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; + + @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") + String getGroupName() throws IOException; + + @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") + String getNodeName() throws IOException; + + @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") + String getNodeHostPort() throws IOException; + + @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") + String getNodeState() throws IOException; + + @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") + String getHelperHostPort() throws IOException; + + @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy") + String getReplicationPolicy() throws IOException; + + @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") + boolean getDesignatedPrimary() throws IOException; + + @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") + TabularData getAllNodesInGroup() throws IOException, JMException; + + @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") + void removeNodeFromGroup(String nodeName) throws JMException; + + @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") + void setDesignatedPrimary(boolean primary) throws JMException; + + @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") + void updateAddress(String nodeName, String newHostName, int newPort) throws JMException; +} + diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index e71e39cbb8..f1ab012efc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -24,7 +24,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; @@ -63,7 +63,7 @@ public class Upgrader if(versionDb.count() == 0L) { - int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion(); + int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion(); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(sourceVersion, key); DatabaseEntry value = new DatabaseEntry(); @@ -87,7 +87,7 @@ public class Upgrader int getSourceVersion(Database versionDb) { - int version = BDBMessageStore.VERSION + 1; + int version = AbstractBDBMessageStore.VERSION + 1; OperationStatus result; do @@ -106,7 +106,7 @@ public class Upgrader void performUpgradeFromVersion(int sourceVersion, Database versionDb) throws AMQStoreException { - while(sourceVersion != BDBMessageStore.VERSION) + while(sourceVersion != AbstractBDBMessageStore.VERSION) { upgrade(sourceVersion, ++sourceVersion); DatabaseEntry key = new DatabaseEntry(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java new file mode 100644 index 0000000000..00f99b7097 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; + +public class BDBHAMessageStoreManagerMBeanTest extends TestCase +{ + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "host:1234"; + private static final String TEST_HELPER_HOST_PORT = "host:5678"; + private static final String TEST_REPLICATION_POLICY = "sync,sync,all"; + private static final String TEST_NODE_STATE = "MASTER"; + private static final String TEST_STORE_NAME = "testStoreName"; + private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; + + private BDBHAMessageStore _store; + private BDBHAMessageStoreManagerMBean _mBean; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + _store = mock(BDBHAMessageStore.class); + _mBean = new BDBHAMessageStoreManagerMBean(_store); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + CurrentActor.remove(); + } + + public void testObjectName() throws Exception + { + when(_store.getName()).thenReturn(TEST_STORE_NAME); + + String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME; + assertEquals(expectedObjectName, _mBean.getObjectName().toString()); + } + + public void testGroupName() throws Exception + { + when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME); + + assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); + } + + public void testNodeName() throws Exception + { + when(_store.getNodeName()).thenReturn(TEST_NODE_NAME); + + assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); + } + + public void testNodeHostPort() throws Exception + { + when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT); + + assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); + } + + public void testHelperHostPort() throws Exception + { + when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); + + assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); + } + + public void testReplicationPolicy() throws Exception + { + when(_store.getReplicationPolicy()).thenReturn(TEST_REPLICATION_POLICY); + + assertEquals(TEST_REPLICATION_POLICY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_REPLICATION_POLICY)); + } + + public void testNodeState() throws Exception + { + when(_store.getNodeState()).thenReturn(TEST_NODE_STATE); + + assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); + } + + public void testDesignatedPrimaryFlag() throws Exception + { + when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); + + assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); + } + + public void testGroupMembersForGroupWithOneNode() throws Exception + { + List<Map<String, String>> members = Collections.singletonList(createTestNodeResult()); + when(_store.getGroupMembers()).thenReturn(members); + + final TabularData resultsTable = _mBean.getAllNodesInGroup(); + + assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT); + + final int numberOfDataRows = resultsTable.size(); + assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); + final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); + assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME)); + assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + + public void testRemoveNodeFromReplicationGroup() throws Exception + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + + verify(_store).removeNodeFromGroup(TEST_NODE_NAME); + } + + public void testRemoveNodeFromReplicationGroupWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME); + + try + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testSetAsDesignatedPrimary() throws Exception + { + _mBean.setDesignatedPrimary(true); + + verify(_store).setDesignatedPrimary(true); + } + + public void testSetAsDesignatedPrimaryWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true); + + try + { + _mBean.setDesignatedPrimary(true); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testUpdateAddress() throws Exception + { + String newHostName = "newHostName"; + int newPort = 1967; + + _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); + + verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort); + } + + private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) + { + CompositeType headingsRow = resultsTable.getTabularType().getRowType(); + for (final String headingName : headingNames) + { + assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName)); + } + } + + private Map<String, String> createTestNodeResult() + { + Map<String, String> items = new HashMap<String, String>(); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); + return items; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java new file mode 100644 index 0000000000..6f851bd94e --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -0,0 +1,144 @@ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetAddress; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + +public class BDBHAMessageStoreTest extends QpidTestCase +{ + private static final String TEST_LOG_FILE_MAX = "1000000"; + private static final String TEST_ELECTION_RETRIES = "1000"; + private static final String TEST_NUMBER_OF_THREADS = "10"; + private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999"; + private String _groupName; + private String _workDir; + private int _masterPort; + private String _host; + private XMLConfiguration _configXml; + + public void setUp() throws Exception + { + super.setUp(); + + _workDir = TMP_FOLDER + File.separator + getName(); + _host = InetAddress.getByName("localhost").getHostAddress(); + _groupName = "group" + getName(); + _masterPort = -1; + + FileUtils.delete(new File(_workDir), true); + _configXml = new XMLConfiguration(); + } + + public void tearDown() throws Exception + { + FileUtils.delete(new File(_workDir), true); + super.tearDown(); + } + + public void testSetSystemConfiguration() throws Exception + { + // create virtual host configuration, registry and host instance + addVirtualHostConfiguration(); + TestApplicationRegistry registry = initialize(); + try + { + VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort); + BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore(); + + // test whether JVM system settings were applied + Environment env = store.getEnvironment(); + assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); + assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); + assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, + repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); + assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, + repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); + } + finally + { + ApplicationRegistry.remove(); + } + } + + private void addVirtualHostConfiguration() throws Exception + { + int port = findFreePort(); + if (_masterPort == -1) + { + _masterPort = port; + } + String nodeName = getNodeNameForNodeAt(port); + + String vhostName = "test" + port; + String vhostPrefix = "virtualhosts.virtualhost." + vhostName; + + _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); + _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName()); + _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator + + port); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort", + getNodeHostPortForNodeAt(port)); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort", + getHelperHostPort()); + + _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS); + _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS); + + _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX); + _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX); + + _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES); + + _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT); + _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT); + } + + private String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + getName() + bdbPort; + } + + private String getNodeHostPortForNodeAt(final int bdbPort) + { + return _host + ":" + bdbPort; + } + + private String getHelperHostPort() + { + if (_masterPort == -1) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + return _host + ":" + _masterPort; + } + + private TestApplicationRegistry initialize() throws Exception + { + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + ServerConfiguration configuration = new ServerConfiguration(_configXml); + TestApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort); + return registry; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a318187f13..591bc27d1e 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -70,7 +70,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); // Create content ByteBuffers. // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. @@ -220,11 +220,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto * Use this method instead of reloading the virtual host like other tests in order * to avoid the recovery handler deleting the message for not being on a queue. */ - private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception + private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception { messageStore.close(); - BDBMessageStore newStore = new BDBMessageStore(); + AbstractBDBMessageStore newStore = new BDBMessageStore(); newStore.configure("", _config.subset("store")); newStore.startWithNoRecover(); @@ -282,7 +282,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto public void testGetContentWithOffset() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -342,7 +342,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto public void testMessageCreationAndRemoval() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -367,12 +367,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Retrieved content when none was expected", 0, bdbStore.getContent(messageid_0_8, 0, dst)); } - private BDBMessageStore assertBDBStore(MessageStore store) + private AbstractBDBMessageStore assertBDBStore(MessageStore store) { assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - return (BDBMessageStore) store; + return (AbstractBDBMessageStore) store; } private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) @@ -405,7 +405,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() @@ -443,7 +443,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() @@ -484,7 +484,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java new file mode 100644 index 0000000000..afe0435901 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * The HA black box tests test the BDB cluster as a opaque unit. Client connects to + * the cluster via a failover url + * + * @see HAClusterWhiteboxTest + */ +public class HAClusterBlackboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final int NUMBER_OF_NODES = 3; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + private FailoverAwaitingListener _failoverAwaitingListener; + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME) + + File.separator + "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + + _clusterCreator.startCluster(); + _failoverAwaitingListener = new FailoverAwaitingListener(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testLossOfActiveNodeCausesClientToFailover() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + _clusterCreator.stopNode(activeBrokerPort); + LOGGER.info("Node is stopped"); + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception + { + LOGGER.info("Connecting to " + _brokerFailoverUrl); + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + + LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.stopNode(inactiveBrokerPort); + + _failoverAwaitingListener.assertFailoverDoesNotOccur(2000); + + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private final class FailoverAwaitingListener implements ConnectionListener + { + private final CountDownLatch _failoverLatch = new CountDownLatch(1); + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + public void assertFailoverOccurs(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); + } + + public void assertFailoverDoesNotOccur(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount()); + } + + + @Override + public void failoverComplete() + { + _failoverLatch.countDown(); + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java new file mode 100644 index 0000000000..1afa45fd5a --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.EnvironmentFailureException; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + + private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 4; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + _jmxUtils.setUp(); + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + final int numberOfDataRows = groupMembers.size(); + assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveNodeFromGroup() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + final int numberOfDataRows = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + storeBean.removeNodeFromGroup(removedNodeName); + + final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); + } + + /** + * Updates the address of a node. + * + * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit + * assert. + * + * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case + */ + public void testUpdateAddress() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + _clusterCreator.startNode(brokerPortNumberToBeMoved); + } + + /** + * @see #testUpdateAddress() + */ + public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + // now deliberately don't call updateAddress + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + try + { + _clusterCreator.startNode(brokerPortNumberToBeMoved); + fail("Exception not thrown"); + } + catch(RuntimeException rte) + { + //check cause was BDBs EnvironmentFailureException + assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); + // PASS + } + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java new file mode 100644 index 0000000000..5f995ae25d --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class HAClusterTwoNodeTest extends QpidBrokerTestCase +{ + private static final long RECEIVE_TIMEOUT = 5000l; + + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 2; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + _jmxUtils.setUp(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean autoDesignedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", + System.getProperty(QPID_HOME) + File.separator + "etc" + File.separator + "log.properties"); + + String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST; + + setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 s"); + + setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", "0"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + } + + /** + * Tests that a two node cluster, in which the master CAN automatically designate itself primary + * (after becoming master) continues to operate after being shut down and restarted. + * + * The test does not concern itself with which broker becomes master at any given point + * (which is likely to swap during the test). + */ + public void testClusterRestartWithAutoDesignatedPrimary() throws Exception + { + testClusterRestartImpl(true); + } + + /** + * Tests that a two node cluster, in which the master can NOT automatically designate itself + * primary (after becoming master) continues to operate after being shut down and restarted. + * + * The test does not concern itself with which broker becomes master at any given point + * (which is likely to swap during the test). + */ + public void testClusterRestartWithoutAutoDesignatedPrimary() throws Exception + { + testClusterRestartImpl(false); + } + + private void testClusterRestartImpl(boolean autoDesignatedPrimary) throws Exception + { + startCluster(autoDesignatedPrimary); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + /** + * This test make sure than JMS operations are still working after stopping replica + * when master is designated primary (which is by default). + * <p> + * When master is not designated primary this test should fail. + */ + public void testAutoDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonAutoDesignatedPrimarysAfterSecondaryStopped() throws Exception + { + + startCluster(false); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + try + { + assertProducingConsuming(connection); + fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); + } + catch(JMSException e) + { + // JMSException should be thrown on transaction start/commit + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenAutoDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java new file mode 100644 index 0000000000..4b64466ff2 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import com.sleepycat.je.rep.InsufficientLogException; + +/** + * The HA white box tests test the BDB cluster where the test retains the knowledge of the + * individual test nodes. It uses this knowledge to examine the nodes to ensure that they + * remain in the correct state throughout the test. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterWhiteboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + + private final int NUMBER_OF_NODES = 3; + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + // TODO Test for node falling behind + // TODO Factory refactoring?? // MessageStore construction?? + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME) + + File.separator + "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testClusterPermitsConnectionToOnlyOneNode() throws Exception + { + int connectionSuccesses = 0; + int connectionFails = 0; + + for (int brokerPortNumber : getBrokerPortNumbers()) + { + try + { + getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + connectionSuccesses++; + } + catch(JMSException e) + { + assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + connectionFails++; + } + } + + assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); + assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); + } + + public void testClusterThatLosesNodeStillAllowsConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + + // verify that JMS persistence operations are working + assertProducingConsuming(subsequentConnection); + + closeConnection(initialConnection); + } + + public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); + + killBroker(subsequentPortNumber); + + final Connection finalConnection = getConnectionToNodeInCluster(); + assertNull(finalConnection); + + closeConnection(initialConnection); + } + + public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception + { + final Connection connection = getConnectionToNodeInCluster(); + assertNotNull(connection); + + final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); + + _clusterCreator.stopNode(brokerPortNumber); + _clusterCreator.startNode(brokerPortNumber); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + } + + public void testClusterLosingNodeRetainsData() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + + final String queueNamePrefix = getTestQueueName(); + final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; + final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; + + populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + + killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + + assertNotNull("no valid connection obtained", subsequentConnection); + + checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + } + + public void testRecoveryOfOutOfDateNode() throws Exception + { + /* + * TODO: Implement + * + * Cant yet find a way to control cleaning in a deterministic way to allow provoking + * a node to become out of date. We do now know that even a new joiner to the group + * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has + * done *any* cleaning and then adding a new node should be sufficient to cause this. + */ + } + + private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception + { + populateBrokerWithData(connection, 1, queueUrls); + } + + private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception + { + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + session.createConsumer(queue).close(); + sendMessage(session, queue, noOfMessages); + } + } + + private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException + { + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + final MessageConsumer consumer = session.createConsumer(queue); + final Message message = consumer.receive(1000); + session.commit(); + assertNotNull("Queue " + queue + " should have message", message); + assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); + } + } + + private Connection getConnectionToNodeInCluster() throws URLSyntaxException + { + Connection connection = null; + Set<Integer> runningBrokerPorts = getBrokerPortNumbers(); + + for (int brokerPortNumber : runningBrokerPorts) + { + try + { + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + break; + } + catch(JMSException je) + { + assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + } + } + return connection; + } + + private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException, + InterruptedException + { + try + { + // NewMasterEvent is received twice: first for the existing master, + // second for a new master + CountDownLatch newMasterLatch = new CountDownLatch(2); + _clusterCreator.startMonitorNode(); + _clusterCreator.statListeningForNewMasterEvent(newMasterLatch); + + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + killBroker(initialPortNumber); + + assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS)); + } + finally + { + _clusterCreator.shutdownMonitor(); + } + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 2); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + Message m2 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 2 is not received", m2); + assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX)); + session.commit(); + } + + private void closeConnection(final Connection initialConnection) + { + try + { + initialConnection.close(); + } + catch(Exception e) + { + // ignore. + // java.net.SocketException is seen sometimes on active connection + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java index 7e5ef3f94c..eaa3c3eba4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -19,22 +19,25 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreFactory; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.test.utils.QpidTestCase; -public class BDBMessageStoreFactory implements MessageStoreFactory +public class HAMessageStoreSmokeTest extends QpidTestCase { + private final BDBHAMessageStore _store = new BDBHAMessageStore(); + private final XMLConfiguration _config = new XMLConfiguration(); - @Override - public MessageStore createMessageStore() + public void testMissingHAConfigThrowsException() throws Exception { - return new BDBMessageStore(); + try + { + _store.configure("test", _config); + fail("Expected an exception to be thrown"); + } + catch (ConfigurationException ce) + { + assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); + } } - - @Override - public String getStoreClassName() - { - return BDBMessageStore.class.getSimpleName(); - } - -} +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java new file mode 100644 index 0000000000..43cfa5f4d5 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.monitor.GroupChangeEvent; +import com.sleepycat.je.rep.monitor.JoinGroupEvent; +import com.sleepycat.je.rep.monitor.LeaveGroupEvent; +import com.sleepycat.je.rep.monitor.Monitor; +import com.sleepycat.je.rep.monitor.MonitorChangeListener; +import com.sleepycat.je.rep.monitor.MonitorConfig; +import com.sleepycat.je.rep.monitor.NewMasterEvent; + +public class HATestClusterCreator +{ + protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + + private static final int CYCLECOUNT = 2; + private static final int RETRIES = 2; + private static final int CONNECTDELAY = 1000; + + private final QpidBrokerTestCase _testcase; + private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); + private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>(); + private final String _virtualHostName; + private final String _configKeyPrefix; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + private Monitor _monitor; + + public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = "group" + _testcase.getName(); + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + configureClusterNode(brokerPort, bdbPort); + collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts()); + + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + } + + public void setAutoDesignatedPrimary(boolean autoDesignatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); + final String configKey = getConfigKey("highAvailability.autoDesignatedPrimary"); + brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(autoDesignatedPrimary)); + _primaryBrokerPort = brokerConfigEntry.getKey(); + } + + /** + * @param configKeySuffix "highAvailability.designatedPrimary", for example + * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example + */ + private String getConfigKey(String configKeySuffix) + { + final String configKey = StringUtils.substringAfter(_configKeyPrefix + configKeySuffix, "virtualhosts."); + return configKey; + } + + public void startNode(final int brokerPortNumber) throws Exception + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + + _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration()); + _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); + + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size()); + try + { + List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>(); + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + Future<Object> future = executor.submit(new Callable<Object>() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(), + brokerConfigHolder.getTestVirtualhosts()); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future<Object> future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.stopBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + shutdownMonitor(); + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set<Integer> getBdbPortNumbers() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.values()); + } + + public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException + { + String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set<Integer> portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set<Integer> getBrokerPortNumbersForNodes() + { + return new HashSet<Integer>(_brokerConfigurations.keySet()); + } + + private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception + { + final String nodeName = getNodeNameForNodeAt(bdbPort); + + _testcase.setConfigurationProperty(_configKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); + + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.groupName", _groupName); + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName); + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); + // TODO replication policy + } + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(), + (XMLConfiguration) testVirtualhosts.clone())); + } + + public class BrokerConfigHolder + { + private final XMLConfiguration _testConfiguration; + private final XMLConfiguration _testVirtualhosts; + + public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _testConfiguration = testConfiguration; + _testVirtualhosts = testVirtualhosts; + } + + public XMLConfiguration getTestConfiguration() + { + return _testConfiguration; + } + + public XMLConfiguration getTestVirtualhosts() + { + return _testVirtualhosts; + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved); + final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts(); + + final String configKey = getConfigKey("highAvailability.nodeHostPort"); + final String oldBdbHostPort = virtualHostConfig.getString(configKey); + + final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + final String oldHost = oldHostAndPort[0]; + + final String newBdbHostPort = oldHost + ":" + newBdbPort; + + virtualHostConfig.setProperty(configKey, newBdbHostPort); + collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); + } + + public void startMonitorNode() + { + shutdownMonitor(); + + MonitorConfig config = new MonitorConfig(); + config.setGroupName(_groupName); + int monitorPort = _testcase.findFreePort(); + config.setNodeName(getNodeNameForNodeAt(monitorPort)); + config.setNodeHostPort("" + monitorPort); + config.setHelperHosts(getHelperHostPort()); + + _monitor = new Monitor(config); + + ReplicationNode currentMaster = _monitor.register(); + LOGGER.info("Current master " + currentMaster.getName()); + } + + public void startListening(MonitorChangeListener listener) throws IOException + { + _monitor.startListener(listener); + } + + public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException + { + startListening(new MonitorChangeListenerSupport(){ + @Override + public void notify(NewMasterEvent newMasterEvent) + { + LOGGER.debug("New master is elected " + newMasterEvent.getMasterName()); + latch.countDown(); + } + }); + } + + public void shutdownMonitor() + { + if (_monitor != null) + { + try + { + _monitor.shutdown(); + } + catch (Exception e) + { + LOGGER.warn("Monitor shutdown error:", e); + } + } + } + + public static class MonitorChangeListenerSupport implements MonitorChangeListener + { + + @Override + public void notify(NewMasterEvent newMasterEvent) + { + } + + @Override + public void notify(GroupChangeEvent groupChangeEvent) + { + } + + @Override + public void notify(JoinGroupEvent joinGroupEvent) + { + } + + @Override + public void notify(LeaveGroupEvent leaveGroupEvent) + { + } + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index ba5ca842bf..23fd9bc24f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -24,7 +24,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import com.sleepycat.bind.tuple.IntegerBinding; @@ -94,7 +94,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase { assertEquals("Unexpected store version", -1, getStoreVersion()); _upgrader.upgradeIfNecessary(); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion()); assertContent(); } @@ -112,7 +112,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase List<String> expectedDatabases = new ArrayList<String>(); expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion()); nonExistentStoreLocation.delete(); } diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index 1f7f91de9a..0f7cc7866f 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -25,8 +25,8 @@ <name>localhost</name> <localhost> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> - <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class> <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> @@ -86,8 +86,8 @@ <name>development</name> <development> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> - <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class> <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> @@ -125,8 +125,8 @@ <name>test</name> <test> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> - <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class> <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 5f472b6ddd..59c6926b76 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.MemoryMessageStoreFactory; import java.util.ArrayList; import java.util.Arrays; @@ -103,14 +102,14 @@ public class VirtualHostConfiguration extends ConfigurationPlugin return getConfig().subset("store"); } - public String getMessageStoreFactoryClass() + public String getMessageStoreClass() { - return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName()); + return getStringValue("store.class", MemoryMessageStore.class.getName()); } - public void setMessageStoreFactoryClass(String storeFactoryClass) + public void setMessageStoreClass(String storeFactoryClass) { - getConfig().setProperty("store.factoryclass", storeFactoryClass); + getConfig().setProperty("store.class", storeFactoryClass); } public List getExchanges() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java index 9b5ceef35f..c681126c11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java @@ -23,12 +23,20 @@ public enum Event { BEFORE_INIT, AFTER_INIT, + BEFORE_ACTIVATE, AFTER_ACTIVATE, + BEFORE_PASSIVATE, AFTER_PASSIVATE, + BEFORE_CLOSE, AFTER_CLOSE, + + BEFORE_QUIESCE, + AFTER_QUIESCE, + BEFORE_RESTART, + PERSISTENT_MESSAGE_SIZE_OVERFULL, - PERSISTENT_MESSAGE_SIZE_UNDERFULL, + PERSISTENT_MESSAGE_SIZE_UNDERFULL } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java index 21ae3924b8..bf3de2611d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java @@ -24,9 +24,12 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + public class EventManager { private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class); + private static final Logger _LOGGER = Logger.getLogger(EventManager.class); public synchronized void addEventListener(EventListener listener, Event... events) { @@ -46,6 +49,11 @@ public class EventManager { if (_listeners.containsKey(event)) { + if(_LOGGER.isDebugEnabled()) + { + _LOGGER.debug("Received event " + event); + } + for (EventListener listener : _listeners.get(event)) { listener.event(event); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java index a35db62b03..59483751ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java @@ -19,9 +19,11 @@ */ package org.apache.qpid.server.store; -public interface MessageStoreFactory +public interface HAMessageStore extends MessageStore { - MessageStore createMessageStore(); - - String getStoreClassName(); + /** + * Used to indicate that a store requires to make itself unavailable for read and read/write + * operations. + */ + void passivate(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 59624b7a75..7b98b30860 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -83,19 +83,19 @@ public class MemoryMessageStore extends NullMessageStore @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception { - _stateManager.attainState(State.CONFIGURING); + _stateManager.attainState(State.INITIALISING); } @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception { - _stateManager.attainState(State.CONFIGURED); + _stateManager.attainState(State.INITIALISED); } @Override public void activate() throws Exception { - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java deleted file mode 100644 index 8724f102c6..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -public class MemoryMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public MessageStore createMessageStore() - { - return new MemoryMessageStore(); - } - - @Override - public String getStoreClassName() - { - return MemoryMessageStore.class.getSimpleName(); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java index 7cbdede85e..2783637b2a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java @@ -20,19 +20,30 @@ */ package org.apache.qpid.server.store; +import org.apache.qpid.server.configuration.ConfiguredObject; + public enum State { - + /** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */ INITIAL, - CONFIGURING, - CONFIGURED, - RECOVERING, + + INITIALISING, + /** + * The initial set-up of the store has completed. + * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk. + * + * From the point of view of the user, the store is essentially stopped. + */ + INITIALISED, + + ACTIVATING, ACTIVE, - QUIESCING, - QUIESCED, + CLOSING, - CLOSED; - + CLOSED, + QUIESCING, + /** The virtual host (and implicitly also the store) has been manually paused by the user to allow configuration changes to take place */ + QUIESCED; }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java index 5998be5bb6..613b329beb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java @@ -24,6 +24,8 @@ package org.apache.qpid.server.store; import java.util.EnumMap; import java.util.Map; +import org.apache.qpid.server.store.StateManager.Transition; + public class StateManager { private State _state = State.INITIAL; @@ -70,16 +72,23 @@ public class StateManager } - public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT); - public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT); - public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE); - public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE); + public static final Transition INITIALISE = new Transition(State.INITIAL, State.INITIALISING, Event.BEFORE_INIT); + public static final Transition INITALISE_COMPLETE = new Transition(State.INITIALISING, State.INITIALISED, Event.AFTER_INIT); + + public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE); + public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE); + + public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);; public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE); public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE); public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE); - public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE); - public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE); - public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE); + + public static final Transition PASSIVATE = new Transition(State.ACTIVE, State.INITIALISED, Event.BEFORE_PASSIVATE); + + public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_QUIESCE); + public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.AFTER_QUIESCE); + + public static final Transition RESTART = new Transition(State.QUIESCED, State.ACTIVATING, Event.BEFORE_RESTART); public StateManager(final EventManager eventManager) @@ -105,16 +114,6 @@ public class StateManager return _state; } - public synchronized void stateTransition(final State current, final State desired) - { - if (_state != current) - { - throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current - + "; currently in state: " + _state); - } - attainState(desired); - } - public synchronized void attainState(State desired) { Transition transition = null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index de1ce1a9db..c065eb263b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -260,7 +260,7 @@ public class DerbyMessageStore implements MessageStore ConfigurationRecoveryHandler configRecoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.attainState(State.CONFIGURING); + _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = configRecoveryHandler; commonConfiguration(name, storeConfiguration); @@ -276,13 +276,13 @@ public class DerbyMessageStore implements MessageStore _tlogRecoveryHandler = tlogRecoveryHandler; _messageRecoveryHandler = recoveryHandler; - _stateManager.attainState(State.CONFIGURED); + _stateManager.attainState(State.INITIALISED); } @Override public void activate() throws Exception { - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.ACTIVATING); // this recovers durable exchanges, queues, and bindings recoverConfiguration(_configRecoveryHandler); @@ -716,7 +716,7 @@ public class DerbyMessageStore implements MessageStore public void close() throws Exception { _closed.getAndSet(true); - _stateManager.stateTransition(State.ACTIVE, State.CLOSING); + _stateManager.attainState(State.CLOSING); try { @@ -737,7 +737,7 @@ public class DerbyMessageStore implements MessageStore } } - _stateManager.stateTransition(State.CLOSING, State.CLOSED); + _stateManager.attainState(State.CLOSED); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java deleted file mode 100644 index 12d7f64a8d..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreFactory; - -public class DerbyMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public MessageStore createMessageStore() - { - return new DerbyMessageStore(); - } - - @Override - public String getStoreClassName() - { - return DerbyMessageStore.class.getSimpleName(); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index b05025467d..5a14092930 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -59,8 +59,8 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.HAMessageStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreFactory; import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; @@ -173,7 +173,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); - _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass()); + _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass()); configureMessageStore(hostConfig); @@ -329,20 +329,19 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } - private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception + private MessageStore initialiseMessageStore(final String messageStoreClass) throws Exception { - final Class<?> clazz = Class.forName(messageStoreFactoryClass); + final Class<?> clazz = Class.forName(messageStoreClass); final Object o = clazz.newInstance(); - if (!(o instanceof MessageStoreFactory)) + if (!(o instanceof MessageStore)) { - throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class + + throw new ClassCastException("Message store factory class must implement " + MessageStore.class + ". Class " + clazz + " does not."); } - final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o; - final MessageStore messageStore = messageStoreFactory.createMessageStore(); - final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName()); + final MessageStore messageStore = (MessageStore) o; + final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName()); OperationalLoggingListener.listen(messageStore, storeLogSubject); messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); @@ -366,7 +365,10 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private void activateNonHAMessageStore() throws Exception { - _messageStore.activate(); + if (!(_messageStore instanceof HAMessageStore)) + { + _messageStore.activate(); + } } private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException @@ -801,42 +803,42 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) - { - try - { - _exchangeRegistry.initialise(); - initialiseModel(_vhostConfig); - } - catch (Exception e) - { - throw new RuntimeException("Failed to initialise virtual host after state change", e); - } - } - } - - private final class AfterActivationListener implements EventListener - { - @Override - public void event(Event event) - { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); - try - { - _brokerMBean.register(); - } - catch (JMException e) - { - throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); - } + { + @Override + public void event(Event event) + { + try + { + _exchangeRegistry.initialise(); + initialiseModel(_vhostConfig); + } + catch (Exception e) + { + throw new RuntimeException("Failed to initialise virtual host after state change", e); + } + } + } + + private final class AfterActivationListener implements EventListener + { + @Override + public void event(Event event) + { + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + try + { + _brokerMBean.register(); + } + catch (JMException e) + { + throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + } - _state = State.ACTIVE; - } - } + _state = State.ACTIVE; + } + } - public class BeforePassivationListener implements EventListener + private final class BeforePassivationListener implements EventListener { public void event(Event event) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index d26e286c90..d34d1bbef3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -36,7 +36,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -165,7 +165,7 @@ public class AMQBrokerManagerMBeanTest extends QpidTestCase XMLConfiguration configXml = new XMLConfiguration(); configXml.addProperty("virtualhosts.virtualhost(-1).name", "test"); - configXml.addProperty("virtualhosts.virtualhost(-1).test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); + configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); ServerConfiguration configuration = new ServerConfiguration(configXml); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index c4c93acfb6..50e7f0588b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -27,7 +27,6 @@ import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -162,7 +161,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2"); getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true"); getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0"); - getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName()); // Start the broker now. super.createBroker(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index e123a968a4..337ff194c3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.util.UUID; - import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; @@ -37,7 +35,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -56,7 +53,7 @@ public class AMQQueueFactoryTest extends QpidTestCase XMLConfiguration configXml = new XMLConfiguration(); configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); - configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); + configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); ServerConfiguration configuration = new ServerConfiguration(configXml); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 52ad4a7c5b..a8fad96063 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -108,7 +107,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase PropertiesConfiguration env = new PropertiesConfiguration(); final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env); - vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName()); + vhostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName()); _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index a1cbb2cbc8..48e631a0f4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -27,7 +27,6 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; -import org.apache.qpid.server.logging.subjects.TestBlankSubject; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; @@ -41,7 +40,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecover import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory; +import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -262,14 +261,14 @@ public class DurableConfigurationStoreTest extends QpidTestCase protected MessageStore createStore() throws Exception { - String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY); - if (storeFactoryClass == null) + String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); + if (storeClass == null) { - storeFactoryClass = DerbyMessageStoreFactory.class.getName(); + storeClass = DerbyMessageStore.class.getName(); } CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); - MessageStoreFactory factory = (MessageStoreFactory) Class.forName(storeFactoryClass).newInstance(); - return factory.createMessageStore(); + MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance(); + return messageStore; } public void testRecordXid() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 3fb0776083..64048d294b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -59,7 +59,6 @@ import java.io.File; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; /** * This tests the MessageStores by using the available interfaces. @@ -103,7 +102,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase String storePath = System.getProperty("QPID_WORK") + "/" + getName(); _config = new PropertiesConfiguration(); - _config.addProperty("store.factoryclass", getTestProfileMessageStoreFactoryClassName()); + _config.addProperty("store.class", getTestProfileMessageStoreClassName()); _config.addProperty("store.environment-path", storePath); cleanup(new File(storePath)); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java index 42746f9119..c6ef35d255 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java @@ -73,11 +73,11 @@ public class OperationalLoggingListenerTest extends TestCase } - messageStore.attainState(State.CONFIGURING); + messageStore.attainState(State.INITIALISING); assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size()); assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString()); - messageStore.attainState(State.CONFIGURED); + messageStore.attainState(State.INITIALISED); assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size()); assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString()); assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString()); @@ -86,7 +86,7 @@ public class OperationalLoggingListenerTest extends TestCase assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString()); } - messageStore.attainState(State.RECOVERING); + messageStore.attainState(State.ACTIVATING); assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size()); assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java index 97c88ca1d3..18efb976eb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java @@ -45,8 +45,8 @@ public class StateManagerTest extends TestCase implements EventListener { assertEquals(State.INITIAL, _manager.getState()); - _manager.stateTransition(State.INITIAL, State.CONFIGURING); - assertEquals(State.CONFIGURING, _manager.getState()); + _manager.attainState(State.INITIALISING); + assertEquals(State.INITIALISING, _manager.getState()); } public void testStateTransitionDisallowed() @@ -55,7 +55,7 @@ public class StateManagerTest extends TestCase implements EventListener try { - _manager.stateTransition(State.ACTIVE, State.CLOSING); + _manager.attainState(State.CLOSING); fail("Exception not thrown"); } catch (IllegalStateException e) @@ -98,22 +98,29 @@ public class StateManagerTest extends TestCase implements EventListener public void testValidStateTransitions() { assertEquals(State.INITIAL, _manager.getState()); - performValidTransition(StateManager.CONFIGURE); - performValidTransition(StateManager.CONFIGURE_COMPLETE); - performValidTransition(StateManager.RECOVER); + performValidTransition(StateManager.INITIALISE); + performValidTransition(StateManager.INITALISE_COMPLETE); performValidTransition(StateManager.ACTIVATE); + performValidTransition(StateManager.ACTIVATE_COMPLETE); performValidTransition(StateManager.QUIESCE); performValidTransition(StateManager.QUIESCE_COMPLETE); performValidTransition(StateManager.RESTART); - performValidTransition(StateManager.ACTIVATE); + performValidTransition(StateManager.ACTIVATE_COMPLETE); performValidTransition(StateManager.CLOSE_ACTIVE); performValidTransition(StateManager.CLOSE_COMPLETE); + + _manager = new StateManager(this); + assertEquals(State.INITIAL, _manager.getState()); + performValidTransition(StateManager.INITIALISE); + performValidTransition(StateManager.INITALISE_COMPLETE); + performValidTransition(StateManager.CLOSE_INITIALISED); + performValidTransition(StateManager.CLOSE_COMPLETE); _manager = new StateManager(this); - performValidTransition(StateManager.CONFIGURE); - performValidTransition(StateManager.CONFIGURE_COMPLETE); - performValidTransition(StateManager.RECOVER); + performValidTransition(StateManager.INITIALISE); + performValidTransition(StateManager.INITALISE_COMPLETE); performValidTransition(StateManager.ACTIVATE); + performValidTransition(StateManager.ACTIVATE_COMPLETE); performValidTransition(StateManager.QUIESCE); performValidTransition(StateManager.QUIESCE_COMPLETE); performValidTransition(StateManager.CLOSE_QUIESCED); @@ -132,54 +139,50 @@ public class StateManagerTest extends TestCase implements EventListener { assertEquals(State.INITIAL, _manager.getState()); - - performInvalidTransitions(StateManager.CONFIGURE, State.CONFIGURED); - performInvalidTransitions(StateManager.CONFIGURE_COMPLETE, State.RECOVERING); - performInvalidTransitions(StateManager.RECOVER, State.ACTIVE); - performInvalidTransitions(StateManager.ACTIVATE, State.QUIESCING, State.CLOSING); + performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED); + performInvalidTransitions(StateManager.INITALISE_COMPLETE, State.ACTIVATING, State.CLOSING); + performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE); + performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED); performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED); - performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.RECOVERING, State.CLOSING); + performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING); performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED); performInvalidTransitions(StateManager.CLOSE_COMPLETE); - - - } - private void performInvalidTransitions(StateManager.Transition preTransition, State... validTransitions) + private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates) { if(preTransition != null) { performValidTransition(preTransition); } - EnumSet<State> nextStates = EnumSet.allOf(State.class); + EnumSet<State> endStates = EnumSet.allOf(State.class); - if(validTransitions != null) + if(validEndStates != null) { - for(State state: validTransitions) + for(State state: validEndStates) { - nextStates.remove(state); + endStates.remove(state); } } - for(State nextState : nextStates) + for(State invalidEndState : endStates) { - performInvalidStateTransition(nextState); + performInvalidStateTransition(invalidEndState); } } - private void performInvalidStateTransition(State state) + private void performInvalidStateTransition(State invalidEndState) { try { _event = null; State startState = _manager.getState(); - _manager.attainState(state); - fail("Invalid state transition performed: " + startState + " to " + state); + _manager.attainState(invalidEndState); + fail("Invalid state transition performed: " + startState + " to " + invalidEndState); } catch(IllegalStateException e) { @@ -188,6 +191,7 @@ public class StateManagerTest extends TestCase implements EventListener assertNull("No event should have be fired", _event); } + @Override public void event(Event event) { _event = event; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java deleted file mode 100644 index 44070f22ad..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -public class TestableMemoryMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public MessageStore createMessageStore() - { - return new TestableMemoryMessageStore(); - } - - @Override - public String getStoreClassName() - { - return TestableMemoryMessageStore.class.getSimpleName(); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index f8200bf1cd..8a18aaff9e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -44,7 +44,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -67,10 +66,10 @@ public class InternalBrokerBaseCase extends QpidTestCase super.setUp(); _configXml.addProperty("virtualhosts.virtualhost.name", "test"); - _configXml.addProperty("virtualhosts.virtualhost.test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); + _configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); _configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); - _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName()); + _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); createBroker(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java index 87eb0f9d16..b8ba76e43d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStoreFactory; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.test.utils.QpidTestCase; @@ -192,7 +192,7 @@ public class VirtualHostImplTest extends QpidTestCase writer.write(" <name>" + vhostName + "</name>"); writer.write(" <" + vhostName + ">"); writer.write(" <store>"); - writer.write(" <factoryclass>" + MemoryMessageStoreFactory.class.getName() + "</factoryclass>"); + writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>"); writer.write(" </store>"); if(exchangeName != null && !dontDeclare) { diff --git a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index cbf6caf141..8a14466aeb 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -110,8 +110,8 @@ public class QpidTestCase extends TestCase } } - protected static final String MS_FACTORY_CLASS_NAME_KEY = "messagestorefactory.class.name"; - protected static final String MEMORY_STORE_FACTORY_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStoreFactory"; + protected static final String MESSAGE_STORE_CLASS_NAME_KEY = "messagestore.class.name"; + protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore"; private static List<String> _exclusionList; @@ -140,12 +140,12 @@ public class QpidTestCase extends TestCase } } - public String getTestProfileMessageStoreFactoryClassName() + public String getTestProfileMessageStoreClassName() { - final String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY); - _logger.debug("MS_FACTORY_CLASS_NAME_KEY " + storeFactoryClass); + final String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); + _logger.debug("MESSAGE_STORE_CLASS_NAME_KEY " + storeClass); - return storeFactoryClass != null ? storeFactoryClass : MEMORY_STORE_FACTORY_CLASS_NAME ; + return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 1261a1bdea..8c6d815fee 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -349,7 +349,7 @@ <sysproperty key="log4j.configuration" value="${log4j.configuration}"/> <sysproperty key="java.naming.factory.initial" value="${java.naming.factory.initial}"/> <sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/> - <sysproperty key="messagestorefactory.class.name" value="${messagestorefactory.class.name}" /> + <sysproperty key="messagestore.class.name" value="${messagestore.class.name}" /> <sysproperty key="test.output" value="${module.results}"/> <sysproperty key="qpid.amqp.version" value="${qpid.amqp.version}"/> diff --git a/qpid/java/systests/etc/log.properties b/qpid/java/systests/etc/log.properties new file mode 100644 index 0000000000..745c5187c9 --- /dev/null +++ b/qpid/java/systests/etc/log.properties @@ -0,0 +1,2 @@ +com.sleepycat.je.util.FileHandler.level=ALL +com.sleepycat.je.util.ConsoleHandler.level=ALL diff --git a/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml b/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml index 216046b40b..ce16523f13 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml @@ -26,7 +26,7 @@ <name>localhost</name> <localhost> <store> - <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> <environment-path>${work}/bdbstore/localhost-store</environment-path> </store> </localhost> @@ -36,7 +36,7 @@ <name>development</name> <development> <store> - <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> <environment-path>${work}/bdbstore/development-store</environment-path> </store> </development> @@ -46,7 +46,7 @@ <name>test</name> <test> <store> - <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> <environment-path>${work}/bdbstore/test-store</environment-path> </store> </test> diff --git a/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml b/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml index d494768e91..08a40ca812 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml @@ -26,7 +26,7 @@ <virtualhost> <localhost> <store> - <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class> <environment-path>${QPID_WORK}/derbyDB/localhost-store</environment-path> </store> </localhost> @@ -35,7 +35,7 @@ <virtualhost> <development> <store> - <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class> <environment-path>${QPID_WORK}/derbyDB/development-store</environment-path> </store> </development> @@ -44,7 +44,7 @@ <virtualhost> <test> <store> - <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class> <environment-path>${QPID_WORK}/derbyDB/test-store</environment-path> </store> </test> diff --git a/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml b/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml index f8e9fde8ca..20908e6eb4 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml @@ -26,7 +26,7 @@ <name>test</name> <test> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> </store> </test> </virtualhost> @@ -35,7 +35,7 @@ <name>test2</name> <test2> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> </store> <security> <firewall default-action="deny"> diff --git a/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml b/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml index 95db02672a..90377f345f 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml @@ -26,7 +26,7 @@ <name>test</name> <test> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> </store> </test> </virtualhost> @@ -35,7 +35,7 @@ <name>test2</name> <test2> <store> - <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> </store> <security> <firewall default-action="deny"/> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java deleted file mode 100644 index 6497a640d2..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -public class SlowMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public MessageStore createMessageStore() - { - return new SlowMessageStore(); - } - - @Override - public String getStoreClassName() - { - return SlowMessageStore.class.getSimpleName(); - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java index 283fb4ed4c..bcee4e4930 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -58,7 +58,13 @@ public class StoreOverfullTest extends QpidBrokerTestCase String.valueOf(OVERFULL_SIZE)); setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", String.valueOf(UNDERFULL_SIZE)); - setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000"); + + if(getTestProfileMessageStoreClassName().contains("BDB")) + { + setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).name", "je.log.fileMax"); + setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).value", "1000000"); + } + super.setUp(); _producerConnection = getConnection(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 91f5cb7770..ee81e7c372 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -54,7 +54,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase public void setUp() throws Exception { - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.factoryclass", "org.apache.qpid.server.store.SlowMessageStoreFactory"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore"); setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); setConfigurationProperty("management.enabled", "false"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index d9c259c389..1ef6164db6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -79,8 +79,16 @@ public class JMXTestUtils public void open() throws Exception { + open(0); // Zero signifies default broker to QBTC. + } + + public void open(final int brokerPort) throws Exception + { + int actualBrokerPort = _test.getPort(brokerPort); + int managementPort = _test.getManagementPort(actualBrokerPort); + _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", - _test.getManagementPort(_test.getPort()), _user, _password); + managementPort, _user, _password); _mbsc = _jmxc.getMBeanServerConnection(); } @@ -319,7 +327,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = queryObjects(query); _test.assertNotNull("Null ObjectName Set returned", objectNames); - _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size()); + _test.assertEquals("Unexpected number of objects matching " + managedClass + " returned", 1, objectNames.size()); ObjectName objectName = objectNames.iterator().next(); _test.getLogger().info("Loading: " + objectName); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 4d4dee001b..9f019443f5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -64,7 +64,7 @@ import org.apache.qpid.server.ProtocolExclusion; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.store.MessageStoreConstants; -import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory; +import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.LogMonitor; @@ -74,7 +74,6 @@ import org.apache.qpid.util.LogMonitor; */ public class QpidBrokerTestCase extends QpidTestCase { - public enum BrokerType { EXTERNAL /** Test case relies on a Broker started independently of the test-suite */, @@ -110,8 +109,10 @@ public class QpidBrokerTestCase extends QpidTestCase } // system properties + private static final String TEST_VIRTUALHOSTS = "test.virtualhosts"; + private static final String TEST_CONFIG = "test.config"; private static final String BROKER_LANGUAGE = "broker.language"; - private static final String BROKER_TYPE = "broker.type"; + protected static final String BROKER_TYPE = "broker.type"; private static final String BROKER_COMMAND = "broker.command"; private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work"; @@ -190,7 +191,7 @@ public class QpidBrokerTestCase extends QpidTestCase { super(); } - + public Logger getLogger() { return QpidBrokerTestCase._logger; @@ -346,11 +347,16 @@ public class QpidBrokerTestCase extends QpidTestCase public void startBroker(int port) throws Exception { + startBroker(port, _testConfiguration, _testVirtualhosts); + } + + public void startBroker(int port, XMLConfiguration testConfiguration, XMLConfiguration virtualHosts) throws Exception + { port = getPort(port); // Save any configuration changes that have been made - saveTestConfiguration(); - saveTestVirtualhosts(); + String testConfig = saveTestConfiguration(port, testConfiguration); + String virtualHostsConfig = saveTestVirtualhosts(port, virtualHosts); if(_brokers.get(port) != null) { @@ -360,7 +366,11 @@ public class QpidBrokerTestCase extends QpidTestCase if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker()) { setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false)); - saveTestConfiguration(); + testConfig = saveTestConfiguration(port, testConfiguration); + _logger.info("Set test.config property to: " + testConfig); + _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig); + setSystemProperty(TEST_CONFIG, testConfig); + setSystemProperty(TEST_VIRTUALHOSTS, virtualHostsConfig); BrokerOptions options = new BrokerOptions(); options.setConfigFile(_configFile.getAbsolutePath()); @@ -388,16 +398,16 @@ public class QpidBrokerTestCase extends QpidTestCase _logger.info("starting external broker: " + cmd); ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); - Map<String, String> env = pb.environment(); + Map<String, String> processEnv = pb.environment(); String qpidHome = System.getProperty(QPID_HOME); - env.put(QPID_HOME, qpidHome); + processEnv.put(QPID_HOME, qpidHome); //Augment Path with bin directory in QPID_HOME. - env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); + processEnv.put("PATH", processEnv.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); //Add the test name to the broker run. // DON'T change PNAME, qpid.stop needs this value. - env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\""); - env.put("QPID_WORK", qpidWork); + processEnv.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\""); + processEnv.put("QPID_WORK", qpidWork); // Use the environment variable to set amqj.logging.level for the broker // The value used is a 'server' value in the test configuration to @@ -412,7 +422,7 @@ public class QpidBrokerTestCase extends QpidTestCase { for (Map.Entry<String, String> entry : _env.entrySet()) { - env.put(entry.getKey(), entry.getValue()); + processEnv.put(entry.getKey(), entry.getValue()); } } @@ -429,25 +439,25 @@ public class QpidBrokerTestCase extends QpidTestCase setSystemProperty("root.logging.level"); } + // set test.config and test.virtualhosts + String qpidOpts = " -D" + TEST_CONFIG + "=" + testConfig + " -D" + TEST_VIRTUALHOSTS + "=" + virtualHostsConfig; - String QPID_OPTS = " "; // Add all the specified system properties to QPID_OPTS if (!_propertiesSetForBroker.isEmpty()) { for (String key : _propertiesSetForBroker.keySet()) { - QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " "; - } - - if (env.containsKey("QPID_OPTS")) - { - env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS); - } - else - { - env.put("QPID_OPTS", QPID_OPTS); + qpidOpts += " -D" + key + "=" + _propertiesSetForBroker.get(key); } } + if (processEnv.containsKey("QPID_OPTS")) + { + qpidOpts = processEnv.get("QPID_OPTS") + qpidOpts; + } + processEnv.put("QPID_OPTS", qpidOpts); + + _logger.info("Set test.config property to: " + testConfig); + _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig); // cpp broker requires that the work directory is created createBrokerWork(qpidWork); @@ -545,12 +555,17 @@ public class QpidBrokerTestCase extends QpidTestCase public String getTestConfigFile() { - return _output + "/" + getTestQueueName() + "-config.xml"; + return getTestConfigFile(getPort()); } - public String getTestVirtualhostsFile() + public String getTestConfigFile(int port) { - return _output + "/" + getTestQueueName() + "-virtualhosts.xml"; + return _output + "/" + getTestQueueName() + "-" + port + "-config.xml"; + } + + public String getTestVirtualhostsFile(int port) + { + return _output + "/" + getTestQueueName() + "-" + port + "-virtualhosts.xml"; } private String relativeToQpidHome(String file) @@ -560,38 +575,50 @@ public class QpidBrokerTestCase extends QpidTestCase protected void saveTestConfiguration() throws ConfigurationException { + String relative = saveTestConfiguration(getPort(), _testConfiguration); + _logger.info("Set test.config property to: " + relative); + setSystemProperty(TEST_CONFIG, relative); + } + + protected String saveTestConfiguration(int port, XMLConfiguration testConfiguration) throws ConfigurationException + { // Specify the test config file - String testConfig = getTestConfigFile(); + String testConfig = getTestConfigFile(port); String relative = relativeToQpidHome(testConfig); - setSystemProperty("test.config", relative); - _logger.info("Set test.config property to: " + relative); _logger.info("Saving test virtualhosts file at: " + testConfig); // Create the file if configuration does not exist - if (_testConfiguration.isEmpty()) + if (testConfiguration.isEmpty()) { - _testConfiguration.addProperty("__ignore", "true"); + testConfiguration.addProperty("__ignore", "true"); } - _testConfiguration.save(testConfig); + testConfiguration.save(testConfig); + return relative; } protected void saveTestVirtualhosts() throws ConfigurationException { + String relative = saveTestVirtualhosts(getPort(), _testVirtualhosts); + _logger.info("Set test.virtualhosts property to: " + relative); + setSystemProperty(TEST_VIRTUALHOSTS, relative); + } + + protected String saveTestVirtualhosts(int port, XMLConfiguration virtualHostConfiguration) throws ConfigurationException + { // Specify the test virtualhosts file - String testVirtualhosts = getTestVirtualhostsFile(); + String testVirtualhosts = getTestVirtualhostsFile(port); String relative = relativeToQpidHome(testVirtualhosts); - setSystemProperty("test.virtualhosts", relative); - _logger.info("Set test.virtualhosts property to: " + relative); - _logger.info("Saving test virtualhosts file at: " + testVirtualhosts); + _logger.info("Set test.virtualhosts property to: " + testVirtualhosts); // Create the file if configuration does not exist - if (_testVirtualhosts.isEmpty()) + if (virtualHostConfiguration.isEmpty()) { - _testVirtualhosts.addProperty("__ignore", "true"); + virtualHostConfiguration.addProperty("__ignore", "true"); } - _testVirtualhosts.save(testVirtualhosts); + virtualHostConfiguration.save(testVirtualhosts); + return relative; } protected void cleanBrokerWork(final String qpidWork) @@ -698,21 +725,21 @@ public class QpidBrokerTestCase extends QpidTestCase protected void makeVirtualHostPersistent(String virtualhost) throws ConfigurationException, IOException { - Class<?> storeFactoryClass = null; + Class<?> storeClass = null; try { // Try and lookup the BDB class - storeFactoryClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory"); + storeClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStore"); } catch (ClassNotFoundException e) { // No BDB store, we'll use Derby instead. - storeFactoryClass = DerbyMessageStoreFactory.class; + storeClass = DerbyMessageStore.class; } - setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.factoryclass", - storeFactoryClass.getName()); + setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.class", + storeClass.getName()); setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, "${QPID_WORK}/" + virtualhost); } @@ -1288,4 +1315,24 @@ public class QpidBrokerTestCase extends QpidTestCase { return FAILING_PORT; } + + public XMLConfiguration getTestVirtualhosts() + { + return _testVirtualhosts; + } + + public void setTestVirtualhosts(XMLConfiguration testVirtualhosts) + { + _testVirtualhosts = testVirtualhosts; + } + + public XMLConfiguration getTestConfiguration() + { + return _testConfiguration; + } + + public void setTestConfiguration(XMLConfiguration testConfiguration) + { + _testConfiguration = testConfiguration; + } } diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile index 2cef1fd53e..cba348b67f 100644 --- a/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile +++ b/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile @@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test- broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes Java010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile index 62c9385835..6e21a35425 100644 --- a/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile +++ b/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile @@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile index cfc2a12dde..491775d452 100644 --- a/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile +++ b/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile @@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile index 9cfa25eb9a..6e4c6cbd13 100644 --- a/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile +++ b/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile @@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb.0-10.testprofile b/qpid/java/test-profiles/java-bdb.0-10.testprofile index 4ac4c3baf2..3ef93a68cb 100644 --- a/qpid/java/test-profiles/java-bdb.0-10.testprofile +++ b/qpid/java/test-profiles/java-bdb.0-10.testprofile @@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test- broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes Java010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb.0-8.testprofile b/qpid/java/test-profiles/java-bdb.0-8.testprofile index 76c3ea0b72..b20a5b39a7 100644 --- a/qpid/java/test-profiles/java-bdb.0-8.testprofile +++ b/qpid/java/test-profiles/java-bdb.0-8.testprofile @@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb.0-9-1.testprofile b/qpid/java/test-profiles/java-bdb.0-9-1.testprofile index afc5f7bfd1..111d8f7867 100644 --- a/qpid/java/test-profiles/java-bdb.0-9-1.testprofile +++ b/qpid/java/test-profiles/java-bdb.0-9-1.testprofile @@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-bdb.0-9.testprofile b/qpid/java/test-profiles/java-bdb.0-9.testprofile index 76bde0defc..9524f59f02 100644 --- a/qpid/java/test-profiles/java-bdb.0-9.testprofile +++ b/qpid/java/test-profiles/java-bdb.0-9.testprofile @@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-bdb.xml -messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile index 3b57dca346..906ea271e6 100644 --- a/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile +++ b/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile @@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test- broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes Java010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile index 9d421f706e..11670154a8 100644 --- a/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile +++ b/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile @@ -24,7 +24,7 @@ broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile index 3038dd324e..7d60916ab5 100644 --- a/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile +++ b/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile @@ -24,7 +24,7 @@ broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile index 6007105097..4eab65eab0 100644 --- a/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile +++ b/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile @@ -24,7 +24,7 @@ broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby.0-10.testprofile b/qpid/java/test-profiles/java-dby.0-10.testprofile index 51f6c4ca6f..9bc6caf8a5 100644 --- a/qpid/java/test-profiles/java-dby.0-10.testprofile +++ b/qpid/java/test-profiles/java-dby.0-10.testprofile @@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test- broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes Java010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby.0-8.testprofile b/qpid/java/test-profiles/java-dby.0-8.testprofile index c841c69922..8a0a57c202 100644 --- a/qpid/java/test-profiles/java-dby.0-8.testprofile +++ b/qpid/java/test-profiles/java-dby.0-8.testprofile @@ -25,7 +25,7 @@ broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby.0-9-1.testprofile b/qpid/java/test-profiles/java-dby.0-9-1.testprofile index fa01010d52..1208949805 100644 --- a/qpid/java/test-profiles/java-dby.0-9-1.testprofile +++ b/qpid/java/test-profiles/java-dby.0-9-1.testprofile @@ -25,7 +25,7 @@ broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/java-dby.0-9.testprofile b/qpid/java/test-profiles/java-dby.0-9.testprofile index d343185591..9cf1347c4d 100644 --- a/qpid/java/test-profiles/java-dby.0-9.testprofile +++ b/qpid/java/test-profiles/java-dby.0-9.testprofile @@ -25,7 +25,7 @@ broker.ready=BRK-1004 broker.stopped=Exception broker.config=build/etc/config-systests-derby.xml broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT -messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes broker.clean.between.tests=true broker.persistent=true diff --git a/qpid/java/test-profiles/testprofile.defaults b/qpid/java/test-profiles/testprofile.defaults index 2c3c92e922..b0c1aea661 100644 --- a/qpid/java/test-profiles/testprofile.defaults +++ b/qpid/java/test-profiles/testprofile.defaults @@ -21,7 +21,7 @@ java.naming.provider.url=${test.profiles}/test-provider.properties broker.ready=Listening on TCP broker.config=build/etc/config-systests.xml -messagestorefactory.class.name=org.apache.qpid.server.store.MemoryMessageStoreFactory +messagestore.class.name=org.apache.qpid.server.store.MemoryMessageStore broker.protocol.excludes= broker.persistent=false |