diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java | 89 |
1 files changed, 50 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index 580e7d21f0..e866ad5078 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.message; import org.apache.qpid.transport.*; +import org.apache.qpid.server.store.StoredMessage; import java.util.concurrent.atomic.AtomicLong; import java.nio.ByteBuffer; @@ -29,76 +30,65 @@ import java.lang.ref.WeakReference; public class MessageTransferMessage implements InboundMessage, ServerMessage { - private static final AtomicLong _numberSource = new AtomicLong(0L); - - private final MessageTransfer _xfr; - private final DeliveryProperties _deliveryProps; - private final MessageProperties _messageProps; - private final AMQMessageHeader _messageHeader; - private final long _messageNumber; - private final long _arrivalTime; + + + private StoredMessage<MessageMetaData_0_10> _storeMessage; + + private WeakReference<Session> _sessionRef; - public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef) - { - this(_numberSource.getAndIncrement(), xfr, sessionRef); - } - public MessageTransferMessage(long messageNumber, MessageTransfer xfr, WeakReference<Session> sessionRef) + public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef) { - _xfr = xfr; - _messageNumber = messageNumber; - Header header = _xfr.getHeader(); - if(header != null) - { - _deliveryProps = header.get(DeliveryProperties.class); - _messageProps = header.get(MessageProperties.class); - } - else - { - _deliveryProps = null; - _messageProps = null; - } - _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps); - _arrivalTime = System.currentTimeMillis(); + _storeMessage = storeMessage; _sessionRef = sessionRef; + + } + + private MessageMetaData_0_10 getMetaData() + { + return _storeMessage.getMetaData(); } public String getRoutingKey() { - return _deliveryProps == null ? null : _deliveryProps.getRoutingKey(); + return getMetaData().getRoutingKey(); + } public AMQMessageHeader getMessageHeader() { - return _messageHeader; + return getMetaData().getMessageHeader(); } public boolean isPersistent() { - return (_deliveryProps != null) && (_deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); + return getMetaData().isPersistent(); } + public boolean isRedelivered() { + // The *Message* is never redelivered, only queue entries are... this is here so that filters + // can run against the message on entry to an exchange return false; } public long getSize() { - return _xfr.getBodySize(); + return getMetaData().getSize(); } public boolean isImmediate() { - return _deliveryProps != null && _deliveryProps.getImmediate(); + return getMetaData().isImmediate(); } public long getExpiration() { - return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); + return getMetaData().getExpiration(); } public MessageReference newReference() @@ -108,23 +98,43 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage public Long getMessageNumber() { - return _messageNumber; + return _storeMessage.getMessageNumber(); } public long getArrivalTime() { - return _arrivalTime; + return getMetaData().getArrivalTime(); } - public Header getHeader() + public int getContent(ByteBuffer buf, int offset) { - return _xfr.getHeader(); + return _storeMessage.getContent(offset, buf); + } + public Header getHeader() + { + return getMetaData().getHeader(); } public ByteBuffer getBody() { - return _xfr.getBody(); + ByteBuffer body = getMetaData().getBody(); + if(body == null) + { + final int size = (int) getSize(); + int pos = 0; + body = ByteBuffer.allocate(size); + + while(pos < size) + { + pos += getContent(body, pos); + } + + body.flip(); + + getMetaData().setBody(body.duplicate()); + } + return body; } public Session getSession() @@ -132,4 +142,5 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage return _sessionRef == null ? null : _sessionRef.get(); } + } |