summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java44
1 files changed, 28 insertions, 16 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index a3a1e9c28b..f717ca4655 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
private byte[] userIDBytes;
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- Boolean immediate, Boolean mandatory) throws AMQException
+ AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory);
userIDBytes = Strings.toUTF8(getUserID());
}
@@ -79,14 +77,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
(name,
destination.getExchangeClass().toString(),
null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+ destination.isExchangeDurable() ? Option.DURABLE : Option.NONE,
+ destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE);
}
}
else
{
try
{
- getSession().handleAddressBasedDestination(destination,false,false,false);
+ getSession().resolveAddress(destination,false,false);
+ ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
+ ((AMQSession_0_10)getSession()).sync();
}
catch(Exception e)
{
@@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
return getSession().isQueueBound(destination);
}
+ // We should have a close and closed method to distinguish between normal close
+ // and a close due to session or connection error.
@Override
public void close() throws JMSException
{
super.close();
AMQDestination dest = getAMQDestination();
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
+ try
{
- try
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- getAMQDestination().getQueueName());
- }
- catch(TransportException e)
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.SENDER )
{
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ ssn.handleNodeDelete(dest);
}
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
}
}
}