summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-11-04 22:49:52 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-11-04 22:49:52 +0000
commit3a4ab013713b748c619112ea067fb9ff2a835c63 (patch)
tree449359253f48802e5ce24c7cd33832014e41ed04
parent358e4402c74f7fc9ecfffe50248a4c2cfd275a40 (diff)
downloadqpid-python-3a4ab013713b748c619112ea067fb9ff2a835c63.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1031321 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java40
5 files changed, 87 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1d259eacce..bf04aa1c71 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
preAcquire = !consumer.isNoConsume() &&
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
+
+ arguments.putAll(
+ (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 862daec428..0a78403268 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
*/
- private final boolean _exclusive;
+ protected boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
@@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
-
+
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
_noConsume = noConsume;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 35c0c66c7f..b5f3501e5a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -489,4 +489,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
+
+ public boolean isExclusive()
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 64d5b16db0..00503cc650 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Subscription;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
@@ -264,6 +265,7 @@ public class AddressHelper
public Link getLink()
{
Link link = new Link();
+ link.setSubscription(new Subscription());
if (linkProps != null)
{
link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
@@ -283,7 +285,8 @@ public class AddressHelper
.setProducerCapacity(capacityProps
.getInt(CAPACITY_TARGET) == null ? 0
: capacityProps.getInt(CAPACITY_TARGET));
- } else
+ }
+ else
{
int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
.getInt(CAPACITY);
@@ -292,6 +295,21 @@ public class AddressHelper
}
link.setFilter(linkProps.getString(FILTER));
// so far filter type not used
+
+ if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ {
+ Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
+
+ link.getSubscription().setExclusive(exclusive);
+ }
}
return link;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 0ebcaf548b..a7d19d1bd5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.qpid.client.messaging.address.Node.QueueNode;
public class Link
@@ -34,6 +37,7 @@ public class Link
protected int _consumerCapacity = 0;
protected int _producerCapacity = 0;
protected Node node;
+ protected Subscription subscription;
public Node getNode()
{
@@ -114,4 +118,40 @@ public class Link
{
this.name = name;
}
+
+ public Subscription getSubscription()
+ {
+ return this.subscription;
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ this.subscription = subscription;
+ }
+
+ public static class Subscription
+ {
+ private Map<String,Object> args = new HashMap<String,Object>();
+ private boolean exclusive = false;
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+ }
}