summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java51
1 files changed, 12 insertions, 39 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 5821fee7ff..53c0457120 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -19,7 +19,6 @@ package org.apache.qpid.client;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -31,13 +30,9 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -73,15 +68,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
if (destination.getDestSyntax() == DestSyntax.BURL)
{
- if (getSession().isDeclareExchanges())
- {
- String name = destination.getExchangeName().toString();
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
- (name,
- destination.getExchangeClass().toString(),
- null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
- }
+ String name = destination.getExchangeName().toString();
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+ (name,
+ destination.getExchangeClass().toString(),
+ null, null,
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
}
else
{
@@ -179,7 +171,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
(destination.getSubject() != null ||
- (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
+ (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null))
)
{
Map<String,Object> appProps = messageProps.getApplicationHeaders();
@@ -189,16 +181,16 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
messageProps.setApplicationHeaders(appProps);
}
- if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ if (appProps.get("qpid.subject") == null)
{
// use default subject in address string
- appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject());
+ appProps.put("qpid.subject",destination.getSubject());
}
- if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE)
{
deliveryProp.setRoutingKey((String)
- messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));
+ messageProps.getApplicationHeaders().get("qpid.subject"));
}
}
@@ -218,9 +210,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
deliveryMode == DeliveryMode.PERSISTENT)
);
- boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) &&
- (destination.getLink().getReliability() == Reliability.UNRELIABLE);
-
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
@@ -228,7 +217,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
new Header(deliveryProp, messageProps),
- buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE);
+ buffer, sync ? SYNC : NONE);
if (sync)
{
ssn.sync();
@@ -250,21 +239,5 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
return _session.isQueueBound(destination);
}
-
- @Override
- public void close()
- {
- super.close();
- AMQDestination dest = _destination;
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- _destination.getQueueName());
- }
- }
- }
}