diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java | 77 |
1 files changed, 21 insertions, 56 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 57f64c2f92..53c0457120 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -19,7 +19,6 @@ package org.apache.qpid.client; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; import java.nio.ByteBuffer; import java.util.HashMap; @@ -31,12 +30,9 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -46,7 +42,6 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +56,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - boolean immediate, boolean mandatory) throws AMQException + boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException { - super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); + super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, + mandatory, waitUntilSent); userIDBytes = Strings.toUTF8(_userID); } @@ -72,15 +68,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { if (destination.getDestSyntax() == DestSyntax.BURL) { - if (getSession().isDeclareExchanges()) - { - String name = destination.getExchangeName().toString(); - ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare - (name, - destination.getExchangeClass().toString(), - null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); - } + String name = destination.getExchangeName().toString(); + ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare + (name, + destination.getExchangeClass().toString(), + null, null, + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); } else { @@ -103,7 +96,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer */ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, - boolean immediate) throws JMSException + boolean immediate, boolean wait) throws JMSException { message.prepareForSending(); @@ -178,7 +171,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && (destination.getSubject() != null || - (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null)) + (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null)) ) { Map<String,Object> appProps = messageProps.getApplicationHeaders(); @@ -188,21 +181,20 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.setApplicationHeaders(appProps); } - if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null) + if (appProps.get("qpid.subject") == null) { // use default subject in address string - appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject()); + appProps.put("qpid.subject",destination.getSubject()); } - if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE) { deliveryProp.setRoutingKey((String) - messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); + messageProps.getApplicationHeaders().get("qpid.subject")); } } - - ByteBuffer data = message.getData(); - messageProps.setContentLength(data.remaining()); + + messageProps.setContentLength(message.getContentLength()); // send the message try @@ -218,17 +210,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryMode == DeliveryMode.PERSISTENT) ); - boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && - (destination.getLink().getReliability() == Reliability.UNRELIABLE); - - - ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice(); + org.apache.mina.common.ByteBuffer data = message.getData(); + ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProp, messageProps), - buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE); + buffer, sync ? SYNC : NONE); if (sync) { ssn.sync(); @@ -245,34 +234,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } } - @Override + public boolean isBound(AMQDestination destination) throws JMSException { return _session.isQueueBound(destination); } - - @Override - public void close() throws JMSException - { - super.close(); - AMQDestination dest = _destination; - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } - } - } - } - } |