summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java50
1 files changed, 29 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 36ac8b3d40..bc9cda7f71 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -1575,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+ StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -2037,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore
{
private final long _messageId;
+ private final boolean _isRecovered;
+
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -2045,21 +2047,18 @@ public class DerbyMessageStore implements MessageStore
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
- this(messageId, metaData, true);
+ this(messageId, metaData, false);
}
StoredDerbyMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
+ StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
-
+ _isRecovered = isRecovered;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _metaData = metaData;
- }
+ _metaData = metaData;
}
@Override
@@ -2140,16 +2139,16 @@ public class DerbyMessageStore implements MessageStore
@Override
public synchronized StoreFuture flushToStore()
{
+ Connection conn = null;
try
{
- if(_metaData != null)
+ if(!stored())
{
- Connection conn = newConnection();
+ conn = newConnection();
store(conn);
conn.commit();
- conn.close();
storedSizeChange(getMetaData().getContentSize());
}
}
@@ -2161,12 +2160,24 @@ public class DerbyMessageStore implements MessageStore
}
throw new RuntimeException(e);
}
+ finally
+ {
+ closeConnection(conn);
+ }
return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ DerbyMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
+ }
+
private synchronized void store(final Connection conn) throws SQLException
{
- if(_metaData != null)
+ if (!stored())
{
try
{
@@ -2179,20 +2190,17 @@ public class DerbyMessageStore implements MessageStore
_metaData = null;
_data = null;
}
- }
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Storing message " + _messageId + " to store");
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
+ }
}
}
- @Override
- public void remove()
+ private boolean stored()
{
- int delta = getMetaData().getContentSize();
- DerbyMessageStore.this.removeMessage(_messageId);
- storedSizeChange(-delta);
+ return _metaData == null || _isRecovered;
}
}