summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-13 18:20:11 +0000
committerRobert Greig <rgreig@apache.org>2006-12-13 18:20:11 +0000
commit092bc25334915b7a8a4cd9f8c4d0f2c84df3bbbd (patch)
tree85947090f5d3e95468fec48a2332a3f686ec5ed0 /java
parentfe2b1ac2e6968534650ed0341acd5f11ed42f38d (diff)
downloadqpid-python-092bc25334915b7a8a4cd9f8c4d0f2c84df3bbbd.tar.gz
QPID-179 Now has hook for pre-send preparation of message which in turn allows us to handle the distinction between null and empty String text message bodies. Actual distinction is carried in a message property. Patch supplied by Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486783 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java117
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java40
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java40
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java61
5 files changed, 141 insertions, 133 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5a16a148cb..dbc074beb5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -25,9 +25,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
@@ -38,7 +38,6 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
-
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -287,7 +286,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -303,7 +302,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -319,7 +318,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -335,7 +334,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -351,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -403,7 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -473,7 +472,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -493,7 +492,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- throw new JMSException("Error closing session: " + e);
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
finally
{
@@ -536,7 +537,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e)
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -747,7 +748,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
@@ -763,7 +764,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination, messageSelector);
@@ -772,20 +773,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
@@ -795,7 +796,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -807,7 +808,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
@@ -818,7 +819,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
selector, rawSelector);
}
@@ -831,7 +832,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
@@ -963,7 +964,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public Queue createQueue(String queueName) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
if (queueName.indexOf('/') == -1)
{
return new AMQQueue(queueName);
@@ -993,7 +994,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1009,7 +1010,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector);
@@ -1018,14 +1019,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueSender createSender(Queue queue) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
//return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
public Topic createTopic(String topicName) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
if (topicName.indexOf('/') == -1)
{
@@ -1056,8 +1057,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1073,8 +1074,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1088,8 +1089,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1100,8 +1101,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1109,41 +1110,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
- //return (TopicPublisher) createProducer(topic);
- return new TopicPublisherAdapter(createProducer(topic), topic);
+ checkNotClosed();
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
+ checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
+ checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
return new AMQTemporaryQueue();
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
return new AMQTemporaryTopic();
}
public void unsubscribe(String name) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
//send a queue.delete for the subscription
String queue = _connection.getClientID() + ":" + name;
@@ -1350,21 +1349,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws InvalidDestinationException{
- if (topic == null){
- throw new javax.jms.InvalidDestinationException("Invalid Topic");
- }
+ private void checkValidTopic(Topic topic) throws InvalidDestinationException
+ {
+ if (topic == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Topic");
+ }
}
- private void checkValidQueue(Queue queue) throws InvalidDestinationException{
- if (queue == null){
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
+ private void checkValidQueue(Queue queue) throws InvalidDestinationException
+ {
+ if (queue == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
- private void checkValidDestination(Destination destination) throws InvalidDestinationException{
- if (destination == null){
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException
+ {
+ if (destination == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index c3d86d15c7..8c53d93de6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,15 +24,10 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
import java.io.UnsupportedEncodingException;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
@@ -103,6 +98,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private final boolean _mandatory;
private final boolean _waitUntilSent;
+ private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -349,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
- }
+ }
declareDestination((AMQDestination)destination);
}
@@ -382,6 +378,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
currentTime = System.currentTimeMillis();
message.setJMSTimestamp(currentTime);
}
+ message.prepareForSending();
ByteBuffer payload = message.getData();
BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
@@ -402,7 +399,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- int size = payload.limit();
+ int size = (payload != null) ? payload.limit() : 0;
ContentBody[] contentBodies = createContentBodies(payload);
AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
for (int i = 0; i < contentBodies.length; i++)
@@ -437,14 +434,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
*/
private ContentBody[] createContentBodies(ByteBuffer payload)
{
- if (payload == null)
+ if (payload == null || payload.remaining() == 0)
{
- return null;
- }
- else if (payload.remaining() == 0)
- {
- return new ContentBody[0];
+ return NO_CONTENT_BODIES;
}
+
// we substract one from the total frame maximum size to account for the end of frame marker in a body frame
// (0xCE byte).
int dataLength = payload.remaining();
@@ -485,31 +479,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkNotClosed();
_encoding = encoding;
}
-
+
private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
-
+
private void checkInitialDestination(){
if(_destination == null){
throw new UnsupportedOperationException("Destination is null");
}
}
-
+
private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
if (_destination != null && suppliedDestination != null){
throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
-
+
if (suppliedDestination == null){
- throw new InvalidDestinationException("Supplied Destination was invalid");
+ throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
-
+
public AMQSession getSession() {
return _session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
index edabed90b3..dd82eb13c1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,8 @@ package org.apache.qpid.client.message;
import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.client.AMQSession;
+import javax.jms.JMSException;
+
public class AMQMessage
{
protected ContentHeaderProperties _contentHeaderProperties;
@@ -67,5 +69,13 @@ public class AMQMessage
public long getDeliveryTag()
{
return _deliveryTag;
- }
+ }
+
+ /**
+ * Invoked prior to sending the message. Allows the message to be modified if necessary before
+ * sending.
+ */
+ public void prepareForSending() throws JMSException
+ {
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 41eb21a415..c1ed88b167 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -32,7 +32,6 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.JmsNotImplementedException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -40,7 +39,6 @@ import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.Iterator;
import java.util.Map;
public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
@@ -257,13 +255,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
-
- if (getJmsContentHeaderProperties() == null)
- {
- System.out.println("HEADERS ARE NULL");
- }
-
-
return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
}
@@ -383,6 +374,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
}
+ protected void removeProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().remove(propertyName);
+ }
+
public void acknowledge() throws JMSException
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
@@ -470,31 +467,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
getJmsContentHeaderProperties().getHeaders();
}
- public FieldTable populateHeadersFromMessageProperties()
- {
- //
- // We need to convert every property into a String representation
- // Note that type information is preserved in the property name
- //
- final FieldTable table = FieldTableFactory.newFieldTable();
- final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
- while (entries.hasNext())
- {
- final Map.Entry entry = (Map.Entry) entries.next();
- final String propertyName = (String) entry.getKey();
- if (propertyName == null)
- {
- continue;
- }
- else
- {
- table.put(propertyName, entry.getValue().toString());
- }
- }
- return table;
-
- }
-
public BasicContentHeaderProperties getJmsContentHeaderProperties()
{
return (BasicContentHeaderProperties) _contentHeaderProperties;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 3061d5a59c..76f8a1c32f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,6 +35,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
private String _decodedValue;
+ /**
+ * This constant represents the name of a property that is set when the message payload is null.
+ */
+ private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
+
JMSTextMessage() throws JMSException
{
this(null, null);
@@ -91,31 +96,34 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
return MIME_TYPE;
}
- public void setText(String string) throws JMSException
+ public void setText(String text) throws JMSException
{
checkWritable();
-
+
clearBody();
try
{
- _data = ByteBuffer.allocate(string.length());
- _data.limit(string.length());
- //_data.sweep();
- _data.setAutoExpand(true);
- if (getJmsContentHeaderProperties().getEncoding() == null)
- {
- _data.put(string.getBytes());
- }
- else
- {
- _data.put(string.getBytes(getJmsContentHeaderProperties().getEncoding()));
+ if (text != null)
+ {
+ _data = ByteBuffer.allocate(text.length());
+ _data.limit(text.length()) ;
+ //_data.sweep();
+ _data.setAutoExpand(true);
+ if (getJmsContentHeaderProperties().getEncoding() == null)
+ {
+ _data.put(text.getBytes());
+ }
+ else
+ {
+ _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
+ }
}
- _decodedValue = string;
+ _decodedValue = text;
}
catch (UnsupportedEncodingException e)
{
// should never occur
- JMSException jmse = new JMSException("Unable to decode string data");
+ JMSException jmse = new JMSException("Unable to decode text data");
jmse.setLinkedException(e);
}
}
@@ -133,6 +141,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
else
{
_data.rewind();
+
+ if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
+ {
+ return null;
+ }
if (getJmsContentHeaderProperties().getEncoding() != null)
{
try
@@ -162,4 +175,18 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
return _decodedValue;
}
}
+
+ @Override
+ public void prepareForSending() throws JMSException
+ {
+ super.prepareForSending();
+ if (_data == null)
+ {
+ setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
+ }
+ else
+ {
+ removeProperty(PAYLOAD_NULL_PROPERTY);
+ }
+ }
}