diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java | 1913 |
1 files changed, 0 insertions, 1913 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java deleted file mode 100644 index 9a06f06999..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ /dev/null @@ -1,1913 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.*; -import com.sleepycat.je.Transaction; - -import java.io.File; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.*; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; -import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; -import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; -import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; -import org.apache.qpid.server.store.berkeleydb.tuple.*; -import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; -import org.apache.qpid.util.FileUtils; - -public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore -{ - private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class); - - private static final int LOCK_RETRY_ATTEMPTS = 5; - - public static final int VERSION = 8; - - private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() - {{ - put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); - put(EnvironmentConfig.STATS_COLLECT, "false"); // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84 - }}); - - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private Environment _environment; - - private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; - private static String CONFIGURED_OBJECT_HIERARCHY = "CONFIGURED_OBJECT_HIERARCHY"; - - private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; - private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private static String DELIVERYDB_NAME = "QUEUE_ENTRIES"; - private static String BRIDGEDB_NAME = "BRIDGES"; - private static String LINKDB_NAME = "LINKS"; - private static String XIDDB_NAME = "XIDS"; - private static String CONFIG_VERSION_DB = "CONFIG_VERSION"; - - private Database _configuredObjectsDb; - private Database _configuredObjectHierarchyDb; - private Database _configVersionDb; - private Database _messageMetaDataDb; - private Database _messageContentDb; - private Database _deliveryDb; - private Database _bridgeDb; - private Database _linkDb; - private Database _xidDb; - - /* ======= - * Schema: - * ======= - * - * Queue: - * name(AMQShortString) - name(AMQShortString), owner(AMQShortString), - * arguments(FieldTable encoded as binary), exclusive (boolean) - * - * Exchange: - * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean) - * - * Binding: - * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString), - * arguments (FieldTable encoded as binary) - 0 (zero) - * - * QueueEntry: - * queueName(AMQShortString), messageId (long) - 0 (zero) - * - * Message (MetaData): - * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary) - * - * Message (Content): - * messageId (long), byteOffset (integer) - dataLength(integer), data(binary) - */ - - private final AtomicLong _messageId = new AtomicLong(0); - - protected final StateManager _stateManager; - - private MessageStoreRecoveryHandler _messageRecoveryHandler; - - private TransactionLogRecoveryHandler _tlogRecoveryHandler; - - private ConfigurationRecoveryHandler _configRecoveryHandler; - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - private final EventManager _eventManager = new EventManager(); - private String _storeLocation; - - private Map<String, String> _envConfigMap; - private VirtualHost _virtualHost; - - - public AbstractBDBMessageStore() - { - _stateManager = new StateManager(_eventManager); - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) - { - _stateManager.attainState(State.INITIALISING); - - _configRecoveryHandler = recoveryHandler; - _virtualHost = virtualHost; - } - - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) - { - if(_stateManager.isInState(State.INITIAL)) - { - // Is acting as a message store, but not a durable config store - _stateManager.attainState(State.INITIALISING); - } - - _messageRecoveryHandler = messageRecoveryHandler; - _tlogRecoveryHandler = tlogRecoveryHandler; - _virtualHost = virtualHost; - - completeInitialisation(); - } - - private void completeInitialisation() - { - configure(); - - _stateManager.attainState(State.INITIALISED); - } - - public synchronized void activate() - { - // check if acting as a durable config store, but not a message store - if(_stateManager.isInState(State.INITIALISING)) - { - completeInitialisation(); - } - _stateManager.attainState(State.ACTIVATING); - - if(_configRecoveryHandler != null) - { - recoverConfig(_configRecoveryHandler); - } - if(_messageRecoveryHandler != null) - { - recoverMessages(_messageRecoveryHandler); - } - if(_tlogRecoveryHandler != null) - { - recoverQueueEntries(_tlogRecoveryHandler); - } - - _stateManager.attainState(State.ACTIVE); - } - - public org.apache.qpid.server.store.Transaction newTransaction() - { - return new BDBTransaction(); - } - - /** - * Called after instantiation in order to configure the message store. - * - * @return whether a new store environment was created or not (to indicate whether recovery is necessary) - * - * @throws Exception If any error occurs that means the store is unable to configure itself. - */ - public void configure() - { - configure(_messageRecoveryHandler != null); - } - - public void configure(boolean isMessageStore) - { - String name = _virtualHost.getName(); - final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; - - String storeLocation; - if(isMessageStore) - { - storeLocation = (String) _virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - else // we are acting only as the durable config store - { - storeLocation = (String) _virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - - Object overfullAttr = _virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = _virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - File environmentPath = new File(storeLocation); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - - _storeLocation = storeLocation; - - _envConfigMap = new HashMap<String, String>(); - _envConfigMap.putAll(ENVCONFIG_DEFAULTS); - - Object bdbEnvConfigAttr = _virtualHost.getAttribute("bdbEnvironmentConfig"); - if(bdbEnvConfigAttr instanceof Map) - { - _envConfigMap.putAll((Map)bdbEnvConfigAttr); - } - - LOGGER.info("Configuring BDB message store"); - - setupStore(environmentPath); - } - - @Override - public String getStoreLocation() - { - return _storeLocation; - } - - /** - * Move the store state from INITIAL to ACTIVE without actually recovering. - * - * This is required if you do not want to perform recovery of the store data - * - * @throws org.apache.qpid.server.store.StoreException if the store is not in the correct state - */ - void startWithNoRecover() throws StoreException - { - _stateManager.attainState(State.INITIALISING); - _stateManager.attainState(State.INITIALISED); - _stateManager.attainState(State.ACTIVATING); - _stateManager.attainState(State.ACTIVE); - } - - protected void setupStore(File storePath) - { - _environment = createEnvironment(storePath); - - new Upgrader(_environment, _virtualHost).upgradeIfNecessary(); - - openDatabases(); - - _totalStoreSize = getSizeOnDisk(); - } - - protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException; - - public Environment getEnvironment() - { - return _environment; - } - - protected final VirtualHost getVirtualHost() - { - return _virtualHost; - } - - private void openDatabases() throws DatabaseException - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - - //This is required if we are wanting read only access. - dbConfig.setReadOnly(false); - - _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); - _configuredObjectHierarchyDb = openDatabase(CONFIGURED_OBJECT_HIERARCHY, dbConfig); - _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig); - _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); - _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); - _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); - _linkDb = openDatabase(LINKDB_NAME, dbConfig); - _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig); - _xidDb = openDatabase(XIDDB_NAME, dbConfig); - } - - private Database openDatabase(final String dbName, final DatabaseConfig dbConfig) - { - // if opening read-only and the database doesn't exist, then you can't create it - return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName) - ? null - : _environment.openDatabase(null, dbName, dbConfig); - } - - /** - * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. - */ - public void close() - { - if (_closed.compareAndSet(false, true)) - { - _stateManager.attainState(State.CLOSING); - closeInternal(); - _stateManager.attainState(State.CLOSED); - } - } - - protected void closeInternal() - { - if (_messageMetaDataDb != null) - { - LOGGER.info("Closing message metadata database"); - _messageMetaDataDb.close(); - } - - if (_messageContentDb != null) - { - LOGGER.info("Closing message content database"); - _messageContentDb.close(); - } - - if (_configuredObjectsDb != null) - { - LOGGER.info("Closing configurable objects database"); - _configuredObjectsDb.close(); - } - - if (_configuredObjectHierarchyDb != null) - { - LOGGER.info("Closing configurable object hierarchy database"); - _configuredObjectHierarchyDb.close(); - } - - if (_deliveryDb != null) - { - LOGGER.info("Close delivery database"); - _deliveryDb.close(); - } - - if (_bridgeDb != null) - { - LOGGER.info("Close bridge database"); - _bridgeDb.close(); - } - - if (_linkDb != null) - { - LOGGER.info("Close link database"); - _linkDb.close(); - } - - - if (_xidDb != null) - { - LOGGER.info("Close xid database"); - _xidDb.close(); - } - - - if (_configVersionDb != null) - { - LOGGER.info("Close config version database"); - _configVersionDb.close(); - } - - closeEnvironment(); - - } - - private void closeEnvironment() throws DatabaseException - { - if (_environment != null) - { - // Clean the log before closing. This makes sure it doesn't contain - // redundant data. Closing without doing this means the cleaner may not - // get a chance to finish. - try - { - _environment.cleanLog(); - } - finally - { - _environment.close(); - } - } - } - - - private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) - { - try - { - final int configVersion = getConfigVersion(); - recoveryHandler.beginConfigurationRecovery(this, configVersion); - loadConfiguredObjects(recoveryHandler); - - final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); - if(newConfigVersion != configVersion) - { - updateConfigVersion(newConfigVersion); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - - } - - private void updateConfigVersion(int newConfigVersion) throws StoreException - { - Cursor cursor = null; - try - { - Transaction txn = _environment.beginTransaction(null, null); - cursor = _configVersionDb.openCursor(txn, null); - DatabaseEntry key = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0,key); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - IntegerBinding.intToEntry(newConfigVersion, value); - OperationStatus status = cursor.put(key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error setting config version: " + status); - } - } - cursor.close(); - cursor = null; - txn.commit(); - } - finally - { - closeCursorSafely(cursor); - } - - } - - private int getConfigVersion() throws StoreException - { - Cursor cursor = null; - try - { - cursor = _configVersionDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - return IntegerBinding.entryToInt(value); - } - - // Insert 0 as the default config version - IntegerBinding.intToEntry(0,value); - ByteBinding.byteToEntry((byte) 0,key); - OperationStatus status = _configVersionDb.put(null, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error initialising config version: " + status); - } - return 0; - } - finally - { - closeCursorSafely(cursor); - } - } - - private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException - { - Cursor objectsCursor = null; - Cursor hierarchyCursor = null; - try - { - objectsCursor = _configuredObjectsDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - Map<UUID, BDBConfiguredObjectRecord> configuredObjects = - new HashMap<UUID, BDBConfiguredObjectRecord>(); - - while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - BDBConfiguredObjectRecord configuredObject = - (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); - configuredObjects.put(configuredObject.getId(), configuredObject); - } - - // set parents - hierarchyCursor = _configuredObjectHierarchyDb.openCursor(null, null); - while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); - if(child != null) - { - ConfiguredObjectRecord parent = configuredObjects.get(parentId); - if(parent != null) - { - child.addParent(hk.getParentType(), parent); - } - else if(hk.getParentType().equals("Exchange")) - { - // TODO - remove this hack for the pre-defined exchanges - child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); - } - } - } - - for (ConfiguredObjectRecord record : configuredObjects.values()) - { - crh.configuredObject(record); - } - - - - } - finally - { - closeCursorSafely(objectsCursor); - closeCursorSafely(hierarchyCursor); - } - } - - private void closeCursorSafely(Cursor cursor) - { - if (cursor != null) - { - cursor.close(); - } - } - - - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException - { - StoredMessageRecoveryHandler mrh = msrh.begin(); - - Cursor cursor = null; - try - { - cursor = _messageMetaDataDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - long maxId = 0; - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - mrh.message(message); - - maxId = Math.max(maxId, messageId); - } - - _messageId.set(maxId); - } - catch (DatabaseException e) - { - LOGGER.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - closeCursorSafely(cursor); - } - } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) - throws DatabaseException - { - QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); - - ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - - Cursor cursor = null; - try - { - cursor = _deliveryDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueEntryKey qek = keyBinding.entryToObject(key); - - entries.add(qek); - } - - try - { - cursor.close(); - } - finally - { - cursor = null; - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - qerh.queueEntry(queueId, messageId); - } - } - catch (DatabaseException e) - { - LOGGER.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - closeCursorSafely(cursor); - } - - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); - - cursor = null; - try - { - cursor = _xidDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), - preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); - } - - } - catch (DatabaseException e) - { - LOGGER.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - closeCursorSafely(cursor); - } - - - dtxrh.completeDtxRecordRecovery(); - } - - public void removeMessage(long messageId, boolean sync) throws StoreException - { - - boolean complete = false; - com.sleepycat.je.Transaction tx = null; - - Random rand = null; - int attempts = 0; - try - { - do - { - tx = null; - try - { - tx = _environment.beginTransaction(null, null); - - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removing message id " + messageId); - } - - - OperationStatus status = _messageMetaDataDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted metadata for message " + messageId); - } - - //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - _messageContentDb.delete(tx, contentKeyEntry); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleted content for message " + messageId); - } - - commit(tx, sync); - complete = true; - tx = null; - } - catch (LockConflictException e) - { - try - { - if(tx != null) - { - tx.abort(); - } - } - catch(DatabaseException e2) - { - LOGGER.warn("Unable to abort transaction after LockConflictExcption", e2); - // rethrow the original log conflict exception, the secondary exception should already have - // been logged. - throw e; - } - - - LOGGER.warn("Lock timeout exception. Retrying (attempt " - + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); - - if(++attempts < LOCK_RETRY_ATTEMPTS) - { - if(rand == null) - { - rand = new Random(); - } - - try - { - Thread.sleep(500l + (long)(500l * rand.nextDouble())); - } - catch (InterruptedException e1) - { - - } - } - else - { - // rethrow the lock conflict exception since we could not solve by retrying - throw e; - } - } - } - while(!complete); - } - catch (DatabaseException e) - { - LOGGER.error("Unexpected BDB exception", e); - - if (tx != null) - { - try - { - tx.abort(); - tx = null; - } - catch (DatabaseException e1) - { - throw new StoreException("Error aborting transaction " + e1, e1); - } - } - - throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); - } - finally - { - if (tx != null) - { - try - { - tx.abort(); - tx = null; - } - catch (DatabaseException e1) - { - throw new StoreException("Error aborting transaction " + e1, e1); - } - } - } - } - - @Override - public void create(ConfiguredObjectRecord configuredObject) throws StoreException - { - - if (_stateManager.isInState(State.ACTIVE)) - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - - storeConfiguredObjectEntry(txn, configuredObject); - - txn.commit(); - } - } - - public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - Collection<UUID> removed = new ArrayList<UUID>(objects.length); - for(ConfiguredObjectRecord record : objects) - { - if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS) - { - removed.add(record.getId()); - } - } - - txn.commit(); - return removed.toArray(new UUID[removed.size()]); - } - - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - for(ConfiguredObjectRecord record : records) - { - update(createIfNecessary, record, txn); - } - txn.commit(); - } - - private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws - StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId()); - } - - try - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(record.getId(), key); - - DatabaseEntry value = new DatabaseEntry(); - DatabaseEntry newValue = new DatabaseEntry(); - ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - - OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); - final boolean isNewRecord = status == OperationStatus.NOTFOUND; - if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord)) - { - - // write the updated entry to the store - configuredObjectBinding.objectToEntry(record, newValue); - status = _configuredObjectsDb.put(txn, key, newValue); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error updating queue details within the store: " + status); - } - if(isNewRecord) - { - writeHierarchyRecords(txn, record); - } - } - else if (status != OperationStatus.NOTFOUND) - { - throw new StoreException("Error finding queue details within the store: " + status); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error updating queue details within the store: " + e,e); - } - } - - /** - * Places a message onto a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The the queue to place the message on. - * @param messageId The message to enqueue. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); - keyBinding.objectToEntry(dd, key); - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " in transaction " + tx); - } - _deliveryDb.put(tx, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to enqueue: " + e.getMessage(), e); - throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " to database", e); - } - } - - /** - * Extracts a message from a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The queue to take the message from. - * @param messageId The message to dequeue. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, - long messageId) throws StoreException - { - - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); - keyBinding.objectToEntry(queueEntryKey, key); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Dequeue message id " + messageId + " from queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); - } - - try - { - - OperationStatus status = _deliveryDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Removed message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id - + " from delivery db"); - - } - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e); - LOGGER.error(tx); - - throw new StoreException("Error accessing database while dequeuing message: " + e.getMessage(), e); - } - } - - - private void recordXid(com.sleepycat.je.Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - keyBinding.objectToEntry(xid,key); - - DatabaseEntry value = new DatabaseEntry(); - PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - valueBinding.objectToEntry(preparedTransaction, value); - - try - { - _xidDb.put(txn, key, value); - } - catch (DatabaseException e) - { - LOGGER.error("Failed to write xid: " + e.getMessage(), e); - throw new StoreException("Error writing xid to database", e); - } - } - - private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) - throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - Xid xid = new Xid(format, globalId, branchId); - XidBinding keyBinding = XidBinding.getInstance(); - - keyBinding.objectToEntry(xid, key); - - - try - { - - OperationStatus status = _xidDb.delete(txn, key); - if (status == OperationStatus.NOTFOUND) - { - throw new StoreException("Unable to find xid"); - } - else if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Unable to remove xid"); - } - - } - catch (DatabaseException e) - { - - LOGGER.error("Failed to remove xid ", e); - LOGGER.error(txn); - - throw new StoreException("Error accessing database while removing xid: " + e.getMessage(), e); - } - } - - /** - * Commits all operations performed within a given transaction. - * - * @param tx The transaction to commit all operations for. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws - StoreException - { - if (tx == null) - { - throw new StoreException("Fatal internal error: transactional is null at commitTran"); - } - - StoreFuture result; - try - { - result = commit(tx, syncCommit); - - if (LOGGER.isDebugEnabled()) - { - String transactionType = syncCommit ? "synchronous" : "asynchronous"; - LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error commit tx: " + e.getMessage(), e); - } - - return result; - } - - /** - * Abandons all operations performed within a given transaction. - * - * @param tx The transaction to abandon. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("abortTran called for transaction " + tx); - } - - try - { - tx.abort(); - } - catch (DatabaseException e) - { - throw new StoreException("Error aborting transaction: " + e.getMessage(), e); - } - } - - /** - * Primarily for testing purposes. - * - * @param queueId - * - * @return a list of message ids for messages enqueued for a particular queue - */ - List<Long> getEnqueuedMessages(UUID queueId) throws StoreException - { - Cursor cursor = null; - try - { - cursor = _deliveryDb.openCursor(null, null); - - DatabaseEntry key = new DatabaseEntry(); - - QueueEntryKey dd = new QueueEntryKey(queueId, 0); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(dd, key); - - DatabaseEntry value = new DatabaseEntry(); - - LinkedList<Long> messageIds = new LinkedList<Long>(); - - OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); - dd = keyBinding.entryToObject(key); - - while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) - { - - messageIds.add(dd.getMessageId()); - status = cursor.getNext(key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) - { - dd = keyBinding.entryToObject(key); - } - } - - return messageIds; - } - catch (DatabaseException e) - { - throw new StoreException("Database error: " + e.getMessage(), e); - } - finally - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch (DatabaseException e) - { - throw new StoreException("Error closing cursor: " + e.getMessage(), e); - } - } - } - } - - /** - * Return a valid, currently unused message id. - * - * @return A fresh message id. - */ - public long getNewMessageId() - { - return _messageId.incrementAndGet(); - } - - /** - * Stores a chunk of message data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param offset The offset of the data chunk in the message. - * @param contentBody The content of the data chunk. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, - ByteBuffer contentBody) throws StoreException - { - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding messageBinding = ContentBinding.getInstance(); - messageBinding.objectToEntry(contentBody.array(), value); - try - { - OperationStatus status = _messageContentDb.put(tx, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error adding content for message id " + messageId + ": " + status); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); - - } - } - catch (DatabaseException e) - { - throw new StoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Stores message meta-data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param messageMetaData The message meta data to store. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, - StorableMessageMetaData messageMetaData) - throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("storeMetaData called for transaction " + tx - + ", messageId " + messageId - + ", messageMetaData " + messageMetaData); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - messageBinding.objectToEntry(messageMetaData, value); - try - { - _messageMetaDataDb.put(tx, key, value); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); - } - } - catch (DatabaseException e) - { - throw new StoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Retrieves message meta-data. - * - * @param messageId The message to get the meta-data for. - * - * @return The message meta data. - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = " - + messageId + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance(); - - try - { - OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Metadata not found for message with id " + messageId); - } - - StorableMessageMetaData mdd = messageBinding.entryToObject(value); - - return mdd; - } - catch (DatabaseException e) - { - throw new StoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); - } - } - - /** - * Fills the provided ByteBuffer with as much content for the specified message as possible, starting - * from the specified offset in the message. - * - * @param messageId The message to get the data for. - * @param offset The offset of the data within the message. - * @param dst The destination of the content read back - * - * @return The number of bytes inserted into the destination - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException - { - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - DatabaseEntry value = new DatabaseEntry(); - ContentBinding contentTupleBinding = ContentBinding.getInstance(); - - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); - } - - try - { - - int written = 0; - OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); - if (status == OperationStatus.SUCCESS) - { - byte[] dataAsBytes = contentTupleBinding.entryToObject(value); - int size = dataAsBytes.length; - if (offset > size) - { - throw new StoreException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); - - } - - written = size - offset; - if(written > dst.remaining()) - { - written = dst.remaining(); - } - - dst.put(dataAsBytes, offset, written); - } - return written; - } - catch (DatabaseException e) - { - throw new StoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - public boolean isPersistent() - { - return true; - } - - @SuppressWarnings("unchecked") - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) - { - if(metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNewMessageId(), metaData); - } - } - - //Package getters for the various databases used by the Store - - Database getMetaDataDb() - { - return _messageMetaDataDb; - } - - Database getContentDb() - { - return _messageContentDb; - } - - Database getDeliveryDb() - { - return _deliveryDb; - } - - /** - * Makes the specified configured object persistent. - * - * - * @param txn - * @param configuredObject Details of the configured object to store. - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. - */ - private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - LOGGER.debug("Storing configured object: " + configuredObject); - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(configuredObject.getId(), key); - - DatabaseEntry value = new DatabaseEntry(); - ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); - - queueBinding.objectToEntry(configuredObject, value); - try - { - OperationStatus status = _configuredObjectsDb.put(txn, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " to database: " - + status); - } - writeHierarchyRecords(txn, configuredObject); - } - catch (DatabaseException e) - { - throw new StoreException("Error writing configured object " + configuredObject - + " to database: " + e.getMessage(), e); - } - } - } - - private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) - { - OperationStatus status; - HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance(); - DatabaseEntry hierarchyKey = new DatabaseEntry(); - DatabaseEntry hierarchyValue = new DatabaseEntry(); - - for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet()) - { - - hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey); - UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue); - status = _configuredObjectHierarchyDb.put(txn, hierarchyKey, hierarchyValue); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: " - + status); - } - } - } - - private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException - { - UUID id = record.getId(); - Map<String, ConfiguredObjectRecord> parents = record.getParents(); - LOGGER.debug("Removing configured object: " + id); - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(id, key); - try - { - OperationStatus status = _configuredObjectsDb.delete(tx, key); - if(status == OperationStatus.SUCCESS) - { - for(String parentType : parents.keySet()) - { - DatabaseEntry hierarchyKey = new DatabaseEntry(); - HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance(); - keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey); - _configuredObjectHierarchyDb.delete(tx, hierarchyKey); - } - } - return status; - } - catch (DatabaseException e) - { - throw new StoreException("Error deleting of configured object with id " + id + " from database", e); - } - } - - protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException; - - - private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData> - { - - private final long _messageId; - private final boolean _isRecovered; - - private StorableMessageMetaData _metaData; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - - private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData) - { - this(messageId, metaData, false); - } - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) - { - _messageId = messageId; - _isRecovered = isRecovered; - - if(!_isRecovered) - { - _metaData = metaData; - } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - public StorableMessageMetaData getMetaData() - { - StorableMessageMetaData metaData = _metaDataRef.get(); - if(metaData == null) - { - metaData = AbstractBDBMessageStore.this.getMessageMetaData(_messageId); - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - return metaData; - } - - public long getMessageNumber() - { - return _messageId; - } - - public void addContent(int offsetInMessage, java.nio.ByteBuffer src) - { - src = src.slice(); - - if(_data == null) - { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); - } - else - { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - - System.arraycopy(oldData,0,_data,0,oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); - } - - } - - public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else - { - return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } - } - - public ByteBuffer getContent(int offsetInMessage, int size) - { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else - { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; - } - } - - synchronized void store(com.sleepycat.je.Transaction txn) - { - if (!stored()) - { - try - { - _dataRef = new SoftReference<byte[]>(_data); - AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); - } - catch(DatabaseException e) - { - throw new StoreException(e); - } - finally - { - _metaData = null; - _data = null; - } - } - } - - public synchronized StoreFuture flushToStore() - { - if(!stored()) - { - com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); - store(txn); - AbstractBDBMessageStore.this.commit(txn,true); - storedSizeChange(getMetaData().getContentSize()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - - public void remove() - { - int delta = getMetaData().getContentSize(); - AbstractBDBMessageStore.this.removeMessage(_messageId, false); - storedSizeChange(-delta); - } - - private boolean stored() - { - return _metaData == null || _isRecovered; - } - } - - private class BDBTransaction implements org.apache.qpid.server.store.Transaction - { - private com.sleepycat.je.Transaction _txn; - private int _storeSizeIncrease; - - private BDBTransaction() - { - try - { - _txn = _environment.beginTransaction(null, null); - } - catch (DatabaseException e) - { - LOGGER.error("Exception during transaction begin, closing store environment.", e); - closeEnvironmentSafely(); - - throw new StoreException("Exception during transaction begin, store environment closed.", e); - } - } - - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - if(message.getStoredMessage() instanceof StoredBDBMessage) - { - final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); - } - - AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); - } - - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); - } - - public void commitTran() - { - AbstractBDBMessageStore.this.commitTranImpl(_txn, true); - AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); - } - - public StoreFuture commitTranAsync() - { - AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); - return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); - } - - public void abortTran() - { - AbstractBDBMessageStore.this.abortTran(_txn); - } - - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId); - - } - - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) - { - AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); - } - } - - private void storedSizeChange(final int delta) - { - if(getPersistentSizeHighThreshold() > 0) - { - synchronized (this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 2*delta; - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - _totalStoreSize = getSizeOnDisk(); - - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - _totalStoreSize = getSizeOnDisk(); - - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(); - - _totalStoreSize = getSizeOnDisk(); - - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - } - } - - private void reduceSizeOnDisk() - { - _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); - boolean cleaned = false; - while (_environment.cleanLog() > 0) - { - cleaned = true; - } - if (cleaned) - { - CheckpointConfig force = new CheckpointConfig(); - force.setForce(true); - _environment.checkpoint(force); - } - - - _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); - } - - private long getSizeOnDisk() - { - return _environment.getStats(null).getTotalLogSize(); - } - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; - } - - private void setEnvironmentConfigProperties(EnvironmentConfig envConfig) - { - for (Map.Entry<String, String> configItem : _envConfigMap.entrySet()) - { - LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); - envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); - } - } - - protected EnvironmentConfig createEnvironmentConfig() - { - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - - setEnvironmentConfigProperties(envConfig); - - envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - - return envConfig; - } - - protected void closeEnvironmentSafely() - { - try - { - _environment.close(); - } - catch (DatabaseException ex) - { - LOGGER.error("Exception closing store environment", ex); - } - catch (IllegalStateException ex) - { - LOGGER.error("Exception closing store environment", ex); - } - } - - - private class LoggingAsyncExceptionListener implements ExceptionListener - { - @Override - public void exceptionThrown(ExceptionEvent event) - { - LOGGER.error("Asynchronous exception thrown by BDB thread '" - + event.getThreadName() + "'", event.getException()); - } - } - - @Override - public void onDelete() - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting store " + _storeLocation); - } - - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - LOGGER.error("Cannot delete " + _storeLocation); - } - } - } - } - - void setVirtualHost(final VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - -} |