From 78c062d60e6e5bb193e86d79c592091cfd52688e Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Sat, 9 Dec 2006 14:55:01 +0000 Subject: Merge of revision 484987 from trunk - StreamMessage and related changes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@484990 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/message/AbstractBytesMessage.java | 38 +- .../qpid/client/message/AbstractJMSMessage.java | 396 +++++++-------------- .../qpid/client/message/JMSStreamMessage.java | 29 +- .../client/message/JMSStreamMessageFactory.java | 41 +++ .../client/message/MessageFactoryRegistry.java | 1 + 5 files changed, 203 insertions(+), 302 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 39c8298add..77b3bd7566 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -37,8 +37,7 @@ import java.nio.charset.CharacterCodingException; * @author Apache Software Foundation */ public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ - private boolean _readable = false; +{ /** * The default initial size of the buffer. The buffer expands automatically. @@ -66,7 +65,6 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); _data.setAutoExpand(true); } - _readable = (data != null); } AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) @@ -75,15 +73,13 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); getJmsContentHeaderProperties().setContentType(getMimeType()); - _readable = true; } - public void clearBody() throws JMSException + public void clearBodyImpl() throws JMSException { _data.clear(); - _readable = false; } - + public String toBodyString() throws JMSException { checkReadable(); @@ -128,15 +124,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage return data; } } - - protected void checkReadable() throws MessageNotReadableException - { - if (!_readable) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } - } - + /** * Check that there is at least a certain number of bytes available to read * @@ -151,23 +139,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } } - protected void checkWritable() throws MessageNotWriteableException - { - if (_readable) - { - throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); - } - } - public void reset() throws JMSException { - //checkWritable(); + super.reset(); _data.flip(); - _readable = true; - } - - public boolean isReadable() - { - return _readable; - } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index a07a87b850..329153534b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -28,12 +28,13 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.JmsNotImplementedException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTableKeyEnumeration; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.PropertyFieldTable; +import org.apache.qpid.framing.FieldTableFactory; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; import java.util.Collections; import java.util.Enumeration; import java.util.Iterator; @@ -43,23 +44,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); - - //todo Remove these and Change _headers to use a subclass of PropertyFieldTable that limits - // the properties that can be added... or suitably handles the values that cannot be added to the - // AMQP header field table. - public static final char BOOLEAN_PROPERTY_PREFIX = PropertyFieldTable.BOOLEAN_PROPERTY_PREFIX; - public static final char BYTE_PROPERTY_PREFIX = PropertyFieldTable.BYTE_PROPERTY_PREFIX; - public static final char SHORT_PROPERTY_PREFIX = PropertyFieldTable.SHORT_PROPERTY_PREFIX; - public static final char INT_PROPERTY_PREFIX = PropertyFieldTable.INT_PROPERTY_PREFIX; - public static final char LONG_PROPERTY_PREFIX = PropertyFieldTable.LONG_PROPERTY_PREFIX; - public static final char FLOAT_PROPERTY_PREFIX = PropertyFieldTable.FLOAT_PROPERTY_PREFIX; - public static final char DOUBLE_PROPERTY_PREFIX = PropertyFieldTable.DOUBLE_PROPERTY_PREFIX; - public static final char STRING_PROPERTY_PREFIX = PropertyFieldTable.STRING_PROPERTY_PREFIX ; - - protected boolean _redelivered; protected ByteBuffer _data; + private boolean _readableProperties = false; + private boolean _readableMessage = false; protected AbstractJMSMessage(ByteBuffer data) { @@ -69,6 +58,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { _data.acquire(); } + _readableProperties = false; + _readableMessage = (data != null); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException @@ -79,11 +70,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { _data.acquire(); } + + _readableMessage = data != null; } protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) { super(contentHeader, deliveryTag); + _readableProperties = (_contentHeaderProperties != null); } public String getJMSMessageID() throws JMSException @@ -170,7 +164,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " + - destination.getClass()); + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -212,12 +206,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public String getJMSType() throws JMSException { - return getMimeType(); + return getJmsContentHeaderProperties().getType(); } public void setJMSType(String string) throws JMSException { - throw new JMSException("Cannot set JMS Type - it is implicitly defined based on message type"); + getJmsContentHeaderProperties().setType(string); } public long getJMSExpiration() throws JMSException @@ -242,282 +236,151 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public void clearProperties() throws JMSException { - if (getJmsContentHeaderProperties().getHeaders() != null) - { - getJmsContentHeaderProperties().getHeaders().clear(); - } + getJmsContentHeaderProperties().getHeaders().clear(); + + _readableProperties = false; } - public boolean propertyExists(String propertyName) throws JMSException + public void clearBody() throws JMSException { - checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return false; - } - else - { - Iterator keys = getJmsContentHeaderProperties().getHeaders().keySet().iterator(); + clearBodyImpl(); + _readableMessage = false; + } - while (keys.hasNext()) - { - String key = (String) keys.next(); - if (key.endsWith(propertyName)) - { - return true; - } - } - return false; - } + public boolean propertyExists(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName); } public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) + + if (getJmsContentHeaderProperties() == null) { - return Boolean.valueOf(null).booleanValue(); + System.out.println("HEADERS ARE NULL"); } - else - { - // store as integer as temporary workaround - //Boolean b = (Boolean) getJmsContentHeaderProperties().headers.get(BOOLEAN_PROPERTY_PREFIX + propertyName); - Long b = (Long) getJmsContentHeaderProperties().getHeaders().get(BOOLEAN_PROPERTY_PREFIX + propertyName); - if (b == null) - { - return Boolean.valueOf(null).booleanValue(); - } - else - { - return b.longValue() != 0; - } - } + + return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName); } public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Byte.valueOf(null).byteValue(); - } - else - { - Byte b = (Byte) getJmsContentHeaderProperties().getHeaders().get(BYTE_PROPERTY_PREFIX + propertyName); - if (b == null) - { - return Byte.valueOf(null).byteValue(); - } - else - { - return b.byteValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Short.valueOf(null).shortValue(); - } - else - { - Short s = (Short) getJmsContentHeaderProperties().getHeaders().get(SHORT_PROPERTY_PREFIX + propertyName); - if (s == null) - { - return Short.valueOf(null).shortValue(); - } - else - { - return s.shortValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Integer.valueOf(null).intValue(); - } - else - { - Integer i = (Integer) getJmsContentHeaderProperties().getHeaders().get(INT_PROPERTY_PREFIX + propertyName); - if (i == null) - { - return Integer.valueOf(null).intValue(); - } - else - { - return i.intValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Long.valueOf(null).longValue(); - } - else - { - Long l = (Long) getJmsContentHeaderProperties().getHeaders().get(LONG_PROPERTY_PREFIX + propertyName); - if (l == null) - { - // temp - the spec says do this but this throws a NumberFormatException - //return Long.valueOf(null).longValue(); - return 0; - } - else - { - return l.longValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Float.valueOf(null).floatValue(); - } - else - { - final Float f = (Float) getJmsContentHeaderProperties().getHeaders().get(FLOAT_PROPERTY_PREFIX + propertyName); - if (f == null) - { - return Float.valueOf(null).floatValue(); - } - else - { - return f.floatValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return Double.valueOf(null).doubleValue(); - } - else - { - final Double d = (Double) getJmsContentHeaderProperties().getHeaders().get(DOUBLE_PROPERTY_PREFIX + propertyName); - if (d == null) - { - return Double.valueOf(null).doubleValue(); - } - else - { - return d.shortValue(); - } - } + return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - if (getJmsContentHeaderProperties().getHeaders() == null) - { - return null; - } - else - { - return (String) getJmsContentHeaderProperties().getHeaders().get(STRING_PROPERTY_PREFIX + propertyName); - } + return getJmsContentHeaderProperties().getHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - throw new JmsNotImplementedException(); + return getJmsContentHeaderProperties().getHeaders().getObject(propertyName); } public Enumeration getPropertyNames() throws JMSException { - return new FieldTableKeyEnumeration(getJmsContentHeaderProperties().getHeaders()) - { - public Object nextElement() - { - String propName = (String) _iterator.next(); - - //The propertyName has a single Char prefix. Skip this. - return propName.substring(1); - } - }; + return getJmsContentHeaderProperties().getHeaders().getPropertyNames(); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - //getJmsContentHeaderProperties().headers.put(BOOLEAN_PROPERTY_PREFIX + propertyName, Boolean.valueOf(b)); - getJmsContentHeaderProperties().getHeaders().put(BOOLEAN_PROPERTY_PREFIX + propertyName, b ? new Long(1) : new Long(0)); + getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b); } public void setByteProperty(String propertyName, byte b) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(BYTE_PROPERTY_PREFIX + propertyName, new Byte(b)); + getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b)); } public void setShortProperty(String propertyName, short i) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(SHORT_PROPERTY_PREFIX + propertyName, new Short(i)); + getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i)); } public void setIntProperty(String propertyName, int i) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(INT_PROPERTY_PREFIX + propertyName, new Integer(i)); + getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i)); } public void setLongProperty(String propertyName, long l) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(LONG_PROPERTY_PREFIX + propertyName, new Long(l)); + getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l)); } public void setFloatProperty(String propertyName, float f) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(FLOAT_PROPERTY_PREFIX + propertyName, new Float(f)); + getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f)); } public void setDoubleProperty(String propertyName, double v) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(DOUBLE_PROPERTY_PREFIX + propertyName, new Double(v)); + getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v)); } public void setStringProperty(String propertyName, String value) throws JMSException { + checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().put(STRING_PROPERTY_PREFIX + propertyName, value); + getJmsContentHeaderProperties().getHeaders().setString(propertyName, value); } - private void createPropertyMapIfRequired() + public void setObjectProperty(String propertyName, Object object) throws JMSException { - if (getJmsContentHeaderProperties().getHeaders() == null) - { - getJmsContentHeaderProperties().setHeaders(new FieldTable()); - } - } - - public void setObjectProperty(String string, Object object) throws JMSException - { - //todo this should be changed to something else.. the Header doesn't support objects. - throw new RuntimeException("Not Implemented"); + checkWritableProperties(); + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object); } public void acknowledge() throws JMSException @@ -532,7 +395,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } } - public abstract void clearBody() throws JMSException; + + /** + * This forces concrete classes to implement clearBody() + * + * @throws JMSException + */ + public abstract void clearBodyImpl() throws JMSException; /** * Get a String representation of the body of the message. Used in the @@ -555,59 +424,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); buf.append("\nAMQ message number: ").append(_deliveryTag); buf.append("\nProperties:"); - if (getJmsContentHeaderProperties().getHeaders() == null) + if (getJmsContentHeaderProperties().getHeaders().isEmpty()) { buf.append(""); } else { - final Iterator it = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); - while (it.hasNext()) - { - final Map.Entry entry = (Map.Entry) it.next(); - final String propertyName = (String) entry.getKey(); - if (propertyName == null) - { - buf.append("\nInternal error: Property with NULL key defined"); - } - else - { - buf.append('\n').append(propertyName.substring(1)); - - char typeIdentifier = propertyName.charAt(0); - switch (typeIdentifier) - { - case org.apache.qpid.client.message.AbstractJMSMessage.BOOLEAN_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.BYTE_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.SHORT_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.INT_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.LONG_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.FLOAT_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.DOUBLE_PROPERTY_PREFIX: - buf.append(" "); - break; - case org.apache.qpid.client.message.AbstractJMSMessage.STRING_PROPERTY_PREFIX: - buf.append(" "); - break; - default: - buf.append("