diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 36ac8b3d40..bc9cda7f71 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -1575,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore buf = buf.slice(); MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); - StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); + StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true); messageHandler.message(message); } @@ -2037,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore { private final long _messageId; + private final boolean _isRecovered; + private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; @@ -2045,21 +2047,18 @@ public class DerbyMessageStore implements MessageStore StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) { - this(messageId, metaData, true); + this(messageId, metaData, false); } StoredDerbyMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) + StorableMessageMetaData metaData, boolean isRecovered) { _messageId = messageId; - + _isRecovered = isRecovered; _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) - { - _metaData = metaData; - } + _metaData = metaData; } @Override @@ -2140,16 +2139,16 @@ public class DerbyMessageStore implements MessageStore @Override public synchronized StoreFuture flushToStore() { + Connection conn = null; try { - if(_metaData != null) + if(!stored()) { - Connection conn = newConnection(); + conn = newConnection(); store(conn); conn.commit(); - conn.close(); storedSizeChange(getMetaData().getContentSize()); } } @@ -2161,12 +2160,24 @@ public class DerbyMessageStore implements MessageStore } throw new RuntimeException(e); } + finally + { + closeConnection(conn); + } return StoreFuture.IMMEDIATE_FUTURE; } + @Override + public void remove() + { + int delta = getMetaData().getContentSize(); + DerbyMessageStore.this.removeMessage(_messageId); + storedSizeChange(-delta); + } + private synchronized void store(final Connection conn) throws SQLException { - if(_metaData != null) + if (!stored()) { try { @@ -2179,20 +2190,17 @@ public class DerbyMessageStore implements MessageStore _metaData = null; _data = null; } - } - if(_logger.isDebugEnabled()) - { - _logger.debug("Storing message " + _messageId + " to store"); + if(_logger.isDebugEnabled()) + { + _logger.debug("Storing message " + _messageId + " to store"); + } } } - @Override - public void remove() + private boolean stored() { - int delta = getMetaData().getContentSize(); - DerbyMessageStore.this.removeMessage(_messageId); - storedSizeChange(-delta); + return _metaData == null || _isRecovered; } } |