diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb')
2 files changed, 206 insertions, 254 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 492ec9d7bf..4fe41d512a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -20,17 +20,17 @@ package org.apache.qpid.server.store.berkeleydb; * */ +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -47,8 +47,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class); private BDBMessageStore _messageStore; - - private boolean _inVhostInitiatedClose; + private MessageStoreLogSubject _messageStoreLogSubject; BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, @@ -60,28 +59,18 @@ public class BDBHAVirtualHost extends AbstractVirtualHost protected void initialiseStorage(VirtualHost virtualHost) { - _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); - - final MessageStoreLogSubject storeLogSubject = - new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); - OperationalLoggingListener.listen(_messageStore, storeLogSubject, getEventLogger()); - - _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); - _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); - _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); - + setState(State.PASSIVE); + _messageStoreLogSubject = new MessageStoreLogSubject(getName(), BDBMessageStore.class.getSimpleName()); + _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); + getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED()); - _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); - _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); - _messageStore.openConfigurationStore( - virtualHost.getName(), virtualHost.getMessageStoreSettings() - ); + _messageStore.openConfigurationStore(virtualHost.getName(), messageStoreSettings); + _messageStore.openMessageStore(virtualHost.getName(), messageStoreSettings); - _messageStore.openMessageStore( - virtualHost.getName(), virtualHost.getMessageStoreSettings() - ); + getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); // Make the virtualhost model object a replication group listener ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade(); @@ -89,30 +78,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost } - - protected void closeStorage() - { - //Close MessageStore - if (_messageStore != null) - { - //Remove MessageStore Interface should not throw Exception - try - { - _inVhostInitiatedClose = true; - _messageStore.closeMessageStore(); - _messageStore.closeConfigurationStore(); - } - catch (Exception e) - { - getLogger().error("Failed to close message store", e); - } - finally - { - _inVhostInitiatedClose = false; - } - } - } - @Override public DurableConfigurationStore getDurableConfigurationStore() { @@ -125,77 +90,62 @@ public class BDBHAVirtualHost extends AbstractVirtualHost return _messageStore; } - private final class AfterInitialisationListener implements EventListener + private void activate() { - public void event(Event event) + try { - setState(State.PASSIVE); - } + _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - } + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); + _messageStore.recoverConfigurationStore(getModel(), configRecoverer); - private final class BeforePassivationListener implements EventListener - { - public void event(Event event) - { - State finalState = State.ERRORED; - - try - { - /* the approach here is not ideal as there is a race condition where a - * queue etc could be created while the virtual host is on the way to - * the passivated state. However the store state change from MASTER to UNKNOWN - * is documented as exceptionally rare.. - */ - - getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - removeHouseKeepingTasks(); + initialiseModel(); - getQueueRegistry().stopAllAndUnregisterMBeans(); - getExchangeRegistry().clearAndUnregisterMbeans(); - getDtxRegistry().close(); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject()); + _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); - finalState = State.PASSIVE; - } - finally - { - setState(finalState); - reportIfError(getState()); - } + attainActivation(); } - - } - - - private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) + catch (Exception e) { - initialiseModel(); + LOGGER.error("Failed to activate on hearing MASTER change event", e); } } - private final class AfterActivationListener implements EventListener + private void passivate() { - @Override - public void event(Event event) + State finalState = State.ERRORED; + + try { - attainActivation(); + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare. + */ + + getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + removeHouseKeepingTasks(); + + getQueueRegistry().stopAllAndUnregisterMBeans(); + getExchangeRegistry().clearAndUnregisterMbeans(); + getDtxRegistry().close(); + + finalState = State.PASSIVE; + } + finally + { + setState(finalState); + reportIfError(getState()); } } - private final class BeforeCloseListener implements EventListener + @Override + protected MessageStoreLogSubject getMessageStoreLogSubject() { - @Override - public void event(Event event) - { - if(!_inVhostInitiatedClose) - { - shutdownHouseKeeping(); - } - - } + return _messageStoreLogSubject; } private class BDBHAMessageStoreStateChangeListener implements StateChangeListener @@ -208,8 +158,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost if (LOGGER.isInfoEnabled()) { - LOGGER.info("Received BDB event indicating transition to state " + state - + " when current message store state is " + _messageStore._messageStoreStateManager.getState()); + LOGGER.info("Received BDB event indicating transition to state " + state); } switch (state) @@ -232,43 +181,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost throw new IllegalStateException("Unexpected state change: " + state); } } - - private void activate() - { - try - { - _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); - _messageStore.recoverConfigurationStore(getModel(), configRecoverer); - - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this); - _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); - } - catch (Exception e) - { - LOGGER.error("Failed to activate on hearing MASTER change event", e); - } - } - - private void passivate() - { - try - { - //TODO: move this this into the store method passivate() - if (_messageStore._messageStoreStateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) - { - _messageStore._messageStoreStateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - } - } - catch (Exception e) - { - LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e); - } - } - } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index ec6ae23367..9f8e5410a7 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; @@ -47,8 +48,6 @@ import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.State; -import org.apache.qpid.server.store.StateManager; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; @@ -119,8 +118,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private EnvironmentFacade _environmentFacade; private final AtomicLong _messageId = new AtomicLong(0); - protected final StateManager _messageStoreStateManager; - private final StateManager _configurationStoreStateManager; + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); private long _totalStoreSize; private boolean _limitBusted; @@ -134,8 +133,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private volatile Committer _committer; - private String _virtualHostName; - public BDBMessageStore() { @@ -146,8 +143,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; - _messageStoreStateManager = new StateManager(_eventManager); - _configurationStoreStateManager = new StateManager(new EventManager()); } @Override @@ -159,20 +154,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) { - _configurationStoreStateManager.attainState(State.INITIALISING); - - _virtualHostName = virtualHostName; - if (_environmentFacade == null) + if (_configurationStoreOpen.compareAndSet(false, true)) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, storeSettings); + if (_environmentFacade == null) + { + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, storeSettings); + } } - _configurationStoreStateManager.attainState(State.INITIALISED); } @Override public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) { - _configurationStoreStateManager.attainState(State.ACTIVATING); + checkConfigurationStoreOpen(); DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setTransactional(true); @@ -187,45 +181,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore throw _environmentFacade.handleDatabaseException("Cannot configure store", e); } recoverConfig(recoveryHandler); - - _configurationStoreStateManager.attainState(State.ACTIVE); } @Override public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) throws StoreException { - _messageStoreStateManager.attainState(State.INITIALISING); - - _virtualHostName = virtualHostName; + if (_messageStoreOpen.compareAndSet(false, true)) + { - Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); - Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } - if (_environmentFacade == null) - { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, messageStoreSettings); - } + if (_environmentFacade == null) + { + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, messageStoreSettings); + } - _committer = _environmentFacade.createCommitter(_virtualHostName); - _committer.start(); + _committer = _environmentFacade.createCommitter(virtualHostName); + _committer.start(); - _messageStoreStateManager.attainState(State.INITIALISED); + } } @Override public synchronized void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException { - _messageStoreStateManager.attainState(State.ACTIVATING); + checkMessageStoreOpen(); + DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setTransactional(true); dbConfig.setAllowCreate(true); @@ -248,13 +240,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { recoverQueueEntries(transactionLogRecoveryHandler); } - - _messageStoreStateManager.attainState(State.ACTIVE); } @Override public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException { + checkMessageStoreOpen(); + return new BDBTransaction(); } @@ -273,51 +265,48 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return _environmentFacade; } - /** - * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. - */ @Override public void closeMessageStore() throws StoreException { - _messageStoreStateManager.attainState(State.CLOSING); - try + if (_messageStoreOpen.compareAndSet(true, false)) { - if (_committer != null) + try { - _committer.stop(); + if (_committer != null) + { + _committer.stop(); + } } - } - finally - { - if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL)) + finally { - closeEnvironment(); + if (!_configurationStoreOpen.get()) + { + closeEnvironment(); + } } } - _messageStoreStateManager.attainState(State.CLOSED); } @Override public void closeConfigurationStore() throws StoreException { - _configurationStoreStateManager.attainState(State.CLOSING); - try + if (_configurationStoreOpen.compareAndSet(true, false)) { - if (_committer != null) + try { - _committer.stop(); + if (_committer != null) + { + _committer.stop(); + } } - } - finally - { - if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL)) + finally { - closeEnvironment(); + if (!_messageStoreOpen.get()) + { + closeEnvironment(); + } } } - _configurationStoreStateManager.attainState(State.CLOSED); } private void closeEnvironment() @@ -758,27 +747,25 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void create(ConfiguredObjectRecord configuredObject) throws StoreException { - if (_configurationStoreStateManager.isInState(State.ACTIVE)) + checkConfigurationStoreOpen(); + com.sleepycat.je.Transaction txn = null; + try { - com.sleepycat.je.Transaction txn = null; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - storeConfiguredObjectEntry(txn, configuredObject); - txn.commit(); - txn = null; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject - + " in database: " + e.getMessage(), e); - } - finally + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + storeConfiguredObjectEntry(txn, configuredObject); + txn.commit(); + txn = null; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject + + " in database: " + e.getMessage(), e); + } + finally + { + if (txn != null) { - if (txn != null) - { - abortTransactionIgnoringException("Error creating configured object", txn); - } + abortTransactionIgnoringException("Error creating configured object", txn); } } } @@ -786,11 +773,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException { + checkConfigurationStoreOpen(); + com.sleepycat.je.Transaction txn = null; try { txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - + Collection<UUID> removed = new ArrayList<UUID>(objects.length); for(ConfiguredObjectRecord record : objects) { @@ -799,7 +788,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore removed.add(record.getId()); } } - + txn.commit(); txn = null; return removed.toArray(new UUID[removed.size()]); @@ -821,6 +810,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { + checkConfigurationStoreOpen(); + com.sleepycat.je.Transaction txn = null; try { @@ -892,7 +883,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason. */ - public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws StoreException { @@ -931,7 +922,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. */ - public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws StoreException { @@ -1348,38 +1339,35 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException { - if (_configurationStoreStateManager.isInState(State.ACTIVE)) + if (LOGGER.isDebugEnabled()) { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing configured object: " + configuredObject); - } - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(configuredObject.getId(), key); + LOGGER.debug("Storing configured object record: " + configuredObject); + } + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(configuredObject.getId(), key); - DatabaseEntry value = new DatabaseEntry(); - ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); - queueBinding.objectToEntry(configuredObject, value); - try - { - OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " to database: " - + status); - } - writeHierarchyRecords(txn, configuredObject); - } - catch (DatabaseException e) + queueBinding.objectToEntry(configuredObject, value); + try + { + OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); + if (status != OperationStatus.SUCCESS) { - throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject - + " to database: " + e.getMessage(), e); + throw new StoreException("Error writing configured object " + configuredObject + " to database: " + + status); } + writeHierarchyRecords(txn, configuredObject); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject + + " to database: " + e.getMessage(), e); } } - + private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) { OperationStatus status; @@ -1405,7 +1393,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { UUID id = record.getId(); Map<String, ConfiguredObjectRecord> parents = record.getParents(); - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Removing configured object: " + id); @@ -1456,11 +1444,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } + @Override public StorableMessageMetaData getMetaData() { StorableMessageMetaData metaData = _metaDataRef.get(); if(metaData == null) { + checkMessageStoreOpen(); + metaData = BDBMessageStore.this.getMessageMetaData(_messageId); _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } @@ -1468,11 +1459,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return metaData; } + @Override public long getMessageNumber() { return _messageId; } + @Override public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { src = src.slice(); @@ -1495,6 +1488,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } + @Override public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) { byte[] data = _dataRef == null ? null : _dataRef.get(); @@ -1506,10 +1500,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } else { + checkMessageStoreOpen(); + return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); } } + @Override public ByteBuffer getContent(int offsetInMessage, int size) { byte[] data = _dataRef == null ? null : _dataRef.get(); @@ -1546,10 +1543,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + @Override public synchronized StoreFuture flushToStore() { if(!stored()) { + checkMessageStoreOpen(); + com.sleepycat.je.Transaction txn; try { @@ -1569,8 +1569,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return StoreFuture.IMMEDIATE_FUTURE; } + @Override public void remove() { + checkMessageStoreOpen(); + int delta = getMetaData().getContentSize(); BDBMessageStore.this.removeMessage(_messageId, false); storedSizeChangeOccured(-delta); @@ -1599,8 +1602,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { + checkMessageStoreOpen(); + if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); @@ -1611,36 +1617,54 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); } + @Override public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); } + @Override public void commitTran() throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.commitTranImpl(_txn, true); BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); } + @Override public StoreFuture commitTranAsync() throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); return BDBMessageStore.this.commitTranImpl(_txn, false); } + @Override public void abortTran() throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.abortTran(_txn); } + @Override public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); } + @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); } } @@ -1704,6 +1728,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + private void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + private void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + private void reduceSizeOnDisk() { _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); |