diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-23 18:29:22 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-23 18:29:22 +0000 |
commit | d6218a66346b3b4005d61cdcb26c4b7b0eb71cdd (patch) | |
tree | 9a07f7d20af117ed4c76f2406a591132c1c6475e | |
parent | a998e640d140cd3ef42e0126a88c248186601684 (diff) | |
download | qpid-python-d6218a66346b3b4005d61cdcb26c4b7b0eb71cdd.tar.gz |
QPID-5615 : [Java Broker] update the durable configuration store interface to be entirely in terms of ConfiguredObjectRecords and give stores knowledge of hierarchy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1580561 13f79535-47bb-0310-9956-ffa450edef68
48 files changed, 1606 insertions, 795 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 7d7ebd32fe..9a06f06999 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -41,8 +41,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.message.EnqueueableMessage; @@ -51,16 +49,11 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; -import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.*; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.apache.qpid.util.FileUtils; @@ -70,7 +63,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private static final int LOCK_RETRY_ATTEMPTS = 5; - public static final int VERSION = 7; + public static final int VERSION = 8; private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ @@ -83,6 +76,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private Environment _environment; private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; + private static String CONFIGURED_OBJECT_HIERARCHY = "CONFIGURED_OBJECT_HIERARCHY"; + private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; private static String DELIVERYDB_NAME = "QUEUE_ENTRIES"; @@ -92,6 +87,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private static String CONFIG_VERSION_DB = "CONFIG_VERSION"; private Database _configuredObjectsDb; + private Database _configuredObjectHierarchyDb; private Database _configVersionDb; private Database _messageMetaDataDb; private Database _messageContentDb; @@ -146,6 +142,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private Map<String, String> _envConfigMap; private VirtualHost _virtualHost; + public AbstractBDBMessageStore() { _stateManager = new StateManager(_eventManager); @@ -183,7 +180,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private void completeInitialisation() { - configure(_virtualHost); + configure(); _stateManager.attainState(State.INITIALISED); } @@ -221,27 +218,24 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo /** * Called after instantiation in order to configure the message store. * - * - * - * @param virtualHost The virtual host using this store * @return whether a new store environment was created or not (to indicate whether recovery is necessary) * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - public void configure(VirtualHost virtualHost) + public void configure() { - configure(virtualHost, _messageRecoveryHandler != null); + configure(_messageRecoveryHandler != null); } - public void configure(VirtualHost virtualHost, boolean isMessageStore) + public void configure(boolean isMessageStore) { - String name = virtualHost.getName(); + String name = _virtualHost.getName(); final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; String storeLocation; if(isMessageStore) { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + storeLocation = (String) _virtualHost.getAttribute(VirtualHost.STORE_PATH); if(storeLocation == null) { storeLocation = defaultPath; @@ -249,15 +243,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } else // we are acting only as the durable config store { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + storeLocation = (String) _virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); if(storeLocation == null) { storeLocation = defaultPath; } } - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + Object overfullAttr = _virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = _virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); _persistentSizeHighThreshold = overfullAttr == null ? -1l : overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); @@ -285,7 +279,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo _envConfigMap = new HashMap<String, String>(); _envConfigMap.putAll(ENVCONFIG_DEFAULTS); - Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig"); + Object bdbEnvConfigAttr = _virtualHost.getAttribute("bdbEnvironmentConfig"); if(bdbEnvConfigAttr instanceof Map) { _envConfigMap.putAll((Map)bdbEnvConfigAttr); @@ -293,27 +287,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo LOGGER.info("Configuring BDB message store"); - setupStore(environmentPath, name); - } - - protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException - { - final List<Object> argumentNames = config.getList(prefix + ".name"); - final List<Object> argumentValues = config.getList(prefix + ".value"); - final int initialSize = argumentNames.size() + defaultConfig.size(); - - final Map<String,String> attributes = new HashMap<String,String>(initialSize); - attributes.putAll(defaultConfig); - - for (int i = 0; i < argumentNames.size(); i++) - { - final String argName = argumentNames.get(i).toString(); - final String argValue = argumentValues.get(i).toString(); - - attributes.put(argName, argValue); - } - - return Collections.unmodifiableMap(attributes); + setupStore(environmentPath); } @Override @@ -337,11 +311,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo _stateManager.attainState(State.ACTIVE); } - protected void setupStore(File storePath, String name) + protected void setupStore(File storePath) { _environment = createEnvironment(storePath); - new Upgrader(_environment, name).upgradeIfNecessary(); + new Upgrader(_environment, _virtualHost).upgradeIfNecessary(); openDatabases(); @@ -355,6 +329,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo return _environment; } + protected final VirtualHost getVirtualHost() + { + return _virtualHost; + } + private void openDatabases() throws DatabaseException { DatabaseConfig dbConfig = new DatabaseConfig(); @@ -365,6 +344,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo dbConfig.setReadOnly(false); _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); + _configuredObjectHierarchyDb = openDatabase(CONFIGURED_OBJECT_HIERARCHY, dbConfig); _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig); _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); @@ -411,11 +391,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo _messageContentDb.close(); } - if (_configuredObjectsDb != null) - { - LOGGER.info("Closing configurable objects database"); - _configuredObjectsDb.close(); - } + if (_configuredObjectsDb != null) + { + LOGGER.info("Closing configurable objects database"); + _configuredObjectsDb.close(); + } + + if (_configuredObjectHierarchyDb != null) + { + LOGGER.info("Closing configurable object hierarchy database"); + _configuredObjectHierarchyDb.close(); + } if (_deliveryDb != null) { @@ -555,24 +541,60 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { - Cursor cursor = null; + Cursor objectsCursor = null; + Cursor hierarchyCursor = null; try { - cursor = _configuredObjectsDb.openCursor(null, null); + objectsCursor = _configuredObjectsDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + + Map<UUID, BDBConfiguredObjectRecord> configuredObjects = + new HashMap<UUID, BDBConfiguredObjectRecord>(); + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); - crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); + BDBConfiguredObjectRecord configuredObject = + (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); + configuredObjects.put(configuredObject.getId(), configuredObject); } + // set parents + hierarchyCursor = _configuredObjectHierarchyDb.openCursor(null, null); + while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); + if(child != null) + { + ConfiguredObjectRecord parent = configuredObjects.get(parentId); + if(parent != null) + { + child.addParent(hk.getParentType(), parent); + } + else if(hk.getParentType().equals("Exchange")) + { + // TODO - remove this hack for the pre-defined exchanges + child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); + } + } + } + + for (ConfiguredObjectRecord record : configuredObjects.values()) + { + crh.configuredObject(record); + } + + + } finally { - closeCursorSafely(cursor); + closeCursorSafely(objectsCursor); + closeCursorSafely(hierarchyCursor); } } @@ -842,39 +864,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } @Override - public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException + public void create(ConfiguredObjectRecord configuredObject) throws StoreException { + if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); - storeConfiguredObjectEntry(configuredObject); - } - } + com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - @Override - public void remove(UUID id, String type) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); - } - OperationStatus status = removeConfiguredObject(null, id); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Configured object of type " + type + " with id " + id + " not found"); + storeConfiguredObjectEntry(txn, configuredObject); + + txn.commit(); } } - @Override - public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException + public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); Collection<UUID> removed = new ArrayList<UUID>(objects.length); - for(UUID id : objects) + for(ConfiguredObjectRecord record : objects) { - if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS) + if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS) { - removed.add(id); + removed.add(record.getId()); } } @@ -882,52 +893,50 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo return removed.toArray(new UUID[removed.size()]); } - @Override - public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException - { - update(false, id, type, attributes, null); - } - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); for(ConfiguredObjectRecord record : records) { - update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn); + update(createIfNecessary, record, txn); } txn.commit(); } - private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws + private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating " +type + ", id: " + id); + LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId()); } try { DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(id, key); + keyBinding.objectToEntry(record.getId(), key); DatabaseEntry value = new DatabaseEntry(); DatabaseEntry newValue = new DatabaseEntry(); ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND)) + final boolean isNewRecord = status == OperationStatus.NOTFOUND; + if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord)) { - ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store - configuredObjectBinding.objectToEntry(newQueueRecord, newValue); + configuredObjectBinding.objectToEntry(record, newValue); status = _configuredObjectsDb.put(txn, key, newValue); if (status != OperationStatus.SUCCESS) { throw new StoreException("Error updating queue details within the store: " + status); } + if(isNewRecord) + { + writeHierarchyRecords(txn, record); + } } else if (status != OperationStatus.NOTFOUND) { @@ -1441,17 +1450,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo /** * Makes the specified configured object persistent. * + * + * @param txn * @param configuredObject Details of the configured object to store. * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. */ - private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException + private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException { if (_stateManager.isInState(State.ACTIVE)) { LOGGER.debug("Storing configured object: " + configuredObject); DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(configuredObject.getId(), key); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(configuredObject.getId(), key); DatabaseEntry value = new DatabaseEntry(); ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); @@ -1459,12 +1470,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo queueBinding.objectToEntry(configuredObject, value); try { - OperationStatus status = _configuredObjectsDb.put(null, key, value); + OperationStatus status = _configuredObjectsDb.put(txn, key, value); if (status != OperationStatus.SUCCESS) { throw new StoreException("Error writing configured object " + configuredObject + " to database: " + status); } + writeHierarchyRecords(txn, configuredObject); } catch (DatabaseException e) { @@ -1474,16 +1486,49 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } - private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException + private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) { + OperationStatus status; + HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance(); + DatabaseEntry hierarchyKey = new DatabaseEntry(); + DatabaseEntry hierarchyValue = new DatabaseEntry(); + + for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet()) + { + + hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey); + UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue); + status = _configuredObjectHierarchyDb.put(txn, hierarchyKey, hierarchyValue); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: " + + status); + } + } + } + private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException + { + UUID id = record.getId(); + Map<String, ConfiguredObjectRecord> parents = record.getParents(); LOGGER.debug("Removing configured object: " + id); DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); uuidBinding.objectToEntry(id, key); try { - return _configuredObjectsDb.delete(tx, key); + OperationStatus status = _configuredObjectsDb.delete(tx, key); + if(status == OperationStatus.SUCCESS) + { + for(String parentType : parents.keySet()) + { + DatabaseEntry hierarchyKey = new DatabaseEntry(); + HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance(); + keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey); + _configuredObjectHierarchyDb.delete(tx, hierarchyKey); + } + } + return status; } catch (DatabaseException e) { @@ -1859,4 +1904,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } } + + void setVirtualHost(final VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java new file mode 100644 index 0000000000..f13e4dd08b --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord +{ + private final UUID _id; + private final String _type; + private final Map<String,Object> _attributes; + private Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); + + public BDBConfiguredObjectRecord(final UUID id, final String type, final Map<String, Object> attributes) + { + _id = id; + _type = type; + _attributes = Collections.unmodifiableMap(attributes); + } + + public UUID getId() + { + return _id; + } + + public String getType() + { + return _type; + } + + public Map<String, Object> getAttributes() + { + return _attributes; + } + + void addParent(String parentType, ConfiguredObjectRecord parent) + { + _parents.put(parentType, parent); + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return Collections.unmodifiableMap(_parents); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final BDBConfiguredObjectRecord that = (BDBConfiguredObjectRecord) o; + + if (_attributes != null ? !_attributes.equals(that._attributes) : that._attributes != null) + { + return false; + } + if (_id != null ? !_id.equals(that._id) : that._id != null) + { + return false; + } + if (_type != null ? !_type.equals(that._type) : that._type != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _id != null ? _id.hashCode() : 0; + result = 31 * result + (_type != null ? _type.hashCode() : 0); + return result; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index d99733acf0..bdd48e99bb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -37,7 +37,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.TaskPrincipal; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.HAMessageStore; @@ -124,9 +123,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private Map<String, String> _repConfig; @Override - public void configure(VirtualHost virtualHost) + public void configure() { //Mandatory configuration + VirtualHost virtualHost = getVirtualHost(); _groupName = getValidatedStringAttribute(virtualHost, "haGroupName"); _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName"); _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress"); @@ -159,7 +159,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess + "! Please set highAvailability.coalescingSync to false in store configuration."); } - super.configure(virtualHost); + super.configure(); } @@ -207,13 +207,13 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess @Override - protected void setupStore(File storePath, String name) throws DatabaseException + protected void setupStore(File storePath) throws DatabaseException { - super.setupStore(storePath, name); + super.setupStore(storePath); if(_coalescingSync) { - _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + getVirtualHost().getName(), getEnvironment()); _commitThreadWrapper.startCommitThread(); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index acff8e2b21..0fd5060397 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -46,11 +46,11 @@ public class BDBMessageStore extends AbstractBDBMessageStore private CommitThreadWrapper _commitThreadWrapper; @Override - protected void setupStore(File storePath, String name) throws DatabaseException + protected void setupStore(File storePath) throws DatabaseException { - super.setupStore(storePath, name); + super.setupStore(storePath); - _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + getVirtualHost().getName(), getEnvironment()); _commitThreadWrapper.startCommitThread(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java new file mode 100644 index 0000000000..d1c341447e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.entry; + +import java.util.UUID; + +public class HierarchyKey +{ + private final UUID _childId; + private final String _parentType; + + public HierarchyKey(final UUID childId, final String parentType) + { + _childId = childId; + _parentType = parentType; + } + + public UUID getChildId() + { + return _childId; + } + + public String getParentType() + { + return _parentType; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final HierarchyKey that = (HierarchyKey) o; + + if (!_childId.equals(that._childId)) + { + return false; + } + if (!_parentType.equals(that._parentType)) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _childId.hashCode(); + result = 31 * result + _parentType.hashCode(); + return result; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java index bc3beeb78b..38a2215fe7 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -32,6 +32,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -80,7 +81,7 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord _uuid = uuid; } - public ConfiguredObjectRecord entryToObject(TupleInput tupleInput) + public BDBConfiguredObjectRecord entryToObject(TupleInput tupleInput) { String type = tupleInput.readString(); String json = tupleInput.readString(); @@ -88,7 +89,7 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord try { Map<String,Object> value = mapper.readValue(json, Map.class); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value); + BDBConfiguredObjectRecord configuredObject = new BDBConfiguredObjectRecord(_uuid, type, value); return configuredObject; } catch (IOException e) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java new file mode 100644 index 0000000000..13adaabfc8 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuple; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; +import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; + +import java.util.UUID; + +public class HierarchyKeyBinding extends TupleBinding<HierarchyKey> +{ + + private static final HierarchyKeyBinding INSTANCE = new HierarchyKeyBinding(); + + public static HierarchyKeyBinding getInstance() + { + return INSTANCE; + } + + /** private constructor forces getInstance instead */ + private HierarchyKeyBinding() { } + + public HierarchyKey entryToObject(TupleInput tupleInput) + { + UUID childId = new UUID(tupleInput.readLong(), tupleInput.readLong()); + String parentType = tupleInput.readString(); + + return new HierarchyKey(childId, parentType); + } + + public void objectToEntry(HierarchyKey hk, TupleOutput tupleOutput) + { + UUID uuid = hk.getChildId(); + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + tupleOutput.writeString(hk.getParentType()); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java index adcaef35ef..b06b6d533b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java @@ -21,8 +21,9 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import com.sleepycat.je.Environment; +import org.apache.qpid.server.model.VirtualHost; public interface StoreUpgrade { - void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName); + void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index 87f8afde4a..3eac47c81b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -39,6 +39,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -74,7 +75,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost) { Transaction transaction = null; reportStarting(environment, 4); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 46f2afd741..dea8421a33 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -44,6 +44,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; @@ -118,11 +119,11 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade * Queue, Exchange, Bindings entries are stored now as configurable objects * in "CONFIGURED_OBJECTS" table. */ - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost) { reportStarting(environment, 5); upgradeMessages(environment, handler); - upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHostName); + upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHost.getName()); renameDatabases(environment, null); reportFinished(environment, 6); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java index ce00fd1a48..79314ae098 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java @@ -27,6 +27,7 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Environment; import com.sleepycat.je.OperationStatus; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.StoreException; public class UpgradeFrom6To7 extends AbstractStoreUpgrade @@ -35,7 +36,7 @@ public class UpgradeFrom6To7 extends AbstractStoreUpgrade private static final int DEFAULT_CONFIG_VERSION = 0; @Override - public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost) { reportStarting(environment, 6); DatabaseConfig dbConfig = new DatabaseConfig(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java new file mode 100644 index 0000000000..3756c11d0c --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.*; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; +import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class UpgradeFrom7To8 extends AbstractStoreUpgrade +{ + + @Override + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost) + { + reportStarting(environment, 7); + + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + + Database hierarchyDb = environment.openDatabase(null, "CONFIGURED_OBJECT_HIERARCHY", dbConfig); + Database configuredObjectsDb = environment.openDatabase(null, "CONFIGURED_OBJECTS", dbConfig); + + Cursor objectsCursor = null; + + Transaction txn = environment.beginTransaction(null, null); + + try + { + objectsCursor = configuredObjectsDb.openCursor(txn, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + Map<UUID, BDBConfiguredObjectRecord> configuredObjects = + new HashMap<UUID, BDBConfiguredObjectRecord>(); + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + TupleInput input = TupleBinding.entryToInput(value); + String type = input.readString(); + + if(!type.endsWith("Binding")) + { + UUIDTupleBinding.getInstance().objectToEntry(virtualHost.getId(),value); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("VirtualHost"); + TupleBinding.outputToEntry(tupleOutput, key); + hierarchyDb.put(txn, key, value); + } + else + { + String json = input.readString(); + ObjectMapper mapper = new ObjectMapper(); + try + { + DatabaseEntry hierarchyKey = new DatabaseEntry(); + DatabaseEntry hierarchyValue = new DatabaseEntry(); + + Map<String,Object> attributes = mapper.readValue(json, Map.class); + Object queueIdString = attributes.remove("queue"); + if(queueIdString instanceof String) + { + UUID queueId = UUID.fromString(queueIdString.toString()); + UUIDTupleBinding.getInstance().objectToEntry(queueId,hierarchyValue); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("Queue"); + TupleBinding.outputToEntry(tupleOutput, hierarchyKey); + hierarchyDb.put(txn, hierarchyKey, hierarchyValue); + } + Object exchangeIdString = attributes.remove("exchange"); + if(exchangeIdString instanceof String) + { + UUID exchangeId = UUID.fromString(exchangeIdString.toString()); + UUIDTupleBinding.getInstance().objectToEntry(exchangeId,hierarchyValue); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("Exchange"); + TupleBinding.outputToEntry(tupleOutput, hierarchyKey); + hierarchyDb.put(txn, hierarchyKey, hierarchyValue); + } + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeString(type); + StringWriter writer = new StringWriter(); + mapper.writeValue(writer,attributes); + tupleOutput.writeString(writer.getBuffer().toString()); + TupleBinding.outputToEntry(tupleOutput, value); + objectsCursor.putCurrent(value); + } + catch (IOException e) + { + throw new StoreException(e); + } + + } + + + } + + + } + finally + { + if(objectsCursor != null) + { + objectsCursor.close(); + } + } + txn.commit(); + + hierarchyDb.close(); + configuredObjectsDb.close(); + + + + reportFinished(environment, 8); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index 4d536a2f95..e960518719 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -24,6 +24,7 @@ import com.sleepycat.je.Cursor; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; @@ -41,12 +42,12 @@ public class Upgrader static final String VERSION_DB_NAME = "DB_VERSION"; private Environment _environment; - private String _virtualHostName; + private VirtualHost _virtualHost; - public Upgrader(Environment environment, String virtualHostName) + public Upgrader(Environment environment, VirtualHost virtualHost) { _environment = environment; - _virtualHostName = virtualHostName; + _virtualHost = virtualHost; } public void upgradeIfNecessary() @@ -146,7 +147,7 @@ public class Upgrader + "UpgradeFrom"+fromVersion+"To"+toVersion); Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor(); StoreUpgrade upgrade = ctr.newInstance(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHostName); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHost); } catch (ClassNotFoundException e) { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java index 7f7b65f315..6a42b2691f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -19,7 +19,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.test.utils.QpidTestCase; @@ -34,7 +33,8 @@ public class HAMessageStoreSmokeTest extends QpidTestCase { try { - _store.configure(mock(VirtualHost.class)); + _store.setVirtualHost(mock(VirtualHost.class)); + _store.configure(); fail("Expected an exception to be thrown"); } catch (ServerScopedRuntimeException ce) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java index 5a5d39081c..965cad1cb5 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecord; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; public class ConfiguredObjectBindingTest extends TestCase { @@ -46,7 +47,7 @@ public class ConfiguredObjectBindingTest extends TestCase { super.setUp(); _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, + _object = new ConfiguredObjectRecordImpl(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_MAP); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index b2b28b3c2d..ce143aba1b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -25,12 +25,16 @@ import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPrepare import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.InputStream; +import java.util.UUID; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -167,8 +171,11 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase return count.longValue(); } - public String getVirtualHostName() + public VirtualHost getVirtualHost() { - return getName(); + VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getName()).thenReturn(getName()); + when(virtualHost.getId()).thenReturn(UUID.randomUUID()); + return virtualHost; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index 500fb0a919..d0f9455d9a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -73,7 +73,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost()); assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES))); @@ -103,7 +103,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHost()); HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES)); assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME)); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 701fd94115..0460b1ce4c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -87,7 +87,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase public void testPerformUpgrade() throws Exception { UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); assertDatabaseRecordCounts(); assertContent(); @@ -101,7 +101,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase corruptDatabase(); UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost()); assertDatabaseRecordCounts(); @@ -117,7 +117,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO); - upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName()); + upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHost()); assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); @@ -135,7 +135,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { populateOldXidEntries(environment); UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); + upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); assertXidEntries(environment); } finally @@ -171,11 +171,11 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase NewRecordImpl[] newDequeues = newTransaction.getDequeues(); assertEquals("Unxpected new enqueus number", 1, newEnqueues.length); NewRecordImpl enqueue = newEnqueues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHostName()), enqueue.getId()); + assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHost().getName()), enqueue.getId()); assertEquals("Unxpected message id", 1, enqueue.getMessageNumber()); assertEquals("Unxpected new dequeues number", 1, newDequeues.length); NewRecordImpl dequeue = newDequeues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHostName()), dequeue.getId()); + assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHost().getName()), dequeue.getId()); assertEquals("Unxpected message id", 2, dequeue.getMessageNumber()); } @@ -347,13 +347,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { String exchangeName = (String) deserialized.get(Exchange.NAME); assertNotNull(exchangeName); - assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName())); + assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName())); } else if (type.equals(Queue.class.getName())) { String queueName = (String) deserialized.get(Queue.NAME); assertNotNull(queueName); - assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHostName())); + assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName())); } else if (type.equals(Binding.class.getName())) { @@ -368,15 +368,15 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs) { Map<String, Object> expectedQueueBinding = new HashMap<String, Object>(); - expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHostName()).toString()); + expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHost().getName()).toString()); expectedQueueBinding.put(Binding.NAME, bindingName); - expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()).toString()); + expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()).toString()); if (argumentMap != null) { expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap); } - expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHostName())); + expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHost().getName())); return expectedQueueBinding; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java index 400ac12792..c0de884a28 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -43,7 +43,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase public void setUp() throws Exception { super.setUp(); - _upgrader = new Upgrader(_environment, getVirtualHostName()); + _upgrader = new Upgrader(_environment, getVirtualHost()); } private int getStoreVersion() diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index 3465f3582f..4b9a8d19a8 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -51,7 +51,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase public void setUp() throws Exception { super.setUp(); - _upgrader = new Upgrader(_environment, getVirtualHostName()); + _upgrader = new Upgrader(_environment, getVirtualHost()); } private int getStoreVersion(Environment environment) @@ -108,7 +108,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation); try { - _upgrader = new Upgrader(emptyEnvironment, getVirtualHostName()); + _upgrader = new Upgrader(emptyEnvironment, getVirtualHost()); _upgrader.upgradeIfNecessary(); List<String> databaseNames = emptyEnvironment.getDatabaseNames(); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index bd0411619e..29e9ae19d7 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -25,7 +25,8 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.UUID; -import org.apache.qpid.server.store.StoreException; + +import org.apache.qpid.server.store.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; @@ -41,12 +42,6 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; -import org.apache.qpid.server.store.MessageStoreTest; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -56,6 +51,8 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; +import static org.mockito.Mockito.mock; + /** * Subclass of MessageStoreTest which runs the standard tests from the superclass against * the BDB Store as well as additional tests specific to the BDB store-implementation. @@ -230,7 +227,8 @@ public class BDBMessageStoreTest extends MessageStoreTest messageStore.close(); AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure(getVirtualHostModel(),true); + newStore.setVirtualHost(getVirtualHostModel()); + newStore.configure(true); newStore.startWithNoRecover(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 740ccf8200..7e3e5c9bbe 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.model; import org.apache.qpid.server.model.adapter.AbstractConfiguredObject; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import java.security.AccessControlException; import java.util.Collection; @@ -279,4 +280,7 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> Class<? extends ConfiguredObject> getCategoryClass(); + // TODO - remove this when objects become responsible for their own storage + ConfiguredObjectRecord asObjectRecord(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index ee10ecfc8b..6a771c4c25 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.store.MessageStore; import java.security.AccessControlException; import java.util.Collection; import java.util.Map; +import java.util.UUID; @ManagedObject( managesChildren = true ) public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<X> @@ -181,6 +182,8 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject< // TODO - remove this TaskExecutor getTaskExecutor(); + Exchange getExchange(UUID id); + MessageStore getMessageStore(); String getType(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java index 2a12284f1f..5e3ebb86ce 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java @@ -26,6 +26,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.security.AccessControlException; import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.*; import org.apache.qpid.server.model.*; @@ -37,6 +38,7 @@ import org.apache.qpid.server.configuration.updater.SetAttributeTask; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -544,6 +546,61 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return getClass().getSimpleName() + " [id=" + _id + ", name=" + getName() + "]"; } + public ConfiguredObjectRecord asObjectRecord() + { + return new ConfiguredObjectRecord() + { + @Override + public UUID getId() + { + return AbstractConfiguredObject.this.getId(); + } + + @Override + public String getType() + { + return getCategoryClass().getSimpleName(); + } + + @Override + public Map<String, Object> getAttributes() + { + return Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Map<String, Object>>() + { + @Override + public Map<String, Object> run() + { + Map<String,Object> actualAttributes = new HashMap<String, Object>(getActualAttributes()); + for(Map.Entry<String,Object> entry : actualAttributes.entrySet()) + { + if(entry.getValue() instanceof ConfiguredObject) + { + entry.setValue(((ConfiguredObject)entry.getValue()).getId()); + } + } + actualAttributes.remove(ID); + return actualAttributes; + } + }); + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + Map<String, ConfiguredObjectRecord> parents = new LinkedHashMap<String, ConfiguredObjectRecord>(); + for(Class<? extends ConfiguredObject> parentClass : Model.getInstance().getParentTypes(getCategoryClass())) + { + ConfiguredObject parent = getParent(parentClass); + if(parent != null) + { + parents.put(parentClass.getSimpleName(), parent.asObjectRecord()); + } + } + return parents; + } + }; + } + @SuppressWarnings("unchecked") @Override public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 6dbc2eea85..c9bca357f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -1166,8 +1166,15 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo } } + @Override public TaskExecutor getTaskExecutor() { return super.getTaskExecutor(); } + + @Override + public Exchange getExchange(UUID id) + { + return _virtualHost.getExchange(id); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java index dbe8bf22a0..da84580a94 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java @@ -27,10 +27,9 @@ public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements Dur { @Override public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer, - final UUID id, - final Map<String, Object> attributes) + final ConfiguredObjectRecord record) { - final UnresolvedObject obj = createUnresolvedObject(id, getType(), attributes); + final UnresolvedObject obj = createUnresolvedObject(record); UnresolvedDependency[] dependencies = obj.getUnresolvedDependencies(); for(final UnresolvedDependency dependency : dependencies) { @@ -53,7 +52,7 @@ public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements Dur dependency.resolve(o); if(obj.getUnresolvedDependencies().length == 0) { - durableConfigurationRecoverer.resolve(getType(), id, obj.resolve()); + durableConfigurationRecoverer.resolve(getType(), record.getId(), obj.resolve()); } } }); @@ -61,17 +60,15 @@ public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements Dur } if(obj.getUnresolvedDependencies().length == 0) { - durableConfigurationRecoverer.resolve(getType(), id, obj.resolve()); + durableConfigurationRecoverer.resolve(getType(), record.getId(), obj.resolve()); } else { - durableConfigurationRecoverer.addUnresolvedObject(getType(), id, obj); + durableConfigurationRecoverer.addUnresolvedObject(getType(), record.getId(), obj); } } - public abstract UnresolvedObject<T> createUnresolvedObject(final UUID id, - final String type, - final Map<String, Object> attributes); + public abstract UnresolvedObject<T> createUnresolvedObject(final ConfiguredObjectRecord record); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 81c74fe8f5..01b142c9bc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -24,6 +24,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -34,12 +36,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; @@ -48,6 +45,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.transport.ConnectionOpen; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; @@ -70,19 +68,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; - private static final String LINKS_TABLE_NAME = "QPID_LINKS"; - private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES"; private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY"; + private static final int DEFAULT_CONFIG_VERSION = 0; - public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME, - XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME }; + public static String[] ALL_TABLES = + new String[]{DB_VERSION_TABLE_NAME, XID_ACTIONS_TABLE_NAME, + XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, + CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME}; - private static final int DB_VERSION = 7; + private static final int DB_VERSION = 8; private final AtomicLong _messageId = new AtomicLong(0); private final AtomicBoolean _closed = new AtomicBoolean(false); @@ -115,34 +115,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; - private static final String SELECT_FROM_LINKS = - "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb"; - private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME - + " WHERE id_lsb = ? and id_msb = ?"; - private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, " - + "arguments FROM " + LINKS_TABLE_NAME; - private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and" - + " id_msb = ?"; - private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, " - + "id_msb, create_time, arguments ) values (?, ?, ?, ?)"; - private static final String SELECT_FROM_BRIDGES = - "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM " - + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?"; - private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME - + " WHERE id_lsb = ? and id_msb = ?"; - private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, " - + " create_time," - + " link_id_lsb, link_id_msb, " - + "arguments FROM " + BRIDGES_TABLE_NAME - + " WHERE link_id_lsb = ? and link_id_msb = ?"; - private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME + - " WHERE id_lsb = ? and id_msb = ?"; - private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, " - + "create_time, " - + "link_id_lsb, link_id_msb, " - + "arguments )" - + " values (?, ?, ?, ?, ?, ?)"; - private static final String INSERT_INTO_XIDS = "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)"; private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME @@ -166,6 +138,14 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC + " where id = ?"; private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; + + private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"; + + private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " where child_id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME; + protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); @@ -319,6 +299,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { case 6: upgradeFromV6(); + case 7: + upgradeFromV7(); case DB_VERSION: return; default: @@ -347,6 +329,135 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC updateDbVersion(7); } + + private void upgradeFromV7() throws SQLException + { + Connection connection = newConnection(); + try + { + Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>(); + List<UUID> others = new ArrayList<UUID>(); + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + + PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID id = UUID.fromString(rs.getString(1)); + String objectType = rs.getString(2); + Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); + if(objectType.endsWith("Binding")) + { + bindingsToUpdate.put(id,attributes); + } + else + { + others.add(id); + } + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); + try + { + for (UUID id : others) + { + stmt.setString(1, id.toString()); + stmt.setString(2, "VirtualHost"); + stmt.setString(3, _virtualHost.getId().toString()); + stmt.execute(); + } + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) + { + stmt.setString(1, bindingEntry.getKey().toString()); + stmt.setString(2,"Queue"); + stmt.setString(3, bindingEntry.getValue().remove("queue").toString()); + stmt.execute(); + + stmt.setString(1, bindingEntry.getKey().toString()); + stmt.setString(2,"Exchange"); + stmt.setString(3, bindingEntry.getValue().remove("exchange").toString()); + stmt.execute(); + } + } + finally + { + stmt.close(); + } + stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) + { + stmt.setString(1, "Binding"); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue()); + + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt.setBinaryStream(2, bis, attributesAsBytes.length); + stmt.setString(3, bindingEntry.getKey().toString()); + stmt.execute(); + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + stmt.close(); + } + stmt = connection.prepareStatement(UPDATE_DB_VERSION); + try + { + stmt.setInt(1, 8); + stmt.execute(); + } + finally + { + stmt.close(); + } + connection.commit(); + } + finally + { + connection.close(); + } + } + private void updateDbVersion(int newVersion) throws SQLException { Connection conn = newAutoCommitConnection(); @@ -388,11 +499,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC createVersionTable(conn); createConfigVersionTable(conn); createConfiguredObjectsTable(conn); + createConfiguredObjectHierarchyTable(conn); createQueueEntryTable(conn); createMetaDataTable(conn); createMessageContentTable(conn); - createLinkTable(conn); - createBridgeTable(conn); createXidTable(conn); createXidActionTable(conn); conn.close(); @@ -469,6 +579,23 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } + private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))"); + } + finally + { + stmt.close(); + } + } + } + private void createQueueEntryTable(final Connection conn) throws SQLException @@ -535,45 +662,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - private void createLinkTable(final Connection conn) throws SQLException - { - if(!tableExists(LINKS_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute("CREATE TABLE "+ LINKS_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null," - + " id_msb " + getSqlBigIntType() + " not null," - + " create_time " + getSqlBigIntType() + " not null," - + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))"); - } - finally - { - stmt.close(); - } - } - } - private void createBridgeTable(final Connection conn) throws SQLException - { - if(!tableExists(BRIDGES_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute("CREATE TABLE "+ BRIDGES_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null," - + " id_msb " + getSqlBigIntType() + " not null," - + " create_time " + getSqlBigIntType() + " not null," - + " link_id_lsb " + getSqlBigIntType() + " not null," - + " link_id_msb " + getSqlBigIntType() + " not null," - + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))"); - } - finally - { - stmt.close(); - } - } - } private void createXidTable(final Connection conn) throws SQLException { @@ -817,35 +906,26 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC @Override - public void create(UUID id, String type, Map<String,Object> attributes) throws StoreException + public void create(ConfiguredObjectRecord object) throws StoreException { if (_stateManager.isInState(State.ACTIVE)) { - insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes)); - } - - } - - @Override - public void remove(UUID id, String type) throws StoreException - { - int results = removeConfiguredObject(id); - if (results == 0) - { - throw new StoreException(type + " with id " + id + " not found"); - } - } - - @Override - public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id); - if (queueConfiguredObject != null) + try { - ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); - updateConfiguredObject(newQueueRecord); + Connection conn = newConnection(); + try + { + insertConfiguredObject(object, conn); + conn.commit(); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error creating ConfiguredObject " + object); } } @@ -1993,109 +2073,85 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC _eventManager.addEventListener(eventListener, events); } - private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws StoreException + private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + try { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); try { - Connection conn = newAutoCommitConnection(); + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + boolean exists; try { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + exists = rs.next(); + + } + finally + { + rs.close(); + } + // If we don't have any data in the result set then we can add this configured object + if (!exists) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); try { - stmt.setString(1, configuredObject.getId().toString()); - ResultSet rs = stmt.executeQuery(); - try + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) { - // If we don't have any data in the result set then we can add this configured object - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); - try - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, configuredObject.getType()); - if(configuredObject.getAttributes() == null) - { - insertStmt.setNull(3, Types.BLOB); - } - else - { - final Map<String, Object> attributes = configuredObject.getAttributes(); - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); - - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); - } - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } + insertStmt.setNull(3, Types.BLOB); } - finally + else { - rs.close(); + final Map<String, Object> attributes = configuredObject.getAttributes(); + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); + + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); } + insertStmt.execute(); } finally { - stmt.close(); + insertStmt.close(); } + + writeHierarchy(configuredObject, conn); } - finally - { - conn.close(); - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (SQLException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - } - } - private int removeConfiguredObject(UUID id) throws StoreException - { - int results = 0; - try - { - Connection conn = newAutoCommitConnection(); - try - { - results = removeConfiguredObject(id, conn); } finally { - conn.close(); + stmt.close(); } + + } + catch (JsonMappingException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); } catch (SQLException e) { - throw new StoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e); + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); } - return results; + } - public UUID[] removeConfiguredObjects(UUID... objects) throws StoreException + @Override + public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException { Collection<UUID> removed = new ArrayList<UUID>(objects.length); try @@ -2104,11 +2160,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection conn = newAutoCommitConnection(); try { - for(UUID id : objects) + for(ConfiguredObjectRecord record : objects) { - if(removeConfiguredObject(id, conn) != 0) + if(removeConfiguredObject(record.getId(), conn) != 0) { - removed.add(id); + removed.add(record.getId()); } } } @@ -2126,7 +2182,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException { - final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); + final int results; + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); try { stmt.setString(1, id.toString()); @@ -2136,30 +2193,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.close(); } - return results; - } - - private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws StoreException - { - if (_stateManager.isInState(State.ACTIVE)) + stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY); + try { - try - { - Connection conn = newAutoCommitConnection(); - try - { - updateConfiguredObject(configuredObject, false, conn); - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } + stmt.setString(1, id.toString()); + stmt.executeUpdate(); } + finally + { + stmt.close(); + } + + return results; } public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException @@ -2254,6 +2299,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { insertStmt.close(); } + writeHierarchy(configuredObject, conn); } } finally @@ -2280,72 +2326,31 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws StoreException + private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException { - ConfiguredObjectRecord result = null; + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); try { - Connection conn = newAutoCommitConnection(); - try - { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); - try - { - stmt.setString(1, id.toString()); - ResultSet rs = stmt.executeQuery(); - try - { - if (rs.next()) - { - String type = rs.getString(1); - String attributes = getBlobAsString(rs, 2); - result = new ConfiguredObjectRecord(id, type, - (new ObjectMapper()).readValue(attributes,Map.class)); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally + for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet()) { - conn.close(); + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, parentEntry.getKey()); + insertStmt.setString(3, parentEntry.getValue().getId().toString()); + + insertStmt.execute(); } } - catch (JsonMappingException e) - { - throw new StoreException("Error loading of configured object with id " + id + " from database: " - + e.getMessage(), e); - } - catch (JsonParseException e) - { - throw new StoreException("Error loading of configured object with id " + id + " from database: " - + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error loading of configured object with id " + id + " from database: " - + e.getMessage(), e); - } - catch (SQLException e) + finally { - throw new StoreException("Error loading of configured object with id " + id + " from database: " - + e.getMessage(), e); + insertStmt.close(); } - return result; } private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, StoreException { Connection conn = newAutoCommitConnection(); - + Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); final ObjectMapper objectMapper = new ObjectMapper(); try { @@ -2360,8 +2365,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC String id = rs.getString(1); String objectType = rs.getString(2); String attributes = getBlobAsString(rs, 3); - recoveryHandler.configuredObject(UUID.fromString(id), objectType, - objectMapper.readValue(attributes,Map.class)); + final ConfiguredObjectRecordImpl configuredObjectRecord = + new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, + objectMapper.readValue(attributes, Map.class)); + configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); + } } catch (JsonMappingException e) @@ -2385,11 +2393,53 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.close(); } + stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID childId = UUID.fromString(rs.getString(1)); + String parentType = rs.getString(2); + UUID parentId = UUID.fromString(rs.getString(3)); + + ConfiguredObjectRecordImpl child = configuredObjects.get(childId); + ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); + + if(child != null && parent != null) + { + child.addParent(parentType, parent); + } + else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) + { + // TODO - remove this hack for amq. exchanges + child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap())); + } + + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } finally { conn.close(); } + + for(ConfiguredObjectRecord record : configuredObjects.values()) + { + recoveryHandler.configuredObject(record); + } } protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; @@ -2429,4 +2479,51 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } + + private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord + { + + private final UUID _id; + private final String _type; + private final Map<String, Object> _attributes; + private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); + + private ConfiguredObjectRecordImpl(final UUID id, + final String type, + final Map<String, Object> attributes) + { + _id = id; + _type = type; + _attributes = Collections.unmodifiableMap(attributes); + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public String getType() + { + return _type; + } + + private void addParent(String parentType, ConfiguredObjectRecord parent) + { + _parents.put(parentType, parent); + } + + @Override + public Map<String, Object> getAttributes() + { + return _attributes; + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return Collections.unmodifiableMap(_parents); + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index c6ebe90802..c8aef92a95 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -27,7 +27,7 @@ public interface ConfigurationRecoveryHandler { void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion); - void configuredObject(UUID id, String type, Map<String, Object> attributes); + void configuredObject(ConfiguredObjectRecord object); /** * diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java index 44490385d9..99f8f0f04a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java @@ -20,69 +20,16 @@ */ package org.apache.qpid.server.store; -import java.util.Collections; -import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -public class ConfiguredObjectRecord +public interface ConfiguredObjectRecord { - private UUID _id; - private String _type; - private Map<String,Object> _attributes; + UUID getId(); - public ConfiguredObjectRecord(UUID id, String type, Map<String,Object> attributes) - { - super(); - _id = id; - _type = type; - _attributes = Collections.unmodifiableMap(new LinkedHashMap<String,Object>(attributes)); - } + String getType(); - public UUID getId() - { - return _id; - } + Map<String,Object> getAttributes(); - public String getType() - { - return _type; - } - - public Map<String,Object> getAttributes() - { - return _attributes; - } - - @Override - public String toString() - { - return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]"; - } - - @Override - public boolean equals(Object o) - { - if(this == o) - { - return true; - } - if(o == null || getClass() != o.getClass()) - { - return false; - } - - ConfiguredObjectRecord that = (ConfiguredObjectRecord) o; - - return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes); - } - - @Override - public int hashCode() - { - int result = _id.hashCode(); - result = 31 * result + _type.hashCode(); - result = 31 * result + _attributes.hashCode(); - return result; - } + Map<String, ConfiguredObjectRecord> getParents(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java new file mode 100644 index 0000000000..ed553aa823 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordImpl.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; + +public class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord +{ + private UUID _id; + private String _type; + private final Map<String,Object> _attributes; + private final Map<String,ConfiguredObjectRecord> _parents; + + public ConfiguredObjectRecordImpl(UUID id, String type, Map<String, Object> attributes) + { + this(id,type,attributes,Collections.<String,ConfiguredObjectRecord>emptyMap()); + } + + public ConfiguredObjectRecordImpl(UUID id, String type, Map<String, Object> attributes, Map<String,ConfiguredObjectRecord> parents) + { + super(); + _id = id; + _type = type; + _attributes = Collections.unmodifiableMap(new LinkedHashMap<String,Object>(attributes)); + _parents = Collections.unmodifiableMap(new LinkedHashMap<String, ConfiguredObjectRecord>(parents)); + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public String getType() + { + return _type; + } + + @Override + public Map<String,Object> getAttributes() + { + return _attributes; + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return _parents; + } + + @Override + public String toString() + { + return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]"; + } + + @Override + public boolean equals(Object o) + { + if(this == o) + { + return true; + } + if(o == null || getClass() != o.getClass()) + { + return false; + } + + ConfiguredObjectRecordImpl that = (ConfiguredObjectRecordImpl) o; + + return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes); + } + + @Override + public int hashCode() + { + int result = _id.hashCode(); + result = 31 * result + _type.hashCode(); + result = 31 * result + _attributes.hashCode(); + return result; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java index 6696426c0e..f8d8ecdd7c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.UUID; @@ -76,19 +77,19 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl } @Override - public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes) + public void configuredObject(ConfiguredObjectRecord record) { - _upgrader.configuredObject(id, type, attributes); + _upgrader.configuredObject(record); } - void onConfiguredObject(final UUID id, final String type, final Map<String, Object> attributes) + void onConfiguredObject(ConfiguredObjectRecord record) { - DurableConfiguredObjectRecoverer recoverer = getRecoverer(type); + DurableConfiguredObjectRecoverer recoverer = getRecoverer(record.getType()); if(recoverer == null) { - throw new IllegalConfigurationException("Unknown type for configured object: " + type); + throw new IllegalConfigurationException("Unknown type for configured object: " + record.getType()); } - recoverer.load(this, id, attributes); + recoverer.load(this, record); } private DurableConfiguredObjectRecoverer getRecoverer(final String type) @@ -111,19 +112,12 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl private void applyUpgrade() { - final Collection<ConfiguredObjectRecord> updates = new ArrayList<ConfiguredObjectRecord>(); - final Collection<UUID> deletes = new ArrayList<UUID>(); - for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _upgrader.getUpdatedRecords().entrySet()) - { - if(entry.getValue() != null) - { - updates.add(entry.getValue()); - } - else - { - deletes.add(entry.getKey()); - } - } + final Collection<ConfiguredObjectRecord> updates = new HashSet<ConfiguredObjectRecord>(_upgrader.getUpdatedRecords().values()); + final Collection<ConfiguredObjectRecord> deletes = new HashSet<ConfiguredObjectRecord>(_upgrader.getDeletedRecords().values()); + + // Due to the way the upgraders work it is possible that the updates list may contain nulls + updates.remove(null); + deletes.remove(null); if(!updates.isEmpty()) { @@ -131,7 +125,7 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl } if(!deletes.isEmpty()) { - _store.removeConfiguredObjects(deletes.toArray(new UUID[deletes.size()])); + _store.remove(deletes.toArray(new ConfiguredObjectRecord[deletes.size()])); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 589eca1600..ae138f12c2 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -50,41 +50,33 @@ public interface DurableConfigurationStore /** * Makes the specified object persistent. * - * @param id The id of the object to persist. - * @param type The type of the object to persist - * @param attributes the attributes of the object to persist + * @param object The object to persist. * * @throws StoreException If the operation fails for any reason. */ - void create(UUID id, String type, Map<String, Object> attributes) throws StoreException; + void create(ConfiguredObjectRecord object) throws StoreException; /** - * Removes the specified persistent configured object. + * Removes the specified persistent configured objects. * - * @param id The id of the object to remove. - * @param type The type of the object to remove + * @param objects The objects to remove. * * @throws StoreException If the operation fails for any reason. */ - void remove(UUID id, String type) throws StoreException; - - public UUID[] removeConfiguredObjects(UUID... objects) throws StoreException; + public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException; /** - * Updates the specified object in the persistent store, IF it is already present. If the object - * is not present in the store, it will not be added. + * Updates the specified objects in the persistent store, IF it is already present. If the object + * is not present in the store, it will only be added if createIfNecessary is set to true, otherwise an exception + * will be thrown. * - * @param id The id of the object to update. - * @param type The type of the object to update - * @param attributes the updated attributes + * @param createIfNecessary if false then will fail if the object does not exist. + * @param records the records to update * * @throws StoreException If the operation fails for any reason. */ - void update(UUID id, String type, Map<String, Object> attributes) throws StoreException; - - - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException; + void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException; void close() throws Exception; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index f914389b0e..9410006d65 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -20,12 +20,8 @@ */ package org.apache.qpid.server.store; -import java.security.PrivilegedAction; import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; @@ -35,12 +31,8 @@ import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.security.*; - -import javax.security.auth.Subject; public class DurableConfigurationStoreHelper { @@ -58,7 +50,7 @@ public class DurableConfigurationStoreHelper { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); } - store.update(queue.getId(), QUEUE, attributesMap); + store.update(false, new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap)); } public static void createQueue(DurableConfigurationStore store, AMQQueue<?> queue) @@ -71,49 +63,33 @@ public class DurableConfigurationStoreHelper attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); } - store.create(queue.getId(), QUEUE, attributesMap); + store.create(new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap)); } public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) { - store.remove(queue.getId(), QUEUE); + store.remove(queue.asObjectRecord()); } public static void createExchange(DurableConfigurationStore store, ExchangeImpl exchange) { - Map<String, Object> attributesMap = exchange.getActualAttributes(); - attributesMap.remove(ConfiguredObject.ID); - - store.create(exchange.getId(), EXCHANGE, attributesMap); - + store.create(exchange.asObjectRecord()); } - public static void removeExchange(DurableConfigurationStore store, ExchangeImpl exchange) { - store.remove(exchange.getId(), EXCHANGE); + store.remove(exchange.asObjectRecord()); } public static void createBinding(DurableConfigurationStore store, final BindingImpl binding) { - Map<String, Object> attributesMap = binding.getActualAttributes(); - attributesMap.remove(ConfiguredObject.ID); - if(!attributesMap.containsKey(Binding.EXCHANGE)) - { - attributesMap.put(Binding.EXCHANGE, binding.getExchange()); - } - if(!attributesMap.containsKey(Binding.QUEUE)) - { - attributesMap.put(Binding.QUEUE, binding.getQueue()); - } - - store.create(binding.getId(), BINDING, attributesMap); + store.create(binding.asObjectRecord()); } public static void removeBinding(DurableConfigurationStore store, BindingImpl binding) { - store.remove(binding.getId(), BINDING); + store.remove(binding.asObjectRecord()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java index 1d3e4cc672..45b2ff56d7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java @@ -25,11 +25,12 @@ import java.util.UUID; public interface DurableConfigurationStoreUpgrader { - void configuredObject(UUID id, String type, Map<String, Object> attributes); + void configuredObject(ConfiguredObjectRecord record); void complete(); void setNextUpgrader(DurableConfigurationStoreUpgrader upgrader); Map<UUID, ConfiguredObjectRecord> getUpdatedRecords(); + Map<UUID, ConfiguredObjectRecord> getDeletedRecords(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java index e065728bd3..4c618f5089 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java @@ -26,8 +26,7 @@ import java.util.UUID; public interface DurableConfiguredObjectRecoverer { public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer, - final UUID id, - final Map<String, Object> attributes); + final ConfiguredObjectRecord record); public String getType(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index e99c2019cd..c8898eedb6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -26,14 +26,7 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; @@ -106,7 +99,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values()); for(ConfiguredObjectRecord record : records) { - recoveryHandler.configuredObject(record.getId(), record.getType(), record.getAttributes()); + recoveryHandler.configuredObject(record); } int oldConfigVersion = _configVersion; _configVersion = recoveryHandler.completeConfigurationRecovery(); @@ -311,20 +304,21 @@ public class JsonFileConfigStore implements DurableConfigurationStore } } + Map<String,UUID> parentMap = new HashMap<String, UUID>(); if(parentId != null) { - data.put(parentClass.getSimpleName().toLowerCase(),parentId); + parentMap.put(parentClass.getSimpleName(),parentId); for(Class<? extends ConfiguredObject> otherParent : MODEL.getParentTypes(clazz)) { if(otherParent != parentClass) { final String otherParentAttr = otherParent.getSimpleName().toLowerCase(); - Object otherParentId = data.get(otherParentAttr); + Object otherParentId = data.remove(otherParentAttr); if(otherParentId instanceof String) { try { - data.put(otherParentAttr, UUID.fromString((String) otherParentId)); + parentMap.put(otherParent.getSimpleName(), UUID.fromString((String) otherParentId)); } catch(IllegalArgumentException e) { @@ -336,7 +330,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore } } - _objectsById.put(id, new ConfiguredObjectRecord(id, type, data)); + _objectsById.put(id, new ConfiguredObjectRecordImpl(id, type, data, parentMap)); List<UUID> idsForType = _idsByType.get(type); if(idsForType == null) { @@ -348,28 +342,27 @@ public class JsonFileConfigStore implements DurableConfigurationStore } @Override - public synchronized void create(final UUID id, final String type, final Map<String, Object> attributes) throws - StoreException + public synchronized void create(ConfiguredObjectRecord record) throws StoreException { - if(_objectsById.containsKey(id)) + if(_objectsById.containsKey(record.getId())) { - throw new StoreException("Object with id " + id + " already exists"); + throw new StoreException("Object with id " + record.getId() + " already exists"); } - else if(!CLASS_NAME_MAPPING.containsKey(type)) + else if(!CLASS_NAME_MAPPING.containsKey(record.getType())) { - throw new StoreException("Cannot create object of unknown type " + type); + throw new StoreException("Cannot create object of unknown type " + record.getType()); } else { - ConfiguredObjectRecord record = new ConfiguredObjectRecord(id, type, attributes); - _objectsById.put(id, record); - List<UUID> idsForType = _idsByType.get(type); + + _objectsById.put(record.getId(), record); + List<UUID> idsForType = _idsByType.get(record.getType()); if(idsForType == null) { idsForType = new ArrayList<UUID>(); - _idsByType.put(type, idsForType); + _idsByType.put(record.getType(), idsForType); } - idsForType.add(id); + idsForType.add(record.getId()); save(); } } @@ -424,7 +417,21 @@ public class JsonFileConfigStore implements DurableConfigurationStore Map<String,Object> map = new LinkedHashMap<String, Object>(); map.put("id", id); map.putAll(record.getAttributes()); - map.remove(MODEL.getParentTypes(type).iterator().next().getSimpleName().toLowerCase()); + + Collection<Class<? extends ConfiguredObject>> parentTypes = MODEL.getParentTypes(type); + if(parentTypes.size() > 1) + { + Iterator<Class<? extends ConfiguredObject>> iter = parentTypes.iterator(); + // skip the first parent, which is given by structure + iter.next(); + // for all other parents add a fake attribute with name being the parent type in lower case, and the value + // being the parents id + while(iter.hasNext()) + { + String parentType = iter.next().getSimpleName(); + map.put(parentType.toLowerCase(), record.getParents().get(parentType).getId()); + } + } Collection<Class<? extends ConfiguredObject>> childClasses = new ArrayList<Class<? extends ConfiguredObject>>(MODEL.getChildTypes(type)); @@ -442,17 +449,9 @@ public class JsonFileConfigStore implements DurableConfigurationStore for(UUID childId : childIds) { ConfiguredObjectRecord childRecord = _objectsById.get(childId); - final String parentArg = type.getSimpleName().toLowerCase(); - final Object parent = childRecord.getAttributes().get(parentArg); - String parentId; - if(parent instanceof ConfiguredObject) - { - parentId = ((ConfiguredObject)parent).getId().toString(); - } - else - { - parentId = String.valueOf(parent); - } + + final ConfiguredObjectRecord parent = childRecord.getParents().get(type.getSimpleName()); + String parentId = parent.getId().toString(); if(id.toString().equals(parentId)) { entities.add(build(childClass,childId)); @@ -470,34 +469,22 @@ public class JsonFileConfigStore implements DurableConfigurationStore } @Override - public void remove(final UUID id, final String type) throws StoreException - { - removeConfiguredObjects(id); - } - - @Override - public synchronized UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException + public synchronized UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException { List<UUID> removedIds = new ArrayList<UUID>(); - for(UUID id : objects) + for(ConfiguredObjectRecord requestedRecord : objects) { - ConfiguredObjectRecord record = _objectsById.remove(id); + ConfiguredObjectRecord record = _objectsById.remove(requestedRecord.getId()); if(record != null) { - removedIds.add(id); - _idsByType.get(record.getType()).remove(id); + removedIds.add(record.getId()); + _idsByType.get(record.getType()).remove(record.getId()); } } save(); return removedIds.toArray(new UUID[removedIds.size()]); } - @Override - public void update(final UUID id, final String type, final Map<String, Object> attributes) throws - StoreException - { - update(false, new ConfiguredObjectRecord(id, type, attributes)); - } @Override public void update(final boolean createIfNecessary, final ConfiguredObjectRecord... records) @@ -583,5 +570,59 @@ public class JsonFileConfigStore implements DurableConfigurationStore return map; } + private class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord + { + + private final UUID _id; + private final String _type; + private final Map<String, Object> _attributes; + private final Map<String, UUID> _parents; + + private ConfiguredObjectRecordImpl(final UUID id, final String type, final Map<String, Object> attributes, + final Map<String, UUID> parents) + { + _id = id; + _type = type; + _attributes = attributes; + _parents = parents; + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public String getType() + { + return _type; + } + + @Override + public Map<String, Object> getAttributes() + { + return _attributes; + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + Map<String,ConfiguredObjectRecord> parents = new HashMap<String, ConfiguredObjectRecord>(); + for(Map.Entry<String,UUID> entry : _parents.entrySet()) + { + ConfiguredObjectRecord value = _objectsById.get(entry.getValue()); + + if(value == null && entry.getKey().equals("Exchange")) + { + // TODO - remove this hack for the defined exchanges + value = new ConfiguredObjectRecordImpl(entry.getValue(),entry.getKey(),Collections.<String,Object>emptyMap(), Collections.<String,UUID>emptyMap()); + } + + parents.put(entry.getKey(), value); + } + return parents; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java index a671e93b26..1abd3de34a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java @@ -28,6 +28,7 @@ public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrad { private DurableConfigurationStoreUpgrader _nextUpgrader; private final Map<UUID, ConfiguredObjectRecord> _updates = new HashMap<UUID, ConfiguredObjectRecord>(); + private final Map<UUID, ConfiguredObjectRecord> _deletes = new HashMap<UUID, ConfiguredObjectRecord>(); public final void setNextUpgrader(final DurableConfigurationStoreUpgrader upgrader) { @@ -50,6 +51,10 @@ public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrad { return _updates; } + protected Map<UUID, ConfiguredObjectRecord> getDeleteMap() + { + return _deletes; + } @Override public final Map<UUID, ConfiguredObjectRecord> getUpdatedRecords() @@ -59,4 +64,11 @@ public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrad return updates; } + @Override + public final Map<UUID, ConfiguredObjectRecord> getDeletedRecords() + { + final Map<UUID, ConfiguredObjectRecord> deletes = new HashMap<UUID, ConfiguredObjectRecord>(_deletes); + deletes.putAll(_nextUpgrader.getDeletedRecords()); + return deletes; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index c579a27731..782ac40a5d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -19,7 +19,6 @@ */ package org.apache.qpid.server.store; -import java.util.Map; import java.util.UUID; import org.apache.qpid.server.model.VirtualHost; @@ -32,29 +31,23 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void update(UUID id, String type, Map<String, Object> attributes) - { - } - - @Override public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) { } - - @Override - public void remove(UUID id, String type) - { - } - @Override - public UUID[] removeConfiguredObjects(final UUID... objects) + public UUID[] remove(final ConfiguredObjectRecord... objects) { - return objects; + final UUID[] removed = new UUID[objects.length]; + for(int i = 0; i < objects.length; i++) + { + removed[i] = objects[i].getId(); + } + return removed; } @Override - public void create(UUID id, String type, Map<String, Object> attributes) + public void create(ConfiguredObjectRecord record) { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java index c8a812fa89..b17dd4a6f3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullUpgrader.java @@ -34,9 +34,9 @@ public final class NullUpgrader implements DurableConfigurationStoreUpgrader } @Override - public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes) + public void configuredObject(final ConfiguredObjectRecord record) { - _durableConfigurationRecoverer.onConfiguredObject(id, type, attributes); + _durableConfigurationRecoverer.onConfiguredObject(record); } @Override @@ -55,4 +55,10 @@ public final class NullUpgrader implements DurableConfigurationStoreUpgrader { return Collections.emptyMap(); } + + @Override + public Map<UUID, ConfiguredObjectRecord> getDeletedRecords() + { + return Collections.emptyMap(); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index e6577e04e4..58721ea8b2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -28,9 +28,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; @@ -49,11 +51,9 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B } @Override - public UnresolvedObject<BindingImpl> createUnresolvedObject(final UUID id, - final String type, - final Map<String, Object> attributes) + public UnresolvedObject<BindingImpl> createUnresolvedObject(ConfiguredObjectRecord record) { - return new UnresolvedBinding(id, attributes); + return new UnresolvedBinding(record); } @Override @@ -76,12 +76,11 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B private ExchangeImpl _exchange; private AMQQueue _queue; - public UnresolvedBinding(final UUID id, - final Map<String, Object> attributeMap) + public UnresolvedBinding(final ConfiguredObjectRecord record) { - _bindingId = id; - _exchangeId = UUID.fromString(String.valueOf(attributeMap.get(org.apache.qpid.server.model.Binding.EXCHANGE))); - _queueId = UUID.fromString(String.valueOf(attributeMap.get(org.apache.qpid.server.model.Binding.QUEUE))); + _bindingId = record.getId(); + _exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId(); + _queueId = record.getParents().get(Queue.class.getSimpleName()).getId(); _exchange = _exchangeRegistry.getExchange(_exchangeId); if(_exchange == null) { @@ -93,8 +92,8 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B _unresolvedDependencies.add(new QueueDependency()); } - _bindingName = (String) attributeMap.get(org.apache.qpid.server.model.Binding.NAME); - _bindingArgumentsMap = (Map<String, Object>) attributeMap.get(org.apache.qpid.server.model.Binding.ARGUMENTS); + _bindingName = (String) record.getAttributes().get(org.apache.qpid.server.model.Binding.NAME); + _bindingArgumentsMap = (Map<String, Object>) record.getAttributes().get(org.apache.qpid.server.model.Binding.ARGUMENTS); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index efdca3b67d..6718acfccf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; import org.apache.qpid.server.store.NonNullUpgrader; @@ -105,9 +106,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider } @Override - public void configuredObject(final UUID id, final String type, Map<String, Object> attributes) + public void configuredObject(final ConfiguredObjectRecord record) { - _records.put(id, new ConfiguredObjectRecord(id, type, attributes)); + _records.put(record.getId(), record); } private void removeSelectorArguments(Map<String, Object> binding) @@ -119,9 +120,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider binding.put(Binding.ARGUMENTS, arguments); } - private boolean isTopicExchange(Map<String, Object> binding) + private boolean isTopicExchange(ConfiguredObjectRecord entry) { - UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE)); + UUID exchangeId = entry.getParents().get("Exchange").getId(); if(_records.containsKey(exchangeId)) { @@ -155,17 +156,17 @@ public class DefaultUpgraderProvider implements UpgraderProvider String type = record.getType(); Map<String, Object> attributes = record.getAttributes(); UUID id = record.getId(); - if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(attributes)) + if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) { attributes = new LinkedHashMap<String, Object>(attributes); removeSelectorArguments(attributes); - record = new ConfiguredObjectRecord(id, type, attributes); + record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); getUpdateMap().put(id, record); entry.setValue(record); } - getNextUpgrader().configuredObject(id, type, attributes); + getNextUpgrader().configuredObject(record); } getNextUpgrader().complete(); @@ -179,12 +180,14 @@ public class DefaultUpgraderProvider implements UpgraderProvider */ private class Version1Upgrader extends NonNullUpgrader { + + @Override - public void configuredObject(final UUID id, String type, final Map<String, Object> attributes) + public void configuredObject(final ConfiguredObjectRecord record) { - type = type.substring(1+type.lastIndexOf('.')); - getUpdateMap().put(id, new ConfiguredObjectRecord(id, type, attributes)); - + String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); + getUpdateMap().put(record.getId(), + new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents())); } @Override @@ -193,30 +196,31 @@ public class DefaultUpgraderProvider implements UpgraderProvider for(Map.Entry<UUID, ConfiguredObjectRecord> entry : getUpdateMap().entrySet()) { final ConfiguredObjectRecord record = entry.getValue(); - if(isBinding(record.getType()) && (unknownExchange((String) record.getAttributes().get(Binding.EXCHANGE)) - || unknownQueue((String) record.getAttributes().get(Binding.QUEUE)))) + final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); + final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName()); + if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId()) + || queueParent == null || unknownQueue(queueParent.getId()))) { + getDeleteMap().put(entry.getKey(), entry.getValue()); entry.setValue(null); } else { - getNextUpgrader().configuredObject(record.getId(), record.getType(), record.getAttributes()); + getNextUpgrader().configuredObject(record); } } getNextUpgrader().complete(); } - private boolean unknownExchange(final String exchangeIdString) + private boolean unknownExchange(final UUID exchangeId) { - UUID exchangeId = UUID.fromString(exchangeIdString); ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())) || _exchangeRegistry.getExchange(exchangeId) != null); } - private boolean unknownQueue(final String queueIdString) + private boolean unknownQueue(final UUID queueId) { - UUID queueId = UUID.fromString(queueIdString); ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())) || _virtualHost.getQueue(queueId) != null); @@ -240,22 +244,24 @@ public class DefaultUpgraderProvider implements UpgraderProvider private static final String ARGUMENTS = "arguments"; @Override - public void configuredObject(UUID id, String type, Map<String, Object> attributes) + public void configuredObject(ConfiguredObjectRecord record) { - if(Queue.class.getSimpleName().equals(type)) + + if(Queue.class.getSimpleName().equals(record.getType())) { Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(); - if(attributes.get(ARGUMENTS) instanceof Map) + if(record.getAttributes().get(ARGUMENTS) instanceof Map) { - newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) attributes + newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes() .get(ARGUMENTS))); } - newAttributes.putAll(attributes); - attributes = newAttributes; - getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + newAttributes.putAll(record.getAttributes()); + + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); } - getNextUpgrader().configuredObject(id,type,attributes); + getNextUpgrader().configuredObject(record); } @Override @@ -274,16 +280,17 @@ public class DefaultUpgraderProvider implements UpgraderProvider { @Override - public void configuredObject(UUID id, String type, Map<String, Object> attributes) + public void configuredObject(ConfiguredObjectRecord record) { - if(Queue.class.getSimpleName().equals(type)) + + if(Queue.class.getSimpleName().equals(record.getType())) { - Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes); - if(attributes.get(EXCLUSIVE) instanceof Boolean) + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes()); + if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) { - boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE); + boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); - if(!isExclusive && attributes.containsKey("owner")) + if(!isExclusive && record.getAttributes().containsKey("owner")) { newAttributes.remove("owner"); } @@ -292,15 +299,16 @@ public class DefaultUpgraderProvider implements UpgraderProvider { newAttributes.remove("owner"); } - if(!attributes.containsKey("durable")) + if(!record.getAttributes().containsKey("durable")) { newAttributes.put("durable","true"); } - attributes = newAttributes; - getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + + record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); } - getNextUpgrader().configuredObject(id,type,attributes); + getNextUpgrader().configuredObject(record); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 2743b0ef59..4431fc786d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -50,11 +51,9 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< } @Override - public UnresolvedObject<ExchangeImpl> createUnresolvedObject(final UUID id, - final String type, - final Map<String, Object> attributes) + public UnresolvedObject<ExchangeImpl> createUnresolvedObject(final ConfiguredObjectRecord record) { - return new UnresolvedExchange(id, attributes); + return new UnresolvedExchange(record.getId(), record.getAttributes()); } private class UnresolvedExchange implements UnresolvedObject<ExchangeImpl> diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 385f4e219e..eb1bc883ca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; @@ -58,11 +59,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ } @Override - public UnresolvedObject<AMQQueue> createUnresolvedObject(final UUID id, - final String type, - final Map<String, Object> attributes) + public UnresolvedObject<AMQQueue> createUnresolvedObject(final ConfiguredObjectRecord record) { - return new UnresolvedQueue(id, type, attributes); + return new UnresolvedQueue(record.getId(), record.getAttributes()); } private class UnresolvedQueue implements UnresolvedObject<AMQQueue> @@ -75,7 +74,6 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ private ExchangeImpl _alternateExchange; public UnresolvedQueue(final UUID id, - final String type, final Map<String, Object> attributes) { _attributes = attributes; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 04c2f974c9..0e30096047 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -43,6 +42,8 @@ import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -68,6 +69,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName(); private static final String QUEUE = Queue.class.getSimpleName(); + private static final UUID ANY_UUID = UUID.randomUUID(); + private static final Map ANY_MAP = new HashMap(); + + private String _storePath; private String _storeName; private MessageStore _messageStore; @@ -117,10 +122,17 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(_exchange.getId()).thenReturn(_exchangeId); when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); when(_exchange.getEventLogger()).thenReturn(new EventLogger()); + + ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); + when(exchangeRecord.getId()).thenReturn(_exchangeId); + when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName()); + when(_exchange.asObjectRecord()).thenReturn(exchangeRecord); + when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( _storePath); when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath); + _bindingArgs = new HashMap<String, Object>(); String argKey = AMQPFilterTypes.JMS_SELECTOR.toString(); String argValue = "some selector expression"; @@ -149,8 +161,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE), - eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), + verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE, + map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()))); } @@ -183,7 +195,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); + verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); } public void testBindQueue() throws Exception @@ -191,42 +203,86 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, _bindingArgs); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); DurableConfigurationStoreHelper.createBinding(_configStore, binding); reopenStore(); Map<String,Object> map = new HashMap<String, Object>(); - map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString()); - map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString()); - map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); - map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,_bindingArgs); + map.put(Binding.NAME, ROUTING_KEY); + map.put(Binding.ARGUMENTS,_bindingArgs); + + Map<String,UUID> parents = new HashMap<String, UUID>(); + + parents.put(Exchange.class.getSimpleName(), _exchange.getId()); + parents.put(Queue.class.getSimpleName(), queue.getId()); + + verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents)); + } + + + private ConfiguredObjectRecord matchesRecord(UUID id, + String type, + Map<String, Object> attributes, + final Map<String, UUID> parents) + { + return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents)); + } - verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING), - argThat(new IgnoreCreatedByMatcher(map))); + private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes) + { + return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP)); } - private static class IgnoreCreatedByMatcher extends ArgumentMatcher<Map<String,Object>> + private static class ConfiguredObjectMatcher extends ArgumentMatcher<ConfiguredObjectRecord> { private final Map<String,Object> _matchingMap; + private final UUID _id; + private final String _name; + private final Map<String,UUID> _parents; - private IgnoreCreatedByMatcher(final Map<String, Object> matchingMap) + private ConfiguredObjectMatcher(final UUID id, final String type, final Map<String, Object> matchingMap, Map<String,UUID> parents) { + _id = id; + _name = type; _matchingMap = matchingMap; + _parents = parents; } @Override public boolean matches(final Object argument) { - if(argument instanceof Map) + if(argument instanceof ConfiguredObjectRecord) { - Map<String,Object> arg = new HashMap<String, Object>((Map<String,Object>) argument); + ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument; + + Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes()); arg.remove("createdBy"); arg.remove("createdTime"); - return arg.equals(_matchingMap); - + return (_id == ANY_UUID || _id.equals(binding.getId())) + && _name.equals(binding.getType()) + && (_matchingMap == ANY_MAP || arg.equals(_matchingMap)) + && (_parents == ANY_MAP || matchesParents(binding)); } return false; } + + private boolean matchesParents(ConfiguredObjectRecord binding) + { + Map<String, ConfiguredObjectRecord> bindingParents = binding.getParents(); + if(bindingParents.size() != _parents.size()) + { + return false; + } + for(Map.Entry<String,UUID> entry : _parents.entrySet()) + { + if(!bindingParents.get(entry.getKey()).getId().equals(entry.getValue())) + { + return false; + } + } + return true; + } } public void testUnbindQueue() throws Exception @@ -239,9 +295,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.removeBinding(_configStore, binding); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(BINDING), - anyMap()); + verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING, + ANY_MAP)); } public void testCreateQueueAMQQueue() throws Exception @@ -254,7 +309,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -276,7 +331,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.putAll(attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -294,7 +349,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); } private ExchangeImpl createTestAlternateExchange() @@ -327,7 +382,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.putAll(attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); } @@ -354,7 +409,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testRemoveQueue() throws Exception @@ -369,9 +424,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest // remove queue DurableConfigurationStoreHelper.removeQueue(_configStore,queue); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(org.apache.qpid.server.model.Queue.class.getName()), - anyMap()); + verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); } private AMQQueue createTestQueue(String queueName, @@ -424,6 +477,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest }); when(queue.getActualAttributes()).thenReturn(attributes); + + ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class); + when(objectRecord.getId()).thenReturn(_queueId); + when(objectRecord.getType()).thenReturn(Queue.class.getSimpleName()); + when(objectRecord.getAttributes()).thenReturn(attributes); + when(queue.asObjectRecord()).thenReturn(objectRecord); return queue; } @@ -440,6 +499,13 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(exchange.getTypeName()).thenReturn(getName() + "Type"); when(exchange.isAutoDelete()).thenReturn(true); when(exchange.getId()).thenReturn(_exchangeId); + ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); + when(exchangeRecord.getId()).thenReturn(_exchangeId); + when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName()); + Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes); + actualAttributesExceptId.remove("id"); + when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId); + when(exchange.asObjectRecord()).thenReturn(exchangeRecord); return exchange; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index 224a22687f..4304b59d10 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -31,11 +31,13 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentMatcher; import org.mockito.InOrder; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -49,6 +51,10 @@ public class JsonFileConfigStoreTest extends QpidTestCase private VirtualHost _virtualHost; private JsonFileConfigStore _store; + + private static final UUID ANY_UUID = UUID.randomUUID(); + private static final Map<String, Object> ANY_MAP = new HashMap<String, Object>(); + @Override public void setUp() throws Exception { @@ -109,7 +115,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.configureConfigStore(_virtualHost, _recoveryHandler); InOrder inorder = inOrder(_recoveryHandler); inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); - inorder.verify(_recoveryHandler,never()).configuredObject(any(UUID.class),anyString(),anyMap()); + inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class)); inorder.verify(_recoveryHandler).completeConfigurationRecovery(); _store.close(); } @@ -141,11 +147,11 @@ public class JsonFileConfigStoreTest extends QpidTestCase final String queueType = Queue.class.getSimpleName(); final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); - _store.create(queueId, queueType, queueAttr); + _store.create(new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr)); _store.close(); _store.configureConfigStore(_virtualHost, _recoveryHandler); - verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr)); + verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); _store.close(); } @@ -156,17 +162,17 @@ public class JsonFileConfigStoreTest extends QpidTestCase final String queueType = Queue.class.getSimpleName(); Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); - _store.create(queueId, queueType, queueAttr); + _store.create(new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr)); queueAttr = new HashMap<String,Object>(queueAttr); queueAttr.put("owner", "theowner"); - _store.update(queueId, queueType, queueAttr); + _store.update(false, new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr)); _store.close(); _store.configureConfigStore(_virtualHost, _recoveryHandler); - verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr)); + verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); _store.close(); } @@ -178,15 +184,16 @@ public class JsonFileConfigStoreTest extends QpidTestCase final String queueType = Queue.class.getSimpleName(); Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); - _store.create(queueId, queueType, queueAttr); + final ConfiguredObjectRecordImpl record = new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr); + _store.create(record); - _store.remove(queueId, queueType); + _store.remove(record); _store.close(); _store.configureConfigStore(_virtualHost, _recoveryHandler); - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); + verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); _store.close(); } @@ -195,7 +202,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.configureConfigStore(_virtualHost, _recoveryHandler); try { - _store.create(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap()); + _store.create(new ConfiguredObjectRecordImpl(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap())); fail("Should not be able to create instance of type wibble"); } catch (StoreException e) @@ -208,10 +215,10 @@ public class JsonFileConfigStoreTest extends QpidTestCase { _store.configureConfigStore(_virtualHost, _recoveryHandler); final UUID id = UUID.randomUUID(); - _store.create(id, "Queue", Collections.<String, Object>emptyMap()); + _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap())); try { - _store.create(id, "Exchange", Collections.<String, Object>emptyMap()); + _store.create(new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap())); fail("Should not be able to create two objects with same id"); } catch (StoreException e) @@ -225,13 +232,13 @@ public class JsonFileConfigStoreTest extends QpidTestCase { _store.configureConfigStore(_virtualHost, _recoveryHandler); final UUID id = UUID.randomUUID(); - _store.create(id, "Queue", Collections.<String, Object>emptyMap()); + _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap())); _store.close(); _store.configureConfigStore(_virtualHost, _recoveryHandler); try { - _store.update(id, "Exchange", Collections.<String, Object>emptyMap()); + _store.update(false, new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap())); fail("Should not be able to update object to different type"); } catch (StoreException e) @@ -270,31 +277,75 @@ public class JsonFileConfigStoreTest extends QpidTestCase final Map<String, Object> EMPTY_ATTR = Collections.emptyMap(); final UUID exchangeId = new UUID(0, 2); - final Map<String, Object> bindingAttributes = new HashMap<String, Object>(); - bindingAttributes.put(Binding.EXCHANGE, exchangeId); - bindingAttributes.put(Binding.QUEUE, queueId); - final Map<String, Object> binding2Attributes = new HashMap<String, Object>(); - binding2Attributes.put(Binding.EXCHANGE, exchangeId); - binding2Attributes.put(Binding.QUEUE, queue2Id); final UUID bindingId = new UUID(0, 3); final UUID binding2Id = new UUID(1, 3); - _store.create(queueId, "Queue", EMPTY_ATTR); - _store.create(queue2Id, "Queue", EMPTY_ATTR); - _store.create(exchangeId, "Exchange", EMPTY_ATTR); - _store.update(true, - new ConfiguredObjectRecord(bindingId, "Binding", bindingAttributes), - new ConfiguredObjectRecord(binding2Id, "Binding", binding2Attributes)); + final ConfiguredObjectRecordImpl queueRecord = new ConfiguredObjectRecordImpl(queueId, "Queue", EMPTY_ATTR); + _store.create(queueRecord); + final ConfiguredObjectRecordImpl queue2Record = new ConfiguredObjectRecordImpl(queue2Id, "Queue", EMPTY_ATTR); + _store.create(queue2Record); + final ConfiguredObjectRecordImpl exchangeRecord = new ConfiguredObjectRecordImpl(exchangeId, "Exchange", EMPTY_ATTR); + _store.create(exchangeRecord); + Map<String,ConfiguredObjectRecord> bindingParents = new HashMap<String, ConfiguredObjectRecord>(); + bindingParents.put("Exchange", exchangeRecord); + bindingParents.put("Queue", queueRecord); + final ConfiguredObjectRecordImpl bindingRecord = + new ConfiguredObjectRecordImpl(bindingId, "Binding", EMPTY_ATTR, bindingParents); + + + Map<String,ConfiguredObjectRecord> binding2Parents = new HashMap<String, ConfiguredObjectRecord>(); + binding2Parents.put("Exchange", exchangeRecord); + binding2Parents.put("Queue", queue2Record); + final ConfiguredObjectRecordImpl binding2Record = + new ConfiguredObjectRecordImpl(binding2Id, "Binding", EMPTY_ATTR, binding2Parents); + _store.update(true, bindingRecord, binding2Record); _store.close(); _store.configureConfigStore(_virtualHost, _recoveryHandler); - verify(_recoveryHandler).configuredObject(eq(queueId), eq("Queue"), eq(EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(eq(queue2Id), eq("Queue"), eq(EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(eq(exchangeId), eq("Exchange"), eq(EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(eq(bindingId),eq("Binding"), eq(bindingAttributes)); - verify(_recoveryHandler).configuredObject(eq(binding2Id),eq("Binding"), eq(binding2Attributes)); + verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR)); + verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR)); _store.close(); } + private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes) + { + return argThat(new ConfiguredObjectMatcher(id, type, attributes)); + } + + private static class ConfiguredObjectMatcher extends ArgumentMatcher<ConfiguredObjectRecord> + { + private final Map<String,Object> _matchingMap; + private final UUID _id; + private final String _name; + + private ConfiguredObjectMatcher(final UUID id, final String type, final Map<String, Object> matchingMap) + { + _id = id; + _name = type; + _matchingMap = matchingMap; + } + + @Override + public boolean matches(final Object argument) + { + if(argument instanceof ConfiguredObjectRecord) + { + ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument; + + Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes()); + arg.remove("createdBy"); + arg.remove("createdTime"); + return (_id == ANY_UUID || _id.equals(binding.getId())) + && _name.equals(binding.getType()) + && (_matchingMap == ANY_MAP || arg.equals(_matchingMap)); + + } + return false; + } + } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 9f1f6f48c0..b784405881 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.DirectExchange; @@ -214,17 +215,16 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); - _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", createBinding("key", - DIRECT_EXCHANGE_ID, - QUEUE_ID, "x-filter-jms-selector", - "wibble")); + "wibble"), + createBindingParents(DIRECT_EXCHANGE_ID, QUEUE_ID))); final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) + new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding", + createBinding("key")) }; verifyCorrectUpdates(expected); @@ -239,32 +239,30 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); - _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", createBinding("key", - DIRECT_EXCHANGE_ID, - QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", - "moo")); + "moo"), + createBindingParents(DIRECT_EXCHANGE_ID, QUEUE_ID))); final UUID customExchangeId = new UUID(3,0); - _durableConfigurationRecoverer.configuredObject(new UUID(2, 0), + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(2, 0), "org.apache.qpid.server.model.Binding", createBinding("key", - customExchangeId, - QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", - "moo")); + "moo"), + createBindingParents(customExchangeId,QUEUE_ID))); - _durableConfigurationRecoverer.configuredObject(customExchangeId, + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(customExchangeId, "org.apache.qpid.server.model.Exchange", - createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE))); final ExchangeImpl customExchange = mock(ExchangeImpl.class); @@ -291,10 +289,10 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), - new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo")) + new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", "not-a-selector", "moo")), + new ConfiguredObjectRecordImpl(new UUID(2, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", "not-a-selector", "moo")) }; verifyCorrectUpdates(expected); @@ -308,17 +306,16 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); - _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", createBinding("key", - TOPIC_EXCHANGE_ID, - QUEUE_ID, "x-filter-jms-selector", - "wibble")); + "wibble"), + createBindingParents(TOPIC_EXCHANGE_ID,QUEUE_ID))); final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "Binding", - createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) + new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding", + createBinding("key", "x-filter-jms-selector", "wibble")) }; verifyCorrectUpdates(expected); @@ -331,16 +328,15 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); - _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding", createBinding("key", - DIRECT_EXCHANGE_ID, - QUEUE_ID, "x-filter-jms-selector", - "wibble")); + "wibble"), + createBindingParents(DIRECT_EXCHANGE_ID,QUEUE_ID))); doThrow(new RuntimeException("Update Should not be called")) - .when(_store).update(anyBoolean(), any(ConfiguredObjectRecord[].class)); + .when(_store).update(anyBoolean(), any(ConfiguredObjectRecordImpl[].class)); _durableConfigurationRecoverer.completeConfigurationRecovery(); } @@ -350,13 +346,13 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); - _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding", createBinding("key", - new UUID(3,0), - QUEUE_ID, "x-filter-jms-selector", - "wibble")); + "wibble"), + createBindingParents(new UUID(3,0), + QUEUE_ID))); try { @@ -378,8 +374,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase try { final Map<String, Object> emptyArguments = Collections.emptyMap(); - _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), - "Wibble", emptyArguments); + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), + "Wibble", emptyArguments)); _durableConfigurationRecoverer.completeConfigurationRecovery(); fail("Expected resolution to fail due to unknown object type"); } @@ -425,11 +421,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); - _durableConfigurationRecoverer.configuredObject(queueId, Queue.class.getSimpleName(), - createQueue("testQueue", exchangeId)); - _durableConfigurationRecoverer.configuredObject(exchangeId, + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(queueId, Queue.class.getSimpleName(), + createQueue("testQueue", exchangeId))); + _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(exchangeId, org.apache.qpid.server.model.Exchange.class.getSimpleName(), - createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE))); _durableConfigurationRecoverer.completeConfigurationRecovery(); @@ -450,16 +446,14 @@ public class DurableConfigurationRecovererTest extends QpidTestCase return null; } - }).when(_store).update(anyBoolean(), any(ConfiguredObjectRecord[].class)); + }).when(_store).update(anyBoolean(), any(ConfiguredObjectRecordImpl[].class)); } - private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args) + private Map<String,Object> createBinding(String bindingKey, String... args) { Map<String, Object> binding = new LinkedHashMap<String, Object>(); binding.put("name", bindingKey); - binding.put(Binding.EXCHANGE, exchangeId.toString()); - binding.put(Binding.QUEUE, queueId.toString()); Map<String,String> argumentMap = new LinkedHashMap<String, String>(); if(args != null && args.length != 0) { @@ -481,6 +475,15 @@ public class DurableConfigurationRecovererTest extends QpidTestCase return binding; } + private Map<String,ConfiguredObjectRecord> createBindingParents(UUID exchangeId, UUID queueId) + { + Map<String,ConfiguredObjectRecord> parents = new HashMap<String, ConfiguredObjectRecord>(); + parents.put("Exchange", new ConfiguredObjectRecordImpl(exchangeId,"Exchange",Collections.<String,Object>emptyMap())); + parents.put("Queue", new ConfiguredObjectRecordImpl(queueId,"Queue",Collections.<String,Object>emptyMap())); + + return parents; + } + private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 1d7c2ffa46..0e80d72d9b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -190,6 +190,12 @@ public class MessageStoreTest extends QpidTestCase try { _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),new VirtualHostRegistry(_broker.getEventLogger()),getVirtualHostModel()); + when(_virtualHostModel.getId()).thenReturn(_virtualHost.getId()); + + ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class); + when(objectRecord.getId()).thenReturn(_virtualHost.getId()); + when(objectRecord.getType()).thenReturn(org.apache.qpid.server.model.VirtualHost.class.getSimpleName()); + when(_virtualHostModel.asObjectRecord()).thenReturn(objectRecord); } catch (Exception e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 9407ef2d6c..2def24205c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -184,41 +184,25 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore return _realStore.addMessage(metaData); } - @Override - public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException + public void create(ConfiguredObjectRecord record) throws StoreException { doPreDelay("create"); - _durableConfigurationStore.create(id, type, attributes); + _durableConfigurationStore.create(record); doPostDelay("create"); } - @Override - public void remove(UUID id, String type) throws StoreException - { - doPreDelay("remove"); - _durableConfigurationStore.remove(id, type); - doPostDelay("remove"); - } @Override - public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException + public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException { doPreDelay("remove"); - UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects); + UUID[] removed = _durableConfigurationStore.remove(objects); doPostDelay("remove"); return removed; } @Override - public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException - { - doPreDelay("update"); - _durableConfigurationStore.update(id, type, attributes); - doPostDelay("update"); - } - - @Override public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { doPreDelay("update"); |