summaryrefslogtreecommitdiff
path: root/java/bdbstore/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /java/bdbstore/src
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/bdbstore/src')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java388
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java607
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java234
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java265
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java20
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java61
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java8
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java165
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java20
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java73
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java28
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java165
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java266
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java (renamed from java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java)31
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java423
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java2
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java4
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java44
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java141
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java6
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdbbin1357197 -> 1361990 bytes
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdbbin1357227 -> 1361990 bytes
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdbbin1332881 -> 1333643 bytes
24 files changed, 2484 insertions, 471 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index fb1d7c5265..9323111fdd 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -29,14 +30,17 @@ import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -44,6 +48,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
@@ -53,28 +58,13 @@ import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectHelper;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StateManager;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
@@ -96,7 +86,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public static final int VERSION = 6;
- public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+ private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+ }});
private Environment _environment;
@@ -145,36 +138,44 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected final StateManager _stateManager;
- protected TransactionConfig _transactionConfig = new TransactionConfig();
-
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
-
+
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+ private Map<String, String> _envConfigMap;
+
public AbstractBDBMessageStore()
{
_stateManager = new StateManager(_eventManager);
}
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = recoveryHandler;
- configure(name,storeConfiguration);
-
-
-
+ configure(name, storeConfiguration);
}
public void configureMessageStore(String name,
@@ -185,12 +186,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageRecoveryHandler = messageRecoveryHandler;
_tlogRecoveryHandler = tlogRecoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
- public void activate() throws Exception
+ public synchronized void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
recoverConfig(_configRecoveryHandler);
recoverMessages(_messageRecoveryHandler);
@@ -204,7 +205,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return new BDBTransaction();
}
-
/**
* Called after instantiation in order to configure the message store.
*
@@ -215,9 +215,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void configure(String name, Configuration storeConfig) throws Exception
{
- final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
+ final String storeLocation = storeConfig.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY,
System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name);
+ _persistentSizeHighThreshold = storeConfig.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE);
+ _persistentSizeLowThreshold = storeConfig.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
File environmentPath = new File(storeLocation);
if (!environmentPath.exists())
{
@@ -230,11 +237,39 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_storeLocation = storeLocation;
+ _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, "envConfig");
+
LOGGER.info("Configuring BDB message store");
setupStore(environmentPath, name);
}
+ protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException
+ {
+ final List<Object> argumentNames = config.getList(prefix + ".name");
+ final List<Object> argumentValues = config.getList(prefix + ".value");
+ final int initialSize = argumentNames.size() + defaultConfig.size();
+
+ final Map<String,String> attributes = new HashMap<String,String>(initialSize);
+ attributes.putAll(defaultConfig);
+
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i).toString();
+ final String argValue = argumentValues.get(i).toString();
+
+ attributes.put(argName, argValue);
+ }
+
+ return Collections.unmodifiableMap(attributes);
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _storeLocation;
+ }
+
/**
* Move the store state from INITIAL to ACTIVE without actually recovering.
*
@@ -244,9 +279,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
void startWithNoRecover() throws AMQStoreException
{
- _stateManager.attainState(State.CONFIGURING);
- _stateManager.attainState(State.CONFIGURED);
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.INITIALISING);
+ _stateManager.attainState(State.INITIALISED);
+ _stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
}
@@ -257,54 +292,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
new Upgrader(_environment, name).upgradeIfNecessary();
openDatabases();
- }
-
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
- {
- LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(false);
- try
- {
- return new Environment(environmentPath, envConfig);
- }
- catch (DatabaseException de)
- {
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
- {
- //Allow the creation this time
- envConfig.setAllowCreate(true);
- if (_environment != null )
- {
- _environment.cleanLog();
- _environment.close();
- }
- return new Environment(environmentPath, envConfig);
- }
- else
- {
- throw de;
- }
- }
+ _totalStoreSize = getSizeOnDisk();
}
+ protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException;
+
public Environment getEnvironment()
{
return _environment;
@@ -343,14 +336,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void close() throws Exception
{
- if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED))
- {
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
-
- closeInternal();
-
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
- }
+ _stateManager.attainState(State.CLOSING);
+ closeInternal();
+ _stateManager.attainState(State.CLOSED);
}
protected void closeInternal() throws Exception
@@ -409,8 +397,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
// Clean the log before closing. This makes sure it doesn't contain
// redundant data. Closing without doing this means the cleaner may not
// get a chance to finish.
- _environment.cleanLog();
- _environment.close();
+ try
+ {
+ _environment.cleanLog();
+ }
+ finally
+ {
+ _environment.close();
+ }
}
}
@@ -420,16 +414,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
try
{
List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
_configuredObjectHelper.recoverExchanges(erh, configuredObjects);
- BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+ BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (DatabaseException e)
@@ -552,7 +546,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
mrh.message(message);
maxId = Math.max(maxId, messageId);
@@ -950,10 +945,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (_stateManager.isInState(State.ACTIVE))
{
DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
+ UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(),value);
+ LongBinding.longToEntry(link.getCreateTime(), value);
StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
try
@@ -971,7 +966,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
+ UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
try
{
OperationStatus status = _linkDb.delete(null, key);
@@ -991,10 +986,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (_stateManager.isInState(State.ACTIVE))
{
DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
+ UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
DatabaseEntry value = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value);
+ UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value);
LongBinding.longToEntry(bridge.getCreateTime(),value);
StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
@@ -1014,7 +1009,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void deleteBridge(final Bridge bridge) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
+ UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
try
{
OperationStatus status = _bridgeDb.delete(null, key);
@@ -1247,7 +1242,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
/**
* Primarily for testing purposes.
*
- * @param queueName
+ * @param queueId
*
* @return a list of message ids for messages enqueued for a particular queue
*/
@@ -1494,6 +1489,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return true;
}
+ @SuppressWarnings("unchecked")
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
if(metaData.isPersistent())
@@ -1580,34 +1576,29 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
private final long _messageId;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private final boolean _isRecovered;
private StorableMessageMetaData _metaData;
- private volatile SoftReference<byte[]> _dataRef;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+
private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
- this(messageId, metaData, true);
+ this(messageId, metaData, false);
}
-
- StoredBDBMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
{
- try
- {
- _messageId = messageId;
- _metaData = metaData;
+ _messageId = messageId;
+ _isRecovered = isRecovered;
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-
- }
- catch (DatabaseException e)
+ if(!_isRecovered)
{
- throw new RuntimeException(e);
+ _metaData = metaData;
}
-
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
public StorableMessageMetaData getMetaData()
@@ -1697,8 +1688,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
synchronized void store(com.sleepycat.je.Transaction txn)
{
-
- if(_metaData != null)
+ if (!stored())
{
try
{
@@ -1730,12 +1720,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public synchronized StoreFuture flushToStore()
{
- if(_metaData != null)
+ if(!stored())
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
store(txn);
AbstractBDBMessageStore.this.commit(txn,true);
-
+ storedSizeChange(getMetaData().getContentSize());
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@@ -1744,18 +1734,27 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
try
{
+ int delta = getMetaData().getContentSize();
AbstractBDBMessageStore.this.removeMessage(_messageId, false);
+ storedSizeChange(-delta);
+
}
catch (AMQStoreException e)
{
throw new RuntimeException(e);
}
}
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
}
private class BDBTransaction implements org.apache.qpid.server.store.Transaction
{
private com.sleepycat.je.Transaction _txn;
+ private int _storeSizeIncrease;
private BDBTransaction()
{
@@ -1765,7 +1764,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
catch (DatabaseException e)
{
- throw new RuntimeException(e);
+ LOGGER.error("Exception during transaction begin, closing store environment.", e);
+ closeEnvironmentSafely();
+
+ throw new RuntimeException("Exception during transaction begin, store environment closed.", e);
}
}
@@ -1773,7 +1775,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
- ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1787,10 +1791,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws AMQStoreException
{
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+ AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
}
public StoreFuture commitTranAsync() throws AMQStoreException
{
+ AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1811,15 +1817,133 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
+ private void storedSizeChange(final int delta)
{
- _eventManager.addEventListener(eventListener, events);
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
+
+ _totalStoreSize = getSizeOnDisk();
+
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
}
- @Override
- public String getStoreLocation()
+ private void reduceSizeOnDisk()
{
- return _storeLocation;
+ _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (_environment.cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ _environment.checkpoint(force);
+ }
+
+
+ _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return _environment.getStats(null).getTotalLogSize();
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+ private void setEnvironmentConfigProperties(EnvironmentConfig envConfig)
+ {
+ for (Map.Entry<String, String> configItem : _envConfigMap.entrySet())
+ {
+ LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+
+ protected EnvironmentConfig createEnvironmentConfig()
+ {
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+
+ setEnvironmentConfigProperties(envConfig);
+
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
+ return envConfig;
+ }
+
+ protected void closeEnvironmentSafely()
+ {
+ try
+ {
+ _environment.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ catch (IllegalStateException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ }
+
+
+ private class LoggingAsyncExceptionListener implements ExceptionListener
+ {
+ @Override
+ public void exceptionThrown(ExceptionEvent event)
+ {
+ LOGGER.error("Asynchronous exception thrown by BDB thread '"
+ + event.getThreadName() + "'", event.getException());
+ }
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
new file mode 100644
index 0000000000..c40f24dbc3
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.HAMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY);
+
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+ @SuppressWarnings("serial")
+ private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ /**
+ * Parameter decreased as the 24h default may lead very large log files for most users.
+ */
+ put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+ /**
+ * Parameter increased as the 5 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+ /**
+ * Parameter increased as the 10 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+ /**
+ * Parameter increased as the 10 h default may cause user confusion.
+ */
+ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+ /**
+ * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False
+ * is scheduled to become default after JE 5.0.48.
+ */
+ put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+ /**
+ * Parameter decreased as a default 5min interval may lead to bigger data losses on Node
+ * with NO_SYN durability in case if such Node crushes.
+ */
+ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+ }});
+
+ public static final String BDB_HA_STORE_TYPE = "BDB-HA";
+
+ private String _groupName;
+ private String _nodeName;
+ private String _nodeHostPort;
+ private String _helperHostPort;
+ private Durability _durability;
+
+ private String _name;
+
+ private CommitThreadWrapper _commitThreadWrapper;
+ private boolean _coalescingSync;
+ private boolean _designatedPrimary;
+ private Map<String, String> _repConfig;
+
+ @Override
+ public void configure(String name, Configuration storeConfig) throws Exception
+ {
+ //Mandatory configuration
+ _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig);
+ _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig);
+ _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig);
+ _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig);
+ _name = name;
+
+ //Optional configuration
+ String durabilitySetting = storeConfig.getString("highAvailability.durability");
+ if (durabilitySetting == null)
+ {
+ _durability = DEFAULT_DURABILITY;
+ }
+ else
+ {
+ _durability = Durability.parse(durabilitySetting);
+ }
+ _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE);
+ _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE);
+ _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig");
+
+ if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC)
+ {
+ throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+ + "! Please set highAvailability.coalescingSync to false in store configuration.");
+ }
+
+ super.configure(name, storeConfig);
+ }
+
+ @Override
+ protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
+ {
+ super.setupStore(storePath, name);
+
+ if(_coalescingSync)
+ {
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper.startCommitThread();
+ }
+ }
+
+ @Override
+ protected Environment createEnvironment(File environmentPath) throws DatabaseException
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Environment path " + environmentPath.getAbsolutePath());
+ LOGGER.info("Group name " + _groupName);
+ LOGGER.info("Node name " + _nodeName);
+ LOGGER.info("Node host port " + _nodeHostPort);
+ LOGGER.info("Helper host port " + _helperHostPort);
+ LOGGER.info("Durability " + _durability);
+ LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
+ }
+
+ final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
+
+ replicationConfig.setHelperHosts(_helperHostPort);
+ replicationConfig.setDesignatedPrimary(_designatedPrimary);
+ setReplicationConfigProperties(replicationConfig);
+
+ final EnvironmentConfig envConfig = createEnvironmentConfig();
+ envConfig.setDurability(_durability);
+
+ ReplicatedEnvironment replicatedEnvironment = null;
+ try
+ {
+ replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
+ }
+ catch (final InsufficientLogException ile)
+ {
+ LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ NetworkRestore restore = new NetworkRestore();
+ NetworkRestoreConfig config = new NetworkRestoreConfig();
+ config.setRetainLogFiles(false);
+ restore.execute(ile, config);
+ replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
+ }
+
+ return replicatedEnvironment;
+ }
+
+ @Override
+ public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config);
+
+ final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
+
+ replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+ }
+
+ @Override
+ public synchronized void activate() throws Exception
+ {
+ // Before proceeding, perform a log flush with an fsync
+ getEnvironment().flushLog(true);
+
+ super.activate();
+ }
+
+ @Override
+ public synchronized void passivate()
+ {
+ if (_stateManager.isNotInState(State.INITIALISED))
+ {
+ LOGGER.debug("Store becoming passive");
+ _stateManager.attainState(State.INITIALISED);
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeName()
+ {
+ return _nodeName;
+ }
+
+ public String getNodeHostPort()
+ {
+ return _nodeHostPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ return _helperHostPort;
+ }
+
+ public String getDurability()
+ {
+ return _durability.toString();
+ }
+
+ public boolean isCoalescingSync()
+ {
+ return _coalescingSync;
+ }
+
+ public String getNodeState()
+ {
+ ReplicatedEnvironment.State state = getReplicatedEnvironment().getState();
+ return state.toString();
+ }
+
+ public Boolean isDesignatedPrimary()
+ {
+ return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary();
+ }
+
+ public List<Map<String, String>> getGroupMembers()
+ {
+ List<Map<String, String>> members = new ArrayList<Map<String,String>>();
+
+ for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes())
+ {
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName());
+ nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
+ members.add(nodeMap);
+ }
+
+ return members;
+ }
+
+ public void removeNodeFromGroup(String nodeName) throws AMQStoreException
+ {
+ try
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+ catch (OperationFailureException ofe)
+ {
+ throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
+ }
+ }
+
+ public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException
+ {
+ try
+ {
+ final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
+ synchronized(replicatedEnvironment)
+ {
+ final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+ replicatedEnvironment.setRepMutableConfig(newConfig);
+ }
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e);
+ }
+ }
+
+ ReplicatedEnvironment getReplicatedEnvironment()
+ {
+ return (ReplicatedEnvironment)getEnvironment();
+ }
+
+ public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException
+ {
+ try
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
+ catch (OperationFailureException ofe)
+ {
+ throw new AMQStoreException("Failed to update address for '" + nodeName +
+ "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to update address for '" + nodeName +
+ "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+ {
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
+ try
+ {
+ tx.commit();
+ }
+ catch (DatabaseException de)
+ {
+ LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+ closeEnvironmentSafely();
+
+ throw de;
+ }
+
+ if(_coalescingSync)
+ {
+ return _commitThreadWrapper.commit(tx, syncCommit);
+ }
+ else
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+ }
+
+ @Override
+ protected void closeInternal() throws Exception
+ {
+ substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
+
+ try
+ {
+ if(_coalescingSync)
+ {
+ _commitThreadWrapper.stopCommitThread();
+ }
+ }
+ finally
+ {
+ super.closeInternal();
+ }
+ }
+
+ /**
+ * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during
+ * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change.
+ */
+ private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment)
+ {
+ LOGGER.debug("Substituting no-op state change listener for environment close");
+ replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener());
+ }
+
+ private ReplicationGroupAdmin createReplicationGroupAdmin()
+ {
+ final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets());
+
+ final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();
+ helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+
+ return new ReplicationGroupAdmin(_groupName, helpers);
+ }
+
+
+ private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+ {
+ for (Map.Entry<String, String> configItem : _repConfig.entrySet())
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ }
+ replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+
+ private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
+ {
+ if (!config.containsKey(key))
+ {
+ throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
+ + key.replace('.', '/'));
+ }
+ return config.getString(key);
+ }
+
+ private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+ {
+ private final Executor _executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Received BDB event indicating transition to state " + state);
+ }
+
+ switch (state)
+ {
+ case MASTER:
+ activateStoreAsync();
+ break;
+ case REPLICA:
+ passivateStoreAsync();
+ break;
+ case DETACHED:
+ LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+ passivateStoreAsync();
+ break;
+ case UNKNOWN:
+ LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+ break;
+ default:
+ LOGGER.error("Unexpected state change: " + state);
+ throw new IllegalStateException("Unexpected state change: " + state);
+ }
+ }
+
+ /**
+ * Calls {@link MessageStore#activate()}.
+ *
+ * <p/>
+ *
+ * This is done a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * activate may execute transactions, which can't complete until
+ * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
+ */
+ private void activateStoreAsync()
+ {
+ String threadName = "BDBHANodeActivationThread-" + _name;
+ executeStateChangeAsync(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ /**
+ * Calls {@link #passivate()}.
+ *
+ * <p/>
+ * This is done a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * passivation due to the effect of state change listeners.
+ */
+ private void passivateStoreAsync()
+ {
+ String threadName = "BDBHANodePassivationThread-" + _name;
+ executeStateChangeAsync(new Callable<Void>()
+ {
+
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ passivate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+ {
+ final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
+
+ _executor.execute(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ final String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(threadName);
+ try
+ {
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return threadName;
+ }
+ });
+
+ try
+ {
+ callable.call();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception during state change", e);
+ }
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }
+ });
+ }
+ }
+
+ private class NoOpStateChangeListener implements StateChangeListener
+ {
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent)
+ throws RuntimeException
+ {
+ }
+ }
+
+ @Override
+ public String getStoreType()
+ {
+ return BDB_HA_STORE_TYPE;
+ }
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 9f7eb4bfd9..82bc3d8564 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,16 +21,12 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
@@ -46,39 +42,23 @@ import com.sleepycat.je.EnvironmentConfig;
public class BDBMessageStore extends AbstractBDBMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
-
- private final CommitThread _commitThread = new CommitThread("Commit-Thread");
+ private static final String BDB_STORE_TYPE = "BDB";
+ private CommitThreadWrapper _commitThreadWrapper;
@Override
protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
super.setupStore(storePath, name);
- startCommitThread();
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper.startCommitThread();
}
protected Environment createEnvironment(File environmentPath) throws DatabaseException
{
LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
+ EnvironmentConfig envConfig = createEnvironmentConfig();
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(false);
try
{
return new Environment(environmentPath, envConfig);
@@ -98,12 +78,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore
}
}
-
-
@Override
protected void closeInternal() throws Exception
{
- stopCommitThread();
+ _commitThreadWrapper.stopCommitThread();
super.closeInternal();
}
@@ -111,206 +89,26 @@ public class BDBMessageStore extends AbstractBDBMessageStore
@Override
protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
{
- tx.commitNoSync();
-
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
- commitFuture.commit();
-
- return commitFuture;
- }
-
- private void startCommitThread()
- {
- _commitThread.start();
- }
-
- private void stopCommitThread() throws InterruptedException
- {
- _commitThread.close();
- _commitThread.join();
- }
-
- private static final class BDBCommitFuture implements StoreFuture
- {
- private final CommitThread _commitThread;
- private final com.sleepycat.je.Transaction _tx;
- private DatabaseException _databaseException;
- private boolean _complete;
- private boolean _syncCommit;
-
- public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
- {
- _commitThread = commitThread;
- _tx = tx;
- _syncCommit = syncCommit;
- }
-
- public synchronized void complete()
+ try
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
- }
- _complete = true;
-
- notifyAll();
+ tx.commitNoSync();
}
-
- public synchronized void abort(DatabaseException databaseException)
+ catch(DatabaseException de)
{
- _complete = true;
- _databaseException = databaseException;
+ LOGGER.error("Got DatabaseException on commit, closing environment", de);
- notifyAll();
- }
-
- public void commit() throws DatabaseException
- {
- _commitThread.addJob(this, _syncCommit);
-
- if(!_syncCommit)
- {
- LOGGER.debug("CommitAsync was requested, returning immediately.");
- return;
- }
-
- waitForCompletion();
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
+ closeEnvironmentSafely();
+ throw de;
}
- public synchronized boolean isComplete()
- {
- return _complete;
- }
-
- public synchronized void waitForCompletion()
- {
- while (!isComplete())
- {
- _commitThread.explicitNotify();
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- //TODO Should we ignore, or throw a 'StoreException'?
- throw new RuntimeException(e);
- }
- }
- }
+ return _commitThreadWrapper.commit(tx, syncCommit);
}
- /**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
- * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
- * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
- */
- private class CommitThread extends Thread
+ @Override
+ public String getStoreType()
{
- private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
- private final CheckpointConfig _config = new CheckpointConfig();
- private final Object _lock = new Object();
-
- public CommitThread(String name)
- {
- super(name);
- _config.setForce(true);
-
- }
-
- public void explicitNotify()
- {
- synchronized (_lock)
- {
- _lock.notify();
- }
- }
-
- public void run()
- {
- while (!_stopped.get())
- {
- synchronized (_lock)
- {
- while (!_stopped.get() && !hasJobs())
- {
- try
- {
- // RHM-7 Periodically wake up and check, just in case we
- // missed a notification. Don't want to lock the broker hard.
- _lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- processJobs();
- }
- }
-
- private void processJobs()
- {
- int size = _jobQueue.size();
-
- try
- {
- getEnvironment().flushLog(true);
-
- for(int i = 0; i < size; i++)
- {
- BDBCommitFuture commit = _jobQueue.poll();
- commit.complete();
- }
-
- }
- catch (DatabaseException e)
- {
- for(int i = 0; i < size; i++)
- {
- BDBCommitFuture commit = _jobQueue.poll();
- commit.abort(e);
- }
- }
-
- }
-
- private boolean hasJobs()
- {
- return !_jobQueue.isEmpty();
- }
-
- public void addJob(BDBCommitFuture commit, final boolean sync)
- {
-
- _jobQueue.add(commit);
- if(sync)
- {
- synchronized (_lock)
- {
- _lock.notifyAll();
- }
- }
- }
-
- public void close()
- {
- synchronized (_lock)
- {
- _stopped.set(true);
- _lock.notifyAll();
- }
- }
+ return BDB_STORE_TYPE;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
new file mode 100644
index 0000000000..fe1556b5a6
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class CommitThreadWrapper
+{
+ private final CommitThread _commitThread;
+
+ public CommitThreadWrapper(String name, Environment env)
+ {
+ _commitThread = new CommitThread(name, env);
+ }
+
+ public void startCommitThread()
+ {
+ _commitThread.start();
+ }
+
+ public void stopCommitThread() throws InterruptedException
+ {
+ _commitThread.close();
+ _commitThread.join();
+ }
+
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
+ {
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+ return commitFuture;
+ }
+
+ private static final class BDBCommitFuture implements StoreFuture
+ {
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+
+ private final CommitThread _commitThread;
+ private final Transaction _tx;
+ private DatabaseException _databaseException;
+ private boolean _complete;
+ private boolean _syncCommit;
+
+ public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ {
+ _commitThread = commitThread;
+ _tx = tx;
+ _syncCommit = syncCommit;
+ }
+
+ public synchronized void complete()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ }
+ _complete = true;
+
+ notifyAll();
+ }
+
+ public synchronized void abort(DatabaseException databaseException)
+ {
+ _complete = true;
+ _databaseException = databaseException;
+
+ notifyAll();
+ }
+
+ public void commit() throws DatabaseException
+ {
+ _commitThread.addJob(this, _syncCommit);
+
+ if(!_syncCommit)
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ return;
+ }
+
+ waitForCompletion();
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
+
+ }
+
+ public synchronized boolean isComplete()
+ {
+ return _complete;
+ }
+
+ public synchronized void waitForCompletion()
+ {
+ while (!isComplete())
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
+ * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
+ * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
+ */
+ private static class CommitThread extends Thread
+ {
+ private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
+
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final CheckpointConfig _config = new CheckpointConfig();
+ private final Object _lock = new Object();
+ private Environment _environment;
+
+ public CommitThread(String name, Environment env)
+ {
+ super(name);
+ _config.setForce(true);
+ _environment = env;
+ }
+
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
+ public void run()
+ {
+ while (!_stopped.get())
+ {
+ synchronized (_lock)
+ {
+ while (!_stopped.get() && !hasJobs())
+ {
+ try
+ {
+ // RHM-7 Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ processJobs();
+ }
+ }
+
+ private void processJobs()
+ {
+ int size = _jobQueue.size();
+
+ try
+ {
+ _environment.flushLog(true);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.complete();
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ try
+ {
+ LOGGER.error("Exception during environment log flush", e);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.abort(e);
+ }
+ }
+ finally
+ {
+ LOGGER.error("Closing store environment", e);
+
+ try
+ {
+ _environment.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ }
+ }
+ }
+
+ private boolean hasJobs()
+ {
+ return !_jobQueue.isEmpty();
+ }
+
+ public void addJob(BDBCommitFuture commit, final boolean sync)
+ {
+
+ _jobQueue.add(commit);
+ if(sync)
+ {
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_lock)
+ {
+ _stopped.set(true);
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
index 8b84a4c9bb..945bcf1d28 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb.tuple;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java
index 0b14080486..dc68837d47 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java
@@ -1,5 +1,3 @@
-package org.apache.qpid.server.store.berkeleydb.upgrade;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +18,8 @@ package org.apache.qpid.server.store.berkeleydb.upgrade;
* under the License.
*
*/
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index 3265fb6823..2b7c782ad1 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.util.MapJsonSerializer;
@@ -93,6 +94,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
private MapJsonSerializer _serializer = new MapJsonSerializer();
+ private static final boolean _moveNonExclusiveQueueOwnerToDescription = Boolean.parseBoolean(System.getProperty("qpid.move_non_exclusive_queue_owner_to_description", Boolean.TRUE.toString()));
+
/**
* Upgrades from a v5 database to a v6 database
*
@@ -380,7 +383,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
for (int i = 0; i < newDequeues.length; i++)
{
OldRecordImpl dequeue = oldDequeues[i];
- UUID id = UUIDGenerator.generateUUID(dequeue.getQueueName(), virtualHostName);
+ UUID id = UUIDGenerator.generateQueueUUID(dequeue.getQueueName(), virtualHostName);
newDequeues[i] = new NewRecordImpl(id, dequeue.getMessageNumber());
}
}
@@ -390,7 +393,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
for (int i = 0; i < newEnqueues.length; i++)
{
OldRecordImpl enqueue = oldEnqueues[i];
- UUID id = UUIDGenerator.generateUUID(enqueue.getQueueName(), virtualHostName);
+ UUID id = UUIDGenerator.generateQueueUUID(enqueue.getQueueName(), virtualHostName);
newEnqueues[i] = new NewRecordImpl(id, enqueue.getMessageNumber());
}
}
@@ -420,7 +423,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
Transaction transaction, DatabaseEntry key, DatabaseEntry value)
{
OldQueueEntryKey oldEntryRecord = oldBinding.entryToObject(key);
- UUID queueId = UUIDGenerator.generateUUID(oldEntryRecord.getQueueName().asString(), virtualHostName);
+ UUID queueId = UUIDGenerator.generateQueueUUID(oldEntryRecord.getQueueName().asString(), virtualHostName);
NewQueueEntryKey newEntryRecord = new NewQueueEntryKey(queueId, oldEntryRecord.getMessageId());
DatabaseEntry newKey = new DatabaseEntry();
@@ -455,7 +458,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
String routingKey = bindingRecord.getRoutingKey().asString();
FieldTable arguments = bindingRecord.getArguments();
- UUID bindingId = UUIDGenerator.generateUUID();
+ UUID bindingId = UUIDGenerator.generateBindingUUID(exchangeName, queueName, routingKey, virtualHostName);
UpgradeConfiguredObjectRecord configuredObject = createBindingConfiguredObjectRecord(exchangeName, queueName,
routingKey, arguments, virtualHostName);
storeConfiguredObjectEntry(configuredObjectsDatabase, bindingId, configuredObject, transaction);
@@ -489,7 +492,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
String exchangeType = exchangeRecord.getType().asString();
boolean autoDelete = exchangeRecord.isAutoDelete();
- UUID exchangeId = UUIDGenerator.generateUUID(exchangeName, virtualHostName);
+ UUID exchangeId = UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostName);
UpgradeConfiguredObjectRecord configuredObject = createExchangeConfiguredObjectRecord(exchangeName,
exchangeType, autoDelete);
@@ -526,7 +529,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
boolean exclusive = queueRecord.isExclusive();
FieldTable arguments = queueRecord.getArguments();
- UUID queueId = UUIDGenerator.generateUUID(queueName, virtualHostName);
+ UUID queueId = UUIDGenerator.generateQueueUUID(queueName, virtualHostName);
UpgradeConfiguredObjectRecord configuredObject = createQueueConfiguredObjectRecord(queueName, owner, exclusive,
arguments);
storeConfiguredObjectEntry(configuredObjectsDatabase, queueId, configuredObject, transaction);
@@ -554,17 +557,49 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive,
FieldTable arguments)
{
+ Map<String, Object> attributesMap = buildQueueArgumentMap(queueName,
+ owner, exclusive, arguments);
+ String json = _serializer.serialize(attributesMap);
+ UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json);
+ return configuredObject;
+ }
+
+ private Map<String, Object> buildQueueArgumentMap(String queueName,
+ String owner, boolean exclusive, FieldTable arguments)
+ {
+
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Queue.NAME, queueName);
- attributesMap.put(Queue.OWNER, owner);
attributesMap.put(Queue.EXCLUSIVE, exclusive);
+
+ FieldTable argumentsCopy = new FieldTable();
if (arguments != null)
{
- attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments));
+ argumentsCopy.addAll(arguments);
}
- String json = _serializer.serialize(attributesMap);
- UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json);
- return configuredObject;
+
+ if (moveNonExclusiveOwnerToDescription(owner, exclusive))
+ {
+ _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION);
+
+ attributesMap.put(Queue.OWNER, null);
+ argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner);
+ }
+ else
+ {
+ attributesMap.put(Queue.OWNER, owner);
+ }
+ if (!argumentsCopy.isEmpty())
+ {
+ attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy));
+ }
+ return attributesMap;
+ }
+
+ private boolean moveNonExclusiveOwnerToDescription(String owner,
+ boolean exclusive)
+ {
+ return exclusive == false && owner != null && _moveNonExclusiveQueueOwnerToDescription;
}
private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType,
@@ -585,8 +620,8 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Binding.NAME, routingKey);
- attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateUUID(exchangeName, virtualHostName));
- attributesMap.put(Binding.QUEUE, UUIDGenerator.generateUUID(queueName, virtualHostName));
+ attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostName));
+ attributesMap.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queueName, virtualHostName));
if (arguments != null)
{
attributesMap.put(Binding.ARGUMENTS, FieldTable.convertToMap(arguments));
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index e71e39cbb8..f1ab012efc 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -24,7 +24,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -63,7 +63,7 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
+ int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -87,7 +87,7 @@ public class Upgrader
int getSourceVersion(Database versionDb)
{
- int version = BDBMessageStore.VERSION + 1;
+ int version = AbstractBDBMessageStore.VERSION + 1;
OperationStatus result;
do
@@ -106,7 +106,7 @@ public class Upgrader
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
throws AMQStoreException
{
- while(sourceVersion != BDBMessageStore.VERSION)
+ while(sourceVersion != AbstractBDBMessageStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
new file mode 100644
index 0000000000..a04fb20680
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
@@ -0,0 +1,165 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetAddress;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+
+public class BDBHAMessageStoreTest extends QpidTestCase
+{
+ private static final String TEST_LOG_FILE_MAX = "1000000";
+ private static final String TEST_ELECTION_RETRIES = "1000";
+ private static final String TEST_NUMBER_OF_THREADS = "10";
+ private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999";
+ private String _groupName;
+ private String _workDir;
+ private int _masterPort;
+ private String _host;
+ private XMLConfiguration _configXml;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _workDir = TMP_FOLDER + File.separator + getName();
+ _host = InetAddress.getByName("localhost").getHostAddress();
+ _groupName = "group" + getName();
+ _masterPort = -1;
+
+ FileUtils.delete(new File(_workDir), true);
+ _configXml = new XMLConfiguration();
+ }
+
+ public void tearDown() throws Exception
+ {
+ FileUtils.delete(new File(_workDir), true);
+ super.tearDown();
+ }
+
+ public void testSetSystemConfiguration() throws Exception
+ {
+ // create virtual host configuration, registry and host instance
+ addVirtualHostConfiguration();
+ TestApplicationRegistry registry = initialize();
+ try
+ {
+ VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort);
+ BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore();
+
+ // test whether JVM system settings were applied
+ Environment env = store.getEnvironment();
+ assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+ assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
+ assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
+ assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ private void addVirtualHostConfiguration() throws Exception
+ {
+ int port = findFreePort();
+ if (_masterPort == -1)
+ {
+ _masterPort = port;
+ }
+ String nodeName = getNodeNameForNodeAt(port);
+
+ String vhostName = "test" + port;
+ String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
+
+ _configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
+ _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName());
+ _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator
+ + port);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort",
+ getNodeHostPortForNodeAt(port));
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort",
+ getHelperHostPort());
+
+ _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS);
+ _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS);
+
+ _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX);
+ _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX);
+
+ _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES);
+
+ _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT);
+ _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT);
+ }
+
+ private String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + getName() + bdbPort;
+ }
+
+ private String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _host + ":" + bdbPort;
+ }
+
+ private String getHelperHostPort()
+ {
+ if (_masterPort == -1)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+ return _host + ":" + _masterPort;
+ }
+
+ private TestApplicationRegistry initialize() throws Exception
+ {
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ ServerConfiguration configuration = new ServerConfiguration(_configXml);
+ TestApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort);
+ return registry;
+ }
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
index 687c671566..5cc436a22a 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.server.store.DurableConfigurationStoreTest;
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
new file mode 100644
index 0000000000..fe48e29d0b
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
+
+public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
+{
+ private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class);
+
+ /*
+ * Notes on calculation of quota limits.
+ *
+ * 150 32kb messages is approximately 4.8MB which is greater than
+ * OVERFULL_SIZE.
+ *
+ * We deliberately use settings that force BDB to use multiple log files, so
+ * that when one or more of them are subsequently cleaned (following message
+ * consumption) the actual size on disk is reduced.
+ */
+
+ private static final String MAX_BDB_LOG_SIZE = "1000000"; // ~1MB
+
+ private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 150;
+
+ private static final int OVERFULL_SIZE = 4000000; // ~4MB
+ private static final int UNDERFULL_SIZE = 3500000; // ~3.5MB
+
+ @Override
+ protected int getNumberOfMessagesToFillStore()
+ {
+ return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE;
+ }
+
+ @Override
+ protected void applyStoreSpecificConfiguration(XMLConfiguration config)
+ {
+ _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+
+ config.addProperty("envConfig(-1).name", "je.log.fileMax");
+ config.addProperty("envConfig.value", MAX_BDB_LOG_SIZE);
+ config.addProperty("overfull-size", OVERFULL_SIZE);
+ config.addProperty("underfull-size", UNDERFULL_SIZE);
+ }
+
+ @Override
+ protected MessageStore createStore() throws Exception
+ {
+ MessageStore store = new BDBMessageStore();
+ return store;
+ }
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a318187f13..eef9f7eab4 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -55,7 +55,7 @@ import org.apache.qpid.transport.MessageTransfer;
/**
* Subclass of MessageStoreTest which runs the standard tests from the superclass against
- * the BDB Store as well as additional tests specific to the DBB store-implementation.
+ * the BDB Store as well as additional tests specific to the BDB store-implementation.
*/
public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
{
@@ -70,7 +70,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -220,11 +220,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
* Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
- private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
+ private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception
{
messageStore.close();
- BDBMessageStore newStore = new BDBMessageStore();
+ AbstractBDBMessageStore newStore = new BDBMessageStore();
newStore.configure("", _config.subset("store"));
newStore.startWithNoRecover();
@@ -282,7 +282,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
public void testGetContentWithOffset() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -342,7 +342,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
public void testMessageCreationAndRemoval() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -367,12 +367,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private BDBMessageStore assertBDBStore(MessageStore store)
+ private AbstractBDBMessageStore assertBDBStore(MessageStore store)
{
assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
- return (BDBMessageStore) store;
+ return (AbstractBDBMessageStore) store;
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
@@ -405,9 +405,9 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
- final UUID mockQueueId = UUIDGenerator.generateUUID();
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
@@ -443,9 +443,9 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
- final UUID mockQueueId = UUIDGenerator.generateUUID();
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
@@ -484,9 +484,9 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
- final UUID mockQueueId = UUIDGenerator.generateUUID();
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
new file mode 100644
index 0000000000..c6a9ba8f8b
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.rep.ReplicationConfig;
+
+/**
+ * The HA black box tests test the BDB cluster as a opaque unit. Client connects to
+ * the cluster via a failover url
+ *
+ * @see HAClusterWhiteboxTest
+ */
+public class HAClusterBlackboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+ private static final int NUMBER_OF_NODES = 3;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ private FailoverAwaitingListener _failoverAwaitingListener;
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+
+ _clusterCreator.startCluster();
+ _failoverAwaitingListener = new FailoverAwaitingListener();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testLossOfMasterNodeCausesClientToFailover() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ _clusterCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Node is stopped");
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
+ {
+ LOGGER.info("Connecting to " + _brokerFailoverUrl);
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ LOGGER.info("Got connection to cluster");
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+
+ LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort);
+
+ _clusterCreator.stopNode(inactiveBrokerPort);
+
+ _failoverAwaitingListener.assertFailoverDoesNotOccur(2000);
+
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private final class FailoverAwaitingListener implements ConnectionListener
+ {
+ private final CountDownLatch _failoverLatch = new CountDownLatch(1);
+
+ @Override
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public void assertFailoverOccurs(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover did not occur", 0, _failoverLatch.getCount());
+ }
+
+ public void assertFailoverDoesNotOccur(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount());
+ }
+
+
+ @Override
+ public void failoverComplete()
+ {
+ _failoverLatch.countDown();
+ }
+
+ @Override
+ public void bytesSent(long count)
+ {
+ }
+
+ @Override
+ public void bytesReceived(long count)
+ {
+ }
+ }
+
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
new file mode 100644
index 0000000000..408643b98a
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The HA white box tests test the BDB cluster where the test retains the knowledge of the
+ * individual test nodes. It uses this knowledge to examine the nodes to ensure that they
+ * remain in the correct state throughout the test.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterWhiteboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private final int NUMBER_OF_NODES = 3;
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testClusterPermitsConnectionToOnlyOneNode() throws Exception
+ {
+ int connectionSuccesses = 0;
+ int connectionFails = 0;
+
+ for (int brokerPortNumber : getBrokerPortNumbers())
+ {
+ try
+ {
+ getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber));
+ connectionSuccesses++;
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ connectionFails++;
+ }
+ }
+
+ assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails);
+ assertEquals("Unexpected number of successful connections", 1, connectionSuccesses);
+ }
+
+ public void testClusterThatLosesNodeStillAllowsConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ closeConnectionAndKillBroker(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+
+ // verify that JMS persistence operations are working
+ assertProducingConsuming(subsequentConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ closeConnectionAndKillBroker(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection);
+
+ killBroker(subsequentPortNumber);
+
+ final Connection finalConnection = getConnectionToNodeInCluster();
+ assertNull(finalConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception
+ {
+ final Connection connection = getConnectionToNodeInCluster();
+ assertNotNull(connection);
+
+ final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ connection.close();
+
+ _clusterCreator.stopNode(brokerPortNumber);
+ _clusterCreator.startNode(brokerPortNumber);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ }
+
+ public void testClusterLosingNodeRetainsData() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+
+ final String queueNamePrefix = getTestQueueName();
+ final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'";
+ final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'";
+
+ populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+
+ closeConnectionAndKillBroker(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+
+ assertNotNull("no valid connection obtained", subsequentConnection);
+
+ checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+ }
+
+ public void xtestRecoveryOfOutOfDateNode() throws Exception
+ {
+ /*
+ * TODO: Implement
+ *
+ * Cant yet find a way to control cleaning in a deterministic way to allow provoking
+ * a node to become out of date. We do now know that even a new joiner to the group
+ * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has
+ * done *any* cleaning and then adding a new node should be sufficient to cause this.
+ */
+ }
+
+ private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception
+ {
+ populateBrokerWithData(connection, 1, queueUrls);
+ }
+
+ private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception
+ {
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ session.createConsumer(queue).close();
+ sendMessage(session, queue, noOfMessages);
+ }
+ }
+
+ private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException
+ {
+ connection.start();
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ final Message message = consumer.receive(1000);
+ session.commit();
+ assertNotNull("Queue " + queue + " should have message", message);
+ assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX));
+ }
+ }
+
+ private Connection getConnectionToNodeInCluster() throws URLSyntaxException
+ {
+ Connection connection = null;
+ Set<Integer> runningBrokerPorts = getBrokerPortNumbers();
+
+ for (int brokerPortNumber : runningBrokerPorts)
+ {
+ try
+ {
+ connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber));
+ break;
+ }
+ catch(JMSException je)
+ {
+ assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ }
+ }
+ return connection;
+ }
+
+ private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception
+ {
+ final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ initialConnection.close();
+
+ killBroker(initialPortNumber); // kill awaits the death of the child
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 2);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ Message m2 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 2 is not received", m2);
+ assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX));
+ session.commit();
+ }
+
+ private void closeConnection(final Connection initialConnection)
+ {
+ try
+ {
+ initialConnection.close();
+ }
+ catch(Exception e)
+ {
+ // ignore.
+ // java.net.SocketException is seen sometimes on active connection
+ }
+ }
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
index 7e5ef3f94c..eaa3c3eba4 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
@@ -19,22 +19,25 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class BDBMessageStoreFactory implements MessageStoreFactory
+public class HAMessageStoreSmokeTest extends QpidTestCase
{
+ private final BDBHAMessageStore _store = new BDBHAMessageStore();
+ private final XMLConfiguration _config = new XMLConfiguration();
- @Override
- public MessageStore createMessageStore()
+ public void testMissingHAConfigThrowsException() throws Exception
{
- return new BDBMessageStore();
+ try
+ {
+ _store.configure("test", _config);
+ fail("Expected an exception to be thrown");
+ }
+ catch (ConfigurationException ce)
+ {
+ assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+ }
}
-
- @Override
- public String getStoreClassName()
- {
- return BDBMessageStore.class.getSimpleName();
- }
-
-}
+} \ No newline at end of file
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
new file mode 100644
index 0000000000..abe13edc32
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class HATestClusterCreator
+{
+ protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
+
+ private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
+ private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+
+ private static final int FAILOVER_CYCLECOUNT = 10;
+ private static final int FAILOVER_RETRIES = 1;
+ private static final int FAILOVER_CONNECTDELAY = 1000;
+
+ private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+ private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
+
+ private static final int RETRIES = 60;
+ private static final int CONNECTDELAY = 75;
+
+ private final QpidBrokerTestCase _testcase;
+ private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
+ private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
+ private final String _virtualHostName;
+ private final String _storeConfigKeyPrefix;
+
+ private final String _ipAddressOfBroker;
+ private final String _groupName ;
+ private final int _numberOfNodes;
+ private int _bdbHelperPort;
+ private int _primaryBrokerPort;
+
+ public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
+ {
+ _testcase = testcase;
+ _virtualHostName = virtualHostName;
+ _groupName = "group" + _testcase.getName();
+ _ipAddressOfBroker = getIpAddressOfBrokerHost();
+ _numberOfNodes = numberOfNodes;
+ _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
+ _bdbHelperPort = 0;
+ }
+
+ public void configureClusterNodes() throws Exception
+ {
+ int brokerPort = _testcase.findFreePort();
+
+ for (int i = 0; i < _numberOfNodes; i++)
+ {
+ int bdbPort = _testcase.getNextAvailable(brokerPort + 1);
+ _brokerPortToBdbPortMap.put(brokerPort, bdbPort);
+
+ LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort);
+ if (_bdbHelperPort == 0)
+ {
+ _bdbHelperPort = bdbPort;
+ }
+
+ configureClusterNode(brokerPort, bdbPort);
+ collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts());
+
+ brokerPort = _testcase.getNextAvailable(bdbPort + 1);
+ }
+ }
+
+ public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
+ final String configKey = getConfigKey("highAvailability.designatedPrimary");
+ brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary));
+ _primaryBrokerPort = brokerConfigEntry.getKey();
+ }
+
+ /**
+ * @param configKeySuffix "highAvailability.designatedPrimary", for example
+ * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example
+ */
+ private String getConfigKey(String configKeySuffix)
+ {
+ final String configKey = StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, "virtualhosts.");
+ return configKey;
+ }
+
+ public void startNode(final int brokerPortNumber) throws Exception
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+
+ _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration());
+ _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
+
+ _testcase.startBroker(brokerPortNumber);
+ }
+
+ public void startCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ startNode(brokerPortNumber);
+ }
+ }
+
+ public void startClusterParallel() throws Exception
+ {
+ final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size());
+ try
+ {
+ List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+ Future<Object> future = executor.submit(new Callable<Object>()
+ {
+ public Object call()
+ {
+ try
+ {
+ _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(),
+ brokerConfigHolder.getTestVirtualhosts());
+ return "OK";
+ }
+ catch (Exception e)
+ {
+ return e;
+ }
+ }
+ });
+ brokers.add(future);
+ }
+ for (Future<Object> future : brokers)
+ {
+ Object result = future.get(30, TimeUnit.SECONDS);
+ LOGGER.debug("Node startup result:" + result);
+ if (result instanceof Exception)
+ {
+ throw (Exception) result;
+ }
+ else if (!"OK".equals(result))
+ {
+ throw new Exception("One of the cluster nodes is not started");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ stopCluster();
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+
+ }
+
+ public void stopNode(final int brokerPortNumber)
+ {
+ _testcase.stopBroker(brokerPortNumber);
+ }
+
+ public void stopCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ try
+ {
+ stopNode(brokerPortNumber);
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Failed to stop node on port:" + brokerPortNumber);
+ }
+ }
+ }
+
+ public int getBrokerPortNumberFromConnection(Connection connection)
+ {
+ final AMQConnection amqConnection = (AMQConnection)connection;
+ return amqConnection.getActiveBrokerDetails().getPort();
+ }
+
+ public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
+ {
+ final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers();
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection);
+ allBrokerPorts.remove(activeBrokerPort);
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int inactiveBrokerPort = allBrokerPorts.iterator().next();
+ return inactiveBrokerPort;
+ }
+
+ public int getBdbPortForBrokerPort(final int brokerPortNumber)
+ {
+ return _brokerPortToBdbPortMap.get(brokerPortNumber);
+ }
+
+ public Set<Integer> getBdbPortNumbers()
+ {
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.values());
+ }
+
+ public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception
+ {
+ final StringBuilder brokerList = new StringBuilder();
+
+ for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); )
+ {
+ int brokerPortNumber = itr.next();
+
+ brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES));
+ if (itr.hasNext())
+ {
+ brokerList.append(";");
+ }
+ }
+
+ return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT));
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, false);
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, true);
+ }
+
+ private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException
+ {
+ final String url;
+ if (retryAllowed)
+ {
+ url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ }
+ else
+ {
+ url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber);
+ }
+
+ return new AMQConnectionURL(url);
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + _testcase.getName() + bdbPort;
+ }
+
+ public String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _ipAddressOfBroker + ":" + bdbPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ if (_bdbHelperPort == 0)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+
+ return _ipAddressOfBroker + ":" + _bdbHelperPort;
+ }
+
+ public void setHelperHostPort(int bdbHelperPort)
+ {
+ _bdbHelperPort = bdbHelperPort;
+ }
+
+ public int getBrokerPortNumberOfPrimary()
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ return _primaryBrokerPort;
+ }
+
+ public int getBrokerPortNumberOfSecondaryNode()
+ {
+ final Set<Integer> portNumbers = getBrokerPortNumbersForNodes();
+ portNumbers.remove(getBrokerPortNumberOfPrimary());
+ return portNumbers.iterator().next();
+ }
+
+ public Set<Integer> getBrokerPortNumbersForNodes()
+ {
+ return new HashSet<Integer>(_brokerConfigurations.keySet());
+ }
+
+ private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception
+ {
+ final String nodeName = getNodeNameForNodeAt(bdbPort);
+
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.groupName", _groupName);
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeName", nodeName);
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
+ }
+
+ public String getIpAddressOfBrokerHost()
+ {
+ String brokerHost = _testcase.getBroker().getHost();
+ try
+ {
+ return InetAddress.getByName(brokerHost).getHostAddress();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e);
+ }
+ }
+
+ private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(),
+ (XMLConfiguration) testVirtualhosts.clone()));
+ }
+
+ public class BrokerConfigHolder
+ {
+ private final XMLConfiguration _testConfiguration;
+ private final XMLConfiguration _testVirtualhosts;
+
+ public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _testConfiguration = testConfiguration;
+ _testVirtualhosts = testVirtualhosts;
+ }
+
+ public XMLConfiguration getTestConfiguration()
+ {
+ return _testConfiguration;
+ }
+
+ public XMLConfiguration getTestVirtualhosts()
+ {
+ return _testVirtualhosts;
+ }
+ }
+
+ public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved);
+ final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts();
+
+ final String configKey = getConfigKey("highAvailability.nodeHostPort");
+ final String oldBdbHostPort = virtualHostConfig.getString(configKey);
+
+ final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
+ final String oldHost = oldHostAndPort[0];
+
+ final String newBdbHostPort = oldHost + ":" + newBdbPort;
+
+ virtualHostConfig.setProperty(configKey, newBdbHostPort);
+ collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
+ }
+
+ public String getStoreConfigKeyPrefix()
+ {
+ return _storeConfigKeyPrefix;
+ }
+
+
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
index f8aeb7f7b0..7f93f5691e 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
@@ -42,7 +42,7 @@ public class ConfiguredObjectBindingTest extends TestCase
{
super.setUp();
_configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- _object = new ConfiguredObjectRecord(UUIDGenerator.generateUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING);
+ _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING);
}
public void testObjectToEntryAndEntryToObject()
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
index 36991b90d0..cd2654f79f 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
@@ -52,8 +52,8 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase
}
public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue",
- "queue-non-durable" };
- public static int[] QUEUE_SIZES = { 1, 1, 10, 3 };
+ "queue-non-durable", "nonexclusive-with-erroneous-owner" };
+ public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0};
public static int TOTAL_MESSAGE_NUMBER = 15;
protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
index 3f9e4e4aa1..65a8bb03fb 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
@@ -23,10 +23,13 @@ package org.apache.qpid.server.store.berkeleydb.upgrade;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -49,6 +52,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
{
private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+ private static final String NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName";
private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName";
private static final String EXCHANGE_DB_NAME = "exchangeDb_v5";
@@ -87,6 +91,10 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null);
+ assertBindingRecord(queueBindings, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, null);
+
+ assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
+
assertContent();
}
@@ -94,7 +102,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
{
UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName());
- assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE)));
+ assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER)));
assertDatabaseRecordCount(DELIVERY_DB_NAME, 12);
assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12);
@@ -112,6 +120,9 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+
+ assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
+
assertContent();
}
@@ -257,7 +268,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
private void assertQueues(Set<String> expectedQueueNames)
{
- List<AMQShortString> durableSubNames = new ArrayList<AMQShortString>();
+ List<AMQShortString> durableSubNames = Collections.emptyList();
final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames);
final Set<String> actualQueueNames = new HashSet<String>();
@@ -278,6 +289,35 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames);
}
+ private void assertQueueHasOwner(String queueName, final String expectedOwner)
+ {
+ List<AMQShortString> durableSubNames = Collections.emptyList();
+ final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames);
+ final AtomicReference<String> actualOwner = new AtomicReference<String>();
+ final AtomicBoolean foundQueue = new AtomicBoolean(false);
+
+ CursorOperation queueNameCollector = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ QueueRecord record = binding.entryToObject(value);
+ String queueName = record.getNameShortString().asString();
+ if (queueName.equals(queueName))
+ {
+ foundQueue.set(true);
+ actualOwner.set(AMQShortString.toString(record.getOwner()));
+ }
+ }
+ };
+ new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector);
+
+ assertTrue("Could not find queue in database", foundQueue.get());
+ assertEquals("Queue has unexpected owner", expectedOwner, actualOwner.get());
+ }
+
private void assertContent()
{
final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding();
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index 5297692820..2d2a6b20a2 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -29,8 +29,11 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OL
import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -41,6 +44,7 @@ import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
@@ -163,11 +167,11 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
NewRecordImpl[] newDequeues = newTransaction.getDequeues();
assertEquals("Unxpected new enqueus number", 1, newEnqueues.length);
NewRecordImpl enqueue = newEnqueues[0];
- assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST1", getVirtualHostName()), enqueue.getId());
+ assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHostName()), enqueue.getId());
assertEquals("Unxpected message id", 1, enqueue.getMessageNumber());
assertEquals("Unxpected new dequeues number", 1, newDequeues.length);
NewRecordImpl dequeue = newDequeues[0];
- assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST2", getVirtualHostName()), dequeue.getId());
+ assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHostName()), dequeue.getId());
assertEquals("Unxpected message id", 2, dequeue.getMessageNumber());
}
@@ -260,7 +264,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
private void assertDatabaseRecordCounts()
{
- assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 9);
+ assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 12);
assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12);
assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12);
@@ -270,64 +274,27 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
private void assertConfiguredObjects()
{
Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- assertEquals("Unexpected number of configured objects", 9, configuredObjects.size());
-
- Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(9);
- Map<String, Object> queue1 = new HashMap<String, Object>();
- queue1.put("exclusive", Boolean.FALSE);
- queue1.put("name", "myUpgradeQueue");
- queue1.put("owner", null);
- expected.add(queue1);
- Map<String, Object> queue2 = new HashMap<String, Object>();
- queue2.put("exclusive", Boolean.TRUE);
- queue2.put("name", "clientid:mySelectorDurSubName");
- queue2.put("owner", "clientid");
- expected.add(queue2);
- Map<String, Object> queue3 = new HashMap<String, Object>();
- queue3.put("exclusive", Boolean.TRUE);
- queue3.put("name", "clientid:myDurSubName");
- queue3.put("owner", "clientid");
- expected.add(queue3);
-
- Map<String, Object> queueBinding1 = new HashMap<String, Object>();
- queueBinding1.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString());
- queueBinding1.put("name", "myUpgradeQueue");
- queueBinding1.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString());
- expected.add(queueBinding1);
- Map<String, Object> queueBinding2 = new HashMap<String, Object>();
- queueBinding2.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString());
- queueBinding2.put("name", "myUpgradeQueue");
- queueBinding2.put("exchange", UUIDGenerator.generateUUID("amq.direct", getVirtualHostName()).toString());
- Map<String, Object> arguments2 = new HashMap<String, Object>();
- arguments2.put("x-filter-jms-selector", "");
- queueBinding2.put("arguments", arguments2);
- expected.add(queueBinding2);
- Map<String, Object> queueBinding3 = new HashMap<String, Object>();
- queueBinding3.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString());
- queueBinding3.put("name", "myUpgradeTopic");
- queueBinding3.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString());
- Map<String, Object> arguments3 = new HashMap<String, Object>();
- arguments3.put("x-filter-jms-selector", "");
- queueBinding3.put("arguments", arguments3);
- expected.add(queueBinding3);
- Map<String, Object> queueBinding4 = new HashMap<String, Object>();
- queueBinding4.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString());
- queueBinding4.put("name", "mySelectorUpgradeTopic");
- queueBinding4.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString());
- Map<String, Object> arguments4 = new HashMap<String, Object>();
- arguments4.put("x-filter-jms-selector", "testprop='true'");
- queueBinding4.put("arguments", arguments4);
- expected.add(queueBinding4);
- Map<String, Object> queueBinding5 = new HashMap<String, Object>();
- queueBinding5.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString());
- queueBinding5.put("name", "clientid:myDurSubName");
- queueBinding5.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString());
- expected.add(queueBinding5);
- Map<String, Object> queueBinding6 = new HashMap<String, Object>();
- queueBinding6.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString());
- queueBinding6.put("name", "clientid:mySelectorDurSubName");
- queueBinding6.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString());
- expected.add(queueBinding6);
+ assertEquals("Unexpected number of configured objects", 12, configuredObjects.size());
+
+ Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(12);
+ List<UUID> expectedBindingIDs = new ArrayList<UUID>();
+
+ expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null));
+ expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null));
+ expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null));
+ expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,
+ Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description")));
+
+ expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue","myUpgradeQueue", "<<default>>", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "myUpgradeTopic", "amq.topic",
+ Collections.singletonMap("x-filter-jms-selector", ""), expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "mySelectorUpgradeTopic", "amq.topic",
+ Collections.singletonMap("x-filter-jms-selector", "testprop='true'"), expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("clientid:myDurSubName", "clientid:myDurSubName", "<<default>>", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("clientid:mySelectorDurSubName", "clientid:mySelectorDurSubName", "<<default>>", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<<default>>", null, expectedBindingIDs));
Set<String> expectedTypes = new HashSet<String>();
expectedTypes.add(Queue.class.getName());
@@ -337,21 +304,63 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet())
{
UpgradeConfiguredObjectRecord object = entry.getValue();
- UUID key = entry.getKey();
Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes());
assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized));
String type = object.getType();
assertTrue("Unexpected type:" + type, expectedTypes.contains(type));
- if (type.equals(Exchange.class.getName()) || type.equals(Queue.class.getName()))
+ UUID key = entry.getKey();
+
+ assertNotNull("Key cannot be null", key);
+
+ if (type.equals(Exchange.class.getName()))
+ {
+ String exchangeName = (String) deserialized.get(Exchange.NAME);
+ assertNotNull(exchangeName);
+ assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()));
+ }
+ else if (type.equals(Queue.class.getName()))
{
- assertEquals("Unexpected key", key, UUIDGenerator.generateUUID(((String) deserialized.get("name")), getVirtualHostName()));
+ String queueName = (String) deserialized.get(Queue.NAME);
+ assertNotNull(queueName);
+ assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHostName()));
}
- else
+ else if (type.equals(Binding.class.getName()))
{
- assertNotNull("Key cannot be null", key);
+ assertTrue("unexpected binding id", expectedBindingIDs.remove(key));
}
}
+
assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty());
+ assertTrue("Not all expected bindings found:" + expectedBindingIDs, expectedBindingIDs.isEmpty());
+ }
+
+ private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs)
+ {
+ Map<String, Object> expectedQueueBinding = new HashMap<String, Object>();
+ expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHostName()).toString());
+ expectedQueueBinding.put(Binding.NAME, bindingName);
+ expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()).toString());
+ if (argumentMap != null)
+ {
+ expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap);
+ }
+
+ expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHostName()));
+
+ return expectedQueueBinding;
+ }
+
+ private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, String> argumentMap)
+ {
+ Map<String, Object> expectedQueueEntry = new HashMap<String, Object>();
+ expectedQueueEntry.put(Queue.NAME, name);
+ expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag);
+ expectedQueueEntry.put(Queue.OWNER, owner);
+ if (argumentMap != null)
+ {
+ expectedQueueEntry.put(Queue.ARGUMENTS, argumentMap);
+ }
+ return expectedQueueEntry;
}
private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects()
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
index ba5ca842bf..23fd9bc24f 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -24,7 +24,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
@@ -94,7 +94,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
{
assertEquals("Unexpected store version", -1, getStoreVersion());
_upgrader.upgradeIfNecessary();
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
assertContent();
}
@@ -112,7 +112,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
List<String> expectedDatabases = new ArrayList<String>();
expectedDatabases.add(Upgrader.VERSION_DB_NAME);
assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
nonExistentStoreLocation.delete();
}
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
index 167ab7f0ca..f5ed9aa5a2 100644
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
Binary files differ
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
index d44b21a83e..f5ed9aa5a2 100644
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
Binary files differ
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
index 9b85860c19..d5ae8c1096 100644
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
Binary files differ