diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java | 51 |
1 files changed, 12 insertions, 39 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 5821fee7ff..53c0457120 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -19,7 +19,6 @@ package org.apache.qpid.client; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; import java.nio.ByteBuffer; import java.util.HashMap; @@ -31,13 +30,9 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -73,15 +68,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { if (destination.getDestSyntax() == DestSyntax.BURL) { - if (getSession().isDeclareExchanges()) - { - String name = destination.getExchangeName().toString(); - ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare - (name, - destination.getExchangeClass().toString(), - null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); - } + String name = destination.getExchangeName().toString(); + ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare + (name, + destination.getExchangeClass().toString(), + null, null, + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); } else { @@ -179,7 +171,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && (destination.getSubject() != null || - (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null)) + (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null)) ) { Map<String,Object> appProps = messageProps.getApplicationHeaders(); @@ -189,16 +181,16 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.setApplicationHeaders(appProps); } - if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null) + if (appProps.get("qpid.subject") == null) { // use default subject in address string - appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject()); + appProps.put("qpid.subject",destination.getSubject()); } - if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE) { deliveryProp.setRoutingKey((String) - messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); + messageProps.getApplicationHeaders().get("qpid.subject")); } } @@ -218,9 +210,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryMode == DeliveryMode.PERSISTENT) ); - boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && - (destination.getLink().getReliability() == Reliability.UNRELIABLE); - org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); @@ -228,7 +217,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProp, messageProps), - buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE); + buffer, sync ? SYNC : NONE); if (sync) { ssn.sync(); @@ -250,21 +239,5 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { return _session.isQueueBound(destination); } - - @Override - public void close() - { - super.close(); - AMQDestination dest = _destination; - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); - } - } - } } |