summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java77
1 files changed, 21 insertions, 56 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 57f64c2f92..53c0457120 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -19,7 +19,6 @@ package org.apache.qpid.client;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -31,12 +30,9 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -46,7 +42,6 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,9 +56,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory) throws AMQException
+ boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException
{
- super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
+ mandatory, waitUntilSent);
userIDBytes = Strings.toUTF8(_userID);
}
@@ -72,15 +68,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
if (destination.getDestSyntax() == DestSyntax.BURL)
{
- if (getSession().isDeclareExchanges())
- {
- String name = destination.getExchangeName().toString();
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
- (name,
- destination.getExchangeClass().toString(),
- null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
- }
+ String name = destination.getExchangeName().toString();
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+ (name,
+ destination.getExchangeClass().toString(),
+ null, null,
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
}
else
{
@@ -103,7 +96,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
@@ -178,7 +171,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
(destination.getSubject() != null ||
- (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
+ (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null))
)
{
Map<String,Object> appProps = messageProps.getApplicationHeaders();
@@ -188,21 +181,20 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
messageProps.setApplicationHeaders(appProps);
}
- if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ if (appProps.get("qpid.subject") == null)
{
// use default subject in address string
- appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject());
+ appProps.put("qpid.subject",destination.getSubject());
}
- if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE)
{
deliveryProp.setRoutingKey((String)
- messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));
+ messageProps.getApplicationHeaders().get("qpid.subject"));
}
}
-
- ByteBuffer data = message.getData();
- messageProps.setContentLength(data.remaining());
+
+ messageProps.setContentLength(message.getContentLength());
// send the message
try
@@ -218,17 +210,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
deliveryMode == DeliveryMode.PERSISTENT)
);
- boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) &&
- (destination.getLink().getReliability() == Reliability.UNRELIABLE);
-
-
- ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice();
+ org.apache.mina.common.ByteBuffer data = message.getData();
+ ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(),
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
new Header(deliveryProp, messageProps),
- buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE);
+ buffer, sync ? SYNC : NONE);
if (sync)
{
ssn.sync();
@@ -245,34 +234,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
}
- @Override
+
public boolean isBound(AMQDestination destination) throws JMSException
{
return _session.isQueueBound(destination);
}
-
- @Override
- public void close() throws JMSException
- {
- super.close();
- AMQDestination dest = _destination;
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- try
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- _destination.getQueueName());
- }
- catch(TransportException e)
- {
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
- }
- }
- }
- }
-
}