summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java147
1 files changed, 19 insertions, 128 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index d7c8102c0e..f6b7e1790f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -51,16 +51,10 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
@@ -150,8 +144,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
- private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
private Map<String, String> _envConfigMap;
public AbstractBDBMessageStore()
@@ -434,17 +426,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
try
{
- List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
-
- QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- BindingRecoveryHandler brh = qrh.completeQueueRecovery();
- _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+ recoveryHandler.beginConfigurationRecovery(this);
+ loadConfiguredObjects(recoveryHandler);
- brh.completeBindingRecovery();
+ recoveryHandler.completeConfigurationRecovery();
}
catch (DatabaseException e)
{
@@ -453,10 +438,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
- private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
Cursor cursor = null;
- List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
try
{
cursor = _configuredObjectsDb.openCursor(null, null);
@@ -464,10 +448,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value);
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- configuredObject.setId(id);
- results.add(configuredObject);
+
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
+ crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
}
}
@@ -475,7 +459,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
closeCursorSafely(cursor);
}
- return results;
}
private void closeCursorSafely(Cursor cursor)
@@ -743,111 +726,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
- /**
- * @see DurableConfigurationStore#createExchange(Exchange)
- */
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
- storeConfiguredObjectEntry(configuredObject);
- }
- }
-
- /**
- * @see DurableConfigurationStore#removeExchange(Exchange)
- */
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- UUID id = exchange.getId();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called");
- }
- OperationStatus status = removeConfiguredObject(id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found");
- }
- }
-
-
- /**
- * @see DurableConfigurationStore#bindQueue(Binding)
- */
- public void bindQueue(Binding binding) throws AMQStoreException
+ @Override
+ public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
storeConfiguredObjectEntry(configuredObject);
}
}
- /**
- * @see DurableConfigurationStore#unbindQueue(Binding)
- */
- public void unbindQueue(Binding binding)
- throws AMQStoreException
+ @Override
+ public void remove(UUID id, String type) throws AMQStoreException
{
- UUID id = binding.getId();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called");
+ LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
}
-
OperationStatus status = removeConfiguredObject(id);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Binding " + binding + " not found");
- }
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue)
- */
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
- */
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId()
- + ", arguments=" + arguments + "): called");
- }
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
- storeConfiguredObjectEntry(configuredObject);
+ throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found");
}
}
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ @Override
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating queue: " + queue.getName());
+ LOGGER.debug("Updating " +type + ", id: " + id);
}
try
{
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(queue.getId(), key);
+ keyBinding.objectToEntry(id, key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
@@ -856,8 +771,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value);
- ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord);
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
@@ -879,29 +793,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
/**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- UUID id = queue.getId();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called");
- }
-
- OperationStatus status = removeConfiguredObject(id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found");
- }
- }
-
-
- /**
* Places a message onto a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.