diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /java/bdbstore/src | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/bdbstore/src')
24 files changed, 2484 insertions, 471 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index fb1d7c5265..9323111fdd 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.bind.tuple.ByteBinding; import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -29,14 +30,17 @@ import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.ExceptionEvent; +import com.sleepycat.je.ExceptionListener; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.TransactionConfig; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -44,6 +48,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; @@ -53,28 +58,13 @@ import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; -import org.apache.qpid.server.store.ConfiguredObjectHelper; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.EventManager; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.State; -import org.apache.qpid.server.store.StateManager; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; @@ -96,7 +86,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore public static final int VERSION = 6; - public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); + }}); private Environment _environment; @@ -145,36 +138,44 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected final StateManager _stateManager; - protected TransactionConfig _transactionConfig = new TransactionConfig(); - private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; private ConfigurationRecoveryHandler _configRecoveryHandler; - + + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private final EventManager _eventManager = new EventManager(); private String _storeLocation; private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + private Map<String, String> _envConfigMap; + public AbstractBDBMessageStore() { _stateManager = new StateManager(_eventManager); } + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.attainState(State.CONFIGURING); + _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = recoveryHandler; - configure(name,storeConfiguration); - - - + configure(name, storeConfiguration); } public void configureMessageStore(String name, @@ -185,12 +186,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; - _stateManager.attainState(State.CONFIGURED); + _stateManager.attainState(State.INITIALISED); } - public void activate() throws Exception + public synchronized void activate() throws Exception { - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.ACTIVATING); recoverConfig(_configRecoveryHandler); recoverMessages(_messageRecoveryHandler); @@ -204,7 +205,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore return new BDBTransaction(); } - /** * Called after instantiation in order to configure the message store. * @@ -215,9 +215,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void configure(String name, Configuration storeConfig) throws Exception { - final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, + final String storeLocation = storeConfig.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name); + _persistentSizeHighThreshold = storeConfig.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); + _persistentSizeLowThreshold = storeConfig.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + File environmentPath = new File(storeLocation); if (!environmentPath.exists()) { @@ -230,11 +237,39 @@ public abstract class AbstractBDBMessageStore implements MessageStore _storeLocation = storeLocation; + _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, "envConfig"); + LOGGER.info("Configuring BDB message store"); setupStore(environmentPath, name); } + protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException + { + final List<Object> argumentNames = config.getList(prefix + ".name"); + final List<Object> argumentValues = config.getList(prefix + ".value"); + final int initialSize = argumentNames.size() + defaultConfig.size(); + + final Map<String,String> attributes = new HashMap<String,String>(initialSize); + attributes.putAll(defaultConfig); + + for (int i = 0; i < argumentNames.size(); i++) + { + final String argName = argumentNames.get(i).toString(); + final String argValue = argumentValues.get(i).toString(); + + attributes.put(argName, argValue); + } + + return Collections.unmodifiableMap(attributes); + } + + @Override + public String getStoreLocation() + { + return _storeLocation; + } + /** * Move the store state from INITIAL to ACTIVE without actually recovering. * @@ -244,9 +279,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ void startWithNoRecover() throws AMQStoreException { - _stateManager.attainState(State.CONFIGURING); - _stateManager.attainState(State.CONFIGURED); - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.INITIALISING); + _stateManager.attainState(State.INITIALISED); + _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); } @@ -257,54 +292,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore new Upgrader(_environment, name).upgradeIfNecessary(); openDatabases(); - } - - protected Environment createEnvironment(File environmentPath) throws DatabaseException - { - LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); - try - { - return new Environment(environmentPath, envConfig); - } - catch (DatabaseException de) - { - if (de.getMessage().contains("Environment.setAllowCreate is false")) - { - //Allow the creation this time - envConfig.setAllowCreate(true); - if (_environment != null ) - { - _environment.cleanLog(); - _environment.close(); - } - return new Environment(environmentPath, envConfig); - } - else - { - throw de; - } - } + _totalStoreSize = getSizeOnDisk(); } + protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException; + public Environment getEnvironment() { return _environment; @@ -343,14 +336,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void close() throws Exception { - if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED)) - { - _stateManager.stateTransition(State.ACTIVE, State.CLOSING); - - closeInternal(); - - _stateManager.stateTransition(State.CLOSING, State.CLOSED); - } + _stateManager.attainState(State.CLOSING); + closeInternal(); + _stateManager.attainState(State.CLOSED); } protected void closeInternal() throws Exception @@ -409,8 +397,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore // Clean the log before closing. This makes sure it doesn't contain // redundant data. Closing without doing this means the cleaner may not // get a chance to finish. - _environment.cleanLog(); - _environment.close(); + try + { + _environment.cleanLog(); + } + finally + { + _environment.close(); + } } } @@ -420,16 +414,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore try { List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - QueueRecoveryHandler qrh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + ExchangeRecoveryHandler erh = recoveryHandler.begin(this); _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); + + BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); } catch (DatabaseException e) @@ -552,7 +546,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore long messageId = LongBinding.entryToLong(key); StorableMessageMetaData metaData = valueBinding.entryToObject(value); - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + mrh.message(message); maxId = Math.max(maxId, messageId); @@ -950,10 +945,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (_stateManager.isInState(State.ACTIVE)) { DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); + UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); DatabaseEntry value = new DatabaseEntry(); - LongBinding.longToEntry(link.getCreateTime(),value); + LongBinding.longToEntry(link.getCreateTime(), value); StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); try @@ -971,7 +966,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException { DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); + UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); try { OperationStatus status = _linkDb.delete(null, key); @@ -991,10 +986,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (_stateManager.isInState(State.ACTIVE)) { DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); DatabaseEntry value = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value); LongBinding.longToEntry(bridge.getCreateTime(),value); StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value); @@ -1014,7 +1009,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void deleteBridge(final Bridge bridge) throws AMQStoreException { DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); try { OperationStatus status = _bridgeDb.delete(null, key); @@ -1247,7 +1242,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore /** * Primarily for testing purposes. * - * @param queueName + * @param queueId * * @return a list of message ids for messages enqueued for a particular queue */ @@ -1494,6 +1489,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore return true; } + @SuppressWarnings("unchecked") public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { if(metaData.isPersistent()) @@ -1580,34 +1576,29 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private final long _messageId; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private final boolean _isRecovered; private StorableMessageMetaData _metaData; - private volatile SoftReference<byte[]> _dataRef; + private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private byte[] _data; + private volatile SoftReference<byte[]> _dataRef; StoredBDBMessage(long messageId, StorableMessageMetaData metaData) { - this(messageId, metaData, true); + this(messageId, metaData, false); } - - StoredBDBMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) + StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) { - try - { - _messageId = messageId; - _metaData = metaData; + _messageId = messageId; + _isRecovered = isRecovered; - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - - } - catch (DatabaseException e) + if(!_isRecovered) { - throw new RuntimeException(e); + _metaData = metaData; } - + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } public StorableMessageMetaData getMetaData() @@ -1697,8 +1688,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore synchronized void store(com.sleepycat.je.Transaction txn) { - - if(_metaData != null) + if (!stored()) { try { @@ -1730,12 +1720,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore public synchronized StoreFuture flushToStore() { - if(_metaData != null) + if(!stored()) { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); store(txn); AbstractBDBMessageStore.this.commit(txn,true); - + storedSizeChange(getMetaData().getContentSize()); } return StoreFuture.IMMEDIATE_FUTURE; } @@ -1744,18 +1734,27 @@ public abstract class AbstractBDBMessageStore implements MessageStore { try { + int delta = getMetaData().getContentSize(); AbstractBDBMessageStore.this.removeMessage(_messageId, false); + storedSizeChange(-delta); + } catch (AMQStoreException e) { throw new RuntimeException(e); } } + + private boolean stored() + { + return _metaData == null || _isRecovered; + } } private class BDBTransaction implements org.apache.qpid.server.store.Transaction { private com.sleepycat.je.Transaction _txn; + private int _storeSizeIncrease; private BDBTransaction() { @@ -1765,7 +1764,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore } catch (DatabaseException e) { - throw new RuntimeException(e); + LOGGER.error("Exception during transaction begin, closing store environment.", e); + closeEnvironmentSafely(); + + throw new RuntimeException("Exception during transaction begin, store environment closed.", e); } } @@ -1773,7 +1775,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if(message.getStoredMessage() instanceof StoredBDBMessage) { - ((StoredBDBMessage)message.getStoredMessage()).store(_txn); + final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1787,10 +1791,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws AMQStoreException { AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); } public StoreFuture commitTranAsync() throws AMQStoreException { + AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1811,15 +1817,133 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public void addEventListener(EventListener eventListener, Event... events) + private void storedSizeChange(final int delta) { - _eventManager.addEventListener(eventListener, events); + if(getPersistentSizeHighThreshold() > 0) + { + synchronized (this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 2*delta; + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _totalStoreSize = getSizeOnDisk(); + + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + _totalStoreSize = getSizeOnDisk(); + + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(); + + _totalStoreSize = getSizeOnDisk(); + + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + } } - @Override - public String getStoreLocation() + private void reduceSizeOnDisk() { - return _storeLocation; + _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); + boolean cleaned = false; + while (_environment.cleanLog() > 0) + { + cleaned = true; + } + if (cleaned) + { + CheckpointConfig force = new CheckpointConfig(); + force.setForce(true); + _environment.checkpoint(force); + } + + + _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); + } + + private long getSizeOnDisk() + { + return _environment.getStats(null).getTotalLogSize(); + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + private void setEnvironmentConfigProperties(EnvironmentConfig envConfig) + { + for (Map.Entry<String, String> configItem : _envConfigMap.entrySet()) + { + LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } + + protected EnvironmentConfig createEnvironmentConfig() + { + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + + setEnvironmentConfigProperties(envConfig); + + envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); + + return envConfig; + } + + protected void closeEnvironmentSafely() + { + try + { + _environment.close(); + } + catch (DatabaseException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + catch (IllegalStateException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + } + + + private class LoggingAsyncExceptionListener implements ExceptionListener + { + @Override + public void exceptionThrown(ExceptionEvent event) + { + LOGGER.error("Asynchronous exception thrown by BDB thread '" + + event.getThreadName() + "'", event.getException()); + } } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java new file mode 100644 index 0000000000..c40f24dbc3 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.AbstractActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.store.HAMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; + +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.OperationFailureException; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; + +public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); + + private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); + + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + + @SuppressWarnings("serial") + private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + /** + * Parameter decreased as the 24h default may lead very large log files for most users. + */ + put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); + /** + * Parameter increased as the 5 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); + /** + * Parameter increased as the 10 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); + /** + * Parameter increased as the 10 h default may cause user confusion. + */ + put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); + /** + * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False + * is scheduled to become default after JE 5.0.48. + */ + put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); + /** + * Parameter decreased as a default 5min interval may lead to bigger data losses on Node + * with NO_SYN durability in case if such Node crushes. + */ + put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); + }}); + + public static final String BDB_HA_STORE_TYPE = "BDB-HA"; + + private String _groupName; + private String _nodeName; + private String _nodeHostPort; + private String _helperHostPort; + private Durability _durability; + + private String _name; + + private CommitThreadWrapper _commitThreadWrapper; + private boolean _coalescingSync; + private boolean _designatedPrimary; + private Map<String, String> _repConfig; + + @Override + public void configure(String name, Configuration storeConfig) throws Exception + { + //Mandatory configuration + _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig); + _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig); + _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig); + _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig); + _name = name; + + //Optional configuration + String durabilitySetting = storeConfig.getString("highAvailability.durability"); + if (durabilitySetting == null) + { + _durability = DEFAULT_DURABILITY; + } + else + { + _durability = Durability.parse(durabilitySetting); + } + _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE); + _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE); + _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig"); + + if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC) + { + throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + + "! Please set highAvailability.coalescingSync to false in store configuration."); + } + + super.configure(name, storeConfig); + } + + @Override + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException + { + super.setupStore(storePath, name); + + if(_coalescingSync) + { + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); + } + } + + @Override + protected Environment createEnvironment(File environmentPath) throws DatabaseException + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Environment path " + environmentPath.getAbsolutePath()); + LOGGER.info("Group name " + _groupName); + LOGGER.info("Node name " + _nodeName); + LOGGER.info("Node host port " + _nodeHostPort); + LOGGER.info("Helper host port " + _helperHostPort); + LOGGER.info("Durability " + _durability); + LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); + } + + final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); + + replicationConfig.setHelperHosts(_helperHostPort); + replicationConfig.setDesignatedPrimary(_designatedPrimary); + setReplicationConfigProperties(replicationConfig); + + final EnvironmentConfig envConfig = createEnvironmentConfig(); + envConfig.setDurability(_durability); + + ReplicatedEnvironment replicatedEnvironment = null; + try + { + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + catch (final InsufficientLogException ile) + { + LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + NetworkRestore restore = new NetworkRestore(); + NetworkRestoreConfig config = new NetworkRestoreConfig(); + config.setRetainLogFiles(false); + restore.execute(ile, config); + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + + return replicatedEnvironment; + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config) throws Exception + { + super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + + replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + } + + @Override + public synchronized void activate() throws Exception + { + // Before proceeding, perform a log flush with an fsync + getEnvironment().flushLog(true); + + super.activate(); + } + + @Override + public synchronized void passivate() + { + if (_stateManager.isNotInState(State.INITIALISED)) + { + LOGGER.debug("Store becoming passive"); + _stateManager.attainState(State.INITIALISED); + } + } + + public String getName() + { + return _name; + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeName() + { + return _nodeName; + } + + public String getNodeHostPort() + { + return _nodeHostPort; + } + + public String getHelperHostPort() + { + return _helperHostPort; + } + + public String getDurability() + { + return _durability.toString(); + } + + public boolean isCoalescingSync() + { + return _coalescingSync; + } + + public String getNodeState() + { + ReplicatedEnvironment.State state = getReplicatedEnvironment().getState(); + return state.toString(); + } + + public Boolean isDesignatedPrimary() + { + return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary(); + } + + public List<Map<String, String>> getGroupMembers() + { + List<Map<String, String>> members = new ArrayList<Map<String,String>>(); + + for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes()) + { + Map<String, String> nodeMap = new HashMap<String, String>(); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName()); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); + members.add(nodeMap); + } + + return members; + } + + public void removeNodeFromGroup(String nodeName) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().removeMember(nodeName); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); + } + } + + public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException + { + try + { + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + synchronized(replicatedEnvironment) + { + final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); + replicatedEnvironment.setRepMutableConfig(newConfig); + } + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e); + } + } + + ReplicatedEnvironment getReplicatedEnvironment() + { + return (ReplicatedEnvironment)getEnvironment(); + } + + public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e); + } + } + + @Override + protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + try + { + tx.commit(); + } + catch (DatabaseException de) + { + LOGGER.error("Got DatabaseException on commit, closing environment", de); + + closeEnvironmentSafely(); + + throw de; + } + + if(_coalescingSync) + { + return _commitThreadWrapper.commit(tx, syncCommit); + } + else + { + return StoreFuture.IMMEDIATE_FUTURE; + } + } + + @Override + protected void closeInternal() throws Exception + { + substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); + + try + { + if(_coalescingSync) + { + _commitThreadWrapper.stopCommitThread(); + } + } + finally + { + super.closeInternal(); + } + } + + /** + * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during + * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change. + */ + private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment) + { + LOGGER.debug("Substituting no-op state change listener for environment close"); + replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener()); + } + + private ReplicationGroupAdmin createReplicationGroupAdmin() + { + final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets()); + + final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig(); + helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + + return new ReplicationGroupAdmin(_groupName, helpers); + } + + + private void setReplicationConfigProperties(ReplicationConfig replicationConfig) + { + for (Map.Entry<String, String> configItem : _repConfig.entrySet()) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } + + private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException + { + if (!config.containsKey(key)) + { + throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + + key.replace('.', '/')); + } + return config.getString(key); + } + + private class BDBHAMessageStoreStateChangeListener implements StateChangeListener + { + private final Executor _executor = Executors.newSingleThreadExecutor(); + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Received BDB event indicating transition to state " + state); + } + + switch (state) + { + case MASTER: + activateStoreAsync(); + break; + case REPLICA: + passivateStoreAsync(); + break; + case DETACHED: + LOGGER.error("BDB replicated node in detached state, therefore passivating."); + passivateStoreAsync(); + break; + case UNKNOWN: + LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); + break; + default: + LOGGER.error("Unexpected state change: " + state); + throw new IllegalStateException("Unexpected state change: " + state); + } + } + + /** + * Calls {@link MessageStore#activate()}. + * + * <p/> + * + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * activate may execute transactions, which can't complete until + * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned. + */ + private void activateStoreAsync() + { + String threadName = "BDBHANodeActivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + throw e; + } + return null; + } + }, threadName); + } + + /** + * Calls {@link #passivate()}. + * + * <p/> + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * passivation due to the effect of state change listeners. + */ + private void passivateStoreAsync() + { + String threadName = "BDBHANodePassivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + + @Override + public Void call() throws Exception + { + try + { + passivate(); + } + catch (Exception e) + { + LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e); + throw e; + } + return null; + } + }, threadName); + } + + private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) + { + final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); + + _executor.execute(new Runnable() + { + + @Override + public void run() + { + final String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(threadName); + try + { + CurrentActor.set(new AbstractActor(_rootLogger) + { + @Override + public String getLogMessage() + { + return threadName; + } + }); + + try + { + callable.call(); + } + catch (Exception e) + { + LOGGER.error("Exception during state change", e); + } + } + finally + { + Thread.currentThread().setName(originalThreadName); + } + } + }); + } + } + + private class NoOpStateChangeListener implements StateChangeListener + { + @Override + public void stateChange(StateChangeEvent stateChangeEvent) + throws RuntimeException + { + } + } + + @Override + public String getStoreType() + { + return BDB_HA_STORE_TYPE; + } +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 9f7eb4bfd9..82bc3d8564 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -21,16 +21,12 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; -import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; @@ -46,39 +42,23 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - - private final CommitThread _commitThread = new CommitThread("Commit-Thread"); + private static final String BDB_STORE_TYPE = "BDB"; + private CommitThreadWrapper _commitThreadWrapper; @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { super.setupStore(storePath, name); - startCommitThread(); + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); } protected Environment createEnvironment(File environmentPath) throws DatabaseException { LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); + EnvironmentConfig envConfig = createEnvironmentConfig(); - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); try { return new Environment(environmentPath, envConfig); @@ -98,12 +78,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore } } - - @Override protected void closeInternal() throws Exception { - stopCommitThread(); + _commitThreadWrapper.stopCommitThread(); super.closeInternal(); } @@ -111,206 +89,26 @@ public class BDBMessageStore extends AbstractBDBMessageStore @Override protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException { - tx.commitNoSync(); - - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); - commitFuture.commit(); - - return commitFuture; - } - - private void startCommitThread() - { - _commitThread.start(); - } - - private void stopCommitThread() throws InterruptedException - { - _commitThread.close(); - _commitThread.join(); - } - - private static final class BDBCommitFuture implements StoreFuture - { - private final CommitThread _commitThread; - private final com.sleepycat.je.Transaction _tx; - private DatabaseException _databaseException; - private boolean _complete; - private boolean _syncCommit; - - public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) - { - _commitThread = commitThread; - _tx = tx; - _syncCommit = syncCommit; - } - - public synchronized void complete() + try { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); - } - _complete = true; - - notifyAll(); + tx.commitNoSync(); } - - public synchronized void abort(DatabaseException databaseException) + catch(DatabaseException de) { - _complete = true; - _databaseException = databaseException; + LOGGER.error("Got DatabaseException on commit, closing environment", de); - notifyAll(); - } - - public void commit() throws DatabaseException - { - _commitThread.addJob(this, _syncCommit); - - if(!_syncCommit) - { - LOGGER.debug("CommitAsync was requested, returning immediately."); - return; - } - - waitForCompletion(); - - if (_databaseException != null) - { - throw _databaseException; - } + closeEnvironmentSafely(); + throw de; } - public synchronized boolean isComplete() - { - return _complete; - } - - public synchronized void waitForCompletion() - { - while (!isComplete()) - { - _commitThread.explicitNotify(); - try - { - wait(250); - } - catch (InterruptedException e) - { - //TODO Should we ignore, or throw a 'StoreException'? - throw new RuntimeException(e); - } - } - } + return _commitThreadWrapper.commit(tx, syncCommit); } - /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations - * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before - * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> - */ - private class CommitThread extends Thread + @Override + public String getStoreType() { - private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); - private final CheckpointConfig _config = new CheckpointConfig(); - private final Object _lock = new Object(); - - public CommitThread(String name) - { - super(name); - _config.setForce(true); - - } - - public void explicitNotify() - { - synchronized (_lock) - { - _lock.notify(); - } - } - - public void run() - { - while (!_stopped.get()) - { - synchronized (_lock) - { - while (!_stopped.get() && !hasJobs()) - { - try - { - // RHM-7 Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(1000); - } - catch (InterruptedException e) - { - } - } - } - processJobs(); - } - } - - private void processJobs() - { - int size = _jobQueue.size(); - - try - { - getEnvironment().flushLog(true); - - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.complete(); - } - - } - catch (DatabaseException e) - { - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.abort(e); - } - } - - } - - private boolean hasJobs() - { - return !_jobQueue.isEmpty(); - } - - public void addJob(BDBCommitFuture commit, final boolean sync) - { - - _jobQueue.add(commit); - if(sync) - { - synchronized (_lock) - { - _lock.notifyAll(); - } - } - } - - public void close() - { - synchronized (_lock) - { - _stopped.set(true); - _lock.notifyAll(); - } - } + return BDB_STORE_TYPE; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java new file mode 100644 index 0000000000..fe1556b5a6 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -0,0 +1,265 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreFuture; + +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; + +public class CommitThreadWrapper +{ + private final CommitThread _commitThread; + + public CommitThreadWrapper(String name, Environment env) + { + _commitThread = new CommitThread(name, env); + } + + public void startCommitThread() + { + _commitThread.start(); + } + + public void stopCommitThread() throws InterruptedException + { + _commitThread.close(); + _commitThread.join(); + } + + public StoreFuture commit(Transaction tx, boolean syncCommit) + { + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + commitFuture.commit(); + return commitFuture; + } + + private static final class BDBCommitFuture implements StoreFuture + { + private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + + private final CommitThread _commitThread; + private final Transaction _tx; + private DatabaseException _databaseException; + private boolean _complete; + private boolean _syncCommit; + + public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + { + _commitThread = commitThread; + _tx = tx; + _syncCommit = syncCommit; + } + + public synchronized void complete() + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + } + _complete = true; + + notifyAll(); + } + + public synchronized void abort(DatabaseException databaseException) + { + _complete = true; + _databaseException = databaseException; + + notifyAll(); + } + + public void commit() throws DatabaseException + { + _commitThread.addJob(this, _syncCommit); + + if(!_syncCommit) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + return; + } + + waitForCompletion(); + + if (_databaseException != null) + { + throw _databaseException; + } + + } + + public synchronized boolean isComplete() + { + return _complete; + } + + public synchronized void waitForCompletion() + { + while (!isComplete()) + { + _commitThread.explicitNotify(); + try + { + wait(250); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + } + + /** + * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before + * continuing, but it is the responsibility of this thread to tell the commit operations when they have been + * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> + */ + private static class CommitThread extends Thread + { + private static final Logger LOGGER = Logger.getLogger(CommitThread.class); + + private final AtomicBoolean _stopped = new AtomicBoolean(false); + private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final CheckpointConfig _config = new CheckpointConfig(); + private final Object _lock = new Object(); + private Environment _environment; + + public CommitThread(String name, Environment env) + { + super(name); + _config.setForce(true); + _environment = env; + } + + public void explicitNotify() + { + synchronized (_lock) + { + _lock.notify(); + } + } + + public void run() + { + while (!_stopped.get()) + { + synchronized (_lock) + { + while (!_stopped.get() && !hasJobs()) + { + try + { + // RHM-7 Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(1000); + } + catch (InterruptedException e) + { + } + } + } + processJobs(); + } + } + + private void processJobs() + { + int size = _jobQueue.size(); + + try + { + _environment.flushLog(true); + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.complete(); + } + + } + catch (DatabaseException e) + { + try + { + LOGGER.error("Exception during environment log flush", e); + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.abort(e); + } + } + finally + { + LOGGER.error("Closing store environment", e); + + try + { + _environment.close(); + } + catch (DatabaseException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + } + } + } + + private boolean hasJobs() + { + return !_jobQueue.isEmpty(); + } + + public void addJob(BDBCommitFuture commit, final boolean sync) + { + + _jobQueue.add(commit); + if(sync) + { + synchronized (_lock) + { + _lock.notifyAll(); + } + } + } + + public void close() + { + synchronized (_lock) + { + _stopped.set(true); + _lock.notifyAll(); + } + } + } +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java index 8b84a4c9bb..945bcf1d28 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.store.berkeleydb.tuple; import org.apache.qpid.server.store.ConfiguredObjectRecord; diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java index 0b14080486..dc68837d47 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java @@ -1,5 +1,3 @@ -package org.apache.qpid.server.store.berkeleydb.upgrade; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +18,8 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; * under the License. * */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + import com.sleepycat.je.Cursor; import com.sleepycat.je.CursorConfig; import com.sleepycat.je.Database; diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 3265fb6823..2b7c782ad1 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; import org.apache.qpid.server.util.MapJsonSerializer; @@ -93,6 +94,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private MapJsonSerializer _serializer = new MapJsonSerializer(); + private static final boolean _moveNonExclusiveQueueOwnerToDescription = Boolean.parseBoolean(System.getProperty("qpid.move_non_exclusive_queue_owner_to_description", Boolean.TRUE.toString())); + /** * Upgrades from a v5 database to a v6 database * @@ -380,7 +383,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade for (int i = 0; i < newDequeues.length; i++) { OldRecordImpl dequeue = oldDequeues[i]; - UUID id = UUIDGenerator.generateUUID(dequeue.getQueueName(), virtualHostName); + UUID id = UUIDGenerator.generateQueueUUID(dequeue.getQueueName(), virtualHostName); newDequeues[i] = new NewRecordImpl(id, dequeue.getMessageNumber()); } } @@ -390,7 +393,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade for (int i = 0; i < newEnqueues.length; i++) { OldRecordImpl enqueue = oldEnqueues[i]; - UUID id = UUIDGenerator.generateUUID(enqueue.getQueueName(), virtualHostName); + UUID id = UUIDGenerator.generateQueueUUID(enqueue.getQueueName(), virtualHostName); newEnqueues[i] = new NewRecordImpl(id, enqueue.getMessageNumber()); } } @@ -420,7 +423,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade Transaction transaction, DatabaseEntry key, DatabaseEntry value) { OldQueueEntryKey oldEntryRecord = oldBinding.entryToObject(key); - UUID queueId = UUIDGenerator.generateUUID(oldEntryRecord.getQueueName().asString(), virtualHostName); + UUID queueId = UUIDGenerator.generateQueueUUID(oldEntryRecord.getQueueName().asString(), virtualHostName); NewQueueEntryKey newEntryRecord = new NewQueueEntryKey(queueId, oldEntryRecord.getMessageId()); DatabaseEntry newKey = new DatabaseEntry(); @@ -455,7 +458,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade String routingKey = bindingRecord.getRoutingKey().asString(); FieldTable arguments = bindingRecord.getArguments(); - UUID bindingId = UUIDGenerator.generateUUID(); + UUID bindingId = UUIDGenerator.generateBindingUUID(exchangeName, queueName, routingKey, virtualHostName); UpgradeConfiguredObjectRecord configuredObject = createBindingConfiguredObjectRecord(exchangeName, queueName, routingKey, arguments, virtualHostName); storeConfiguredObjectEntry(configuredObjectsDatabase, bindingId, configuredObject, transaction); @@ -489,7 +492,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade String exchangeType = exchangeRecord.getType().asString(); boolean autoDelete = exchangeRecord.isAutoDelete(); - UUID exchangeId = UUIDGenerator.generateUUID(exchangeName, virtualHostName); + UUID exchangeId = UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostName); UpgradeConfiguredObjectRecord configuredObject = createExchangeConfiguredObjectRecord(exchangeName, exchangeType, autoDelete); @@ -526,7 +529,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade boolean exclusive = queueRecord.isExclusive(); FieldTable arguments = queueRecord.getArguments(); - UUID queueId = UUIDGenerator.generateUUID(queueName, virtualHostName); + UUID queueId = UUIDGenerator.generateQueueUUID(queueName, virtualHostName); UpgradeConfiguredObjectRecord configuredObject = createQueueConfiguredObjectRecord(queueName, owner, exclusive, arguments); storeConfiguredObjectEntry(configuredObjectsDatabase, queueId, configuredObject, transaction); @@ -554,17 +557,49 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive, FieldTable arguments) { + Map<String, Object> attributesMap = buildQueueArgumentMap(queueName, + owner, exclusive, arguments); + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); + return configuredObject; + } + + private Map<String, Object> buildQueueArgumentMap(String queueName, + String owner, boolean exclusive, FieldTable arguments) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); attributesMap.put(Queue.NAME, queueName); - attributesMap.put(Queue.OWNER, owner); attributesMap.put(Queue.EXCLUSIVE, exclusive); + + FieldTable argumentsCopy = new FieldTable(); if (arguments != null) { - attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments)); + argumentsCopy.addAll(arguments); } - String json = _serializer.serialize(attributesMap); - UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); - return configuredObject; + + if (moveNonExclusiveOwnerToDescription(owner, exclusive)) + { + _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + + attributesMap.put(Queue.OWNER, null); + argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner); + } + else + { + attributesMap.put(Queue.OWNER, owner); + } + if (!argumentsCopy.isEmpty()) + { + attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy)); + } + return attributesMap; + } + + private boolean moveNonExclusiveOwnerToDescription(String owner, + boolean exclusive) + { + return exclusive == false && owner != null && _moveNonExclusiveQueueOwnerToDescription; } private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType, @@ -585,8 +620,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade { Map<String, Object> attributesMap = new HashMap<String, Object>(); attributesMap.put(Binding.NAME, routingKey); - attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateUUID(exchangeName, virtualHostName)); - attributesMap.put(Binding.QUEUE, UUIDGenerator.generateUUID(queueName, virtualHostName)); + attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostName)); + attributesMap.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queueName, virtualHostName)); if (arguments != null) { attributesMap.put(Binding.ARGUMENTS, FieldTable.convertToMap(arguments)); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index e71e39cbb8..f1ab012efc 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -24,7 +24,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; @@ -63,7 +63,7 @@ public class Upgrader if(versionDb.count() == 0L) { - int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion(); + int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion(); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(sourceVersion, key); DatabaseEntry value = new DatabaseEntry(); @@ -87,7 +87,7 @@ public class Upgrader int getSourceVersion(Database versionDb) { - int version = BDBMessageStore.VERSION + 1; + int version = AbstractBDBMessageStore.VERSION + 1; OperationStatus result; do @@ -106,7 +106,7 @@ public class Upgrader void performUpgradeFromVersion(int sourceVersion, Database versionDb) throws AMQStoreException { - while(sourceVersion != BDBMessageStore.VERSION) + while(sourceVersion != AbstractBDBMessageStore.VERSION) { upgrade(sourceVersion, ++sourceVersion); DatabaseEntry key = new DatabaseEntry(); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java new file mode 100644 index 0000000000..a04fb20680 --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetAddress; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + + +public class BDBHAMessageStoreTest extends QpidTestCase +{ + private static final String TEST_LOG_FILE_MAX = "1000000"; + private static final String TEST_ELECTION_RETRIES = "1000"; + private static final String TEST_NUMBER_OF_THREADS = "10"; + private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999"; + private String _groupName; + private String _workDir; + private int _masterPort; + private String _host; + private XMLConfiguration _configXml; + + public void setUp() throws Exception + { + super.setUp(); + + _workDir = TMP_FOLDER + File.separator + getName(); + _host = InetAddress.getByName("localhost").getHostAddress(); + _groupName = "group" + getName(); + _masterPort = -1; + + FileUtils.delete(new File(_workDir), true); + _configXml = new XMLConfiguration(); + } + + public void tearDown() throws Exception + { + FileUtils.delete(new File(_workDir), true); + super.tearDown(); + } + + public void testSetSystemConfiguration() throws Exception + { + // create virtual host configuration, registry and host instance + addVirtualHostConfiguration(); + TestApplicationRegistry registry = initialize(); + try + { + VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort); + BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore(); + + // test whether JVM system settings were applied + Environment env = store.getEnvironment(); + assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); + assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); + assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, + repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); + assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, + repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); + } + finally + { + ApplicationRegistry.remove(); + } + } + + private void addVirtualHostConfiguration() throws Exception + { + int port = findFreePort(); + if (_masterPort == -1) + { + _masterPort = port; + } + String nodeName = getNodeNameForNodeAt(port); + + String vhostName = "test" + port; + String vhostPrefix = "virtualhosts.virtualhost." + vhostName; + + _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); + _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName()); + _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator + + port); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort", + getNodeHostPortForNodeAt(port)); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort", + getHelperHostPort()); + + _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS); + _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS); + + _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX); + _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX); + + _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES); + + _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT); + _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT); + } + + private String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + getName() + bdbPort; + } + + private String getNodeHostPortForNodeAt(final int bdbPort) + { + return _host + ":" + bdbPort; + } + + private String getHelperHostPort() + { + if (_masterPort == -1) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + return _host + ":" + _masterPort; + } + + private TestApplicationRegistry initialize() throws Exception + { + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + ServerConfiguration configuration = new ServerConfiguration(_configXml); + TestApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort); + return registry; + } +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index 687c671566..5cc436a22a 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.store.DurableConfigurationStoreTest; diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java new file mode 100644 index 0000000000..fe48e29d0b --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; + +public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase +{ + private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class); + + /* + * Notes on calculation of quota limits. + * + * 150 32kb messages is approximately 4.8MB which is greater than + * OVERFULL_SIZE. + * + * We deliberately use settings that force BDB to use multiple log files, so + * that when one or more of them are subsequently cleaned (following message + * consumption) the actual size on disk is reduced. + */ + + private static final String MAX_BDB_LOG_SIZE = "1000000"; // ~1MB + + private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 150; + + private static final int OVERFULL_SIZE = 4000000; // ~4MB + private static final int UNDERFULL_SIZE = 3500000; // ~3.5MB + + @Override + protected int getNumberOfMessagesToFillStore() + { + return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; + } + + @Override + protected void applyStoreSpecificConfiguration(XMLConfiguration config) + { + _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + config.addProperty("envConfig(-1).name", "je.log.fileMax"); + config.addProperty("envConfig.value", MAX_BDB_LOG_SIZE); + config.addProperty("overfull-size", OVERFULL_SIZE); + config.addProperty("underfull-size", UNDERFULL_SIZE); + } + + @Override + protected MessageStore createStore() throws Exception + { + MessageStore store = new BDBMessageStore(); + return store; + } +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a318187f13..eef9f7eab4 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -55,7 +55,7 @@ import org.apache.qpid.transport.MessageTransfer; /** * Subclass of MessageStoreTest which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the DBB store-implementation. + * the BDB Store as well as additional tests specific to the BDB store-implementation. */ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest { @@ -70,7 +70,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); // Create content ByteBuffers. // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. @@ -220,11 +220,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto * Use this method instead of reloading the virtual host like other tests in order * to avoid the recovery handler deleting the message for not being on a queue. */ - private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception + private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception { messageStore.close(); - BDBMessageStore newStore = new BDBMessageStore(); + AbstractBDBMessageStore newStore = new BDBMessageStore(); newStore.configure("", _config.subset("store")); newStore.startWithNoRecover(); @@ -282,7 +282,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto public void testGetContentWithOffset() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -342,7 +342,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto public void testMessageCreationAndRemoval() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -367,12 +367,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Retrieved content when none was expected", 0, bdbStore.getContent(messageid_0_8, 0, dst)); } - private BDBMessageStore assertBDBStore(MessageStore store) + private AbstractBDBMessageStore assertBDBStore(MessageStore store) { assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - return (BDBMessageStore) store; + return (AbstractBDBMessageStore) store; } private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) @@ -405,9 +405,9 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); - final UUID mockQueueId = UUIDGenerator.generateUUID(); + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { @Override @@ -443,9 +443,9 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); - final UUID mockQueueId = UUIDGenerator.generateUUID(); + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { @Override @@ -484,9 +484,9 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); - final UUID mockQueueId = UUIDGenerator.generateUUID(); + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { @Override diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java new file mode 100644 index 0000000000..c6a9ba8f8b --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.rep.ReplicationConfig; + +/** + * The HA black box tests test the BDB cluster as a opaque unit. Client connects to + * the cluster via a failover url + * + * @see HAClusterWhiteboxTest + */ +public class HAClusterBlackboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final int NUMBER_OF_NODES = 3; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + private FailoverAwaitingListener _failoverAwaitingListener; + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + + _clusterCreator.startCluster(); + _failoverAwaitingListener = new FailoverAwaitingListener(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testLossOfMasterNodeCausesClientToFailover() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + _clusterCreator.stopNode(activeBrokerPort); + LOGGER.info("Node is stopped"); + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception + { + LOGGER.info("Connecting to " + _brokerFailoverUrl); + final Connection connection = getConnection(_brokerFailoverUrl); + LOGGER.info("Got connection to cluster"); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + + LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.stopNode(inactiveBrokerPort); + + _failoverAwaitingListener.assertFailoverDoesNotOccur(2000); + + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private final class FailoverAwaitingListener implements ConnectionListener + { + private final CountDownLatch _failoverLatch = new CountDownLatch(1); + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + public void assertFailoverOccurs(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); + } + + public void assertFailoverDoesNotOccur(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount()); + } + + + @Override + public void failoverComplete() + { + _failoverLatch.countDown(); + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + } + +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java new file mode 100644 index 0000000000..408643b98a --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Set; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The HA white box tests test the BDB cluster where the test retains the knowledge of the + * individual test nodes. It uses this knowledge to examine the nodes to ensure that they + * remain in the correct state throughout the test. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterWhiteboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + + private final int NUMBER_OF_NODES = 3; + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testClusterPermitsConnectionToOnlyOneNode() throws Exception + { + int connectionSuccesses = 0; + int connectionFails = 0; + + for (int brokerPortNumber : getBrokerPortNumbers()) + { + try + { + getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); + connectionSuccesses++; + } + catch(JMSException e) + { + assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + connectionFails++; + } + } + + assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); + assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); + } + + public void testClusterThatLosesNodeStillAllowsConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + + // verify that JMS persistence operations are working + assertProducingConsuming(subsequentConnection); + + closeConnection(initialConnection); + } + + public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); + + killBroker(subsequentPortNumber); + + final Connection finalConnection = getConnectionToNodeInCluster(); + assertNull(finalConnection); + + closeConnection(initialConnection); + } + + public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception + { + final Connection connection = getConnectionToNodeInCluster(); + assertNotNull(connection); + + final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); + connection.close(); + + _clusterCreator.stopNode(brokerPortNumber); + _clusterCreator.startNode(brokerPortNumber); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + } + + public void testClusterLosingNodeRetainsData() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + + final String queueNamePrefix = getTestQueueName(); + final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; + final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; + + populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + + assertNotNull("no valid connection obtained", subsequentConnection); + + checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + } + + public void xtestRecoveryOfOutOfDateNode() throws Exception + { + /* + * TODO: Implement + * + * Cant yet find a way to control cleaning in a deterministic way to allow provoking + * a node to become out of date. We do now know that even a new joiner to the group + * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has + * done *any* cleaning and then adding a new node should be sufficient to cause this. + */ + } + + private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception + { + populateBrokerWithData(connection, 1, queueUrls); + } + + private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception + { + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + session.createConsumer(queue).close(); + sendMessage(session, queue, noOfMessages); + } + } + + private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException + { + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + final MessageConsumer consumer = session.createConsumer(queue); + final Message message = consumer.receive(1000); + session.commit(); + assertNotNull("Queue " + queue + " should have message", message); + assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); + } + } + + private Connection getConnectionToNodeInCluster() throws URLSyntaxException + { + Connection connection = null; + Set<Integer> runningBrokerPorts = getBrokerPortNumbers(); + + for (int brokerPortNumber : runningBrokerPorts) + { + try + { + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); + break; + } + catch(JMSException je) + { + assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + } + } + return connection; + } + + private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception + { + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + initialConnection.close(); + + killBroker(initialPortNumber); // kill awaits the death of the child + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 2); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + Message m2 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 2 is not received", m2); + assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX)); + session.commit(); + } + + private void closeConnection(final Connection initialConnection) + { + try + { + initialConnection.close(); + } + catch(Exception e) + { + // ignore. + // java.net.SocketException is seen sometimes on active connection + } + } +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java index 7e5ef3f94c..eaa3c3eba4 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -19,22 +19,25 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreFactory; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.test.utils.QpidTestCase; -public class BDBMessageStoreFactory implements MessageStoreFactory +public class HAMessageStoreSmokeTest extends QpidTestCase { + private final BDBHAMessageStore _store = new BDBHAMessageStore(); + private final XMLConfiguration _config = new XMLConfiguration(); - @Override - public MessageStore createMessageStore() + public void testMissingHAConfigThrowsException() throws Exception { - return new BDBMessageStore(); + try + { + _store.configure("test", _config); + fail("Expected an exception to be thrown"); + } + catch (ConfigurationException ce) + { + assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); + } } - - @Override - public String getStoreClassName() - { - return BDBMessageStore.class.getSimpleName(); - } - -} +}
\ No newline at end of file diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java new file mode 100644 index 0000000000..abe13edc32 --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +public class HATestClusterCreator +{ + protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + + private static final int FAILOVER_CYCLECOUNT = 10; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 60; + private static final int CONNECTDELAY = 75; + + private final QpidBrokerTestCase _testcase; + private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); + private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>(); + private final String _virtualHostName; + private final String _storeConfigKeyPrefix; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + + public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = "group" + _testcase.getName(); + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + configureClusterNode(brokerPort, bdbPort); + collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts()); + + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + } + + public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); + final String configKey = getConfigKey("highAvailability.designatedPrimary"); + brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary)); + _primaryBrokerPort = brokerConfigEntry.getKey(); + } + + /** + * @param configKeySuffix "highAvailability.designatedPrimary", for example + * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example + */ + private String getConfigKey(String configKeySuffix) + { + final String configKey = StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, "virtualhosts."); + return configKey; + } + + public void startNode(final int brokerPortNumber) throws Exception + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + + _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration()); + _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); + + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size()); + try + { + List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>(); + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + Future<Object> future = executor.submit(new Callable<Object>() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(), + brokerConfigHolder.getTestVirtualhosts()); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future<Object> future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.stopBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set<Integer> getBdbPortNumbers() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.values()); + } + + public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); + } + + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException + { + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set<Integer> portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set<Integer> getBrokerPortNumbersForNodes() + { + return new HashSet<Integer>(_brokerConfigurations.keySet()); + } + + private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception + { + final String nodeName = getNodeNameForNodeAt(bdbPort); + + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); + + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.groupName", _groupName); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeName", nodeName); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); + } + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(), + (XMLConfiguration) testVirtualhosts.clone())); + } + + public class BrokerConfigHolder + { + private final XMLConfiguration _testConfiguration; + private final XMLConfiguration _testVirtualhosts; + + public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _testConfiguration = testConfiguration; + _testVirtualhosts = testVirtualhosts; + } + + public XMLConfiguration getTestConfiguration() + { + return _testConfiguration; + } + + public XMLConfiguration getTestVirtualhosts() + { + return _testVirtualhosts; + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved); + final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts(); + + final String configKey = getConfigKey("highAvailability.nodeHostPort"); + final String oldBdbHostPort = virtualHostConfig.getString(configKey); + + final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + final String oldHost = oldHostAndPort[0]; + + final String newBdbHostPort = oldHost + ":" + newBdbPort; + + virtualHostConfig.setProperty(configKey, newBdbHostPort); + collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); + } + + public String getStoreConfigKeyPrefix() + { + return _storeConfigKeyPrefix; + } + + +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java index f8aeb7f7b0..7f93f5691e 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -42,7 +42,7 @@ public class ConfiguredObjectBindingTest extends TestCase { super.setUp(); _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - _object = new ConfiguredObjectRecord(UUIDGenerator.generateUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING); + _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING); } public void testObjectToEntryAndEntryToObject() diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index 36991b90d0..cd2654f79f 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -52,8 +52,8 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase } public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue", - "queue-non-durable" }; - public static int[] QUEUE_SIZES = { 1, 1, 10, 3 }; + "queue-non-durable", "nonexclusive-with-erroneous-owner" }; + public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0}; public static int TOTAL_MESSAGE_NUMBER = 15; protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index 3f9e4e4aa1..65a8bb03fb 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -23,10 +23,13 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; @@ -49,6 +52,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase { private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME; + private static final String NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName"; private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName"; private static final String EXCHANGE_DB_NAME = "exchangeDb_v5"; @@ -87,6 +91,10 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null); + assertBindingRecord(queueBindings, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, null); + + assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); + assertContent(); } @@ -94,7 +102,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); - assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE))); + assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER))); assertDatabaseRecordCount(DELIVERY_DB_NAME, 12); assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12); @@ -112,6 +120,9 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); + + assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); + assertContent(); } @@ -257,7 +268,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase private void assertQueues(Set<String> expectedQueueNames) { - List<AMQShortString> durableSubNames = new ArrayList<AMQShortString>(); + List<AMQShortString> durableSubNames = Collections.emptyList(); final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); final Set<String> actualQueueNames = new HashSet<String>(); @@ -278,6 +289,35 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames); } + private void assertQueueHasOwner(String queueName, final String expectedOwner) + { + List<AMQShortString> durableSubNames = Collections.emptyList(); + final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); + final AtomicReference<String> actualOwner = new AtomicReference<String>(); + final AtomicBoolean foundQueue = new AtomicBoolean(false); + + CursorOperation queueNameCollector = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + QueueRecord record = binding.entryToObject(value); + String queueName = record.getNameShortString().asString(); + if (queueName.equals(queueName)) + { + foundQueue.set(true); + actualOwner.set(AMQShortString.toString(record.getOwner())); + } + } + }; + new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector); + + assertTrue("Could not find queue in database", foundQueue.get()); + assertEquals("Queue has unexpected owner", expectedOwner, actualOwner.get()); + } + private void assertContent() { final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding(); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 5297692820..2d2a6b20a2 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -29,8 +29,11 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OL import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME; import java.io.File; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -41,6 +44,7 @@ import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; @@ -163,11 +167,11 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase NewRecordImpl[] newDequeues = newTransaction.getDequeues(); assertEquals("Unxpected new enqueus number", 1, newEnqueues.length); NewRecordImpl enqueue = newEnqueues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST1", getVirtualHostName()), enqueue.getId()); + assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHostName()), enqueue.getId()); assertEquals("Unxpected message id", 1, enqueue.getMessageNumber()); assertEquals("Unxpected new dequeues number", 1, newDequeues.length); NewRecordImpl dequeue = newDequeues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST2", getVirtualHostName()), dequeue.getId()); + assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHostName()), dequeue.getId()); assertEquals("Unxpected message id", 2, dequeue.getMessageNumber()); } @@ -260,7 +264,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private void assertDatabaseRecordCounts() { - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 9); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 12); assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12); assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); @@ -270,64 +274,27 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private void assertConfiguredObjects() { Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 9, configuredObjects.size()); - - Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(9); - Map<String, Object> queue1 = new HashMap<String, Object>(); - queue1.put("exclusive", Boolean.FALSE); - queue1.put("name", "myUpgradeQueue"); - queue1.put("owner", null); - expected.add(queue1); - Map<String, Object> queue2 = new HashMap<String, Object>(); - queue2.put("exclusive", Boolean.TRUE); - queue2.put("name", "clientid:mySelectorDurSubName"); - queue2.put("owner", "clientid"); - expected.add(queue2); - Map<String, Object> queue3 = new HashMap<String, Object>(); - queue3.put("exclusive", Boolean.TRUE); - queue3.put("name", "clientid:myDurSubName"); - queue3.put("owner", "clientid"); - expected.add(queue3); - - Map<String, Object> queueBinding1 = new HashMap<String, Object>(); - queueBinding1.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); - queueBinding1.put("name", "myUpgradeQueue"); - queueBinding1.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); - expected.add(queueBinding1); - Map<String, Object> queueBinding2 = new HashMap<String, Object>(); - queueBinding2.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); - queueBinding2.put("name", "myUpgradeQueue"); - queueBinding2.put("exchange", UUIDGenerator.generateUUID("amq.direct", getVirtualHostName()).toString()); - Map<String, Object> arguments2 = new HashMap<String, Object>(); - arguments2.put("x-filter-jms-selector", ""); - queueBinding2.put("arguments", arguments2); - expected.add(queueBinding2); - Map<String, Object> queueBinding3 = new HashMap<String, Object>(); - queueBinding3.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); - queueBinding3.put("name", "myUpgradeTopic"); - queueBinding3.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); - Map<String, Object> arguments3 = new HashMap<String, Object>(); - arguments3.put("x-filter-jms-selector", ""); - queueBinding3.put("arguments", arguments3); - expected.add(queueBinding3); - Map<String, Object> queueBinding4 = new HashMap<String, Object>(); - queueBinding4.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); - queueBinding4.put("name", "mySelectorUpgradeTopic"); - queueBinding4.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); - Map<String, Object> arguments4 = new HashMap<String, Object>(); - arguments4.put("x-filter-jms-selector", "testprop='true'"); - queueBinding4.put("arguments", arguments4); - expected.add(queueBinding4); - Map<String, Object> queueBinding5 = new HashMap<String, Object>(); - queueBinding5.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); - queueBinding5.put("name", "clientid:myDurSubName"); - queueBinding5.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); - expected.add(queueBinding5); - Map<String, Object> queueBinding6 = new HashMap<String, Object>(); - queueBinding6.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); - queueBinding6.put("name", "clientid:mySelectorDurSubName"); - queueBinding6.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); - expected.add(queueBinding6); + assertEquals("Unexpected number of configured objects", 12, configuredObjects.size()); + + Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(12); + List<UUID> expectedBindingIDs = new ArrayList<UUID>(); + + expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null)); + expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null)); + expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null)); + expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null, + Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description"))); + + expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue","myUpgradeQueue", "<<default>>", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "myUpgradeTopic", "amq.topic", + Collections.singletonMap("x-filter-jms-selector", ""), expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "mySelectorUpgradeTopic", "amq.topic", + Collections.singletonMap("x-filter-jms-selector", "testprop='true'"), expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "clientid:myDurSubName", "<<default>>", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "clientid:mySelectorDurSubName", "<<default>>", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<<default>>", null, expectedBindingIDs)); Set<String> expectedTypes = new HashSet<String>(); expectedTypes.add(Queue.class.getName()); @@ -337,21 +304,63 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) { UpgradeConfiguredObjectRecord object = entry.getValue(); - UUID key = entry.getKey(); Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes()); assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized)); String type = object.getType(); assertTrue("Unexpected type:" + type, expectedTypes.contains(type)); - if (type.equals(Exchange.class.getName()) || type.equals(Queue.class.getName())) + UUID key = entry.getKey(); + + assertNotNull("Key cannot be null", key); + + if (type.equals(Exchange.class.getName())) + { + String exchangeName = (String) deserialized.get(Exchange.NAME); + assertNotNull(exchangeName); + assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName())); + } + else if (type.equals(Queue.class.getName())) { - assertEquals("Unexpected key", key, UUIDGenerator.generateUUID(((String) deserialized.get("name")), getVirtualHostName())); + String queueName = (String) deserialized.get(Queue.NAME); + assertNotNull(queueName); + assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHostName())); } - else + else if (type.equals(Binding.class.getName())) { - assertNotNull("Key cannot be null", key); + assertTrue("unexpected binding id", expectedBindingIDs.remove(key)); } } + assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty()); + assertTrue("Not all expected bindings found:" + expectedBindingIDs, expectedBindingIDs.isEmpty()); + } + + private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs) + { + Map<String, Object> expectedQueueBinding = new HashMap<String, Object>(); + expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHostName()).toString()); + expectedQueueBinding.put(Binding.NAME, bindingName); + expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()).toString()); + if (argumentMap != null) + { + expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap); + } + + expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHostName())); + + return expectedQueueBinding; + } + + private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, String> argumentMap) + { + Map<String, Object> expectedQueueEntry = new HashMap<String, Object>(); + expectedQueueEntry.put(Queue.NAME, name); + expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag); + expectedQueueEntry.put(Queue.OWNER, owner); + if (argumentMap != null) + { + expectedQueueEntry.put(Queue.ARGUMENTS, argumentMap); + } + return expectedQueueEntry; } private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index ba5ca842bf..23fd9bc24f 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -24,7 +24,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import com.sleepycat.bind.tuple.IntegerBinding; @@ -94,7 +94,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase { assertEquals("Unexpected store version", -1, getStoreVersion()); _upgrader.upgradeIfNecessary(); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion()); assertContent(); } @@ -112,7 +112,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase List<String> expectedDatabases = new ArrayList<String>(); expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion()); nonExistentStoreLocation.delete(); } diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb Binary files differindex 167ab7f0ca..f5ed9aa5a2 100644 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb +++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb Binary files differindex d44b21a83e..f5ed9aa5a2 100644 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb +++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb Binary files differindex 9b85860c19..d5ae8c1096 100644 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb +++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb |