summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-08 14:15:28 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-08 14:15:28 +0000
commit4da0b6a50acf9b0ec95cb0202f049173402f9568 (patch)
treea1ecdbcf0d1088c22f51955152b991abfc41e80c
parent5d6a13afe68d81a8f94e9b5fd59eaa3f1cd8b35b (diff)
downloadqpid-python-4da0b6a50acf9b0ec95cb0202f049173402f9568.tar.gz
QPID-946 , QPID-2379 : Fix for BDBUpgradeTest failure introduced by previous commit
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1228853 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java225
1 files changed, 117 insertions, 108 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 9efa2937aa..81b5c8599f 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -25,6 +25,7 @@ import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -120,9 +121,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
/* =======
* Schema:
* =======
- *
+ *
* Queue:
- * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
+ * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
* arguments(FieldTable encoded as binary), exclusive (boolean)
*
* Exchange:
@@ -174,7 +175,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private boolean _configured;
-
+
public BDBMessageStore()
{
this(DATABASE_FORMAT_VERSION);
@@ -206,9 +207,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BRIDGEDB_NAME += "_v" + version;
}
}
-
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
@@ -221,7 +222,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_configured = true;
stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
+
recover(recoveryHandler);
stateTransition(State.RECOVERING, State.STARTED);
}
@@ -240,7 +241,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_configured = true;
stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
+
recoverMessages(recoveryHandler);
}
@@ -260,7 +261,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
recoverQueueEntries(recoveryHandler);
-
+
}
public org.apache.qpid.server.store.MessageStore.Transaction newTransaction()
@@ -268,7 +269,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return new BDBTransaction();
}
-
+
/**
* Called after instantiation in order to configure the message store.
*
@@ -279,8 +280,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*/
public boolean configure(String name, Configuration storeConfig) throws Exception
{
- File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK") + "/bdbstore/" + name));
+ File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK") + "/bdbstore/" + name));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
@@ -299,7 +300,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
/**
* @param environmentPath location for the store to be created in/recovered from
- * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
+ * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
* @return whether or not a new store environment was created
* @throws AMQStoreException
* @throws DatabaseException
@@ -312,12 +313,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_log.info("Configuring BDB message store");
createTupleBindingFactories(_version);
-
+
setDatabaseNames(_version);
return setupStore(environmentPath, readonly);
}
-
+
private void createTupleBindingFactories(int version)
{
_bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
@@ -417,7 +418,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
envConfig.setTransactional(true);
envConfig.setConfigParam("je.lock.nLockTables", "7");
- // Restore 500,000 default timeout.
+ // Restore 500,000 default timeout.
//envConfig.setLockTimeout(15000);
// Added to help diagnosis of Deadlock issue
@@ -427,10 +428,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
envConfig.setConfigParam("je.txn.dumpLocks", "true");
}
-
+
// Set transaction mode
_transactionConfig.setReadCommitted(true);
-
+
//This prevents background threads running which will potentially update the store.
envConfig.setReadOnly(readonly);
try
@@ -469,18 +470,26 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
//This is required if we are wanting read only access.
dbConfig.setReadOnly(readonly);
- _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig);
- _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig);
- _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig);
- _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig);
- _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig);
- _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig);
- _linkDb = _environment.openDatabase(null, LINKDB_NAME, dbConfig);
- _bridgeDb = _environment.openDatabase(null, BRIDGEDB_NAME, dbConfig);
+ _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
+ _queueDb = openDatabase(QUEUEDB_NAME, dbConfig);
+ _exchangeDb = openDatabase(EXCHANGEDB_NAME, dbConfig);
+ _queueBindingsDb = openDatabase(QUEUEBINDINGSDB_NAME, dbConfig);
+ _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
+ _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
+ _linkDb = openDatabase(LINKDB_NAME, dbConfig);
+ _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig);
}
+ private Database openDatabase(final String dbName, final DatabaseConfig dbConfig)
+ {
+ // if opening read-only and the database doesn't exist, then you can't create it
+ return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName)
+ ? null
+ : _environment.openDatabase(null, dbName, dbConfig);
+ }
+
/**
* Called to close and cleanup any resources used by the message store.
*
@@ -549,7 +558,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
closeEnvironment();
_state = State.CLOSED;
-
+
CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
}
@@ -568,7 +577,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
-
+
public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException
{
stateTransition(State.CONFIGURED, State.RECOVERING);
@@ -582,7 +591,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
loadExchanges(erh);
-
+
BindingRecoveryHandler brh = erh.completeExchangeRecovery();
recoverBindings(brh);
@@ -609,13 +618,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
-
- String queueName = queueRecord.getNameShortString() == null ? null :
+
+ String queueName = queueRecord.getNameShortString() == null ? null :
queueRecord.getNameShortString().asString();
- String owner = queueRecord.getOwner() == null ? null :
+ String owner = queueRecord.getOwner() == null ? null :
queueRecord.getOwner().asString();
boolean exclusive = queueRecord.isExclusive();
-
+
FieldTable arguments = queueRecord.getArguments();
qrh.queue(queueName, owner, exclusive, arguments);
@@ -630,8 +639,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
}
-
-
+
+
private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
{
Cursor cursor = null;
@@ -642,17 +651,17 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
TupleBinding binding = new ExchangeTB();
-
+
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
- String exchangeName = exchangeRec.getNameShortString() == null ? null :
+ String exchangeName = exchangeRec.getNameShortString() == null ? null :
exchangeRec.getNameShortString().asString();
- String type = exchangeRec.getType() == null ? null :
+ String type = exchangeRec.getType() == null ? null :
exchangeRec.getType().asString();
boolean autoDelete = exchangeRec.isAutoDelete();
-
+
erh.exchange(exchangeName, type, autoDelete);
}
}
@@ -665,7 +674,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
-
+
private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
{
Cursor cursor = null;
@@ -675,22 +684,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
TupleBinding binding = _bindingTupleBindingFactory.getInstance();
-
+
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
//yes, this is retrieving all the useful information from the key only.
//For table compatibility it shall currently be left as is
BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
-
+
String exchangeName = bindingRecord.getExchangeName() == null ? null :
bindingRecord.getExchangeName().asString();
String queueName = bindingRecord.getQueueName() == null ? null :
bindingRecord.getQueueName().asString();
String routingKey = bindingRecord.getRoutingKey() == null ? null :
bindingRecord.getRoutingKey().asString();
- ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
+ ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
-
+
brh.binding(exchangeName, queueName, routingKey, argumentsBB);
}
}
@@ -714,7 +723,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor = _linkDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
-
+
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
@@ -793,7 +802,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
mrh.message(message);
-
+
maxId = Math.max(maxId, messageId);
}
@@ -812,14 +821,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
}
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
throws DatabaseException
{
QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
+
Cursor cursor = null;
try
{
@@ -844,12 +853,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
cursor = null;
}
-
+
for(QueueEntryKey entry : entries)
{
AMQShortString queueName = entry.getQueueName();
long messageId = entry.getMessageId();
-
+
qerh.queueEntry(queueName.asString(),messageId);
}
}
@@ -886,12 +895,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
com.sleepycat.je.Transaction tx = null;
-
+
Cursor cursor = null;
try
{
tx = _environment.beginTransaction(null, null);
-
+
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
@@ -901,7 +910,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_log.debug("Removing message id " + messageId);
}
-
+
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
@@ -922,7 +931,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
+ //Use a partial record for the value to prevent retrieving the
//data itself as we only need the key to identify what to remove.
DatabaseEntry value = new DatabaseEntry();
value.setPartial(0, 0, true);
@@ -933,7 +942,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
-
+
if(mck.getMessageId() != messageId)
{
//we have exhausted all chunks for this message id, break
@@ -942,34 +951,34 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
else
{
status = cursor.delete();
-
+
if(status == OperationStatus.NOTFOUND)
{
cursor.close();
cursor = null;
-
+
tx.abort();
throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
}
-
+
if (_log.isDebugEnabled())
{
_log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
}
}
-
+
status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
cursor.close();
cursor = null;
-
+
commit(tx, sync);
}
catch (DatabaseException e)
{
e.printStackTrace();
-
+
if (tx != null)
{
try
@@ -979,7 +988,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor.close();
cursor = null;
}
-
+
tx.abort();
}
catch (DatabaseException e1)
@@ -1013,7 +1022,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if (_state != State.RECOVERING)
{
- ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
+ ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
exchange.getTypeShortString(), exchange.isAutoDelete());
DatabaseEntry key = new DatabaseEntry();
@@ -1070,20 +1079,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (_state != State.RECOVERING)
{
- BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
+ BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
queue.getNameShortString(), routingKey, args);
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
-
+
keyBinding.objectToEntry(bindingRecord, key);
- //yes, this is writing out 0 as a value and putting all the
+ //yes, this is writing out 0 as a value and putting all the
//useful info into the key, don't ask me why. For table
//compatibility it shall currently be left as is
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
-
+
try
{
_queueBindingsDb.put(null, key, value);
@@ -1139,16 +1148,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
}
-
- QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
+
+ QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
queue.getOwner(), queue.isExclusive(), arguments);
-
+
createQueue(queueRecord);
}
/**
- * Makes the specified queue persistent.
- *
+ * Makes the specified queue persistent.
+ *
* Only intended for direct use during store upgrades.
*
* @param queueRecord Details of the queue to store.
@@ -1182,7 +1191,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
/**
* Updates the specified queue in the persistent store, IF it is already present. If the queue
* is not present in the store, it will not be added.
- *
+ *
* NOTE: Currently only updates the exclusivity.
*
* @param queue The queue to update the entry for.
@@ -1208,13 +1217,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
if(status == OperationStatus.SUCCESS)
{
- //read the existing record and apply the new exclusivity setting
+ //read the existing record and apply the new exclusivity setting
QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value);
queueRecord.setExclusive(queue.isExclusive());
-
+
//write the updated entry to the store
queueBinding.objectToEntry(queueRecord, newValue);
-
+
_queueDb.put(null, key, newValue);
}
else if(status != OperationStatus.NOTFOUND)
@@ -1243,7 +1252,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
}
-
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(name, key);
@@ -1360,7 +1369,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
// _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
-
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
@@ -1408,7 +1417,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_log.debug("Dequeue message id " + messageId);
}
-
+
try
{
@@ -1416,7 +1425,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (status == OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
- }
+ }
else if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
@@ -1451,12 +1460,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
//{
// _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
//}
-
+
if (tx == null)
{
throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
}
-
+
StoreFuture result;
try
{
@@ -1471,7 +1480,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
}
-
+
return result;
}
@@ -1618,7 +1627,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
@@ -1631,7 +1640,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
-
+
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
@@ -1701,16 +1710,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException
- {
+ {
DatabaseEntry contentKeyEntry = new DatabaseEntry();
-
- //Start from 0 offset and search for the starting chunk.
+
+ //Start from 0 offset and search for the starting chunk.
MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0);
TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB();
-
+
if (_log.isDebugEnabled())
{
_log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
@@ -1718,32 +1727,32 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
int written = 0;
int seenSoFar = 0;
-
+
Cursor cursor = null;
try
{
cursor = _messageContentDb.openCursor(null, null);
-
+
OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
long id = mck.getMessageId();
-
+
if(id != messageId)
{
//we have exhausted all chunks for this message id, break
break;
}
-
+
int offsetInMessage = mck.getOffset();
ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
-
+
final int size = (int) buf.limit();
-
+
seenSoFar += size;
-
+
if(seenSoFar >= offset)
{
byte[] dataAsBytes = buf.array();
@@ -1762,7 +1771,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
break;
}
}
-
+
status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
@@ -1817,7 +1826,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
return _bindingTupleBindingFactory;
}
-
+
protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
{
return _metaDataTupleBindingFactory;
@@ -1924,7 +1933,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
commitFuture.commit();
-
+
return commitFuture;
}
@@ -1980,13 +1989,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
//_log.debug("public void commit(): called");
_commitThread.addJob(this, _syncCommit);
-
+
if(!_syncCommit)
{
_log.debug("CommitAsync was requested, returning immediately.");
return;
}
-
+
waitForCompletion();
// _log.debug("Commit completed, _databaseException = " + _databaseException);
@@ -2062,7 +2071,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
try
{
- // RHM-7 Periodically wake up and check, just in case we
+ // RHM-7 Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
_lock.wait(250);
}
@@ -2132,14 +2141,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
}
-
-
+
+
private class StoredBDBMessage implements StoredMessage
{
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-
+
private StorableMessageMetaData _metaData;
private volatile SoftReference<byte[]> _dataRef;
private byte[] _data;
@@ -2195,7 +2204,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
src = src.slice();
-
+
if(_data == null)
{
_data = new byte[src.remaining()];
@@ -2211,7 +2220,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
System.arraycopy(oldData,0,_data,0,oldData.length);
src.duplicate().get(_data, oldData.length, src.remaining());
}
-
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)