diff options
Diffstat (limited to 'qpid/java/client/src')
5 files changed, 100 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 91c23ff384..6c421a9610 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -112,6 +112,11 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } + public boolean neverDeclare() + { + return false; + } + // ----- Fields required to support new address syntax ------- public enum DestSyntax { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8224c77ba9..29c2a3b279 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2864,16 +2864,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - if (_declareExchanges) + if (_declareExchanges && !amqd.neverDeclare()) { declareExchange(amqd, nowait); } - if (_delareQueues || amqd.isNameRequired()) + if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } - if (_bindQueues) + if (_bindQueues && !amqd.neverDeclare()) { if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java index fa2afb3ee4..ec9c595f99 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -37,4 +37,10 @@ public class AMQUndefinedDestination extends AMQDestination { return getAMQQueueName() == null; } + + @Override + public boolean neverDeclare() + { + return true; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5acaa5c543..1981d134af 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -131,7 +131,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _channelId = channelId; _session = session; _producerId = producerId; - if (destination != null && !(destination instanceof AMQUndefinedDestination)) + if (destination != null && !(destination.neverDeclare())) { declareDestination(destination); } @@ -177,7 +177,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac void resubscribe() throws AMQException { - if (_destination != null && !(_destination instanceof AMQUndefinedDestination)) + if (_destination != null && !_destination.neverDeclare()) { declareDestination(_destination); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index dbfbb743ec..fef9769d06 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -36,6 +36,7 @@ import org.apache.qpid.url.BindingURL; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; +import javax.jms.Queue; import java.net.URISyntaxException; import java.util.Collections; import java.util.Enumeration; @@ -258,7 +259,29 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } catch (URISyntaxException e) { - throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); + if(replyToEncoding.startsWith("/")) + { + dest = new DefaultRouterDestination(replyToEncoding); + } + else if(replyToEncoding.contains("/")) + { + String[] parts = replyToEncoding.split("/",2); + dest = new NonBURLReplyToDestination(parts[0], parts[1]); + + + } + else + { + if(getAMQSession().isQueueBound(AMQShortString.valueOf(replyToEncoding), null, null)) + { + dest = new NonBURLReplyToDestination(replyToEncoding, ""); + } + else + { + dest = new DefaultRouterDestination(replyToEncoding); + } + } + } _destinationCache.put(replyToEncoding, dest); @@ -371,7 +394,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getBoolean(propertyName); @@ -381,7 +404,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getByte(propertyName); @@ -391,7 +414,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getShort(propertyName); @@ -401,7 +424,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getInteger(propertyName); @@ -411,7 +434,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getLong(propertyName); @@ -421,7 +444,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getFloat(propertyName); @@ -431,7 +454,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getDouble(propertyName); @@ -448,7 +471,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getString(propertyName); @@ -469,7 +492,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -480,7 +503,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -491,7 +514,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -508,7 +531,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -519,7 +542,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -530,7 +553,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -585,4 +608,50 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate _readableProperties = false; } + + private static class DefaultRouterDestination extends AMQDestination implements Queue + { + public DefaultRouterDestination(final String replyToEncoding) + { + super(AMQShortString.EMPTY_STRING, + AMQShortString.valueOf("direct"), + AMQShortString.valueOf(replyToEncoding), + AMQShortString.valueOf(replyToEncoding)); + } + + @Override + public boolean isNameRequired() + { + return false; + } + + @Override + public boolean neverDeclare() + { + return true; + } + } + + private static class NonBURLReplyToDestination extends AMQDestination implements Queue + { + public NonBURLReplyToDestination(final String exchange, final String routingKey) + { + super(AMQShortString.valueOf(exchange), + null, + AMQShortString.valueOf(routingKey), + AMQShortString.valueOf(routingKey)); + } + + @Override + public boolean isNameRequired() + { + return false; + } + + @Override + public boolean neverDeclare() + { + return true; + } + } } |