diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java | 54 |
1 files changed, 35 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 02c5526e03..a1b5ce6f4c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -35,7 +35,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.nclient.util.ByteBufferMessage; +import org.apache.qpid.util.Strings; import org.apache.qpid.njms.ExceptionHelper; import org.apache.qpid.transport.*; import static org.apache.qpid.transport.Option.*; @@ -45,6 +45,7 @@ import static org.apache.qpid.transport.Option.*; */ public class BasicMessageProducer_0_10 extends BasicMessageProducer { + private byte[] userIDBytes; BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, @@ -52,13 +53,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory, waitUntilSent); + + userIDBytes = Strings.toUTF8(_userID); } void declareDestination(AMQDestination destination) { - ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(), - destination.getExchangeClass().toString(), - null, null); + String name = destination.getExchangeName().toString(); + ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare + (name, + destination.getExchangeClass().toString(), + null, null, + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); } //--- Overwritten methods @@ -77,6 +83,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer DeliveryProperties deliveryProp = delegate.getDeliveryProperties(); MessageProperties messageProps = delegate.getMessageProperties(); + // On the receiving side, this will be read in to the JMSXUserID as well. + messageProps.setUserId(userIDBytes); + if (messageId != null) { messageProps.setMessageId(messageId); @@ -86,20 +95,22 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.clearMessageId(); } + long currentTime = 0; + if (timeToLive > 0 || !_disableTimestamps) + { + currentTime = System.currentTimeMillis(); + } + + if (timeToLive > 0) + { + deliveryProp.setTtl(timeToLive); + message.setJMSExpiration(currentTime + timeToLive); + } + if (!_disableTimestamps) { - final long currentTime = System.currentTimeMillis(); - deliveryProp.setTimestamp(currentTime); - if (timeToLive > 0) - { - deliveryProp.setExpiration(currentTime + timeToLive); - message.setJMSExpiration(currentTime + timeToLive); - } - else - { - deliveryProp.setExpiration(0); - message.setJMSExpiration(0); - } + + deliveryProp.setTimestamp(currentTime); message.setJMSTimestamp(currentTime); } @@ -145,9 +156,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer ((AMQSession_0_10) getSession()).getQpidSession(); // if true, we need to sync the delivery of this message - boolean sync = (deliveryMode == DeliveryMode.PERSISTENT && - getSession().getAMQConnection().getSyncPersistence()); + boolean sync = false; + sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || + (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + deliveryMode == DeliveryMode.PERSISTENT) + ); + org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); @@ -159,11 +174,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { ssn.sync(); } + + } catch (RuntimeException rte) { JMSException ex = new JMSException("Exception when sending message"); - rte.printStackTrace(); ex.setLinkedException(rte); throw ex; } |