diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java | 40 |
1 files changed, 21 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 57f64c2f92..d2a88bcc52 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -31,13 +31,13 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.address.Link.Reliability; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -86,7 +86,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { try { - getSession().handleAddressBasedDestination(destination,false,false); + AddressBasedDestination addrDest = (AddressBasedDestination)destination; + addrDest.resolveAddress((AMQSession_0_10)getSession()); + addrDest.create((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); + addrDest.azzert((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); } catch(Exception e) { @@ -165,11 +168,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority)); message.setJMSPriority(priority); } + String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(); if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName)) { deliveryProp.setExchange(exchangeName); } + String routingKey = destination.getRoutingKey().toString(); if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey)) { @@ -177,7 +182,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && - (destination.getSubject() != null || + (((AddressBasedDestination)destination).getAddress().getSubject() != null || (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null)) ) { @@ -191,10 +196,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null) { // use default subject in address string - appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject()); + appProps.put(QpidMessageProperties.QPID_SUBJECT, + ((AddressBasedDestination)destination).getAddress().getSubject()); } - if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + if (((AddressBasedDestination)destination).isTopic()) { deliveryProp.setRoutingKey((String) messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); @@ -218,8 +224,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryMode == DeliveryMode.PERSISTENT) ); - boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && - (destination.getLink().getReliability() == Reliability.UNRELIABLE); + boolean unreliable = false; //(destination.getDestSyntax() == DestSyntax.ADDR) && + // (destination.getLink().getReliability() == Reliability.UNRELIABLE); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice(); @@ -258,19 +264,15 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer AMQDestination dest = _destination; if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) + try { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } + ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + } } |