diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-01-29 16:41:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-01-29 16:41:50 +0000 |
commit | 6e051fc4b84042cefe787c76d6f675f024602aa6 (patch) | |
tree | 774c5004af3f046197f512fbec34f63c914025d8 | |
parent | 5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (diff) | |
download | qpid-python-6e051fc4b84042cefe787c76d6f675f024602aa6.tar.gz |
filled in a bunch of stubs in AMQMessage and made AMQMessage.getContents() return duplicate ByteBuffers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501099 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 39 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3c2c44d422..b0ef08d0a5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -281,7 +281,7 @@ public class AMQChannel mtb.destination = destination; ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize()); for (ByteBuffer bb : msg.getContents()) { - buf.put(bb.duplicate()); + buf.put(bb); } buf.flip(); mtb.body = new Content(Content.TypeEnum.INLINE_T, buf); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b074d0ffa8..63643923b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.message.jms.JMSMessage; import org.apache.qpid.AMQException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.LinkedList; import java.util.Set; @@ -53,6 +54,23 @@ public class AMQMessage private List<ByteBuffer> _contents; + private Iterable<ByteBuffer> _dupContentsIterable = new Iterable() { + public Iterator<ByteBuffer> iterator() { + return new Iterator() { + private Iterator<ByteBuffer> iter = _contents.iterator(); + public boolean hasNext() { + return iter.hasNext(); + } + public ByteBuffer next() { + return iter.next().duplicate(); + } + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + private boolean _redelivered; private final long _messageId; @@ -202,7 +220,7 @@ public class AMQMessage } public String getCorrelationId() { - throw new Error("XXX"); + return _transferBody.getCorrelationId(); } public void setPriority(byte priority) { @@ -210,7 +228,7 @@ public class AMQMessage } public byte getPriority() { - throw new Error("XXX"); + return (byte) _transferBody.getPriority(); } public void setExpiration(long l) { @@ -218,7 +236,7 @@ public class AMQMessage } public long getExpiration() { - throw new Error("XXX"); + return _transferBody.getExpiration(); } public void setTimestamp(long l) { @@ -226,19 +244,24 @@ public class AMQMessage } public long getTimestamp() { - throw new Error("XXX"); + return _transferBody.getTimestamp(); } public String getContentType() { - throw new Error("XXX"); + return _transferBody.getContentType(); } public String getEncoding() { - throw new Error("XXX"); + return _transferBody.getContentEncoding(); } public byte[] getMessageBytes() { - throw new Error("XXX"); + byte[] result = new byte[(int) getBodySize()]; + int offset = 0; + for (ByteBuffer bb : getContents()) { + bb.get(result, offset, bb.remaining()); + } + return result; } public void storeMessage() throws AMQException @@ -254,8 +277,8 @@ public class AMQMessage return _transferBody; } - public List<ByteBuffer> getContents() { - return _contents; + public Iterable<ByteBuffer> getContents() { + return _dupContentsIterable; } public List<AMQBody> getPayload() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 40032651aa..d3d52cd627 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -274,8 +274,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } + // get message content - byte[] msgContent = msg.getMessageBytes(); + byte[] bytes = msg.getMessageBytes(); + Byte[] msgContent = new Byte[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + msgContent[i] = Byte.valueOf(bytes[i]); + } // Create header attributes list String mimeType = msg.getContentType(); |