diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java | 147 |
1 files changed, 19 insertions, 128 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index d7c8102c0e..f6b7e1790f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -51,16 +51,10 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; @@ -150,8 +144,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private final EventManager _eventManager = new EventManager(); private String _storeLocation; - private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); - private Map<String, String> _envConfigMap; public AbstractBDBMessageStore() @@ -434,17 +426,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { try { - List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - ExchangeRecoveryHandler erh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - - QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - BindingRecoveryHandler brh = qrh.completeQueueRecovery(); - _configuredObjectHelper.recoverBindings(brh, configuredObjects); + recoveryHandler.beginConfigurationRecovery(this); + loadConfiguredObjects(recoveryHandler); - brh.completeBindingRecovery(); + recoveryHandler.completeConfigurationRecovery(); } catch (DatabaseException e) { @@ -453,10 +438,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } - private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { Cursor cursor = null; - List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); try { cursor = _configuredObjectsDb.openCursor(null, null); @@ -464,10 +448,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo DatabaseEntry value = new DatabaseEntry(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { - ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value); UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - configuredObject.setId(id); - results.add(configuredObject); + + ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); + crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); } } @@ -475,7 +459,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { closeCursorSafely(cursor); } - return results; } private void closeCursorSafely(Cursor cursor) @@ -743,111 +726,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } - /** - * @see DurableConfigurationStore#createExchange(Exchange) - */ - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); - storeConfiguredObjectEntry(configuredObject); - } - } - - /** - * @see DurableConfigurationStore#removeExchange(Exchange) - */ - public void removeExchange(Exchange exchange) throws AMQStoreException - { - UUID id = exchange.getId(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called"); - } - OperationStatus status = removeConfiguredObject(id); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found"); - } - } - - - /** - * @see DurableConfigurationStore#bindQueue(Binding) - */ - public void bindQueue(Binding binding) throws AMQStoreException + @Override + public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); storeConfiguredObjectEntry(configuredObject); } } - /** - * @see DurableConfigurationStore#unbindQueue(Binding) - */ - public void unbindQueue(Binding binding) - throws AMQStoreException + @Override + public void remove(UUID id, String type) throws AMQStoreException { - UUID id = binding.getId(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called"); + LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); } - OperationStatus status = removeConfiguredObject(id); if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Binding " + binding + " not found"); - } - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue) - */ - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable) - */ - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId() - + ", arguments=" + arguments + "): called"); - } - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); - storeConfiguredObjectEntry(configuredObject); + throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found"); } } - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * @param queue The queue to update the entry for. - * @throws AMQStoreException If the operation fails for any reason. - */ - public void updateQueue(final AMQQueue queue) throws AMQStoreException + @Override + public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating queue: " + queue.getName()); + LOGGER.debug("Updating " +type + ", id: " + id); } try { DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(queue.getId(), key); + keyBinding.objectToEntry(id, key); DatabaseEntry value = new DatabaseEntry(); DatabaseEntry newValue = new DatabaseEntry(); @@ -856,8 +771,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { - ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value); - ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store configuredObjectBinding.objectToEntry(newQueueRecord, newValue); @@ -879,29 +793,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } /** - * Removes the specified queue from the persistent store. - * - * @param queue The queue to remove. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - UUID id = queue.getId(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called"); - } - - OperationStatus status = removeConfiguredObject(id); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found"); - } - } - - - /** * Places a message onto a specified queue, in a given transaction. * * @param tx The transaction for the operation. |