summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-27 20:39:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-27 20:39:01 +0000
commitc8fd56202be79d5a1c4bc0c27552072fe6d0b580 (patch)
tree28518b16fcc0437e35905188e0e68817e6a27d3f /qpid/java/broker-plugins
parentd1b8f6e25ceaa628fd4f1c97847f5fb557032f7f (diff)
downloadqpid-python-c8fd56202be79d5a1c4bc0c27552072fe6d0b580.tar.gz
QPID-6331 : Allow AMQP 1.0 message content to be evicted to disk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655154 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java46
1 files changed, 32 insertions, 14 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index 36796851e0..18f5ba9e2e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -21,23 +21,42 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
- private List<ByteBuffer> _fragments;
+ private volatile SoftReference<List<ByteBuffer>> _fragmentsRef;
private long _arrivalTime;
+ private final long _size;
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
super(storedMessage, null);
- _fragments = restoreFragments(storedMessage);
+ final List<ByteBuffer> fragments = restoreFragments(getStoredMessage());
+ _fragmentsRef = new SoftReference<>(fragments);
+ _size = calculateSize(fragments);
+ }
+
+ private long calculateSize(final List<ByteBuffer> fragments)
+ {
+
+ long size = 0l;
+ if(fragments != null)
+ {
+ for(ByteBuffer buf : fragments)
+ {
+ size += buf.remaining();
+ }
+ }
+ return size;
}
private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
@@ -65,7 +84,8 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
final Object connectionReference)
{
super(storedMessage, connectionReference);
- _fragments = fragments;
+ _fragmentsRef = new SoftReference<>(fragments);
+ _size = calculateSize(fragments);
_arrivalTime = System.currentTimeMillis();
}
@@ -94,16 +114,7 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
public long getSize()
{
- long size = 0l;
- if(_fragments != null)
- {
- for(ByteBuffer buf : _fragments)
- {
- size += buf.remaining();
- }
- }
-
- return size;
+ return _size;
}
public long getExpiration()
@@ -118,7 +129,14 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
public List<ByteBuffer> getFragments()
{
- return _fragments;
+
+ List<ByteBuffer> fragments = _fragmentsRef.get();
+ if(fragments == null)
+ {
+ fragments = restoreFragments(getStoredMessage());
+ _fragmentsRef = new SoftReference<>(fragments);
+ }
+ return fragments;
}
}