summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java2025
1 files changed, 29 insertions, 1996 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 402df299fc..4484c140e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,84 +21,19 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.Queue;
-import java.util.Random;
-import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
-import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
-import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
-import org.apache.qpid.server.store.berkeleydb.tuple.AMQShortStringBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ExchangeBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueBindingTupleBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -108,244 +43,27 @@ import com.sleepycat.je.TransactionConfig;
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
-@SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore extends AbstractBDBMessageStore
{
- private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
-
- private static final int LOCK_RETRY_ATTEMPTS = 5;
-
- public static final int VERSION = 6;
-
- public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
-
- private Environment _environment;
-
- private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
- private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private String QUEUEBINDINGSDB_NAME = "QUEUE_BINDINGS";
- private String DELIVERYDB_NAME = "DELIVERIES";
- private String EXCHANGEDB_NAME = "EXCHANGES";
- private String QUEUEDB_NAME = "QUEUES";
- private String BRIDGEDB_NAME = "BRIDGES";
- private String LINKDB_NAME = "LINKS";
- private String XIDDB_NAME = "XIDS";
-
-
- private Database _messageMetaDataDb;
- private Database _messageContentDb;
- private Database _queueBindingsDb;
- private Database _deliveryDb;
- private Database _exchangeDb;
- private Database _queueDb;
- private Database _bridgeDb;
- private Database _linkDb;
- private Database _xidDb;
-
- /* =======
- * Schema:
- * =======
- *
- * Queue:
- * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
- * arguments(FieldTable encoded as binary), exclusive (boolean)
- *
- * Exchange:
- * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean)
- *
- * Binding:
- * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString),
- * arguments (FieldTable encoded as binary) - 0 (zero)
- *
- * QueueEntry:
- * queueName(AMQShortString), messageId (long) - 0 (zero)
- *
- * Message (MetaData):
- * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
- *
- * Message (Content):
- * messageId (long), byteOffset (integer) - dataLength(integer), data(binary)
- */
-
- private LogSubject _logSubject;
-
- private final AtomicLong _messageId = new AtomicLong(0);
+ private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
- private enum State
- {
- INITIAL,
- CONFIGURING,
- CONFIGURED,
- RECOVERING,
- STARTED,
- CLOSING,
- CLOSED
- }
-
- private State _state = State.INITIAL;
-
- private TransactionConfig _transactionConfig = new TransactionConfig();
-
- private boolean _readOnly = false;
-
- private boolean _configured;
-
-
- public BDBMessageStore()
- {
- }
-
-
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
+ @Override
+ protected void setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException
{
- CurrentActor.get().message(logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
-
- if(!_configured)
- {
- _logSubject = logSubject;
- configure(name,storeConfiguration);
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
- }
-
- recover(recoveryHandler);
- stateTransition(State.RECOVERING, State.STARTED);
- }
-
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration storeConfiguration, LogSubject logSubject) throws Exception
- {
- CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
- if(!_configured)
- {
- _logSubject = logSubject;
- configure(name,storeConfiguration);
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
- }
-
- recoverMessages(recoveryHandler);
-
- CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
-
- recoverQueueEntries(tlogRecoveryHandler);
- }
-
-
- public org.apache.qpid.server.store.Transaction newTransaction()
- {
- return new BDBTransaction();
- }
-
-
- /**
- * Called after instantiation in order to configure the message store.
- *
- * @param name The name of the virtual host using this store
- * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
- *
- * @throws Exception If any error occurs that means the store is unable to configure itself.
- */
- public boolean configure(String name, Configuration storeConfig) throws Exception
- {
- File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK") + "/bdbstore/" + name));
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
-
- message(MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
-
- return configure(environmentPath, false);
- }
-
- void message(final LogMessage message)
- {
- CurrentActor.message(_logSubject, message);
- }
-
- /**
- * @param environmentPath location for the store to be created in/recovered from
- * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
- * @return whether or not a new store environment was created
- * @throws AMQStoreException
- * @throws DatabaseException
- */
- protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException
- {
- _readOnly = readonly;
- stateTransition(State.INITIAL, State.CONFIGURING);
-
- _log.info("Configuring BDB message store");
-
- return setupStore(environmentPath, readonly);
- }
-
- /**
- * Move the store state from CONFIGURING to STARTED.
- *
- * This is required if you do not want to perform recovery of the store data
- *
- * @throws AMQStoreException if the store is not in the correct state
- */
- public void start() throws AMQStoreException
- {
- stateTransition(State.CONFIGURING, State.STARTED);
- }
-
- private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException
- {
- checkState(State.CONFIGURING);
-
- boolean newEnvironment = createEnvironment(storePath, readonly);
-
- new Upgrader(_environment).upgradeIfNecessary();
-
- openDatabases(readonly);
+ super.setupStore(storePath, readonly);
if (!readonly)
{
- _commitThread.start();
- }
-
- return newEnvironment;
- }
-
- private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
- {
- if (_state != requiredState)
- {
- throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
- + "; currently in state: " + _state);
- }
-
- _state = newState;
- }
-
- private void checkState(State requiredState) throws AMQStoreException
- {
- if (_state != requiredState)
- {
- throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState);
+ startCommitThread();
}
}
- private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException
+ @Override
+ protected Environment createEnvironment(File environmentPath, boolean readonly) throws DatabaseException
{
- _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
+ LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
EnvironmentConfig envConfig = new EnvironmentConfig();
// This is what allows the creation of the store if it does not already exist.
envConfig.setAllowCreate(true);
@@ -367,8 +85,7 @@ public class BDBMessageStore implements MessageStore
envConfig.setReadOnly(readonly);
try
{
- _environment = new Environment(environmentPath, envConfig);
- return false;
+ return new Environment(environmentPath, envConfig);
}
catch (DatabaseException de)
{
@@ -376,14 +93,7 @@ public class BDBMessageStore implements MessageStore
{
//Allow the creation this time
envConfig.setAllowCreate(true);
- if (_environment != null )
- {
- _environment.cleanLog();
- _environment.close();
- }
- _environment = new Environment(environmentPath, envConfig);
-
- return true;
+ return new Environment(environmentPath, envConfig);
}
else
{
@@ -392,1464 +102,18 @@ public class BDBMessageStore implements MessageStore
}
}
- private void openDatabases(boolean readonly) throws DatabaseException
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
-
- //This is required if we are wanting read only access.
- dbConfig.setReadOnly(readonly);
-
- _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
- _queueDb = openDatabase(QUEUEDB_NAME, dbConfig);
- _exchangeDb = openDatabase(EXCHANGEDB_NAME, dbConfig);
- _queueBindingsDb = openDatabase(QUEUEBINDINGSDB_NAME, dbConfig);
- _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
- _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
- _linkDb = openDatabase(LINKDB_NAME, dbConfig);
- _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig);
- _xidDb = openDatabase(XIDDB_NAME, dbConfig);
-
-
- }
-
- private Database openDatabase(final String dbName, final DatabaseConfig dbConfig)
- {
- // if opening read-only and the database doesn't exist, then you can't create it
- return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName)
- ? null
- : _environment.openDatabase(null, dbName, dbConfig);
- }
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
- public void close() throws Exception
- {
- if (_state != State.STARTED)
- {
- return;
- }
-
- _state = State.CLOSING;
-
- _commitThread.close();
- _commitThread.join();
-
- if (_messageMetaDataDb != null)
- {
- _log.info("Closing message metadata database");
- _messageMetaDataDb.close();
- }
-
- if (_messageContentDb != null)
- {
- _log.info("Closing message content database");
- _messageContentDb.close();
- }
-
- if (_exchangeDb != null)
- {
- _log.info("Closing exchange database");
- _exchangeDb.close();
- }
-
- if (_queueBindingsDb != null)
- {
- _log.info("Closing bindings database");
- _queueBindingsDb.close();
- }
-
- if (_queueDb != null)
- {
- _log.info("Closing queue database");
- _queueDb.close();
- }
-
- if (_deliveryDb != null)
- {
- _log.info("Close delivery database");
- _deliveryDb.close();
- }
-
- if (_bridgeDb != null)
- {
- _log.info("Close bridge database");
- _bridgeDb.close();
- }
-
- if (_linkDb != null)
- {
- _log.info("Close link database");
- _linkDb.close();
- }
-
-
- if (_xidDb != null)
- {
- _log.info("Close xid database");
- _xidDb.close();
- }
-
- closeEnvironment();
-
- _state = State.CLOSED;
-
- message(MessageStoreMessages.CLOSED());
- }
-
- private void closeEnvironment() throws DatabaseException
- {
- if (_environment != null)
- {
- if(!_readOnly)
- {
- // Clean the log before closing. This makes sure it doesn't contain
- // redundant data. Closing without doing this means the cleaner may not
- // get a chance to finish.
- _environment.cleanLog();
- }
- _environment.close();
- }
- }
-
-
- public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException
- {
- stateTransition(State.CONFIGURED, State.RECOVERING);
-
- message(MessageStoreMessages.RECOVERY_START());
-
- try
- {
- QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- loadQueues(qrh);
-
- ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
- loadExchanges(erh);
-
- BindingRecoveryHandler brh = erh.completeExchangeRecovery();
- recoverBindings(brh);
-
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
-
- }
-
- private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _queueDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- QueueBinding binding = QueueBinding.getInstance();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueRecord queueRecord = binding.entryToObject(value);
-
- String queueName = queueRecord.getNameShortString() == null ? null :
- queueRecord.getNameShortString().asString();
- String owner = queueRecord.getOwner() == null ? null :
- queueRecord.getOwner().asString();
- boolean exclusive = queueRecord.isExclusive();
-
- FieldTable arguments = queueRecord.getArguments();
-
- qrh.queue(queueName, owner, exclusive, arguments);
- }
-
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
- }
-
-
- private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _exchangeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- ExchangeBinding binding = ExchangeBinding.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- ExchangeRecord exchangeRec = binding.entryToObject(value);
-
- String exchangeName = exchangeRec.getNameShortString() == null ? null :
- exchangeRec.getNameShortString().asString();
- String type = exchangeRec.getType() == null ? null :
- exchangeRec.getType().asString();
- boolean autoDelete = exchangeRec.isAutoDelete();
-
- erh.exchange(exchangeName, type, autoDelete);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
- private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
- {
- Cursor cursor = null;
- try
- {
- cursor = _queueBindingsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- QueueBindingTupleBinding binding = QueueBindingTupleBinding.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- //yes, this is retrieving all the useful information from the key only.
- //For table compatibility it shall currently be left as is
- BindingRecord bindingRecord = binding.entryToObject(key);
-
- String exchangeName = bindingRecord.getExchangeName() == null ? null :
- bindingRecord.getExchangeName().asString();
- String queueName = bindingRecord.getQueueName() == null ? null :
- bindingRecord.getQueueName().asString();
- String routingKey = bindingRecord.getRoutingKey() == null ? null :
- bindingRecord.getRoutingKey().asString();
- ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
- java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
-
- brh.binding(exchangeName, queueName, routingKey, argumentsBB);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
-
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _linkDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _bridgeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- if(parentId.equals(linkId))
- {
-
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
- brh.bridge(id,createTime,arguments);
- }
- }
- brh.completeBridgeRecoveryForLink();
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
-
- private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
- {
- StoredMessageRecoveryHandler mrh = msrh.begin();
-
- Cursor cursor = null;
- try
- {
- cursor = _messageMetaDataDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- long maxId = 0;
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
- mrh.message(message);
-
- maxId = Math.max(maxId, messageId);
- }
-
- _messageId.set(maxId);
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
- }
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws DatabaseException
- {
- QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
- ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueEntryKey qek = keyBinding.entryToObject(key);
-
- entries.add(qek);
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- for(QueueEntryKey entry : entries)
- {
- AMQShortString queueName = entry.getQueueName();
- long messageId = entry.getMessageId();
-
- qerh.queueEntry(queueName.asString(),messageId);
- }
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
-
-
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
- cursor = null;
- try
- {
- cursor = _xidDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
- preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
-
- /**
- * Removes the specified message from the store.
- *
- * @param messageId Identifies the message to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void removeMessage(long messageId) throws AMQStoreException
- {
- removeMessage(messageId, true);
- }
- public void removeMessage(long messageId, boolean sync) throws AMQStoreException
- {
-
- boolean complete = false;
- com.sleepycat.je.Transaction tx = null;
-
- Random rand = null;
- int attempts = 0;
- try
- {
- do
- {
- tx = null;
- try
- {
- tx = _environment.beginTransaction(null, null);
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Removing message id " + messageId);
- }
-
-
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted metadata for message " + messageId);
- }
-
- //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- _messageContentDb.delete(tx, contentKeyEntry);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted content for message " + messageId);
- }
-
- commit(tx, sync);
- complete = true;
- tx = null;
- }
- catch (LockConflictException e)
- {
- try
- {
- if(tx != null)
- {
- tx.abort();
- }
- }
- catch(DatabaseException e2)
- {
- _log.warn("Unable to abort transaction after LockConflictExcption", e2);
- // rethrow the original log conflict exception, the secondary exception should already have
- // been logged.
- throw e;
- }
-
-
- _log.warn("Lock timeout exception. Retrying (attempt "
- + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
-
- if(++attempts < LOCK_RETRY_ATTEMPTS)
- {
- if(rand == null)
- {
- rand = new Random();
- }
-
- try
- {
- Thread.sleep(500l + (long)(500l * rand.nextDouble()));
- }
- catch (InterruptedException e1)
- {
-
- }
- }
- else
- {
- // rethrow the lock conflict exception since we could not solve by retrying
- throw e;
- }
- }
- }
- while(!complete);
- }
- catch (DatabaseException e)
- {
- _log.error("Unexpected BDB exception", e);
-
- if (tx != null)
- {
- try
- {
- tx.abort();
- tx = null;
- }
- catch (DatabaseException e1)
- {
- throw new AMQStoreException("Error aborting transaction " + e1, e1);
- }
- }
-
- throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
- finally
- {
- if (tx != null)
- {
- try
- {
- tx.abort();
- tx = null;
- }
- catch (DatabaseException e1)
- {
- throw new AMQStoreException("Error aborting transaction " + e1, e1);
- }
- }
- }
- }
-
- /**
- * @see DurableConfigurationStore#createExchange(Exchange)
- */
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
- exchange.getTypeShortString(), exchange.isAutoDelete());
-
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(exchange.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- ExchangeBinding exchangeBinding = ExchangeBinding.getInstance();
- exchangeBinding.objectToEntry(exchangeRec, value);
-
- try
- {
- _exchangeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * @see DurableConfigurationStore#removeExchange(Exchange)
- */
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(exchange.getNameShortString(), key);
- try
- {
- OperationStatus status = _exchangeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e);
- }
- }
-
-
- /**
- * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
- */
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args));
- }
-
- protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- DatabaseEntry key = new DatabaseEntry();
- QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
-
- keyBinding.objectToEntry(bindingRecord, key);
-
- //yes, this is writing out 0 as a value and putting all the
- //useful info into the key, don't ask me why. For table
- //compatibility it shall currently be left as is
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- _queueBindingsDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange "
- + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
- */
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
- keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
-
- try
- {
- OperationStatus status = _queueBindingsDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " from database: " + e.getMessage(), e);
- }
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue)
- */
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
- */
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
- }
-
- QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(), queue.isExclusive(), arguments);
-
- createQueue(queueRecord);
- }
-
- /**
- * Makes the specified queue persistent.
- *
- * Only intended for direct use during store upgrades.
- *
- * @param queueRecord Details of the queue to store.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- protected void createQueue(QueueRecord queueRecord) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- QueueBinding queueBinding = QueueBinding.getInstance();
-
- queueBinding.objectToEntry(queueRecord, value);
- try
- {
- _queueDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString()
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * NOTE: Currently only updates the exclusivity.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Updating queue: " + queue.getName());
- }
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(queue.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- DatabaseEntry newValue = new DatabaseEntry();
- QueueBinding queueBinding = QueueBinding.getInstance();
-
- OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
- if(status == OperationStatus.SUCCESS)
- {
- //read the existing record and apply the new exclusivity setting
- QueueRecord queueRecord = queueBinding.entryToObject(value);
- queueRecord.setExclusive(queue.isExclusive());
-
- //write the updated entry to the store
- queueBinding.objectToEntry(queueRecord, newValue);
-
- _queueDb.put(null, key, newValue);
- }
- else if(status != OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Error updating queue details within the store: " + status);
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error updating queue details within the store: " + e,e);
- }
- }
-
- /**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- AMQShortString name = queue.getNameShortString();
-
- if (_log.isDebugEnabled())
- {
- _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(name, key);
- try
- {
- OperationStatus status = _queueDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue " + name + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e);
- }
- }
-
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(),value);
- StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
-
- try
- {
- _linkDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Link " + link
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
- try
- {
- OperationStatus status = _linkDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e);
- }
- }
-
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value);
- LongBinding.longToEntry(bridge.getCreateTime(),value);
- StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
-
- try
- {
- _bridgeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Bridge " + bridge
- + " to database: " + e.getMessage(), e);
- }
-
- }
- }
-
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
- try
- {
- OperationStatus status = _bridgeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Places a message onto a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The the queue to place the message on.
- * @param messageId The message to enqueue.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws AMQStoreException
- {
- AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey dd = new QueueEntryKey(name, messageId);
- keyBinding.objectToEntry(dd, key);
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]");
- }
- _deliveryDb.put(tx, key, value);
- }
- catch (DatabaseException e)
- {
- _log.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
- + " to database", e);
- }
- }
-
- /**
- * Extracts a message from a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The name queue to take the message from.
- * @param messageId The message to dequeue.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws AMQStoreException
- {
- AMQShortString name = new AMQShortString(queue.getResourceName());
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(name, messageId);
-
- keyBinding.objectToEntry(queueEntryKey, key);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Dequeue message id " + messageId);
- }
-
- try
- {
-
- OperationStatus status = _deliveryDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Removed message " + messageId + ", " + name + " from delivery db");
-
- }
- }
- catch (DatabaseException e)
- {
-
- _log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
- _log.error(tx);
-
- throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
- }
- }
-
-
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
- Transaction.Record[] enqueues,
- Transaction.Record[] dequeues) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
- keyBinding.objectToEntry(xid,key);
-
- DatabaseEntry value = new DatabaseEntry();
- PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- valueBinding.objectToEntry(preparedTransaction, value);
- try
- {
- _xidDb.put(txn, key, value);
- }
- catch (DatabaseException e)
- {
- _log.error("Failed to write xid: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing xid to database", e);
- }
- }
- private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
- throws AMQStoreException
+ @Override
+ protected void closeInternal() throws Exception
{
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
-
- keyBinding.objectToEntry(xid, key);
-
+ stopCommitThread();
- try
- {
-
- OperationStatus status = _xidDb.delete(txn, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Unable to find xid");
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Unable to remove xid");
- }
-
- }
- catch (DatabaseException e)
- {
-
- _log.error("Failed to remove xid ", e);
- _log.error(txn);
-
- throw new AMQStoreException("Error accessing database while removing xid: " + e.getMessage(), e);
- }
+ super.closeInternal();
}
- /**
- * Commits all operations performed within a given transaction.
- *
- * @param tx The transaction to commit all operations for.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
- {
- if (tx == null)
- {
- throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
- }
-
- StoreFuture result;
- try
- {
- result = commit(tx, syncCommit);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("commitTranImpl completed for [Transaction:" + tx + "]");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
- }
-
- return result;
- }
-
- /**
- * Abandons all operations performed within a given transaction.
- *
- * @param tx The transaction to abandon.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("abortTran called for [Transaction:" + tx + "]");
- }
-
- try
- {
- tx.abort();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
- }
- }
-
- /**
- * Primarily for testing purposes.
- *
- * @param queueName
- *
- * @return a list of message ids for messages enqueued for a particular queue
- */
- List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
-
- QueueEntryKey dd = new QueueEntryKey(queueName, 0);
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(dd, key);
-
- DatabaseEntry value = new DatabaseEntry();
-
- LinkedList<Long> messageIds = new LinkedList<Long>();
-
- OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = keyBinding.entryToObject(key);
-
- while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
- {
-
- messageIds.add(dd.getMessageId());
- status = cursor.getNext(key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- dd = keyBinding.entryToObject(key);
- }
- }
-
- return messageIds;
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Database error: " + e.getMessage(), e);
- }
- finally
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- public long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
- /**
- * Stores a chunk of message data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param contentBody The content of the data chunk.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- protected void addContent(final com.sleepycat.je.Transaction tx, long messageId,
- ByteBuffer contentBody) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding messageBinding = ContentBinding.getInstance();
- messageBinding.objectToEntry(contentBody.array(), value);
- try
- {
- OperationStatus status = _messageContentDb.put(tx, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Error adding content for message id " + messageId + ": " + status);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Stores message meta-data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
- StorableMessageMetaData messageMetaData)
- throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
-
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
- messageBinding.objectToEntry(messageMetaData, value);
- try
- {
- _messageMetaDataDb.put(tx, key, value);
- if (_log.isDebugEnabled())
- {
- _log.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Retrieves message meta-data.
- *
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public StorableMessageMetaData getMessageMetaData(long messageId) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public MessageMetaData getMessageMetaData(Long messageId = "
- + messageId + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
-
- try
- {
- OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Metadata not found for message with id " + messageId);
- }
-
- StorableMessageMetaData mdd = messageBinding.entryToObject(value);
-
- return mdd;
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
- }
- }
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
-
- LongBinding.longToEntry(messageId, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
- }
-
- try
- {
- int written = 0;
- OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
- if (status == OperationStatus.SUCCESS)
- {
- byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.length;
- if (offset > size)
- {
- throw new RuntimeException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
- dst.put(dataAsBytes, offset, written);
- }
- return written;
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
- if(metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNewMessageId(), metaData);
- }
- }
-
-
- //Package getters for the various databases used by the Store
-
- Database getMetaDataDb()
- {
- return _messageMetaDataDb;
- }
-
- Database getContentDb()
- {
- return _messageContentDb;
- }
-
- Database getQueuesDb()
- {
- return _queueDb;
- }
-
- Database getDeliveryDb()
- {
- return _deliveryDb;
- }
-
- Database getExchangesDb()
- {
- return _exchangeDb;
- }
-
- Database getBindingsDb()
- {
- return _queueBindingsDb;
- }
-
- Environment getEnvironment()
- {
- return _environment;
- }
-
- private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
+ @Override
+ protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
{
tx.commitNoSync();
@@ -1859,11 +123,17 @@ public class BDBMessageStore implements MessageStore
return commitFuture;
}
- public void startCommitThread()
+ private void startCommitThread()
{
_commitThread.start();
}
+ private void stopCommitThread() throws InterruptedException
+ {
+ _commitThread.close();
+ _commitThread.join();
+ }
+
private static final class BDBCommitFuture implements StoreFuture
{
private final CommitThread _commitThread;
@@ -1881,9 +151,9 @@ public class BDBMessageStore implements MessageStore
public synchronized void complete()
{
- if (_log.isDebugEnabled())
+ if (LOGGER.isDebugEnabled())
{
- _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
}
_complete = true;
@@ -1904,7 +174,7 @@ public class BDBMessageStore implements MessageStore
if(!_syncCommit)
{
- _log.debug("CommitAsync was requested, returning immediately.");
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
return;
}
@@ -1946,7 +216,7 @@ public class BDBMessageStore implements MessageStore
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
* completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
*
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table>
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
private class CommitThread extends Thread
{
@@ -1999,7 +269,7 @@ public class BDBMessageStore implements MessageStore
try
{
- _environment.flushLog(true);
+ getEnvironment().flushLog(true);
for(int i = 0; i < size; i++)
{
@@ -2047,241 +317,4 @@ public class BDBMessageStore implements MessageStore
}
}
-
- private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
- {
-
- private final long _messageId;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<byte[]> _dataRef;
- private byte[] _data;
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
- {
- this(messageId, metaData, true);
- }
-
-
- StoredBDBMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
- {
- try
- {
- _messageId = messageId;
- _metaData = metaData;
-
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public StorableMessageMetaData getMetaData()
- {
- StorableMessageMetaData metaData = _metaDataRef.get();
- if(metaData == null)
- {
- try
- {
- metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- return metaData;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- src = src.slice();
-
- if(_data == null)
- {
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
- }
- else
- {
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
-
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
- }
-
- }
-
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
- {
- try
- {
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- catch (AMQStoreException e)
- {
- // TODO maybe should throw a checked exception, or at least log before throwing
- throw new RuntimeException(e);
- }
- }
- }
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- return ByteBuffer.wrap(data,offsetInMessage,size);
- }
- else
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
- buf.position(0);
- return buf;
- }
- }
-
- synchronized void store(com.sleepycat.je.Transaction txn)
- {
-
- if(_metaData != null)
- {
- try
- {
- _dataRef = new SoftReference<byte[]>(_data);
- BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- BDBMessageStore.this.addContent(txn, _messageId,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
- }
- catch(DatabaseException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- throw e;
- }
- finally
- {
- _metaData = null;
- _data = null;
- }
- }
- }
-
- public synchronized StoreFuture flushToStore()
- {
- if(_metaData != null)
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- store(txn);
- BDBMessageStore.this.commit(txn,true);
-
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void remove()
- {
- try
- {
- BDBMessageStore.this.removeMessage(_messageId, false);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- private class BDBTransaction implements org.apache.qpid.server.store.Transaction
- {
- private com.sleepycat.je.Transaction _txn;
-
- private BDBTransaction()
- {
- try
- {
- _txn = _environment.beginTransaction(null, null);
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
- {
- if(message.getStoredMessage() instanceof StoredBDBMessage)
- {
- ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
- }
-
- BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
- }
-
- public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
- {
- BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
- }
-
- public void commitTran() throws AMQStoreException
- {
- BDBMessageStore.this.commitTranImpl(_txn, true);
- }
-
- public StoreFuture commitTranAsync() throws AMQStoreException
- {
- return BDBMessageStore.this.commitTranImpl(_txn, false);
- }
-
- public void abortTran() throws AMQStoreException
- {
- BDBMessageStore.this.abortTran(_txn);
- }
-
- public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
- {
- BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
- }
-
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues) throws AMQStoreException
- {
- BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
- }
- }
-
-
}