summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-29 20:05:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-29 20:05:31 +0000
commit7c23b488255727d21464c8665cb714f79bbf8a4e (patch)
tree37d057fe6bc674ef75e1cd2dd7f90d6c2e142dea
parent065cf4329f0fc01bf756f015bcb605713a354c38 (diff)
downloadqpid-python-7c23b488255727d21464c8665cb714f79bbf8a4e.tar.gz
NO-JIRA: fixes to BytesMessage / StreamMessage for 1-0 JMS Impl
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1162969 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java20
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java42
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java5
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java29
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java1
5 files changed, 77 insertions, 20 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
index 5963068a69..97cbc837c4 100644
--- a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
+++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
@@ -97,6 +97,19 @@ public class Hello
System.out.println("=======================");
System.out.println(((BytesMessage) message).readUTF());
}
+ else if(message instanceof StreamMessage)
+ {
+ System.out.println("Received Stream Message:");
+ System.out.println("========================");
+ StreamMessage streamMessage = (StreamMessage)message;
+ Object o = streamMessage.readObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ o = streamMessage.readObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ o = streamMessage.readObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+
+ }
}
catch (JMSException e)
{
@@ -126,7 +139,12 @@ public class Hello
messageProducer.send(bytesMessage);
-
+ StreamMessage streamMessage = producersession.createStreamMessage();
+ streamMessage.writeBoolean(true);
+ streamMessage.writeLong(18031974L);
+ streamMessage.writeString("this is a stream Message");
+ streamMessage.writeChar('£');
+ messageProducer.send(streamMessage);
Thread.sleep(50000L);
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
index f750d8ef95..14a97b8c4f 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
@@ -28,24 +28,25 @@ import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
+import java.io.*;
import java.util.*;
public class BytesMessageImpl extends MessageImpl implements BytesMessage
{
- private Data _data;
private DataInputStream _dataAsInput;
private DataOutputStream _dataAsOutput;
+ private ByteArrayOutputStream _bytesOut;
+ private Data _dataIn;
// message created for reading
protected BytesMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Data data,
Footer footer, SessionImpl session)
{
super(header, properties, appProperties, footer, session);
- _data = data;
+ _dataIn = data;
+ final Binary dataBuffer = data.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
}
// message created to be sent
@@ -53,14 +54,34 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
{
super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
- _data = new Data(new Binary(new byte[0]));
- _dataAsOutput = null;
+
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ }
+
+
+ private Data getDataSection()
+ {
+ if(_bytesOut != null)
+ {
+ return new Data(new Binary(_bytesOut.toByteArray()));
+ }
+ else
+ {
+ return _dataIn;
+ }
+ }
+
+ @Override
+ protected boolean isReadOnly()
+ {
+ return _dataIn != null;
}
public long getBodyLength() throws JMSException
{
checkReadable();
- return _data.getValue().getLength();
+ return getDataSection().getValue().getLength();
}
public boolean readBoolean() throws JMSException
@@ -477,8 +498,9 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
sections.add(getHeader());
sections.add(getProperties());
sections.add(getApplicationProperties());
- sections.add(_data);
+ sections.add(getDataSection());
sections.add(getFooter());
return sections;
}
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
index 84cf1bb4c6..e78ab4ffe1 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
@@ -97,6 +97,11 @@ class MessageFactory
{
message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
}
+ else if(bodySection instanceof AmqpSequence)
+ {
+ message = new StreamMessageImpl(header, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+ }
+
/*else if(bodySection instanceof AmqpDataSection)
{
AmqpDataSection dataSection = (AmqpDataSection) bodySection;
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
index c5be6375c6..50238460b2 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
@@ -30,6 +30,9 @@ import java.util.*;
public class StreamMessageImpl extends MessageImpl implements StreamMessage
{
private List _list;
+ private boolean _readOnly;
+ private int _position = -1;
+ private int _offset = -1;
protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
Footer footer, SessionImpl session)
@@ -42,6 +45,7 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
{
super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
+ _list = new ArrayList();
}
public StreamMessageImpl(final Header header,
@@ -105,7 +109,14 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public Object readObject() throws JMSException
{
- return null; //TODO
+ if(_offset == -1)
+ {
+ return _list.get(++_position);
+ }
+ else
+ {
+ return null; //TODO
+ }
}
public void writeBoolean(final boolean b) throws JMSException
@@ -115,42 +126,42 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public void writeByte(final byte b) throws JMSException
{
- //TODO
+ _list.add(b);
}
public void writeShort(final short i) throws JMSException
{
- //TODO
+ _list.add(i);
}
public void writeChar(final char c) throws JMSException
{
- //TODO
+ _list.add(c);
}
public void writeInt(final int i) throws JMSException
{
- //TODO
+ _list.add(i);
}
public void writeLong(final long l) throws JMSException
{
- //TODO
+ _list.add(l);
}
public void writeFloat(final float v) throws JMSException
{
- //TODO
+ _list.add(v);
}
public void writeDouble(final double v) throws JMSException
{
- //TODO
+ _list.add(v);
}
public void writeString(final String s) throws JMSException
{
- //TODO
+ _list.add(s);
}
public void writeBytes(final byte[] bytes) throws JMSException
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java
index be82c72b70..3e0164705f 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java
@@ -107,6 +107,7 @@ public class ListWriter implements ValueWriter<List>
{
_delegate = _nonEmptyListWriter;
}
+ _delegate.setValue(frameBody);
}
public boolean isComplete()