diff options
author | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:49 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:49 +0000 |
commit | 2df814dae9f4b112a7d36ee2f94f1a73a711ac15 (patch) | |
tree | 1956200b2d7fd9e1c90d801cdfa312b41b9fc25d /java | |
parent | 9e35c097bcfd92ce7d75d1891420705cd232883f (diff) | |
download | qpid-python-2df814dae9f4b112a7d36ee2f94f1a73a711ac15.tar.gz |
QPID-5801: [Java Broker] Elimination of the BDB facade's initialisation tasks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1600934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
28 files changed, 374 insertions, 505 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java index 2766c2ac98..ca32ffaa73 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -32,7 +33,6 @@ import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import com.sleepycat.bind.tuple.ByteBinding; import com.sleepycat.bind.tuple.LongBinding; @@ -46,12 +46,13 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; @@ -66,7 +67,6 @@ import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; -import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; @@ -83,7 +83,6 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; /** @@ -96,6 +95,8 @@ import org.apache.qpid.util.FileUtils; */ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfigurationStore { + public static final DatabaseConfig DEFAULT_DATABASE_CONFIG = new DatabaseConfig().setTransactional(true).setAllowCreate(true); + private static final Logger LOGGER = Logger.getLogger(BDBConfigurationStore.class); public static final int VERSION = 8; @@ -103,6 +104,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; + private static String MESSAGE_META_DATA_SEQ_DB_NAME = "MESSAGE_METADATA.SEQ"; private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; @@ -110,16 +112,20 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private static String BRIDGEDB_NAME = "BRIDGES"; private static String LINKDB_NAME = "LINKS"; private static String XID_DB_NAME = "XIDS"; - private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; - private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; private EnvironmentFacade _environmentFacade; - private final AtomicLong _messageId = new AtomicLong(0); + + private static final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes( + Charset.forName("UTF-8"))); + + private static final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT. + setAllowCreate(true). + setInitialValue(1). + setWrap(true). + setCacheSize(100000); private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); - private long _totalStoreSize; - private final EnvironmentFacadeFactory _environmentFacadeFactory; private volatile Committer _committer; @@ -128,6 +134,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private String _storeLocation; private final BDBMessageStore _messageStoreFacade = new BDBMessageStore(); + private ConfiguredObject<?> _parent; public BDBConfigurationStore() { @@ -144,22 +151,11 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { if (_configurationStoreOpen.compareAndSet(false, true)) { + _parent = parent; + if (_environmentFacade == null) { - EnvironmentFacadeTask[] initialisationTasks = null; - _isMessageStoreProvider = MapValueConverter.getBooleanAttribute(VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, storeSettings, false); - if (_isMessageStoreProvider) - { - String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; - System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); - System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); - initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask()}; - } - else - { - initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)}; - } - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings); _storeLocation = _environmentFacade.getStoreLocation(); } else @@ -170,6 +166,18 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } @Override + public void upgradeStoreStructure() throws StoreException + { + try + { + new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary(); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e); + } + } + @Override public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { checkConfigurationStoreOpen(); @@ -564,123 +572,37 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private Database getConfiguredObjectsDb() { - return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); + return _environmentFacade.openDatabase(CONFIGURED_OBJECTS_DB_NAME, DEFAULT_DATABASE_CONFIG); } private Database getConfiguredObjectHierarchyDb() { - return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME); + return _environmentFacade.openDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, DEFAULT_DATABASE_CONFIG); } private Database getMessageContentDb() { - return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); + return _environmentFacade.openDatabase(MESSAGE_CONTENT_DB_NAME, DEFAULT_DATABASE_CONFIG); } private Database getMessageMetaDataDb() { - return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME); - } - - private Database getDeliveryDb() - { - return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME); + return _environmentFacade.openDatabase(MESSAGE_META_DATA_DB_NAME, DEFAULT_DATABASE_CONFIG); } - private Database getXidDb() + private Database getMessageMetaDataSeqDb() { - return _environmentFacade.getOpenDatabase(XID_DB_NAME); + return _environmentFacade.openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, DEFAULT_DATABASE_CONFIG); } - class UpgradeTask implements EnvironmentFacadeTask - { - private final ConfiguredObject<?> _parent; - - public UpgradeTask(ConfiguredObject<?> parent) - { - _parent = parent; - } - - @Override - public void execute(EnvironmentFacade facade) - { - try - { - new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary(); - } - catch(DatabaseException e) - { - throw facade.handleDatabaseException("Cannot upgrade store", e); - } - } - } - - class OpenDatabasesTask implements EnvironmentFacadeTask + private Database getDeliveryDb() { - private String[] _names; - - public OpenDatabasesTask(String[] names) - { - _names = names; - } - - @Override - public void execute(EnvironmentFacade facade) - { - try - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - facade.openDatabases(dbConfig, _names); - } - catch(DatabaseException e) - { - throw facade.handleDatabaseException("Cannot open databases", e); - } - } - + return _environmentFacade.openDatabase(DELIVERY_DB_NAME, DEFAULT_DATABASE_CONFIG); } - class DiskSpaceTask implements EnvironmentFacadeTask - { - - @Override - public void execute(EnvironmentFacade facade) - { - try - { - _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize(); - } - catch(DatabaseException e) - { - throw facade.handleDatabaseException("Cannot evaluate disk store size", e); - } - } - - } - - public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler + private Database getXidDb() { - private long _maxId; - - @Override - public void execute(EnvironmentFacade facade) - { - ((BDBMessageStore)getMessageStore()).visitMessagesInternal(this, facade); - _messageId.set(_maxId); - } - - @Override - public boolean handle(StoredMessage<?> storedMessage) - { - long id = storedMessage.getMessageNumber(); - if (_maxId<id) - { - _maxId = id; - } - return true; - } + return _environmentFacade.openDatabase(XID_DB_NAME, DEFAULT_DATABASE_CONFIG); } class BDBMessageStore implements MessageStore @@ -692,12 +614,14 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private boolean _limitBusted; private long _persistentSizeLowThreshold; private long _persistentSizeHighThreshold; + private ConfiguredObject<?> _parent; @Override public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) { if (_messageStoreOpen.compareAndSet(false, true)) { + _parent = parent; Object overfullAttr = messageStoreSettings.get(OVERFULL_SIZE); Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE); @@ -714,8 +638,8 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi if (_environmentFacade == null) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, - new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask()); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings + ); _storeLocation = _environmentFacade.getStoreLocation(); } @@ -725,26 +649,44 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + public void upgradeStoreStructure() throws StoreException { - if (metaData.isPersistent()) + try { - return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData); + new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary(); } - else + catch(DatabaseException e) { - return new StoredMemoryMessage<T>(getNewMessageId(), metaData); + throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e); } } - /** - * Return a valid, currently unused message id. - * - * @return A fresh message id. - */ - private long getNewMessageId() + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { - return _messageId.incrementAndGet(); + + Sequence mmdSeq = null; + try + { + mmdSeq = getMessageMetaDataSeqDb().openSequence(null, MESSAGE_METADATA_SEQ_KEY, MESSAGE_METADATA_SEQ_CONFIG); + long newMessageId = mmdSeq.get(null, 1); + + if (metaData.isPersistent()) + { + return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); + } + else + { + return new StoredMemoryMessage<T>(newMessageId, metaData); + } + } + finally + { + if (mmdSeq != null) + { + mmdSeq.close(); + } + } } @Override @@ -1116,7 +1058,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi Cursor cursor = null; try { - cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null); + cursor = getMessageMetaDataDb().openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); @@ -1461,42 +1403,23 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi { synchronized (this) { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 2*delta; + // TODO KW I think we should simplify matters here. The MessageStore should + // expose merely method #getStoreSizeEstimate(). The virtualhost (housekeeper) + // should periodically call this method and it be the one responsible for + // the generation of alerts. - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - _totalStoreSize = getSizeOnDisk(); + long newSize = getSizeOnDisk(); + reduceSizeOnDisk(); - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); } else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) { - long oldSize = _totalStoreSize; - _totalStoreSize = getSizeOnDisk(); - - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(); - - _totalStoreSize = getSizeOnDisk(); - - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); } } } @@ -1512,6 +1435,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi return _persistentSizeHighThreshold; } + // TODO remove altogether or perhaps expose as public method: requestStoreCleanup private void reduceSizeOnDisk() { _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index e51a610a26..c614ba1367 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -43,11 +43,9 @@ public interface EnvironmentFacade Environment getEnvironment(); - Committer createCommitter(String name); - - void openDatabases(DatabaseConfig dbConfig, String... databaseNames); + Database openDatabase(String name, DatabaseConfig databaseConfig); - Database getOpenDatabase(String name); + Committer createCommitter(String name); Transaction beginTransaction(); @@ -57,5 +55,6 @@ public interface EnvironmentFacade String getStoreLocation(); + void closeDatabase(String name); void close(); } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index 83584e306f..e13ad0e452 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -26,12 +26,7 @@ public interface EnvironmentFacadeFactory { public static final String ENVIRONMENT_CONFIGURATION = "environmentConfiguration"; - EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks); + EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings); String getType(); - - public static interface EnvironmentFacadeTask - { - void execute(EnvironmentFacade facade); - } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 643cd9ae70..f41eca602b 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -21,11 +21,10 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -40,11 +39,12 @@ public class StandardEnvironmentFacade implements EnvironmentFacade public static final String TYPE = "BDB"; private final String _storePath; - private final Map<String, Database> _databases = new HashMap<String, Database>(); + private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); private Environment _environment; - public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks) + public StandardEnvironmentFacade(String storePath, + Map<String, String> attributes) { _storePath = storePath; @@ -76,13 +76,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); _environment = new Environment(environmentPath, envConfig); - if (initialisationTasks != null) - { - for (EnvironmentFacadeTask task : initialisationTasks) - { - task.execute(this); - } - } } @@ -119,7 +112,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade private void closeDatabases() { RuntimeException firstThrownException = null; - for (Database database : _databases.values()) + for (Database database : _cachedDatabases.values()) { try { @@ -209,24 +202,34 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) + public Database openDatabase(String name, DatabaseConfig databaseConfig) { - for (String databaseName : databaseNames) + Database cachedHandle = _cachedDatabases.get(name); + if (cachedHandle == null) { - Database database = _environment.openDatabase(null, databaseName, dbConfig); - _databases .put(databaseName, database); + Database handle = _environment.openDatabase(null, name, databaseConfig); + Database existingHandle = _cachedDatabases.putIfAbsent(name, handle); + if (existingHandle == null) + { + cachedHandle = handle; + } + else + { + cachedHandle = existingHandle; + handle.close(); + } } + return cachedHandle; } @Override - public Database getOpenDatabase(String name) + public void closeDatabase(final String name) { - Database database = _databases.get(name); - if (database == null) + Database cachedHandle = _cachedDatabases.remove(name); + if (cachedHandle != null) { - throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); + cachedHandle.close(); } - return database; } @Override diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 9506b1c20a..49db913d6c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -30,7 +30,7 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks) + public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings) { Map<String, String> envConfigMap = new HashMap<String, String>(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); @@ -41,7 +41,7 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); } String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); - return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks); + return new StandardEnvironmentFacade(storeLocation, envConfigMap); } @Override diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index 6ec5067d01..be4b8199c3 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Transaction; @@ -32,13 +33,16 @@ import com.sleepycat.je.Transaction; public class DatabasePinger { public static final String PING_DATABASE_NAME = "PINGDB"; + private static final DatabaseConfig DATABASE_CONFIG = + DatabaseConfig.DEFAULT.setAllowCreate(true).setTransactional(true); private static final int ID = 0; public void pingDb(EnvironmentFacade facade) { try { - final Database db = facade.getOpenDatabase(PING_DATABASE_NAME); + final Database db = facade.openDatabase(PING_DATABASE_NAME, + DATABASE_CONFIG); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(ID, key); @@ -55,16 +59,9 @@ public class DatabasePinger } finally { - try + if (txn != null) { - if (txn != null) - { - txn.abort(); - } - } - finally - { - db.close(); + txn.abort(); } } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index b57d5bae21..748f10a01a 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -39,14 +40,12 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; -import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; @@ -148,12 +147,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final ExecutorService _environmentJobExecutor; private final ScheduledExecutorService _groupChangeExecutor; private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); - private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>(); private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); - private final AtomicBoolean _initialised; - private final EnvironmentFacadeTask[] _initialisationTasks; private final Durability _defaultDurability; private final CoalescingCommiter _coalescingCommiter; @@ -163,7 +159,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; - public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) + private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); + + public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) { _environmentDirectory = new File(configuration.getStorePath()); if (!_environmentDirectory.exists()) @@ -174,9 +172,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan + "Ensure the path is correct and that the permissions are correct."); } } + else + { + LOGGER.debug("Environment at path " + _environmentDirectory + " already exists."); + } - _initialised = new AtomicBoolean(); - _initialisationTasks = initialisationTasks; _configuration = configuration; _defaultDurability = new Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY); _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); @@ -327,8 +327,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) + public Database openDatabase(String name, DatabaseConfig databaseConfig) { + LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName); if (_state.get() != State.OPEN) { throw new IllegalStateException("Environment facade is not in opened state"); @@ -339,54 +340,42 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan throw new IllegalStateException("Environment is not valid"); } - if (_environment.getState() != ReplicatedEnvironment.State.MASTER) - { - throw new IllegalStateException("Databases can only be opened on Master node"); - } - - for (String databaseName : databaseNames) + Database cachedHandle = _cachedDatabases.get(name); + if (cachedHandle == null) { - _databases.put(databaseName, new DatabaseHolder(dbConfig)); - } - for (String databaseName : databaseNames) - { - DatabaseHolder holder = _databases.get(databaseName); - openDatabaseInternally(databaseName, holder); - } - } + Database handle = _environment.openDatabase(null, name, databaseConfig); + Database existingHandle = _cachedDatabases.putIfAbsent(name, handle); + if (existingHandle == null) + { + LOGGER.debug("openDatabase " + name + " new handle"); - private void openDatabaseInternally(String databaseName, DatabaseHolder holder) - { - if (_state.get() == State.OPEN) - { - Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); - holder.setDatabase(database); + cachedHandle = handle; + } + else + { + LOGGER.debug("openDatabase " + name + " existing handle"); + cachedHandle = existingHandle; + handle.close(); + } } + return cachedHandle; } @Override - public Database getOpenDatabase(String name) + public void closeDatabase(final String databaseName) { - if (_state.get() != State.OPEN) + Database cachedHandle = _cachedDatabases.remove(databaseName); + if (cachedHandle != null) { - throw new IllegalStateException("Environment facade is not in opened state"); - } - - if (!_environment.isValid()) - { - throw new IllegalStateException("Environment is not valid"); - } - DatabaseHolder databaseHolder = _databases.get(name); - if (databaseHolder == null) - { - throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened"); - } - Database database = databaseHolder.getDatabase(); - if (database == null) - { - throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing " + databaseName + " on " + _prettyGroupNodeName); + } + if (cachedHandle.getEnvironment().isValid()) + { + cachedHandle.close(); + } } - return database; } @Override @@ -441,11 +430,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); _joinTime = System.currentTimeMillis(); } - - if (state == ReplicatedEnvironment.State.MASTER) - { - onMasterStateChange(); - } } StateChangeListener listener = _stateChangeListener.get(); @@ -462,39 +446,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _lastKnownEnvironmentState = state; } - private void onMasterStateChange() - { - reopenDatabases(); - - if (_initialised.compareAndSet(false, true)) - { - if (_initialisationTasks != null) - { - for (EnvironmentFacadeTask task : _initialisationTasks) - { - task.execute(ReplicatedEnvironmentFacade.this); - } - } - } - } - - private void reopenDatabases() - { - if (_state.get() == State.OPEN) - { - DatabaseConfig pingDbConfig = new DatabaseConfig(); - pingDbConfig.setTransactional(true); - pingDbConfig.setAllowCreate(true); - - _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); - - for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) - { - openDatabaseInternally(entry.getKey(), entry.getValue()); - } - } - } - public String getGroupName() { return (String)_configuration.getGroupName(); @@ -833,36 +784,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { if (LOGGER.isInfoEnabled()) { - LOGGER.debug("When restarting a state change event is recieved on NOOP listener for state:" + stateChangeEvent.getState()); + LOGGER.debug( + "When restarting a state change event is received on NOOP listener for state:" + + stateChangeEvent.getState()); } } }); - try - { - closeDatabases(); - } - catch(Exception e) - { - LOGGER.warn("Ignoring an exception whilst closing databases", e); - } } - else + + try { - // reset database holders for invalid environments - for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) - { - DatabaseHolder databaseHolder = entry.getValue(); - Database database = databaseHolder.getDatabase(); - if (database != null) - { - databaseHolder.setDatabase(null); - } - } + closeDatabases(); + } + catch(Exception e) + { + LOGGER.warn("Ignoring an exception whilst closing databases", e); } - LOGGER.debug("Closing environent"); environment.close(); - LOGGER.debug("Environent is closed"); } catch (EnvironmentFailureException efe) { @@ -874,35 +813,29 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void closeDatabases() { RuntimeException firstThrownException = null; - for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) + + Iterator<String> itr = _cachedDatabases.keySet().iterator(); + while (itr.hasNext()) { - DatabaseHolder databaseHolder = entry.getValue(); - Database database = databaseHolder.getDatabase(); - if (database != null) + String databaseName = itr.next(); + + if (databaseName != null) { try { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); - } - - database.close(); + closeDatabase(databaseName); } catch(RuntimeException e) { - LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e); + LOGGER.error("Failed to close database " + databaseName + " on " + _prettyGroupNodeName, e); if (firstThrownException == null) { firstThrownException = e; } } - finally - { - databaseHolder.setDatabase(null); - } } } + if (firstThrownException != null) { throw firstThrownException; @@ -1340,37 +1273,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan CLOSING, CLOSED } - - private static class DatabaseHolder - { - private final DatabaseConfig _config; - private Database _database; - - public DatabaseHolder(DatabaseConfig config) - { - _config = config; - } - - public Database getDatabase() - { - return _database; - } - - public void setDatabase(Database database) - { - _database = database; - } - - public DatabaseConfig getConfig() - { - return _config; - } - - @Override - public String toString() - { - return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]"; - } - - } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index e154e8bf45..c08318a657 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @Override - public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks) + public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings) { ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() { @@ -95,7 +95,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return (String)messageStoreSettings.get(BDBHAVirtualHostNode.GROUP_NAME); } }; - return new ReplicatedEnvironmentFacade(configuration, initialisationTasks); + return new ReplicatedEnvironmentFacade(configuration); } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java index 05841b86ae..4638194970 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.io.IOException; import java.io.StringWriter; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -32,7 +33,12 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; + +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -62,6 +68,12 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade put("amq.match", "headers"); }}; + private static final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(Charset.forName("UTF-8"))); + private static SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT. + setAllowCreate(true). + setWrap(true). + setCacheSize(100000); + @Override public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { @@ -74,6 +86,11 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade Database hierarchyDb = environment.openDatabase(null, "CONFIGURED_OBJECT_HIERARCHY", dbConfig); Database configuredObjectsDb = environment.openDatabase(null, "CONFIGURED_OBJECTS", dbConfig); Database configVersionDb = environment.openDatabase(null, "CONFIG_VERSION", dbConfig); + Database messageMetadataDb = environment.openDatabase(null, "MESSAGE_METADATA", dbConfig); + Database messageMetadataSeqDb = environment.openDatabase(null, "MESSAGE_METADATA.SEQ", dbConfig); + + long maxMessageId = getMaximumMessageId(messageMetadataDb); + createMessageMetadataSequence(messageMetadataSeqDb, maxMessageId); Cursor objectsCursor = null; @@ -197,6 +214,8 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade hierarchyDb.close(); configuredObjectsDb.close(); + messageMetadataDb.close(); + messageMetadataSeqDb.close(); reportFinished(environment, 8); } @@ -214,6 +233,15 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade hierarchyDb.put(txn, key, value); } + private void createMessageMetadataSequence(final Database messageMetadataSeqDb, + final long maximumMessageId) + { + SequenceConfig sequenceConfig = MESSAGE_METADATA_SEQ_CONFIG.setInitialValue(maximumMessageId + 1); + + Sequence messageMetadataSeq = messageMetadataSeqDb.openSequence(null, MESSAGE_METADATA_SEQ_KEY, sequenceConfig); + messageMetadataSeq.close(); + } + private int getConfigVersion(Database configVersionDb) { Cursor cursor = null; @@ -251,4 +279,33 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade + status); } } + + private long getMaximumMessageId(Database messageMetaDataDb) + { + Cursor cursor = null; + long maximumMessageId = 0; // Our hand-rolled sequences value always began at zero + try + { + cursor = messageMetaDataDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + long messageId = LongBinding.entryToLong(key); + maximumMessageId = Math.max(messageId, maximumMessageId); + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + return maximumMessageId; + } + } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index e36def581b..d808cf9724 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -389,7 +389,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { closeVirtualHostIfExist(); - getConfigurationStore().getEnvironmentFacade().getEnvironment().flushLog(true); + + getConfigurationStore().upgradeStoreStructure(); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.RECOVERY_START()); VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(this); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 0254cbc909..fd196a28da 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -193,7 +193,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node.delete(); assertEquals("Unexpected state returned after delete", State.DELETED, node.getState()); assertEquals("Unexpected state", State.DELETED, node.getState()); - assertFalse("Store still exists", _bdbStorePath.exists()); + assertFalse("Store still exists " + _bdbStorePath, _bdbStorePath.exists()); } public void testMutableAttributes() throws Exception diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java index a82bb066e2..5772498ebc 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -77,38 +77,21 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase assertNull("Environment should be null after facade close", e); } - public void testOpenDatabases() throws Exception + public void testOpenDatabaseReusesCachedHandle() throws Exception { - EnvironmentFacade ef = getEnvironmentFacade(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - ef.openDatabases(dbConfig, "test1", "test2"); - Database test1 = ef.getOpenDatabase("test1"); - Database test2 = ef.getOpenDatabase("test2"); - - assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); - assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); - } + DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); - public void testGetOpenDatabaseForNonExistingDatabase() throws Exception - { EnvironmentFacade ef = getEnvironmentFacade(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - ef.openDatabases(dbConfig, "test1"); - Database test1 = ef.getOpenDatabase("test1"); - assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); - try - { - ef.getOpenDatabase("test2"); - fail("An exception should be thrown for the non existing database"); - } - catch(IllegalArgumentException e) - { - assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage()); - } + Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); + assertNotNull(handle1); + + Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); + assertSame("Database handle should be cached", handle1, handle2); + + ef.closeDatabase("myDatabase"); + + Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); + assertNotSame("Expecting a new handle after database closure", handle1, handle3); } EnvironmentFacade getEnvironmentFacade() throws Exception @@ -122,7 +105,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase EnvironmentFacade createEnvironmentFacade() { - return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null); + return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap()); } } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index e4f1b62954..b14332ecf6 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; -import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -45,7 +44,6 @@ import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; -import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicatedEnvironment.State; @@ -122,38 +120,21 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertNull("Environment should be null after facade close", e); } - public void testOpenDatabases() throws Exception + public void testOpenDatabaseReusesCachedHandle() throws Exception { + DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); + EnvironmentFacade ef = createMaster(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - ef.openDatabases(dbConfig, "test1", "test2"); - Database test1 = ef.getOpenDatabase("test1"); - Database test2 = ef.getOpenDatabase("test2"); + Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); + assertNotNull(handle1); - assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); - assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName()); - } + Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); + assertSame("Database handle should be cached", handle1, handle2); - public void testGetOpenDatabaseForNonExistingDatabase() throws Exception - { - EnvironmentFacade ef = createMaster(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - ef.openDatabases(dbConfig, "test1"); - Database test1 = ef.getOpenDatabase("test1"); - assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName()); - try - { - ef.getOpenDatabase("test2"); - fail("An exception should be thrown for the non existing database"); - } - catch(IllegalArgumentException e) - { - assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage()); - } + ef.closeDatabase("myDatabase"); + + Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig); + assertNotSame("Expecting a new handle after database closure", handle1, handle3); } public void testGetGroupName() throws Exception @@ -490,59 +471,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); } - public void testEnvironmentRestartOnInsufficientReplicas() throws Exception - { - - ReplicatedEnvironmentFacade master = createMaster(); - - int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); - String replica1NodeName = TEST_NODE_NAME + "_1"; - String replica1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener()); - - int replica2Port = getNextAvailable(replica1Port + 1); - String replica2NodeName = TEST_NODE_NAME + "_2"; - String replica2NodeHostPort = "localhost:" + replica2Port; - ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener()); - - String databaseName = "test"; - - DatabaseConfig dbConfig = createDatabase(master, databaseName); - - // close replicas - replica1.close(); - replica2.close(); - - Environment e = master.getEnvironment(); - master.getOpenDatabase(databaseName); - try - { - master.openDatabases(dbConfig, "test2"); - fail("Opening of new database without quorum should fail"); - } - catch(InsufficientReplicasException ex) - { - master.handleDatabaseException(null, ex); - } - - EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA); - replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener()); - replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener()); - - // Need to poll to await the remote node updating itself - long timeout = System.currentTimeMillis() + 5000; - while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) - { - Thread.sleep(200); - } - - assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), - State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); - - Environment e2 = master.getEnvironment(); - assertNotSame("Environment has not been restarted", e2, e); - } - public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception { final CountDownLatch masterLatch = new CountDownLatch(1); @@ -779,7 +707,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); ref.setStateChangeListener(stateChangeListener); ref.setReplicationGroupListener(replicationGroupListener); _nodes.put(nodeName, ref); @@ -791,15 +719,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener); } - private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - environmentFacade.openDatabases(dbConfig, databaseName); - return dbConfig; - } - private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException { int counter = 0; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java index 3d32d483db..d5bbd51fe1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java @@ -180,6 +180,12 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore } @Override + public void upgradeStoreStructure() throws StoreException + { + _store.upgradeStoreStructure(); + } + + @Override public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index dab8e8e20f..d95d58a9cf 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -344,20 +344,25 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore } @Override - public void closeConfigurationStore() throws StoreException + public void openConfigurationStore(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) + throws StoreException { + _parent = parent; } @Override - public void onDelete() + public void upgradeStoreStructure() throws StoreException { } @Override - public void openConfigurationStore(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) - throws StoreException + public void closeConfigurationStore() throws StoreException + { + } + + @Override + public void onDelete() { - _parent = parent; } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 79c2c274ee..24563bae61 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -690,6 +690,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, abstract protected String getSqlBigIntType(); + @Override + public void upgradeStoreStructure() throws StoreException + { + // TODO acquire connection to the database using the attribute of the parents, + // run the upgrader in a transaction, close the connection. + } + protected void createOrOpenMessageStoreDatabase() throws SQLException { Connection conn = newAutoCommitConnection(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java index 1c92e491c3..48608dde4f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java @@ -73,16 +73,23 @@ abstract class AbstractMemoryStore implements DurableConfigurationStore, Message } @Override - public void closeConfigurationStore() + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { - _configuredObjectRecords.clear(); } @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + public void upgradeStoreStructure() throws StoreException + { + + } + + @Override + public void closeConfigurationStore() { + _configuredObjectRecords.clear(); } + @Override public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 94de6c30c3..e353b55e68 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -32,8 +32,7 @@ public interface DurableConfigurationStore String STORE_PATH = "storePath"; /** - * Called after instantiation in order to configure the message store. A particular implementation can define - * whatever parameters it wants. + * Initializes and opens the configuration store. * * @param parent * @param storeSettings store settings @@ -41,6 +40,16 @@ public interface DurableConfigurationStore void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException; /** + * Requests that the store performs any upgrade work on the store's structure. If there is no + * upgrade work to be done, this method should return without doing anything. + * + * @throws StoreException signals that a problem was encountered trying to upgrade the store. + * Implementations, on encountering a problem, should endeavour to leave the store in its + * original state. + */ + void upgradeStoreStructure() throws StoreException; + + /** * Visit all configured object records with given handler. * * @param handler a handler to invoke on each configured object record diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index ba42c45207..afb2179b9b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -106,6 +106,12 @@ public class JsonFileConfigStore implements DurableConfigurationStore } @Override + public void upgradeStoreStructure() throws StoreException + { + // No-op for Json + } + + @Override public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { _parent = parent; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 44d640ca86..f4ad308a2e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -184,6 +184,12 @@ public class MemoryMessageStore implements MessageStore } @Override + public void upgradeStoreStructure() throws StoreException + { + + } + + @Override public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) { long id = _messageId.getAndIncrement(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index b1ddaabda8..e82f9dbbb0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -43,13 +43,23 @@ public interface MessageStore void addEventListener(EventListener eventListener, Event... events); /** - * Called after instantiation in order to open and initialize the message store. A particular implementation can define - * whatever parameters it wants. - * @param parent virtual host name + * Initializes and opens the message store. + * + * @param parent parent object * @param messageStoreSettings store settings */ void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings); + /** + * Requests that the store performs any upgrade work on the store's structure. If there is no + * upgrade work to be done, this method should return without doing anything. + * + * @throws StoreException signals that a problem was encountered trying to upgrade the store. + * Implementations, on encountering a problem, should endeavour to leave the store in its + * original state. + */ + void upgradeStoreStructure() throws StoreException; + void visitMessages(MessageHandler handler) throws StoreException; void visitMessageInstances(MessageInstanceHandler handler) throws StoreException; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index a3ed4bea05..a929198af6 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -63,6 +63,12 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override + public void upgradeStoreStructure() throws StoreException + { + + } + + @Override public void closeMessageStore() { } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 9e10d5e424..5af88f9b94 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -1368,6 +1368,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation())); } + messageStore.upgradeStoreStructure(); + if (isStoreEmpty()) { createDefaultExchanges(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 62c05991c5..f91ab44792 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -99,6 +99,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard Map<String, Object> attributes = buildAttributesForStore(); getConfigurationStore().openConfigurationStore(this, attributes); + getConfigurationStore().upgradeStoreStructure(); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED()); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java index c6fa815c82..7d37363c81 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java @@ -256,6 +256,12 @@ public class BrokerStoreUpgraderAndRecovererTest extends QpidTestCase } @Override + public void upgradeStoreStructure() throws StoreException + { + + } + + @Override public void create(ConfiguredObjectRecord object) throws StoreException { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 8bf981bd7b..c0968794f5 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -42,6 +42,8 @@ import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; + +import org.hamcrest.Description; import org.mockito.ArgumentMatcher; public abstract class MessageStoreTestCase extends QpidTestCase @@ -163,7 +165,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase StoreFuture flushFuture = message.flushToStore(); flushFuture.waitForCompletion(); - assertEquals("Unexpected message id", 4, message.getMessageNumber()); + assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4); } public void testVisitMessageInstances() throws Exception @@ -412,6 +414,14 @@ public abstract class MessageStoreTestCase extends QpidTestCase { return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber; } + + @Override + public void describeTo(final Description description) + { + description.appendText("Expected messageNumber:"); + description.appendValue(_messageNumber); + } + } private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler diff --git a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 38b4c66ebe..6f87e81ba1 100644 --- a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -504,6 +504,11 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa } @Override + public void upgradeStoreStructure() throws StoreException + { + } + + @Override public String getStoreLocation() { return DerbyMessageStore.this.getStoreLocation(); diff --git a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index ddafa83bb3..61ecb6748c 100644 --- a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -484,6 +484,11 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } @Override + public void upgradeStoreStructure() throws StoreException + { + } + + @Override public String getStoreLocation() { return JDBCMessageStore.this.getStoreLocation(); |