diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-08 14:15:28 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-08 14:15:28 +0000 |
commit | 4da0b6a50acf9b0ec95cb0202f049173402f9568 (patch) | |
tree | a1ecdbcf0d1088c22f51955152b991abfc41e80c | |
parent | 5d6a13afe68d81a8f94e9b5fd59eaa3f1cd8b35b (diff) | |
download | qpid-python-4da0b6a50acf9b0ec95cb0202f049173402f9568.tar.gz |
QPID-946 , QPID-2379 : Fix for BDBUpgradeTest failure introduced by previous commit
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1228853 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 225 |
1 files changed, 117 insertions, 108 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 9efa2937aa..81b5c8599f 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -25,6 +25,7 @@ import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -120,9 +121,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore /* ======= * Schema: * ======= - * + * * Queue: - * name(AMQShortString) - name(AMQShortString), owner(AMQShortString), + * name(AMQShortString) - name(AMQShortString), owner(AMQShortString), * arguments(FieldTable encoded as binary), exclusive (boolean) * * Exchange: @@ -174,7 +175,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private boolean _configured; - + public BDBMessageStore() { this(DATABASE_FORMAT_VERSION); @@ -206,9 +207,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BRIDGEDB_NAME += "_v" + version; } } - - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, + + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration, LogSubject logSubject) throws Exception { @@ -221,7 +222,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _configured = true; stateTransition(State.CONFIGURING, State.CONFIGURED); } - + recover(recoveryHandler); stateTransition(State.RECOVERING, State.STARTED); } @@ -240,7 +241,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _configured = true; stateTransition(State.CONFIGURING, State.CONFIGURED); } - + recoverMessages(recoveryHandler); } @@ -260,7 +261,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore recoverQueueEntries(recoveryHandler); - + } public org.apache.qpid.server.store.MessageStore.Transaction newTransaction() @@ -268,7 +269,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return new BDBTransaction(); } - + /** * Called after instantiation in order to configure the message store. * @@ -279,8 +280,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore */ public boolean configure(String name, Configuration storeConfig) throws Exception { - File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, - System.getProperty("QPID_WORK") + "/bdbstore/" + name)); + File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, + System.getProperty("QPID_WORK") + "/bdbstore/" + name)); if (!environmentPath.exists()) { if (!environmentPath.mkdirs()) @@ -299,7 +300,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore /** * @param environmentPath location for the store to be created in/recovered from - * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists + * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists * @return whether or not a new store environment was created * @throws AMQStoreException * @throws DatabaseException @@ -312,12 +313,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _log.info("Configuring BDB message store"); createTupleBindingFactories(_version); - + setDatabaseNames(_version); return setupStore(environmentPath, readonly); } - + private void createTupleBindingFactories(int version) { _bindingTupleBindingFactory = new BindingTupleBindingFactory(version); @@ -417,7 +418,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore envConfig.setTransactional(true); envConfig.setConfigParam("je.lock.nLockTables", "7"); - // Restore 500,000 default timeout. + // Restore 500,000 default timeout. //envConfig.setLockTimeout(15000); // Added to help diagnosis of Deadlock issue @@ -427,10 +428,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); envConfig.setConfigParam("je.txn.dumpLocks", "true"); } - + // Set transaction mode _transactionConfig.setReadCommitted(true); - + //This prevents background threads running which will potentially update the store. envConfig.setReadOnly(readonly); try @@ -469,18 +470,26 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore //This is required if we are wanting read only access. dbConfig.setReadOnly(readonly); - _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig); - _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig); - _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig); - _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig); - _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig); - _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig); - _linkDb = _environment.openDatabase(null, LINKDB_NAME, dbConfig); - _bridgeDb = _environment.openDatabase(null, BRIDGEDB_NAME, dbConfig); + _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); + _queueDb = openDatabase(QUEUEDB_NAME, dbConfig); + _exchangeDb = openDatabase(EXCHANGEDB_NAME, dbConfig); + _queueBindingsDb = openDatabase(QUEUEBINDINGSDB_NAME, dbConfig); + _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); + _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); + _linkDb = openDatabase(LINKDB_NAME, dbConfig); + _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig); } + private Database openDatabase(final String dbName, final DatabaseConfig dbConfig) + { + // if opening read-only and the database doesn't exist, then you can't create it + return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName) + ? null + : _environment.openDatabase(null, dbName, dbConfig); + } + /** * Called to close and cleanup any resources used by the message store. * @@ -549,7 +558,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore closeEnvironment(); _state = State.CLOSED; - + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); } @@ -568,7 +577,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - + public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException { stateTransition(State.CONFIGURED, State.RECOVERING); @@ -582,7 +591,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); loadExchanges(erh); - + BindingRecoveryHandler brh = erh.completeExchangeRecovery(); recoverBindings(brh); @@ -609,13 +618,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value); - - String queueName = queueRecord.getNameShortString() == null ? null : + + String queueName = queueRecord.getNameShortString() == null ? null : queueRecord.getNameShortString().asString(); - String owner = queueRecord.getOwner() == null ? null : + String owner = queueRecord.getOwner() == null ? null : queueRecord.getOwner().asString(); boolean exclusive = queueRecord.isExclusive(); - + FieldTable arguments = queueRecord.getArguments(); qrh.queue(queueName, owner, exclusive, arguments); @@ -630,8 +639,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } } - - + + private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException { Cursor cursor = null; @@ -642,17 +651,17 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); TupleBinding binding = new ExchangeTB(); - + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value); - String exchangeName = exchangeRec.getNameShortString() == null ? null : + String exchangeName = exchangeRec.getNameShortString() == null ? null : exchangeRec.getNameShortString().asString(); - String type = exchangeRec.getType() == null ? null : + String type = exchangeRec.getType() == null ? null : exchangeRec.getType().asString(); boolean autoDelete = exchangeRec.isAutoDelete(); - + erh.exchange(exchangeName, type, autoDelete); } } @@ -665,7 +674,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - + private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException { Cursor cursor = null; @@ -675,22 +684,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); TupleBinding binding = _bindingTupleBindingFactory.getInstance(); - + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { //yes, this is retrieving all the useful information from the key only. //For table compatibility it shall currently be left as is BindingKey bindingRecord = (BindingKey) binding.entryToObject(key); - + String exchangeName = bindingRecord.getExchangeName() == null ? null : bindingRecord.getExchangeName().asString(); String queueName = bindingRecord.getQueueName() == null ? null : bindingRecord.getQueueName().asString(); String routingKey = bindingRecord.getRoutingKey() == null ? null : bindingRecord.getRoutingKey().asString(); - ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : + ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes())); - + brh.binding(exchangeName, queueName, routingKey, argumentsBB); } } @@ -714,7 +723,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore cursor = _linkDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { UUID id = UUIDTupleBinding.getInstance().entryToObject(key); @@ -793,7 +802,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); mrh.message(message); - + maxId = Math.max(maxId, messageId); } @@ -812,14 +821,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) + + private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws DatabaseException { QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - + Cursor cursor = null; try { @@ -844,12 +853,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { cursor = null; } - + for(QueueEntryKey entry : entries) { AMQShortString queueName = entry.getQueueName(); long messageId = entry.getMessageId(); - + qerh.queueEntry(queueName.asString(),messageId); } } @@ -886,12 +895,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); com.sleepycat.je.Transaction tx = null; - + Cursor cursor = null; try { tx = _environment.beginTransaction(null, null); - + //remove the message meta data from the store DatabaseEntry key = new DatabaseEntry(); LongBinding.longToEntry(messageId, key); @@ -901,7 +910,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _log.debug("Removing message id " + messageId); } - + OperationStatus status = _messageMetaDataDb.delete(tx, key); if (status == OperationStatus.NOTFOUND) { @@ -922,7 +931,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - //Use a partial record for the value to prevent retrieving the + //Use a partial record for the value to prevent retrieving the //data itself as we only need the key to identify what to remove. DatabaseEntry value = new DatabaseEntry(); value.setPartial(0, 0, true); @@ -933,7 +942,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore while (status == OperationStatus.SUCCESS) { mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - + if(mck.getMessageId() != messageId) { //we have exhausted all chunks for this message id, break @@ -942,34 +951,34 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore else { status = cursor.delete(); - + if(status == OperationStatus.NOTFOUND) { cursor.close(); cursor = null; - + tx.abort(); throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); } - + if (_log.isDebugEnabled()) { _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); } } - + status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } cursor.close(); cursor = null; - + commit(tx, sync); } catch (DatabaseException e) { e.printStackTrace(); - + if (tx != null) { try @@ -979,7 +988,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore cursor.close(); cursor = null; } - + tx.abort(); } catch (DatabaseException e1) @@ -1013,7 +1022,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (_state != State.RECOVERING) { - ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), + ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), exchange.getTypeShortString(), exchange.isAutoDelete()); DatabaseEntry key = new DatabaseEntry(); @@ -1070,20 +1079,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (_state != State.RECOVERING) { - BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), + BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args); DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); - + keyBinding.objectToEntry(bindingRecord, key); - //yes, this is writing out 0 as a value and putting all the + //yes, this is writing out 0 as a value and putting all the //useful info into the key, don't ask me why. For table //compatibility it shall currently be left as is DatabaseEntry value = new DatabaseEntry(); ByteBinding.byteToEntry((byte) 0, value); - + try { _queueBindingsDb.put(null, key, value); @@ -1139,16 +1148,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called"); } - - QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), + + QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), queue.getOwner(), queue.isExclusive(), arguments); - + createQueue(queueRecord); } /** - * Makes the specified queue persistent. - * + * Makes the specified queue persistent. + * * Only intended for direct use during store upgrades. * * @param queueRecord Details of the queue to store. @@ -1182,7 +1191,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore /** * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. - * + * * NOTE: Currently only updates the exclusivity. * * @param queue The queue to update the entry for. @@ -1208,13 +1217,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT); if(status == OperationStatus.SUCCESS) { - //read the existing record and apply the new exclusivity setting + //read the existing record and apply the new exclusivity setting QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value); queueRecord.setExclusive(queue.isExclusive()); - + //write the updated entry to the store queueBinding.objectToEntry(queueRecord, newValue); - + _queueDb.put(null, key, newValue); } else if(status != OperationStatus.NOTFOUND) @@ -1243,7 +1252,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { _log.debug("public void removeQueue(AMQShortString name = " + name + "): called"); } - + DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = new AMQShortStringTB(); keyBinding.objectToEntry(name, key); @@ -1360,7 +1369,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called"); AMQShortString name = AMQShortString.valueOf(queue.getResourceName()); - + DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = new QueueEntryTB(); QueueEntryKey dd = new QueueEntryKey(name, messageId); @@ -1408,7 +1417,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { _log.debug("Dequeue message id " + messageId); } - + try { @@ -1416,7 +1425,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (status == OperationStatus.NOTFOUND) { throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); - } + } else if (status != OperationStatus.SUCCESS) { throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name); @@ -1451,12 +1460,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore //{ // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")"); //} - + if (tx == null) { throw new AMQStoreException("Fatal internal error: transactional is null at commitTran"); } - + StoreFuture result; try { @@ -1471,7 +1480,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { throw new AMQStoreException("Error commit tx: " + e.getMessage(), e); } - + return result; } @@ -1618,7 +1627,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, + private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, StorableMessageMetaData messageMetaData) throws AMQStoreException { @@ -1631,7 +1640,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore DatabaseEntry key = new DatabaseEntry(); LongBinding.longToEntry(messageId, key); DatabaseEntry value = new DatabaseEntry(); - + TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); messageBinding.objectToEntry(messageMetaData, value); try @@ -1701,16 +1710,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException - { + { DatabaseEntry contentKeyEntry = new DatabaseEntry(); - - //Start from 0 offset and search for the starting chunk. + + //Start from 0 offset and search for the starting chunk. MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0); TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); DatabaseEntry value = new DatabaseEntry(); TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB(); - + if (_log.isDebugEnabled()) { _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); @@ -1718,32 +1727,32 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore int written = 0; int seenSoFar = 0; - + Cursor cursor = null; try { cursor = _messageContentDb.openCursor(null, null); - + OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED); while (status == OperationStatus.SUCCESS) { mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); long id = mck.getMessageId(); - + if(id != messageId) { //we have exhausted all chunks for this message id, break break; } - + int offsetInMessage = mck.getOffset(); ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value); - + final int size = (int) buf.limit(); - + seenSoFar += size; - + if(seenSoFar >= offset) { byte[] dataAsBytes = buf.array(); @@ -1762,7 +1771,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore break; } } - + status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } @@ -1817,7 +1826,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { return _bindingTupleBindingFactory; } - + protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory() { return _metaDataTupleBindingFactory; @@ -1924,7 +1933,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); commitFuture.commit(); - + return commitFuture; } @@ -1980,13 +1989,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore //_log.debug("public void commit(): called"); _commitThread.addJob(this, _syncCommit); - + if(!_syncCommit) { _log.debug("CommitAsync was requested, returning immediately."); return; } - + waitForCompletion(); // _log.debug("Commit completed, _databaseException = " + _databaseException); @@ -2062,7 +2071,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { try { - // RHM-7 Periodically wake up and check, just in case we + // RHM-7 Periodically wake up and check, just in case we // missed a notification. Don't want to lock the broker hard. _lock.wait(250); } @@ -2132,14 +2141,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } } - - + + private class StoredBDBMessage implements StoredMessage { private final long _messageId; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - + private StorableMessageMetaData _metaData; private volatile SoftReference<byte[]> _dataRef; private byte[] _data; @@ -2195,7 +2204,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { src = src.slice(); - + if(_data == null) { _data = new byte[src.remaining()]; @@ -2211,7 +2220,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore System.arraycopy(oldData,0,_data,0,oldData.length); src.duplicate().get(_data, oldData.length, src.remaining()); } - + } public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) |