diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-13 18:20:11 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-13 18:20:11 +0000 |
commit | 092bc25334915b7a8a4cd9f8c4d0f2c84df3bbbd (patch) | |
tree | 85947090f5d3e95468fec48a2332a3f686ec5ed0 /java | |
parent | fe2b1ac2e6968534650ed0341acd5f11ed42f38d (diff) | |
download | qpid-python-092bc25334915b7a8a4cd9f8c4d0f2c84df3bbbd.tar.gz |
QPID-179 Now has hook for pre-send preparation of message which in turn allows us to handle the distinction between null and empty String text message bodies. Actual distinction is carried in a message property. Patch supplied by Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486783 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 141 insertions, 133 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5a16a148cb..dbc074beb5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,9 +25,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; @@ -38,7 +38,6 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -287,7 +286,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -303,7 +302,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -319,7 +318,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -335,7 +334,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -351,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -403,7 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -473,7 +472,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -493,7 +492,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Error closing session: " + e); + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; } finally { @@ -536,7 +537,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -747,7 +748,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); @@ -763,7 +764,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); @@ -772,20 +773,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } @@ -795,7 +796,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -807,7 +808,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } @@ -818,7 +819,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector); } @@ -831,7 +832,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } @@ -963,7 +964,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Queue createQueue(String queueName) throws JMSException { - checkNotClosed(); + checkNotClosed(); if (queueName.indexOf('/') == -1) { return new AMQQueue(queueName); @@ -993,7 +994,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue) throws JMSException { - checkNotClosed(); + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1009,7 +1010,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - checkNotClosed(); + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); @@ -1018,14 +1019,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueSender createSender(Queue queue) throws JMSException { - checkNotClosed(); + checkNotClosed(); //return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } public Topic createTopic(String topicName) throws JMSException { - checkNotClosed(); + checkNotClosed(); if (topicName.indexOf('/') == -1) { @@ -1056,8 +1057,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1073,8 +1074,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } @@ -1088,8 +1089,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1100,8 +1101,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); return new TopicSubscriberAdaptor(dest, consumer); @@ -1109,41 +1110,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicPublisher createPublisher(Topic topic) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); - //return (TopicPublisher) createProducer(topic); - return new TopicPublisherAdapter(createProducer(topic), topic); + checkNotClosed(); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); } public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); + checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); + checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public TemporaryQueue createTemporaryQueue() throws JMSException { - checkNotClosed(); + checkNotClosed(); return new AMQTemporaryQueue(); } public TemporaryTopic createTemporaryTopic() throws JMSException { - checkNotClosed(); + checkNotClosed(); return new AMQTemporaryTopic(); } public void unsubscribe(String name) throws JMSException { - checkNotClosed(); + checkNotClosed(); //send a queue.delete for the subscription String queue = _connection.getClientID() + ":" + name; @@ -1350,21 +1349,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws InvalidDestinationException{ - if (topic == null){ - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } + private void checkValidTopic(Topic topic) throws InvalidDestinationException + { + if (topic == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } } - private void checkValidQueue(Queue queue) throws InvalidDestinationException{ - if (queue == null){ - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + private void checkValidQueue(Queue queue) throws InvalidDestinationException + { + if (queue == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - private void checkValidDestination(Destination destination) throws InvalidDestinationException{ - if (destination == null){ - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + private void checkValidDestination(Destination destination) throws InvalidDestinationException + { + if (destination == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index c3d86d15c7..8c53d93de6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,15 +24,10 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; +import javax.jms.*; import java.io.UnsupportedEncodingException; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer @@ -103,6 +98,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _mandatory; private final boolean _waitUntilSent; + private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, @@ -349,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new JMSException("Unsupported destination class: " + (destination != null ? destination.getClass() : null)); - } + } declareDestination((AMQDestination)destination); } @@ -382,6 +378,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j currentTime = System.currentTimeMillis(); message.setJMSTimestamp(currentTime); } + message.prepareForSending(); ByteBuffer payload = message.getData(); BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties(); @@ -402,7 +399,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - int size = payload.limit(); + int size = (payload != null) ? payload.limit() : 0; ContentBody[] contentBodies = createContentBodies(payload); AMQFrame[] frames = new AMQFrame[2 + contentBodies.length]; for (int i = 0; i < contentBodies.length; i++) @@ -437,14 +434,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private ContentBody[] createContentBodies(ByteBuffer payload) { - if (payload == null) + if (payload == null || payload.remaining() == 0) { - return null; - } - else if (payload.remaining() == 0) - { - return new ContentBody[0]; + return NO_CONTENT_BODIES; } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int dataLength = payload.remaining(); @@ -485,31 +479,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkNotClosed(); _encoding = encoding; } - + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { checkNotClosed(); - + if(_session == null || _session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } } - + private void checkInitialDestination(){ if(_destination == null){ throw new UnsupportedOperationException("Destination is null"); } } - + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ if (_destination != null && suppliedDestination != null){ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } - + if (suppliedDestination == null){ - throw new InvalidDestinationException("Supplied Destination was invalid"); + throw new InvalidDestinationException("Supplied Destination was invalid"); } } - + public AMQSession getSession() { return _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java index edabed90b3..dd82eb13c1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,8 @@ package org.apache.qpid.client.message; import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.client.AMQSession; +import javax.jms.JMSException; + public class AMQMessage { protected ContentHeaderProperties _contentHeaderProperties; @@ -67,5 +69,13 @@ public class AMQMessage public long getDeliveryTag() { return _deliveryTag; - } + } + + /** + * Invoked prior to sending the message. Allows the message to be modified if necessary before + * sending. + */ + public void prepareForSending() throws JMSException + { + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 41eb21a415..c1ed88b167 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -32,7 +32,6 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.JmsNotImplementedException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import javax.jms.Destination; import javax.jms.JMSException; @@ -40,7 +39,6 @@ import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; import java.util.Collections; import java.util.Enumeration; -import java.util.Iterator; import java.util.Map; public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message @@ -257,13 +255,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - - if (getJmsContentHeaderProperties() == null) - { - System.out.println("HEADERS ARE NULL"); - } - - return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName); } @@ -383,6 +374,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object); } + protected void removeProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().remove(propertyName); + } + public void acknowledge() throws JMSException { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge @@ -470,31 +467,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms getJmsContentHeaderProperties().getHeaders(); } - public FieldTable populateHeadersFromMessageProperties() - { - // - // We need to convert every property into a String representation - // Note that type information is preserved in the property name - // - final FieldTable table = FieldTableFactory.newFieldTable(); - final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); - while (entries.hasNext()) - { - final Map.Entry entry = (Map.Entry) entries.next(); - final String propertyName = (String) entry.getKey(); - if (propertyName == null) - { - continue; - } - else - { - table.put(propertyName, entry.getValue().toString()); - } - } - return table; - - } - public BasicContentHeaderProperties getJmsContentHeaderProperties() { return (BasicContentHeaderProperties) _contentHeaderProperties; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 3061d5a59c..76f8a1c32f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,6 +35,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text private String _decodedValue; + /** + * This constant represents the name of a property that is set when the message payload is null. + */ + private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL"; + JMSTextMessage() throws JMSException { this(null, null); @@ -91,31 +96,34 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return MIME_TYPE; } - public void setText(String string) throws JMSException + public void setText(String text) throws JMSException { checkWritable(); - + clearBody(); try { - _data = ByteBuffer.allocate(string.length()); - _data.limit(string.length()); - //_data.sweep(); - _data.setAutoExpand(true); - if (getJmsContentHeaderProperties().getEncoding() == null) - { - _data.put(string.getBytes()); - } - else - { - _data.put(string.getBytes(getJmsContentHeaderProperties().getEncoding())); + if (text != null) + { + _data = ByteBuffer.allocate(text.length()); + _data.limit(text.length()) ; + //_data.sweep(); + _data.setAutoExpand(true); + if (getJmsContentHeaderProperties().getEncoding() == null) + { + _data.put(text.getBytes()); + } + else + { + _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); + } } - _decodedValue = string; + _decodedValue = text; } catch (UnsupportedEncodingException e) { // should never occur - JMSException jmse = new JMSException("Unable to decode string data"); + JMSException jmse = new JMSException("Unable to decode text data"); jmse.setLinkedException(e); } } @@ -133,6 +141,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text else { _data.rewind(); + + if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) + { + return null; + } if (getJmsContentHeaderProperties().getEncoding() != null) { try @@ -162,4 +175,18 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return _decodedValue; } } + + @Override + public void prepareForSending() throws JMSException + { + super.prepareForSending(); + if (_data == null) + { + setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); + } + else + { + removeProperty(PAYLOAD_NULL_PROPERTY); + } + } } |