summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java54
1 files changed, 35 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 02c5526e03..a1b5ce6f4c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -35,7 +35,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.nclient.util.ByteBufferMessage;
+import org.apache.qpid.util.Strings;
import org.apache.qpid.njms.ExceptionHelper;
import org.apache.qpid.transport.*;
import static org.apache.qpid.transport.Option.*;
@@ -45,6 +45,7 @@ import static org.apache.qpid.transport.Option.*;
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
+ private byte[] userIDBytes;
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
@@ -52,13 +53,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
mandatory, waitUntilSent);
+
+ userIDBytes = Strings.toUTF8(_userID);
}
void declareDestination(AMQDestination destination)
{
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
- destination.getExchangeClass().toString(),
- null, null);
+ String name = destination.getExchangeName().toString();
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+ (name,
+ destination.getExchangeClass().toString(),
+ null, null,
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
}
//--- Overwritten methods
@@ -77,6 +83,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
MessageProperties messageProps = delegate.getMessageProperties();
+ // On the receiving side, this will be read in to the JMSXUserID as well.
+ messageProps.setUserId(userIDBytes);
+
if (messageId != null)
{
messageProps.setMessageId(messageId);
@@ -86,20 +95,22 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
messageProps.clearMessageId();
}
+ long currentTime = 0;
+ if (timeToLive > 0 || !_disableTimestamps)
+ {
+ currentTime = System.currentTimeMillis();
+ }
+
+ if (timeToLive > 0)
+ {
+ deliveryProp.setTtl(timeToLive);
+ message.setJMSExpiration(currentTime + timeToLive);
+ }
+
if (!_disableTimestamps)
{
- final long currentTime = System.currentTimeMillis();
- deliveryProp.setTimestamp(currentTime);
- if (timeToLive > 0)
- {
- deliveryProp.setExpiration(currentTime + timeToLive);
- message.setJMSExpiration(currentTime + timeToLive);
- }
- else
- {
- deliveryProp.setExpiration(0);
- message.setJMSExpiration(0);
- }
+
+ deliveryProp.setTimestamp(currentTime);
message.setJMSTimestamp(currentTime);
}
@@ -145,9 +156,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
- boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
- getSession().getAMQConnection().getSyncPersistence());
+ boolean sync = false;
+ sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
+ (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT &&
+ deliveryMode == DeliveryMode.PERSISTENT)
+ );
+
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
@@ -159,11 +174,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
ssn.sync();
}
+
+
}
catch (RuntimeException rte)
{
JMSException ex = new JMSException("Exception when sending message");
- rte.printStackTrace();
ex.setLinkedException(rte);
throw ex;
}