summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-09 14:55:01 +0000
committerRobert Greig <rgreig@apache.org>2006-12-09 14:55:01 +0000
commit78c062d60e6e5bb193e86d79c592091cfd52688e (patch)
tree6204699091908ece26855907fc1f0515d2eb0d7d
parent367e5eca21beb941a283497ec1d0bd021a0d0d22 (diff)
downloadqpid-python-78c062d60e6e5bb193e86d79c592091cfd52688e.tar.gz
Merge of revision 484987 from trunk - StreamMessage and related changes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@484990 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java396
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java1
5 files changed, 203 insertions, 302 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 39c8298add..77b3bd7566 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -37,8 +37,7 @@ import java.nio.charset.CharacterCodingException;
* @author Apache Software Foundation
*/
public abstract class AbstractBytesMessage extends AbstractJMSMessage
-{
- private boolean _readable = false;
+{
/**
* The default initial size of the buffer. The buffer expands automatically.
@@ -66,7 +65,6 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
_data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
_data.setAutoExpand(true);
}
- _readable = (data != null);
}
AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
@@ -75,15 +73,13 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
getJmsContentHeaderProperties().setContentType(getMimeType());
- _readable = true;
}
- public void clearBody() throws JMSException
+ public void clearBodyImpl() throws JMSException
{
_data.clear();
- _readable = false;
}
-
+
public String toBodyString() throws JMSException
{
checkReadable();
@@ -128,15 +124,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
return data;
}
}
-
- protected void checkReadable() throws MessageNotReadableException
- {
- if (!_readable)
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
- }
-
+
/**
* Check that there is at least a certain number of bytes available to read
*
@@ -151,23 +139,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
}
- protected void checkWritable() throws MessageNotWriteableException
- {
- if (_readable)
- {
- throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
- }
- }
-
public void reset() throws JMSException
{
- //checkWritable();
+ super.reset();
_data.flip();
- _readable = true;
- }
-
- public boolean isReadable()
- {
- return _readable;
- }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index a07a87b850..329153534b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -28,12 +28,13 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.JmsNotImplementedException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTableKeyEnumeration;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.PropertyFieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
@@ -43,23 +44,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
-
- //todo Remove these and Change _headers to use a subclass of PropertyFieldTable that limits
- // the properties that can be added... or suitably handles the values that cannot be added to the
- // AMQP header field table.
- public static final char BOOLEAN_PROPERTY_PREFIX = PropertyFieldTable.BOOLEAN_PROPERTY_PREFIX;
- public static final char BYTE_PROPERTY_PREFIX = PropertyFieldTable.BYTE_PROPERTY_PREFIX;
- public static final char SHORT_PROPERTY_PREFIX = PropertyFieldTable.SHORT_PROPERTY_PREFIX;
- public static final char INT_PROPERTY_PREFIX = PropertyFieldTable.INT_PROPERTY_PREFIX;
- public static final char LONG_PROPERTY_PREFIX = PropertyFieldTable.LONG_PROPERTY_PREFIX;
- public static final char FLOAT_PROPERTY_PREFIX = PropertyFieldTable.FLOAT_PROPERTY_PREFIX;
- public static final char DOUBLE_PROPERTY_PREFIX = PropertyFieldTable.DOUBLE_PROPERTY_PREFIX;
- public static final char STRING_PROPERTY_PREFIX = PropertyFieldTable.STRING_PROPERTY_PREFIX ;
-
-
protected boolean _redelivered;
protected ByteBuffer _data;
+ private boolean _readableProperties = false;
+ private boolean _readableMessage = false;
protected AbstractJMSMessage(ByteBuffer data)
{
@@ -69,6 +58,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
{
_data.acquire();
}
+ _readableProperties = false;
+ _readableMessage = (data != null);
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -79,11 +70,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
{
_data.acquire();
}
+
+ _readableMessage = data != null;
}
protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
{
super(contentHeader, deliveryTag);
+ _readableProperties = (_contentHeaderProperties != null);
}
public String getJMSMessageID() throws JMSException
@@ -170,7 +164,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " +
- destination.getClass());
+ destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
@@ -212,12 +206,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public String getJMSType() throws JMSException
{
- return getMimeType();
+ return getJmsContentHeaderProperties().getType();
}
public void setJMSType(String string) throws JMSException
{
- throw new JMSException("Cannot set JMS Type - it is implicitly defined based on message type");
+ getJmsContentHeaderProperties().setType(string);
}
public long getJMSExpiration() throws JMSException
@@ -242,282 +236,151 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public void clearProperties() throws JMSException
{
- if (getJmsContentHeaderProperties().getHeaders() != null)
- {
- getJmsContentHeaderProperties().getHeaders().clear();
- }
+ getJmsContentHeaderProperties().getHeaders().clear();
+
+ _readableProperties = false;
}
- public boolean propertyExists(String propertyName) throws JMSException
+ public void clearBody() throws JMSException
{
- checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return false;
- }
- else
- {
- Iterator keys = getJmsContentHeaderProperties().getHeaders().keySet().iterator();
+ clearBodyImpl();
+ _readableMessage = false;
+ }
- while (keys.hasNext())
- {
- String key = (String) keys.next();
- if (key.endsWith(propertyName))
- {
- return true;
- }
- }
- return false;
- }
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName);
}
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
+
+ if (getJmsContentHeaderProperties() == null)
{
- return Boolean.valueOf(null).booleanValue();
+ System.out.println("HEADERS ARE NULL");
}
- else
- {
- // store as integer as temporary workaround
- //Boolean b = (Boolean) getJmsContentHeaderProperties().headers.get(BOOLEAN_PROPERTY_PREFIX + propertyName);
- Long b = (Long) getJmsContentHeaderProperties().getHeaders().get(BOOLEAN_PROPERTY_PREFIX + propertyName);
- if (b == null)
- {
- return Boolean.valueOf(null).booleanValue();
- }
- else
- {
- return b.longValue() != 0;
- }
- }
+
+ return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
}
public byte getByteProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return Byte.valueOf(null).byteValue();
- }
- else
- {
- Byte b = (Byte) getJmsContentHeaderProperties().getHeaders().get(BYTE_PROPERTY_PREFIX + propertyName);
- if (b == null)
- {
- return Byte.valueOf(null).byteValue();
- }
- else
- {
- return b.byteValue();
- }
- }
+ return getJmsContentHeaderProperties().getHeaders().getByte(propertyName);
}
public short getShortProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return Short.valueOf(null).shortValue();
- }
- else
- {
- Short s = (Short) getJmsContentHeaderProperties().getHeaders().get(SHORT_PROPERTY_PREFIX + propertyName);
- if (s == null)
- {
- return Short.valueOf(null).shortValue();
- }
- else
- {
- return s.shortValue();
- }
- }
+ return getJmsContentHeaderProperties().getHeaders().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return Integer.valueOf(null).intValue();
- }
- else
- {
- Integer i = (Integer) getJmsContentHeaderProperties().getHeaders().get(INT_PROPERTY_PREFIX + propertyName);
- if (i == null)
- {
- return Integer.valueOf(null).intValue();
- }
- else
- {
- return i.intValue();
- }
- }
+ return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return Long.valueOf(null).longValue();
- }
- else
- {
- Long l = (Long) getJmsContentHeaderProperties().getHeaders().get(LONG_PROPERTY_PREFIX + propertyName);
- if (l == null)
- {
- // temp - the spec says do this but this throws a NumberFormatException
- //return Long.valueOf(null).longValue();
- return 0;
- }
- else
- {
- return l.longValue();
- }
- }
+ return getJmsContentHeaderProperties().getHeaders().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return Float.valueOf(null).floatValue();
- }
- else
- {
- final Float f = (Float) getJmsContentHeaderProperties().getHeaders().get(FLOAT_PROPERTY_PREFIX + propertyName);
- if (f == null)
- {
- return Float.valueOf(null).floatValue();
- }
- else
- {
- return f.floatValue();
- }
- }
+ return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName);
}
public double getDoubleProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return Double.valueOf(null).doubleValue();
- }
- else
- {
- final Double d = (Double) getJmsContentHeaderProperties().getHeaders().get(DOUBLE_PROPERTY_PREFIX + propertyName);
- if (d == null)
- {
- return Double.valueOf(null).doubleValue();
- }
- else
- {
- return d.shortValue();
- }
- }
+ return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName);
}
public String getStringProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return null;
- }
- else
- {
- return (String) getJmsContentHeaderProperties().getHeaders().get(STRING_PROPERTY_PREFIX + propertyName);
- }
+ return getJmsContentHeaderProperties().getHeaders().getString(propertyName);
}
public Object getObjectProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- throw new JmsNotImplementedException();
+ return getJmsContentHeaderProperties().getHeaders().getObject(propertyName);
}
public Enumeration getPropertyNames() throws JMSException
{
- return new FieldTableKeyEnumeration(getJmsContentHeaderProperties().getHeaders())
- {
- public Object nextElement()
- {
- String propName = (String) _iterator.next();
-
- //The propertyName has a single Char prefix. Skip this.
- return propName.substring(1);
- }
- };
+ return getJmsContentHeaderProperties().getHeaders().getPropertyNames();
}
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- //getJmsContentHeaderProperties().headers.put(BOOLEAN_PROPERTY_PREFIX + propertyName, Boolean.valueOf(b));
- getJmsContentHeaderProperties().getHeaders().put(BOOLEAN_PROPERTY_PREFIX + propertyName, b ? new Long(1) : new Long(0));
+ getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b);
}
public void setByteProperty(String propertyName, byte b) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(BYTE_PROPERTY_PREFIX + propertyName, new Byte(b));
+ getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b));
}
public void setShortProperty(String propertyName, short i) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(SHORT_PROPERTY_PREFIX + propertyName, new Short(i));
+ getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i));
}
public void setIntProperty(String propertyName, int i) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(INT_PROPERTY_PREFIX + propertyName, new Integer(i));
+ getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i));
}
public void setLongProperty(String propertyName, long l) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(LONG_PROPERTY_PREFIX + propertyName, new Long(l));
+ getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l));
}
public void setFloatProperty(String propertyName, float f) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(FLOAT_PROPERTY_PREFIX + propertyName, new Float(f));
+ getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f));
}
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(DOUBLE_PROPERTY_PREFIX + propertyName, new Double(v));
+ getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v));
}
public void setStringProperty(String propertyName, String value) throws JMSException
{
+ checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().put(STRING_PROPERTY_PREFIX + propertyName, value);
+ getJmsContentHeaderProperties().getHeaders().setString(propertyName, value);
}
- private void createPropertyMapIfRequired()
+ public void setObjectProperty(String propertyName, Object object) throws JMSException
{
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- getJmsContentHeaderProperties().setHeaders(new FieldTable());
- }
- }
-
- public void setObjectProperty(String string, Object object) throws JMSException
- {
- //todo this should be changed to something else.. the Header doesn't support objects.
- throw new RuntimeException("Not Implemented");
+ checkWritableProperties();
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
}
public void acknowledge() throws JMSException
@@ -532,7 +395,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
}
- public abstract void clearBody() throws JMSException;
+
+ /**
+ * This forces concrete classes to implement clearBody()
+ *
+ * @throws JMSException
+ */
+ public abstract void clearBodyImpl() throws JMSException;
/**
* Get a String representation of the body of the message. Used in the
@@ -555,59 +424,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
buf.append("\nAMQ message number: ").append(_deliveryTag);
buf.append("\nProperties:");
- if (getJmsContentHeaderProperties().getHeaders() == null)
+ if (getJmsContentHeaderProperties().getHeaders().isEmpty())
{
buf.append("<NONE>");
}
else
{
- final Iterator it = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
- while (it.hasNext())
- {
- final Map.Entry entry = (Map.Entry) it.next();
- final String propertyName = (String) entry.getKey();
- if (propertyName == null)
- {
- buf.append("\nInternal error: Property with NULL key defined");
- }
- else
- {
- buf.append('\n').append(propertyName.substring(1));
-
- char typeIdentifier = propertyName.charAt(0);
- switch (typeIdentifier)
- {
- case org.apache.qpid.client.message.AbstractJMSMessage.BOOLEAN_PROPERTY_PREFIX:
- buf.append("<boolean> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.BYTE_PROPERTY_PREFIX:
- buf.append("<byte> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.SHORT_PROPERTY_PREFIX:
- buf.append("<short> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.INT_PROPERTY_PREFIX:
- buf.append("<int> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.LONG_PROPERTY_PREFIX:
- buf.append("<long> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.FLOAT_PROPERTY_PREFIX:
- buf.append("<float> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.DOUBLE_PROPERTY_PREFIX:
- buf.append("<double> ");
- break;
- case org.apache.qpid.client.message.AbstractJMSMessage.STRING_PROPERTY_PREFIX:
- buf.append("<string> ");
- break;
- default:
- buf.append("<unknown type (identifier " +
- typeIdentifier + ") ");
- }
- buf.append(String.valueOf(entry.getValue()));
- }
- }
+ buf.append('\n').append(getJmsContentHeaderProperties().getHeaders());
}
return buf.toString();
}
@@ -638,38 +461,33 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
throw new IllegalArgumentException("Property name must not be the empty string");
}
- createPropertyMapIfRequired();
+ // Call to ensure that the it has been set.
+ getJmsContentHeaderProperties().getHeaders();
}
public FieldTable populateHeadersFromMessageProperties()
{
- if (getJmsContentHeaderProperties().getHeaders() == null)
- {
- return null;
- }
- else
- {
- //
- // We need to convert every property into a String representation
- // Note that type information is preserved in the property name
- //
- final FieldTable table = new FieldTable();
- final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
- while (entries.hasNext())
+ //
+ // We need to convert every property into a String representation
+ // Note that type information is preserved in the property name
+ //
+ final FieldTable table = FieldTableFactory.newFieldTable();
+ final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
+ while (entries.hasNext())
+ {
+ final Map.Entry entry = (Map.Entry) entries.next();
+ final String propertyName = (String) entry.getKey();
+ if (propertyName == null)
{
- final Map.Entry entry = (Map.Entry) entries.next();
- final String propertyName = (String) entry.getKey();
- if (propertyName == null)
- {
- continue;
- }
- else
- {
- table.put(propertyName, entry.getValue().toString());
- }
+ continue;
+ }
+ else
+ {
+ table.put(propertyName, entry.getValue().toString());
}
- return table;
}
+ return table;
+
}
public BasicContentHeaderProperties getJmsContentHeaderProperties()
@@ -687,4 +505,44 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
return _data;
}
+
+ protected void checkReadable() throws MessageNotReadableException
+ {
+ if (!_readableMessage)
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ protected void checkWritable() throws MessageNotWriteableException
+ {
+ if (_readableMessage)
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+
+ protected void checkWritableProperties() throws MessageNotWriteableException
+ {
+ if (_readableProperties)
+ {
+ throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
+ }
+ }
+
+ public boolean isReadable()
+ {
+ return _readableMessage;
+ }
+
+ public boolean isWritable()
+ {
+ return !_readableMessage;
+ }
+
+ public void reset() throws JMSException
+ {
+ _readableMessage = true;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index 8efe1e17f4..cc820a5623 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.message;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
@@ -32,7 +36,7 @@ import java.nio.charset.Charset;
*/
public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
{
- private static final String MIME_TYPE="jms/stream-message";
+ public static final String MIME_TYPE="jms/stream-message";
private static final String[] _typeNames = { "boolean",
"byte",
@@ -71,6 +75,29 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
*/
private int _byteArrayRemaining = -1;
+ JMSStreamMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a stream message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ JMSStreamMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ }
+
+
+ JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
+ throws AMQException
+ {
+ super(messageNbr, contentHeader, data);
+ }
+
public String getMimeType()
{
return MIME_TYPE;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
new file mode 100644
index 0000000000..aae9f0cdb2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws
+ AMQException
+ {
+ return new JMSStreamMessage(deliveryTag, contentHeader, data);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSStreamMessage();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 31c9c2ed91..348988f06d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -103,6 +103,7 @@ public class MessageFactoryRegistry
mf.registerFactory("text/xml", new JMSTextMessageFactory());
mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+ mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
mf.registerFactory(null, new JMSBytesMessageFactory());
return mf;
}