diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-29 20:05:31 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-29 20:05:31 +0000 |
commit | 7c23b488255727d21464c8665cb714f79bbf8a4e (patch) | |
tree | 37d057fe6bc674ef75e1cd2dd7f90d6c2e142dea | |
parent | 065cf4329f0fc01bf756f015bcb605713a354c38 (diff) | |
download | qpid-python-7c23b488255727d21464c8665cb714f79bbf8a4e.tar.gz |
NO-JIRA: fixes to BytesMessage / StreamMessage for 1-0 JMS Impl
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1162969 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 77 insertions, 20 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java index 5963068a69..97cbc837c4 100644 --- a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java +++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java @@ -97,6 +97,19 @@ public class Hello System.out.println("======================="); System.out.println(((BytesMessage) message).readUTF()); } + else if(message instanceof StreamMessage) + { + System.out.println("Received Stream Message:"); + System.out.println("========================"); + StreamMessage streamMessage = (StreamMessage)message; + Object o = streamMessage.readObject(); + System.out.println(o.getClass().getName() + ": " + o); + o = streamMessage.readObject(); + System.out.println(o.getClass().getName() + ": " + o); + o = streamMessage.readObject(); + System.out.println(o.getClass().getName() + ": " + o); + + } } catch (JMSException e) { @@ -126,7 +139,12 @@ public class Hello messageProducer.send(bytesMessage); - + StreamMessage streamMessage = producersession.createStreamMessage(); + streamMessage.writeBoolean(true); + streamMessage.writeLong(18031974L); + streamMessage.writeString("this is a stream Message"); + streamMessage.writeChar('£'); + messageProducer.send(streamMessage); Thread.sleep(50000L); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java index f750d8ef95..14a97b8c4f 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java @@ -28,24 +28,25 @@ import org.apache.qpid.amqp_1_0.type.messaging.Properties; import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
+import java.io.*;
import java.util.*;
public class BytesMessageImpl extends MessageImpl implements BytesMessage
{
- private Data _data;
private DataInputStream _dataAsInput;
private DataOutputStream _dataAsOutput;
+ private ByteArrayOutputStream _bytesOut;
+ private Data _dataIn;
// message created for reading
protected BytesMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Data data,
Footer footer, SessionImpl session)
{
super(header, properties, appProperties, footer, session);
- _data = data;
+ _dataIn = data;
+ final Binary dataBuffer = data.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
}
// message created to be sent
@@ -53,14 +54,34 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage {
super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
- _data = new Data(new Binary(new byte[0]));
- _dataAsOutput = null;
+
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ }
+
+
+ private Data getDataSection()
+ {
+ if(_bytesOut != null)
+ {
+ return new Data(new Binary(_bytesOut.toByteArray()));
+ }
+ else
+ {
+ return _dataIn;
+ }
+ }
+
+ @Override
+ protected boolean isReadOnly()
+ {
+ return _dataIn != null;
}
public long getBodyLength() throws JMSException
{
checkReadable();
- return _data.getValue().getLength();
+ return getDataSection().getValue().getLength();
}
public boolean readBoolean() throws JMSException
@@ -477,8 +498,9 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage sections.add(getHeader());
sections.add(getProperties());
sections.add(getApplicationProperties());
- sections.add(_data);
+ sections.add(getDataSection());
sections.add(getFooter());
return sections;
}
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java index 84cf1bb4c6..e78ab4ffe1 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java @@ -97,6 +97,11 @@ class MessageFactory {
message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
}
+ else if(bodySection instanceof AmqpSequence)
+ {
+ message = new StreamMessageImpl(header, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+ }
+
/*else if(bodySection instanceof AmqpDataSection)
{
AmqpDataSection dataSection = (AmqpDataSection) bodySection;
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java index c5be6375c6..50238460b2 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java @@ -30,6 +30,9 @@ import java.util.*; public class StreamMessageImpl extends MessageImpl implements StreamMessage
{
private List _list;
+ private boolean _readOnly;
+ private int _position = -1;
+ private int _offset = -1;
protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
Footer footer, SessionImpl session)
@@ -42,6 +45,7 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage {
super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
+ _list = new ArrayList();
}
public StreamMessageImpl(final Header header,
@@ -105,7 +109,14 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage public Object readObject() throws JMSException
{
- return null; //TODO
+ if(_offset == -1)
+ {
+ return _list.get(++_position);
+ }
+ else
+ {
+ return null; //TODO
+ }
}
public void writeBoolean(final boolean b) throws JMSException
@@ -115,42 +126,42 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage public void writeByte(final byte b) throws JMSException
{
- //TODO
+ _list.add(b);
}
public void writeShort(final short i) throws JMSException
{
- //TODO
+ _list.add(i);
}
public void writeChar(final char c) throws JMSException
{
- //TODO
+ _list.add(c);
}
public void writeInt(final int i) throws JMSException
{
- //TODO
+ _list.add(i);
}
public void writeLong(final long l) throws JMSException
{
- //TODO
+ _list.add(l);
}
public void writeFloat(final float v) throws JMSException
{
- //TODO
+ _list.add(v);
}
public void writeDouble(final double v) throws JMSException
{
- //TODO
+ _list.add(v);
}
public void writeString(final String s) throws JMSException
{
- //TODO
+ _list.add(s);
}
public void writeBytes(final byte[] bytes) throws JMSException
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java index be82c72b70..3e0164705f 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java @@ -107,6 +107,7 @@ public class ListWriter implements ValueWriter<List> {
_delegate = _nonEmptyListWriter;
}
+ _delegate.setValue(frameBody);
}
public boolean isComplete()
|