summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java192
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java268
2 files changed, 206 insertions, 254 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 492ec9d7bf..4fe41d512a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -20,17 +20,17 @@ package org.apache.qpid.server.store.berkeleydb;
*
*/
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -47,8 +47,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
private BDBMessageStore _messageStore;
-
- private boolean _inVhostInitiatedClose;
+ private MessageStoreLogSubject _messageStoreLogSubject;
BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
@@ -60,28 +59,18 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
protected void initialiseStorage(VirtualHost virtualHost)
{
- _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
-
- final MessageStoreLogSubject storeLogSubject =
- new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
- OperationalLoggingListener.listen(_messageStore, storeLogSubject, getEventLogger());
-
- _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
- _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
- _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
-
+ setState(State.PASSIVE);
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), BDBMessageStore.class.getSimpleName());
+ _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
- _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
- _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
- _messageStore.openConfigurationStore(
- virtualHost.getName(), virtualHost.getMessageStoreSettings()
- );
+ _messageStore.openConfigurationStore(virtualHost.getName(), messageStoreSettings);
+ _messageStore.openMessageStore(virtualHost.getName(), messageStoreSettings);
- _messageStore.openMessageStore(
- virtualHost.getName(), virtualHost.getMessageStoreSettings()
- );
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
// Make the virtualhost model object a replication group listener
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade();
@@ -89,30 +78,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
}
-
- protected void closeStorage()
- {
- //Close MessageStore
- if (_messageStore != null)
- {
- //Remove MessageStore Interface should not throw Exception
- try
- {
- _inVhostInitiatedClose = true;
- _messageStore.closeMessageStore();
- _messageStore.closeConfigurationStore();
- }
- catch (Exception e)
- {
- getLogger().error("Failed to close message store", e);
- }
- finally
- {
- _inVhostInitiatedClose = false;
- }
- }
- }
-
@Override
public DurableConfigurationStore getDurableConfigurationStore()
{
@@ -125,77 +90,62 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
return _messageStore;
}
- private final class AfterInitialisationListener implements EventListener
+ private void activate()
{
- public void event(Event event)
+ try
{
- setState(State.PASSIVE);
- }
+ _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- }
+ DurableConfigurationRecoverer configRecoverer =
+ new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger());
+ _messageStore.recoverConfigurationStore(getModel(), configRecoverer);
- private final class BeforePassivationListener implements EventListener
- {
- public void event(Event event)
- {
- State finalState = State.ERRORED;
-
- try
- {
- /* the approach here is not ideal as there is a race condition where a
- * queue etc could be created while the virtual host is on the way to
- * the passivated state. However the store state change from MASTER to UNKNOWN
- * is documented as exceptionally rare..
- */
-
- getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
- removeHouseKeepingTasks();
+ initialiseModel();
- getQueueRegistry().stopAllAndUnregisterMBeans();
- getExchangeRegistry().clearAndUnregisterMbeans();
- getDtxRegistry().close();
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
+ _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler);
- finalState = State.PASSIVE;
- }
- finally
- {
- setState(finalState);
- reportIfError(getState());
- }
+ attainActivation();
}
-
- }
-
-
- private final class BeforeActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
+ catch (Exception e)
{
- initialiseModel();
+ LOGGER.error("Failed to activate on hearing MASTER change event", e);
}
}
- private final class AfterActivationListener implements EventListener
+ private void passivate()
{
- @Override
- public void event(Event event)
+ State finalState = State.ERRORED;
+
+ try
{
- attainActivation();
+ /* the approach here is not ideal as there is a race condition where a
+ * queue etc could be created while the virtual host is on the way to
+ * the passivated state. However the store state change from MASTER to UNKNOWN
+ * is documented as exceptionally rare.
+ */
+
+ getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+ removeHouseKeepingTasks();
+
+ getQueueRegistry().stopAllAndUnregisterMBeans();
+ getExchangeRegistry().clearAndUnregisterMbeans();
+ getDtxRegistry().close();
+
+ finalState = State.PASSIVE;
+ }
+ finally
+ {
+ setState(finalState);
+ reportIfError(getState());
}
}
- private final class BeforeCloseListener implements EventListener
+ @Override
+ protected MessageStoreLogSubject getMessageStoreLogSubject()
{
- @Override
- public void event(Event event)
- {
- if(!_inVhostInitiatedClose)
- {
- shutdownHouseKeeping();
- }
-
- }
+ return _messageStoreLogSubject;
}
private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
@@ -208,8 +158,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
if (LOGGER.isInfoEnabled())
{
- LOGGER.info("Received BDB event indicating transition to state " + state
- + " when current message store state is " + _messageStore._messageStoreStateManager.getState());
+ LOGGER.info("Received BDB event indicating transition to state " + state);
}
switch (state)
@@ -232,43 +181,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
throw new IllegalStateException("Unexpected state change: " + state);
}
}
-
- private void activate()
- {
- try
- {
- _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
-
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger());
- _messageStore.recoverConfigurationStore(getModel(), configRecoverer);
-
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this);
- _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler);
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to activate on hearing MASTER change event", e);
- }
- }
-
- private void passivate()
- {
- try
- {
- //TODO: move this this into the store method passivate()
- if (_messageStore._messageStoreStateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
- {
- _messageStore._messageStoreStateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
- }
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
- }
- }
-
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index ec6ae23367..9f8e5410a7 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -47,8 +48,6 @@ import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StateManager;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
@@ -119,8 +118,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private EnvironmentFacade _environmentFacade;
private final AtomicLong _messageId = new AtomicLong(0);
- protected final StateManager _messageStoreStateManager;
- private final StateManager _configurationStoreStateManager;
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private long _totalStoreSize;
private boolean _limitBusted;
@@ -134,8 +133,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private volatile Committer _committer;
- private String _virtualHostName;
-
public BDBMessageStore()
{
@@ -146,8 +143,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
- _messageStoreStateManager = new StateManager(_eventManager);
- _configurationStoreStateManager = new StateManager(new EventManager());
}
@Override
@@ -159,20 +154,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _configurationStoreStateManager.attainState(State.INITIALISING);
-
- _virtualHostName = virtualHostName;
- if (_environmentFacade == null)
+ if (_configurationStoreOpen.compareAndSet(false, true))
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, storeSettings);
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, storeSettings);
+ }
}
- _configurationStoreStateManager.attainState(State.INITIALISED);
}
@Override
public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
{
- _configurationStoreStateManager.attainState(State.ACTIVATING);
+ checkConfigurationStoreOpen();
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
@@ -187,45 +181,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
}
recoverConfig(recoveryHandler);
-
- _configurationStoreStateManager.attainState(State.ACTIVE);
}
@Override
public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) throws StoreException
{
- _messageStoreStateManager.attainState(State.INITIALISING);
-
- _virtualHostName = virtualHostName;
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
- Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
- Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
- if (_environmentFacade == null)
- {
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, messageStoreSettings);
- }
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, messageStoreSettings);
+ }
- _committer = _environmentFacade.createCommitter(_virtualHostName);
- _committer.start();
+ _committer = _environmentFacade.createCommitter(virtualHostName);
+ _committer.start();
- _messageStoreStateManager.attainState(State.INITIALISED);
+ }
}
@Override
public synchronized void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
{
- _messageStoreStateManager.attainState(State.ACTIVATING);
+ checkMessageStoreOpen();
+
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
@@ -248,13 +240,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
recoverQueueEntries(transactionLogRecoveryHandler);
}
-
- _messageStoreStateManager.attainState(State.ACTIVE);
}
@Override
public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
{
+ checkMessageStoreOpen();
+
return new BDBTransaction();
}
@@ -273,51 +265,48 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return _environmentFacade;
}
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
@Override
public void closeMessageStore() throws StoreException
{
- _messageStoreStateManager.attainState(State.CLOSING);
- try
+ if (_messageStoreOpen.compareAndSet(true, false))
{
- if (_committer != null)
+ try
{
- _committer.stop();
+ if (_committer != null)
+ {
+ _committer.stop();
+ }
}
- }
- finally
- {
- if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL))
+ finally
{
- closeEnvironment();
+ if (!_configurationStoreOpen.get())
+ {
+ closeEnvironment();
+ }
}
}
- _messageStoreStateManager.attainState(State.CLOSED);
}
@Override
public void closeConfigurationStore() throws StoreException
{
- _configurationStoreStateManager.attainState(State.CLOSING);
- try
+ if (_configurationStoreOpen.compareAndSet(true, false))
{
- if (_committer != null)
+ try
{
- _committer.stop();
+ if (_committer != null)
+ {
+ _committer.stop();
+ }
}
- }
- finally
- {
- if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL))
+ finally
{
- closeEnvironment();
+ if (!_messageStoreOpen.get())
+ {
+ closeEnvironment();
+ }
}
}
- _configurationStoreStateManager.attainState(State.CLOSED);
}
private void closeEnvironment()
@@ -758,27 +747,25 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE))
+ checkConfigurationStoreOpen();
+ com.sleepycat.je.Transaction txn = null;
+ try
{
- com.sleepycat.je.Transaction txn = null;
- try
- {
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- storeConfiguredObjectEntry(txn, configuredObject);
- txn.commit();
- txn = null;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
- + " in database: " + e.getMessage(), e);
- }
- finally
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ storeConfiguredObjectEntry(txn, configuredObject);
+ txn.commit();
+ txn = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
+ + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if (txn != null)
{
- if (txn != null)
- {
- abortTransactionIgnoringException("Error creating configured object", txn);
- }
+ abortTransactionIgnoringException("Error creating configured object", txn);
}
}
}
@@ -786,11 +773,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
+ checkConfigurationStoreOpen();
+
com.sleepycat.je.Transaction txn = null;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
-
+
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
{
@@ -799,7 +788,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
removed.add(record.getId());
}
}
-
+
txn.commit();
txn = null;
return removed.toArray(new UUID[removed.size()]);
@@ -821,6 +810,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
+ checkConfigurationStoreOpen();
+
com.sleepycat.je.Transaction txn = null;
try
{
@@ -892,7 +883,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*
* @throws StoreException If the operation fails for any reason.
*/
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws StoreException
{
@@ -931,7 +922,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws StoreException
{
@@ -1348,38 +1339,35 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE))
+ if (LOGGER.isDebugEnabled())
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing configured object: " + configuredObject);
- }
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(configuredObject.getId(), key);
+ LOGGER.debug("Storing configured object record: " + configuredObject);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(configuredObject.getId(), key);
- DatabaseEntry value = new DatabaseEntry();
- ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
+ DatabaseEntry value = new DatabaseEntry();
+ ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
- queueBinding.objectToEntry(configuredObject, value);
- try
- {
- OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " to database: "
- + status);
- }
- writeHierarchyRecords(txn, configuredObject);
- }
- catch (DatabaseException e)
+ queueBinding.objectToEntry(configuredObject, value);
+ try
+ {
+ OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
+ if (status != OperationStatus.SUCCESS)
{
- throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
- + " to database: " + e.getMessage(), e);
+ throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ + status);
}
+ writeHierarchyRecords(txn, configuredObject);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
+ + " to database: " + e.getMessage(), e);
}
}
-
+
private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
{
OperationStatus status;
@@ -1405,7 +1393,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
UUID id = record.getId();
Map<String, ConfiguredObjectRecord> parents = record.getParents();
-
+
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removing configured object: " + id);
@@ -1456,11 +1444,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
+ @Override
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaDataRef.get();
if(metaData == null)
{
+ checkMessageStoreOpen();
+
metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@@ -1468,11 +1459,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return metaData;
}
+ @Override
public long getMessageNumber()
{
return _messageId;
}
+ @Override
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
src = src.slice();
@@ -1495,6 +1488,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
+ @Override
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
@@ -1506,10 +1500,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
else
{
+ checkMessageStoreOpen();
+
return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
+ @Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
@@ -1546,10 +1543,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ @Override
public synchronized StoreFuture flushToStore()
{
if(!stored())
{
+ checkMessageStoreOpen();
+
com.sleepycat.je.Transaction txn;
try
{
@@ -1569,8 +1569,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
public void remove()
{
+ checkMessageStoreOpen();
+
int delta = getMetaData().getContentSize();
BDBMessageStore.this.removeMessage(_messageId, false);
storedSizeChangeOccured(-delta);
@@ -1599,8 +1602,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ @Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
+ checkMessageStoreOpen();
+
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
@@ -1611,36 +1617,54 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
}
+ @Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
}
+ @Override
public void commitTran() throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.commitTranImpl(_txn, true);
BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
}
+ @Override
public StoreFuture commitTranAsync() throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
return BDBMessageStore.this.commitTranImpl(_txn, false);
}
+ @Override
public void abortTran() throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.abortTran(_txn);
}
+ @Override
public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
}
+ @Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
Record[] dequeues) throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
}
}
@@ -1704,6 +1728,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ private void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ private void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
private void reduceSizeOnDisk()
{
_environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");