diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-26 10:20:01 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-26 10:20:01 +0000 |
commit | 0035b023e6240b2c1b2a8f4f6e6b3caa869155ff (patch) | |
tree | d36046365c79bbe76613a68c506730acc1e704c7 | |
parent | 64a5794de6a630a4bdf1b245aadf2548508808ae (diff) | |
download | qpid-python-0035b023e6240b2c1b2a8f4f6e6b3caa869155ff.tar.gz |
Applied Feature QPID-315
Revision: 499979
Author: marnie
Date: 21:08:54, 25 January 2007
Message:
QPID-315
Test classes to reproduce problem with missing correlation id on incoming messages from non-Qpid broker
----
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
Revision: 499975
Author: marnie
Date: 21:07:49, 25 January 2007
Message:
QPID-315
Moved message conversion logic from BasicMessageProducer to MessageConverter
Added correlation id to AbstractJMSMessage.toString()
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
Revision: 499532
Author: marnie
Date: 18:51:22, 24 January 2007
Message:
QPID-315
Updated and tidied class prior to addition of tests
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
Revision: 499087
Author: marnie
Date: 17:28:23, 23 January 2007
Message:
QPID-315
INitial commit - AMQSesssion convertToNativeMessage needs replaced with call to this class
----
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
Partial Revision: 494121
Partial Author: rgreig
Partial Date: 17:02:26, 08 January 2007
Partial Message:
Partial QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String
Partial ----
Partial Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
Partial Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@500207 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 730 insertions, 195 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index d38e461400..74b91c3c30 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; @@ -138,16 +139,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - 0, // ticket - destination.getExchangeClass()); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -183,7 +184,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) { throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + - " is illegal"); + " is illegal"); } _deliveryMode = i; } @@ -368,112 +369,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (message instanceof BytesMessage) { - BytesMessage bytesMessage = (BytesMessage) message; - bytesMessage.reset(); - - JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage(); - - - byte[] buf = new byte[1024]; - - int len; - - while ((len = bytesMessage.readBytes(buf)) != -1) - { - nativeMsg.writeBytes(buf, 0, len); - } - - newMessage = nativeMsg; + newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage(); } else if (message instanceof MapMessage) { - MapMessage origMessage = (MapMessage) message; - MapMessage nativeMessage = _session.createMapMessage(); - - Enumeration mapNames = origMessage.getMapNames(); - while (mapNames.hasMoreElements()) - { - String name = (String) mapNames.nextElement(); - nativeMessage.setObject(name, origMessage.getObject(name)); - } - newMessage = (AbstractJMSMessage) nativeMessage; + newMessage = new MessageConverter((MapMessage) message).getConvertedMessage(); } else if (message instanceof ObjectMessage) { - ObjectMessage origMessage = (ObjectMessage) message; - ObjectMessage nativeMessage = _session.createObjectMessage(); - - nativeMessage.setObject(origMessage.getObject()); - - newMessage = (AbstractJMSMessage) nativeMessage; + newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage(); } else if (message instanceof TextMessage) { - TextMessage origMessage = (TextMessage) message; - TextMessage nativeMessage = _session.createTextMessage(); - - nativeMessage.setText(origMessage.getText()); - - newMessage = (AbstractJMSMessage) nativeMessage; + newMessage = new MessageConverter((TextMessage) message).getConvertedMessage(); } else if (message instanceof StreamMessage) { - StreamMessage origMessage = (StreamMessage) message; - StreamMessage nativeMessage = _session.createStreamMessage(); - - - try - { - origMessage.reset(); - while (true) - { - nativeMessage.writeObject(origMessage.readObject()); - } - } - catch (MessageEOFException e) - { - ;// - } - newMessage = (AbstractJMSMessage) nativeMessage; + newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage(); } else { + //TODO; Do we really want to create an empty message here ? newMessage = (AbstractJMSMessage) _session.createMessage(); - - } - - Enumeration propertyNames = message.getPropertyNames(); - while (propertyNames.hasMoreElements()) - { - String propertyName = String.valueOf(propertyNames.nextElement()); - if (!propertyName.startsWith("JMSX_")) - { - Object value = message.getObjectProperty(propertyName); - newMessage.setObjectProperty(propertyName, value); - } + return new MessageConverter(newMessage).getConvertedMessage(); } - newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); - - - int priority = message.getJMSPriority(); - if (priority < 0) - { - priority = 0; - } - else if (priority > 9) - { - priority = 9; - } - - newMessage.setJMSPriority(priority); - if (message.getJMSReplyTo() != null) - { - newMessage.setJMSReplyTo(message.getJMSReplyTo()); - } - newMessage.setJMSType(message.getJMSType()); - - if (newMessage != null) { return newMessage; @@ -491,7 +411,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null ? destination.getClass() : null)); + (destination != null ? destination.getClass() : null)); } declareDestination((AMQDestination) destination); } @@ -520,19 +440,19 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkTemporaryDestination(destination); origMessage.setJMSDestination(destination); - + AbstractJMSMessage message = convertToNativeMessage(origMessage); message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + 0); // ticket long currentTime = 0; if (!_disableTimestamps) @@ -576,7 +496,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // weight argument of zero indicates no child content headers, just bodies // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0, + AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte) 8, (byte) 0), 0, contentHeaderProperties, size); if (_logger.isDebugEnabled()) @@ -603,16 +523,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void checkTemporaryDestination(AMQDestination destination) throws JMSException { - if(destination instanceof TemporaryDestination) + if (destination instanceof TemporaryDestination) { _logger.debug("destination is temporary destination"); TemporaryDestination tempDest = (TemporaryDestination) destination; - if(tempDest.getSession().isClosed()) + if (tempDest.getSession().isClosed()) { _logger.debug("session is closed"); throw new JMSException("Session for temporary destination has been closed"); } - if(tempDest.isDeleted()) + if (tempDest.isDeleted()) { _logger.debug("destination is deleted"); throw new JMSException("Cannot send to a deleted temporary destination"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 0c29344c37..3b0a84d5bb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -401,7 +401,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void acknowledge() throws JMSException { - if(_session != null) + if (_session != null) { _session.acknowledge(); } @@ -429,6 +429,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { StringBuffer buf = new StringBuffer("Body:\n"); buf.append(toBodyString()); + buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID()); buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); buf.append("\nJMS expiration: ").append(getJMSExpiration()); buf.append("\nJMS priority: ").append(getJMSPriority()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 88e78a1dad..25c56411b0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -38,9 +38,9 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public static final String MIME_TYPE = "jms/map-message"; - private Map<String,Object> _map = new HashMap<String, Object>(); + private Map<String, Object> _map = new HashMap<String, Object>(); - JMSMapMessage() throws JMSException + public JMSMapMessage() throws JMSException { this(null); } @@ -59,11 +59,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm try { populateMapFromData(); - } + } catch (JMSException je) { throw new AMQException("Error populating MapMessage from ByteBuffer", je); - + } } @@ -88,7 +88,6 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } - @Override public void clearBodyImpl() throws JMSException { @@ -100,13 +99,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(value instanceof Boolean) + if (value instanceof Boolean) { - return ((Boolean)value).booleanValue(); + return ((Boolean) value).booleanValue(); } - else if((value instanceof String) || (value == null)) + else if ((value instanceof String) || (value == null)) { - return Boolean.valueOf((String)value); + return Boolean.valueOf((String) value); } else { @@ -120,13 +119,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(value instanceof Byte) + if (value instanceof Byte) { - return ((Byte)value).byteValue(); + return ((Byte) value).byteValue(); } - else if((value instanceof String) || (value==null)) + else if ((value instanceof String) || (value == null)) { - return Byte.valueOf((String)value).byteValue(); + return Byte.valueOf((String) value).byteValue(); } else { @@ -139,17 +138,17 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(value instanceof Short) + if (value instanceof Short) { - return ((Short)value).shortValue(); + return ((Short) value).shortValue(); } - else if(value instanceof Byte) + else if (value instanceof Byte) { - return ((Byte)value).shortValue(); + return ((Byte) value).shortValue(); } - else if((value instanceof String) || (value==null)) + else if ((value instanceof String) || (value == null)) { - return Short.valueOf((String)value).shortValue(); + return Short.valueOf((String) value).shortValue(); } else { @@ -164,21 +163,21 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(value instanceof Integer) + if (value instanceof Integer) { - return ((Integer)value).intValue(); + return ((Integer) value).intValue(); } - else if(value instanceof Short) + else if (value instanceof Short) { - return ((Short)value).intValue(); + return ((Short) value).intValue(); } - else if(value instanceof Byte) + else if (value instanceof Byte) { - return ((Byte)value).intValue(); + return ((Byte) value).intValue(); } - else if((value instanceof String) || (value==null)) + else if ((value instanceof String) || (value == null)) { - return Integer.valueOf((String)value).intValue(); + return Integer.valueOf((String) value).intValue(); } else { @@ -192,25 +191,25 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(value instanceof Long) + if (value instanceof Long) { - return ((Long)value).longValue(); + return ((Long) value).longValue(); } - else if(value instanceof Integer) + else if (value instanceof Integer) { - return ((Integer)value).longValue(); + return ((Integer) value).longValue(); } - if(value instanceof Short) + if (value instanceof Short) { - return ((Short)value).longValue(); + return ((Short) value).longValue(); } - if(value instanceof Byte) + if (value instanceof Byte) { - return ((Byte)value).longValue(); + return ((Byte) value).longValue(); } - else if((value instanceof String) || (value==null)) + else if ((value instanceof String) || (value == null)) { - return Long.valueOf((String)value).longValue(); + return Long.valueOf((String) value).longValue(); } else { @@ -224,13 +223,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(!_map.containsKey(propName)) + if (!_map.containsKey(propName)) { throw new MessageFormatException("Property " + propName + " not present"); } - else if(value instanceof Character) + else if (value instanceof Character) { - return ((Character)value).charValue(); + return ((Character) value).charValue(); } else if (value == null) { @@ -246,18 +245,17 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } - public float getFloat(String propName) throws JMSException { Object value = _map.get(propName); - if(value instanceof Float) + if (value instanceof Float) { - return ((Float)value).floatValue(); + return ((Float) value).floatValue(); } - else if((value instanceof String) || (value==null)) + else if ((value instanceof String) || (value == null)) { - return Float.valueOf((String)value).floatValue(); + return Float.valueOf((String) value).floatValue(); } else { @@ -270,17 +268,17 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(value instanceof Double) + if (value instanceof Double) { - return ((Double)value).doubleValue(); + return ((Double) value).doubleValue(); } - else if(value instanceof Float) + else if (value instanceof Float) { - return ((Float)value).doubleValue(); + return ((Float) value).doubleValue(); } - else if((value instanceof String) || (value==null)) + else if ((value instanceof String) || (value == null)) { - return Double.valueOf((String)value).doubleValue(); + return Double.valueOf((String) value).doubleValue(); } else { @@ -293,11 +291,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if((value instanceof String) || (value == null)) + if ((value instanceof String) || (value == null)) { return (String) value; } - else if(value instanceof byte[]) + else if (value instanceof byte[]) { throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String."); @@ -313,13 +311,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = _map.get(propName); - if(!_map.containsKey(propName)) + if (!_map.containsKey(propName)) { - throw new MessageFormatException("Property " + propName + " not present"); + throw new MessageFormatException("Property " + propName + " not present"); } - else if((value instanceof byte[]) || (value == null)) + else if ((value instanceof byte[]) || (value == null)) { - return (byte[])value; + return (byte[]) value; } else { @@ -411,33 +409,33 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException { - if((offset == 0) && (length == bytes.length)) + if ((offset == 0) && (length == bytes.length)) { - setBytes(propName,bytes); + setBytes(propName, bytes); } else { byte[] newBytes = new byte[length]; - System.arraycopy(bytes,offset,newBytes,0,length); - setBytes(propName,newBytes); + System.arraycopy(bytes, offset, newBytes, 0, length); + setBytes(propName, newBytes); } } public void setObject(String propName, Object value) throws JMSException - { + { checkWritable(); checkPropertyName(propName); - if(value instanceof Boolean - || value instanceof Byte - || value instanceof Short - || value instanceof Integer - || value instanceof Long - || value instanceof Character - || value instanceof Float - || value instanceof Double - || value instanceof String - || value instanceof byte[] - || value == null) + if (value instanceof Boolean + || value instanceof Byte + || value instanceof Short + || value instanceof Integer + || value instanceof Long + || value instanceof Character + || value instanceof Float + || value instanceof Double + || value instanceof String + || value instanceof byte[] + || value == null) { _map.put(propName, value); } @@ -450,7 +448,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm private void checkPropertyName(String propName) { - if(propName == null || propName.equals("")) + if (propName == null || propName.equals("")) { throw new IllegalArgumentException("Property name cannot be null, or the empty String."); } @@ -464,16 +462,16 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm private void populateMapFromData() throws JMSException { - if(_data != null) + if (_data != null) { _data.rewind(); final int entries = readIntImpl(); - for(int i = 0; i < entries; i++) + for (int i = 0; i < entries; i++) { String propName = readStringImpl(); Object value = readObject(); - _map.put(propName,value); + _map.put(propName, value); } } else @@ -487,7 +485,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm allocateInitialBuffer(); final int size = _map.size(); writeIntImpl(size); - for(Map.Entry<String, Object> entry : _map.entrySet()) + for (Map.Entry<String, Object> entry : _map.entrySet()) { try { @@ -495,7 +493,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } catch (CharacterCodingException e) { - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e); + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e); } @@ -507,13 +505,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm { Object value = entry.getValue(); throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + - " value : " + value + " (type: " + value.getClass().getName() + ").",e); + " value : " + value + " (type: " + value.getClass().getName() + ").", e); } } } - - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index d8394b0489..fad9c5e15a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -40,7 +40,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text */ private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL"; - JMSTextMessage() throws JMSException + public JMSTextMessage() throws JMSException { this(null, null); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java new file mode 100644 index 0000000000..58089f595b --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.log4j.Logger; + +import javax.jms.*; +import java.util.Enumeration; + +public class MessageConverter { + + /** + * Log4J logger + */ + protected final Logger _logger = Logger.getLogger(getClass()); + + /** + * AbstractJMSMessage which will hold the converted message + */ + private AbstractJMSMessage _newMessage; + + public MessageConverter(AbstractJMSMessage message) throws JMSException + { + _newMessage = message; + } + + public MessageConverter(BytesMessage message) throws JMSException + { + BytesMessage bytesMessage = (BytesMessage) message; + bytesMessage.reset(); + + JMSBytesMessage nativeMsg = new JMSBytesMessage(); + + byte[] buf = new byte[1024]; + + int len; + + while ((len = bytesMessage.readBytes(buf)) != -1) + { + nativeMsg.writeBytes(buf, 0, len); + } + + _newMessage = nativeMsg; + setMessageProperties(message); + } + + public MessageConverter(MapMessage message) throws JMSException + { + MapMessage nativeMessage = new JMSMapMessage(); + + Enumeration mapNames = message.getMapNames(); + while (mapNames.hasMoreElements()) + { + String name = (String) mapNames.nextElement(); + nativeMessage.setObject(name, message.getObject(name)); + } + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(ObjectMessage message) throws JMSException + { + ObjectMessage origMessage = (ObjectMessage) message; + ObjectMessage nativeMessage = new JMSObjectMessage(); + + nativeMessage.setObject(origMessage.getObject()); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + + } + + public MessageConverter(TextMessage message) throws JMSException + { + TextMessage nativeMessage = new JMSTextMessage(); + + nativeMessage.setText(message.getText()); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(StreamMessage message) throws JMSException + { + StreamMessage nativeMessage = new JMSStreamMessage(); + + try + { + message.reset(); + while (true) + { + nativeMessage.writeObject(message.readObject()); + } + } + catch (MessageEOFException e) + { + //we're at the end so don't mind the exception + } + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public AbstractJMSMessage getConvertedMessage() + { + return _newMessage; + } + + /** + * Sets all message properties + */ + protected void setMessageProperties(Message message) throws JMSException + { + setNonJMSProperties(message); + setJMSProperties(message); + } + + /** + * Sets all non-JMS defined properties on converted message + */ + protected void setNonJMSProperties(Message message) throws JMSException + { + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) + { + String propertyName = String.valueOf(propertyNames.nextElement()); + //TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them + if (!propertyName.startsWith("JMSX_")) + { + Object value = message.getObjectProperty(propertyName); + _newMessage.setObjectProperty(propertyName, value); + } + } + } + + /** + * Exposed JMS defined properties on converted message: + * JMSDestination - we don't set here + * JMSDeliveryMode - set + * JMSExpiration - we don't set here + * JMSPriority - we don't set here + * JMSMessageID - we don't set here + * JMSTimestamp - we don't set here + * JMSCorrelationID - set + * JMSReplyTo - set + * JMSType - set + * JMSRedlivered - we don't set here + */ + protected void setJMSProperties(Message message) throws JMSException + { + _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); + + if (message.getJMSReplyTo() != null) + { + _newMessage.setJMSReplyTo(message.getJMSReplyTo()); + } + _newMessage.setJMSType(message.getJMSType()); + + _newMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java new file mode 100644 index 0000000000..f7bea1b36a --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java @@ -0,0 +1,231 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.*; +import java.util.Enumeration; +import java.io.Serializable; + +public class TestNonQpidTextMessage implements ObjectMessage { + + private JMSObjectMessage _realMessage; + private String _contentString; + + /** + * Allows us to construct a JMS message which + * does not inherit from the Qpid message superclasses + * and expand our unit testing of MessageConverter et al + */ + public TestNonQpidTextMessage() + { + _realMessage = new JMSObjectMessage(); + } + + public String getJMSMessageID() throws JMSException { + return _realMessage.getJMSMessageID(); + } + + public void setJMSMessageID(String string) throws JMSException { + _realMessage.setJMSMessageID(string); + } + + public long getJMSTimestamp() throws JMSException { + return _realMessage.getJMSTimestamp(); + } + + public void setJMSTimestamp(long l) throws JMSException { + _realMessage.setJMSTimestamp(l); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + return _realMessage.getJMSCorrelationIDAsBytes(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException { + _realMessage.setJMSCorrelationIDAsBytes(bytes); + } + + public void setJMSCorrelationID(String string) throws JMSException { + _realMessage.setJMSCorrelationID(string); + } + + public String getJMSCorrelationID() throws JMSException { + return _realMessage.getJMSCorrelationID(); + } + + public Destination getJMSReplyTo() throws JMSException { + return _realMessage.getJMSReplyTo(); + } + + public void setJMSReplyTo(Destination destination) throws JMSException { + _realMessage.setJMSReplyTo(destination); + } + + public Destination getJMSDestination() throws JMSException { + return _realMessage.getJMSDestination(); + } + + public void setJMSDestination(Destination destination) throws JMSException { + _realMessage.setJMSDestination(destination); + } + + public int getJMSDeliveryMode() throws JMSException { + return _realMessage.getJMSDeliveryMode(); + } + + public void setJMSDeliveryMode(int i) throws JMSException { + _realMessage.setJMSDeliveryMode(i); + } + + public boolean getJMSRedelivered() throws JMSException { + return _realMessage.getJMSRedelivered(); + } + + public void setJMSRedelivered(boolean b) throws JMSException { + _realMessage.setJMSRedelivered(b); + } + + public String getJMSType() throws JMSException { + return _realMessage.getJMSType(); + } + + public void setJMSType(String string) throws JMSException { + _realMessage.setJMSType(string); + } + + public long getJMSExpiration() throws JMSException { + return _realMessage.getJMSExpiration(); + } + + public void setJMSExpiration(long l) throws JMSException { + _realMessage.setJMSExpiration(l); + } + + public int getJMSPriority() throws JMSException { + return _realMessage.getJMSPriority(); + } + + public void setJMSPriority(int i) throws JMSException { + _realMessage.setJMSPriority(i); + } + + public void clearProperties() throws JMSException { + _realMessage.clearProperties(); + } + + public boolean propertyExists(String string) throws JMSException { + return _realMessage.propertyExists(string); + } + + public boolean getBooleanProperty(String string) throws JMSException { + return _realMessage.getBooleanProperty(string); + } + + public byte getByteProperty(String string) throws JMSException { + return _realMessage.getByteProperty(string); + } + + public short getShortProperty(String string) throws JMSException { + return _realMessage.getShortProperty(string); + } + + public int getIntProperty(String string) throws JMSException { + return _realMessage.getIntProperty(string); + } + + public long getLongProperty(String string) throws JMSException { + return _realMessage.getLongProperty(string); + } + + public float getFloatProperty(String string) throws JMSException { + return _realMessage.getFloatProperty(string); + } + + public double getDoubleProperty(String string) throws JMSException { + return _realMessage.getDoubleProperty(string); + } + + public String getStringProperty(String string) throws JMSException { + return _realMessage.getStringProperty(string); + } + + public Object getObjectProperty(String string) throws JMSException { + return _realMessage.getObjectProperty(string); + } + + public Enumeration getPropertyNames() throws JMSException { + return _realMessage.getPropertyNames(); + } + + public void setBooleanProperty(String string, boolean b) throws JMSException { + _realMessage.setBooleanProperty(string,b); + } + + public void setByteProperty(String string, byte b) throws JMSException { + _realMessage.setByteProperty(string,b); + } + + public void setShortProperty(String string, short i) throws JMSException { + _realMessage.setShortProperty(string,i); + } + + public void setIntProperty(String string, int i) throws JMSException { + _realMessage.setIntProperty(string,i); + } + + public void setLongProperty(String string, long l) throws JMSException { + _realMessage.setLongProperty(string,l); + } + + public void setFloatProperty(String string, float v) throws JMSException { + _realMessage.setFloatProperty(string,v); + } + + public void setDoubleProperty(String string, double v) throws JMSException { + _realMessage.setDoubleProperty(string,v); + } + + public void setStringProperty(String string, String string1) throws JMSException { + _realMessage.setStringProperty(string,string1); + } + + public void setObjectProperty(String string, Object object) throws JMSException { + _realMessage.setObjectProperty(string,object); + } + + public void acknowledge() throws JMSException { + _realMessage.acknowledge(); + } + + public void clearBody() throws JMSException { + _realMessage.clearBody(); + } + + public void setObject(Serializable serializable) throws JMSException { + if (serializable instanceof String) + { + _contentString = (String)serializable; + } + } + + public Serializable getObject() throws JMSException { + return _contentString; } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java new file mode 100644 index 0000000000..456748e0d2 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.message.TestNonQpidTextMessage; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class JMSPropertiesTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(JMSPropertiesTest.class); + + public String _connectionString = "vm://:1"; + + public static final String JMS_CORR_ID = "QPIDID_01"; + public static final int JMS_DELIV_MODE = 1; + public static final String JMS_TYPE = "test.jms.type"; + public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto"); + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + public void testJMSProperties() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("someQ", "someQ", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + //create a test message to send + ObjectMessage sentMsg = new TestNonQpidTextMessage(); + sentMsg.setJMSCorrelationID(JMS_CORR_ID); + sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE); + sentMsg.setJMSType(JMS_TYPE); + sentMsg.setJMSReplyTo(JMS_REPLY_TO); + + //send it + producer.send(sentMsg); + + con2.close(); + + con.start(); + + //get message and check JMS properties + ObjectMessage rm = (ObjectMessage) consumer.receive(); + assertNotNull(rm); + + assertEquals("JMS Correlation ID mismatch",sentMsg.getJMSCorrelationID(),rm.getJMSCorrelationID()); + //TODO: Commented out as always overwritten by send delivery mode value - prob should not set in conversion + //assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch",sentMsg.getJMSType(),rm.getJMSType()); + assertEquals("JMS Reply To mismatch",sentMsg.getJMSReplyTo(),rm.getJMSReplyTo()); + + con.close(); + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java new file mode 100644 index 0000000000..6a335b8627 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import junit.framework.TestCase; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.AMQQueue; + +import javax.jms.Message; +import javax.jms.Destination; +import javax.jms.TextMessage; +import javax.jms.MapMessage; +import java.util.HashMap; + + +public class MessageConverterTest extends TestCase { + + public static final String JMS_CORR_ID = "QPIDID_01"; + public static final int JMS_DELIV_MODE = 1; + public static final String JMS_TYPE = "test.jms.type"; + public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto"); + + protected JMSTextMessage testTextMessage; + + protected JMSMapMessage testMapMessage; + + protected void setUp() throws Exception + { + super.setUp(); + testTextMessage = new JMSTextMessage(); + + //Add JMSProperties + testTextMessage.setJMSCorrelationID(JMS_CORR_ID); + testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE); + testTextMessage.setJMSType(JMS_TYPE); + testTextMessage.setJMSReplyTo(JMS_REPLY_TO); + testTextMessage.setText("testTextMessage text"); + + //Add non-JMS properties + testTextMessage.setStringProperty("testProp1","testValue1"); + testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE); + + testMapMessage = new JMSMapMessage(); + testMapMessage.setString("testMapString","testMapStringValue"); + testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE); + } + + public void testSetProperties() throws Exception + { + AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); + + //check JMS prop values on newMessage match + assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID()); + assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType()); + assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo()); + + //check non-JMS standard props ok too + assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"), + newMessage.getStringProperty("testProp1")); + assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"), + newMessage.getDoubleProperty("testProp2")); + } + + public void testJMSTextMessageConversion() throws Exception + { + AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); + assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText()); + } + + public void testJMSMapMessageConversion() throws Exception + { + AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage(); + assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"), + testMapMessage.getString("testMapString")); + assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"), + testMapMessage.getDouble("testMapDouble")); + + } + + protected void tearDown() throws Exception + { + super.tearDown(); + testTextMessage = null; + } + + +} |