diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 51 |
1 files changed, 17 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index fd6070a045..8c53d93de6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,15 +24,10 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; +import javax.jms.*; import java.io.UnsupportedEncodingException; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer @@ -103,6 +98,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _mandatory; private final boolean _waitUntilSent; + private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, @@ -349,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new JMSException("Unsupported destination class: " + (destination != null ? destination.getClass() : null)); - } + } declareDestination((AMQDestination)destination); } @@ -382,17 +378,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j currentTime = System.currentTimeMillis(); message.setJMSTimestamp(currentTime); } - // - // Very nasty temporary hack for GRM-206. Will be altered ASAP. - // - if (message instanceof JMSBytesMessage) - { - JMSBytesMessage msg = (JMSBytesMessage) message; - if (!msg.isReadable()) - { - msg.reset(); - } - } + message.prepareForSending(); ByteBuffer payload = message.getData(); BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties(); @@ -413,7 +399,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - int size = payload.limit(); + int size = (payload != null) ? payload.limit() : 0; ContentBody[] contentBodies = createContentBodies(payload); AMQFrame[] frames = new AMQFrame[2 + contentBodies.length]; for (int i = 0; i < contentBodies.length; i++) @@ -448,14 +434,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private ContentBody[] createContentBodies(ByteBuffer payload) { - if (payload == null) - { - return null; - } - else if (payload.remaining() == 0) + if (payload == null || payload.remaining() == 0) { - return new ContentBody[0]; + return NO_CONTENT_BODIES; } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int dataLength = payload.remaining(); @@ -496,31 +479,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkNotClosed(); _encoding = encoding; } - + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { checkNotClosed(); - + if(_session == null || _session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } } - + private void checkInitialDestination(){ if(_destination == null){ throw new UnsupportedOperationException("Destination is null"); } } - + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ if (_destination != null && suppliedDestination != null){ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } - + if (suppliedDestination == null){ - throw new InvalidDestinationException("Supplied Destination was invalid"); + throw new InvalidDestinationException("Supplied Destination was invalid"); } } - + public AMQSession getSession() { return _session; |