summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java165
1 files changed, 85 insertions, 80 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index c60c22c4e4..aa7ea16afc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -36,21 +36,15 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /**
- * Used in clustering
- */
+ /** Used in clustering */
private Set<Object> _tokens;
- /**
- * Only use in clustering - should ideally be removed?
- */
+ /** Only use in clustering - should ideally be removed? */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -63,16 +57,14 @@ public class AMQMessage
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a
- * consumer. Used in implementing return functionality for
+ * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
/**
- * We need to keep track of whether the message was 'immediate'
- * as in extreme circumstances, when the checkDelieveredToConsumer
- * is called, the message may already have been received and acknowledged,
- * and the body removed from the store.
+ * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
+ * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
+ * removed from the store.
*/
private boolean _immediate;
@@ -80,11 +72,16 @@ public class AMQMessage
private TransientMessageData _transientMessageData = new TransientMessageData();
+ private Subscription _takenBySubcription;
+ public boolean isTaken()
+ {
+ return _taken.get();
+ }
/**
- * Used to iterate through all the body frames associated with this message. Will not
- * keep all the data in memory therefore is memory-efficient.
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
*/
private class BodyFrameIterator implements Iterator<AMQDataBlock>
{
@@ -103,7 +100,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -153,7 +150,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -166,7 +163,7 @@ public class AMQMessage
{
try
{
- return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
}
catch (AMQException e)
{
@@ -196,12 +193,14 @@ public class AMQMessage
}
/**
- * Used when recovering, i.e. when the message store is creating references to messages.
- * In that case, the normal enqueue/routingComplete is not done since the recovery process
- * is responsible for routing the messages to queues.
+ * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+ * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+ * queues.
+ *
* @param messageId
* @param store
* @param factory
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
@@ -213,8 +212,8 @@ public class AMQMessage
}
/**
- * Used in testing only. This allows the passing of the content header immediately
- * on construction.
+ * Used in testing only. This allows the passing of the content header immediately on construction.
+ *
* @param messageId
* @param info
* @param txnContext
@@ -228,14 +227,15 @@ public class AMQMessage
}
/**
- * Used in testing only. This allows the passing of the content header and some body fragments on
- * construction.
+ * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+ *
* @param messageId
* @param info
* @param txnContext
* @param contentHeader
* @param destinationQueues
* @param contentBodies
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info,
@@ -280,7 +280,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
@@ -338,16 +338,14 @@ public class AMQMessage
return _messageId;
}
- /**
- * Threadsafe. Increment the reference count on the message.
- */
+ /** Threadsafe. Increment the reference count on the message. */
public void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
}
@@ -355,7 +353,7 @@ public class AMQMessage
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -371,7 +369,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
@@ -394,13 +392,13 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
}
}
- if(_referenceCount.get()<0)
+ if (_referenceCount.get() < 0)
{
throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
}
@@ -419,7 +417,8 @@ public class AMQMessage
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
@@ -427,10 +426,17 @@ public class AMQMessage
}
-
- public boolean taken()
+ public boolean taken(Subscription sub)
{
- return _taken.getAndSet(true);
+ if (_taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenBySubcription = sub;
+ return false;
+ }
}
public void release()
@@ -441,9 +447,9 @@ public class AMQMessage
public boolean checkToken(Object token)
{
- if(_tokens==null)
+ if (_tokens == null)
{
- _tokens = new HashSet<Object>();
+ _tokens = new HashSet<Object>();
}
if (_tokens.contains(token))
@@ -458,11 +464,12 @@ public class AMQMessage
}
/**
- * Registers a queue to which this message is to be delivered. This is
- * called from the exchange when it is routing the message. This will be called before any content bodies have
- * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
+ * the message. This will be called before any content bodies have been received so that the choice of
+ * AMQMessageHandle implementation can be picked based on various criteria.
*
* @param queue the queue
+ *
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
public void enqueue(AMQQueue queue) throws AMQException
@@ -483,16 +490,15 @@ public class AMQMessage
}
else
{
- return _messageHandle.isPersistent(getStoreContext(),_messageId);
+ return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
}
/**
* Called to enforce the 'immediate' flag.
*
- * @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
{
@@ -500,7 +506,7 @@ public class AMQMessage
if (_immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
- }
+ }
}
public MessagePublishInfo getMessagePublishInfo() throws AMQException
@@ -512,7 +518,7 @@ public class AMQMessage
}
else
{
- pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
+ pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
return pb;
}
@@ -533,10 +539,7 @@ public class AMQMessage
}
- /**
- * Called when this message is delivered to a consumer. (used to
- * implement the 'immediate' flag functionality).
- */
+ /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
@@ -566,7 +569,7 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
- _txnContext.deliver(this, q);
+ _txnContext.deliver(this, q, true);
}
}
finally
@@ -583,23 +586,22 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -609,9 +611,9 @@ public class AMQMessage
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -627,22 +629,21 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -652,9 +653,9 @@ public class AMQMessage
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -685,10 +686,10 @@ public class AMQMessage
AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
getOkFrame.writePayload(buf);
buf.flip();
@@ -699,7 +700,7 @@ public class AMQMessage
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ protocolSession.getProtocolMinorVersion(),
getMessagePublishInfo().getExchange(),
replyCode, replyText,
getMessagePublishInfo().getRoutingKey());
@@ -757,12 +758,11 @@ public class AMQMessage
}
catch (AMQException e)
{
- _log.error(e.toString(),e);
+ _log.error(e.toString(), e);
return 0;
}
- }
-
+ }
public void restoreTransientMessageData() throws AMQException
@@ -771,7 +771,7 @@ public class AMQMessage
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
transientMessageData.setContentHeaderBody(getContentHeaderBody());
transientMessageData.addBodyLength(getContentHeaderBody().getSize());
- _transientMessageData = transientMessageData;
+ _transientMessageData = transientMessageData;
}
@@ -784,6 +784,11 @@ public class AMQMessage
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken;
+ _taken + " by:" + _takenBySubcription;
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return _takenBySubcription;
}
}