summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-23 18:29:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-23 18:29:22 +0000
commitd6218a66346b3b4005d61cdcb26c4b7b0eb71cdd (patch)
tree9a07f7d20af117ed4c76f2406a591132c1c6475e
parenta998e640d140cd3ef42e0126a88c248186601684 (diff)
downloadqpid-python-d6218a66346b3b4005d61cdcb26c4b7b0eb71cdd.tar.gz
QPID-5615 : [Java Broker] update the durable configuration store interface to be entirely in terms of ConfiguredObjectRecords and give stores knowledge of hierarchy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1580561 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java245
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java107
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java79
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java59
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java159
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java9
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java3
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java11
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java22
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java4
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java57
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java615
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java63
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java104
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java34
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java38
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java147
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java23
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java80
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java120
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java113
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java93
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java24
48 files changed, 1606 insertions, 795 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 7d7ebd32fe..9a06f06999 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -41,8 +41,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -51,16 +49,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
-import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.*;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.util.FileUtils;
@@ -70,7 +63,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private static final int LOCK_RETRY_ATTEMPTS = 5;
- public static final int VERSION = 7;
+ public static final int VERSION = 8;
private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
@@ -83,6 +76,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private Environment _environment;
private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
+ private static String CONFIGURED_OBJECT_HIERARCHY = "CONFIGURED_OBJECT_HIERARCHY";
+
private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
@@ -92,6 +87,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
private Database _configuredObjectsDb;
+ private Database _configuredObjectHierarchyDb;
private Database _configVersionDb;
private Database _messageMetaDataDb;
private Database _messageContentDb;
@@ -146,6 +142,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private Map<String, String> _envConfigMap;
private VirtualHost _virtualHost;
+
public AbstractBDBMessageStore()
{
_stateManager = new StateManager(_eventManager);
@@ -183,7 +180,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private void completeInitialisation()
{
- configure(_virtualHost);
+ configure();
_stateManager.attainState(State.INITIALISED);
}
@@ -221,27 +218,24 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
/**
* Called after instantiation in order to configure the message store.
*
- *
- *
- * @param virtualHost The virtual host using this store
* @return whether a new store environment was created or not (to indicate whether recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- public void configure(VirtualHost virtualHost)
+ public void configure()
{
- configure(virtualHost, _messageRecoveryHandler != null);
+ configure(_messageRecoveryHandler != null);
}
- public void configure(VirtualHost virtualHost, boolean isMessageStore)
+ public void configure(boolean isMessageStore)
{
- String name = virtualHost.getName();
+ String name = _virtualHost.getName();
final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name;
String storeLocation;
if(isMessageStore)
{
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ storeLocation = (String) _virtualHost.getAttribute(VirtualHost.STORE_PATH);
if(storeLocation == null)
{
storeLocation = defaultPath;
@@ -249,15 +243,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
else // we are acting only as the durable config store
{
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ storeLocation = (String) _virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
if(storeLocation == null)
{
storeLocation = defaultPath;
}
}
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+ Object overfullAttr = _virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ Object underfullAttr = _virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
@@ -285,7 +279,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
_envConfigMap = new HashMap<String, String>();
_envConfigMap.putAll(ENVCONFIG_DEFAULTS);
- Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
+ Object bdbEnvConfigAttr = _virtualHost.getAttribute("bdbEnvironmentConfig");
if(bdbEnvConfigAttr instanceof Map)
{
_envConfigMap.putAll((Map)bdbEnvConfigAttr);
@@ -293,27 +287,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
LOGGER.info("Configuring BDB message store");
- setupStore(environmentPath, name);
- }
-
- protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException
- {
- final List<Object> argumentNames = config.getList(prefix + ".name");
- final List<Object> argumentValues = config.getList(prefix + ".value");
- final int initialSize = argumentNames.size() + defaultConfig.size();
-
- final Map<String,String> attributes = new HashMap<String,String>(initialSize);
- attributes.putAll(defaultConfig);
-
- for (int i = 0; i < argumentNames.size(); i++)
- {
- final String argName = argumentNames.get(i).toString();
- final String argValue = argumentValues.get(i).toString();
-
- attributes.put(argName, argValue);
- }
-
- return Collections.unmodifiableMap(attributes);
+ setupStore(environmentPath);
}
@Override
@@ -337,11 +311,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
_stateManager.attainState(State.ACTIVE);
}
- protected void setupStore(File storePath, String name)
+ protected void setupStore(File storePath)
{
_environment = createEnvironment(storePath);
- new Upgrader(_environment, name).upgradeIfNecessary();
+ new Upgrader(_environment, _virtualHost).upgradeIfNecessary();
openDatabases();
@@ -355,6 +329,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
return _environment;
}
+ protected final VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
private void openDatabases() throws DatabaseException
{
DatabaseConfig dbConfig = new DatabaseConfig();
@@ -365,6 +344,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
dbConfig.setReadOnly(false);
_configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
+ _configuredObjectHierarchyDb = openDatabase(CONFIGURED_OBJECT_HIERARCHY, dbConfig);
_configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
_messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
_messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
@@ -411,11 +391,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
_messageContentDb.close();
}
- if (_configuredObjectsDb != null)
- {
- LOGGER.info("Closing configurable objects database");
- _configuredObjectsDb.close();
- }
+ if (_configuredObjectsDb != null)
+ {
+ LOGGER.info("Closing configurable objects database");
+ _configuredObjectsDb.close();
+ }
+
+ if (_configuredObjectHierarchyDb != null)
+ {
+ LOGGER.info("Closing configurable object hierarchy database");
+ _configuredObjectHierarchyDb.close();
+ }
if (_deliveryDb != null)
{
@@ -555,24 +541,60 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
- Cursor cursor = null;
+ Cursor objectsCursor = null;
+ Cursor hierarchyCursor = null;
try
{
- cursor = _configuredObjectsDb.openCursor(null, null);
+ objectsCursor = _configuredObjectsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+
+ Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
+ new HashMap<UUID, BDBConfiguredObjectRecord>();
+
+ while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
- crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
+ BDBConfiguredObjectRecord configuredObject =
+ (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+ configuredObjects.put(configuredObject.getId(), configuredObject);
}
+ // set parents
+ hierarchyCursor = _configuredObjectHierarchyDb.openCursor(null, null);
+ while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+ UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+ BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+ if(child != null)
+ {
+ ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+ if(parent != null)
+ {
+ child.addParent(hk.getParentType(), parent);
+ }
+ else if(hk.getParentType().equals("Exchange"))
+ {
+ // TODO - remove this hack for the pre-defined exchanges
+ child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+
+ for (ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ crh.configuredObject(record);
+ }
+
+
+
}
finally
{
- closeCursorSafely(cursor);
+ closeCursorSafely(objectsCursor);
+ closeCursorSafely(hierarchyCursor);
}
}
@@ -842,39 +864,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
@Override
- public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
+ public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
+
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
- storeConfiguredObjectEntry(configuredObject);
- }
- }
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- @Override
- public void remove(UUID id, String type) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
- }
- OperationStatus status = removeConfiguredObject(null, id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Configured object of type " + type + " with id " + id + " not found");
+ storeConfiguredObjectEntry(txn, configuredObject);
+
+ txn.commit();
}
}
- @Override
- public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
+ public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
- for(UUID id : objects)
+ for(ConfiguredObjectRecord record : objects)
{
- if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS)
+ if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS)
{
- removed.add(id);
+ removed.add(record.getId());
}
}
@@ -882,52 +893,50 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
return removed.toArray(new UUID[removed.size()]);
}
- @Override
- public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
- {
- update(false, id, type, attributes, null);
- }
-
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
for(ConfiguredObjectRecord record : records)
{
- update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn);
+ update(createIfNecessary, record, txn);
}
txn.commit();
}
- private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws
+ private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws
StoreException
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating " +type + ", id: " + id);
+ LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
}
try
{
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(id, key);
+ keyBinding.objectToEntry(record.getId(), key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND))
+ final boolean isNewRecord = status == OperationStatus.NOTFOUND;
+ if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord))
{
- ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
- configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
+ configuredObjectBinding.objectToEntry(record, newValue);
status = _configuredObjectsDb.put(txn, key, newValue);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error updating queue details within the store: " + status);
}
+ if(isNewRecord)
+ {
+ writeHierarchyRecords(txn, record);
+ }
}
else if (status != OperationStatus.NOTFOUND)
{
@@ -1441,17 +1450,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
/**
* Makes the specified configured object persistent.
*
+ *
+ * @param txn
* @param configuredObject Details of the configured object to store.
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
*/
- private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException
+ private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
LOGGER.debug("Storing configured object: " + configuredObject);
DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(configuredObject.getId(), key);
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(configuredObject.getId(), key);
DatabaseEntry value = new DatabaseEntry();
ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
@@ -1459,12 +1470,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
queueBinding.objectToEntry(configuredObject, value);
try
{
- OperationStatus status = _configuredObjectsDb.put(null, key, value);
+ OperationStatus status = _configuredObjectsDb.put(txn, key, value);
if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ status);
}
+ writeHierarchyRecords(txn, configuredObject);
}
catch (DatabaseException e)
{
@@ -1474,16 +1486,49 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
- private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException
+ private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
{
+ OperationStatus status;
+ HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance();
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ DatabaseEntry hierarchyValue = new DatabaseEntry();
+
+ for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet())
+ {
+
+ hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey);
+ UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue);
+ status = _configuredObjectHierarchyDb.put(txn, hierarchyKey, hierarchyValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: "
+ + status);
+ }
+ }
+ }
+ private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException
+ {
+ UUID id = record.getId();
+ Map<String, ConfiguredObjectRecord> parents = record.getParents();
LOGGER.debug("Removing configured object: " + id);
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
uuidBinding.objectToEntry(id, key);
try
{
- return _configuredObjectsDb.delete(tx, key);
+ OperationStatus status = _configuredObjectsDb.delete(tx, key);
+ if(status == OperationStatus.SUCCESS)
+ {
+ for(String parentType : parents.keySet())
+ {
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance();
+ keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey);
+ _configuredObjectHierarchyDb.delete(tx, hierarchyKey);
+ }
+ }
+ return status;
}
catch (DatabaseException e)
{
@@ -1859,4 +1904,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
}
+
+ void setVirtualHost(final VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
new file mode 100644
index 0000000000..f13e4dd08b
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord
+{
+ private final UUID _id;
+ private final String _type;
+ private final Map<String,Object> _attributes;
+ private Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
+
+ public BDBConfiguredObjectRecord(final UUID id, final String type, final Map<String, Object> attributes)
+ {
+ _id = id;
+ _type = type;
+ _attributes = Collections.unmodifiableMap(attributes);
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getType()
+ {
+ return _type;
+ }
+
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ void addParent(String parentType, ConfiguredObjectRecord parent)
+ {
+ _parents.put(parentType, parent);
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return Collections.unmodifiableMap(_parents);
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final BDBConfiguredObjectRecord that = (BDBConfiguredObjectRecord) o;
+
+ if (_attributes != null ? !_attributes.equals(that._attributes) : that._attributes != null)
+ {
+ return false;
+ }
+ if (_id != null ? !_id.equals(that._id) : that._id != null)
+ {
+ return false;
+ }
+ if (_type != null ? !_type.equals(that._type) : that._type != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _id != null ? _id.hashCode() : 0;
+ result = 31 * result + (_type != null ? _type.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index d99733acf0..bdd48e99bb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -37,7 +37,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.HAMessageStore;
@@ -124,9 +123,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private Map<String, String> _repConfig;
@Override
- public void configure(VirtualHost virtualHost)
+ public void configure()
{
//Mandatory configuration
+ VirtualHost virtualHost = getVirtualHost();
_groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
_nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
_nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
@@ -159,7 +159,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- super.configure(virtualHost);
+ super.configure();
}
@@ -207,13 +207,13 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
@Override
- protected void setupStore(File storePath, String name) throws DatabaseException
+ protected void setupStore(File storePath) throws DatabaseException
{
- super.setupStore(storePath, name);
+ super.setupStore(storePath);
if(_coalescingSync)
{
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + getVirtualHost().getName(), getEnvironment());
_commitThreadWrapper.startCommitThread();
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index acff8e2b21..0fd5060397 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -46,11 +46,11 @@ public class BDBMessageStore extends AbstractBDBMessageStore
private CommitThreadWrapper _commitThreadWrapper;
@Override
- protected void setupStore(File storePath, String name) throws DatabaseException
+ protected void setupStore(File storePath) throws DatabaseException
{
- super.setupStore(storePath, name);
+ super.setupStore(storePath);
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + getVirtualHost().getName(), getEnvironment());
_commitThreadWrapper.startCommitThread();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java
new file mode 100644
index 0000000000..d1c341447e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.entry;
+
+import java.util.UUID;
+
+public class HierarchyKey
+{
+ private final UUID _childId;
+ private final String _parentType;
+
+ public HierarchyKey(final UUID childId, final String parentType)
+ {
+ _childId = childId;
+ _parentType = parentType;
+ }
+
+ public UUID getChildId()
+ {
+ return _childId;
+ }
+
+ public String getParentType()
+ {
+ return _parentType;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final HierarchyKey that = (HierarchyKey) o;
+
+ if (!_childId.equals(that._childId))
+ {
+ return false;
+ }
+ if (!_parentType.equals(that._parentType))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _childId.hashCode();
+ result = 31 * result + _parentType.hashCode();
+ return result;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
index bc3beeb78b..38a2215fe7 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
@@ -32,6 +32,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
@@ -80,7 +81,7 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord
_uuid = uuid;
}
- public ConfiguredObjectRecord entryToObject(TupleInput tupleInput)
+ public BDBConfiguredObjectRecord entryToObject(TupleInput tupleInput)
{
String type = tupleInput.readString();
String json = tupleInput.readString();
@@ -88,7 +89,7 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord
try
{
Map<String,Object> value = mapper.readValue(json, Map.class);
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value);
+ BDBConfiguredObjectRecord configuredObject = new BDBConfiguredObjectRecord(_uuid, type, value);
return configuredObject;
}
catch (IOException e)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java
new file mode 100644
index 0000000000..13adaabfc8
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+
+import java.util.UUID;
+
+public class HierarchyKeyBinding extends TupleBinding<HierarchyKey>
+{
+
+ private static final HierarchyKeyBinding INSTANCE = new HierarchyKeyBinding();
+
+ public static HierarchyKeyBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private HierarchyKeyBinding() { }
+
+ public HierarchyKey entryToObject(TupleInput tupleInput)
+ {
+ UUID childId = new UUID(tupleInput.readLong(), tupleInput.readLong());
+ String parentType = tupleInput.readString();
+
+ return new HierarchyKey(childId, parentType);
+ }
+
+ public void objectToEntry(HierarchyKey hk, TupleOutput tupleOutput)
+ {
+ UUID uuid = hk.getChildId();
+ tupleOutput.writeLong(uuid.getMostSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
+ tupleOutput.writeString(hk.getParentType());
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
index adcaef35ef..b06b6d533b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
@@ -21,8 +21,9 @@
package org.apache.qpid.server.store.berkeleydb.upgrade;
import com.sleepycat.je.Environment;
+import org.apache.qpid.server.model.VirtualHost;
public interface StoreUpgrade
{
- void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName);
+ void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
index 87f8afde4a..3eac47c81b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
@@ -39,6 +39,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -74,7 +75,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade
private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class);
- public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName)
+ public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost)
{
Transaction transaction = null;
reportStarting(environment, 4);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index 46f2afd741..dea8421a33 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -44,6 +44,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
@@ -118,11 +119,11 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
* Queue, Exchange, Bindings entries are stored now as configurable objects
* in "CONFIGURED_OBJECTS" table.
*/
- public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName)
+ public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost)
{
reportStarting(environment, 5);
upgradeMessages(environment, handler);
- upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHostName);
+ upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHost.getName());
renameDatabases(environment, null);
reportFinished(environment, 6);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
index ce00fd1a48..79314ae098 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
@@ -27,6 +27,7 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.OperationStatus;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.StoreException;
public class UpgradeFrom6To7 extends AbstractStoreUpgrade
@@ -35,7 +36,7 @@ public class UpgradeFrom6To7 extends AbstractStoreUpgrade
private static final int DEFAULT_CONFIG_VERSION = 0;
@Override
- public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName)
+ public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost)
{
reportStarting(environment, 6);
DatabaseConfig dbConfig = new DatabaseConfig();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
new file mode 100644
index 0000000000..3756c11d0c
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.*;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
+import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class UpgradeFrom7To8 extends AbstractStoreUpgrade
+{
+
+ @Override
+ public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost)
+ {
+ reportStarting(environment, 7);
+
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ Database hierarchyDb = environment.openDatabase(null, "CONFIGURED_OBJECT_HIERARCHY", dbConfig);
+ Database configuredObjectsDb = environment.openDatabase(null, "CONFIGURED_OBJECTS", dbConfig);
+
+ Cursor objectsCursor = null;
+
+ Transaction txn = environment.beginTransaction(null, null);
+
+ try
+ {
+ objectsCursor = configuredObjectsDb.openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
+ new HashMap<UUID, BDBConfiguredObjectRecord>();
+
+ while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+ TupleInput input = TupleBinding.entryToInput(value);
+ String type = input.readString();
+
+ if(!type.endsWith("Binding"))
+ {
+ UUIDTupleBinding.getInstance().objectToEntry(virtualHost.getId(),value);
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleOutput.writeLong(id.getMostSignificantBits());
+ tupleOutput.writeLong(id.getLeastSignificantBits());
+ tupleOutput.writeString("VirtualHost");
+ TupleBinding.outputToEntry(tupleOutput, key);
+ hierarchyDb.put(txn, key, value);
+ }
+ else
+ {
+ String json = input.readString();
+ ObjectMapper mapper = new ObjectMapper();
+ try
+ {
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ DatabaseEntry hierarchyValue = new DatabaseEntry();
+
+ Map<String,Object> attributes = mapper.readValue(json, Map.class);
+ Object queueIdString = attributes.remove("queue");
+ if(queueIdString instanceof String)
+ {
+ UUID queueId = UUID.fromString(queueIdString.toString());
+ UUIDTupleBinding.getInstance().objectToEntry(queueId,hierarchyValue);
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleOutput.writeLong(id.getMostSignificantBits());
+ tupleOutput.writeLong(id.getLeastSignificantBits());
+ tupleOutput.writeString("Queue");
+ TupleBinding.outputToEntry(tupleOutput, hierarchyKey);
+ hierarchyDb.put(txn, hierarchyKey, hierarchyValue);
+ }
+ Object exchangeIdString = attributes.remove("exchange");
+ if(exchangeIdString instanceof String)
+ {
+ UUID exchangeId = UUID.fromString(exchangeIdString.toString());
+ UUIDTupleBinding.getInstance().objectToEntry(exchangeId,hierarchyValue);
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleOutput.writeLong(id.getMostSignificantBits());
+ tupleOutput.writeLong(id.getLeastSignificantBits());
+ tupleOutput.writeString("Exchange");
+ TupleBinding.outputToEntry(tupleOutput, hierarchyKey);
+ hierarchyDb.put(txn, hierarchyKey, hierarchyValue);
+ }
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleOutput.writeString(type);
+ StringWriter writer = new StringWriter();
+ mapper.writeValue(writer,attributes);
+ tupleOutput.writeString(writer.getBuffer().toString());
+ TupleBinding.outputToEntry(tupleOutput, value);
+ objectsCursor.putCurrent(value);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException(e);
+ }
+
+ }
+
+
+ }
+
+
+ }
+ finally
+ {
+ if(objectsCursor != null)
+ {
+ objectsCursor.close();
+ }
+ }
+ txn.commit();
+
+ hierarchyDb.close();
+ configuredObjectsDb.close();
+
+
+
+ reportFinished(environment, 8);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index 4d536a2f95..e960518719 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -24,6 +24,7 @@ import com.sleepycat.je.Cursor;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
@@ -41,12 +42,12 @@ public class Upgrader
static final String VERSION_DB_NAME = "DB_VERSION";
private Environment _environment;
- private String _virtualHostName;
+ private VirtualHost _virtualHost;
- public Upgrader(Environment environment, String virtualHostName)
+ public Upgrader(Environment environment, VirtualHost virtualHost)
{
_environment = environment;
- _virtualHostName = virtualHostName;
+ _virtualHost = virtualHost;
}
public void upgradeIfNecessary()
@@ -146,7 +147,7 @@ public class Upgrader
+ "UpgradeFrom"+fromVersion+"To"+toVersion);
Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor();
StoreUpgrade upgrade = ctr.newInstance();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHostName);
+ upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHost);
}
catch (ClassNotFoundException e)
{
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
index 7f7b65f315..6a42b2691f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
@@ -19,7 +19,6 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -34,7 +33,8 @@ public class HAMessageStoreSmokeTest extends QpidTestCase
{
try
{
- _store.configure(mock(VirtualHost.class));
+ _store.setVirtualHost(mock(VirtualHost.class));
+ _store.configure();
fail("Expected an exception to be thrown");
}
catch (ServerScopedRuntimeException ce)
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
index 5a5d39081c..965cad1cb5 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecord;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
public class ConfiguredObjectBindingTest extends TestCase
{
@@ -46,7 +47,7 @@ public class ConfiguredObjectBindingTest extends TestCase
{
super.setUp();
_configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING,
+ _object = new ConfiguredObjectRecordImpl(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING,
DUMMY_ATTRIBUTES_MAP);
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
index b2b28b3c2d..ce143aba1b 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
@@ -25,12 +25,16 @@ import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPrepare
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.File;
import java.io.InputStream;
+import java.util.UUID;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -167,8 +171,11 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase
return count.longValue();
}
- public String getVirtualHostName()
+ public VirtualHost getVirtualHost()
{
- return getName();
+ VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getName()).thenReturn(getName());
+ when(virtualHost.getId()).thenReturn(UUID.randomUUID());
+ return virtualHost;
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
index 500fb0a919..d0f9455d9a 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
@@ -73,7 +73,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception
{
UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName());
+ upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost());
assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES)));
@@ -103,7 +103,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception
{
UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName());
+ upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHost());
HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES));
assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME));
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index 701fd94115..0460b1ce4c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -87,7 +87,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
public void testPerformUpgrade() throws Exception
{
UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName());
+ upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
assertDatabaseRecordCounts();
assertContent();
@@ -101,7 +101,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
corruptDatabase();
UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName());
+ upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost());
assertDatabaseRecordCounts();
@@ -117,7 +117,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO);
- upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName());
+ upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHost());
assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12);
assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12);
@@ -135,7 +135,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
{
populateOldXidEntries(environment);
UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName());
+ upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
assertXidEntries(environment);
}
finally
@@ -171,11 +171,11 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
NewRecordImpl[] newDequeues = newTransaction.getDequeues();
assertEquals("Unxpected new enqueus number", 1, newEnqueues.length);
NewRecordImpl enqueue = newEnqueues[0];
- assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHostName()), enqueue.getId());
+ assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHost().getName()), enqueue.getId());
assertEquals("Unxpected message id", 1, enqueue.getMessageNumber());
assertEquals("Unxpected new dequeues number", 1, newDequeues.length);
NewRecordImpl dequeue = newDequeues[0];
- assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHostName()), dequeue.getId());
+ assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHost().getName()), dequeue.getId());
assertEquals("Unxpected message id", 2, dequeue.getMessageNumber());
}
@@ -347,13 +347,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
{
String exchangeName = (String) deserialized.get(Exchange.NAME);
assertNotNull(exchangeName);
- assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()));
+ assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()));
}
else if (type.equals(Queue.class.getName()))
{
String queueName = (String) deserialized.get(Queue.NAME);
assertNotNull(queueName);
- assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHostName()));
+ assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()));
}
else if (type.equals(Binding.class.getName()))
{
@@ -368,15 +368,15 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs)
{
Map<String, Object> expectedQueueBinding = new HashMap<String, Object>();
- expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHostName()).toString());
+ expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHost().getName()).toString());
expectedQueueBinding.put(Binding.NAME, bindingName);
- expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()).toString());
+ expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()).toString());
if (argumentMap != null)
{
expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap);
}
- expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHostName()));
+ expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHost().getName()));
return expectedQueueBinding;
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
index 400ac12792..c0de884a28 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
@@ -43,7 +43,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
public void setUp() throws Exception
{
super.setUp();
- _upgrader = new Upgrader(_environment, getVirtualHostName());
+ _upgrader = new Upgrader(_environment, getVirtualHost());
}
private int getStoreVersion()
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
index 3465f3582f..4b9a8d19a8 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -51,7 +51,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
public void setUp() throws Exception
{
super.setUp();
- _upgrader = new Upgrader(_environment, getVirtualHostName());
+ _upgrader = new Upgrader(_environment, getVirtualHost());
}
private int getStoreVersion(Environment environment)
@@ -108,7 +108,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation);
try
{
- _upgrader = new Upgrader(emptyEnvironment, getVirtualHostName());
+ _upgrader = new Upgrader(emptyEnvironment, getVirtualHost());
_upgrader.upgradeIfNecessary();
List<String> databaseNames = emptyEnvironment.getDatabaseNames();
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index bd0411619e..29e9ae19d7 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -25,7 +25,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import org.apache.qpid.server.store.StoreException;
+
+import org.apache.qpid.server.store.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -41,12 +42,6 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
-import org.apache.qpid.server.store.MessageStoreTest;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -56,6 +51,8 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
+import static org.mockito.Mockito.mock;
+
/**
* Subclass of MessageStoreTest which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -230,7 +227,8 @@ public class BDBMessageStoreTest extends MessageStoreTest
messageStore.close();
AbstractBDBMessageStore newStore = new BDBMessageStore();
- newStore.configure(getVirtualHostModel(),true);
+ newStore.setVirtualHost(getVirtualHostModel());
+ newStore.configure(true);
newStore.startWithNoRecover();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 740ccf8200..7e3e5c9bbe 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.model;
import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import java.security.AccessControlException;
import java.util.Collection;
@@ -279,4 +280,7 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
Class<? extends ConfiguredObject> getCategoryClass();
+ // TODO - remove this when objects become responsible for their own storage
+ ConfiguredObjectRecord asObjectRecord();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index ee10ecfc8b..6a771c4c25 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.store.MessageStore;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Map;
+import java.util.UUID;
@ManagedObject( managesChildren = true )
public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<X>
@@ -181,6 +182,8 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<
// TODO - remove this
TaskExecutor getTaskExecutor();
+ Exchange getExchange(UUID id);
+
MessageStore getMessageStore();
String getType();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
index 2a12284f1f..5e3ebb86ce 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.security.AccessControlException;
import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.*;
import org.apache.qpid.server.model.*;
@@ -37,6 +38,7 @@ import org.apache.qpid.server.configuration.updater.SetAttributeTask;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -544,6 +546,61 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
return getClass().getSimpleName() + " [id=" + _id + ", name=" + getName() + "]";
}
+ public ConfiguredObjectRecord asObjectRecord()
+ {
+ return new ConfiguredObjectRecord()
+ {
+ @Override
+ public UUID getId()
+ {
+ return AbstractConfiguredObject.this.getId();
+ }
+
+ @Override
+ public String getType()
+ {
+ return getCategoryClass().getSimpleName();
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Map<String, Object>>()
+ {
+ @Override
+ public Map<String, Object> run()
+ {
+ Map<String,Object> actualAttributes = new HashMap<String, Object>(getActualAttributes());
+ for(Map.Entry<String,Object> entry : actualAttributes.entrySet())
+ {
+ if(entry.getValue() instanceof ConfiguredObject)
+ {
+ entry.setValue(((ConfiguredObject)entry.getValue()).getId());
+ }
+ }
+ actualAttributes.remove(ID);
+ return actualAttributes;
+ }
+ });
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ Map<String, ConfiguredObjectRecord> parents = new LinkedHashMap<String, ConfiguredObjectRecord>();
+ for(Class<? extends ConfiguredObject> parentClass : Model.getInstance().getParentTypes(getCategoryClass()))
+ {
+ ConfiguredObject parent = getParent(parentClass);
+ if(parent != null)
+ {
+ parents.put(parentClass.getSimpleName(), parent.asObjectRecord());
+ }
+ }
+ return parents;
+ }
+ };
+ }
+
@SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 6dbc2eea85..c9bca357f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -1166,8 +1166,15 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
}
}
+ @Override
public TaskExecutor getTaskExecutor()
{
return super.getTaskExecutor();
}
+
+ @Override
+ public Exchange getExchange(UUID id)
+ {
+ return _virtualHost.getExchange(id);
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
index dbe8bf22a0..da84580a94 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
@@ -27,10 +27,9 @@ public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements Dur
{
@Override
public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
- final UUID id,
- final Map<String, Object> attributes)
+ final ConfiguredObjectRecord record)
{
- final UnresolvedObject obj = createUnresolvedObject(id, getType(), attributes);
+ final UnresolvedObject obj = createUnresolvedObject(record);
UnresolvedDependency[] dependencies = obj.getUnresolvedDependencies();
for(final UnresolvedDependency dependency : dependencies)
{
@@ -53,7 +52,7 @@ public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements Dur
dependency.resolve(o);
if(obj.getUnresolvedDependencies().length == 0)
{
- durableConfigurationRecoverer.resolve(getType(), id, obj.resolve());
+ durableConfigurationRecoverer.resolve(getType(), record.getId(), obj.resolve());
}
}
});
@@ -61,17 +60,15 @@ public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements Dur
}
if(obj.getUnresolvedDependencies().length == 0)
{
- durableConfigurationRecoverer.resolve(getType(), id, obj.resolve());
+ durableConfigurationRecoverer.resolve(getType(), record.getId(), obj.resolve());
}
else
{
- durableConfigurationRecoverer.addUnresolvedObject(getType(), id, obj);
+ durableConfigurationRecoverer.addUnresolvedObject(getType(), record.getId(), obj);
}
}
- public abstract UnresolvedObject<T> createUnresolvedObject(final UUID id,
- final String type,
- final Map<String, Object> attributes);
+ public abstract UnresolvedObject<T> createUnresolvedObject(final ConfiguredObjectRecord record);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 81c74fe8f5..01b142c9bc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -24,6 +24,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -34,12 +36,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -48,6 +45,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.transport.ConnectionOpen;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -70,19 +68,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
- private static final String LINKS_TABLE_NAME = "QPID_LINKS";
- private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
+
private static final int DEFAULT_CONFIG_VERSION = 0;
- public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
- XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
+ public static String[] ALL_TABLES =
+ new String[]{DB_VERSION_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
+ XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME,
+ CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME};
- private static final int DB_VERSION = 7;
+ private static final int DB_VERSION = 8;
private final AtomicLong _messageId = new AtomicLong(0);
private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -115,34 +115,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
- private static final String SELECT_FROM_LINKS =
- "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
- private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
- + " WHERE id_lsb = ? and id_msb = ?";
- private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
- + "arguments FROM " + LINKS_TABLE_NAME;
- private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
- + " id_msb = ?";
- private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
- + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
- private static final String SELECT_FROM_BRIDGES =
- "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
- + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
- private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
- + " WHERE id_lsb = ? and id_msb = ?";
- private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
- + " create_time,"
- + " link_id_lsb, link_id_msb, "
- + "arguments FROM " + BRIDGES_TABLE_NAME
- + " WHERE link_id_lsb = ? and link_id_msb = ?";
- private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
- " WHERE id_lsb = ? and id_msb = ?";
- private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
- + "create_time, "
- + "link_id_lsb, link_id_msb, "
- + "arguments )"
- + " values (?, ?, ?, ?, ?, ?)";
-
private static final String INSERT_INTO_XIDS =
"INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
@@ -166,6 +138,14 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
+ " where id = ?";
private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+ private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " ( child_id, parent_type, parent_id) VALUES (?,?,?)";
+
+ private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " where child_id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
+
protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
@@ -319,6 +299,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
case 6:
upgradeFromV6();
+ case 7:
+ upgradeFromV7();
case DB_VERSION:
return;
default:
@@ -347,6 +329,135 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
updateDbVersion(7);
}
+
+ private void upgradeFromV7() throws SQLException
+ {
+ Connection connection = newConnection();
+ try
+ {
+ Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>();
+ List<UUID> others = new ArrayList<UUID>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+
+ PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID id = UUID.fromString(rs.getString(1));
+ String objectType = rs.getString(2);
+ Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
+ if(objectType.endsWith("Binding"))
+ {
+ bindingsToUpdate.put(id,attributes);
+ }
+ else
+ {
+ others.add(id);
+ }
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ for (UUID id : others)
+ {
+ stmt.setString(1, id.toString());
+ stmt.setString(2, "VirtualHost");
+ stmt.setString(3, _virtualHost.getId().toString());
+ stmt.execute();
+ }
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+ {
+ stmt.setString(1, bindingEntry.getKey().toString());
+ stmt.setString(2,"Queue");
+ stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
+ stmt.execute();
+
+ stmt.setString(1, bindingEntry.getKey().toString());
+ stmt.setString(2,"Exchange");
+ stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
+ stmt.execute();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+ {
+ stmt.setString(1, "Binding");
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt.setBinaryStream(2, bis, attributesAsBytes.length);
+ stmt.setString(3, bindingEntry.getKey().toString());
+ stmt.execute();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = connection.prepareStatement(UPDATE_DB_VERSION);
+ try
+ {
+ stmt.setInt(1, 8);
+ stmt.execute();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ connection.commit();
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
private void updateDbVersion(int newVersion) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -388,11 +499,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
createVersionTable(conn);
createConfigVersionTable(conn);
createConfiguredObjectsTable(conn);
+ createConfiguredObjectHierarchyTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
- createLinkTable(conn);
- createBridgeTable(conn);
createXidTable(conn);
createXidActionTable(conn);
conn.close();
@@ -469,6 +579,23 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
+ private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
private void createQueueEntryTable(final Connection conn) throws SQLException
@@ -535,45 +662,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- private void createLinkTable(final Connection conn) throws SQLException
- {
- if(!tableExists(LINKS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute("CREATE TABLE "+ LINKS_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
- + " id_msb " + getSqlBigIntType() + " not null,"
- + " create_time " + getSqlBigIntType() + " not null,"
- + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
- }
- finally
- {
- stmt.close();
- }
- }
- }
- private void createBridgeTable(final Connection conn) throws SQLException
- {
- if(!tableExists(BRIDGES_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute("CREATE TABLE "+ BRIDGES_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
- + " id_msb " + getSqlBigIntType() + " not null,"
- + " create_time " + getSqlBigIntType() + " not null,"
- + " link_id_lsb " + getSqlBigIntType() + " not null,"
- + " link_id_msb " + getSqlBigIntType() + " not null,"
- + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
- }
- finally
- {
- stmt.close();
- }
- }
- }
private void createXidTable(final Connection conn) throws SQLException
{
@@ -817,35 +906,26 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
- public void create(UUID id, String type, Map<String,Object> attributes) throws StoreException
+ public void create(ConfiguredObjectRecord object) throws StoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes));
- }
-
- }
-
- @Override
- public void remove(UUID id, String type) throws StoreException
- {
- int results = removeConfiguredObject(id);
- if (results == 0)
- {
- throw new StoreException(type + " with id " + id + " not found");
- }
- }
-
- @Override
- public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id);
- if (queueConfiguredObject != null)
+ try
{
- ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
- updateConfiguredObject(newQueueRecord);
+ Connection conn = newConnection();
+ try
+ {
+ insertConfiguredObject(object, conn);
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error creating ConfiguredObject " + object);
}
}
@@ -1993,109 +2073,85 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
_eventManager.addEventListener(eventListener, events);
}
- private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws StoreException
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ try
{
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
try
{
- Connection conn = newAutoCommitConnection();
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ boolean exists;
try
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ exists = rs.next();
+
+ }
+ finally
+ {
+ rs.close();
+ }
+ // If we don't have any data in the result set then we can add this configured object
+ if (!exists)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
try
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
{
- // If we don't have any data in the result set then we can add this configured object
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
+ insertStmt.setNull(3, Types.BLOB);
}
- finally
+ else
{
- rs.close();
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
+ insertStmt.execute();
}
finally
{
- stmt.close();
+ insertStmt.close();
}
+
+ writeHierarchy(configuredObject, conn);
}
- finally
- {
- conn.close();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- }
- }
- private int removeConfiguredObject(UUID id) throws StoreException
- {
- int results = 0;
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- results = removeConfiguredObject(id, conn);
}
finally
{
- conn.close();
+ stmt.close();
}
+
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
catch (SQLException e)
{
- throw new StoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
- return results;
+
}
- public UUID[] removeConfiguredObjects(UUID... objects) throws StoreException
+ @Override
+ public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
{
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
try
@@ -2104,11 +2160,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn = newAutoCommitConnection();
try
{
- for(UUID id : objects)
+ for(ConfiguredObjectRecord record : objects)
{
- if(removeConfiguredObject(id, conn) != 0)
+ if(removeConfiguredObject(record.getId(), conn) != 0)
{
- removed.add(id);
+ removed.add(record.getId());
}
}
}
@@ -2126,7 +2182,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
{
- final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ final int results;
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
try
{
stmt.setString(1, id.toString());
@@ -2136,30 +2193,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
stmt.close();
}
- return results;
- }
-
- private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws StoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
+ stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
{
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- updateConfiguredObject(configuredObject, false, conn);
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
+ stmt.setString(1, id.toString());
+ stmt.executeUpdate();
}
+ finally
+ {
+ stmt.close();
+ }
+
+ return results;
}
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
@@ -2254,6 +2299,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
insertStmt.close();
}
+ writeHierarchy(configuredObject, conn);
}
}
finally
@@ -2280,72 +2326,31 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws StoreException
+ private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
{
- ConfiguredObjectRecord result = null;
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
try
{
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, id.toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
- if (rs.next())
- {
- String type = rs.getString(1);
- String attributes = getBlobAsString(rs, 2);
- result = new ConfiguredObjectRecord(id, type,
- (new ObjectMapper()).readValue(attributes,Map.class));
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
+ for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
{
- conn.close();
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, parentEntry.getKey());
+ insertStmt.setString(3, parentEntry.getValue().getId().toString());
+
+ insertStmt.execute();
}
}
- catch (JsonMappingException e)
- {
- throw new StoreException("Error loading of configured object with id " + id + " from database: "
- + e.getMessage(), e);
- }
- catch (JsonParseException e)
- {
- throw new StoreException("Error loading of configured object with id " + id + " from database: "
- + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error loading of configured object with id " + id + " from database: "
- + e.getMessage(), e);
- }
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error loading of configured object with id " + id + " from database: "
- + e.getMessage(), e);
+ insertStmt.close();
}
- return result;
}
private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException,
StoreException
{
Connection conn = newAutoCommitConnection();
-
+ Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
final ObjectMapper objectMapper = new ObjectMapper();
try
{
@@ -2360,8 +2365,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
String id = rs.getString(1);
String objectType = rs.getString(2);
String attributes = getBlobAsString(rs, 3);
- recoveryHandler.configuredObject(UUID.fromString(id), objectType,
- objectMapper.readValue(attributes,Map.class));
+ final ConfiguredObjectRecordImpl configuredObjectRecord =
+ new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes, Map.class));
+ configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
}
}
catch (JsonMappingException e)
@@ -2385,11 +2393,53 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
stmt.close();
}
+ stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID childId = UUID.fromString(rs.getString(1));
+ String parentType = rs.getString(2);
+ UUID parentId = UUID.fromString(rs.getString(3));
+
+ ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+ ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+ if(child != null && parent != null)
+ {
+ child.addParent(parentType, parent);
+ }
+ else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
+ {
+ // TODO - remove this hack for amq. exchanges
+ child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
+ }
+
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
}
finally
{
conn.close();
}
+
+ for(ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ recoveryHandler.configuredObject(record);
+ }
}
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
@@ -2429,4 +2479,51 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
+
+ private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
+ {
+
+ private final UUID _id;
+ private final String _type;
+ private final Map<String, Object> _attributes;
+ private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
+
+ private ConfiguredObjectRecordImpl(final UUID id,
+ final String type,
+ final Map<String, Object> attributes)
+ {
+ _id = id;
+ _type = type;
+ _attributes = Collections.unmodifiableMap(attributes);
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public String getType()
+ {
+ return _type;
+ }
+
+ private void addParent(String parentType, ConfiguredObjectRecord parent)
+ {
+ _parents.put(parentType, parent);
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return Collections.unmodifiableMap(_parents);
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index c6ebe90802..c8aef92a95 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -27,7 +27,7 @@ public interface ConfigurationRecoveryHandler
{
void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
- void configuredObject(UUID id, String type, Map<String, Object> attributes);
+ void configuredObject(ConfiguredObjectRecord object);
/**
*
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
index 44490385d9..99f8f0f04a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
@@ -20,69 +20,16 @@
*/
package org.apache.qpid.server.store;
-import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-public class ConfiguredObjectRecord
+public interface ConfiguredObjectRecord
{
- private UUID _id;
- private String _type;
- private Map<String,Object> _attributes;
+ UUID getId();
- public ConfiguredObjectRecord(UUID id, String type, Map<String,Object> attributes)
- {
- super();
- _id = id;
- _type = type;
- _attributes = Collections.unmodifiableMap(new LinkedHashMap<String,Object>(attributes));
- }
+ String getType();
- public UUID getId()
- {
- return _id;
- }
+ Map<String,Object> getAttributes();
- public String getType()
- {
- return _type;
- }
-
- public Map<String,Object> getAttributes()
- {
- return _attributes;
- }
-
- @Override
- public String toString()
- {
- return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
- }
-
- @Override
- public boolean equals(Object o)
- {
- if(this == o)
- {
- return true;
- }
- if(o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- ConfiguredObjectRecord that = (ConfiguredObjectRecord) o;
-
- return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
- }
-
- @Override
- public int hashCode()
- {
- int result = _id.hashCode();
- result = 31 * result + _type.hashCode();
- result = 31 * result + _attributes.hashCode();
- return result;
- }
+ Map<String, ConfiguredObjectRecord> getParents();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java
new file mode 100644
index 0000000000..ed553aa823
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
+{
+ private UUID _id;
+ private String _type;
+ private final Map<String,Object> _attributes;
+ private final Map<String,ConfiguredObjectRecord> _parents;
+
+ public ConfiguredObjectRecordImpl(UUID id, String type, Map<String, Object> attributes)
+ {
+ this(id,type,attributes,Collections.<String,ConfiguredObjectRecord>emptyMap());
+ }
+
+ public ConfiguredObjectRecordImpl(UUID id, String type, Map<String, Object> attributes, Map<String,ConfiguredObjectRecord> parents)
+ {
+ super();
+ _id = id;
+ _type = type;
+ _attributes = Collections.unmodifiableMap(new LinkedHashMap<String,Object>(attributes));
+ _parents = Collections.unmodifiableMap(new LinkedHashMap<String, ConfiguredObjectRecord>(parents));
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public String getType()
+ {
+ return _type;
+ }
+
+ @Override
+ public Map<String,Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return _parents;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ ConfiguredObjectRecordImpl that = (ConfiguredObjectRecordImpl) o;
+
+ return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _id.hashCode();
+ result = 31 * result + _type.hashCode();
+ result = 31 * result + _attributes.hashCode();
+ return result;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
index 6696426c0e..f8d8ecdd7c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -76,19 +77,19 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
}
@Override
- public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes)
+ public void configuredObject(ConfiguredObjectRecord record)
{
- _upgrader.configuredObject(id, type, attributes);
+ _upgrader.configuredObject(record);
}
- void onConfiguredObject(final UUID id, final String type, final Map<String, Object> attributes)
+ void onConfiguredObject(ConfiguredObjectRecord record)
{
- DurableConfiguredObjectRecoverer recoverer = getRecoverer(type);
+ DurableConfiguredObjectRecoverer recoverer = getRecoverer(record.getType());
if(recoverer == null)
{
- throw new IllegalConfigurationException("Unknown type for configured object: " + type);
+ throw new IllegalConfigurationException("Unknown type for configured object: " + record.getType());
}
- recoverer.load(this, id, attributes);
+ recoverer.load(this, record);
}
private DurableConfiguredObjectRecoverer getRecoverer(final String type)
@@ -111,19 +112,12 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
private void applyUpgrade()
{
- final Collection<ConfiguredObjectRecord> updates = new ArrayList<ConfiguredObjectRecord>();
- final Collection<UUID> deletes = new ArrayList<UUID>();
- for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _upgrader.getUpdatedRecords().entrySet())
- {
- if(entry.getValue() != null)
- {
- updates.add(entry.getValue());
- }
- else
- {
- deletes.add(entry.getKey());
- }
- }
+ final Collection<ConfiguredObjectRecord> updates = new HashSet<ConfiguredObjectRecord>(_upgrader.getUpdatedRecords().values());
+ final Collection<ConfiguredObjectRecord> deletes = new HashSet<ConfiguredObjectRecord>(_upgrader.getDeletedRecords().values());
+
+ // Due to the way the upgraders work it is possible that the updates list may contain nulls
+ updates.remove(null);
+ deletes.remove(null);
if(!updates.isEmpty())
{
@@ -131,7 +125,7 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
}
if(!deletes.isEmpty())
{
- _store.removeConfiguredObjects(deletes.toArray(new UUID[deletes.size()]));
+ _store.remove(deletes.toArray(new ConfiguredObjectRecord[deletes.size()]));
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 589eca1600..ae138f12c2 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -50,41 +50,33 @@ public interface DurableConfigurationStore
/**
* Makes the specified object persistent.
*
- * @param id The id of the object to persist.
- * @param type The type of the object to persist
- * @param attributes the attributes of the object to persist
+ * @param object The object to persist.
*
* @throws StoreException If the operation fails for any reason.
*/
- void create(UUID id, String type, Map<String, Object> attributes) throws StoreException;
+ void create(ConfiguredObjectRecord object) throws StoreException;
/**
- * Removes the specified persistent configured object.
+ * Removes the specified persistent configured objects.
*
- * @param id The id of the object to remove.
- * @param type The type of the object to remove
+ * @param objects The objects to remove.
*
* @throws StoreException If the operation fails for any reason.
*/
- void remove(UUID id, String type) throws StoreException;
-
- public UUID[] removeConfiguredObjects(UUID... objects) throws StoreException;
+ public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException;
/**
- * Updates the specified object in the persistent store, IF it is already present. If the object
- * is not present in the store, it will not be added.
+ * Updates the specified objects in the persistent store, IF it is already present. If the object
+ * is not present in the store, it will only be added if createIfNecessary is set to true, otherwise an exception
+ * will be thrown.
*
- * @param id The id of the object to update.
- * @param type The type of the object to update
- * @param attributes the updated attributes
+ * @param createIfNecessary if false then will fail if the object does not exist.
+ * @param records the records to update
*
* @throws StoreException If the operation fails for any reason.
*/
- void update(UUID id, String type, Map<String, Object> attributes) throws StoreException;
-
-
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
+ void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
void close() throws Exception;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index f914389b0e..9410006d65 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -20,12 +20,8 @@
*/
package org.apache.qpid.server.store;
-import java.security.PrivilegedAction;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@@ -35,12 +31,8 @@ import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.*;
-
-import javax.security.auth.Subject;
public class DurableConfigurationStoreHelper
{
@@ -58,7 +50,7 @@ public class DurableConfigurationStoreHelper
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- store.update(queue.getId(), QUEUE, attributesMap);
+ store.update(false, new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap));
}
public static void createQueue(DurableConfigurationStore store, AMQQueue<?> queue)
@@ -71,49 +63,33 @@ public class DurableConfigurationStoreHelper
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- store.create(queue.getId(), QUEUE, attributesMap);
+ store.create(new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap));
}
public static void removeQueue(DurableConfigurationStore store, AMQQueue queue)
{
- store.remove(queue.getId(), QUEUE);
+ store.remove(queue.asObjectRecord());
}
public static void createExchange(DurableConfigurationStore store, ExchangeImpl exchange)
{
- Map<String, Object> attributesMap = exchange.getActualAttributes();
- attributesMap.remove(ConfiguredObject.ID);
-
- store.create(exchange.getId(), EXCHANGE, attributesMap);
-
+ store.create(exchange.asObjectRecord());
}
-
public static void removeExchange(DurableConfigurationStore store, ExchangeImpl exchange)
{
- store.remove(exchange.getId(), EXCHANGE);
+ store.remove(exchange.asObjectRecord());
}
public static void createBinding(DurableConfigurationStore store, final BindingImpl binding)
{
- Map<String, Object> attributesMap = binding.getActualAttributes();
- attributesMap.remove(ConfiguredObject.ID);
- if(!attributesMap.containsKey(Binding.EXCHANGE))
- {
- attributesMap.put(Binding.EXCHANGE, binding.getExchange());
- }
- if(!attributesMap.containsKey(Binding.QUEUE))
- {
- attributesMap.put(Binding.QUEUE, binding.getQueue());
- }
-
- store.create(binding.getId(), BINDING, attributesMap);
+ store.create(binding.asObjectRecord());
}
public static void removeBinding(DurableConfigurationStore store, BindingImpl binding)
{
- store.remove(binding.getId(), BINDING);
+ store.remove(binding.asObjectRecord());
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java
index 1d3e4cc672..45b2ff56d7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java
@@ -25,11 +25,12 @@ import java.util.UUID;
public interface DurableConfigurationStoreUpgrader
{
- void configuredObject(UUID id, String type, Map<String, Object> attributes);
+ void configuredObject(ConfiguredObjectRecord record);
void complete();
void setNextUpgrader(DurableConfigurationStoreUpgrader upgrader);
Map<UUID, ConfiguredObjectRecord> getUpdatedRecords();
+ Map<UUID, ConfiguredObjectRecord> getDeletedRecords();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
index e065728bd3..4c618f5089 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
@@ -26,8 +26,7 @@ import java.util.UUID;
public interface DurableConfiguredObjectRecoverer
{
public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
- final UUID id,
- final Map<String, Object> attributes);
+ final ConfiguredObjectRecord record);
public String getType();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index e99c2019cd..c8898eedb6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -26,14 +26,7 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
@@ -106,7 +99,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
for(ConfiguredObjectRecord record : records)
{
- recoveryHandler.configuredObject(record.getId(), record.getType(), record.getAttributes());
+ recoveryHandler.configuredObject(record);
}
int oldConfigVersion = _configVersion;
_configVersion = recoveryHandler.completeConfigurationRecovery();
@@ -311,20 +304,21 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
}
+ Map<String,UUID> parentMap = new HashMap<String, UUID>();
if(parentId != null)
{
- data.put(parentClass.getSimpleName().toLowerCase(),parentId);
+ parentMap.put(parentClass.getSimpleName(),parentId);
for(Class<? extends ConfiguredObject> otherParent : MODEL.getParentTypes(clazz))
{
if(otherParent != parentClass)
{
final String otherParentAttr = otherParent.getSimpleName().toLowerCase();
- Object otherParentId = data.get(otherParentAttr);
+ Object otherParentId = data.remove(otherParentAttr);
if(otherParentId instanceof String)
{
try
{
- data.put(otherParentAttr, UUID.fromString((String) otherParentId));
+ parentMap.put(otherParent.getSimpleName(), UUID.fromString((String) otherParentId));
}
catch(IllegalArgumentException e)
{
@@ -336,7 +330,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
}
- _objectsById.put(id, new ConfiguredObjectRecord(id, type, data));
+ _objectsById.put(id, new ConfiguredObjectRecordImpl(id, type, data, parentMap));
List<UUID> idsForType = _idsByType.get(type);
if(idsForType == null)
{
@@ -348,28 +342,27 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
- public synchronized void create(final UUID id, final String type, final Map<String, Object> attributes) throws
- StoreException
+ public synchronized void create(ConfiguredObjectRecord record) throws StoreException
{
- if(_objectsById.containsKey(id))
+ if(_objectsById.containsKey(record.getId()))
{
- throw new StoreException("Object with id " + id + " already exists");
+ throw new StoreException("Object with id " + record.getId() + " already exists");
}
- else if(!CLASS_NAME_MAPPING.containsKey(type))
+ else if(!CLASS_NAME_MAPPING.containsKey(record.getType()))
{
- throw new StoreException("Cannot create object of unknown type " + type);
+ throw new StoreException("Cannot create object of unknown type " + record.getType());
}
else
{
- ConfiguredObjectRecord record = new ConfiguredObjectRecord(id, type, attributes);
- _objectsById.put(id, record);
- List<UUID> idsForType = _idsByType.get(type);
+
+ _objectsById.put(record.getId(), record);
+ List<UUID> idsForType = _idsByType.get(record.getType());
if(idsForType == null)
{
idsForType = new ArrayList<UUID>();
- _idsByType.put(type, idsForType);
+ _idsByType.put(record.getType(), idsForType);
}
- idsForType.add(id);
+ idsForType.add(record.getId());
save();
}
}
@@ -424,7 +417,21 @@ public class JsonFileConfigStore implements DurableConfigurationStore
Map<String,Object> map = new LinkedHashMap<String, Object>();
map.put("id", id);
map.putAll(record.getAttributes());
- map.remove(MODEL.getParentTypes(type).iterator().next().getSimpleName().toLowerCase());
+
+ Collection<Class<? extends ConfiguredObject>> parentTypes = MODEL.getParentTypes(type);
+ if(parentTypes.size() > 1)
+ {
+ Iterator<Class<? extends ConfiguredObject>> iter = parentTypes.iterator();
+ // skip the first parent, which is given by structure
+ iter.next();
+ // for all other parents add a fake attribute with name being the parent type in lower case, and the value
+ // being the parents id
+ while(iter.hasNext())
+ {
+ String parentType = iter.next().getSimpleName();
+ map.put(parentType.toLowerCase(), record.getParents().get(parentType).getId());
+ }
+ }
Collection<Class<? extends ConfiguredObject>> childClasses =
new ArrayList<Class<? extends ConfiguredObject>>(MODEL.getChildTypes(type));
@@ -442,17 +449,9 @@ public class JsonFileConfigStore implements DurableConfigurationStore
for(UUID childId : childIds)
{
ConfiguredObjectRecord childRecord = _objectsById.get(childId);
- final String parentArg = type.getSimpleName().toLowerCase();
- final Object parent = childRecord.getAttributes().get(parentArg);
- String parentId;
- if(parent instanceof ConfiguredObject)
- {
- parentId = ((ConfiguredObject)parent).getId().toString();
- }
- else
- {
- parentId = String.valueOf(parent);
- }
+
+ final ConfiguredObjectRecord parent = childRecord.getParents().get(type.getSimpleName());
+ String parentId = parent.getId().toString();
if(id.toString().equals(parentId))
{
entities.add(build(childClass,childId));
@@ -470,34 +469,22 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
- public void remove(final UUID id, final String type) throws StoreException
- {
- removeConfiguredObjects(id);
- }
-
- @Override
- public synchronized UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
+ public synchronized UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
List<UUID> removedIds = new ArrayList<UUID>();
- for(UUID id : objects)
+ for(ConfiguredObjectRecord requestedRecord : objects)
{
- ConfiguredObjectRecord record = _objectsById.remove(id);
+ ConfiguredObjectRecord record = _objectsById.remove(requestedRecord.getId());
if(record != null)
{
- removedIds.add(id);
- _idsByType.get(record.getType()).remove(id);
+ removedIds.add(record.getId());
+ _idsByType.get(record.getType()).remove(record.getId());
}
}
save();
return removedIds.toArray(new UUID[removedIds.size()]);
}
- @Override
- public void update(final UUID id, final String type, final Map<String, Object> attributes) throws
- StoreException
- {
- update(false, new ConfiguredObjectRecord(id, type, attributes));
- }
@Override
public void update(final boolean createIfNecessary, final ConfiguredObjectRecord... records)
@@ -583,5 +570,59 @@ public class JsonFileConfigStore implements DurableConfigurationStore
return map;
}
+ private class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
+ {
+
+ private final UUID _id;
+ private final String _type;
+ private final Map<String, Object> _attributes;
+ private final Map<String, UUID> _parents;
+
+ private ConfiguredObjectRecordImpl(final UUID id, final String type, final Map<String, Object> attributes,
+ final Map<String, UUID> parents)
+ {
+ _id = id;
+ _type = type;
+ _attributes = attributes;
+ _parents = parents;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public String getType()
+ {
+ return _type;
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ Map<String,ConfiguredObjectRecord> parents = new HashMap<String, ConfiguredObjectRecord>();
+ for(Map.Entry<String,UUID> entry : _parents.entrySet())
+ {
+ ConfiguredObjectRecord value = _objectsById.get(entry.getValue());
+
+ if(value == null && entry.getKey().equals("Exchange"))
+ {
+ // TODO - remove this hack for the defined exchanges
+ value = new ConfiguredObjectRecordImpl(entry.getValue(),entry.getKey(),Collections.<String,Object>emptyMap(), Collections.<String,UUID>emptyMap());
+ }
+
+ parents.put(entry.getKey(), value);
+ }
+ return parents;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java
index a671e93b26..1abd3de34a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java
@@ -28,6 +28,7 @@ public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrad
{
private DurableConfigurationStoreUpgrader _nextUpgrader;
private final Map<UUID, ConfiguredObjectRecord> _updates = new HashMap<UUID, ConfiguredObjectRecord>();
+ private final Map<UUID, ConfiguredObjectRecord> _deletes = new HashMap<UUID, ConfiguredObjectRecord>();
public final void setNextUpgrader(final DurableConfigurationStoreUpgrader upgrader)
{
@@ -50,6 +51,10 @@ public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrad
{
return _updates;
}
+ protected Map<UUID, ConfiguredObjectRecord> getDeleteMap()
+ {
+ return _deletes;
+ }
@Override
public final Map<UUID, ConfiguredObjectRecord> getUpdatedRecords()
@@ -59,4 +64,11 @@ public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrad
return updates;
}
+ @Override
+ public final Map<UUID, ConfiguredObjectRecord> getDeletedRecords()
+ {
+ final Map<UUID, ConfiguredObjectRecord> deletes = new HashMap<UUID, ConfiguredObjectRecord>(_deletes);
+ deletes.putAll(_nextUpgrader.getDeletedRecords());
+ return deletes;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index c579a27731..782ac40a5d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -19,7 +19,6 @@
*/
package org.apache.qpid.server.store;
-import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.model.VirtualHost;
@@ -32,29 +31,23 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void update(UUID id, String type, Map<String, Object> attributes)
- {
- }
-
- @Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
{
}
-
- @Override
- public void remove(UUID id, String type)
- {
- }
-
@Override
- public UUID[] removeConfiguredObjects(final UUID... objects)
+ public UUID[] remove(final ConfiguredObjectRecord... objects)
{
- return objects;
+ final UUID[] removed = new UUID[objects.length];
+ for(int i = 0; i < objects.length; i++)
+ {
+ removed[i] = objects[i].getId();
+ }
+ return removed;
}
@Override
- public void create(UUID id, String type, Map<String, Object> attributes)
+ public void create(ConfiguredObjectRecord record)
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java
index c8a812fa89..b17dd4a6f3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java
@@ -34,9 +34,9 @@ public final class NullUpgrader implements DurableConfigurationStoreUpgrader
}
@Override
- public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes)
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- _durableConfigurationRecoverer.onConfiguredObject(id, type, attributes);
+ _durableConfigurationRecoverer.onConfiguredObject(record);
}
@Override
@@ -55,4 +55,10 @@ public final class NullUpgrader implements DurableConfigurationStoreUpgrader
{
return Collections.emptyMap();
}
+
+ @Override
+ public Map<UUID, ConfiguredObjectRecord> getDeletedRecords()
+ {
+ return Collections.emptyMap();
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index e6577e04e4..58721ea8b2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -28,9 +28,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
@@ -49,11 +51,9 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
}
@Override
- public UnresolvedObject<BindingImpl> createUnresolvedObject(final UUID id,
- final String type,
- final Map<String, Object> attributes)
+ public UnresolvedObject<BindingImpl> createUnresolvedObject(ConfiguredObjectRecord record)
{
- return new UnresolvedBinding(id, attributes);
+ return new UnresolvedBinding(record);
}
@Override
@@ -76,12 +76,11 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
private ExchangeImpl _exchange;
private AMQQueue _queue;
- public UnresolvedBinding(final UUID id,
- final Map<String, Object> attributeMap)
+ public UnresolvedBinding(final ConfiguredObjectRecord record)
{
- _bindingId = id;
- _exchangeId = UUID.fromString(String.valueOf(attributeMap.get(org.apache.qpid.server.model.Binding.EXCHANGE)));
- _queueId = UUID.fromString(String.valueOf(attributeMap.get(org.apache.qpid.server.model.Binding.QUEUE)));
+ _bindingId = record.getId();
+ _exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId();
+ _queueId = record.getParents().get(Queue.class.getSimpleName()).getId();
_exchange = _exchangeRegistry.getExchange(_exchangeId);
if(_exchange == null)
{
@@ -93,8 +92,8 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
_unresolvedDependencies.add(new QueueDependency());
}
- _bindingName = (String) attributeMap.get(org.apache.qpid.server.model.Binding.NAME);
- _bindingArgumentsMap = (Map<String, Object>) attributeMap.get(org.apache.qpid.server.model.Binding.ARGUMENTS);
+ _bindingName = (String) record.getAttributes().get(org.apache.qpid.server.model.Binding.NAME);
+ _bindingArgumentsMap = (Map<String, Object>) record.getAttributes().get(org.apache.qpid.server.model.Binding.ARGUMENTS);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index efdca3b67d..6718acfccf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
import org.apache.qpid.server.store.NonNullUpgrader;
@@ -105,9 +106,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
@Override
- public void configuredObject(final UUID id, final String type, Map<String, Object> attributes)
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- _records.put(id, new ConfiguredObjectRecord(id, type, attributes));
+ _records.put(record.getId(), record);
}
private void removeSelectorArguments(Map<String, Object> binding)
@@ -119,9 +120,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider
binding.put(Binding.ARGUMENTS, arguments);
}
- private boolean isTopicExchange(Map<String, Object> binding)
+ private boolean isTopicExchange(ConfiguredObjectRecord entry)
{
- UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE));
+ UUID exchangeId = entry.getParents().get("Exchange").getId();
if(_records.containsKey(exchangeId))
{
@@ -155,17 +156,17 @@ public class DefaultUpgraderProvider implements UpgraderProvider
String type = record.getType();
Map<String, Object> attributes = record.getAttributes();
UUID id = record.getId();
- if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(attributes))
+ if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
{
attributes = new LinkedHashMap<String, Object>(attributes);
removeSelectorArguments(attributes);
- record = new ConfiguredObjectRecord(id, type, attributes);
+ record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
getUpdateMap().put(id, record);
entry.setValue(record);
}
- getNextUpgrader().configuredObject(id, type, attributes);
+ getNextUpgrader().configuredObject(record);
}
getNextUpgrader().complete();
@@ -179,12 +180,14 @@ public class DefaultUpgraderProvider implements UpgraderProvider
*/
private class Version1Upgrader extends NonNullUpgrader
{
+
+
@Override
- public void configuredObject(final UUID id, String type, final Map<String, Object> attributes)
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- type = type.substring(1+type.lastIndexOf('.'));
- getUpdateMap().put(id, new ConfiguredObjectRecord(id, type, attributes));
-
+ String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
+ getUpdateMap().put(record.getId(),
+ new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()));
}
@Override
@@ -193,30 +196,31 @@ public class DefaultUpgraderProvider implements UpgraderProvider
for(Map.Entry<UUID, ConfiguredObjectRecord> entry : getUpdateMap().entrySet())
{
final ConfiguredObjectRecord record = entry.getValue();
- if(isBinding(record.getType()) && (unknownExchange((String) record.getAttributes().get(Binding.EXCHANGE))
- || unknownQueue((String) record.getAttributes().get(Binding.QUEUE))))
+ final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
+ final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
+ if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
+ || queueParent == null || unknownQueue(queueParent.getId())))
{
+ getDeleteMap().put(entry.getKey(), entry.getValue());
entry.setValue(null);
}
else
{
- getNextUpgrader().configuredObject(record.getId(), record.getType(), record.getAttributes());
+ getNextUpgrader().configuredObject(record);
}
}
getNextUpgrader().complete();
}
- private boolean unknownExchange(final String exchangeIdString)
+ private boolean unknownExchange(final UUID exchangeId)
{
- UUID exchangeId = UUID.fromString(exchangeIdString);
ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()))
|| _exchangeRegistry.getExchange(exchangeId) != null);
}
- private boolean unknownQueue(final String queueIdString)
+ private boolean unknownQueue(final UUID queueId)
{
- UUID queueId = UUID.fromString(queueIdString);
ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()))
|| _virtualHost.getQueue(queueId) != null);
@@ -240,22 +244,24 @@ public class DefaultUpgraderProvider implements UpgraderProvider
private static final String ARGUMENTS = "arguments";
@Override
- public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ public void configuredObject(ConfiguredObjectRecord record)
{
- if(Queue.class.getSimpleName().equals(type))
+
+ if(Queue.class.getSimpleName().equals(record.getType()))
{
Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
- if(attributes.get(ARGUMENTS) instanceof Map)
+ if(record.getAttributes().get(ARGUMENTS) instanceof Map)
{
- newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) attributes
+ newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
.get(ARGUMENTS)));
}
- newAttributes.putAll(attributes);
- attributes = newAttributes;
- getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+ newAttributes.putAll(record.getAttributes());
+
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
}
- getNextUpgrader().configuredObject(id,type,attributes);
+ getNextUpgrader().configuredObject(record);
}
@Override
@@ -274,16 +280,17 @@ public class DefaultUpgraderProvider implements UpgraderProvider
{
@Override
- public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ public void configuredObject(ConfiguredObjectRecord record)
{
- if(Queue.class.getSimpleName().equals(type))
+
+ if(Queue.class.getSimpleName().equals(record.getType()))
{
- Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes);
- if(attributes.get(EXCLUSIVE) instanceof Boolean)
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
+ if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
{
- boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE);
+ boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
- if(!isExclusive && attributes.containsKey("owner"))
+ if(!isExclusive && record.getAttributes().containsKey("owner"))
{
newAttributes.remove("owner");
}
@@ -292,15 +299,16 @@ public class DefaultUpgraderProvider implements UpgraderProvider
{
newAttributes.remove("owner");
}
- if(!attributes.containsKey("durable"))
+ if(!record.getAttributes().containsKey("durable"))
{
newAttributes.put("durable","true");
}
- attributes = newAttributes;
- getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+
+ record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
}
- getNextUpgrader().configuredObject(id,type,attributes);
+ getNextUpgrader().configuredObject(record);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index 2743b0ef59..4431fc786d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -50,11 +51,9 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
}
@Override
- public UnresolvedObject<ExchangeImpl> createUnresolvedObject(final UUID id,
- final String type,
- final Map<String, Object> attributes)
+ public UnresolvedObject<ExchangeImpl> createUnresolvedObject(final ConfiguredObjectRecord record)
{
- return new UnresolvedExchange(id, attributes);
+ return new UnresolvedExchange(record.getId(), record.getAttributes());
}
private class UnresolvedExchange implements UnresolvedObject<ExchangeImpl>
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index 385f4e219e..eb1bc883ca 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
@@ -58,11 +59,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
}
@Override
- public UnresolvedObject<AMQQueue> createUnresolvedObject(final UUID id,
- final String type,
- final Map<String, Object> attributes)
+ public UnresolvedObject<AMQQueue> createUnresolvedObject(final ConfiguredObjectRecord record)
{
- return new UnresolvedQueue(id, type, attributes);
+ return new UnresolvedQueue(record.getId(), record.getAttributes());
}
private class UnresolvedQueue implements UnresolvedObject<AMQQueue>
@@ -75,7 +74,6 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
private ExchangeImpl _alternateExchange;
public UnresolvedQueue(final UUID id,
- final String type,
final Map<String, Object> attributes)
{
_attributes = attributes;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 04c2f974c9..0e30096047 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@@ -43,6 +42,8 @@ import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
@@ -68,6 +69,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName();
private static final String QUEUE = Queue.class.getSimpleName();
+ private static final UUID ANY_UUID = UUID.randomUUID();
+ private static final Map ANY_MAP = new HashMap();
+
+
private String _storePath;
private String _storeName;
private MessageStore _messageStore;
@@ -117,10 +122,17 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(_exchange.getId()).thenReturn(_exchangeId);
when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
when(_exchange.getEventLogger()).thenReturn(new EventLogger());
+
+ ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
+ when(exchangeRecord.getId()).thenReturn(_exchangeId);
+ when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
+ when(_exchange.asObjectRecord()).thenReturn(exchangeRecord);
+
when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn(
_storePath);
when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath);
+
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
String argValue = "some selector expression";
@@ -149,8 +161,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
- eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
+ verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE,
+ map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
}
@@ -183,7 +195,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap());
+ verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
}
public void testBindQueue() throws Exception
@@ -191,42 +203,86 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, _bindingArgs);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
reopenStore();
Map<String,Object> map = new HashMap<String, Object>();
- map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString());
- map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString());
- map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY);
- map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,_bindingArgs);
+ map.put(Binding.NAME, ROUTING_KEY);
+ map.put(Binding.ARGUMENTS,_bindingArgs);
+
+ Map<String,UUID> parents = new HashMap<String, UUID>();
+
+ parents.put(Exchange.class.getSimpleName(), _exchange.getId());
+ parents.put(Queue.class.getSimpleName(), queue.getId());
+
+ verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents));
+ }
+
+
+ private ConfiguredObjectRecord matchesRecord(UUID id,
+ String type,
+ Map<String, Object> attributes,
+ final Map<String, UUID> parents)
+ {
+ return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents));
+ }
- verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING),
- argThat(new IgnoreCreatedByMatcher(map)));
+ private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
+ {
+ return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP));
}
- private static class IgnoreCreatedByMatcher extends ArgumentMatcher<Map<String,Object>>
+ private static class ConfiguredObjectMatcher extends ArgumentMatcher<ConfiguredObjectRecord>
{
private final Map<String,Object> _matchingMap;
+ private final UUID _id;
+ private final String _name;
+ private final Map<String,UUID> _parents;
- private IgnoreCreatedByMatcher(final Map<String, Object> matchingMap)
+ private ConfiguredObjectMatcher(final UUID id, final String type, final Map<String, Object> matchingMap, Map<String,UUID> parents)
{
+ _id = id;
+ _name = type;
_matchingMap = matchingMap;
+ _parents = parents;
}
@Override
public boolean matches(final Object argument)
{
- if(argument instanceof Map)
+ if(argument instanceof ConfiguredObjectRecord)
{
- Map<String,Object> arg = new HashMap<String, Object>((Map<String,Object>) argument);
+ ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument;
+
+ Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes());
arg.remove("createdBy");
arg.remove("createdTime");
- return arg.equals(_matchingMap);
-
+ return (_id == ANY_UUID || _id.equals(binding.getId()))
+ && _name.equals(binding.getType())
+ && (_matchingMap == ANY_MAP || arg.equals(_matchingMap))
+ && (_parents == ANY_MAP || matchesParents(binding));
}
return false;
}
+
+ private boolean matchesParents(ConfiguredObjectRecord binding)
+ {
+ Map<String, ConfiguredObjectRecord> bindingParents = binding.getParents();
+ if(bindingParents.size() != _parents.size())
+ {
+ return false;
+ }
+ for(Map.Entry<String,UUID> entry : _parents.entrySet())
+ {
+ if(!bindingParents.get(entry.getKey()).getId().equals(entry.getValue()))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
}
public void testUnbindQueue() throws Exception
@@ -239,9 +295,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
- eq(BINDING),
- anyMap());
+ verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING,
+ ANY_MAP));
}
public void testCreateQueueAMQQueue() throws Exception
@@ -254,7 +309,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -276,7 +331,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -294,7 +349,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
}
private ExchangeImpl createTestAlternateExchange()
@@ -327,7 +382,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
}
@@ -354,7 +409,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testRemoveQueue() throws Exception
@@ -369,9 +424,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
- eq(org.apache.qpid.server.model.Queue.class.getName()),
- anyMap());
+ verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
}
private AMQQueue createTestQueue(String queueName,
@@ -424,6 +477,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
});
when(queue.getActualAttributes()).thenReturn(attributes);
+
+ ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class);
+ when(objectRecord.getId()).thenReturn(_queueId);
+ when(objectRecord.getType()).thenReturn(Queue.class.getSimpleName());
+ when(objectRecord.getAttributes()).thenReturn(attributes);
+ when(queue.asObjectRecord()).thenReturn(objectRecord);
return queue;
}
@@ -440,6 +499,13 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(exchange.getTypeName()).thenReturn(getName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
when(exchange.getId()).thenReturn(_exchangeId);
+ ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
+ when(exchangeRecord.getId()).thenReturn(_exchangeId);
+ when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
+ Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes);
+ actualAttributesExceptId.remove("id");
+ when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId);
+ when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
return exchange;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index 224a22687f..4304b59d10 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -31,11 +31,13 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -49,6 +51,10 @@ public class JsonFileConfigStoreTest extends QpidTestCase
private VirtualHost _virtualHost;
private JsonFileConfigStore _store;
+
+ private static final UUID ANY_UUID = UUID.randomUUID();
+ private static final Map<String, Object> ANY_MAP = new HashMap<String, Object>();
+
@Override
public void setUp() throws Exception
{
@@ -109,7 +115,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.configureConfigStore(_virtualHost, _recoveryHandler);
InOrder inorder = inOrder(_recoveryHandler);
inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
- inorder.verify(_recoveryHandler,never()).configuredObject(any(UUID.class),anyString(),anyMap());
+ inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
inorder.verify(_recoveryHandler).completeConfigurationRecovery();
_store.close();
}
@@ -141,11 +147,11 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final String queueType = Queue.class.getSimpleName();
final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
- _store.create(queueId, queueType, queueAttr);
+ _store.create(new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr));
_store.close();
_store.configureConfigStore(_virtualHost, _recoveryHandler);
- verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr));
+ verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
_store.close();
}
@@ -156,17 +162,17 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final String queueType = Queue.class.getSimpleName();
Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
- _store.create(queueId, queueType, queueAttr);
+ _store.create(new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr));
queueAttr = new HashMap<String,Object>(queueAttr);
queueAttr.put("owner", "theowner");
- _store.update(queueId, queueType, queueAttr);
+ _store.update(false, new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr));
_store.close();
_store.configureConfigStore(_virtualHost, _recoveryHandler);
- verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr));
+ verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
_store.close();
}
@@ -178,15 +184,16 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final String queueType = Queue.class.getSimpleName();
Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
- _store.create(queueId, queueType, queueAttr);
+ final ConfiguredObjectRecordImpl record = new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr);
+ _store.create(record);
- _store.remove(queueId, queueType);
+ _store.remove(record);
_store.close();
_store.configureConfigStore(_virtualHost, _recoveryHandler);
- verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap());
+ verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
_store.close();
}
@@ -195,7 +202,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.configureConfigStore(_virtualHost, _recoveryHandler);
try
{
- _store.create(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap());
+ _store.create(new ConfiguredObjectRecordImpl(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap()));
fail("Should not be able to create instance of type wibble");
}
catch (StoreException e)
@@ -208,10 +215,10 @@ public class JsonFileConfigStoreTest extends QpidTestCase
{
_store.configureConfigStore(_virtualHost, _recoveryHandler);
final UUID id = UUID.randomUUID();
- _store.create(id, "Queue", Collections.<String, Object>emptyMap());
+ _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap()));
try
{
- _store.create(id, "Exchange", Collections.<String, Object>emptyMap());
+ _store.create(new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap()));
fail("Should not be able to create two objects with same id");
}
catch (StoreException e)
@@ -225,13 +232,13 @@ public class JsonFileConfigStoreTest extends QpidTestCase
{
_store.configureConfigStore(_virtualHost, _recoveryHandler);
final UUID id = UUID.randomUUID();
- _store.create(id, "Queue", Collections.<String, Object>emptyMap());
+ _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap()));
_store.close();
_store.configureConfigStore(_virtualHost, _recoveryHandler);
try
{
- _store.update(id, "Exchange", Collections.<String, Object>emptyMap());
+ _store.update(false, new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap()));
fail("Should not be able to update object to different type");
}
catch (StoreException e)
@@ -270,31 +277,75 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final Map<String, Object> EMPTY_ATTR = Collections.emptyMap();
final UUID exchangeId = new UUID(0, 2);
- final Map<String, Object> bindingAttributes = new HashMap<String, Object>();
- bindingAttributes.put(Binding.EXCHANGE, exchangeId);
- bindingAttributes.put(Binding.QUEUE, queueId);
- final Map<String, Object> binding2Attributes = new HashMap<String, Object>();
- binding2Attributes.put(Binding.EXCHANGE, exchangeId);
- binding2Attributes.put(Binding.QUEUE, queue2Id);
final UUID bindingId = new UUID(0, 3);
final UUID binding2Id = new UUID(1, 3);
- _store.create(queueId, "Queue", EMPTY_ATTR);
- _store.create(queue2Id, "Queue", EMPTY_ATTR);
- _store.create(exchangeId, "Exchange", EMPTY_ATTR);
- _store.update(true,
- new ConfiguredObjectRecord(bindingId, "Binding", bindingAttributes),
- new ConfiguredObjectRecord(binding2Id, "Binding", binding2Attributes));
+ final ConfiguredObjectRecordImpl queueRecord = new ConfiguredObjectRecordImpl(queueId, "Queue", EMPTY_ATTR);
+ _store.create(queueRecord);
+ final ConfiguredObjectRecordImpl queue2Record = new ConfiguredObjectRecordImpl(queue2Id, "Queue", EMPTY_ATTR);
+ _store.create(queue2Record);
+ final ConfiguredObjectRecordImpl exchangeRecord = new ConfiguredObjectRecordImpl(exchangeId, "Exchange", EMPTY_ATTR);
+ _store.create(exchangeRecord);
+ Map<String,ConfiguredObjectRecord> bindingParents = new HashMap<String, ConfiguredObjectRecord>();
+ bindingParents.put("Exchange", exchangeRecord);
+ bindingParents.put("Queue", queueRecord);
+ final ConfiguredObjectRecordImpl bindingRecord =
+ new ConfiguredObjectRecordImpl(bindingId, "Binding", EMPTY_ATTR, bindingParents);
+
+
+ Map<String,ConfiguredObjectRecord> binding2Parents = new HashMap<String, ConfiguredObjectRecord>();
+ binding2Parents.put("Exchange", exchangeRecord);
+ binding2Parents.put("Queue", queue2Record);
+ final ConfiguredObjectRecordImpl binding2Record =
+ new ConfiguredObjectRecordImpl(binding2Id, "Binding", EMPTY_ATTR, binding2Parents);
+ _store.update(true, bindingRecord, binding2Record);
_store.close();
_store.configureConfigStore(_virtualHost, _recoveryHandler);
- verify(_recoveryHandler).configuredObject(eq(queueId), eq("Queue"), eq(EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(eq(queue2Id), eq("Queue"), eq(EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(eq(exchangeId), eq("Exchange"), eq(EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(eq(bindingId),eq("Binding"), eq(bindingAttributes));
- verify(_recoveryHandler).configuredObject(eq(binding2Id),eq("Binding"), eq(binding2Attributes));
+ verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
+ verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
_store.close();
}
+ private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
+ {
+ return argThat(new ConfiguredObjectMatcher(id, type, attributes));
+ }
+
+ private static class ConfiguredObjectMatcher extends ArgumentMatcher<ConfiguredObjectRecord>
+ {
+ private final Map<String,Object> _matchingMap;
+ private final UUID _id;
+ private final String _name;
+
+ private ConfiguredObjectMatcher(final UUID id, final String type, final Map<String, Object> matchingMap)
+ {
+ _id = id;
+ _name = type;
+ _matchingMap = matchingMap;
+ }
+
+ @Override
+ public boolean matches(final Object argument)
+ {
+ if(argument instanceof ConfiguredObjectRecord)
+ {
+ ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument;
+
+ Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes());
+ arg.remove("createdBy");
+ arg.remove("createdTime");
+ return (_id == ANY_UUID || _id.equals(binding.getId()))
+ && _name.equals(binding.getType())
+ && (_matchingMap == ANY_MAP || arg.equals(_matchingMap));
+
+ }
+ return false;
+ }
+ }
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 9f1f6f48c0..b784405881 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -214,17 +215,16 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
- _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0),
"org.apache.qpid.server.model.Binding",
createBinding("key",
- DIRECT_EXCHANGE_ID,
- QUEUE_ID,
"x-filter-jms-selector",
- "wibble"));
+ "wibble"),
+ createBindingParents(DIRECT_EXCHANGE_ID, QUEUE_ID)));
final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
+ new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding",
+ createBinding("key"))
};
verifyCorrectUpdates(expected);
@@ -239,32 +239,30 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
- _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0),
"org.apache.qpid.server.model.Binding",
createBinding("key",
- DIRECT_EXCHANGE_ID,
- QUEUE_ID,
"x-filter-jms-selector",
"wibble",
"not-a-selector",
- "moo"));
+ "moo"),
+ createBindingParents(DIRECT_EXCHANGE_ID, QUEUE_ID)));
final UUID customExchangeId = new UUID(3,0);
- _durableConfigurationRecoverer.configuredObject(new UUID(2, 0),
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(2, 0),
"org.apache.qpid.server.model.Binding",
createBinding("key",
- customExchangeId,
- QUEUE_ID,
"x-filter-jms-selector",
"wibble",
"not-a-selector",
- "moo"));
+ "moo"),
+ createBindingParents(customExchangeId,QUEUE_ID)));
- _durableConfigurationRecoverer.configuredObject(customExchangeId,
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(customExchangeId,
"org.apache.qpid.server.model.Exchange",
- createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+ createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)));
final ExchangeImpl customExchange = mock(ExchangeImpl.class);
@@ -291,10 +289,10 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
- new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
+ new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", "not-a-selector", "moo")),
+ new ConfiguredObjectRecordImpl(new UUID(2, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", "not-a-selector", "moo"))
};
verifyCorrectUpdates(expected);
@@ -308,17 +306,16 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
- _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0),
"org.apache.qpid.server.model.Binding",
createBinding("key",
- TOPIC_EXCHANGE_ID,
- QUEUE_ID,
"x-filter-jms-selector",
- "wibble"));
+ "wibble"),
+ createBindingParents(TOPIC_EXCHANGE_ID,QUEUE_ID)));
final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
- createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
+ new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding",
+ createBinding("key", "x-filter-jms-selector", "wibble"))
};
verifyCorrectUpdates(expected);
@@ -331,16 +328,15 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
- _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0),
"Binding",
createBinding("key",
- DIRECT_EXCHANGE_ID,
- QUEUE_ID,
"x-filter-jms-selector",
- "wibble"));
+ "wibble"),
+ createBindingParents(DIRECT_EXCHANGE_ID,QUEUE_ID)));
doThrow(new RuntimeException("Update Should not be called"))
- .when(_store).update(anyBoolean(), any(ConfiguredObjectRecord[].class));
+ .when(_store).update(anyBoolean(), any(ConfiguredObjectRecordImpl[].class));
_durableConfigurationRecoverer.completeConfigurationRecovery();
}
@@ -350,13 +346,13 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
- _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0),
"Binding",
createBinding("key",
- new UUID(3,0),
- QUEUE_ID,
"x-filter-jms-selector",
- "wibble"));
+ "wibble"),
+ createBindingParents(new UUID(3,0),
+ QUEUE_ID)));
try
{
@@ -378,8 +374,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
try
{
final Map<String, Object> emptyArguments = Collections.emptyMap();
- _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
- "Wibble", emptyArguments);
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0),
+ "Wibble", emptyArguments));
_durableConfigurationRecoverer.completeConfigurationRecovery();
fail("Expected resolution to fail due to unknown object type");
}
@@ -425,11 +421,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
- _durableConfigurationRecoverer.configuredObject(queueId, Queue.class.getSimpleName(),
- createQueue("testQueue", exchangeId));
- _durableConfigurationRecoverer.configuredObject(exchangeId,
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(queueId, Queue.class.getSimpleName(),
+ createQueue("testQueue", exchangeId)));
+ _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(exchangeId,
org.apache.qpid.server.model.Exchange.class.getSimpleName(),
- createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+ createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)));
_durableConfigurationRecoverer.completeConfigurationRecovery();
@@ -450,16 +446,14 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
return null;
}
- }).when(_store).update(anyBoolean(), any(ConfiguredObjectRecord[].class));
+ }).when(_store).update(anyBoolean(), any(ConfiguredObjectRecordImpl[].class));
}
- private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
+ private Map<String,Object> createBinding(String bindingKey, String... args)
{
Map<String, Object> binding = new LinkedHashMap<String, Object>();
binding.put("name", bindingKey);
- binding.put(Binding.EXCHANGE, exchangeId.toString());
- binding.put(Binding.QUEUE, queueId.toString());
Map<String,String> argumentMap = new LinkedHashMap<String, String>();
if(args != null && args.length != 0)
{
@@ -481,6 +475,15 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
return binding;
}
+ private Map<String,ConfiguredObjectRecord> createBindingParents(UUID exchangeId, UUID queueId)
+ {
+ Map<String,ConfiguredObjectRecord> parents = new HashMap<String, ConfiguredObjectRecord>();
+ parents.put("Exchange", new ConfiguredObjectRecordImpl(exchangeId,"Exchange",Collections.<String,Object>emptyMap()));
+ parents.put("Queue", new ConfiguredObjectRecordImpl(queueId,"Queue",Collections.<String,Object>emptyMap()));
+
+ return parents;
+ }
+
private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 1d7c2ffa46..0e80d72d9b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -190,6 +190,12 @@ public class MessageStoreTest extends QpidTestCase
try
{
_virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),new VirtualHostRegistry(_broker.getEventLogger()),getVirtualHostModel());
+ when(_virtualHostModel.getId()).thenReturn(_virtualHost.getId());
+
+ ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class);
+ when(objectRecord.getId()).thenReturn(_virtualHost.getId());
+ when(objectRecord.getType()).thenReturn(org.apache.qpid.server.model.VirtualHost.class.getSimpleName());
+ when(_virtualHostModel.asObjectRecord()).thenReturn(objectRecord);
}
catch (Exception e)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 9407ef2d6c..2def24205c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -184,41 +184,25 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
return _realStore.addMessage(metaData);
}
-
@Override
- public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
+ public void create(ConfiguredObjectRecord record) throws StoreException
{
doPreDelay("create");
- _durableConfigurationStore.create(id, type, attributes);
+ _durableConfigurationStore.create(record);
doPostDelay("create");
}
- @Override
- public void remove(UUID id, String type) throws StoreException
- {
- doPreDelay("remove");
- _durableConfigurationStore.remove(id, type);
- doPostDelay("remove");
- }
@Override
- public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
+ public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
doPreDelay("remove");
- UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects);
+ UUID[] removed = _durableConfigurationStore.remove(objects);
doPostDelay("remove");
return removed;
}
@Override
- public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
- {
- doPreDelay("update");
- _durableConfigurationStore.update(id, type, attributes);
- doPostDelay("update");
- }
-
- @Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
doPreDelay("update");