summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-06 15:43:49 +0000
committerKeith Wall <kwall@apache.org>2014-06-06 15:43:49 +0000
commit2df814dae9f4b112a7d36ee2f94f1a73a711ac15 (patch)
tree1956200b2d7fd9e1c90d801cdfa312b41b9fc25d /java
parent9e35c097bcfd92ce7d75d1891420705cd232883f (diff)
downloadqpid-python-2df814dae9f4b112a7d36ee2f94f1a73a711ac15.tar.gz
QPID-5801: [Java Broker] Elimination of the BDB facade's initialisation tasks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1600934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java254
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java7
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java7
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java45
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java17
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java212
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java57
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java3
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java2
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java43
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java105
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java15
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java7
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java13
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java13
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java16
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java1
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java6
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java12
-rw-r--r--java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java5
-rw-r--r--java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java5
28 files changed, 374 insertions, 505 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 2766c2ac98..ca32ffaa73 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,7 +33,6 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -46,12 +46,13 @@ import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
@@ -66,7 +67,6 @@ import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
-import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
@@ -83,7 +83,6 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
/**
@@ -96,6 +95,8 @@ import org.apache.qpid.util.FileUtils;
*/
public class BDBConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
{
+ public static final DatabaseConfig DEFAULT_DATABASE_CONFIG = new DatabaseConfig().setTransactional(true).setAllowCreate(true);
+
private static final Logger LOGGER = Logger.getLogger(BDBConfigurationStore.class);
public static final int VERSION = 8;
@@ -103,6 +104,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGE_META_DATA_SEQ_DB_NAME = "MESSAGE_METADATA.SEQ";
private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
@@ -110,16 +112,20 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private static String BRIDGEDB_NAME = "BRIDGES";
private static String LINKDB_NAME = "LINKS";
private static String XID_DB_NAME = "XIDS";
- private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME};
- private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME };
private EnvironmentFacade _environmentFacade;
- private final AtomicLong _messageId = new AtomicLong(0);
+
+ private static final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(
+ Charset.forName("UTF-8")));
+
+ private static final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT.
+ setAllowCreate(true).
+ setInitialValue(1).
+ setWrap(true).
+ setCacheSize(100000);
private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
- private long _totalStoreSize;
-
private final EnvironmentFacadeFactory _environmentFacadeFactory;
private volatile Committer _committer;
@@ -128,6 +134,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private String _storeLocation;
private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
+ private ConfiguredObject<?> _parent;
public BDBConfigurationStore()
{
@@ -144,22 +151,11 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
if (_configurationStoreOpen.compareAndSet(false, true))
{
+ _parent = parent;
+
if (_environmentFacade == null)
{
- EnvironmentFacadeTask[] initialisationTasks = null;
- _isMessageStoreProvider = MapValueConverter.getBooleanAttribute(VirtualHostNode.IS_MESSAGE_STORE_PROVIDER, storeSettings, false);
- if (_isMessageStoreProvider)
- {
- String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
- System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
- System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
- initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask()};
- }
- else
- {
- initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
- }
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings);
_storeLocation = _environmentFacade.getStoreLocation();
}
else
@@ -170,6 +166,18 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ try
+ {
+ new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e);
+ }
+ }
+ @Override
public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
@@ -564,123 +572,37 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private Database getConfiguredObjectsDb()
{
- return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
+ return _environmentFacade.openDatabase(CONFIGURED_OBJECTS_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
private Database getConfiguredObjectHierarchyDb()
{
- return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME);
+ return _environmentFacade.openDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
private Database getMessageContentDb()
{
- return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME);
+ return _environmentFacade.openDatabase(MESSAGE_CONTENT_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
private Database getMessageMetaDataDb()
{
- return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME);
- }
-
- private Database getDeliveryDb()
- {
- return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME);
+ return _environmentFacade.openDatabase(MESSAGE_META_DATA_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
- private Database getXidDb()
+ private Database getMessageMetaDataSeqDb()
{
- return _environmentFacade.getOpenDatabase(XID_DB_NAME);
+ return _environmentFacade.openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
- class UpgradeTask implements EnvironmentFacadeTask
- {
- private final ConfiguredObject<?> _parent;
-
- public UpgradeTask(ConfiguredObject<?> parent)
- {
- _parent = parent;
- }
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- try
- {
- new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
- }
- catch(DatabaseException e)
- {
- throw facade.handleDatabaseException("Cannot upgrade store", e);
- }
- }
- }
-
- class OpenDatabasesTask implements EnvironmentFacadeTask
+ private Database getDeliveryDb()
{
- private String[] _names;
-
- public OpenDatabasesTask(String[] names)
- {
- _names = names;
- }
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- try
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- facade.openDatabases(dbConfig, _names);
- }
- catch(DatabaseException e)
- {
- throw facade.handleDatabaseException("Cannot open databases", e);
- }
- }
-
+ return _environmentFacade.openDatabase(DELIVERY_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
- class DiskSpaceTask implements EnvironmentFacadeTask
- {
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- try
- {
- _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
- }
- catch(DatabaseException e)
- {
- throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
- }
- }
-
- }
-
- public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+ private Database getXidDb()
{
- private long _maxId;
-
- @Override
- public void execute(EnvironmentFacade facade)
- {
- ((BDBMessageStore)getMessageStore()).visitMessagesInternal(this, facade);
- _messageId.set(_maxId);
- }
-
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
- {
- long id = storedMessage.getMessageNumber();
- if (_maxId<id)
- {
- _maxId = id;
- }
- return true;
- }
+ return _environmentFacade.openDatabase(XID_DB_NAME, DEFAULT_DATABASE_CONFIG);
}
class BDBMessageStore implements MessageStore
@@ -692,12 +614,14 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private boolean _limitBusted;
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
+ private ConfiguredObject<?> _parent;
@Override
public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
{
if (_messageStoreOpen.compareAndSet(false, true))
{
+ _parent = parent;
Object overfullAttr = messageStoreSettings.get(OVERFULL_SIZE);
Object underfullAttr = messageStoreSettings.get(UNDERFULL_SIZE);
@@ -714,8 +638,8 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
- new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings
+ );
_storeLocation = _environmentFacade.getStoreLocation();
}
@@ -725,26 +649,44 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
@Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ public void upgradeStoreStructure() throws StoreException
{
- if (metaData.isPersistent())
+ try
{
- return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
+ new Upgrader(_environmentFacade.getEnvironment(), _parent).upgradeIfNecessary();
}
- else
+ catch(DatabaseException e)
{
- return new StoredMemoryMessage<T>(getNewMessageId(), metaData);
+ throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e);
}
}
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- private long getNewMessageId()
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
- return _messageId.incrementAndGet();
+
+ Sequence mmdSeq = null;
+ try
+ {
+ mmdSeq = getMessageMetaDataSeqDb().openSequence(null, MESSAGE_METADATA_SEQ_KEY, MESSAGE_METADATA_SEQ_CONFIG);
+ long newMessageId = mmdSeq.get(null, 1);
+
+ if (metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage<T>(newMessageId, metaData);
+ }
+ }
+ finally
+ {
+ if (mmdSeq != null)
+ {
+ mmdSeq.close();
+ }
+ }
}
@Override
@@ -1116,7 +1058,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
Cursor cursor = null;
try
{
- cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+ cursor = getMessageMetaDataDb().openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
@@ -1461,42 +1403,23 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
{
synchronized (this)
{
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 2*delta;
+ // TODO KW I think we should simplify matters here. The MessageStore should
+ // expose merely method #getStoreSizeEstimate(). The virtualhost (housekeeper)
+ // should periodically call this method and it be the one responsible for
+ // the generation of alerts.
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- _totalStoreSize = getSizeOnDisk();
+ long newSize = getSizeOnDisk();
+ reduceSizeOnDisk();
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
}
else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
{
- long oldSize = _totalStoreSize;
- _totalStoreSize = getSizeOnDisk();
-
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk();
-
- _totalStoreSize = getSizeOnDisk();
-
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
}
}
@@ -1512,6 +1435,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
return _persistentSizeHighThreshold;
}
+ // TODO remove altogether or perhaps expose as public method: requestStoreCleanup
private void reduceSizeOnDisk()
{
_environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index e51a610a26..c614ba1367 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -43,11 +43,9 @@ public interface EnvironmentFacade
Environment getEnvironment();
- Committer createCommitter(String name);
-
- void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
+ Database openDatabase(String name, DatabaseConfig databaseConfig);
- Database getOpenDatabase(String name);
+ Committer createCommitter(String name);
Transaction beginTransaction();
@@ -57,5 +55,6 @@ public interface EnvironmentFacade
String getStoreLocation();
+ void closeDatabase(String name);
void close();
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index 83584e306f..e13ad0e452 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -26,12 +26,7 @@ public interface EnvironmentFacadeFactory
{
public static final String ENVIRONMENT_CONFIGURATION = "environmentConfiguration";
- EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks);
+ EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings);
String getType();
-
- public static interface EnvironmentFacadeTask
- {
- void execute(EnvironmentFacade facade);
- }
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 643cd9ae70..f41eca602b 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -21,11 +21,10 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -40,11 +39,12 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
public static final String TYPE = "BDB";
private final String _storePath;
- private final Map<String, Database> _databases = new HashMap<String, Database>();
+ private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private Environment _environment;
- public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks)
+ public StandardEnvironmentFacade(String storePath,
+ Map<String, String> attributes)
{
_storePath = storePath;
@@ -76,13 +76,6 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
_environment = new Environment(environmentPath, envConfig);
- if (initialisationTasks != null)
- {
- for (EnvironmentFacadeTask task : initialisationTasks)
- {
- task.execute(this);
- }
- }
}
@@ -119,7 +112,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
private void closeDatabases()
{
RuntimeException firstThrownException = null;
- for (Database database : _databases.values())
+ for (Database database : _cachedDatabases.values())
{
try
{
@@ -209,24 +202,34 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
- public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+ public Database openDatabase(String name, DatabaseConfig databaseConfig)
{
- for (String databaseName : databaseNames)
+ Database cachedHandle = _cachedDatabases.get(name);
+ if (cachedHandle == null)
{
- Database database = _environment.openDatabase(null, databaseName, dbConfig);
- _databases .put(databaseName, database);
+ Database handle = _environment.openDatabase(null, name, databaseConfig);
+ Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
+ if (existingHandle == null)
+ {
+ cachedHandle = handle;
+ }
+ else
+ {
+ cachedHandle = existingHandle;
+ handle.close();
+ }
}
+ return cachedHandle;
}
@Override
- public Database getOpenDatabase(String name)
+ public void closeDatabase(final String name)
{
- Database database = _databases.get(name);
- if (database == null)
+ Database cachedHandle = _cachedDatabases.remove(name);
+ if (cachedHandle != null)
{
- throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ cachedHandle.close();
}
- return database;
}
@Override
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 9506b1c20a..49db913d6c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -30,7 +30,7 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
+ public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings)
{
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
@@ -41,7 +41,7 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
- return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks);
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap);
}
@Override
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
index 6ec5067d01..be4b8199c3 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
@@ -32,13 +33,16 @@ import com.sleepycat.je.Transaction;
public class DatabasePinger
{
public static final String PING_DATABASE_NAME = "PINGDB";
+ private static final DatabaseConfig DATABASE_CONFIG =
+ DatabaseConfig.DEFAULT.setAllowCreate(true).setTransactional(true);
private static final int ID = 0;
public void pingDb(EnvironmentFacade facade)
{
try
{
- final Database db = facade.getOpenDatabase(PING_DATABASE_NAME);
+ final Database db = facade.openDatabase(PING_DATABASE_NAME,
+ DATABASE_CONFIG);
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(ID, key);
@@ -55,16 +59,9 @@ public class DatabasePinger
}
finally
{
- try
+ if (txn != null)
{
- if (txn != null)
- {
- txn.abort();
- }
- }
- finally
- {
- db.close();
+ txn.abort();
}
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index b57d5bae21..748f10a01a 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -39,14 +40,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
-import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -148,12 +147,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final ExecutorService _environmentJobExecutor;
private final ScheduledExecutorService _groupChangeExecutor;
private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
- private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
- private final AtomicBoolean _initialised;
- private final EnvironmentFacadeTask[] _initialisationTasks;
private final Durability _defaultDurability;
private final CoalescingCommiter _coalescingCommiter;
@@ -163,7 +159,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
- public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
+ private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
{
_environmentDirectory = new File(configuration.getStorePath());
if (!_environmentDirectory.exists())
@@ -174,9 +172,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
+ "Ensure the path is correct and that the permissions are correct.");
}
}
+ else
+ {
+ LOGGER.debug("Environment at path " + _environmentDirectory + " already exists.");
+ }
- _initialised = new AtomicBoolean();
- _initialisationTasks = initialisationTasks;
_configuration = configuration;
_defaultDurability = new Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
_prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
@@ -327,8 +327,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
- public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+ public Database openDatabase(String name, DatabaseConfig databaseConfig)
{
+ LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
if (_state.get() != State.OPEN)
{
throw new IllegalStateException("Environment facade is not in opened state");
@@ -339,54 +340,42 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
throw new IllegalStateException("Environment is not valid");
}
- if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
- {
- throw new IllegalStateException("Databases can only be opened on Master node");
- }
-
- for (String databaseName : databaseNames)
+ Database cachedHandle = _cachedDatabases.get(name);
+ if (cachedHandle == null)
{
- _databases.put(databaseName, new DatabaseHolder(dbConfig));
- }
- for (String databaseName : databaseNames)
- {
- DatabaseHolder holder = _databases.get(databaseName);
- openDatabaseInternally(databaseName, holder);
- }
- }
+ Database handle = _environment.openDatabase(null, name, databaseConfig);
+ Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
+ if (existingHandle == null)
+ {
+ LOGGER.debug("openDatabase " + name + " new handle");
- private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
- {
- if (_state.get() == State.OPEN)
- {
- Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
- holder.setDatabase(database);
+ cachedHandle = handle;
+ }
+ else
+ {
+ LOGGER.debug("openDatabase " + name + " existing handle");
+ cachedHandle = existingHandle;
+ handle.close();
+ }
}
+ return cachedHandle;
}
@Override
- public Database getOpenDatabase(String name)
+ public void closeDatabase(final String databaseName)
{
- if (_state.get() != State.OPEN)
+ Database cachedHandle = _cachedDatabases.remove(databaseName);
+ if (cachedHandle != null)
{
- throw new IllegalStateException("Environment facade is not in opened state");
- }
-
- if (!_environment.isValid())
- {
- throw new IllegalStateException("Environment is not valid");
- }
- DatabaseHolder databaseHolder = _databases.get(name);
- if (databaseHolder == null)
- {
- throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
- }
- Database database = databaseHolder.getDatabase();
- if (database == null)
- {
- throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing " + databaseName + " on " + _prettyGroupNodeName);
+ }
+ if (cachedHandle.getEnvironment().isValid())
+ {
+ cachedHandle.close();
+ }
}
- return database;
}
@Override
@@ -441,11 +430,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
-
- if (state == ReplicatedEnvironment.State.MASTER)
- {
- onMasterStateChange();
- }
}
StateChangeListener listener = _stateChangeListener.get();
@@ -462,39 +446,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_lastKnownEnvironmentState = state;
}
- private void onMasterStateChange()
- {
- reopenDatabases();
-
- if (_initialised.compareAndSet(false, true))
- {
- if (_initialisationTasks != null)
- {
- for (EnvironmentFacadeTask task : _initialisationTasks)
- {
- task.execute(ReplicatedEnvironmentFacade.this);
- }
- }
- }
- }
-
- private void reopenDatabases()
- {
- if (_state.get() == State.OPEN)
- {
- DatabaseConfig pingDbConfig = new DatabaseConfig();
- pingDbConfig.setTransactional(true);
- pingDbConfig.setAllowCreate(true);
-
- _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
-
- for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
- {
- openDatabaseInternally(entry.getKey(), entry.getValue());
- }
- }
- }
-
public String getGroupName()
{
return (String)_configuration.getGroupName();
@@ -833,36 +784,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
if (LOGGER.isInfoEnabled())
{
- LOGGER.debug("When restarting a state change event is recieved on NOOP listener for state:" + stateChangeEvent.getState());
+ LOGGER.debug(
+ "When restarting a state change event is received on NOOP listener for state:"
+ + stateChangeEvent.getState());
}
}
});
- try
- {
- closeDatabases();
- }
- catch(Exception e)
- {
- LOGGER.warn("Ignoring an exception whilst closing databases", e);
- }
}
- else
+
+ try
{
- // reset database holders for invalid environments
- for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
- {
- DatabaseHolder databaseHolder = entry.getValue();
- Database database = databaseHolder.getDatabase();
- if (database != null)
- {
- databaseHolder.setDatabase(null);
- }
- }
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing databases", e);
}
- LOGGER.debug("Closing environent");
environment.close();
- LOGGER.debug("Environent is closed");
}
catch (EnvironmentFailureException efe)
{
@@ -874,35 +813,29 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void closeDatabases()
{
RuntimeException firstThrownException = null;
- for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+
+ Iterator<String> itr = _cachedDatabases.keySet().iterator();
+ while (itr.hasNext())
{
- DatabaseHolder databaseHolder = entry.getValue();
- Database database = databaseHolder.getDatabase();
- if (database != null)
+ String databaseName = itr.next();
+
+ if (databaseName != null)
{
try
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
- }
-
- database.close();
+ closeDatabase(databaseName);
}
catch(RuntimeException e)
{
- LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
+ LOGGER.error("Failed to close database " + databaseName + " on " + _prettyGroupNodeName, e);
if (firstThrownException == null)
{
firstThrownException = e;
}
}
- finally
- {
- databaseHolder.setDatabase(null);
- }
}
}
+
if (firstThrownException != null)
{
throw firstThrownException;
@@ -1340,37 +1273,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
CLOSING,
CLOSED
}
-
- private static class DatabaseHolder
- {
- private final DatabaseConfig _config;
- private Database _database;
-
- public DatabaseHolder(DatabaseConfig config)
- {
- _config = config;
- }
-
- public Database getDatabase()
- {
- return _database;
- }
-
- public void setDatabase(Database database)
- {
- _database = database;
- }
-
- public DatabaseConfig getConfig()
- {
- return _config;
- }
-
- @Override
- public String toString()
- {
- return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]";
- }
-
- }
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index e154e8bf45..c08318a657 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@Override
- public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
+ public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings)
{
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@@ -95,7 +95,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return (String)messageStoreSettings.get(BDBHAVirtualHostNode.GROUP_NAME);
}
};
- return new ReplicatedEnvironmentFacade(configuration, initialisationTasks);
+ return new ReplicatedEnvironmentFacade(configuration);
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
index 05841b86ae..4638194970 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.upgrade;
import java.io.IOException;
import java.io.StringWriter;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -32,7 +33,12 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -62,6 +68,12 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
put("amq.match", "headers");
}};
+ private static final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(Charset.forName("UTF-8")));
+ private static SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT.
+ setAllowCreate(true).
+ setWrap(true).
+ setCacheSize(100000);
+
@Override
public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
@@ -74,6 +86,11 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
Database hierarchyDb = environment.openDatabase(null, "CONFIGURED_OBJECT_HIERARCHY", dbConfig);
Database configuredObjectsDb = environment.openDatabase(null, "CONFIGURED_OBJECTS", dbConfig);
Database configVersionDb = environment.openDatabase(null, "CONFIG_VERSION", dbConfig);
+ Database messageMetadataDb = environment.openDatabase(null, "MESSAGE_METADATA", dbConfig);
+ Database messageMetadataSeqDb = environment.openDatabase(null, "MESSAGE_METADATA.SEQ", dbConfig);
+
+ long maxMessageId = getMaximumMessageId(messageMetadataDb);
+ createMessageMetadataSequence(messageMetadataSeqDb, maxMessageId);
Cursor objectsCursor = null;
@@ -197,6 +214,8 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
hierarchyDb.close();
configuredObjectsDb.close();
+ messageMetadataDb.close();
+ messageMetadataSeqDb.close();
reportFinished(environment, 8);
}
@@ -214,6 +233,15 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
hierarchyDb.put(txn, key, value);
}
+ private void createMessageMetadataSequence(final Database messageMetadataSeqDb,
+ final long maximumMessageId)
+ {
+ SequenceConfig sequenceConfig = MESSAGE_METADATA_SEQ_CONFIG.setInitialValue(maximumMessageId + 1);
+
+ Sequence messageMetadataSeq = messageMetadataSeqDb.openSequence(null, MESSAGE_METADATA_SEQ_KEY, sequenceConfig);
+ messageMetadataSeq.close();
+ }
+
private int getConfigVersion(Database configVersionDb)
{
Cursor cursor = null;
@@ -251,4 +279,33 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
+ status);
}
}
+
+ private long getMaximumMessageId(Database messageMetaDataDb)
+ {
+ Cursor cursor = null;
+ long maximumMessageId = 0; // Our hand-rolled sequences value always began at zero
+ try
+ {
+ cursor = messageMetaDataDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ maximumMessageId = Math.max(messageId, maximumMessageId);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ return maximumMessageId;
+ }
+
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index e36def581b..d808cf9724 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -389,7 +389,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
closeVirtualHostIfExist();
- getConfigurationStore().getEnvironmentFacade().getEnvironment().flushLog(true);
+
+ getConfigurationStore().upgradeStoreStructure();
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.RECOVERY_START());
VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(this);
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 0254cbc909..fd196a28da 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -193,7 +193,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node.delete();
assertEquals("Unexpected state returned after delete", State.DELETED, node.getState());
assertEquals("Unexpected state", State.DELETED, node.getState());
- assertFalse("Store still exists", _bdbStorePath.exists());
+ assertFalse("Store still exists " + _bdbStorePath, _bdbStorePath.exists());
}
public void testMutableAttributes() throws Exception
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
index a82bb066e2..5772498ebc 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -77,38 +77,21 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase
assertNull("Environment should be null after facade close", e);
}
- public void testOpenDatabases() throws Exception
+ public void testOpenDatabaseReusesCachedHandle() throws Exception
{
- EnvironmentFacade ef = getEnvironmentFacade();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- ef.openDatabases(dbConfig, "test1", "test2");
- Database test1 = ef.getOpenDatabase("test1");
- Database test2 = ef.getOpenDatabase("test2");
-
- assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
- assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
- }
+ DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
- public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
- {
EnvironmentFacade ef = getEnvironmentFacade();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- ef.openDatabases(dbConfig, "test1");
- Database test1 = ef.getOpenDatabase("test1");
- assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
- try
- {
- ef.getOpenDatabase("test2");
- fail("An exception should be thrown for the non existing database");
- }
- catch(IllegalArgumentException e)
- {
- assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
- }
+ Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
+ assertNotNull(handle1);
+
+ Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
+ assertSame("Database handle should be cached", handle1, handle2);
+
+ ef.closeDatabase("myDatabase");
+
+ Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
+ assertNotSame("Expecting a new handle after database closure", handle1, handle3);
}
EnvironmentFacade getEnvironmentFacade() throws Exception
@@ -122,7 +105,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase
EnvironmentFacade createEnvironmentFacade()
{
- return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null);
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
}
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index e4f1b62954..b14332ecf6 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -45,7 +44,6 @@ import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
-import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
@@ -122,38 +120,21 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertNull("Environment should be null after facade close", e);
}
- public void testOpenDatabases() throws Exception
+ public void testOpenDatabaseReusesCachedHandle() throws Exception
{
+ DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
+
EnvironmentFacade ef = createMaster();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- ef.openDatabases(dbConfig, "test1", "test2");
- Database test1 = ef.getOpenDatabase("test1");
- Database test2 = ef.getOpenDatabase("test2");
+ Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
+ assertNotNull(handle1);
- assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
- assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
- }
+ Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
+ assertSame("Database handle should be cached", handle1, handle2);
- public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
- {
- EnvironmentFacade ef = createMaster();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- ef.openDatabases(dbConfig, "test1");
- Database test1 = ef.getOpenDatabase("test1");
- assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
- try
- {
- ef.getOpenDatabase("test2");
- fail("An exception should be thrown for the non existing database");
- }
- catch(IllegalArgumentException e)
- {
- assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
- }
+ ef.closeDatabase("myDatabase");
+
+ Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
+ assertNotSame("Expecting a new handle after database closure", handle1, handle3);
}
public void testGetGroupName() throws Exception
@@ -490,59 +471,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
}
- public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
- {
-
- ReplicatedEnvironmentFacade master = createMaster();
-
- int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
- String replica1NodeName = TEST_NODE_NAME + "_1";
- String replica1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener());
-
- int replica2Port = getNextAvailable(replica1Port + 1);
- String replica2NodeName = TEST_NODE_NAME + "_2";
- String replica2NodeHostPort = "localhost:" + replica2Port;
- ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener());
-
- String databaseName = "test";
-
- DatabaseConfig dbConfig = createDatabase(master, databaseName);
-
- // close replicas
- replica1.close();
- replica2.close();
-
- Environment e = master.getEnvironment();
- master.getOpenDatabase(databaseName);
- try
- {
- master.openDatabases(dbConfig, "test2");
- fail("Opening of new database without quorum should fail");
- }
- catch(InsufficientReplicasException ex)
- {
- master.handleDatabaseException(null, ex);
- }
-
- EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA);
- replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
- replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
-
- // Need to poll to await the remote node updating itself
- long timeout = System.currentTimeMillis() + 5000;
- while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
- {
- Thread.sleep(200);
- }
-
- assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
- State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
-
- Environment e2 = master.getEnvironment();
- assertNotSame("Environment has not been restarted", e2, e);
- }
-
public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
{
final CountDownLatch masterLatch = new CountDownLatch(1);
@@ -779,7 +707,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
ref.setStateChangeListener(stateChangeListener);
ref.setReplicationGroupListener(replicationGroupListener);
_nodes.put(nodeName, ref);
@@ -791,15 +719,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener);
}
- private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName)
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- environmentFacade.openDatabases(dbConfig, databaseName);
- return dbConfig;
- }
-
private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException
{
int counter = 0;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
index 3d32d483db..d5bbd51fe1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
@@ -180,6 +180,12 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ _store.upgradeStoreStructure();
+ }
+
+ @Override
public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index dab8e8e20f..d95d58a9cf 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -344,20 +344,25 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
}
@Override
- public void closeConfigurationStore() throws StoreException
+ public void openConfigurationStore(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings)
+ throws StoreException
{
+ _parent = parent;
}
@Override
- public void onDelete()
+ public void upgradeStoreStructure() throws StoreException
{
}
@Override
- public void openConfigurationStore(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings)
- throws StoreException
+ public void closeConfigurationStore() throws StoreException
+ {
+ }
+
+ @Override
+ public void onDelete()
{
- _parent = parent;
}
@Override
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 79c2c274ee..24563bae61 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -690,6 +690,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
abstract protected String getSqlBigIntType();
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ // TODO acquire connection to the database using the attribute of the parents,
+ // run the upgrader in a transaction, close the connection.
+ }
+
protected void createOrOpenMessageStoreDatabase() throws SQLException
{
Connection conn = newAutoCommitConnection();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
index 1c92e491c3..48608dde4f 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
@@ -73,16 +73,23 @@ abstract class AbstractMemoryStore implements DurableConfigurationStore, Message
}
@Override
- public void closeConfigurationStore()
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
- _configuredObjectRecords.clear();
}
@Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ public void upgradeStoreStructure() throws StoreException
+ {
+
+ }
+
+ @Override
+ public void closeConfigurationStore()
{
+ _configuredObjectRecords.clear();
}
+
@Override
public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 94de6c30c3..e353b55e68 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -32,8 +32,7 @@ public interface DurableConfigurationStore
String STORE_PATH = "storePath";
/**
- * Called after instantiation in order to configure the message store. A particular implementation can define
- * whatever parameters it wants.
+ * Initializes and opens the configuration store.
*
* @param parent
* @param storeSettings store settings
@@ -41,6 +40,16 @@ public interface DurableConfigurationStore
void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
/**
+ * Requests that the store performs any upgrade work on the store's structure. If there is no
+ * upgrade work to be done, this method should return without doing anything.
+ *
+ * @throws StoreException signals that a problem was encountered trying to upgrade the store.
+ * Implementations, on encountering a problem, should endeavour to leave the store in its
+ * original state.
+ */
+ void upgradeStoreStructure() throws StoreException;
+
+ /**
* Visit all configured object records with given handler.
*
* @param handler a handler to invoke on each configured object record
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index ba42c45207..afb2179b9b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -106,6 +106,12 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ // No-op for Json
+ }
+
+ @Override
public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
_parent = parent;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 44d640ca86..f4ad308a2e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -184,6 +184,12 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+
+ }
+
+ @Override
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
{
long id = _messageId.getAndIncrement();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index b1ddaabda8..e82f9dbbb0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -43,13 +43,23 @@ public interface MessageStore
void addEventListener(EventListener eventListener, Event... events);
/**
- * Called after instantiation in order to open and initialize the message store. A particular implementation can define
- * whatever parameters it wants.
- * @param parent virtual host name
+ * Initializes and opens the message store.
+ *
+ * @param parent parent object
* @param messageStoreSettings store settings
*/
void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
+ /**
+ * Requests that the store performs any upgrade work on the store's structure. If there is no
+ * upgrade work to be done, this method should return without doing anything.
+ *
+ * @throws StoreException signals that a problem was encountered trying to upgrade the store.
+ * Implementations, on encountering a problem, should endeavour to leave the store in its
+ * original state.
+ */
+ void upgradeStoreStructure() throws StoreException;
+
void visitMessages(MessageHandler handler) throws StoreException;
void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index a3ed4bea05..a929198af6 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -63,6 +63,12 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+
+ }
+
+ @Override
public void closeMessageStore()
{
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 9e10d5e424..5af88f9b94 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1368,6 +1368,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation()));
}
+ messageStore.upgradeStoreStructure();
+
if (isStoreEmpty())
{
createDefaultExchanges();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index 62c05991c5..f91ab44792 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -99,6 +99,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
Map<String, Object> attributes = buildAttributesForStore();
getConfigurationStore().openConfigurationStore(this, attributes);
+ getConfigurationStore().upgradeStoreStructure();
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED());
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
index c6fa815c82..7d37363c81 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
@@ -256,6 +256,12 @@ public class BrokerStoreUpgraderAndRecovererTest extends QpidTestCase
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+
+ }
+
+ @Override
public void create(ConfiguredObjectRecord object) throws StoreException
{
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 8bf981bd7b..c0968794f5 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -42,6 +42,8 @@ import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
+
+import org.hamcrest.Description;
import org.mockito.ArgumentMatcher;
public abstract class MessageStoreTestCase extends QpidTestCase
@@ -163,7 +165,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
StoreFuture flushFuture = message.flushToStore();
flushFuture.waitForCompletion();
- assertEquals("Unexpected message id", 4, message.getMessageNumber());
+ assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4);
}
public void testVisitMessageInstances() throws Exception
@@ -412,6 +414,14 @@ public abstract class MessageStoreTestCase extends QpidTestCase
{
return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber;
}
+
+ @Override
+ public void describeTo(final Description description)
+ {
+ description.appendText("Expected messageNumber:");
+ description.appendValue(_messageNumber);
+ }
+
}
private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler
diff --git a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 38b4c66ebe..6f87e81ba1 100644
--- a/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -504,6 +504,11 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ }
+
+ @Override
public String getStoreLocation()
{
return DerbyMessageStore.this.getStoreLocation();
diff --git a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index ddafa83bb3..61ecb6748c 100644
--- a/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -484,6 +484,11 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
@Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ }
+
+ @Override
public String getStoreLocation()
{
return JDBCMessageStore.this.getStoreLocation();