diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-09 14:55:01 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-09 14:55:01 +0000 |
commit | 78c062d60e6e5bb193e86d79c592091cfd52688e (patch) | |
tree | 6204699091908ece26855907fc1f0515d2eb0d7d | |
parent | 367e5eca21beb941a283497ec1d0bd021a0d0d22 (diff) | |
download | qpid-python-78c062d60e6e5bb193e86d79c592091cfd52688e.tar.gz |
Merge of revision 484987 from trunk - StreamMessage and related changes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@484990 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 203 insertions, 302 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 39c8298add..77b3bd7566 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -37,8 +37,7 @@ import java.nio.charset.CharacterCodingException; * @author Apache Software Foundation */ public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ - private boolean _readable = false; +{ /** * The default initial size of the buffer. The buffer expands automatically. @@ -66,7 +65,6 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); _data.setAutoExpand(true); } - _readable = (data != null); } AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) @@ -75,15 +73,13 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); getJmsContentHeaderProperties().setContentType(getMimeType()); - _readable = true; } - public void clearBody() throws JMSException + public void clearBodyImpl() throws JMSException { _data.clear(); - _readable = false; } - + public String toBodyString() throws JMSException { checkReadable(); @@ -128,15 +124,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage return data; } } - - protected void checkReadable() throws MessageNotReadableException - { - if (!_readable) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } - } - + /** * Check that there is at least a certain number of bytes available to read * @@ -151,23 +139,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } } - protected void checkWritable() throws MessageNotWriteableException - { - if (_readable) - { - throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); - } - } - public void reset() throws JMSException { - //checkWritable(); + super.reset(); _data.flip(); - _readable = true; - } - - public boolean isReadable() - { - return _readable; - } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index a07a87b850..329153534b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -28,12 +28,13 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.JmsNotImplementedException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTableKeyEnumeration; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.PropertyFieldTable; +import org.apache.qpid.framing.FieldTableFactory; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; import java.util.Collections; import java.util.Enumeration; import java.util.Iterator; @@ -43,23 +44,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); - - //todo Remove these and Change _headers to use a subclass of PropertyFieldTable that limits - // the properties that can be added... or suitably handles the values that cannot be added to the - // AMQP header field table. - public static final char BOOLEAN_PROPERTY_PREFIX = PropertyFieldTable.BOOLEAN_PROPERTY_PREFIX; - public static final char BYTE_PROPERTY_PREFIX = PropertyFieldTable.BYTE_PROPERTY_PREFIX; - public static final char SHORT_PROPERTY_PREFIX = PropertyFieldTable.SHORT_PROPERTY_PREFIX; - public static final char INT_PROPERTY_PREFIX = PropertyFieldTable.INT_PROPERTY_PREFIX; - public static final char LONG_PROPERTY_PREFIX = PropertyFieldTable.LONG_PROPERTY_PREFIX; - public static final char FLOAT_PROPERTY_PREFIX = PropertyFieldTable.FLOAT_PROPERTY_PREFIX; - public static final char DOUBLE_PROPERTY_PREFIX = PropertyFieldTable.DOUBLE_PROPERTY_PREFIX; - public static final char STRING_PROPERTY_PREFIX = PropertyFieldTable.STRING_PROPERTY_PREFIX ; - - protected boolean _redelivered; protected ByteBuffer _data; + private boolean _readableProperties = false; + private boolean _readableMessage = false; protected AbstractJMSMessage(ByteBuffer data) { @@ -69,6 +58,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { _data.acquire(); } + _readableProperties = false; + _readableMessage = (data != null); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException @@ -79,11 +70,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { _data.acquire(); } + + _readableMessage = data != null; } protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) { super(contentHeader, deliveryTag); + _readableProperties = (_contentHeaderProperties != null); } public String getJMSMessageID() throws JMSException @@ -170,7 +164,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " + - destination.getClass()); + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -212,12 +206,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public String getJMSType() throws JMSException { - return getMimeType(); + return getJmsContentHeaderProperties().getType(); } public void setJMSType(String string) throws JMSException { - throw new JMSException("Cannot set JMS Type - it is implicitly defined based on message type"); + getJmsContentHeaderProperties().setType(string); } public long getJMSExpiration() throws JMSException @@ -242,282 +236,151 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public void clearProperties() throws JMSException { - if (getJmsContentHeaderProperties().getHeaders() != null) - { - getJmsContentHeaderProperties().getHeaders().clear(); - } + getJmsContentHeaderProperties().getHeaders().clear(); + + _readableProperties = false; } - public boolean propertyExists(String propertyName) throws JMSException + public void clearBody() throws JMSException { - checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return false; - } - else - { - Iterator keys = getJmsContentHeaderProperties().getHeaders().keySet().iterator(); + clearBodyImpl(); + _readableMessage = false; + } - while (keys.hasNext()) - { - String key = (String) keys.next(); - if (key.endsWith(propertyName)) - { - return true; - } - } - return false; - } + public boolean propertyExists(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName); } public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) + + if (getJmsContentHeaderProperties() == null) { - return Boolean.valueOf(null).booleanValue(); + System.out.println("HEADERS ARE NULL"); } - else - { - // store as integer as temporary workaround - //Boolean b = (Boolean) getJmsContentHeaderProperties().headers.get(BOOLEAN_PROPERTY_PREFIX + propertyName); - Long b = (Long) getJmsContentHeaderProperties().getHeaders().get(BOOLEAN_PROPERTY_PREFIX + propertyName); - if (b == null) - { - return Boolean.valueOf(null).booleanValue(); - } - else - { - return b.longValue() != 0; - } - } + + return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName); } public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Byte.valueOf(null).byteValue(); - } - else - { - Byte b = (Byte) getJmsContentHeaderProperties().getHeaders().get(BYTE_PROPERTY_PREFIX + propertyName); - if (b == null) - { - return Byte.valueOf(null).byteValue(); - } - else - { - return b.byteValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Short.valueOf(null).shortValue(); - } - else - { - Short s = (Short) getJmsContentHeaderProperties().getHeaders().get(SHORT_PROPERTY_PREFIX + propertyName); - if (s == null) - { - return Short.valueOf(null).shortValue(); - } - else - { - return s.shortValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Integer.valueOf(null).intValue(); - } - else - { - Integer i = (Integer) getJmsContentHeaderProperties().getHeaders().get(INT_PROPERTY_PREFIX + propertyName); - if (i == null) - { - return Integer.valueOf(null).intValue(); - } - else - { - return i.intValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Long.valueOf(null).longValue(); - } - else - { - Long l = (Long) getJmsContentHeaderProperties().getHeaders().get(LONG_PROPERTY_PREFIX + propertyName); - if (l == null) - { - // temp - the spec says do this but this throws a NumberFormatException - //return Long.valueOf(null).longValue(); - return 0; - } - else - { - return l.longValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Float.valueOf(null).floatValue(); - } - else - { - final Float f = (Float) getJmsContentHeaderProperties().getHeaders().get(FLOAT_PROPERTY_PREFIX + propertyName); - if (f == null) - { - return Float.valueOf(null).floatValue(); - } - else - { - return f.floatValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Double.valueOf(null).doubleValue(); - } - else - { - final Double d = (Double) getJmsContentHeaderProperties().getHeaders().get(DOUBLE_PROPERTY_PREFIX + propertyName); - if (d == null) - { - return Double.valueOf(null).doubleValue(); - } - else - { - return d.shortValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return null; - } - else - { - return (String) getJmsContentHeaderProperties().getHeaders().get(STRING_PROPERTY_PREFIX + propertyName); - } + return getJmsContentHeaderProperties().getHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - throw new JmsNotImplementedException(); + return getJmsContentHeaderProperties().getHeaders().getObject(propertyName); } public Enumeration getPropertyNames() throws JMSException { - return new FieldTableKeyEnumeration(getJmsContentHeaderProperties().getHeaders()) - { - public Object nextElement() - { - String propName = (String) _iterator.next(); - - //The propertyName has a single Char prefix. Skip this. - return propName.substring(1); - } - }; + return getJmsContentHeaderProperties().getHeaders().getPropertyNames(); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - //getJmsContentHeaderProperties().headers.put(BOOLEAN_PROPERTY_PREFIX + propertyName, Boolean.valueOf(b)); - getJmsContentHeaderProperties().getHeaders().put(BOOLEAN_PROPERTY_PREFIX + propertyName, b ? new Long(1) : new Long(0)); + getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b); } public void setByteProperty(String propertyName, byte b) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(BYTE_PROPERTY_PREFIX + propertyName, new Byte(b)); + getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b)); } public void setShortProperty(String propertyName, short i) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(SHORT_PROPERTY_PREFIX + propertyName, new Short(i)); + getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i)); } public void setIntProperty(String propertyName, int i) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(INT_PROPERTY_PREFIX + propertyName, new Integer(i)); + getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i)); } public void setLongProperty(String propertyName, long l) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(LONG_PROPERTY_PREFIX + propertyName, new Long(l)); + getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l)); } public void setFloatProperty(String propertyName, float f) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(FLOAT_PROPERTY_PREFIX + propertyName, new Float(f)); + getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f)); } public void setDoubleProperty(String propertyName, double v) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(DOUBLE_PROPERTY_PREFIX + propertyName, new Double(v)); + getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v)); } public void setStringProperty(String propertyName, String value) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(STRING_PROPERTY_PREFIX + propertyName, value); + getJmsContentHeaderProperties().getHeaders().setString(propertyName, value); } - private void createPropertyMapIfRequired() + public void setObjectProperty(String propertyName, Object object) throws JMSException { - if (getJmsContentHeaderProperties().getHeaders() == null) - { - getJmsContentHeaderProperties().setHeaders(new FieldTable()); - } - } - - public void setObjectProperty(String string, Object object) throws JMSException - { - //todo this should be changed to something else.. the Header doesn't support objects. - throw new RuntimeException("Not Implemented"); + checkWritableProperties(); + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object); } public void acknowledge() throws JMSException @@ -532,7 +395,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } } - public abstract void clearBody() throws JMSException; + + /** + * This forces concrete classes to implement clearBody() + * + * @throws JMSException + */ + public abstract void clearBodyImpl() throws JMSException; /** * Get a String representation of the body of the message. Used in the @@ -555,59 +424,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); buf.append("\nAMQ message number: ").append(_deliveryTag); buf.append("\nProperties:"); - if (getJmsContentHeaderProperties().getHeaders() == null) + if (getJmsContentHeaderProperties().getHeaders().isEmpty()) { buf.append("<NONE>"); } else { - final Iterator it = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); - while (it.hasNext()) - { - final Map.Entry entry = (Map.Entry) it.next(); - final String propertyName = (String) entry.getKey(); - if (propertyName == null) - { - buf.append("\nInternal error: Property with NULL key defined"); - } - else - { - buf.append('\n').append(propertyName.substring(1)); - - char typeIdentifier = propertyName.charAt(0); - switch (typeIdentifier) - { - case org.apache.qpid.client.message.AbstractJMSMessage.BOOLEAN_PROPERTY_PREFIX: - buf.append("<boolean> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.BYTE_PROPERTY_PREFIX: - buf.append("<byte> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.SHORT_PROPERTY_PREFIX: - buf.append("<short> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.INT_PROPERTY_PREFIX: - buf.append("<int> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.LONG_PROPERTY_PREFIX: - buf.append("<long> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.FLOAT_PROPERTY_PREFIX: - buf.append("<float> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.DOUBLE_PROPERTY_PREFIX: - buf.append("<double> "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.STRING_PROPERTY_PREFIX: - buf.append("<string> "); - break; - default: - buf.append("<unknown type (identifier " + - typeIdentifier + ") "); - } - buf.append(String.valueOf(entry.getValue())); - } - } + buf.append('\n').append(getJmsContentHeaderProperties().getHeaders()); } return buf.toString(); } @@ -638,38 +461,33 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms throw new IllegalArgumentException("Property name must not be the empty string"); } - createPropertyMapIfRequired(); + // Call to ensure that the it has been set. + getJmsContentHeaderProperties().getHeaders(); } public FieldTable populateHeadersFromMessageProperties() { - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return null; - } - else - { - // - // We need to convert every property into a String representation - // Note that type information is preserved in the property name - // - final FieldTable table = new FieldTable(); - final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); - while (entries.hasNext()) + // + // We need to convert every property into a String representation + // Note that type information is preserved in the property name + // + final FieldTable table = FieldTableFactory.newFieldTable(); + final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); + while (entries.hasNext()) + { + final Map.Entry entry = (Map.Entry) entries.next(); + final String propertyName = (String) entry.getKey(); + if (propertyName == null) { - final Map.Entry entry = (Map.Entry) entries.next(); - final String propertyName = (String) entry.getKey(); - if (propertyName == null) - { - continue; - } - else - { - table.put(propertyName, entry.getValue().toString()); - } + continue; + } + else + { + table.put(propertyName, entry.getValue().toString()); } - return table; } + return table; + } public BasicContentHeaderProperties getJmsContentHeaderProperties() @@ -687,4 +505,44 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } return _data; } + + protected void checkReadable() throws MessageNotReadableException + { + if (!_readableMessage) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + protected void checkWritable() throws MessageNotWriteableException + { + if (_readableMessage) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + protected void checkWritableProperties() throws MessageNotWriteableException + { + if (_readableProperties) + { + throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); + } + } + + public boolean isReadable() + { + return _readableMessage; + } + + public boolean isWritable() + { + return !_readableMessage; + } + + public void reset() throws JMSException + { + _readableMessage = true; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 8efe1e17f4..cc820a5623 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.message; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; +import org.apache.mina.common.ByteBuffer; + import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; @@ -32,7 +36,7 @@ import java.nio.charset.Charset; */ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage { - private static final String MIME_TYPE="jms/stream-message"; + public static final String MIME_TYPE="jms/stream-message"; private static final String[] _typeNames = { "boolean", "byte", @@ -71,6 +75,29 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ private int _byteArrayRemaining = -1; + JMSStreamMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSStreamMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + super(messageNbr, contentHeader, data); + } + public String getMimeType() { return MIME_TYPE; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java new file mode 100644 index 0000000000..aae9f0cdb2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +public class JMSStreamMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws + AMQException + { + return new JMSStreamMessage(deliveryTag, contentHeader, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSStreamMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 31c9c2ed91..348988f06d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -103,6 +103,7 @@ public class MessageFactoryRegistry mf.registerFactory("text/xml", new JMSTextMessageFactory()); mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory()); mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); mf.registerFactory(null, new JMSBytesMessageFactory()); return mf; } |