summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java89
1 files changed, 50 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index 580e7d21f0..e866ad5078 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.message;
import org.apache.qpid.transport.*;
+import org.apache.qpid.server.store.StoredMessage;
import java.util.concurrent.atomic.AtomicLong;
import java.nio.ByteBuffer;
@@ -29,76 +30,65 @@ import java.lang.ref.WeakReference;
public class MessageTransferMessage implements InboundMessage, ServerMessage
{
- private static final AtomicLong _numberSource = new AtomicLong(0L);
-
- private final MessageTransfer _xfr;
- private final DeliveryProperties _deliveryProps;
- private final MessageProperties _messageProps;
- private final AMQMessageHeader _messageHeader;
- private final long _messageNumber;
- private final long _arrivalTime;
+
+
+ private StoredMessage<MessageMetaData_0_10> _storeMessage;
+
+
private WeakReference<Session> _sessionRef;
- public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef)
- {
- this(_numberSource.getAndIncrement(), xfr, sessionRef);
- }
- public MessageTransferMessage(long messageNumber, MessageTransfer xfr, WeakReference<Session> sessionRef)
+ public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
- _xfr = xfr;
- _messageNumber = messageNumber;
- Header header = _xfr.getHeader();
- if(header != null)
- {
- _deliveryProps = header.get(DeliveryProperties.class);
- _messageProps = header.get(MessageProperties.class);
- }
- else
- {
- _deliveryProps = null;
- _messageProps = null;
- }
- _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
- _arrivalTime = System.currentTimeMillis();
+ _storeMessage = storeMessage;
_sessionRef = sessionRef;
+
+ }
+
+ private MessageMetaData_0_10 getMetaData()
+ {
+ return _storeMessage.getMetaData();
}
public String getRoutingKey()
{
- return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
+ return getMetaData().getRoutingKey();
+
}
public AMQMessageHeader getMessageHeader()
{
- return _messageHeader;
+ return getMetaData().getMessageHeader();
}
public boolean isPersistent()
{
- return (_deliveryProps != null) && (_deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
+ return getMetaData().isPersistent();
}
+
public boolean isRedelivered()
{
+ // The *Message* is never redelivered, only queue entries are... this is here so that filters
+ // can run against the message on entry to an exchange
return false;
}
public long getSize()
{
- return _xfr.getBodySize();
+ return getMetaData().getSize();
}
public boolean isImmediate()
{
- return _deliveryProps != null && _deliveryProps.getImmediate();
+ return getMetaData().isImmediate();
}
public long getExpiration()
{
- return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+ return getMetaData().getExpiration();
}
public MessageReference newReference()
@@ -108,23 +98,43 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
public Long getMessageNumber()
{
- return _messageNumber;
+ return _storeMessage.getMessageNumber();
}
public long getArrivalTime()
{
- return _arrivalTime;
+ return getMetaData().getArrivalTime();
}
- public Header getHeader()
+ public int getContent(ByteBuffer buf, int offset)
{
- return _xfr.getHeader();
+ return _storeMessage.getContent(offset, buf);
+ }
+ public Header getHeader()
+ {
+ return getMetaData().getHeader();
}
public ByteBuffer getBody()
{
- return _xfr.getBody();
+ ByteBuffer body = getMetaData().getBody();
+ if(body == null)
+ {
+ final int size = (int) getSize();
+ int pos = 0;
+ body = ByteBuffer.allocate(size);
+
+ while(pos < size)
+ {
+ pos += getContent(body, pos);
+ }
+
+ body.flip();
+
+ getMetaData().setBody(body.duplicate());
+ }
+ return body;
}
public Session getSession()
@@ -132,4 +142,5 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
return _sessionRef == null ? null : _sessionRef.get();
}
+
}