summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java61
1 files changed, 49 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d7e317cfa5..73264c5310 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -44,8 +44,7 @@ public class AMQMessage
private final Set<Object> _tokens = new HashSet<Object>();
/**
- * Used in clustering
- * TODO need to get rid of this
+ * Only use in clustering - should ideally be removed?
*/
private AMQProtocolSession _publisher;
@@ -156,7 +155,8 @@ public class AMQMessage
}
}
- public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
+ public AMQMessage(long messageId, BasicPublishBody publishBody,
+ TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
@@ -175,16 +175,41 @@ public class AMQMessage
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext,
- ContentHeaderBody contentHeader) throws AMQException
+ public AMQMessage(long messageId, BasicPublishBody publishBody,
+ TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, publishBody, txnContext);
setContentHeaderBody(contentHeader);
}
+ /**
+ * Used in testing only. This allows the passing of the content header and some body fragments on
+ * construction.
+ * @param messageId
+ * @param publishBody
+ * @param txnContext
+ * @param contentHeader
+ * @param destinationQueues
+ * @param contentBodies
+ * @throws AMQException
+ */
+ public AMQMessage(long messageId, BasicPublishBody publishBody,
+ TransactionalContext txnContext,
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
+ List<ContentBody> contentBodies, MessageStore messageStore,
+ MessageHandleFactory messageHandleFactory) throws AMQException
+ {
+ this(messageId, publishBody, txnContext, contentHeader);
+ _destinationQueues = destinationQueues;
+ routingComplete(messageStore, messageHandleFactory);
+ for (ContentBody cb : contentBodies)
+ {
+ addContentBodyFrame(cb);
+ }
+ }
+
protected AMQMessage(AMQMessage msg) throws AMQException
{
- _publisher = msg._publisher;
_messageId = msg._messageId;
_messageHandle = msg._messageHandle;
_txnContext = msg._txnContext;
@@ -203,7 +228,14 @@ public class AMQMessage
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _messageHandle.getContentHeaderBody(_messageId);
+ if (_contentHeaderBody != null)
+ {
+ return _contentHeaderBody;
+ }
+ else
+ {
+ return _messageHandle.getContentHeaderBody(_messageId);
+ }
}
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
@@ -292,7 +324,12 @@ public class AMQMessage
{
_log.debug("Ref count on message " + _messageId + " is zero; removing message");
}
- _messageHandle.removeMessage(_messageId);
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_messageHandle != null)
+ {
+ _messageHandle.removeMessage(_messageId);
+ }
}
catch (AMQException e)
{
@@ -403,7 +440,7 @@ public class AMQMessage
{
return _messageHandle.isRedelivered();
}
-
+
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
@@ -494,10 +531,10 @@ public class AMQMessage
return buf;
}
- private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText)
+private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException
{
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, _publishBody.exchange,
- _publishBody.routingKey);
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, getPublishBody().exchange,
+ getPublishBody().routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
returnFrame.writePayload(buf);
buf.flip();