diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 165 |
1 files changed, 85 insertions, 80 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index c60c22c4e4..aa7ea16afc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -36,21 +36,15 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.txn.TransactionalContext; -/** - * Combines the information that make up a deliverable message into a more manageable form. - */ +/** Combines the information that make up a deliverable message into a more manageable form. */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); - /** - * Used in clustering - */ + /** Used in clustering */ private Set<Object> _tokens; - /** - * Only use in clustering - should ideally be removed? - */ + /** Only use in clustering - should ideally be removed? */ private AMQProtocolSession _publisher; private final Long _messageId; @@ -63,16 +57,14 @@ public class AMQMessage private TransactionalContext _txnContext; /** - * Flag to indicate whether message has been delivered to a - * consumer. Used in implementing return functionality for + * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; /** - * We need to keep track of whether the message was 'immediate' - * as in extreme circumstances, when the checkDelieveredToConsumer - * is called, the message may already have been received and acknowledged, - * and the body removed from the store. + * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the + * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body + * removed from the store. */ private boolean _immediate; @@ -80,11 +72,16 @@ public class AMQMessage private TransientMessageData _transientMessageData = new TransientMessageData(); + private Subscription _takenBySubcription; + public boolean isTaken() + { + return _taken.get(); + } /** - * Used to iterate through all the body frames associated with this message. Will not - * keep all the data in memory therefore is memory-efficient. + * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory + * therefore is memory-efficient. */ private class BodyFrameIterator implements Iterator<AMQDataBlock> { @@ -103,7 +100,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; } catch (AMQException e) { @@ -153,7 +150,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; } catch (AMQException e) { @@ -166,7 +163,7 @@ public class AMQMessage { try { - return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index); + return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index); } catch (AMQException e) { @@ -196,12 +193,14 @@ public class AMQMessage } /** - * Used when recovering, i.e. when the message store is creating references to messages. - * In that case, the normal enqueue/routingComplete is not done since the recovery process - * is responsible for routing the messages to queues. + * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal + * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to + * queues. + * * @param messageId * @param store * @param factory + * * @throws AMQException */ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException @@ -213,8 +212,8 @@ public class AMQMessage } /** - * Used in testing only. This allows the passing of the content header immediately - * on construction. + * Used in testing only. This allows the passing of the content header immediately on construction. + * * @param messageId * @param info * @param txnContext @@ -228,14 +227,15 @@ public class AMQMessage } /** - * Used in testing only. This allows the passing of the content header and some body fragments on - * construction. + * Used in testing only. This allows the passing of the content header and some body fragments on construction. + * * @param messageId * @param info * @param txnContext * @param contentHeader * @param destinationQueues * @param contentBodies + * * @throws AMQException */ public AMQMessage(Long messageId, MessagePublishInfo info, @@ -280,7 +280,7 @@ public class AMQMessage } else { - return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId); + return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId); } } @@ -338,16 +338,14 @@ public class AMQMessage return _messageId; } - /** - * Threadsafe. Increment the reference count on the message. - */ + /** Threadsafe. Increment the reference count on the message. */ public void incrementReference() { _referenceCount.incrementAndGet(); if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); } } @@ -355,7 +353,7 @@ public class AMQMessage /** * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. - * + * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ @@ -371,7 +369,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); } @@ -394,13 +392,13 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); if (_referenceCount.get() < 0) { Thread.dumpStack(); } } - if(_referenceCount.get()<0) + if (_referenceCount.get() < 0) { throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0."); } @@ -419,7 +417,8 @@ public class AMQMessage /** * Called selectors to determin if the message has already been sent - * @return _deliveredToConsumer + * + * @return _deliveredToConsumer */ public boolean getDeliveredToConsumer() { @@ -427,10 +426,17 @@ public class AMQMessage } - - public boolean taken() + public boolean taken(Subscription sub) { - return _taken.getAndSet(true); + if (_taken.getAndSet(true)) + { + return true; + } + else + { + _takenBySubcription = sub; + return false; + } } public void release() @@ -441,9 +447,9 @@ public class AMQMessage public boolean checkToken(Object token) { - if(_tokens==null) + if (_tokens == null) { - _tokens = new HashSet<Object>(); + _tokens = new HashSet<Object>(); } if (_tokens.contains(token)) @@ -458,11 +464,12 @@ public class AMQMessage } /** - * Registers a queue to which this message is to be delivered. This is - * called from the exchange when it is routing the message. This will be called before any content bodies have - * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria. + * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing + * the message. This will be called before any content bodies have been received so that the choice of + * AMQMessageHandle implementation can be picked based on various criteria. * * @param queue the queue + * * @throws org.apache.qpid.AMQException if there is an error enqueuing the message */ public void enqueue(AMQQueue queue) throws AMQException @@ -483,16 +490,15 @@ public class AMQMessage } else { - return _messageHandle.isPersistent(getStoreContext(),_messageId); + return _messageHandle.isPersistent(getStoreContext(), _messageId); } } /** * Called to enforce the 'immediate' flag. * - * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer */ public void checkDeliveredToConsumer() throws NoConsumersException, AMQException { @@ -500,7 +506,7 @@ public class AMQMessage if (_immediate && !_deliveredToConsumer) { throw new NoConsumersException(this); - } + } } public MessagePublishInfo getMessagePublishInfo() throws AMQException @@ -512,7 +518,7 @@ public class AMQMessage } else { - pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId); + pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId); } return pb; } @@ -533,10 +539,7 @@ public class AMQMessage } - /** - * Called when this message is delivered to a consumer. (used to - * implement the 'immediate' flag functionality). - */ + /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; @@ -566,7 +569,7 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { - _txnContext.deliver(this, q); + _txnContext.deliver(this, q, true); } } finally @@ -583,23 +586,22 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); - if(bodyCount == 0) + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + contentHeader); protocolSession.writeFrame(compositeBlock); } else { - // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0); + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -609,9 +611,9 @@ public class AMQMessage // // Now start writing out the other content bodies // - for(int i = 1; i < bodyCount; i++) + for (int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i); + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } @@ -627,22 +629,21 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); - if(bodyCount == 0) + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + contentHeader); protocolSession.writeFrame(compositeBlock); } else { - // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0); + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -652,9 +653,9 @@ public class AMQMessage // // Now start writing out the other content bodies // - for(int i = 1; i < bodyCount; i++) + for (int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i); + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } @@ -685,10 +686,10 @@ public class AMQMessage AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.getExchange(), - queueSize, - _messageHandle.isRedelivered(), - pb.getRoutingKey()); + deliveryTag, pb.getExchange(), + queueSize, + _messageHandle.isRedelivered(), + pb.getRoutingKey()); ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? getOkFrame.writePayload(buf); buf.flip(); @@ -699,7 +700,7 @@ public class AMQMessage { AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), + protocolSession.getProtocolMinorVersion(), getMessagePublishInfo().getExchange(), replyCode, replyText, getMessagePublishInfo().getRoutingKey()); @@ -757,12 +758,11 @@ public class AMQMessage } catch (AMQException e) { - _log.error(e.toString(),e); + _log.error(e.toString(), e); return 0; } - } - + } public void restoreTransientMessageData() throws AMQException @@ -771,7 +771,7 @@ public class AMQMessage transientMessageData.setMessagePublishInfo(getMessagePublishInfo()); transientMessageData.setContentHeaderBody(getContentHeaderBody()); transientMessageData.addBodyLength(getContentHeaderBody().getSize()); - _transientMessageData = transientMessageData; + _transientMessageData = transientMessageData; } @@ -784,6 +784,11 @@ public class AMQMessage public String toString() { return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + - _taken; + _taken + " by:" + _takenBySubcription; + } + + public Subscription getDeliveredSubscription() + { + return _takenBySubcription; } } |