diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java | 104 |
1 files changed, 94 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index 140a815f57..fbce1666b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -23,9 +23,10 @@ package org.apache.qpid.server.protocol.v1_0; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.message.MessageReference; @@ -34,11 +35,45 @@ import org.apache.qpid.server.store.StoredMessage; public class Message_1_0 implements ServerMessage, InboundMessage { + + + private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount"); + + private volatile int _referenceCount = 0; + private final StoredMessage<MessageMetaData_1_0> _storedMessage; private List<ByteBuffer> _fragments; private WeakReference<Session_1_0> _session; + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage) + { + _storedMessage = storedMessage; + _session = null; + _fragments = restoreFragments(storedMessage); + } + + private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage) + { + ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); + final int FRAGMENT_SIZE = 2048; + int offset = 0; + ByteBuffer b; + do + { + + b = storedMessage.getContent(offset,FRAGMENT_SIZE); + if(b.hasRemaining()) + { + fragments.add(b); + offset+= b.remaining(); + } + } + while(b.hasRemaining()); + return fragments; + } + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final List<ByteBuffer> fragments, final Session_1_0 session) @@ -136,11 +171,6 @@ public class Message_1_0 implements ServerMessage, InboundMessage return buf; } - public SessionConfig getSessionConfig() - { - return null; //TODO - } - public List<ByteBuffer> getFragments() { return _fragments; @@ -148,7 +178,61 @@ public class Message_1_0 implements ServerMessage, InboundMessage public Session_1_0 getSession() { - return _session.get(); + return _session == null ? null : _session.get(); + } + + + public boolean incrementReference() + { + if(_refCountUpdater.incrementAndGet(this) <= 0) + { + _refCountUpdater.decrementAndGet(this); + return false; + } + else + { + return true; + } + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * + * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference() + { + int count = _refCountUpdater.decrementAndGet(this); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _refCountUpdater.set(this,Integer.MIN_VALUE/2); + + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_storedMessage != null) + { + _storedMessage.remove(); + } + } + else + { + if (count < 0) + { + throw new RuntimeException("Reference count for message id " + getMessageNumber() + + " has gone below 0."); + } + } } public static class Reference extends MessageReference<Message_1_0> @@ -160,13 +244,13 @@ public class Message_1_0 implements ServerMessage, InboundMessage protected void onReference(Message_1_0 message) { - + message.incrementReference(); } protected void onRelease(Message_1_0 message) { - + message.decrementReference(); } -} + } } |