diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-11-04 22:49:52 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-11-04 22:49:52 +0000 |
commit | 3a4ab013713b748c619112ea067fb9ff2a835c63 (patch) | |
tree | 449359253f48802e5ce24c7cd33832014e41ed04 | |
parent | 358e4402c74f7fc9ecfffe50248a4c2cfd275a40 (diff) | |
download | qpid-python-3a4ab013713b748c619112ea067fb9ff2a835c63.tar.gz |
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1031321 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 87 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1d259eacce..bf04aa1c71 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic preAcquire = !consumer.isNoConsume() && (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); + + arguments.putAll( + (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); } - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 862daec428..0a78403268 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private final boolean _exclusive; + protected boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per @@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - + _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; _noConsume = noConsume; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 35c0c66c7f..b5f3501e5a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -489,4 +489,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } + + public boolean isExclusive() + { + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 64d5b16db0..00503cc650 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; @@ -264,6 +265,7 @@ public class AddressHelper public Link getLink() { Link link = new Link(); + link.setSubscription(new Subscription()); if (linkProps != null) { link.setDurable(linkProps.getBoolean(DURABLE) == null ? false @@ -283,7 +285,8 @@ public class AddressHelper .setProducerCapacity(capacityProps .getInt(CAPACITY_TARGET) == null ? 0 : capacityProps.getInt(CAPACITY_TARGET)); - } else + } + else { int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps .getInt(CAPACITY); @@ -292,6 +295,21 @@ public class AddressHelper } link.setFilter(linkProps.getString(FILTER)); // so far filter type not used + + if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + { + Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; + + link.getSubscription().setExclusive(exclusive); + } } return link; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 0ebcaf548b..a7d19d1bd5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.client.messaging.address.Node.QueueNode; public class Link @@ -34,6 +37,7 @@ public class Link protected int _consumerCapacity = 0; protected int _producerCapacity = 0; protected Node node; + protected Subscription subscription; public Node getNode() { @@ -114,4 +118,40 @@ public class Link { this.name = name; } + + public Subscription getSubscription() + { + return this.subscription; + } + + public void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private boolean exclusive = false; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + } } |