diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java | 44 |
1 files changed, 28 insertions, 16 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index a3a1e9c28b..f717ca4655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer private byte[] userIDBytes; BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - Boolean immediate, Boolean mandatory) throws AMQException + AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); + super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory); userIDBytes = Strings.toUTF8(getUserID()); } @@ -79,14 +77,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer (name, destination.getExchangeClass().toString(), null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE, + destination.isExchangeDurable() ? Option.DURABLE : Option.NONE, + destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE); } } else { try { - getSession().handleAddressBasedDestination(destination,false,false,false); + getSession().resolveAddress(destination,false,false); + ((AMQSession_0_10)getSession()).handleLinkCreation(destination); + ((AMQSession_0_10)getSession()).sync(); } catch(Exception e) { @@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer return getSession().isQueueBound(destination); } + // We should have a close and closed method to distinguish between normal close + // and a close due to session or connection error. @Override public void close() throws JMSException { super.close(); AMQDestination dest = getAMQDestination(); - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) + try { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - getAMQDestination().getQueueName()); - } - catch(TransportException e) + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.SENDER ) { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + ssn.handleNodeDelete(dest); } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; } } } |