summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-02-13 22:30:47 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-02-13 22:30:47 +0000
commita3542fa13e0096b53319216532b2a79fe1d3f0f5 (patch)
treef095e3c0b136afebb6aad14d7d599feb0e0f6d75
parent938df01f68a303b8d54167093452e0756befdf0b (diff)
downloadqpid-python-a3542fa13e0096b53319216532b2a79fe1d3f0f5.tar.gz
QPID-3836 Modified the address handling code to pass the noLocal
argument to queue-declare method. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1243719 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
4 files changed, 19 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ac728e78eb..e7e937b689 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1045,7 +1045,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
try
{
- handleAddressBasedDestination(dest,false,true);
+ handleAddressBasedDestination(dest,false,noLocal,true);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be created for Topics");
@@ -2905,7 +2905,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- handleAddressBasedDestination(amqd,true,nowait);
+ handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
}
else
{
@@ -2966,6 +2966,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException;
private void registerProducer(long producerId, MessageProducer producer)
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a27c52c686..816ad1f222 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -766,8 +766,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else
{
QueueNode node = (QueueNode)amqd.getSourceNode();
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
+ if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
- node.getDeclareArgs(),
+ arguments,
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
@@ -1167,13 +1173,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
@SuppressWarnings("deprecation")
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException
{
if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest,noLocal);
}
}
else
@@ -1202,7 +1209,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else if(createNode)
{
setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,false,noWait);
+ send0_10QueueDeclare(dest,null,noLocal,noWait);
sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
break;
@@ -1217,7 +1224,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
verifySubject(dest);
if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest, noLocal);
}
break;
}
@@ -1232,7 +1239,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
false);
if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest,noLocal);
}
break;
}
@@ -1295,7 +1302,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void createSubscriptionQueue(AMQDestination dest) throws AMQException
+ private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
{
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
@@ -1308,7 +1315,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
node.setExclusive(true);
node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,false,true);
+ send0_10QueueDeclare(dest,null,noLocal,true);
getQpidSession().exchangeBind(dest.getQueueName(),
dest.getAddressName(),
dest.getSubject(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index a49fb256a7..29f1925cbc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -624,6 +624,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException
{
throw new UnsupportedOperationException("The new addressing based sytanx is "
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 91811ccf98..024219cfd6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -86,7 +86,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
try
{
- getSession().handleAddressBasedDestination(destination,false,false);
+ getSession().handleAddressBasedDestination(destination,false,false,false);
}
catch(Exception e)
{