summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-05 16:04:16 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-05 16:04:16 +0000
commit6713bfc5ddc1ff6202dad0d950a252273f73f795 (patch)
treed789ac52d18fdc493f5d7e1731384c43cbfde9f1 /qpid/java/client/src
parent58c93e3b5e6c2227cc0018720a8781b25ec0d288 (diff)
downloadqpid-python-6713bfc5ddc1ff6202dad0d950a252273f73f795.tar.gz
QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java99
5 files changed, 100 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 91c23ff384..6c421a9610 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -112,6 +112,11 @@ public abstract class AMQDestination implements Destination, Referenceable
_name = name;
}
+ public boolean neverDeclare()
+ {
+ return false;
+ }
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8224c77ba9..29c2a3b279 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2864,16 +2864,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- if (_declareExchanges)
+ if (_declareExchanges && !amqd.neverDeclare())
{
declareExchange(amqd, nowait);
}
- if (_delareQueues || amqd.isNameRequired())
+ if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
- if (_bindQueues)
+ if (_bindQueues && !amqd.neverDeclare())
{
if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
index fa2afb3ee4..ec9c595f99 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
@@ -37,4 +37,10 @@ public class AMQUndefinedDestination extends AMQDestination
{
return getAMQQueueName() == null;
}
+
+ @Override
+ public boolean neverDeclare()
+ {
+ return true;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 5acaa5c543..1981d134af 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -131,7 +131,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_channelId = channelId;
_session = session;
_producerId = producerId;
- if (destination != null && !(destination instanceof AMQUndefinedDestination))
+ if (destination != null && !(destination.neverDeclare()))
{
declareDestination(destination);
}
@@ -177,7 +177,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
void resubscribe() throws AMQException
{
- if (_destination != null && !(_destination instanceof AMQUndefinedDestination))
+ if (_destination != null && !_destination.neverDeclare())
{
declareDestination(_destination);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index dbfbb743ec..fef9769d06 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -36,6 +36,7 @@ import org.apache.qpid.url.BindingURL;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Enumeration;
@@ -258,7 +259,29 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
catch (URISyntaxException e)
{
- throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
+ if(replyToEncoding.startsWith("/"))
+ {
+ dest = new DefaultRouterDestination(replyToEncoding);
+ }
+ else if(replyToEncoding.contains("/"))
+ {
+ String[] parts = replyToEncoding.split("/",2);
+ dest = new NonBURLReplyToDestination(parts[0], parts[1]);
+
+
+ }
+ else
+ {
+ if(getAMQSession().isQueueBound(AMQShortString.valueOf(replyToEncoding), null, null))
+ {
+ dest = new NonBURLReplyToDestination(replyToEncoding, "");
+ }
+ else
+ {
+ dest = new DefaultRouterDestination(replyToEncoding);
+ }
+ }
+
}
_destinationCache.put(replyToEncoding, dest);
@@ -371,7 +394,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getBoolean(propertyName);
@@ -381,7 +404,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getByte(propertyName);
@@ -391,7 +414,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getShort(propertyName);
@@ -401,7 +424,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getInteger(propertyName);
@@ -411,7 +434,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getLong(propertyName);
@@ -421,7 +444,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getFloat(propertyName);
@@ -431,7 +454,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getDouble(propertyName);
@@ -448,7 +471,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getString(propertyName);
@@ -469,7 +492,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -480,7 +503,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -491,7 +514,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -508,7 +531,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -519,7 +542,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -530,7 +553,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -585,4 +608,50 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
_readableProperties = false;
}
+
+ private static class DefaultRouterDestination extends AMQDestination implements Queue
+ {
+ public DefaultRouterDestination(final String replyToEncoding)
+ {
+ super(AMQShortString.EMPTY_STRING,
+ AMQShortString.valueOf("direct"),
+ AMQShortString.valueOf(replyToEncoding),
+ AMQShortString.valueOf(replyToEncoding));
+ }
+
+ @Override
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean neverDeclare()
+ {
+ return true;
+ }
+ }
+
+ private static class NonBURLReplyToDestination extends AMQDestination implements Queue
+ {
+ public NonBURLReplyToDestination(final String exchange, final String routingKey)
+ {
+ super(AMQShortString.valueOf(exchange),
+ null,
+ AMQShortString.valueOf(routingKey),
+ AMQShortString.valueOf(routingKey));
+ }
+
+ @Override
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean neverDeclare()
+ {
+ return true;
+ }
+ }
}