diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 61 |
1 files changed, 49 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d7e317cfa5..73264c5310 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -44,8 +44,7 @@ public class AMQMessage private final Set<Object> _tokens = new HashSet<Object>(); /** - * Used in clustering - * TODO need to get rid of this + * Only use in clustering - should ideally be removed? */ private AMQProtocolSession _publisher; @@ -156,7 +155,8 @@ public class AMQMessage } } - public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext) + public AMQMessage(long messageId, BasicPublishBody publishBody, + TransactionalContext txnContext) { _messageId = messageId; _txnContext = txnContext; @@ -175,16 +175,41 @@ public class AMQMessage * @param txnContext * @param contentHeader */ - public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext, - ContentHeaderBody contentHeader) throws AMQException + public AMQMessage(long messageId, BasicPublishBody publishBody, + TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException { this(messageId, publishBody, txnContext); setContentHeaderBody(contentHeader); } + /** + * Used in testing only. This allows the passing of the content header and some body fragments on + * construction. + * @param messageId + * @param publishBody + * @param txnContext + * @param contentHeader + * @param destinationQueues + * @param contentBodies + * @throws AMQException + */ + public AMQMessage(long messageId, BasicPublishBody publishBody, + TransactionalContext txnContext, + ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, + List<ContentBody> contentBodies, MessageStore messageStore, + MessageHandleFactory messageHandleFactory) throws AMQException + { + this(messageId, publishBody, txnContext, contentHeader); + _destinationQueues = destinationQueues; + routingComplete(messageStore, messageHandleFactory); + for (ContentBody cb : contentBodies) + { + addContentBodyFrame(cb); + } + } + protected AMQMessage(AMQMessage msg) throws AMQException { - _publisher = msg._publisher; _messageId = msg._messageId; _messageHandle = msg._messageHandle; _txnContext = msg._txnContext; @@ -203,7 +228,14 @@ public class AMQMessage public ContentHeaderBody getContentHeaderBody() throws AMQException { - return _messageHandle.getContentHeaderBody(_messageId); + if (_contentHeaderBody != null) + { + return _contentHeaderBody; + } + else + { + return _messageHandle.getContentHeaderBody(_messageId); + } } public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) @@ -292,7 +324,12 @@ public class AMQMessage { _log.debug("Ref count on message " + _messageId + " is zero; removing message"); } - _messageHandle.removeMessage(_messageId); + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_messageHandle != null) + { + _messageHandle.removeMessage(_messageId); + } } catch (AMQException e) { @@ -403,7 +440,7 @@ public class AMQMessage { return _messageHandle.isRedelivered(); } - + /** * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). @@ -494,10 +531,10 @@ public class AMQMessage return buf; } - private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) +private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, _publishBody.exchange, - _publishBody.routingKey); + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, getPublishBody().exchange, + getPublishBody().routingKey); ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? returnFrame.writePayload(buf); buf.flip(); |