summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java104
1 files changed, 94 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index 140a815f57..fbce1666b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -23,9 +23,10 @@ package org.apache.qpid.server.protocol.v1_0;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData_1_0;
import org.apache.qpid.server.message.MessageReference;
@@ -34,11 +35,45 @@ import org.apache.qpid.server.store.StoredMessage;
public class Message_1_0 implements ServerMessage, InboundMessage
{
+
+
+ private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
+
+ private volatile int _referenceCount = 0;
+
private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
private WeakReference<Session_1_0> _session;
+ public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
+ {
+ _storedMessage = storedMessage;
+ _session = null;
+ _fragments = restoreFragments(storedMessage);
+ }
+
+ private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
+ {
+ ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
+ final int FRAGMENT_SIZE = 2048;
+ int offset = 0;
+ ByteBuffer b;
+ do
+ {
+
+ b = storedMessage.getContent(offset,FRAGMENT_SIZE);
+ if(b.hasRemaining())
+ {
+ fragments.add(b);
+ offset+= b.remaining();
+ }
+ }
+ while(b.hasRemaining());
+ return fragments;
+ }
+
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
final List<ByteBuffer> fragments,
final Session_1_0 session)
@@ -136,11 +171,6 @@ public class Message_1_0 implements ServerMessage, InboundMessage
return buf;
}
- public SessionConfig getSessionConfig()
- {
- return null; //TODO
- }
-
public List<ByteBuffer> getFragments()
{
return _fragments;
@@ -148,7 +178,61 @@ public class Message_1_0 implements ServerMessage, InboundMessage
public Session_1_0 getSession()
{
- return _session.get();
+ return _session == null ? null : _session.get();
+ }
+
+
+ public boolean incrementReference()
+ {
+ if(_refCountUpdater.incrementAndGet(this) <= 0)
+ {
+ _refCountUpdater.decrementAndGet(this);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ *
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference()
+ {
+ int count = _refCountUpdater.decrementAndGet(this);
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _refCountUpdater.set(this,Integer.MIN_VALUE/2);
+
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_storedMessage != null)
+ {
+ _storedMessage.remove();
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new RuntimeException("Reference count for message id " + getMessageNumber()
+ + " has gone below 0.");
+ }
+ }
}
public static class Reference extends MessageReference<Message_1_0>
@@ -160,13 +244,13 @@ public class Message_1_0 implements ServerMessage, InboundMessage
protected void onReference(Message_1_0 message)
{
-
+ message.incrementReference();
}
protected void onRelease(Message_1_0 message)
{
-
+ message.decrementReference();
}
-}
+ }
}