summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-26 10:20:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-26 10:20:01 +0000
commit0035b023e6240b2c1b2a8f4f6e6b3caa869155ff (patch)
treed36046365c79bbe76613a68c506730acc1e704c7
parent64a5794de6a630a4bdf1b245aadf2548508808ae (diff)
downloadqpid-python-0035b023e6240b2c1b2a8f4f6e6b3caa869155ff.tar.gz
Applied Feature QPID-315
Revision: 499979 Author: marnie Date: 21:08:54, 25 January 2007 Message: QPID-315 Test classes to reproduce problem with missing correlation id on incoming messages from non-Qpid broker ---- Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Revision: 499975 Author: marnie Date: 21:07:49, 25 January 2007 Message: QPID-315 Moved message conversion logic from BasicMessageProducer to MessageConverter Added correlation id to AbstractJMSMessage.toString() ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Revision: 499532 Author: marnie Date: 18:51:22, 24 January 2007 Message: QPID-315 Updated and tidied class prior to addition of tests ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Revision: 499087 Author: marnie Date: 17:28:23, 23 January 2007 Message: QPID-315 INitial commit - AMQSesssion convertToNativeMessage needs replaced with call to this class ---- Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Partial Revision: 494121 Partial Author: rgreig Partial Date: 17:02:26, 08 January 2007 Partial Message: Partial QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String Partial ---- Partial Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Partial Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@500207 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java142
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java160
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java179
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java231
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java99
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java109
8 files changed, 730 insertions, 195 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index d38e461400..74b91c3c30 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
@@ -138,16 +139,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- 0, // ticket
- destination.getExchangeClass()); // type
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -183,7 +184,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
+ " is illegal");
}
_deliveryMode = i;
}
@@ -368,112 +369,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (message instanceof BytesMessage)
{
- BytesMessage bytesMessage = (BytesMessage) message;
- bytesMessage.reset();
-
- JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
-
-
- byte[] buf = new byte[1024];
-
- int len;
-
- while ((len = bytesMessage.readBytes(buf)) != -1)
- {
- nativeMsg.writeBytes(buf, 0, len);
- }
-
- newMessage = nativeMsg;
+ newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
- MapMessage origMessage = (MapMessage) message;
- MapMessage nativeMessage = _session.createMapMessage();
-
- Enumeration mapNames = origMessage.getMapNames();
- while (mapNames.hasMoreElements())
- {
- String name = (String) mapNames.nextElement();
- nativeMessage.setObject(name, origMessage.getObject(name));
- }
- newMessage = (AbstractJMSMessage) nativeMessage;
+ newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
- ObjectMessage origMessage = (ObjectMessage) message;
- ObjectMessage nativeMessage = _session.createObjectMessage();
-
- nativeMessage.setObject(origMessage.getObject());
-
- newMessage = (AbstractJMSMessage) nativeMessage;
+ newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
- TextMessage origMessage = (TextMessage) message;
- TextMessage nativeMessage = _session.createTextMessage();
-
- nativeMessage.setText(origMessage.getText());
-
- newMessage = (AbstractJMSMessage) nativeMessage;
+ newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
- StreamMessage origMessage = (StreamMessage) message;
- StreamMessage nativeMessage = _session.createStreamMessage();
-
-
- try
- {
- origMessage.reset();
- while (true)
- {
- nativeMessage.writeObject(origMessage.readObject());
- }
- }
- catch (MessageEOFException e)
- {
- ;//
- }
- newMessage = (AbstractJMSMessage) nativeMessage;
+ newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
}
else
{
+ //TODO; Do we really want to create an empty message here ?
newMessage = (AbstractJMSMessage) _session.createMessage();
-
- }
-
- Enumeration propertyNames = message.getPropertyNames();
- while (propertyNames.hasMoreElements())
- {
- String propertyName = String.valueOf(propertyNames.nextElement());
- if (!propertyName.startsWith("JMSX_"))
- {
- Object value = message.getObjectProperty(propertyName);
- newMessage.setObjectProperty(propertyName, value);
- }
+ return new MessageConverter(newMessage).getConvertedMessage();
}
- newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
-
-
- int priority = message.getJMSPriority();
- if (priority < 0)
- {
- priority = 0;
- }
- else if (priority > 9)
- {
- priority = 9;
- }
-
- newMessage.setJMSPriority(priority);
- if (message.getJMSReplyTo() != null)
- {
- newMessage.setJMSReplyTo(message.getJMSReplyTo());
- }
- newMessage.setJMSType(message.getJMSType());
-
-
if (newMessage != null)
{
return newMessage;
@@ -491,7 +411,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
+ (destination != null ? destination.getClass() : null));
}
declareDestination((AMQDestination) destination);
}
@@ -520,19 +440,19 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
-
+
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -576,7 +496,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
// weight argument of zero indicates no child content headers, just bodies
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
+ AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte) 8, (byte) 0), 0,
contentHeaderProperties,
size);
if (_logger.isDebugEnabled())
@@ -603,16 +523,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
- if(destination instanceof TemporaryDestination)
+ if (destination instanceof TemporaryDestination)
{
_logger.debug("destination is temporary destination");
TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession().isClosed())
+ if (tempDest.getSession().isClosed())
{
_logger.debug("session is closed");
throw new JMSException("Session for temporary destination has been closed");
}
- if(tempDest.isDeleted())
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot send to a deleted temporary destination");
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 0c29344c37..3b0a84d5bb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -401,7 +401,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void acknowledge() throws JMSException
{
- if(_session != null)
+ if (_session != null)
{
_session.acknowledge();
}
@@ -429,6 +429,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
StringBuffer buf = new StringBuffer("Body:\n");
buf.append(toBodyString());
+ buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID());
buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
buf.append("\nJMS expiration: ").append(getJMSExpiration());
buf.append("\nJMS priority: ").append(getJMSPriority());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 88e78a1dad..25c56411b0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -38,9 +38,9 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public static final String MIME_TYPE = "jms/map-message";
- private Map<String,Object> _map = new HashMap<String, Object>();
+ private Map<String, Object> _map = new HashMap<String, Object>();
- JMSMapMessage() throws JMSException
+ public JMSMapMessage() throws JMSException
{
this(null);
}
@@ -59,11 +59,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
try
{
populateMapFromData();
- }
+ }
catch (JMSException je)
{
throw new AMQException("Error populating MapMessage from ByteBuffer", je);
-
+
}
}
@@ -88,7 +88,6 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
-
@Override
public void clearBodyImpl() throws JMSException
{
@@ -100,13 +99,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(value instanceof Boolean)
+ if (value instanceof Boolean)
{
- return ((Boolean)value).booleanValue();
+ return ((Boolean) value).booleanValue();
}
- else if((value instanceof String) || (value == null))
+ else if ((value instanceof String) || (value == null))
{
- return Boolean.valueOf((String)value);
+ return Boolean.valueOf((String) value);
}
else
{
@@ -120,13 +119,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(value instanceof Byte)
+ if (value instanceof Byte)
{
- return ((Byte)value).byteValue();
+ return ((Byte) value).byteValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Byte.valueOf((String)value).byteValue();
+ return Byte.valueOf((String) value).byteValue();
}
else
{
@@ -139,17 +138,17 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(value instanceof Short)
+ if (value instanceof Short)
{
- return ((Short)value).shortValue();
+ return ((Short) value).shortValue();
}
- else if(value instanceof Byte)
+ else if (value instanceof Byte)
{
- return ((Byte)value).shortValue();
+ return ((Byte) value).shortValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Short.valueOf((String)value).shortValue();
+ return Short.valueOf((String) value).shortValue();
}
else
{
@@ -164,21 +163,21 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(value instanceof Integer)
+ if (value instanceof Integer)
{
- return ((Integer)value).intValue();
+ return ((Integer) value).intValue();
}
- else if(value instanceof Short)
+ else if (value instanceof Short)
{
- return ((Short)value).intValue();
+ return ((Short) value).intValue();
}
- else if(value instanceof Byte)
+ else if (value instanceof Byte)
{
- return ((Byte)value).intValue();
+ return ((Byte) value).intValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Integer.valueOf((String)value).intValue();
+ return Integer.valueOf((String) value).intValue();
}
else
{
@@ -192,25 +191,25 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(value instanceof Long)
+ if (value instanceof Long)
{
- return ((Long)value).longValue();
+ return ((Long) value).longValue();
}
- else if(value instanceof Integer)
+ else if (value instanceof Integer)
{
- return ((Integer)value).longValue();
+ return ((Integer) value).longValue();
}
- if(value instanceof Short)
+ if (value instanceof Short)
{
- return ((Short)value).longValue();
+ return ((Short) value).longValue();
}
- if(value instanceof Byte)
+ if (value instanceof Byte)
{
- return ((Byte)value).longValue();
+ return ((Byte) value).longValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Long.valueOf((String)value).longValue();
+ return Long.valueOf((String) value).longValue();
}
else
{
@@ -224,13 +223,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(!_map.containsKey(propName))
+ if (!_map.containsKey(propName))
{
throw new MessageFormatException("Property " + propName + " not present");
}
- else if(value instanceof Character)
+ else if (value instanceof Character)
{
- return ((Character)value).charValue();
+ return ((Character) value).charValue();
}
else if (value == null)
{
@@ -246,18 +245,17 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
-
public float getFloat(String propName) throws JMSException
{
Object value = _map.get(propName);
- if(value instanceof Float)
+ if (value instanceof Float)
{
- return ((Float)value).floatValue();
+ return ((Float) value).floatValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Float.valueOf((String)value).floatValue();
+ return Float.valueOf((String) value).floatValue();
}
else
{
@@ -270,17 +268,17 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(value instanceof Double)
+ if (value instanceof Double)
{
- return ((Double)value).doubleValue();
+ return ((Double) value).doubleValue();
}
- else if(value instanceof Float)
+ else if (value instanceof Float)
{
- return ((Float)value).doubleValue();
+ return ((Float) value).doubleValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Double.valueOf((String)value).doubleValue();
+ return Double.valueOf((String) value).doubleValue();
}
else
{
@@ -293,11 +291,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if((value instanceof String) || (value == null))
+ if ((value instanceof String) || (value == null))
{
return (String) value;
}
- else if(value instanceof byte[])
+ else if (value instanceof byte[])
{
throw new MessageFormatException("Property " + propName + " of type byte[] " +
"cannot be converted to String.");
@@ -313,13 +311,13 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = _map.get(propName);
- if(!_map.containsKey(propName))
+ if (!_map.containsKey(propName))
{
- throw new MessageFormatException("Property " + propName + " not present");
+ throw new MessageFormatException("Property " + propName + " not present");
}
- else if((value instanceof byte[]) || (value == null))
+ else if ((value instanceof byte[]) || (value == null))
{
- return (byte[])value;
+ return (byte[]) value;
}
else
{
@@ -411,33 +409,33 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
{
- if((offset == 0) && (length == bytes.length))
+ if ((offset == 0) && (length == bytes.length))
{
- setBytes(propName,bytes);
+ setBytes(propName, bytes);
}
else
{
byte[] newBytes = new byte[length];
- System.arraycopy(bytes,offset,newBytes,0,length);
- setBytes(propName,newBytes);
+ System.arraycopy(bytes, offset, newBytes, 0, length);
+ setBytes(propName, newBytes);
}
}
public void setObject(String propName, Object value) throws JMSException
- {
+ {
checkWritable();
checkPropertyName(propName);
- if(value instanceof Boolean
- || value instanceof Byte
- || value instanceof Short
- || value instanceof Integer
- || value instanceof Long
- || value instanceof Character
- || value instanceof Float
- || value instanceof Double
- || value instanceof String
- || value instanceof byte[]
- || value == null)
+ if (value instanceof Boolean
+ || value instanceof Byte
+ || value instanceof Short
+ || value instanceof Integer
+ || value instanceof Long
+ || value instanceof Character
+ || value instanceof Float
+ || value instanceof Double
+ || value instanceof String
+ || value instanceof byte[]
+ || value == null)
{
_map.put(propName, value);
}
@@ -450,7 +448,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
private void checkPropertyName(String propName)
{
- if(propName == null || propName.equals(""))
+ if (propName == null || propName.equals(""))
{
throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
}
@@ -464,16 +462,16 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
private void populateMapFromData() throws JMSException
{
- if(_data != null)
+ if (_data != null)
{
_data.rewind();
final int entries = readIntImpl();
- for(int i = 0; i < entries; i++)
+ for (int i = 0; i < entries; i++)
{
String propName = readStringImpl();
Object value = readObject();
- _map.put(propName,value);
+ _map.put(propName, value);
}
}
else
@@ -487,7 +485,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
allocateInitialBuffer();
final int size = _map.size();
writeIntImpl(size);
- for(Map.Entry<String, Object> entry : _map.entrySet())
+ for (Map.Entry<String, Object> entry : _map.entrySet())
{
try
{
@@ -495,7 +493,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
catch (CharacterCodingException e)
{
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e);
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
}
@@ -507,13 +505,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
{
Object value = entry.getValue();
throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() +
- " value : " + value + " (type: " + value.getClass().getName() + ").",e);
+ " value : " + value + " (type: " + value.getClass().getName() + ").", e);
}
}
}
-
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index d8394b0489..fad9c5e15a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -40,7 +40,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
*/
private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
- JMSTextMessage() throws JMSException
+ public JMSTextMessage() throws JMSException
{
this(null, null);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
new file mode 100644
index 0000000000..58089f595b
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import java.util.Enumeration;
+
+public class MessageConverter {
+
+ /**
+ * Log4J logger
+ */
+ protected final Logger _logger = Logger.getLogger(getClass());
+
+ /**
+ * AbstractJMSMessage which will hold the converted message
+ */
+ private AbstractJMSMessage _newMessage;
+
+ public MessageConverter(AbstractJMSMessage message) throws JMSException
+ {
+ _newMessage = message;
+ }
+
+ public MessageConverter(BytesMessage message) throws JMSException
+ {
+ BytesMessage bytesMessage = (BytesMessage) message;
+ bytesMessage.reset();
+
+ JMSBytesMessage nativeMsg = new JMSBytesMessage();
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while ((len = bytesMessage.readBytes(buf)) != -1)
+ {
+ nativeMsg.writeBytes(buf, 0, len);
+ }
+
+ _newMessage = nativeMsg;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(MapMessage message) throws JMSException
+ {
+ MapMessage nativeMessage = new JMSMapMessage();
+
+ Enumeration mapNames = message.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ nativeMessage.setObject(name, message.getObject(name));
+ }
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(ObjectMessage message) throws JMSException
+ {
+ ObjectMessage origMessage = (ObjectMessage) message;
+ ObjectMessage nativeMessage = new JMSObjectMessage();
+
+ nativeMessage.setObject(origMessage.getObject());
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+
+ }
+
+ public MessageConverter(TextMessage message) throws JMSException
+ {
+ TextMessage nativeMessage = new JMSTextMessage();
+
+ nativeMessage.setText(message.getText());
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(StreamMessage message) throws JMSException
+ {
+ StreamMessage nativeMessage = new JMSStreamMessage();
+
+ try
+ {
+ message.reset();
+ while (true)
+ {
+ nativeMessage.writeObject(message.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ //we're at the end so don't mind the exception
+ }
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public AbstractJMSMessage getConvertedMessage()
+ {
+ return _newMessage;
+ }
+
+ /**
+ * Sets all message properties
+ */
+ protected void setMessageProperties(Message message) throws JMSException
+ {
+ setNonJMSProperties(message);
+ setJMSProperties(message);
+ }
+
+ /**
+ * Sets all non-JMS defined properties on converted message
+ */
+ protected void setNonJMSProperties(Message message) throws JMSException
+ {
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ //TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ _newMessage.setObjectProperty(propertyName, value);
+ }
+ }
+ }
+
+ /**
+ * Exposed JMS defined properties on converted message:
+ * JMSDestination - we don't set here
+ * JMSDeliveryMode - set
+ * JMSExpiration - we don't set here
+ * JMSPriority - we don't set here
+ * JMSMessageID - we don't set here
+ * JMSTimestamp - we don't set here
+ * JMSCorrelationID - set
+ * JMSReplyTo - set
+ * JMSType - set
+ * JMSRedlivered - we don't set here
+ */
+ protected void setJMSProperties(Message message) throws JMSException
+ {
+ _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+ if (message.getJMSReplyTo() != null)
+ {
+ _newMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+ _newMessage.setJMSType(message.getJMSType());
+
+ _newMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
new file mode 100644
index 0000000000..f7bea1b36a
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.*;
+import java.util.Enumeration;
+import java.io.Serializable;
+
+public class TestNonQpidTextMessage implements ObjectMessage {
+
+ private JMSObjectMessage _realMessage;
+ private String _contentString;
+
+ /**
+ * Allows us to construct a JMS message which
+ * does not inherit from the Qpid message superclasses
+ * and expand our unit testing of MessageConverter et al
+ */
+ public TestNonQpidTextMessage()
+ {
+ _realMessage = new JMSObjectMessage();
+ }
+
+ public String getJMSMessageID() throws JMSException {
+ return _realMessage.getJMSMessageID();
+ }
+
+ public void setJMSMessageID(String string) throws JMSException {
+ _realMessage.setJMSMessageID(string);
+ }
+
+ public long getJMSTimestamp() throws JMSException {
+ return _realMessage.getJMSTimestamp();
+ }
+
+ public void setJMSTimestamp(long l) throws JMSException {
+ _realMessage.setJMSTimestamp(l);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return _realMessage.getJMSCorrelationIDAsBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
+ _realMessage.setJMSCorrelationIDAsBytes(bytes);
+ }
+
+ public void setJMSCorrelationID(String string) throws JMSException {
+ _realMessage.setJMSCorrelationID(string);
+ }
+
+ public String getJMSCorrelationID() throws JMSException {
+ return _realMessage.getJMSCorrelationID();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException {
+ return _realMessage.getJMSReplyTo();
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException {
+ _realMessage.setJMSReplyTo(destination);
+ }
+
+ public Destination getJMSDestination() throws JMSException {
+ return _realMessage.getJMSDestination();
+ }
+
+ public void setJMSDestination(Destination destination) throws JMSException {
+ _realMessage.setJMSDestination(destination);
+ }
+
+ public int getJMSDeliveryMode() throws JMSException {
+ return _realMessage.getJMSDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException {
+ _realMessage.setJMSDeliveryMode(i);
+ }
+
+ public boolean getJMSRedelivered() throws JMSException {
+ return _realMessage.getJMSRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException {
+ _realMessage.setJMSRedelivered(b);
+ }
+
+ public String getJMSType() throws JMSException {
+ return _realMessage.getJMSType();
+ }
+
+ public void setJMSType(String string) throws JMSException {
+ _realMessage.setJMSType(string);
+ }
+
+ public long getJMSExpiration() throws JMSException {
+ return _realMessage.getJMSExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException {
+ _realMessage.setJMSExpiration(l);
+ }
+
+ public int getJMSPriority() throws JMSException {
+ return _realMessage.getJMSPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException {
+ _realMessage.setJMSPriority(i);
+ }
+
+ public void clearProperties() throws JMSException {
+ _realMessage.clearProperties();
+ }
+
+ public boolean propertyExists(String string) throws JMSException {
+ return _realMessage.propertyExists(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException {
+ return _realMessage.getBooleanProperty(string);
+ }
+
+ public byte getByteProperty(String string) throws JMSException {
+ return _realMessage.getByteProperty(string);
+ }
+
+ public short getShortProperty(String string) throws JMSException {
+ return _realMessage.getShortProperty(string);
+ }
+
+ public int getIntProperty(String string) throws JMSException {
+ return _realMessage.getIntProperty(string);
+ }
+
+ public long getLongProperty(String string) throws JMSException {
+ return _realMessage.getLongProperty(string);
+ }
+
+ public float getFloatProperty(String string) throws JMSException {
+ return _realMessage.getFloatProperty(string);
+ }
+
+ public double getDoubleProperty(String string) throws JMSException {
+ return _realMessage.getDoubleProperty(string);
+ }
+
+ public String getStringProperty(String string) throws JMSException {
+ return _realMessage.getStringProperty(string);
+ }
+
+ public Object getObjectProperty(String string) throws JMSException {
+ return _realMessage.getObjectProperty(string);
+ }
+
+ public Enumeration getPropertyNames() throws JMSException {
+ return _realMessage.getPropertyNames();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException {
+ _realMessage.setBooleanProperty(string,b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException {
+ _realMessage.setByteProperty(string,b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException {
+ _realMessage.setShortProperty(string,i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException {
+ _realMessage.setIntProperty(string,i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException {
+ _realMessage.setLongProperty(string,l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException {
+ _realMessage.setFloatProperty(string,v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException {
+ _realMessage.setDoubleProperty(string,v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException {
+ _realMessage.setStringProperty(string,string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException {
+ _realMessage.setObjectProperty(string,object);
+ }
+
+ public void acknowledge() throws JMSException {
+ _realMessage.acknowledge();
+ }
+
+ public void clearBody() throws JMSException {
+ _realMessage.clearBody();
+ }
+
+ public void setObject(Serializable serializable) throws JMSException {
+ if (serializable instanceof String)
+ {
+ _contentString = (String)serializable;
+ }
+ }
+
+ public Serializable getObject() throws JMSException {
+ return _contentString; }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
new file mode 100644
index 0000000000..456748e0d2
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.TestNonQpidTextMessage;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSPropertiesTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(JMSPropertiesTest.class);
+
+ public String _connectionString = "vm://:1";
+
+ public static final String JMS_CORR_ID = "QPIDID_01";
+ public static final int JMS_DELIV_MODE = 1;
+ public static final String JMS_TYPE = "test.jms.type";
+ public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto");
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public void testJMSProperties() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ //create a test message to send
+ ObjectMessage sentMsg = new TestNonQpidTextMessage();
+ sentMsg.setJMSCorrelationID(JMS_CORR_ID);
+ sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE);
+ sentMsg.setJMSType(JMS_TYPE);
+ sentMsg.setJMSReplyTo(JMS_REPLY_TO);
+
+ //send it
+ producer.send(sentMsg);
+
+ con2.close();
+
+ con.start();
+
+ //get message and check JMS properties
+ ObjectMessage rm = (ObjectMessage) consumer.receive();
+ assertNotNull(rm);
+
+ assertEquals("JMS Correlation ID mismatch",sentMsg.getJMSCorrelationID(),rm.getJMSCorrelationID());
+ //TODO: Commented out as always overwritten by send delivery mode value - prob should not set in conversion
+ //assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode());
+ assertEquals("JMS Type mismatch",sentMsg.getJMSType(),rm.getJMSType());
+ assertEquals("JMS Reply To mismatch",sentMsg.getJMSReplyTo(),rm.getJMSReplyTo());
+
+ con.close();
+ }
+
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
new file mode 100644
index 0000000000..6a335b8627
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Message;
+import javax.jms.Destination;
+import javax.jms.TextMessage;
+import javax.jms.MapMessage;
+import java.util.HashMap;
+
+
+public class MessageConverterTest extends TestCase {
+
+ public static final String JMS_CORR_ID = "QPIDID_01";
+ public static final int JMS_DELIV_MODE = 1;
+ public static final String JMS_TYPE = "test.jms.type";
+ public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto");
+
+ protected JMSTextMessage testTextMessage;
+
+ protected JMSMapMessage testMapMessage;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ testTextMessage = new JMSTextMessage();
+
+ //Add JMSProperties
+ testTextMessage.setJMSCorrelationID(JMS_CORR_ID);
+ testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE);
+ testTextMessage.setJMSType(JMS_TYPE);
+ testTextMessage.setJMSReplyTo(JMS_REPLY_TO);
+ testTextMessage.setText("testTextMessage text");
+
+ //Add non-JMS properties
+ testTextMessage.setStringProperty("testProp1","testValue1");
+ testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE);
+
+ testMapMessage = new JMSMapMessage();
+ testMapMessage.setString("testMapString","testMapStringValue");
+ testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE);
+ }
+
+ public void testSetProperties() throws Exception
+ {
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
+
+ //check JMS prop values on newMessage match
+ assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID());
+ assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode());
+ assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType());
+ assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo());
+
+ //check non-JMS standard props ok too
+ assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"),
+ newMessage.getStringProperty("testProp1"));
+ assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"),
+ newMessage.getDoubleProperty("testProp2"));
+ }
+
+ public void testJMSTextMessageConversion() throws Exception
+ {
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
+ assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText());
+ }
+
+ public void testJMSMapMessageConversion() throws Exception
+ {
+ AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage();
+ assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"),
+ testMapMessage.getString("testMapString"));
+ assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"),
+ testMapMessage.getDouble("testMapDouble"));
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ testTextMessage = null;
+ }
+
+
+}