summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java51
1 files changed, 17 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index fd6070a045..8c53d93de6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,15 +24,10 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
import java.io.UnsupportedEncodingException;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
@@ -103,6 +98,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private final boolean _mandatory;
private final boolean _waitUntilSent;
+ private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -349,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
- }
+ }
declareDestination((AMQDestination)destination);
}
@@ -382,17 +378,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
currentTime = System.currentTimeMillis();
message.setJMSTimestamp(currentTime);
}
- //
- // Very nasty temporary hack for GRM-206. Will be altered ASAP.
- //
- if (message instanceof JMSBytesMessage)
- {
- JMSBytesMessage msg = (JMSBytesMessage) message;
- if (!msg.isReadable())
- {
- msg.reset();
- }
- }
+ message.prepareForSending();
ByteBuffer payload = message.getData();
BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
@@ -413,7 +399,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- int size = payload.limit();
+ int size = (payload != null) ? payload.limit() : 0;
ContentBody[] contentBodies = createContentBodies(payload);
AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
for (int i = 0; i < contentBodies.length; i++)
@@ -448,14 +434,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
*/
private ContentBody[] createContentBodies(ByteBuffer payload)
{
- if (payload == null)
- {
- return null;
- }
- else if (payload.remaining() == 0)
+ if (payload == null || payload.remaining() == 0)
{
- return new ContentBody[0];
+ return NO_CONTENT_BODIES;
}
+
// we substract one from the total frame maximum size to account for the end of frame marker in a body frame
// (0xCE byte).
int dataLength = payload.remaining();
@@ -496,31 +479,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkNotClosed();
_encoding = encoding;
}
-
+
private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
-
+
private void checkInitialDestination(){
if(_destination == null){
throw new UnsupportedOperationException("Destination is null");
}
}
-
+
private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
if (_destination != null && suppliedDestination != null){
throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
-
+
if (suppliedDestination == null){
- throw new InvalidDestinationException("Supplied Destination was invalid");
+ throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
-
+
public AMQSession getSession() {
return _session;