summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java99
1 files changed, 4 insertions, 95 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index e0c181a5fc..db32f13d8d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -30,20 +30,17 @@ import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage implements ServerMessage
+public class AMQMessage extends AbstractServerMessageImpl
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- private final AtomicInteger _referenceCount = new AtomicInteger(0);
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -76,6 +73,8 @@ public class AMQMessage implements ServerMessage
public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
{
+ super(handle);
+
_handle = handle;
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
@@ -89,12 +88,6 @@ public class AMQMessage implements ServerMessage
_channelRef = channelRef;
}
-
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
-
public void setExpiration(final long expiration)
{
@@ -102,11 +95,6 @@ public class AMQMessage implements ServerMessage
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public MessageMetaData getMessageMetaData()
{
return _handle.getMetaData();
@@ -117,88 +105,12 @@ public class AMQMessage implements ServerMessage
return getMessageMetaData().getContentHeaderBody();
}
-
-
public Long getMessageId()
{
return _handle.getMessageNumber();
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
-
- if(_referenceCount.addAndGet(count) <= 0)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- *
- * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference()
- {
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_handle != null)
- {
- _handle.remove();
-
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
-
- /**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
@@ -323,10 +235,7 @@ public class AMQMessage implements ServerMessage
public String toString()
{
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
-
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
}
public int getContent(ByteBuffer buf, int offset)